Tech Notes

Apache Flinkを試している

| Comments

耐障害性と拡張性のあるストリーム処理基盤が欲しい、と思ってApache Flinkを調べている. 今はリアルタイム集計にNorikraを使っていて、これはとてもカジュアルに使えて良いのだけど、以下の様なケースだと難しい。

  • 比較的止めたくない処理で、サーバ障害時にも自動的に回復して欲しい
  • 1日とか長いtime windowの集計をしているので、途中でサーバが落ちて集計中の状態が失われると辛い
  • トラフィックが増えてきて、複数サーバに負荷を分散したい
  • 例えばストリームに含まれているIDに対応する値を外部のテーブルから取ってくるような、ちょっと複雑な処理をしたい

Flinkとはどのようなソフトウェアか

一言で言うと、対障害性と拡張性を備えた、分散ストリーム処理基盤。バッチ処理もストリーム処理の仕組みでできるよね、ということでバッチ用、ストリーム用両方のAPIが提供されている。実行環境としては、Hadoop等と同じようにワーカプロセスが複数のサーバで立ち上がっていて、そこに対してジョブを投げるような感じになっている。バッチではジョブを投げるとデータを処理して終了、だけど、ストリーム処理では投げたジョブはずっと生きていて、ストリームにデータが流れてくるたびにそのデータを処理して出力する、という形になる。ジョブは、JavaとScalaで書くことができる。ロードマップにはSQL的なものもあるっぽいけど、今は存在しない。

SparkとかStormと何が違うの?

ググると色々出てくるが、米Yahooのベンチマークとか、このページが分かりやすかった。

Stromとの比較で言うと、処理形態はほぼ同じ。ただ、大きな違いとしてStormでは各処理オペレータはstatelessになっていて、落ちると状態が失われる。永続化したかったら、自分で外部ストレージを使うようなコードを書く必要がある。耐障害性ということろだと、各レコードに対して処理が完了した際にackを返す仕組みがあるので、障害などでackが無かったら再度処理する、というような処理を書く感じになる。なので、例えば処理が途中まで行われて障害になると、そのレコードは重複して処理される形になる。Flinkはここに詳しく書いてあるけど、オペレータがstatefulになっていて、チェックポイントごとに状態が保存される。障害時には、チェックポイントから復旧し、それ以降のレコードをリプレイすることで、exactly-onceを実現している。

あとは、Stormだと各処理オペレータ(Bolt)をJavaのクラスとして書かないといけないけど、Flinkはstream.keyBy(...).map{...}.timeWindow(...)みたいな感じのハイレベルのAPIが提供されている。

Spark Streamingとの違いで言うと、Spark Streamingは正確にはストリーミングではなくてマイクロバッチなので、そのバッチの間隔にストリーム処理のwindowが左右される。sliding time windowとか、一定数のレコードを保持するようなwindowは作ることができない。(ちゃんとドキュメント読んでないけど、そのはず)

触ってみた

とりあえず触ってみるには、https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/setup_quickstart.html に従えば良い。

ざっくりした流れとしては、

  • バイナリをダウンロードして展開
  • ./bin/start-local.sh 実行
  • http://localhost:8081/ でJobManagerを開く
  • exampleがバイナリと一緒に配布されているので、それを実行

という感じで試すことができる。

クラスタを組むのも割と簡単で、 https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/cluster_setup.html に従えばできる。

JobManagerというのがマスタで、TaskManagerというのがワーカになっているので、これをサーバに分散配置することになる。

流れとしては、

  • 各サーバにflink用のユーザとssh keyを用意
    • ssh keyは、起動スクリプトの中でssh経由でTaskManagerを起動するので必要.自分で各サーバのTaskManagerを起動して回るなら無くても良い
  • 各サーバにバイナリを配布
  • 設定ファイル(flink-conf.yaml, slaves)を用意
    • flink-conf.yamlは、配布されているものをそのまま使って、JobManagerのアドレスを設定すれば良い
    • slavesはTaskManagerホストを列挙したもの。クラスタ起動スクリプトの中で使われる

となる。 ジョブを書くには、https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/scala_api_quickstart.html に従う。mavenのアーキタイプが用意されてるので、それでプロジェクトのテンプレートを作って、ドキュメントの中にあるWordCountをコピーして走らせれば良い。

感想

試しにfluend→Kafka→Flink→Elasticsearch、とつなげて分散実行してみたり、プロセスを落としてみたりしたが、期待通りに動作した。例えば、5分間のtime windowで件数を集計するような処理を作って、実行中にプロセスを落とすと、障害が検知されてジョブが再実行される。そして、勝手に必要なリカバリがされるので、障害があっても無くても実行結果は変わらない。すごい。(もう少しちゃんと見ると、ずれるケースもありそうだけど、まだ詳しく見ていない)

並列度は、ジョブの投入時に指定できるので、ちゃんとKafkaのパーティションを複数作って並列度を指定して実行すれば、各TaskManagerに分散して実行される。

ただ、性能のところが今一つで、処理を3サーバに分散、5,000msg/secを投入してやってみたら、各サーバ1CPUを100%使って、Kafkaにメッセージが滞留する状態になった。実際に処理できたのは4,300msg/secくらい。3CPUで4,300msg/secだと大分コストが高いなあ、という印象。まあ、とりあえず動かしてみただけなので、何か正しくない可能性はある。もう少し試してみたい。

Comments