Tech Notes

FluentdのFilter&Label周りのコードを読んだメモ

| Comments

Fluentd v0.12の目玉機能としてFilterとLabelがある. この機能の導入にあたってはメッセージのルーティングを行う部分のコードがガラリと変わっているはずなので、興味本位で読んでみた.

機能についての参考文書

そもそもFilterやLabelって何?というあたりは以下が参考になる。

v0.10ではどうだったか

Matchクラスがmatchディレクティブで宣言されたタグのパターンと、行き先のOutputクラスを保持していて、EngineClass#emit (最終的な呼び出し先はemit_stream)で該当するMatchを探し出し、そこに向けてemitする、という形だった.

1
2
3
4
5
6
7
8
9
10
11
12
13
  def emit_stream(tag, es)
      target = @match_cache[tag]
      unless target
        target = match(tag) || NoMatchMatch.new
        # this is not thread-safe but inconsistency doesn't
        # cause serious problems while locking causes.
        if @match_cache_keys.size >= MATCH_CACHE_SIZE
          @match_cache.delete @match_cache_keys.shift
        end
        @match_cache[tag] = target
        @match_cache_keys << tag
      end
      target.emit(tag, es)

なので、ルーティングを管理するテーブルはEngineClass@matches一つだったし、tagがルーティングのキーになっていた. 複数のOutputで順番に処理したい場合は都度tagを書き換えていく必要があった.

v0.12の場合

Agent, RootAgent, Label, EventRouterと言った新しいクラスが導入されている.

  • Label
    • labelディレクティブの中に存在するFilterおよびOutputプラグインを管理するクラス
  • RootAgent
    • labelディレクティブに属さない(設定ファイルのルート直下にある)Input, Filter, Outputプラグイン、および各Labelクラスを管理するクラス
  • Agent
    • RootAgentおよびLabelの親クラス.
  • EventRouter
    • ルーティングのためのルール(どのタグパターンに対して、どのようなfilterやmatchが存在するか)を管理し、イベントのルーティングを行うクラス

RootAgentおよびLabelのインスタンスは、それぞれ自分自身が管理する範囲のルーティングを行うEventRouterクラスのインスタンスを保持している.

以下、実際のコードを見てみる.

起動部分

まずは、configurationを読み込んでいく段階でどのようなクラスが生成されていくのかを見てみる.

Supervisor#start (init_engine)

Supervisor#start

Supervisorが起動して、色々と準備していく部分. init_engine内で Engine#init が呼ばれ、ここでRootAgentが生成される.

RootAgentの親クラスであるAgentのコンストラクタは以下のようになっている

Agent#initialize

1
2
3
4
5
6
7
8
9
10
11
12
13
def initialize(opts = {})
  super()

  @context = nil
  @outputs = []
  @filters = []
  @started_outputs = []
  @started_filters = []

  @log = Engine.log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

自身が管理するOutputクラス、Filterクラス達を保持するための変数が存在している. また、そのスコープでのルーティングを行うEventRouterクラスをここで生成している.

RootAgentについては、これに加えて更にInputやLabelクラスも管理するような構造になっている. (root_agent.rb)

Supervisor#start (run_configure)

Supervisor#run_conigureが呼ばれると、Engine#configureを経由してRootAgent#configureが呼ばれる.

RootAgent#configure

ちょっと長いが引用.これにより以下が行われる

  • labelディレクティブがあった場合
    • add_labelにより新規Labelオブジェクトを生成
    • さらに、そのLabelオブジェクトのconfigureを呼び出す. configureの内容については、以下のAgent#configureを参照.
  • sourceディレクティブがあった場合
    • add_sourceによりInputプラグインのインスタンスを生成
    • Inputプラグインがemitする際の投げ先として、以下を登録.
      • そのInputプラグインで@labelが設定されている場合→設定されたLabelオブジェクトのEventRouterを登録
      • それ以外の場合→RootAgentEventRouterを登録

Inputプラグインについては@labelが設定されている場合とそうでない場合で、emit先のEventRouterを切り替えることができるようになっている.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def configure(conf)
  error_label_config = nil

  # initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
  label_configs = {}
  conf.elements.select { |e| e.name == 'label' }.each { |e|
    name = e.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

    if name == ERROR_LABEL
      error_label_config = e
    else
      add_label(name)
      label_configs[name] = e
    end
  }
  # Call 'configure' here to avoid 'label not found'
  label_configs.each { |name, e| @labels[name].configure(e) }
  setup_error_label(error_label_config) if error_label_config

  super

  # initialize <source> elements
  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    conf.elements.select { |e| e.name == 'source' }.each { |e|
      type = e['@type'] || e['type']
      raise ConfigError, "Missing 'type' parameter on <source> directive" unless type
      add_source(type, e)
    }
  end
end

さらに、RootAgentの親クラスであるAgent#configureでは同様にmatchに対してadd_match、filterについてadd_filterが呼ばれる.

Anget#configure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e|
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type'] || e['type']
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end

AgentLabelの親クラスでもあるので、新しいLabelオブジェクトのconfigureが呼び出された際もこのコードが実行されることになる.

add_filteradd_matchが何をしているかというと、そのAgentが持っているEventRouterに対してルーティングのルール(Ruleオブジェクト)を登録している.

絵にすると、、、

FluentdのBlogに書かれているサンプルを元に、どんな感じのオブジェクトたちが出来上がるかを絵にするとこんな感じ.

RootAgent

labelごとにルーティングテーブルを持つので、labelが違えば異なるルールでルーティングする、ということができるようになる.

emitの動き

ここまでで、各Input, Filter, Outputプラグインインスタンスは、自分がemitする先のEventRouterオブジェクトを知っていることになる.

まず、Inputプラグイン内では自分が知っているEventRouteremitする.

1
router.emit(tag, time, record)

emitemit_streamに飛ぶので、以下のコードが呼び出される. matchメソッドが返してきたオブジェクトに対してemitする.

EventRouter#emit_stream

1
2
3
4
5
def emit_stream(tag, es)
  match(tag).emit(tag, es, @chain)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end

EventRouter#match に飛ぶ. matchはemitされたtagを受け取るべきCollectorを返す.

1
2
3
4
5
6
def match(tag)
  collector = @match_cache.get(tag) {
    c = find(tag) || @default_collector
  }
  collector
end

このCollectorを探す部分がどうなっているかと言うと、

event_router#find

こんな感じになっている. つまり、Filterが使われていればPipelineオブジェクトを生成してそこにFilterやOutputを順次追加していく. FilterがなければPipelineの代わりにOutputを直接返す.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def find(tag)
  pipeline = nil
  @match_rules.each_with_index { |rule, i|
    if rule.match?(tag)
      if rule.collector.is_a?(Filter)
        pipeline ||= Pipeline.new
        pipeline.add_filter(rule.collector)
      else
        if pipeline
          pipeline.set_output(rule.collector)
        else
          # Use Output directly when filter is not matched
          pipeline = rule.collector
        end
        return pipeline
      end
    end
  }

  if pipeline
    # filter is matched but no match
    pipeline.set_output(@default_collector)
    pipeline
  else
    nil
  end
end

よって、emitされたレコードはPipelineまたはOutputemitされることになる. そして、Pipelineemitされた場合は以下のコードに辿り着き、順番にFilterを通った後に最終的にOutputにemitされることになる.

Pipeline#emit

1
2
3
4
5
6
7
  def emit(tag, es, chain)
    processed = es
    @filters.each { |filter|
      processed = filter.filter_stream(tag, processed)
    }
    @output.emit(tag, processed, chain)
  end

このようにFilterを実現するためにPipelineという新しい仕組みを導入しているため、tagの書き換えによる多段フィルタをしなくて済むようになっている.

まとめ

  • v0.12のLabel, Filterを実現している部分のコードを読んでみた
  • Labelの部分は、RootAgent(設定ファイルのROOT部分)および各labelディレクティブごとにルーティングテーブル(EventRouter)を分けることにより実現されている
  • Filterは、レコードに対する連続した処理を表現する、Pipelineという新たな仕組みを導入することで実現されている

Fluentdで Incompatible Character Encodingのエラーが出た話

| Comments

