Tech Notes

多種多様なログをFluentd-Elasticsearch-Kibanaしたメモ

| Comments

自分のところでは、社内の様々なログをfluentdで集めているのだけど、それらをElasticsearchに入れて、Kibanaで見えるようした話を書いてみる.

Fluentd, Elasticsearch, Kibanaな組み合わせは既に多くの人が使ってるし、ブログ等の記事も沢山ある. けど、いくつか 引っ掛かった点などもあるので、それらを書き留めて置こうと思う.

要件

  • fluentdでストリームで集めてるログを、リアルタイムに近い鮮度でKibanaで見たい.
    • 見たい、というのは検索したり、集計・可視化したり、ということ
  • Kibanaでは短期(2日とか)のログが見えれば良い
  • 規模感的には、ログの種類(=fluentdのtag数)が200くらい、流量はピークで2万メッセージ/秒くらい.
    • が、実際には最も流量の多いログは除いたので、この半分〜3分の1くらいの流量
    • 10shard/index にしてあるので、2日分だと4000 shardくらいになる. これはかなり多いと思う
  • ログの種類が増えたときも、人手をかけずにKibanaで見えるようになって欲しい

構成

fluentd→Kafka→kafka-fluentd-consumer→fluentd→Elasticsearch

な感じ. Elasticsearchへの投入が詰まる、的な話はよくあるけど、Kafkaを挟んでいるのでfluentdの他の経路に影響が出ることは無いようになっている.

経験した問題と対応

fluentdからの投入タイムアウト

これに出会ったのは大分前なのだけど、流量が多い場合にバッファのサイズが大きくなると、Elasticsearchへの投入でタイムアウトが発生したりした.

1
2015-09-30 18:02:48 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2015-09-30 18:01:02 +0900 error_class="Fluent::ElasticsearchOutput::ConnectionFailure" error="Could not push logs to Elasticsearch after 2 retries. read timeout reached" plugin_id="object:3fe40898b7d8"

対応として、fluent-plugin-elasticsearchの設定でrequest_timeoutを長めに設定している.

Size of the emitted data exceeds buffer_chunk_limit

kafka-fluentd-consumer からログを受けているfluentdで、以下のようなwarningが多発した.

1
2
3
4
2016-07-31 21:22:51 +0900 [warn]: Size of the emitted data exceeds buffer_chunk_limit.
2016-07-31 21:22:51 +0900 [warn]: This may occur problems in the output plugins ``at this server.``
2016-07-31 21:22:51 +0900 [warn]: To avoid problems, set a smaller number to the buffer_chunk_limit
2016-07-31 21:22:51 +0900 [warn]: in the forward output ``at the log forwarding server.``

fluentdがin_forwardで受け取ったchunkのサイズが、受け側fluentdでのbuffer_chunk_limit(デフォルト8MB)より大きいよ、という メッセージ.

kafka-fluentd-consumerが大きいchunkで送信していたと思われる. 当時はこれを制御する術は無かったのだけど、issueをあげたら 対応してもらえた. このためにkafka-fluentd-consumerが内部で使っているFluencyにも、改修を入れてくれた様子. 有り難い.

https://github.com/treasure-data/kafka-fluentd-consumer/issues/15

こんな感じで、Fluencyのバッファ周りのパラメータを指定できるようになったので、fluentd.client.buffer.chunk.retention.bytesがfluentdのbuffer_chunk_limitより小さくなるようにした. あとは、Elasticsearchに投入するfluentdプロセスの数を多めにして、一つ一つの負荷が大きくならないようにしている.

1
2
3
fluentd.client.buffer.chunk.initial.bytes = 1000000
fluentd.client.buffer.chunk.retention.bytes = 8000000
fluentd.client.buffer.max.bytes = 512000000

実際には、この設定を入れる前の段階で(なぜか)warningが消えていたので、明確にこれの効果があったのかは不明. だけど、その後ログの流量を大分増やしたりしても 出ていないので、多分効果はあったのだろう.

日付が変わったタイミングでElasticsearchが固まる問題

時系列データなので、indexはsomelog-YYYY.MM.DDのような形で、日毎に分けるようにしている. 一部のログだけを入れていた時は特に問題は無かったのだけど、 全ての(大体200種類の)ログを入れるようにした所、日付が変わった際にElasticsearchへのindexingが固まる、という事象に出会った.

Elasticsearch_indexing_rate

Elasticsearchのログを見ると、以下のようなログが大量に発生している.

1
2
3
4
5
ProcessClusterEventTimeoutException[failed to process cluster event (put-mapping [fluentd]) within 30s]
at org.elasticsearch.cluster.service.InternalClusterService$2$1.run(InternalClusterService.java:349)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

これについては、Elasticsearchのフォーラムで質問をしてみた. https://discuss.elastic.co/t/timeout-exception-with-many-time-based-indices-after-00-00/63340/1

mapping等の情報はcluster stateと呼ばれるメタデータで管理されていて、これの更新はシングルスレッドになっている. dynamic mappingを使っている場合、 新規indexの作成時に新たなmappingの作成も発生し、indexの数が多い場合は、これが大量に発生するために固まったような状態になるのだろう、ということ.

対策として、ここで教えてもらったように、翌日分のindexを事前に作成しておくようにした.

フィールド名にドットが含まれている場合の問題

色々なログが入って来た結果、フィールド名にドットが含まれているものがあり、以下のようなエラーが発生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[2016-11-10 11:30:06,915][DEBUG][action.admin.indices.mapping.put] [ESHOST1] failed to put mappings on indices [[some.log-2016.11.10]], type [fluentd]
MapperParsingException[Field name [somefield.channel] cannot contain '.']
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:277)
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:222)
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:197)
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:309)
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:222)
        at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
        at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
        at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
        at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:549)
        at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
        at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
        at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
        at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

