例えば、Norikra(Esper)のクエリだとこんな感じ。site_idごとに、過去3日間(72時間)のユニークユーザを集計し、結果は1分ごとに出力する、というもの。 集計対象のwindowは72時間のsliding time windowとなる。こういうwindowが長いケースは、Norikraだとプロセスを再起動したりした時にwindowの内容が全て失われるので辛い。Flinkでいい感じに実装したい。
@PublicEvolvingpublicclassMyContinuousProcessingTimeTrigger<WextendsTimeWindow>extendsTrigger<Object,W>{privatestaticfinallongserialVersionUID=1L;privatefinallonginterval;privatefinalValueStateDescriptor<Long>stateDesc=newValueStateDescriptor<>("fire-timestamp",LongSerializer.INSTANCE,0L);privateMyContinuousProcessingTimeTrigger(longinterval){this.interval=interval;}@OverridepublicTriggerResultonElement(Objectelement,longtimestamp,Wwindow,TriggerContextctx)throwsException{longcurrentTime=System.currentTimeMillis();ValueState<Long>fireState=ctx.getPartitionedState(stateDesc);longnextFireTimestamp=fireState.value();if(nextFireTimestamp==0){longstart=currentTime-(currentTime%interval);fireState.update(start+interval);ctx.registerProcessingTimeTimer((start+interval));returnTriggerResult.CONTINUE;}if(currentTime>=nextFireTimestamp){longstart=currentTime-(currentTime%interval);fireState.update(start+interval);ctx.registerProcessingTimeTimer((start+interval));returnTriggerResult.FIRE;}returnTriggerResult.CONTINUE;}@OverridepublicTriggerResultonEventTime(longtime,Wwindow,TriggerContextctx)throwsException{returnTriggerResult.CONTINUE;}@OverridepublicTriggerResultonProcessingTime(longtime,Wwindow,TriggerContextctx)throwsException{ValueState<Long>fireState=ctx.getPartitionedState(stateDesc);longnextFireTimestamp=fireState.value();// only fire if an element didn't already firelongcurrentTime=System.currentTimeMillis();if(currentTime>=nextFireTimestamp){longstart=currentTime-(currentTime%interval);fireState.update(0L);if(nextFireTimestamp>=window.getEnd()){returnTriggerResult.FIRE_AND_PURGE;}else{returnTriggerResult.FIRE;}}returnTriggerResult.CONTINUE;}@Overridepublicvoidclear(Wwindow,TriggerContextctx)throwsException{}@VisibleForTestingpubliclonggetInterval(){returninterval;}@OverridepublicStringtoString(){return"ContinuousProcessingTimeTrigger("+interval+")";}/** * Creates a trigger that continuously fires based on the given interval. * * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */publicstatic<WextendsTimeWindow>MyContinuousProcessingTimeTrigger<W>of(Timeinterval){returnnewMyContinuousProcessingTimeTrigger<>(interval.toMilliseconds());}}