Fluentdに対して、in_httpのインタフェースで送信してくるアプリケーションがあるのだけど、そのアプリケーションに対してリリースがあった後、一部のメッセージがうまく送れないという話があった. 見てみたら、受信側Fluentdに以下のようなメッセージが多発していた. fluentdのバージョンは0.10.48.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2015-02-06 14:14:52 +0900 [warn]: fluent/engine.rb:156:rescue in emit_stream: em
it transaction failed  error_class=Encoding::CompatibilityError error=#<Encoding
::CompatibilityError: incompatible character encodings: ASCII-8BIT and UTF-8>
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:32:in
`<<'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:32:in
`to_msgpack'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:32:in
`block in to_msgpack_stream'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:54:in
`call'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:54:in
`each'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/event.rb:31:in
`to_msgpack_stream'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/output.rb:424:i
n `emit'
  2015-02-06 14:14:52 +0900 [warn]: plugin/in_http.rb:147:on_request: /opt/td-ag
ent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.10.48/lib/fluent/output.rb:33:in
 `next'

全てのメッセージが送れてないわけではない模様. 適当なメッセージをcurlで送ってみても問題ないので なんだろう、、、と思っていたら、以下のpull requestを見つけた. 偶然、事象が発生したのと同日.

https://github.com/fluent/fluentd/pull/550

Messagepackの不具合?で、5KBを超える、且つエンコードがASCII-8BIT以外の文字列をシリアライズしようとした時に、このエラーが出ることがあるそうな. msgpack 0.5.11で修正.

アプリケーションの人に聞いてみると、確かにメッセージサイズは大幅に増えているようだし、エラー発生時のパケットを見ても、メッセージサイズが数百KBととても大きい. このpull requestは v0.10.60で取り込まれているようなので、td-agentを2.1.4に上げてテストをしてみたら発生しなくなった.

もっと前にこれが発生してたら困ってただろうな. ちょうど修正されてて助かりました.

HDP2.2でResourceManagerが両系standbyになった事象

| Comments

HDP2.2で、daemonやクラスタの再起動を繰り返していた所、ResourceManagerが両系standbyになってしまった.

ResourceManagerのログには以下のように出力されている.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2015-01-16 16:29:19,495 WARN  resourcemanager.RMAuditLogger
(RMAuditLogger.java:logFailure(285)) - USER=yarn
OPERATION=transitionToActive    TARGET=RMHAProtocolService
RESULT=FAILURE  DESCRIPTION=Exception transitioning to active
PERMISSIONS=All users are allowed
2015-01-16 16:29:19,495 WARN  ha.ActiveStandbyElector
(ActiveStandbyElector.java:becomeActive(809)) - Exception handling the
winning of election
org.apache.hadoop.ha.ServiceFailedException: RM could not transition to Active
        at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.becomeActive(EmbeddedElectorService.java:128)
        at org.apache.hadoop.ha.ActiveStandbyElector.becomeActive(ActiveStandbyElector.java:805)
        at org.apache.hadoop.ha.ActiveStandbyElector.processResult(ActiveStandbyElector.java:416)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:599)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
Caused by: org.apache.hadoop.ha.ServiceFailedException: Error when
transitioning to Active mode
        at org.apache.hadoop.yarn.server.resourcemanager.AdminService.transitionToActive(AdminService.java:304)
        at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.becomeActive(EmbeddedElectorService.java:126)
        ... 4 more
Caused by: org.apache.hadoop.service.ServiceStateException:
org.apache.hadoop.yarn.exceptions.YarnException: Application with id
application_1421115867116_0001 is already present! Cannot add a
duplicate!
        at org.apache.hadoop.service.ServiceStateException.convert(ServiceStateException.java:59)
        at org.apache.hadoop.service.AbstractService.start(AbstractService.java:204)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.startActiveServices(ResourceManager.java:1014)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$1.run(ResourceManager.java:1051)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$1.run(ResourceManager.java:1047)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.transitionToActive(ResourceManager.java:1047)
        at org.apache.hadoop.yarn.server.resourcemanager.AdminService.transitionToActive(AdminService.java:295)
        ... 5 more
Caused by: org.apache.hadoop.yarn.exceptions.YarnException:
Application with id application_1421115867116_0001 is already present!
Cannot add a duplicate!
        at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)
        at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.createAndPopulateNewRMApp(RMAppManager.java:338)
        at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.recoverApplication(RMAppManager.java:309)
        at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.recover(RMAppManager.java:413)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.recover(ResourceManager.java:1207)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$RMActiveServices.serviceStart(ResourceManager.java:590)
        at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)

Zookeeperに変なエントリが残ってる?と思われたのでRMを停止した上で/rmstore/ZKRMStateRoot/RMAppRoot/以下のApplicatoinIDのエントリを全て削除.

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 2] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421387925857_0002
[zk: localhost:2181(CONNECTED) 3] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421115867116_0002
[zk: localhost:2181(CONNECTED) 4] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421115867116_0003
[zk: localhost:2181(CONNECTED) 5] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421115867116_0004
[zk: localhost:2181(CONNECTED) 6] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421320519530_0002
[zk: localhost:2181(CONNECTED) 7] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421115867116_0005
[zk: localhost:2181(CONNECTED) 8] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421320519530_0001
[zk: localhost:2181(CONNECTED) 11] rmr /rmstore/ZKRMStateRoot/RMAppRoot/application_1421387925857_0001

で、RMを起動したら復旧した.

どうやらYARN-2865の事象っぽい. Hadoop 2.7.0でFIX. ZK上のエントリを消してしまったので、その分のジョブについてはクライアントから再投入する必要がある.

HDP2.2をセットアップするためにハマった箇所のメモ

| Comments

HDP2.2を手元のVMで試しにセットアップしてみたが、色々ハマった部分があったのでメモ

環境

CentOS6.3のVMを7つ用意して、以下のようにHA含めて構成することにした.

  • master1: NameNode(active), ZKFC, JournalNode, Zookeeper
  • master2: NameNode(standby), ZKFC, JournalNode, ResourceManager(standby), Zookeeper
  • master3: JournalNode, ResourceManager(active), Zookeeper, HiveServer2, MySQL
  • slaves(3ノード): DataNode, NodeManager
  • client: MR/Tez client

ドキュメントは、こちらの”Installing HDP Manually”を使用. http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/HDP_Man_Install_v22/index.html

トラブルシュートなどのメモ

以下、ドキュメントには無いが変更しないといけなかったもの、引っかかったトラブルなど. 単に自分が手順を見落としていたり、間違っていたために発生したものもあるかも.

全般

  • 基本的に、設定はcompanion filesのものをベースにする. 2.1を動かしていた際の設定もあったが、大分変わっているようなので一旦companion filesのをまるごと持ってきた
  • インストールのベースが/usr/hdp/2.2.0.0-2041/になっているが、実際のスクリプトの中では/usr/lib/hadoop等を参照しているものもあるため、/usr/hdp/2.2.0.0-2041/hadoop -> /usr/hdp/2.2.0.0-2041/hadoop 等のようなシンボリックリンクをひと通り作成した.
  • Daemonの起動スクリプト類はhadoop-hdfs-namenode等のような別RPMになっている.これらのインストール先は/usr/hdp/2.2.0.0-2041/etc/となっているため、/etc/init.dの下などに、こちらもシンボリックリンクを作成した. ちなみに、/etc/defaultの下に置くファイルも用意されているが、initスクリプトをみてもこれらを読むようには見えない.
  • マニュアルにはcompanion filesに含まれる、 usersAndGroups.sh, directories.sh を設定した上で ~/.bash_profileでこれらのファイルを読む設定を入れるようにあるが、daemonの動作がbash_profileに依存するのが気持ち悪かったので、それはやってない. それに起因したトラブルもあるかも.

Zookeeperのセットアップ

  • initスクリプト内で呼ばれる、zookeeper-server,zookeeper-server-initialize/usr/hdp/2.2.0.0-2041/zookeeper/bin/にあるので、これらを/usr/bin下に置くよう、シンボリックリンクを作成した

service start の戻り値が3

また、停止に失敗したりする.

以下の様な感じ.

1
2
3
4
5
# service hadoop-yarn-resourcemanager start
Starting Hadoop resourcemanager:                           [  OK  ]
starting resourcemanager, logging to /var/log/hadoop/yarn/yarn-yarn-resourcemanager-hdp15.hadoop.local.out
[root@hdp15 init.d]# echo $?
3

initスクリプト内で以下のようにPIDFILEを設定しているが、環境変数が正しく設定されていないと、PIDFILEがうまく作られずにこのような状態になる.