ドットを含むフィールド名の扱いはバージョンによって違っていて、1.xでは可能だったが、2.xでは不可になった. https://www.elastic.co/guide/en/elasticsearch/reference/2.0/breaking_20_mapping_changes.html#_field_names_may_not_contain_dots

その後、2.4以降では起動時のオプションに-Dmapper.allow_dots_in_name=trueを付けると、ドットを含むフィールド名も入るようになった. https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dots-in-names.html

そして、5.0以降では再びドットを含むフィールドがOKになった. 今使っているElasticsearchは2.4.0なので、オプションで解決することもできたが、 開発者がドットを含まない形にログの形式を変更してくれることになった.

Kibanaでのindex pattern作成が面倒

kafka-fluentd-consumerは、Kafkaから読み込むtopicを正規表現のパターンで指定することができて、fluentd tag=topicが増えた場合は自動的に追随してくれる. なので、ログの種類が増えてもElasticsearchまでは自動的に格納される.

が、Kibanaで見るためにはindex patternの設定が必要になる. これを毎回やるのは面倒なので、どうにかしたい.

同じような要望はあって、この操作をAPIでできるようにして欲しい、というissueが存在している. https://github.com/elastic/kibana/issues/5199

結構長く存在するissueだが、対応されてKibana5ではできるようになるらしい.

が、Kibana5はつい先日GAが出たばかり. 使っているのはまだKibana4だ. Kibana4では、index patternを始めKibana上で作ったオブジェクトの定義は、.kibanaというindexに格納されているので、 これを自分で作る方針にする.

.kibanaに格納されているindex patternの定義は、例えばこんな感じ.

1
2
3
4
5
6
7
8
9
10
11
12
  {
      "_index": ".kibana",
      "_type": "index-pattern",
      "_id": "log.hiveserver2-*",
      "_version": 2,
      "_score": 1,
      "_source": {
          "title": "log.hiveserver2-*",
          "timeFieldName": "@timestamp",
          "fields": "[{"name":"_index","type":"string","count":0,"scripted":false,"indexed":false,"analyzed":false,"doc_values":false},{"name":"_source","type":"_source","count":0,"scripted":false,"indexed":false,"analyzed":false,"doc_values":false},{"name":"tag_key","type":"string","count":0,"scripted":false,"indexed":true,"analyzed":false,"doc_values":true},{"name":"message","type":"string","count":0,"scripted":false,"indexed":true,"analyzed":false,"doc_values":true},{"name":"hostname","type":"string","count":0,"scripted":false,"indexed":true,"analyzed":false,"doc_values":true},{"name":"@timestamp","type":"date","count":0,"scripted":false,"indexed":true,"analyzed":false,"doc_values":true},{"name":"_id","type":"string","count":0,"scripted":false,"indexed":false,"analyzed":false,"doc_values":false},{"name":"_type","type":"string","count":0,"scripted":false,"indexed":false,"analyzed":false,"doc_values":false},{"name":"_score","type":"number","count":0,"scripted":false,"indexed":false,"analyzed":false,"doc_values":false}]"
      }
  }

fieldsの中身がちょっと面倒. 基本的には、既存のindexのmappingを取得し、Kibanaのソースコードを参考にしながら定義を作る.

https://github.com/elastic/kibana/blob/4.6/src/ui/public/index_patterns/_map_field.js https://github.com/elastic/kibana/blob/4.6/src/ui/public/index_patterns/_cast_mapping_type.js

例えば、indexedフィールドは、mappingの中のindexフィールドが存在しない、またはnoのときのみfalseとして、その他の場合はtrueにする. とか、ElasticsaerchとKibanaでは型が違うので、long→numberみたいに変換する、とか.

これも、日次のバッチで回すようにした

まとめ

  • こんな感じで様々なログを、Elasticsearch+Kibanaで見ることができるようになった.
  • fluentd→Elasticsearchについては、以下の部分がキャパシティ的に問題になりやすい印象
    • fluentd: 流量が多いと普通にCPUが上がって詰まる. 対策として、プロセスを増やして負荷分散する
    • Elasticsearch: 流量よりは、index数(shard数)が増えることに対して注意がいる. 流量はスケールアウトで対応できるが、cluster state管理の部分はスケールしない. 今回は、日付が切り替わった際の翌日分index作成で問題がおきたので、事前にindexを作成することで対応した. できるだけ、index数は多くなりすぎないように設計した方が良いと思う.

現在稼働しているFlinkクラスタについて

| Comments

先日の発表で、Apache Flinkを導入するに至った経緯を話したのだけど、具体的な構成とかには触れられなかったので書いておく。

クラスタの構成について

今運用してるFlinkクラスタは2つ。サービスで使うためのデータを生成しているものと、社内のレポーティングやモニタリングで使っているもの。前者の方は安定性重視、後者は割とカジュアルにジョブを追加したり、構成を弄ったりできるもの、という感じになっている.

Flinkとしては、クラスタのデプロイメント方式として、独立したdaemonとして動かす方法と、YARNの上で動かす方法があるのだけど、前者の方法にしている. その方が運用上もわかりやすいし、レイヤが少ない分トラブルも少ないだろう、というのが理由.

どちらも物理サーバで、TaskManagerサーバは前者が3台、後者が10台になっている. Flinkのバージョンはそれぞれ1.0.3と1.1.1. JobManagerはもちろんHAで、Flinkが使うHDFSやZookeeperは既存Hadoopクラスタを共用している.

周辺の構成

Flinkへのインプットはfluentdで集めたログをKafkaに投入したもの。もともとログはがっつりfluentdで集めてたので、Kafkaを導入して、そちらにも飛ばすようにした。ちなみに、fluentd→kafkaの部分について、導入当初はfluent-plugin-kafkaが使っていたposeidon gemがメンテナンス停止を宣言したりしてて若干不安があったけど、その後poseidonはruby-kafkaに置き換えられ、pluginもfluentdの公式になったりして、安心感が大分増した。開発者の方々には感謝してます。

