Tech Notes

Apache Flink ContinuousProcessingTimeTriggerの話

| Comments

(注: window周りのAPIを変えようという話(FLINK-3643)があるので、以下で書かれている内容は近い将来obsoleteになる可能性があります)

自分のところでは、ストリーム処理のユースケースとして、ある程度長期間のwindowでデータを保持しながら、集計結果は短い間隔で出力したい、というのがある。

例えば、Norikra(Esper)のクエリだとこんな感じ。site_idごとに、過去3日間(72時間)のユニークユーザを集計し、結果は1分ごとに出力する、というもの。 集計対象のwindowは72時間のsliding time windowとなる。こういうwindowが長いケースは、Norikraだとプロセスを再起動したりした時にwindowの内容が全て失われるので辛い。Flinkでいい感じに実装したい。

1
2
3
4
5
6
7
8
SELECT
    site_id,
    count(distinct userkey) as num_uu
FROM
    viewer_log.win:time(3 day)
GROUP BY
    site_id
OUTPUT LAST EVERY 1 min

ドキュメントを読むと、以下のようにwindowの定義とは別にいつ計算処理がトリガされるか、いつwindowの中身が削除されるか、を好きに設定できるように見える。

1
2
3
4
keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));

なので、上記のようなものは以下の様なコードで実現できるかと思った。ContinuousProcessingTimeTriggerは、指定した間隔で集計を実行するが、その際にwindowの内容の削除は行わない、というもの。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  def main(args: Array[String]) {
//...
    val input = env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
      .map(lineToTuple(_))
      .keyBy(0)
      .timeWindow(Time.of(3, TimeUnit.DAYS))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(60)))
      .fold(("",new scala.collection.mutable.HashSet[String]())){(r,i) => { (i._1, r._2 + i._2)}}
      .map{x => (new Timestamp(System.currentTimeMillis()), x._1, x._2.size)}
//...
  }
  def lineToTuple(line: String) = {
    val x = line.split("\\W+") filter {_.nonEmpty}
    (x(0),x(1))
  }

が、実際には期待どおりに動かないことが分かった。(MLで教えてもらった。こことかここ)

  • トリガが発火しないケースがある
  • timeWindowが終了してもwindow内のデータが削除されない。結果、stateのサイズが増加する一方になる。(.trigger()を呼び出した時点で、元のtimeWindowのトリガは上書きされるらしい)

って、これContinuousProcessingTimeTriggerは使えないのでは・・・。

結局、ContinuousProcessingTimeTriggerに修正を入れたカスタムTriggerを作成し、これを使用することで期待する動作をするようになった。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@PublicEvolving
public class MyContinuousProcessingTimeTrigger<W extends TimeWindow> extends Trigger<Object, W> {
  private static final long serialVersionUID = 1L;

  private final long interval;

  private final ValueStateDescriptor<Long> stateDesc =
          new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L);


  private MyContinuousProcessingTimeTrigger(long interval) {
      this.interval = interval;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
      long currentTime = System.currentTimeMillis();

      ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
      long nextFireTimestamp = fireState.value();

      if (nextFireTimestamp == 0) {
          long start = currentTime - (currentTime % interval);
          fireState.update(start + interval);

          ctx.registerProcessingTimeTimer((start + interval));
          return TriggerResult.CONTINUE;
      }
      if (currentTime >= nextFireTimestamp) {
          long start = currentTime - (currentTime % interval);
          fireState.update(start + interval);

          ctx.registerProcessingTimeTimer((start + interval));

          return TriggerResult.FIRE;
      }
      return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

      ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
      long nextFireTimestamp = fireState.value();

      // only fire if an element didn't already fire
      long currentTime = System.currentTimeMillis();
      if (currentTime >= nextFireTimestamp) {
          long start = currentTime - (currentTime % interval);
                        fireState.update(0L);
                        if (nextFireTimestamp >= window.getEnd()) {
                           return TriggerResult.FIRE_AND_PURGE;
                        }
                        else {
                            return TriggerResult.FIRE;
                        }
                }
      return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {}

  @VisibleForTesting
  public long getInterval() {
      return interval;
  }

  @Override
  public String toString() {
      return "ContinuousProcessingTimeTrigger(" + interval + ")";
  }

  /**
  * Creates a trigger that continuously fires based on the given interval.
  *
  * @param interval The time interval at which to fire.
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
  public static <W extends TimeWindow> MyContinuousProcessingTimeTrigger<W> of(Time interval) {
      return new MyContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
  }
}

Comments