1
PIDFILE="$HADOOP_PID_DIR/yarn-$YARN_IDENT_STRING-resourcemanager.pid"

yarn-env.shHADOOP_PID_DIR, YARN_IDENT_STRING, hadoop-env.shでもHADOOP_PID_DIRを設定するようにした.

HistoryServerでPermission Deninedが発生

以下の様なエラーが発生. (何をしようとして発生したのか忘れた…)

1
2
3
4
5
6
2014-12-27 03:15:05,824 ERROR hs.HistoryFileManager (HistoryFileManager.java:scanIfNeeded(285)) - Error while trying to scan the directory hdfs://hdpexperiment:
8020/mr-history/tmp/client
org.apache.hadoop.security.AccessControlException: Permission denied: user=mapred, access=READ_EXECUTE, inode="/mr-history/tmp/client":client:hdfs:drwxrwx---
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185)

HDFSのパーミッションを見ると以下のようになっており、/mr-history/tmp/clientに対してmapredユーザのパーミッションが無い.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# sudo -u hdfs hdfs dfs -ls /
Found 6 items
drwxrwxrwt   - yarn   yarn            0 2014-12-27 03:11 /app-logs
drwxr-xr-x   - hdfs   hdfs            0 2014-12-27 02:43 /apps
drwxr-xr-x   - hdfs   hadoop          0 2014-12-26 16:17 /hdp
drwxr-xr-x   - mapred hdfs            0 2014-12-26 16:16 /mr-history
drwxrwxrwt   - hdfs   hdfs            0 2014-12-27 03:11 /tmp
drwxr-xr-x   - hdfs   hdfs            0 2014-12-27 02:33 /user
# sudo -u hdfs hdfs dfs -ls /mr-history
Found 2 items
drwxrwxrwt   - mapred hdfs          0 2014-12-26 16:16 /mr-history/done
drwxrwxrwt   - mapred hdfs          0 2014-12-26 23:27 /mr-history/tmp
# sudo -u hdfs hdfs dfs -ls /mr-history/tmp
Found 1 items
drwxrwx---   - client hdfs          0 2014-12-27 00:14 /mr-history/tmp/client

以下のように、/mr-history以下のgroupをmapredとすることで対応.

1
# sudo -u hdfs hdfs dfs -chgrp -R mapred /mr-history

MapReduceジョブ実行中のエラー. Slaveにつながらない

exampleのpiを実行した際のエラー

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
14/12/30 11:17:59 INFO ipc.Client: Retrying connect to server: hdp18.hadoop.local/10.29.254.69:39110. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:00 INFO ipc.Client: Retrying connect to server: hdp18.hadoop.local/10.29.254.69:39110. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:01 INFO ipc.Client: Retrying connect to server: hdp18.hadoop.local/10.29.254.69:39110. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:29 INFO ipc.Client: Retrying connect to server: hdp17.hadoop.local/10.29.254.67:43296. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:30 INFO ipc.Client: Retrying connect to server: hdp17.hadoop.local/10.29.254.67:43296. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:31 INFO ipc.Client: Retrying connect to server: hdp17.hadoop.local/10.29.254.67:43296. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
14/12/30 11:18:31 INFO mapreduce.Job: Job job_1419895534181_0002 failed with state FAILED due to: Application application_1419895534181_0002 failed 2 times due to AM Container for appattempt_1419895534181_0002_000002 exited with  exitCode: 255
For more detailed output, check application tracking page:http://hdp16.hadoop.local:8088/proxy/application_1419895534181_0002/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1419895534181_0002_02_000001
Exit code: 255
Stack trace: ExitCodeException exitCode=255: 
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 255
Failing this attempt. Failing the application.
14/12/30 11:18:31 INFO mapreduce.Job: Counters: 0
Job Finished in 99.802 seconds
java.io.FileNotFoundException: File does not exist: hdfs://hdpexperiment/user/hdfs/QuasiMonteCarlo_1419905807487_296085847/out/reduce-out
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1750)
        at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1774)
        at org.apache.hadoop.examples.QuasiMonteCarlo.estimatePi(QuasiMonteCarlo.java:314)
        at org.apache.hadoop.examples.QuasiMonteCarlo.run(QuasiMonteCarlo.java:354)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.examples.QuasiMonteCarlo.main(QuasiMonteCarlo.java:363)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
        at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
        at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

yarn logsではログが見えなかったので、実行中のサーバでコンテナのログを見てみた. ${hdp.version}というのがそのままになっている.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2014-12-30 13:08:49,568 FATAL [AsyncDispatcher event handler] org.apache.hadoop.yarn.event.AsyncDispatcher: Error in dispatcher thread
java.lang.IllegalArgumentException: Unable to parse '/hdp/apps/${hdp.version}/mapreduce/mapreduce.tar.gz#mr-framework' as a URI, check the setting for mapreduce.application.framework.path
        at org.apache.hadoop.mapreduce.v2.util.MRApps.getMRFrameworkName(MRApps.java:178)
        at org.apache.hadoop.mapreduce.v2.util.MRApps.setMRFrameworkClasspath(MRApps.java:203)
        at org.apache.hadoop.mapreduce.v2.util.MRApps.setClasspath(MRApps.java:248)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.getInitialClasspath(TaskAttemptImpl.java:620)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.createCommonContainerLaunchContext(TaskAttemptImpl.java:755)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.createContainerLaunchContext(TaskAttemptImpl.java:812)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl$ContainerAssignedTransition.transition(TaskAttemptImpl.java:1527)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl$ContainerAssignedTransition.transition(TaskAttemptImpl.java:1504)
        at org.apache.hadoop.yarn.state.StateMachineFactory$SingleInternalArc.doTransition(StateMachineFactory.java:362)
        at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
        at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
        at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.handle(TaskAttemptImpl.java:1069)
        at org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.handle(TaskAttemptImpl.java:145)
        at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$TaskAttemptEventDispatcher.handle(MRAppMaster.java:1311)
        at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$TaskAttemptEventDispatcher.handle(MRAppMaster.java:1303)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.URISyntaxException: Illegal character in path at index 11: /hdp/apps/${hdp.version}/mapreduce/mapreduce.tar.gz#mr-framework
        at java.net.URI$Parser.fail(URI.java:2829)
        at java.net.URI$Parser.checkChars(URI.java:3002)
        at java.net.URI$Parser.parseHierarchical(URI.java:3086)
        at java.net.URI$Parser.parse(URI.java:3044)
        at java.net.URI.<init>(URI.java:595)
        at org.apache.hadoop.mapreduce.v2.util.MRApps.getMRFrameworkName(MRApps.java:176)
        ... 18 more

mapred-site.xmlで、${hdp.version}となっている部分を実際のバージョン番号(2.2.0.0-2041)に変えたら解消した.

Journalnodeの停止に失敗

journalnodeを停止すると、no journalnode to stopというエラーが発生. ただ、プロセスは停止している.