アウトプットはRedis, Elasticsearch, Kafkaの3パターンがある. 汎用的に使いやすいのはElasticserachで、Kibanaで見たり、集計結果をAPIサーバ経由で他に提供したりしている. Redisは最新の情報にしか興味がなくて、かつ更新頻度が高い場合に使っている. 特定のキーの値を10秒間隔でアップデート、とか. Kafkaは、集計結果を他と連携したいケース. 今は、kafka-topic-exporter 経由で集計結果をPrometheusに入れ、Grafanaで見るために使っている.

運用周り

モニタリングは基本的にPrometheus. flink-exporterjmx-expoterを使ってメトリクスをPrometheusに送り、Grafanaで見ている.

Flinkのメトリクスについては https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html に記載があるが、JMXで見るためには、flink-conf.yamlに以下の設定を追加する. なお、これが使えるのはFlink 1.1以降.

1
2
3
metrics.reporters: jmx_reporter
metrics.reporter.jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=5560

jmx_expoterの設定は、こんな感じ のものを使っている

すると、こんな感じでFlinkのKafkaConsumerのlagやconsumeされているレコード数を見ることができる.

Flink_Grafana_Kafka

以下は、あるジョブのチェックポイントのサイズ. 長期のスパンで見ると、チェックポイントサイズが増加傾向なので何かしら不要なstateがpurgeされずに溜まっていることが疑われる

Flink_Grafana_Checkpoint

監視は外部からdaemonのTCPポートが空いていることを確認している. JobManagerはWebUIポート(8081)を見れば良いが、TaskManagerのポートは起動するたびに変わるので、以下のような設定を入れて固定している.

1
2
taskmanager.rpc.port: 6122
taskmanager.data.port: 6121

Flink的なメトリクスとしては、Exceptionの発生数とか、ジョブが失敗や再起動状態にあるジョブの数とかも使ってアラートを上げた方がいいんだろうなぁ、と思いつつまだやっていない

まとめ

今productionで動かしているFlinkクラスタについて、構成や運用周りを紹介してみた. これからFlinkを使ってみよう、という人の参考になれば幸いです.

Apache Flinkを試してみての感想

| Comments

しばらくApache Flinkを試してみたので、感想を書いておこうと思う.

試したこと

  • standalone modeでのクラスタ構築
  • ストリーミングジョブを書いてみる
    • TumblingTimeWindowやSlidingTimeWindowでの集計
    • Kafka SourceとElasticsearch Sinkの利用
    • 必要だったので、カスタムトリガは書いた
  • 幾つかのジョブで性能測定
  • 社内の本番fluentdからKafka経由でFlinkにストリームを投入し、ジョブを十数日くらい連続稼働してみる
  • state backendをHDFSやRocksDBにしてみる
  • JobManager HA
  • TaskManagerやJobManagerを落としてみる
  • Flink on YARN (ジョブを起動してみただけ)

試してないこと

  • DataSet APIの利用
  • savepoint, savepointからの復旧
  • event timeやingenstion timeの利用(processing timeしか使ってない)
  • 複数ジョブの相乗り
  • 高トラフィックかつ複数ジョブでの連続稼働
  • Flink on YARNをもうちょっと色々使ってみる
  • (多分他にも色々)

所感

全体としてはすごく良く出来ていると思う. プロセスが落ちたりしても勝手に復旧するし、性能も出るし、WebUIやREST APIで色々情報取れるので運用もしやすそう. 今後は本番に入れていこうと思っている.

  • ちゃんと動く?
    • 期待通りに動かなくて、MLで聞いたらBugだ、って言われて直してくれたのが2件
    • ContinuousProcessingTimeTriggerは期待通りの動作をしなかったので、修正版を書く必要があった
    • 後は、上記の範囲では特に問題なく動いている
  • 処理の書きやすさ
    • APIがハイレベルなので、割と少ないコードで処理を書くことができて良い
    • 状態の保存とか、windowとか、自分で書くのは大変なのでその辺の面倒はFlinkが見てくれる
  • 性能
    • もちろん処理によるが、単純なものであれば1CPUで10万records/sec以上は普通に処理できそう
  • 耐障害性
    • ジョブ実行中にJobManagerやTaskManagerを落としてみたが、勝手に復旧された. すごい.
    • HDFS障害でどう振る舞うか?はまだ試せてないけど気になるところ
  • 運用
    • WebUIやREST APIで色々取れる. WebUIで大抵のものは見える印象.
    • スロット数とかはもちろん、ジョブ内のオペレータ単位で処理レコード数とか取れる
    • チェックポイントの所要時間やサイズも取れる
  • 不安なところ
    • チェックポイントのサイズには注意がいりそう. FsStateBackendの場合、状態は全て各TaskManagerのメモリに持ち、チェックポイントでHDFSに書き出される. 差分スナップショット的なものはないので、状態が大きくなってくるとスナップショットに時間がかかり、性能劣化する. キー(keyByで指定する奴)の数が大きいケースでは、RocksDBを使うことで改善されるらしいが、まだそれが有効なケースに出会ってない.
    • FlinkジョブをYARNで動かす場合を除いて、各TaskManagerは1プロセスで、その中で複数ジョブがスレッドとして動く形になっているのでisolationの部分は不安. あるジョブに引きづられてTaskManagerが固まるとかありそう.
    • ストリーム処理のプロダクトはいくつかあるし、Facebook, Twitter, Likedinみたいな大御所がバックにいるわけではないので、生き残っていくのだろうか

まとめ

ということで、自分がしばらく試しての所感を書いてみた. 実績の少ないプロダクトという意味での不安感はあるけど、性能、拡張性、耐障害性、APIの使いやすさは素晴らしいと思うので、使っていきたい. これ以上は使ってみないとわからないと思うし. あと、Flink on YARNの場合はYARNが動くクラスタがあれば、別途サーバを用意しなくてもFlinkジョブを動かすことができるので、既にHadoop環境がある場合なんかは、とりあえず試してみると良いんじゃないかと思う.

Apache Flinkのstate管理について

| Comments

はじめに

ストリーム処理の中で、処理をstatefulにしたい、という要求はよくある。例えば、1時間のtime windowで件数を集計している場合、ストリームが流れるにつれて内部で保持しているカウンタは増加していく. そして、障害等で再起動をした時とかには、そのカウンタの値も一緒に復旧したい.

Flinkにおけるstateの保存

これに対して、Apache Flinkは定期的に処理状態のスナップショットを取得する、という方法で対応している. そして、分散環境でまともに全てのスナップショットを取るのは辛いので、分散してスナップショットを取るようにしている. 具体的にはここ に詳しいが、ストリームのソースから定期的にBarrierと呼ばれる印を流して、各オペレータはこれを受け取るとスナップショットを保存するようになっている. こうすることで、処理全体を止めずに一貫性のあるスナップショットを取れるよね、という話. 障害時にはスナップショットから状態を復旧し、スナップショット以降のログをリプレイすることで処理を継続する.

スナップショットの保存先は、以下から選択することができる. グローバルのデフォルトを設定することも、ジョブ単位でそれを上書きすることも可能. (詳しくはここ)

  • MemoryStateBacked
    • デフォルト. チェックポイントの際にJobManagerのメモリ上に状態を保持する. デフォルトはサイズが5MBまで.
  • FsStateBackend
    • ファイルシステムに保持する. 最新の状態はTaskManagerのメモリに保持し、チェックポイントの際にファイルシステムに保存する. ファイルシステムはローカルでもHDFSでも良いが、障害時に別ノードで復旧することを考えるとHDFSになるだろう.
  • RocksDBStateBackend
    • 最新の状態はTaskManagerローカルファイルシステム上のRocksDBに保持し、チェックポイントの際にそれをFsStateBackendと同じようにファイルシステムに保存する. スループットは若干落ちるが、大量のstateを保持することができる.

実験

とは言え、処理によってはstateが大きくなるケースもあるわけで、そういう時にどうなるんだろう?と思って実験してみた. 環境はFlink 1.0.0でKafka, Flink, HDFSは物理サーバ.

こんな感じで、foobarというフィールドに100バイトのランダム文字列を入れて、それのdistinct countを取る. 重複を判断するためには、過去の値をすべて持たないといけないので状態は大きくなる.

1
2
$ head -n 1 dummy_log.aa 
id:0000 time:[2016-04-05 12:40:57]      level:DEBUG     method:POST     uri:/api/v1/people1     reqtime:3.053540515830725       foobar:dNjtvYKxgyym6bMNxUyrLznijuZqZfpVasJyXZDttoNGbj5GFk

こんな感じのコードで、処理を記述した(MyContinuousProcessingTimeTriggerはデフォルトのContinuousProcessingTimeTriggerに若干の改修を入れたもの). 保存先はHDFSとしている. timeWindowAllなので、処理は分散されず単一のスレッドで処理される.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))

.map(parseJson(_))
  .timeWindowAll(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  .fold(Set[String]()){(r,i) => { r + i}}
  .map{x => (System.currentTimeMillis(), x.size)}
  .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Int]]  {
    override def createIndexRequest(element: Tuple2[Long, Int], ctx: RuntimeContext): IndexRequest = {
      val json = new HashMap[String, AnyRef]
      json.put("@timestamp", new Timestamp(element._1))
      json.put("count", element._2: java.lang.Integer)
      Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
    }
  }))

まずは、260万件のログを送ってスナップショットのサイズを確認してみる. 送信したログのサイズ(100bytes×260万件)とほぼ同じ. そして、同じログを2回投入しても(distinct countなので )サイズは増えない. 処理内部で集計した結果を保存している模様.

1
2
3
4
5
6
$ cat dummy_log.aa >> /tmp/dummy_log2.log
$ hadoop fs -du  /apps/flink/checkpoints/9005988ca4dc6e871d50c2b310ed0a63
26000230  /apps/flink/checkpoints/9005988ca4dc6e871d50c2b310ed0a63/chk-183
$ cat dummy_log.aa >> /tmp/dummy_log2.log
$ hadoop fs -du  /apps/flink/checkpoints/9005988ca4dc6e871d50c2b310ed0a63
26000230  /apps/flink/checkpoints/9005988ca4dc6e871d50c2b310ed0a63/chk-196

そして、その時のチェックポイント時間はJobManagerのログから確認できる. 大体6秒くらいかかる. TaskManagerのCPU使用率は定常的に高い.

1
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 43 (in 6291 ms)

さらに、260万件を追加で投入するとチェックポイントに10秒〜数十秒かかるようになった. ジョブを実行しているTaskManagerは CPU100%になり、スループットも大幅に落ちていた.

1
2
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 145 (in 41835 ms)
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 146 (in 13530 ms)

MLで聞いてみた ところ、バックエンドでRocksDBを 試してみたら、とアドバイスをもらったのでやってみたが、変わらなかった. この処理の場合、状態を保存するストアには1レコードしか入らなくて、その値が非常に大きいので、効果が出なかった模様.

思ったことなど

  • ユニークユーザ数等をカウントするようなケースだと、値の種類が数千万というのもあり得るので、上のような性能だと辛い. そして、このような処理の場合、最終的には一箇所にまとめて 数を数える感じになるので、分散スナップショットの恩恵も受けづらい.
  • 現行のFlinkだと、スナップショットのたびに保持しているデータを全て書き出すので、incremental snapshotが導入されれば改善されるかも.
  • 今の時点での対応としては、値のリストをFlinkに保持せずに外部のデータストア(Redisとか)に保持するというのが一つの解決策. その場合、デメリットとしてはスケールアウトが 制限されることと、障害時などにログがリプレイされることへの対応. ただ、後者については処理の性質上、何度同じデータを投入しても結果は変わらないので、問題にならない.
  • もしくは、HyperLogLogのような推定アルゴリズムを使って、精度と引き換えに状態のサイズを削減するか.