1
2
3
4
5
6
7
8
9
10
# service hadoop-hdfs-journalnode stop
Stopping Hadoop journalnode:                               [  OK  ]
no journalnode to stop
rm: cannot remove `/var/run/hadoop/hadoop-hdfs-journalnode.pid': Permission denied
# ls -l /var/run/hadoop/hadoop-hdfs-journalnode.pid
-rw-r--r-- 1 hdfs hdfs 6 12月 30 08:24 2014 /var/run/hadoop/hadoop-hdfs-journalnode.pid
# ps -ef | grep -i journal
root       374     2  0 Dec29 ?        00:00:02 [kjournald]
root       811     2  0 Dec29 ?        00:00:00 [kjournald]
root     14406 14342  0 14:15 pts/0    00:00:00 grep -i journal

ディレクトリのowner/groupがmapredになっている.

1
2
# ls -ld /var/run/hadoop
drwxr-xr-x 5 mapred mapred 4096 12月 30 14:16 2014 /var/run/hadoop

これをhdfs.hdfsにしたら事象は解消したが、HistoryServerをrestartしたら/var/run/hadoopのownerがmapred.mapredに戻ってしまった.

hadoop-mapreduce-historyserverのinitスクリプトにある、以下の部分のせい. つまり、historyserverとHDFS系のdaemonが同居している場合に発生する問題.

1
2
install -d -m 0755 -o mapred -g mapred $HADOOP_PID_DIR 1>/dev/null 2>&1 || :
[ -d "$LOCKDIR" ] || install -d -m 0755 $LOCKDIR 1>/dev/null 2>&1 || :

hadoop-env.shに以下の記述を入れ、HistoryServerのHADOOP_PID_DIRをHDFS系と分けることで対応することにした.

1
2
3
4
# For HistoryServer
if [ "${SVC_USER}" = "mapred" ]; then
  HADOOP_PID_DIR=/var/run/hadoop-mapreduce
fi

Hive CREATE TABLE時のNo privilege

beelineからhiveユーザで接続し、create tableを発行した際のエラー

1
2
0: jdbc:hive2://hdp16.hadoop.local:10000> create table test2(a int, b string); 
Error: Error while compiling statement: FAILED: SemanticException MetaException(message:No privilege 'Select' found for inputs { database:default}) (state=42000,code=40000)

Storage Based Authorization in the Metastore Serverに引っかかっている模様

一旦以下の設定を外して対応

1
2
3
4
5
  <property>
    <name>hive.metastore.pre.event.listeners</name>
    <value>org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener</value>
    <description>List of comma separated listeners for metastore events.</description>
  </property>

Hive on MRでのクエリ実行エラー

HiveServer2のログには以下のメッセージが出ている.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2015-01-06 03:14:42,300 ERROR [HiveServer2-Background-Pool: Thread-65]: exec.Task (SessionState.java:printError(833)) - Ended Job = job_1420474977406_0003 with
exception 'java.lang.NumberFormatException(For input string: "100000L")'
java.lang.NumberFormatException: For input string: "100000L"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:441)
        at java.lang.Long.parseLong(Long.java:483)
        at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1189)
        at org.apache.hadoop.hive.conf.HiveConf.getLongVar(HiveConf.java:2253)
        at org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper.checkFatalErrors(HadoopJobExecHelper.java:209)
        at org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper.progress(HadoopJobExecHelper.java:308)
        at org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper.progress(HadoopJobExecHelper.java:547)
        at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:435)
        at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:137)
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
        at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1604)
        at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1364)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1177)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1004)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:999)
        at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:144)
        at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:69)
        at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:196)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:536)
        at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:208)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

AMBARI-8219 の事例に従い、hive-site.xmlの以下を変更したらOK

1
2
3
4
5
6
   <property>
     <name>hive.exec.max.created.files</name>
-    <value>100000L</value>
+    <value>100000</value>
     <description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.</description>
   </property>

hive.execution.engine=tez でのHiveクエリ実行エラー1

HiveServer2のログは以下の通り

1
2
3
4
5
6
7
8
5841 end=1419741946024 duration=183 from=org.apache.hadoop.hive.ql.Driver>
2014-12-28 13:45:46,025 ERROR [HiveServer2-Handler-Pool: Thread-56]: thrift.ProcessFunction (ProcessFunction.java:process(41)) - Internal error processing ExecuteStatement
java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning
        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
        at java.lang.Class.getConstructor0(Class.java:2885)
        at java.lang.Class.newInstance(Class.java:350)
        at org.apache.hadoop.hive.ql.exec.TaskFactory.get(TaskFactory.java:133)

このクラスはtez-apiのjarに含まれているが、HiveServer2のサーバにTezクライアントをセットアップしていなかったのが原因だった. セットアップして解消.

Tez OrderedWordCountの実行エラー

Tezの動作確認として、tez-examplesに含まれる、OrderedWordCountを実行した際のエラー.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# su - hdfs
-bash-4.1$ cat /tmp/test.txt 
foo bar foo bar foo
$ hadoop fs -put /tmp/test.txt /tmp/test.txt 
-bash-4.1$ hadoop jar /usr/hdp/2.2.0.0-2041/tez/tez-examples-0.5.2.2.2.0.0-2041.jar orderedwordcount /tmp/test.txt /tmp/out
Running OrderedWordCount
14/12/27 02:54:57 INFO client.TezClient: Tez Client Version: [ component=tez-api, version=0.5.2.2.2.0.0-2041, revision=db32ad437885baf17ab90885b4ddb226fbbe3559, SCM-URL=scm:git:https://git-wip-us.apache.org/repos/asf/tez.git, buildTIme=20141119-1512 ]
14/12/27 02:54:59 INFO client.TezClient: Submitting DAG application with id: application_1419606523103_0010
14/12/27 02:54:59 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
14/12/27 02:54:59 INFO client.TezClientUtils: Using tez.lib.uris value from configuration: hdfs://hdpexperiment/apps/tez/,hdfs://hdpexperiment/apps/tez/lib/,hdfs://hdpexperiment/hdp/apps/current/tez/tez.tar.gz
14/12/27 02:54:59 WARN client.TezClientUtils: Duplicate resource found, resourceName=tez.tar.gz, existingPath=scheme: "hdfs" host: "hdpexperiment" port: -1 file: "/apps/tez/lib/tez.tar.gz", newPath=hdfs://hdpexperiment/hdp/apps/current/tez/tez.tar.gz
14/12/27 02:54:59 INFO client.TezClient: Tez system stage directory hdfs://hdpexperiment/tmp/hdfs/staging/.tez/application_1419606523103_0010 doesn't exist and is created
14/12/27 02:55:00 INFO client.TezClient: Submitting DAG to YARN, applicationId=application_1419606523103_0010, dagName=OrderedWordCount
14/12/27 02:55:01 INFO impl.YarnClientImpl: Submitted application application_1419606523103_0010
14/12/27 02:55:01 INFO client.TezClient: The url to track the Tez AM: http://hdp16.hadoop.local:8088/proxy/application_1419606523103_0010/
14/12/27 02:55:01 INFO client.DAGClientImpl: Waiting for DAG to start running
14/12/27 02:55:13 INFO client.DAGClientImpl: DAG completed. FinalState=FAILED
OrderedWordCount failed with diagnostics: [Application application_1419606523103_0010 failed 2 times due to AM Container for appattempt_1419606523103_0010_000002 exited with  exitCode: 1
For more detailed output, check application tracking page:http://hdp16.hadoop.local:8088/proxy/application_1419606523103_0010/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1419606523103_0010_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.]

コンテナのログを見ると以下の通り. クラスが見つからない.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Container: container_1419606523103_0009_01_000001 on hdp17.hadoop.local_45454
===============================================================================
LogType:stderr
Log Upload Time:27-12-2014 03:37:52
LogLength:1445
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/service/AbstractService
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.service.AbstractService
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 13 more

このクラスは、hadoop-common.jarに入っており、yarn classpathで確認するとこのjarもCLASSPATHに含まれているのだが、、、

(1/9追記)どうやら、tez.lib.urisの問題だった模様.以下のように、HDFSに乗せたtez.tar.gzのパスを指定したらうまく動作した.

1
2
3
4
<property>
  <name>tez.lib.uris</name>
  <value>/hdp/apps/current/tez/tez.tar.gz</value>
</property>

まとめ

ということで、HDP2.2のクラスタを作ろうとして色々うまくいかなかったのでまとめてみた. あとはTezのエラーを解消したいなぁ. あと、マニュアルをブラウザで見るととても見づらいので、PDFをダウンロードして手元で見た方が良い.

Fluentd v0.12のAt-least-once Semanticsを試す

| Comments

Fluentd v0.12のin/out_forwardでAt-least-once semanticsがサポートされるようになった. 今まではアプリケーションレイヤでの到達確認がなかったので、一部のネットワーク障害などのケースでは、送信されたように見えて実は送信されていない、という事象が発生し得た. v0.12から導入されたrequire_ack_responseオプションを使うと、このような事象を避けることができる.

この機能が導入されたpull requestはこちら. https://github.com/fluent/fluentd/pull/428

ということで試してみた.

require_ack_responseがない場合

fluentd 0.10.56で試す. (0.12で試しても良かったのだけど..)

送信側は以下の設定. 相手先fluentdが早々にdetachされてしまうのを避けるため、hard_timeoutphi_thresholdを入れた

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<source>
   type forward
</source>
<match test.**>
   type forward
   flush_interval 1s
   heartbeat_type tcp
   hard_timeout 600
   phi_threshold 300
   buffer_type file
   buffer_path /var/log/fluentd.*.buffer
   <server>
     host  192.168.1.2
     port 24224
   </server>
</match>

受信側はこんな感じ

1
2
3
4
5
6
7
8
<source>
  type forward
</source>

<match test.**>
  type file
  path /tmp/fluentd_forward.log
</match>

で、パケットが届かないが、アプリケーションにエラーが返らない状況を作るため、受信側のiptablesでSYNが立っていないパケットをドロップするようにする. SYNは相手に到達し、SYN-ACKも返るため、アプリケーションからは正常に接続されている様に見えることになる.

1
2
# iptables -A INPUT -p tcp --syn --dport 24224 -j ACCEPT
# iptables -A INPUT -p tcp --dport 24224 -j DROP

これでログを送ってみる

1
2
# echo '{"aaa": 1}' | fluent-cat  test.data
# echo '{"bbb": 2}' | fluent-cat test.data

netstatで送信側のソケットを見る. Send-Qにデータが溜まっている.

1
2
3
4
5
# netstat -na | grep 24224
tcp        0      0 0.0.0.0:24224               0.0.0.0:*                   LISTEN
tcp        0      1 192.168.1.1:10652          192.168.1.2:24224          FIN_WAIT1
tcp        0     41 192.168.1.1:10655          192.168.1.2:24224          FIN_WAIT1
udp        0      0 0.0.0.0:24224               0.0.0.0:*

しばらくすると、ソケットが破棄される.

1
2
3
4
# netstat -na | grep 24224
tcp        0      0 0.0.0.0:24224               0.0.0.0:*                   LISTEN      
tcp        0      1 192.168.1.1:10664           192.168.1.2:24224          FIN_WAIT1   
udp        0      0 0.0.0.0:24224               0.0.0.0:*             

この状況だと、アプリケーション的には正常に送れているように見えてしまうので、バッファは削除される. つまりログがロストした状況.

1
2
# ls /var/log/fluentd.*.buffer
ls: cannot access /var/log/fluentd.*.buffer: そのようなファイルやディレクトリはありません

require_ack_responseを使う

次に、送受信共にv0.12.1にして、送信側にrequire_ack_responseの設定を入れてみる.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  <source>
    type forward
  </source>
  <match test.**>
    type forward
    flush_interval 1s
    heartbeat_type tcp
    hard_timeout 600
    phi_threshold 300
    buffer_type file
    buffer_path /var/log/fluentd.*.buffer
    require_ack_response 
    <server>
      host 192.168.1.2
      port 24224
    </server>
  </match>

同様にfluent-catで送る. 今度は、一定時間後に以下のようにエラーになった.

1
2
3
4
5
6
7
8
9
10
11
2014-12-15 15:25:56 +0900 [warn]: no response from 192.168.1.2:24224. regard it as unavailable.
2014-12-15 15:26:56 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-12-15 15:22:46 +0900 error_class="Fluent::ForwardOutputACKTimeoutError" error="node 10.29.254.66:24224 does not return ACK" plugin_id="object:16c7e3c"
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/plugin/out_forward.rb:321:in `send_data'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/plugin/out_forward.rb:169:in `block in write_objects'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/plugin/out_forward.rb:163:in `times'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/plugin/out_forward.rb:163:in `write_objects'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/output.rb:459:in `write'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/buffer.rb:325:in `write_chunk'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/buffer.rb:304:in `pop'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/output.rb:320:in `try_flush'
  2014-12-15 15:26:56 +0900 [warn]: /usr/local/rvm/gems/ruby-2.1.5/gems/fluentd-0.12.1/lib/fluent/output.rb:140:in `run'

バッファも残っている

1
2
# ls /var/log/fluentd.test.data.*.buffer
/var/log/fluentd.test.data.b50a3b457dcfed028.buffer  /var/log/fluentd.test.data.q50a3b455b1eac4ca.buffer

しばらく放置した後、iptablesを解除したら無事に送信された.

まとめ

Fluentd v0.12で導入されたAt-least-once semanticsを試してみた. アプリケーションレイヤでの到達確認が実装されることで、TCPレイヤでパケットがうまく届いていないケースについても、fluentdがそれを検知して再送してくれることが確認できた.

ちなみに自分のところでは、ruby1.9上でfluentdを動かしていた時にプロセスが短時間ブロックするような事象が多発していて、それに起因してログのロストが発生したことがある. 恐らく、上記のようにTCPのコネクションは確立したように見えて、実は相手側がハング状態だったためにソケットバッファに滞留、最終的にソケットクローズ時にパケットが破棄されたのだと考えている. (この時は、td-agent2にしたら解消した)

require_ack_responseにより、そのようなケースでもfluentdがちゃんと検知して再送してくれるので、このオプションは是非入れておきたい.

Dockerを使い始めるための検討

| Comments

実環境でDockerを使い始めてみよう、と思うけど考えないといけないことが色々ありそう。どんなことを考える必要があるか、それぞれどうしていくのが良いか、考えてみる。

導入の目的

以下を目的にする。

  • アプリケーションのポータビリティ
    • 開発・テスト環境と同じイメージを本番でも利用することで、環境差異に起因するトラブルを減らす
    • サーバが増えた際に、簡単に同じアプリケーション環境をセットアップできるようにする
  • リソースの分離
    • 同じサーバに複数のアプリケーションが乗った際に、それぞれの環境やOSリソースを分離し、お互いに影響しないようにする

導入の対象は、まずはデータをストアしないjavaアプリケーションサーバとする.

考える事

いざ、使ってみようと思うと以下のような点が悩ましくなってきた。

  • イメージを作る単位
  • 既存の構成管理ツール(Ansible)との連携
  • ネットワークをどうする?外部からどう接続するか
  • 起動先の環境に依存する部分をどう吸収するか

考えてみる

さて、どうしようか

イメージを作る単位

イメージの差分管理ができる、とは言え何か変わる度に一直線に更新をしていくのもなんか違う気がする.

以下のようにレイヤを分けて作るのが管理しやすいのでは、と考えた.

  • baseイメージ
    • OSの基本パッケージと、構成管理ツールをインストールしたイメージ.
  • ミドルウェアイメージ
    • baseイメージを元にビルドする. アプリケーションデプロイの手前までの環境を提供する
    • 割と複雑になりそうなので、Dockerfile内からAnsible playbookを呼び出すことで実行する
  • アプリケーションイメージ
    • ミドルウェアイメージに対して、アプリケーションをデプロイして使える状態にしたイメージ

新しいアプリケーションをデプロイする際は、最新のミドルウェアイメージを利用して新しいアプリケーションイメージを作成する. もしミドルウェアの設定変更などが発生した場合は、ミドルウェアイメージを再作成し、それを元に再度アプリケーションイメージを作成する.

それぞれのレイヤごとに更新の頻度も違うし、更新を入れる担当も違うのでこのように分けて、それぞれのレイヤでバージョンを管理していくのが良さそう.

既存の構成管理ツールとの連携

Ansible playbookの資産があるので、再利用したい. また、Dockerfileは基本コマンドを羅列するだけなので、ある程度複雑になってくると辛い. Dockerfileだけで環境を作ることもできるが、メンテナンス性や再利用性を考えると構成管理ツールは必要だと思う.

そこで、DockerfileからAnsible playbookを実行して環境を構築することにする.

通常ansible-playbookはSSH経由で実行するが、そのためにコンテナの中でsshdを上げるのも。。。と思ってたら、SSHを使わなくても実行できるらしい. Local Playbooks

playbookをDockerfile内でADDするか、VOLUMEに置くかしてコンテナから見えるようにすれば、コンテナに対するplaybookの適用はできそう.

ネットワークをどうする?

普通にコンテナを起動すると、Dockerホスト内のプライベートネットワーク空間に入るので、外部からどう接続するかを考える必要がある.

ドキュメントによると、ネットワークのオプションもいくつかある.

  • --net=bridge(デフォルト)
    • Dockerホストが提供する仮想ブリッジに接続される. 外部にポートを公開するにはexposeする必要がある
  • --net=container
    • 起動済みコンテナのネットワークスタックを再利用する. ネットワークスタックを共用しているコンテナ同士は、localhostで通信できる. 外部から繋ぐには、やはりexposeが必要
  • --net=host
    • ホストのネットワークスタックをそのまま利用する. 普通にホストの中にプロセスが起動しているのと同じ状態になる.

IBMの検証結果や、こちらによると、--net=bridgeだとNATのオーバーヘッドにより--net=hostに対して20%程度性能が落ちる場合があるらしい.

ただ、--net=hostだと同一ホストで複数コンテナ起動する場合にはポートがバッティングしないように何かしら工夫する必要がある. --net=bridgeだとdocker runするときにオプションでマッピングを変えればいいので、そっちの方が扱いやすいか. これは、とりあえず両方試してみて後で決める.

外部からの接続方法については、Kubernetesとか、serf+HAProxy/nginxとか、カッコイイ方法はありそうだけど、とりあえずは手で接続元やHAProxy等の設定を変更することにする. 自動化は後.

起動先の環境に依存する部分をどう吸収するか

実行環境がコンテナ内に固められている、とは言えやはり環境に依存する部分はあり得る. アプリケーションサーバから接続する先のDBとか、ホストのメモリに応じてコンテナ内のjavaアプリケーションサーバのヒープサイズを変えたいとか、自ホストのIPアドレスが設定ファイルに書かれている場合とか. 調べてみると、docker run-eオプションを付けて環境変数を渡すことはできそう. すると、環境変数を元に関連する部分を書き換えた上でアプリケーションサーバを起動するようなラッパースクリプトを作り、DockerfileのCMDでコンテナ起動時に実行するようにする必要がある.

以下が参考になりそう

他には?

ログをどうするとか、モニタリングとか、他にも色々考えることはありそうだけど、とりあえずここまで.

Getting Stated With OpenShiftを読んだ

| Comments

Getting Stated with OpenShiftを読んだので、簡単にまとめ. とりあえず4章まで.

この本について

OpenShiftの管理者ではなく、Webアプリケーション開発者向けの本. OpenShift Onlineを使って、どのようにWebアプリケーションを動かすことができるか、ということが書いてある

1. Introduction

  • OpenShiftとは?
    • RedHatが提供するPaaS
  • 3つのバージョンがある
    • OpenShift Origin : オープンソースであり、最新版. 自分の環境に入れて使うことができる. OnlineやEnterpriseのUpstreamとなる.
    • OpenShift Online : RedHatが提供するクラウドサービス版. AWS上で動いており、アカウントを作ればOpenShiftの環境を使うことができる. 本書はこれを対象に書かれている.
    • OpenShift Enterprise : Productionでの利用を想定し、RedHatによりサポートされる安定版. 自分でインストールして使う.
  • 基本的な用語
    • Application : OpenShift上で動かすWebアプリケーション
    • Gear : サーバコンテナ. 使えるリソースに応じて、small, medium, largeの3タイプがある
    • Cartridge: Gearに追加する機能の固まり. JBossとか、Pythonとか、DatabaseとかCronとか.

2. Creating Application

  • OpenShift Onlineを使うための流れは以下の通り
    • アカウントを作る
    • コマンドラインツール(rhc)を入れる. rhcはrubygemなので、gem install rhcで入れることができる
    • コマンドラインツールをセットアップする. rhc setupで行う. OpenShiftのAPIをコマンドラインから使うための認証情報などを登録する.
  • 以後、OpenShift上のアプリケーション管理はrhcコマンドで行う
  • OpenShiftのアプリケーションを作るにはrhc app create insultapp python-2.7のようにする. 引数は、アプリケーション名と言語.
    • 実行すると、アプリケーションにアクセスするためのURLや、GearにSSHログインするためのユーザ名@ホスト名が表示される
    • また、カレントディレクトリにGitリポジトリが作成される. このリポジトリ内にアプリケーションのコードや色々な設定が格納される.
  • アプリケーションを作成する際に、 -gオプションを付けるとautoscaling機能が有効になる. 有効にすると、HAProxyがセットアップされ、アプリケーションがスケールアウトできるようになる
    • 本書では簡略化のためautoscalingは使っていないが、有効にしておいた方が良い
  • smallのgearであれば無償で使える. 無償版の場合、48時間リクエストが無いとアプリケーションは一旦ディスクに書かれ、次回リクエスト時に再ロードされる (応答に時間がかかる)
    • お金を払えば、gearを増やしたり、より大きなgearを使ったり、ディスク容量を拡張したり、JBoss EAP等の有償Cartridgeを使ったり、サポートチケットを発行したり、などなどができる.

3. Making Code Modifications

  • アプリケーション作成時に作成されたGitリポジトリ内にコードを書いて、git pushするとデプロイされる
  • .openshift/action_hooksディレクトリ内にファイルを作成しておくことで、デプロイの特定のタイミングで任意の処理を実行することができる. (action hook script)
  • 通常、アプリケーションのデプロイ時にアプリケーションは一旦停止する. .openshift/markers/hot_deploy ファイルを作成しておくと、ホットデプロイが有効になる.

4. Adding Application Components

  • Cartridgeを追加することで、アプリケーションに様々な機能を追加することができる(rhc cartridge add <cartridge名>). この章では以下のCartridgeが紹介されている.
  • Database
    • PostgreSQL, MySQL, MongoDB等のデータベースを使うことができる
    • アプリケーションがscalableでなければ同じgearに、そうでなければDatabase専用gearが作成され、その中にインストールされる
    • 追加時に、DBへアクセスするためのアカウントやURL等が表示される
  • Cron
    • cronを使うことができる
    • git repositoryのopenshift/cron/{minutely,hourly,daily,weekly,monthly}/以下にファイルを作成し、git pushすると有効になる
  • Continuous Integration (jenkins)
    • まず、rhc app createでJenkins用のアプリケーションを作成する
    • その上で、元のアプリケーションにrhc cartridge addでJenkinsのクライアントをCartridgeとして追加.
    • これにより、アプリケーションをpushした際にJenkins上でビルドやテストが走るようになる. ビルドが失敗した場合、アプリケーションはデプロイされない
  • Metrics and Monitoring
    • rhc cartridge add metrics-0.1 -a appnameのようにすることで、アプリケーションのモニタリング画面が追加される. CPUやメモリの状況等を確認可能
  • その他にも、コミュニティで開発された多数のCartridgeが存在し、利用することができる.

第6回Elasticsearch勉強会

| Comments

第6回Elasticsearch勉強会に参加したので、記憶が新しいうちにメモ. 内容は資料を見れば分かると思うので、個人的に特に記憶に残ったポイントと、感想のみ.

Aggregationあれこれ @johtani 氏

  • ElasticsearchのAggregation機能を使うと、SQLで言う集計 group by 的なものができると、という話
  • 動きとしては、各shard内でaggregateし、最後にその結果を集約する、という形. 普通のqueryと変わらない
  • クエリの種類によっては、正確な値ではなく近似値だったりする
  • Field Collapsingという、URLごとにtop Nを検索するようなこともできる
  • 個人的な感想
    • 便利な機能なので、是非使いたい. 現状だとKibana3で使えないのがネックだけど、Kibana4だと使えるようになるらしいので期待

秒間3万の広告配信ログをElasticSearchで リアルタイム集計してきた戦いの記録 @satully 氏

  • 資料
  • 商用での利用事例、という意味ですごく興味深かった
  • DSPの配信システムで利用. 最大秒間30000ログ. 1.5TB/日くらい
  • 各サーバのログ –> fluentd –> Elasticsearch –> MySQL/Redis という構成
  • fluentdは10インスタンス
  • ElasticsearchはCoordinate:2ノード, Search:2ノード, Data:28ノード.全てAWSのr3.largeインスタンス. メモリは30GBくらい?
    • diskは途中でSSDに変えた
    • CoordinateとSearchを分離する、というのは世間で見かけるけど、実際に負荷をみると必要性が疑問
    • 1日1index, 1index12shard, 1レプリ.
  • 単純なログのinsertのみではなく、Bidと紐付けるべきログが来ると、元のBidのログに項目を追加する. fluentd pluginとして実装
  • 運用ツールとしては、Elasticsearchのpluginとしてhead, bigdesk, ElasticHQ. 死活監視やfluentdのメトリックにはZABBIX
  • 個人的な感想
    • サーバスペック、台数、流量の具体的な事例としてとても参考になった
    • indexサイズはかなり大きくなるけど、いけるものなんだ.
    • お金の計算に関わる部分に、fluentdとかElasticsearchとか使っている、というのに驚いた

Elasticsearch 日本語スキーマレス環境構築と、ついでに多言語対応 @9215 氏

  • 資料
  • スキーマをちゃんと設計して、それに合わせてマッピング定義するのは面倒. どんなフィールドでどんなマッピングが最適、とか個別に設計すると人材がスケールしない.
  • そこで、dynamic templateとindex templateを使い、ノウハウが必要な部分はtemplateに集約、各利用者(データを投入する人)にはフィールド名のネーミングルールを周知する、というアプローチにした
  • 例えば、利用する言語を”language”というフィールドの値として設定し、template側でそれに合わせて適切なanalyzerを定義しておくことで、多言語に対してもうまくindexできる.
  • 個人的な感想
    • 確かに、どんな型にして、どんなanalyzerにして、とかはノウハウの部分になるので、それをtemplateに集約するのはとても良いアプローチだと思った

elasticsearchソースコードを読みはじめてみた @furandon_pig 氏

  • Elasticsearchのソースコードを読み始めてみた
  • 開発者の人には、はじめにREST APIを受けて検索する部分の動作をみてみるといいよ、と言われたけど、それがどこか分からなかったので、起動の部分を追いかけてみた話
  • 個人的な感想
    • ちょうど、そろそろソースみないと、と思っていたのでそのとっかかりとして大変参考になった

(LT)Elasticsearch RerouteAPIを使ったシャード配置の制御 @pisatoshi 氏

  • 資料
  • Reroute APIを使って、具体的にどのシャードをどこに配置するとかの制御ができるよ、という話
  • リバランスを有効にしていると、APIでシャードを別のノードに配置しても、リバランスされてしまうので注意
  • 個人的な感想
    • オチがとてもおもしろかった.
    • Reroute APIって、制御できるのは良いけど、逆にいちいち制御するのはやってられないし、どーいう場面で使うのだろう、と思った.

(LT)検索のダウンタイム0でバックアップからIndexをリストアする方法 @k_bigwheel 氏

  • 資料
  • indexのスナップショットを取れるようになったけど、普通にやるとリストア対象のindexを一旦closeしないといけない. サービスの継続を考えると、ダウンタイム無しでやりたい、という話
  • 予めクライアントからはalias経由でアクセスするようにしておく. aliasの切り替えはatomicにできるので、それを利用して、リストア完了後にスッパリ切り替える、という形にする
  • 個人的な感想
    • リストアだけでなく、mappingを変えるためにindexしなおすとか、aliasを使うと便利な場面はありそう
    • しかし、自分の環境はfluentdで”name-YYYY.MM.DD”の形でindexを作っているので、この場合aliasはどうしたらいいんだろう

全体所感

  • 最近、Elasticsearchを真面目に使い始めてみたので、どのトークも大変参考になった. 特に、 @satully 氏の話は良かった
  • 結構、みんなAWSなんだな
  • 懇親会で、発表についてのもう少し詳しい話とか、他の人がどんな感じに使ってるとか聞けたので、そっちも良かった. しかも参加費無料. 素晴らしい.
  • 主催の@johtaniさん、会場提供のリクルートテクノロジーズさんに感謝.

Fluentdの障害時動作

| Comments

Fluentdが障害の時にどのような動作をするのか調べてみたので、そのメモ. td-agent 1.1.17(fluentd v0.10.39)で確認したつもりだが、もしかしたらもう少し新しいので確認したケースもあるかも. BufferedOutputを中心に記載している.

BufferdOutputの基本

fluentdの特徴の一つとして、fluentd送信先で障害があり、メッセージが送れなかった場合は大抵(BufferedOutputを使っているプラグインであれば)fluentdでバッファリングし、一定時間後に再送してくれる.

このバッファリングのサイズは、BufferedOutputプラグインのbuffer_chunk_limit*buffer_queue_limitで決まる.

これらのデフォルト値は以下に解説付きでまとまっている. (良く参照させて頂いています) FluentdでバッファつきOutputPluginを使うときのデフォルト値

何回くらいリトライするの? リトライの間隔は?

リトライの回数は、retry_limit(デフォルト 17)で指定された回数まで. 間隔は一定ではなく、段々延びていく. 具体的に間隔を計算してるのは、BufferedOutput#calc_retry_wait.

リトライ間隔は、以下のパラメータでコントロールすることができる

  • max_retry_wait(デフォルト: nil = 上限なし)
  • retry_wait (デフォルト: 1.0)

同じメソッドを使って、実際にどれくらいになるのかを計算させてみた. 以下で例えば、1=>2は、一度送信に失敗してから、2回目の送信を試みるまでという意味. 単位は秒. 全てデフォルト値だと、以下の様な感じ. 最大だと、33765秒=9時間22分になる.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1=>2 : 1.0799124443510373
2=>3 : 1.8141315928054327
3=>4 : 3.5115188260172046
4=>5 : 7.106397160810471
5=>6 : 14.175590112052593
6=>7 : 31.434005639868758
7=>8 : 68.4743252224448
8=>9 : 116.47949944913451
9=>10 : 279.97276701667636
10=>11 : 487.69976826480445
11=>12 : 909.7729519328531
12=>13 : 2125.0559803853725
13=>14 : 3717.0255349933364
14=>15 : 8658.913465429461
15=>16 : 18189.354025481873
16=>17 : 33765.98470398931

例えば、max_retry_wait=120とすると、以下のようになる. 何回リトライしても、リトライ間隔の上限はmax_retry_waitまでになる.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1=>2 : 1.0717022666140232
2=>3 : 1.9866738239982864
3=>4 : 3.9258714996769903
4=>5 : 7.002702902759963
5=>6 : 15.817343449261045
6=>7 : 34.49173945537066
7=>8 : 65.98469012616731
8=>9 : 120
9=>10 : 120
10=>11 : 120
11=>12 : 120
12=>13 : 120
13=>14 : 120
14=>15 : 120
15=>16 : 120
16=>17 : 120

retry_waitを半分の0.5にすると、全てのリトライ間隔が半分になる.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1=>2 : 0.46442639898905974
2=>3 : 0.9688421553729557
3=>4 : 2.2291735347851613
4=>5 : 3.545406346443683
5=>6 : 7.824124603156501
6=>7 : 17.564462446502926
7=>8 : 30.97024814321994
8=>9 : 71.84343582620227
9=>10 : 127.87010583643446
10=>11 : 286.751861977861
11=>12 : 551.32668884554
12=>13 : 1077.2785515357239
13=>14 : 2095.196745718026
14=>15 : 3995.080966184667
15=>16 : 9131.408473518048
16=>17 : 16810.484835714517

リトライの頻度を増やす(リトライ間隔を減らす)場合は、併せてretry_limitも変更しないと、早々にリトライアウトしてしまう、ということになるので注意.

リトライ回数が超過したら?

リトライ回数を超過した場合、secodaryディレクティブを指定しておけば、そちらに出力される. 通常は、ファイルに出力しておいて、後からリカバリに使う、というケースが多いと思う.

1
2
3
4
  <secondary>
    type file
    path /path/to/forward-failed
  </secondary>

このようなケースでは、ログに以下のように出力される

1
2014-06-22 07:06:40 +0900 [warn]: fluent/output.rb:352:rescue in try_flush: retry count exceededs limit. falling back to secondary output.

キューが溢れたら?

キュー(バッファ)が溢れると、fluentdのログに以下のようなメッセージが出る.

1
2014-05-25 11:30:23 +0900 [warn]: fluent/engine.rb:149:rescue in emit_stream: emit transaction failed  error_class=Fluent::BufferQueueLimitError error=#<Fluent::BufferQueueLimitError: queue size exceeds limit>

この場合、inputプラグインがEngine.emitを実行する際にExceptionが発生する. プラグインが、これをrescueしていない場合、inputプラグインは停止する. rescueしている場合はinputプラグインの実装次第だが、大抵Exceptionは無視されてemitしたデータは破棄される. (既に溜めるためのキューがあふれているので、それしか無い)

送信先が復活したら?

再送中に送信先が復活し、再送に成功した場合は以下の様なメッセージが出力される.

1
2014-05-25 11:33:01 +0900 [warn]: fluent/output.rb:312:try_flush: retry succeeded. instance=70365422937420

ここで、注意点として送信先が復活してもすぐに再送してくれるわけではない. これは、送信先とのハートビートを行っているout_forwardでも一緒. BufferedOutput#try_flushのコードを見ると分かるが、リトライ中で、まだ次のリトライ時刻に達していない場合は、送信は行わない.

なので、retryを繰り返して再送間隔が延びている場合は、次の再送タイミングになるまでキューが溜まり続ける(もしくは、既に溢れている場合は溢れ続ける)

キューを強制的に送信することはできないの?

v0.10.59以前の場合 → リトライ中の場合以外であれば、fluentdのプロセスにSIGUSR1を送ることでキューが吐き出される. リトライ中の場合は、次のリトライタイミングまでは送信されない.全てのキューを吐き出すには、プロセスを停止するしかない.

v0.10.59以降の場合 → リトライ中の場合含め、fluentdのプロセスにSIGUSR1を送ることでキューが吐き出される. (2014/4/4追記: @sonotsさんに頂いたコメントを反映しました)

プロセス停止時の挙動は使用しているバッファプラグインによって異なるが

  • buf_memoryの場合
    • プロセス停止時に全てのキューが吐き出される.
  • buf_fileの場合
    • flush_at_shutdownがtrue(デフォルト false)の場合のみ、プロセス停止時に全てのキューが吐き出される.

長くなったのでここまで.

Puppetを使っていて思ったことなどをまとめてみる

| Comments

この記事について

puppetを初めて触ってから3,4年くらい経ったが、その間に思ったことなどをまとめてみる. なぜpuppetか、何に使っているか、使っていて思ったこと、課題など.

利用状況を簡単に

こんな環境で使ってます.

puppetの使用環境

CentOS 5 and 6で、puppetは2.7.21. 大体4つくらいの独立した(ネットワーク的に分断された)環境で、サーバ台数は合計百数十台. サーバ上で稼働する(した)ソフトウェアはHadoop, fluentd, MongoDB, Cassandra, MySQL, Munin等.

何に使っているか

上記ソフトウェアの環境は基本的にpuppetで構築している.が、puppetで完結できていることだけではないので、それは後述

使っていて思ったこと

今まで使っていて思ったことなどを書いてみる.

puppetで何が良いか

puppetだけではなく、chef等でも一緒だと思うけど、この手のソフトウェアを使わない場合だと、シェルスクリプトで頑張るか、VMのイメージをコピーするか、になると思う. それぞれに対して比較すると

  • シェルスクリプト等でやる場合
    • 色々全部自分で書かないといけないのでしんどい. 単一の環境、初回構築なら良いけど、ちょっと違う環境を作るとか、一旦作った環境に変更を加えるとか. 規模が大きくなってくると、モジュール化したり、過去に作ったものを再利用したくなるけど、その点でもつらい.
  • VMのイメージコピーの場合
    • 既に構築済の環境に対する変更をどう適用するか、環境の変更履歴をどう管理するか、というところが課題.

puppetは、”このサーバはこうあるべき”と定義すれば、前の状態がどうであってもその状態にしてくれるので、いちいちチェックして、期待と違ったら処理をして、みたいなことを書かなくて済む. また、クラス、モジュール、等再利用性を促すような言語の機能もあるので、うまく書けば既存のコードを流用しつつ、新しい要件に対応するようにマニフェストを変更していくようなこともやりやすい.

ただ、実はそれだけではなくてInfrastructure as Code, サーバ環境がコードで表現できるようになり、アプリケーションのコードと同じように変更管理、レビュー、テスト、デプロイができるようになるのが大きいと思う. このあたりはnaoyaさんの記事に詳しく書かれている.

puppetの使いどころ

サーバ構築のうち、何をpuppetでやるべきか?サーバ構築はBootstrapping, Configuration, Orchestrationの3つのレイヤに分けて考えられることが多いが、この中だとConfigurationの部分をpuppetでやることが有効

つまり、

  • Bootstrappingに相当する部分、OSインストール、ネットワーク設定、puppet自体のインストール、はやらない(やれない)
  • Orchestrationに相当する部分、複数台の協調操作が必要な操作もやらない. 今はここはfabric(局所的にserf)でやっている.
  • その他、puppetが動作可能になった後のconfigurationで、単一サーバ内で完結するようなものは全てpuppetで行う.

という感じにしている.

マニフェストのいい感じの書き方

いい感じ、というのは読みやすく、再利用しやすい、ということ. これは、使い始めてからずっと試行錯誤しているところ. そのための要素としては、以下があると思う.

  1. モジュールの書き方
  2. モジュールを組み合わせて、実際のサーバに適用するマニフェストにする部分をどう書くか
  3. 環境依存する部分をどのように切り出すか

まず、1について. モジュールはライブラリのように環境が変わっても再利用可能なものであるべき. どのような書き方が良いか、というのはこのドキュメントに書かれている. puppetlabs公式のntpモジュールが、お手本としては良いらしい.

2については、以下で紹介されている、RoleとProfileという考え方を導入するのが良い.

moduleを複数まとめてprofileが構成され、さらに複数profileをまとめてroleが構成される. node(=物理ホスト)は、一つのroleに紐付けられる、という形になっている. 例えば、profile::webserverにはhttpdとphpが必要で、role::www::devは、profile::webserverprofile::databaseが含まれる、みたいな形に書く. ただ、自分はまだroleの必要性がしっくりきてないので、profileとroleを分けていない.

3については、hieraやENCを使ってあるサーバに適用するクラス(role/profile)や、そのパラメータをマニフェストの外に切り出す、というのが定番らしい. ただ、自分はまだこれらは導入していなくて、以下のように変数をまとめたクラスを作って、それを呼び出すときの引数で環境を切り替える、という風にしている. 今後はhiera or ENCに移行していきたい.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class base_env($envname="") {
  case $envname {
    "testenv" :{
      $reposrv = "192.168.1.1"
      $repourl = "http://$reposrv/Lang/Ruby/"
    }
    default: {
      $masternode="192.168.1.2"
      $reposrv = "example.deploy.local"
      $repourl = "http://$reposrv/Lang/Ruby/"
      $gfurl="http://$masternode:5125"
      $munin_node=$masternode
      $serf_join = ['192.168.1.1','192.168.1.2']
      $munin_allow='^192\.168\..*$'

    }
  }
}
1
2
3
4
5
6
class profile::base($envname="") {
  class{"base_env":
    envname => $envname
  }
  ...
}
1
2
3
4
5
node /hadooptest\.hadoop.local/ {
   class{"profile::base":
     envname => "testenv"
   }
}

puppetmasterについて

つい最近まではpuppetmasterを使っていた. ただ、以下の問題があった.

  • 新しくサーバを構築するときにpuppetmasterが無ければ立てる必要がある
  • 証明書が面倒. 自動署名にはしてるけど、同じホスト名でサーバの再構築することもそれなりにあったので、その度に証明書のクリアが必要. クリアしてなくてうまく同期できない、というのも良くあった.

なので、最近はgitリポジトリにmanifestを配置して、各サーバでgit clone. という風にしている. マニフェストを更新した時は、fabricを使って並列にgit pull & puppet applyする. 証明書の問題から開放されたし、サーバ台数が増えてもgitリポジトリを複数にすればスケールするし、gitが無くてもmanifestだけコピーすれば環境作れるし、でとても楽になった.

今後の課題

今はできていないので、これから取り組みたいこと.

テスト

上記の通り、puppet化、というかコード化するメリットとしてCI的なものは外せないと思うのだけど、まだできていない. 幸い先人の事例があるので、こちらを真似しながら環境はできた. ただ、まだ実験的な段階で、プロセスとして固まってない. Dockerだけでなく実機の環境もあるので、Dockerでテストして、通ったものは即検証環境に自動的にデプロイして、みたいな形にしたい.

外部モジュール(Puppet Forge)の利用

puppetを始めた当初は、公開されているモジュールがあると言っても今一つやりたいことにフィットしなくて、全て自分で書いていたのだけど、最近は公開されているものを有効活用した方が良い気がしてきた. 実際serfとかsensuとかのモジュールを使ってみたが、全然変更の必要が無く使えたので、これからは外部のものも使っていきたい. ただ、自分のいる環境はInternetにつながらないのでpuppet moduleコマンドは使えず、どうやって配布するかを考えないといけない. (自前リポジトリとか立てられるのかな…)

最後に

最近Infrastructure as Codeという言葉も良く聞かれるようになったし、自分も今までpuppetを使ってて思ったことを一度まとめてみようと思って書いてみた. しかし、puppetってやっぱり日本語の記事が少ない. 入門的な記事はあるのだけど、どう使ってるとか、何に困ったとか、どう工夫してるとかの情報がもっとあると良いなぁ. Qiitaでそれぞれのタグを見てみたら、Chefの投稿が242に対して、Puppetは11でした…