試しにFlinkで受け取った値をRedisのSETに入れて同じ処理を実現してみたら、あっさり1000万件くらい処理できた. stateが小さい、または分散される場合はFlinkに任せて、 今回のような一部のケースでは外部のデータストアを使う、というのが現実的だろうか、と思っているところ.

Apache Flink ContinuousProcessingTimeTriggerの話

| Comments

(注: window周りのAPIを変えようという話(FLINK-3643)があるので、以下で書かれている内容は近い将来obsoleteになる可能性があります)

自分のところでは、ストリーム処理のユースケースとして、ある程度長期間のwindowでデータを保持しながら、集計結果は短い間隔で出力したい、というのがある。

例えば、Norikra(Esper)のクエリだとこんな感じ。site_idごとに、過去3日間(72時間)のユニークユーザを集計し、結果は1分ごとに出力する、というもの。 集計対象のwindowは72時間のsliding time windowとなる。こういうwindowが長いケースは、Norikraだとプロセスを再起動したりした時にwindowの内容が全て失われるので辛い。Flinkでいい感じに実装したい。

1
2
3
4
5
6
7
8
SELECT
    site_id,
    count(distinct userkey) as num_uu
FROM
    viewer_log.win:time(3 day)
GROUP BY
    site_id
OUTPUT LAST EVERY 1 min

ドキュメントを読むと、以下のようにwindowの定義とは別にいつ計算処理がトリガされるか、いつwindowの中身が削除されるか、を好きに設定できるように見える。

1
2
3
4
keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));

なので、上記のようなものは以下の様なコードで実現できるかと思った。ContinuousProcessingTimeTriggerは、指定した間隔で集計を実行するが、その際にwindowの内容の削除は行わない、というもの。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  def main(args: Array[String]) {
//...
    val input = env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
      .map(lineToTuple(_))
      .keyBy(0)
      .timeWindow(Time.of(3, TimeUnit.DAYS))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(60)))
      .fold(("",new scala.collection.mutable.HashSet[String]())){(r,i) => { (i._1, r._2 + i._2)}}
      .map{x => (new Timestamp(System.currentTimeMillis()), x._1, x._2.size)}
//...
  }
  def lineToTuple(line: String) = {
    val x = line.split("\\W+") filter {_.nonEmpty}
    (x(0),x(1))
  }

が、実際には期待どおりに動かないことが分かった。(MLで教えてもらった。こことかここ)

  • トリガが発火しないケースがある
  • timeWindowが終了してもwindow内のデータが削除されない。結果、stateのサイズが増加する一方になる。(.trigger()を呼び出した時点で、元のtimeWindowのトリガは上書きされるらしい)

って、これContinuousProcessingTimeTriggerは使えないのでは・・・。

結局、ContinuousProcessingTimeTriggerに修正を入れたカスタムTriggerを作成し、これを使用することで期待する動作をするようになった。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@PublicEvolving
public class MyContinuousProcessingTimeTrigger<W extends TimeWindow> extends Trigger<Object, W> {
  private static final long serialVersionUID = 1L;

  private final long interval;

  private final ValueStateDescriptor<Long> stateDesc =
          new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L);


  private MyContinuousProcessingTimeTrigger(long interval) {
      this.interval = interval;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
      long currentTime = System.currentTimeMillis();

      ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
      long nextFireTimestamp = fireState.value();

      if (nextFireTimestamp == 0) {
          long start = currentTime - (currentTime % interval);
          fireState.update(start + interval);

          ctx.registerProcessingTimeTimer((start + interval));
          return TriggerResult.CONTINUE;
      }
      if (currentTime >= nextFireTimestamp) {
          long start = currentTime - (currentTime % interval);
          fireState.update(start + interval);

          ctx.registerProcessingTimeTimer((start + interval));

          return TriggerResult.FIRE;
      }
      return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

      ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
      long nextFireTimestamp = fireState.value();

      // only fire if an element didn't already fire
      long currentTime = System.currentTimeMillis();
      if (currentTime >= nextFireTimestamp) {
          long start = currentTime - (currentTime % interval);
                        fireState.update(0L);
                        if (nextFireTimestamp >= window.getEnd()) {
                           return TriggerResult.FIRE_AND_PURGE;
                        }
                        else {
                            return TriggerResult.FIRE;
                        }
                }
      return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window, TriggerContext ctx) throws Exception {}

  @VisibleForTesting
  public long getInterval() {
      return interval;
  }

  @Override
  public String toString() {
      return "ContinuousProcessingTimeTrigger(" + interval + ")";
  }

  /**
  * Creates a trigger that continuously fires based on the given interval.
  *
  * @param interval The time interval at which to fire.
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
  public static <W extends TimeWindow> MyContinuousProcessingTimeTrigger<W> of(Time interval) {
      return new MyContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
  }
}

Apache Flinkの性能 - デフォルトのJSONパーサが遅かった話

| Comments

軽くApache Flinkの性能を測ってみた. 構成としては、Fluentd(in_tail→out_kafka_buffered)→Kafka→Flink→Elasticsearchで、仮想アクセスログ的なものに対して、URIごとの件数を1分ごとに集計して出力する、というもの。 メッセージフォーマットはJSON。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
// ....
val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.dummy", new SimpleStringSchema(), properties))
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  .map{ x => x.get("uri") match {
    case Some(y) => (y.asInstanceOf[String],1)
    case None => ("", 1)
  }}
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x)}
  .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
    override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
      val json = new HashMap[String, AnyRef]
      json.put("@timestamp", new Timestamp(element._1))
      json.put("uri", element._2._1)
      json.put("count", element._2._2: java.lang.Integer)
      println("SENDING: " + element)
      Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
    }
  }))

環境は以下の通り

  • Kafka : 8vCPU, 8GBMemoryのVM*3, HDP2.4 (Kafka-0.9)
    • パーティション数は1なので、実質broker1台
  • Flink : JobManager, TaskManagerともに8vCPU, 8GBMemoryのVM (Flink-1.0.0)
    • TaskManagerは3台だが、ジョブ並列度を1にしたので1台でしか動かない状態
  • Elasticsearch: 4CPU, 4GB MemoryのVM*1 (Elasticsearch 1.7.2)

URIは9種類なので、Elasticsearchには毎分9レコードが出力されることにいなる。 Elasticsearchに出力されたURIごとの件数を1分単位に合計したものを、Flinkのスループットと考える。 レコードの生成には dummerを使用した。

で、普通にやったら2,000msg/secの入力を与えて、89,000msg/min = 1,483msg/secしか処理できなかった。FlinkプロセスのCPUが100%(1CPU使いきり)となり、Kafkaのlagが増えていく状態。 チェックポイント外したりESへの出力をやめたりしてみたのだけど、性能はさほど変わらず。

何にCPUを食っているんだろう?と思って、Flinkプロセスのjstackを何回か取ってみたら、こんな感じでJSONのパースを実行中であるケースが殆んどだった。

1
2
3
4
5
6
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:881)
    at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
    at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
    at KafkaTest3$$anonfun$1.apply(KafkaTest3.scala:46)
    at KafkaTest3$$anonfun$1.apply(KafkaTest3.scala:46)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)

ということで、パーサをJacksonにしてみた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  val mapper = new ObjectMapper()

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000)

    // ...

    val stream = env
     .addSource(new FlinkKafkaConsumer09[String]("kafka.json", new SimpleStringSchema(), properties))
      .map(parseJson(_))
      .map{ x=> (x.get("uri").asInstanceOf[String], 1)}
      .keyBy(0)
      .timeWindow(Time.of(1, TimeUnit.MINUTES))
      .sum(1)
      .map{ x => (System.currentTimeMillis(), x)}
      .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
        override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
          val json = new HashMap[String, AnyRef]
          json.put("@timestamp", new Timestamp(element._1))
          json.put("uri", element._2._1)
          json.put("count", element._2._2: java.lang.Integer)
          Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
        }
      }))

    env.execute("KafkaTest9")
  }

  def parseJson(x: String): Map[String,AnyRef] = {

    mapper.readValue(x,classOf[Map[String,AnyRef]])
  }

そうしたら、スループットが大幅に向上して900,000msg/min=15,000msg/secを20%程度のCPU(1CPUの5分の1)で処理できた。

http://www.slideshare.net/nestorhp/scala-json-features-and-performance にScalaで使えるJSONライブラリを比較したスライドがあるのだけど、 これを見るとScala標準のパーサと、その他のライブラリで速度が3桁くらい違う。えーーー、、、

ともあれ、十分に実用的な性能が出そうであることがわかったので良かった。

GREE Tech Talk 10に行ってきた

| Comments

GREE Tech Talk #10 に行ってきたのでメモ.

今回のテーマはData Visualizationということで、どんな見せ方をしたら効果的か、とかその辺の話が聞けたら良いな、と思って参加した. 実際にはそれよりも、こんなツールでこんなことできるよ、系の話が多かった. まあ、それはそれで面白かったのだけど、実際ツールは色々あるので、それらの中から適切なものを 選ぶためにも、みんな何をどう見て、どう活かしているのか、というあたりの話がもっとあると良かったなあ、というのが全体的な感想.

行って一番の収穫は、GREE反田さんのGrafanaにPrometheusのデータソースを追加した話だった. 自分も仕事でGrafana+Prometheusの組み合わせに出会って、 これはすごい、素晴らしいと思って最近使っているのだけど、Prometheusデータソースを作ったのが日本の方だとは知らなかった. 自分はまだProemtheusを十分に使いこなせて いないので、懇親会の中で色々話を聞けたのは大変良かった。幾つかメモしておくと、

  • Prometheusのアラートは任意のURLにJSONで投げることができるので、fluentdで受けて好きなところに飛ばす、ということができる
  • service discoveryにはファイルを使うことができる. つまり、例えばcronで社内の構成管理システムを叩いて、 ファイルに吐き出すようにしておけば、Prometheusがそれを随時読み込んでポーリング対象のリストを更新してくれる
  • Prometheusのクエリは結構色々できそう. 移動平均的な感じでトレンドを表示したり、変化が一定以上のものを表示したり.
  • GrafanaのTemplatingの機能も便利そう. 特定のホストグループに対してグラフを作ったり、というのが動的にできるっぽい
  • 新しいメトリックを追加するのはちょっと面倒だなぁ、と思っているのだけど、そこはやはりexporterを書く感じになるらしい.

Prometheusの話以外だと、E2D3というのも面白そうだった。Excelを使って、d3.js等を使った色々な可視化をすることができる. 可視化のパターンはテンプレートという形で誰でも作って公開できるので、既存のテンプレートを使って、そこにExcelから値を埋めて、、、という感じで 簡単に、見た目のインパクトがある素敵なグラフなどを作ることができるとのこと. Excel、好きではないけどやっぱり手軽で素晴らしいツールなので、これとE2D3を使うと結構楽しそうだな、と思った.

自分はインフラとかミドルウェアが中心で、あまりUIを作る人では無いのだけど、処理した結果のデータから最終的な価値を引き出すところは可視化がやっぱり鍵で、 こんな風に見たいから、こんなデータを集めないと、とか、大量のデータを素早く処理したい、とか、もっとリアルタイムに、とか裏側のモチベーションに繋がる ところがあると思う. いい感じの裏側作っても、見た目が無いと伝わらないし.

あと、会場では飲み物とお菓子が豊富でビックリした。無料で興味深い話を聞けた上に、懇親会まで含めて飲み食いも無料でできるとは、なんて素晴らしいんだと 思った勉強会でした. 関係者の皆さんに感謝です.

Apache Flinkを試している

| Comments

耐障害性と拡張性のあるストリーム処理基盤が欲しい、と思ってApache Flinkを調べている. 今はリアルタイム集計にNorikraを使っていて、これはとてもカジュアルに使えて良いのだけど、以下の様なケースだと難しい。

  • 比較的止めたくない処理で、サーバ障害時にも自動的に回復して欲しい
  • 1日とか長いtime windowの集計をしているので、途中でサーバが落ちて集計中の状態が失われると辛い
  • トラフィックが増えてきて、複数サーバに負荷を分散したい
  • 例えばストリームに含まれているIDに対応する値を外部のテーブルから取ってくるような、ちょっと複雑な処理をしたい

Flinkとはどのようなソフトウェアか

一言で言うと、対障害性と拡張性を備えた、分散ストリーム処理基盤。バッチ処理もストリーム処理の仕組みでできるよね、ということでバッチ用、ストリーム用両方のAPIが提供されている。実行環境としては、Hadoop等と同じようにワーカプロセスが複数のサーバで立ち上がっていて、そこに対してジョブを投げるような感じになっている。バッチではジョブを投げるとデータを処理して終了、だけど、ストリーム処理では投げたジョブはずっと生きていて、ストリームにデータが流れてくるたびにそのデータを処理して出力する、という形になる。ジョブは、JavaとScalaで書くことができる。ロードマップにはSQL的なものもあるっぽいけど、今は存在しない。

SparkとかStormと何が違うの?

ググると色々出てくるが、米Yahooのベンチマークとか、このページが分かりやすかった。

Stromとの比較で言うと、処理形態はほぼ同じ。ただ、大きな違いとしてStormでは各処理オペレータはstatelessになっていて、落ちると状態が失われる。永続化したかったら、自分で外部ストレージを使うようなコードを書く必要がある。耐障害性ということろだと、各レコードに対して処理が完了した際にackを返す仕組みがあるので、障害などでackが無かったら再度処理する、というような処理を書く感じになる。なので、例えば処理が途中まで行われて障害になると、そのレコードは重複して処理される形になる。Flinkはここに詳しく書いてあるけど、オペレータがstatefulになっていて、チェックポイントごとに状態が保存される。障害時には、チェックポイントから復旧し、それ以降のレコードをリプレイすることで、exactly-onceを実現している。

あとは、Stormだと各処理オペレータ(Bolt)をJavaのクラスとして書かないといけないけど、Flinkはstream.keyBy(...).map{...}.timeWindow(...)みたいな感じのハイレベルのAPIが提供されている。

Spark Streamingとの違いで言うと、Spark Streamingは正確にはストリーミングではなくてマイクロバッチなので、そのバッチの間隔にストリーム処理のwindowが左右される。sliding time windowとか、一定数のレコードを保持するようなwindowは作ることができない。(ちゃんとドキュメント読んでないけど、そのはず)

触ってみた

とりあえず触ってみるには、https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/setup_quickstart.html に従えば良い。

ざっくりした流れとしては、

  • バイナリをダウンロードして展開
  • ./bin/start-local.sh 実行
  • http://localhost:8081/ でJobManagerを開く
  • exampleがバイナリと一緒に配布されているので、それを実行

という感じで試すことができる。

クラスタを組むのも割と簡単で、 https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/cluster_setup.html に従えばできる。

JobManagerというのがマスタで、TaskManagerというのがワーカになっているので、これをサーバに分散配置することになる。

流れとしては、

  • 各サーバにflink用のユーザとssh keyを用意
    • ssh keyは、起動スクリプトの中でssh経由でTaskManagerを起動するので必要.自分で各サーバのTaskManagerを起動して回るなら無くても良い
  • 各サーバにバイナリを配布
  • 設定ファイル(flink-conf.yaml, slaves)を用意
    • flink-conf.yamlは、配布されているものをそのまま使って、JobManagerのアドレスを設定すれば良い
    • slavesはTaskManagerホストを列挙したもの。クラスタ起動スクリプトの中で使われる

となる。 ジョブを書くには、https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/scala_api_quickstart.html に従う。mavenのアーキタイプが用意されてるので、それでプロジェクトのテンプレートを作って、ドキュメントの中にあるWordCountをコピーして走らせれば良い。

感想

試しにfluend→Kafka→Flink→Elasticsearch、とつなげて分散実行してみたり、プロセスを落としてみたりしたが、期待通りに動作した。例えば、5分間のtime windowで件数を集計するような処理を作って、実行中にプロセスを落とすと、障害が検知されてジョブが再実行される。そして、勝手に必要なリカバリがされるので、障害があっても無くても実行結果は変わらない。すごい。(もう少しちゃんと見ると、ずれるケースもありそうだけど、まだ詳しく見ていない)

並列度は、ジョブの投入時に指定できるので、ちゃんとKafkaのパーティションを複数作って並列度を指定して実行すれば、各TaskManagerに分散して実行される。

ただ、性能のところが今一つで、処理を3サーバに分散、5,000msg/secを投入してやってみたら、各サーバ1CPUを100%使って、Kafkaにメッセージが滞留する状態になった。実際に処理できたのは4,300msg/secくらい。3CPUで4,300msg/secだと大分コストが高いなあ、という印象。まあ、とりあえず動かしてみただけなので、何か正しくない可能性はある。もう少し試してみたい。

Hadoop / Spark Conference Japan 2016 に行ってきた

| Comments

行ってきた。本当はストリーム処理をする上でのSparkってどうよ、的なあたりを聞きたかったのだけど、気がついたらHadoop周りの話題を多く聞いていたように思う。後で資料が公開されたら、行くことができなかったセッションの内容も見ておきたい。

以下、思ったことなどをつらつらと書いてみる。

ひとつ目は、これからはCPUをどうするかというのが大きい課題なのかなぁ、ということ。Sparkの、Project Tungstenの話の中でも出てきたけど、ハードウェアの進化という面だと、ディスクはSSDが普及してきたし、ネットワークは10Gbpsになってきたし、メモリは安価・大容量になったけど、CPUは劇的に速くなっていない。分散処理とか、GPUを使うとか、CPUの集積度を上げる、とかしていっても、最後には消費電力という課題が残る。数年前まではディスクI/Oがボトルネックで、これをどうするか、的な感じだったけど、その辺がある程度解決してきたのと、機械学習周りの普及で、今はCPUをいかに効率よく回すか、ということが重要になってきているのだなぁ、というのを感じた。

Sparkについては、懇親会の中で少し聞いたところだと、他の人が踏み固めた道を歩く分には割と大丈夫、という感じらしい。一般的なユースケースと違う使い方をしたり、リソース的にシビアだったりすると辛いのかな、と。これはSparkに限らない気はするけど。挙動が分かりやすいとか、今どうなっているのかが把握しやすいとか、再起動のしやすさとかの、運用の容易さは重要だよな、と思ったけど、そこは実際に動かしてみないとわからなそうだな。使っている人たちにもう少し詳しく聞けばよかった、というのは反省。

Hadoopは、まあ成熟している印象。使っていて大変だった系の話としては、Yahooさんの数百ノードまでスケールさせた時にHive MetastoreとかYARN Resource Managerに負荷が集中し話とか、Softbankさんの、オンプレでサーバの物理的な構築とか、故障対応のオペレーションの話があった。自分のところでも使っているけど、そこそこ余裕のある構成で組んで、普通に集計で使用している分には問題ないなあ、と思う。構築・運用周りも前は自分でがんばってたけど、Ambariを使い始めてから大分楽になった。もちろん、Hadoop自体が停滞しているわけではなくてYARNもHDFSもHiveも改善されて性能が上がったり、使いやすくなったりはしているわけで、それは使っている身としてはとてもありがたい。

あと、本題とは関係ないけど、懇親会の中で同僚と話していて、日頃仕事でやっていることとか、公開できる範囲でブログに書いて世の中に共有することは大事だよね、という話になった。自分もそういう記事に多分にお世話になっているので。ということで、まずは忘れないうちにこの記事を書いておこう、と思った次第です。(誰かの役は立たなそうだけど、まずは書く、ということで)

最後に、運営に関わられた方々、発表者の方々、どうもありがとうございました。各発表も、運営周りも、ランチも懇親会も素晴らしかったです。

EsperのEPLでjava methodとGROUP Byを使ったらレコードが重複した話

| Comments

Norikraで以下の様な感じの、GROUP BYして、SELECTの中でjavaメソッドを使うようなクエリを登録していたのだけど、なんか生成されるイベントが多い、というのが発端で調べてみた.

1
SELECT str, str.split(",") as splitted, count(*) as cnt FROM str_stream.win:time_batch(10 sec) GROUP BY str

上記のクエリを登録した状態で、以下のようにイベントを投入する.

1
2
3
$ echo '{"str":"a,b,c"}' | norikra-client event send str_stream
$ echo '{"str":"a,b,c"}' | norikra-client event send str_stream
$ echo '{"str":"a,b,c"}' | norikra-client event send str_stream

と、こんな感じで同じようなレコードが重複して生成される. count(*)は正しくカウントされているのだが、GROUP BYしているので1レコードになってて欲しい.

1
2
3
4
$ norikra-client event sweep
{"time":"2015/07/31 12:06:52","query":"splittest","str":"a,b,c","splitted":["a","b","c"],"cnt":3}
{"time":"2015/07/31 12:06:52","query":"splittest","str":"a,b,c","splitted":["a","b","c"],"cnt":3}
{"time":"2015/07/31 12:06:52","query":"splittest","str":"a,b,c","splitted":["a","b","c"],"cnt":3}

Norikraではなく、Esperで同じことを実験してみても発生するので、Esperの事象っぽい.公式ドキュメントの”3.7.2. Output for Aggregation and Group-By”を見ると、以下の様な記述がある。 GROUP BYの中に無いフィールドをSELECTすると、入力イベントと同じ数の出力イベントが発生するよ、という話.

If your statement selects non-aggregated properties and aggregation values, and groups only some properties using the group by clause, your statement may look as below:

select account, accountName, sum(amount) from Withdrawal.win:time_batch(1 sec) group by account

At the end of a time interval, the engine posts to listeners one row per event. The aggregation result aggregates per unique account.

とは言え、GROUP BYの中にstr.split(",")を入れても同じ. 値は同じでも、Stringクラスのオブジェクトは別になってしまうからなのかなぁ. 一応、回避策としてはfirsteverという、イベントの中で最初の値だけを取る関数があるので、firstever(str.split(","))のようにすると1レコードに集約できた.

Issueで聞いてみた. https://github.com/espertechinc/esper/issues/18

若干うまく伝わらなかった気がするけど、javaメソッドではなく同等の機能を持つUDFを作ればうまくいくよ、ということらしい。実際、EsperのUDFを作ってみたら、ちゃんと集約された。

ちなみに、EsperのUDFを作るのは簡単で、以下のようにstaticなメソッドを持つクラスを作って

1
2
3
4
5
public class MyUtilityClass {
    public static String[] split(String str, String regex){
        return str.split(regex);
    }
}

ConfigurationクラスのaddPlugInSingleRowFunctionメソッドで、function名、クラス名、メソッド名を渡してやれば良い。

1
addPlugInSingleRowFunction("split", MyUtilityClass.class.getName(), "split");

まあ、都度UDF作るのも面倒なので、firsteverで回避出来る場合はそうするかなぁ.

そんな話でした.