Tech Notes

FluentdのFilter&Label周りのコードを読んだメモ

| Comments

Fluentd v0.12の目玉機能としてFilterとLabelがある. この機能の導入にあたってはメッセージのルーティングを行う部分のコードがガラリと変わっているはずなので、興味本位で読んでみた.

機能についての参考文書

そもそもFilterやLabelって何?というあたりは以下が参考になる。

v0.10ではどうだったか

Matchクラスがmatchディレクティブで宣言されたタグのパターンと、行き先のOutputクラスを保持していて、EngineClass#emit (最終的な呼び出し先はemit_stream)で該当するMatchを探し出し、そこに向けてemitする、という形だった.

1
2
3
4
5
6
7
8
9
10
11
12
13
  def emit_stream(tag, es)
      target = @match_cache[tag]
      unless target
        target = match(tag) || NoMatchMatch.new
        # this is not thread-safe but inconsistency doesn't
        # cause serious problems while locking causes.
        if @match_cache_keys.size >= MATCH_CACHE_SIZE
          @match_cache.delete @match_cache_keys.shift
        end
        @match_cache[tag] = target
        @match_cache_keys << tag
      end
      target.emit(tag, es)

なので、ルーティングを管理するテーブルはEngineClass@matches一つだったし、tagがルーティングのキーになっていた. 複数のOutputで順番に処理したい場合は都度tagを書き換えていく必要があった.

v0.12の場合

Agent, RootAgent, Label, EventRouterと言った新しいクラスが導入されている.

  • Label
    • labelディレクティブの中に存在するFilterおよびOutputプラグインを管理するクラス
  • RootAgent
    • labelディレクティブに属さない(設定ファイルのルート直下にある)Input, Filter, Outputプラグイン、および各Labelクラスを管理するクラス
  • Agent
    • RootAgentおよびLabelの親クラス.
  • EventRouter
    • ルーティングのためのルール(どのタグパターンに対して、どのようなfilterやmatchが存在するか)を管理し、イベントのルーティングを行うクラス

RootAgentおよびLabelのインスタンスは、それぞれ自分自身が管理する範囲のルーティングを行うEventRouterクラスのインスタンスを保持している.

以下、実際のコードを見てみる.

起動部分

まずは、configurationを読み込んでいく段階でどのようなクラスが生成されていくのかを見てみる.

Supervisor#start (init_engine)

Supervisor#start

Supervisorが起動して、色々と準備していく部分. init_engine内で Engine#init が呼ばれ、ここでRootAgentが生成される.

RootAgentの親クラスであるAgentのコンストラクタは以下のようになっている

Agent#initialize

1
2
3
4
5
6
7
8
9
10
11
12
13
def initialize(opts = {})
  super()

  @context = nil
  @outputs = []
  @filters = []
  @started_outputs = []
  @started_filters = []

  @log = Engine.log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

自身が管理するOutputクラス、Filterクラス達を保持するための変数が存在している. また、そのスコープでのルーティングを行うEventRouterクラスをここで生成している.

RootAgentについては、これに加えて更にInputやLabelクラスも管理するような構造になっている. (root_agent.rb)

Supervisor#start (run_configure)

Supervisor#run_conigureが呼ばれると、Engine#configureを経由してRootAgent#configureが呼ばれる.

RootAgent#configure

ちょっと長いが引用.これにより以下が行われる

  • labelディレクティブがあった場合
    • add_labelにより新規Labelオブジェクトを生成
    • さらに、そのLabelオブジェクトのconfigureを呼び出す. configureの内容については、以下のAgent#configureを参照.
  • sourceディレクティブがあった場合
    • add_sourceによりInputプラグインのインスタンスを生成
    • Inputプラグインがemitする際の投げ先として、以下を登録.
      • そのInputプラグインで@labelが設定されている場合→設定されたLabelオブジェクトのEventRouterを登録
      • それ以外の場合→RootAgentEventRouterを登録

Inputプラグインについては@labelが設定されている場合とそうでない場合で、emit先のEventRouterを切り替えることができるようになっている.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def configure(conf)
  error_label_config = nil

  # initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
  label_configs = {}
  conf.elements.select { |e| e.name == 'label' }.each { |e|
    name = e.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

    if name == ERROR_LABEL
      error_label_config = e
    else
      add_label(name)
      label_configs[name] = e
    end
  }
  # Call 'configure' here to avoid 'label not found'
  label_configs.each { |name, e| @labels[name].configure(e) }
  setup_error_label(error_label_config) if error_label_config

  super

  # initialize <source> elements
  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    conf.elements.select { |e| e.name == 'source' }.each { |e|
      type = e['@type'] || e['type']
      raise ConfigError, "Missing 'type' parameter on <source> directive" unless type
      add_source(type, e)
    }
  end
end

さらに、RootAgentの親クラスであるAgent#configureでは同様にmatchに対してadd_match、filterについてadd_filterが呼ばれる.

Anget#configure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e|
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type'] || e['type']
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end

AgentLabelの親クラスでもあるので、新しいLabelオブジェクトのconfigureが呼び出された際もこのコードが実行されることになる.

add_filteradd_matchが何をしているかというと、そのAgentが持っているEventRouterに対してルーティングのルール(Ruleオブジェクト)を登録している.

絵にすると、、、

FluentdのBlogに書かれているサンプルを元に、どんな感じのオブジェクトたちが出来上がるかを絵にするとこんな感じ.

RootAgent

labelごとにルーティングテーブルを持つので、labelが違えば異なるルールでルーティングする、ということができるようになる.

emitの動き

ここまでで、各Input, Filter, Outputプラグインインスタンスは、自分がemitする先のEventRouterオブジェクトを知っていることになる.

まず、Inputプラグイン内では自分が知っているEventRouteremitする.

1
router.emit(tag, time, record)

emitemit_streamに飛ぶので、以下のコードが呼び出される. matchメソッドが返してきたオブジェクトに対してemitする.

EventRouter#emit_stream

1
2
3
4
5
def emit_stream(tag, es)
  match(tag).emit(tag, es, @chain)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end

EventRouter#match に飛ぶ. matchはemitされたtagを受け取るべきCollectorを返す.

1
2
3
4
5
6
def match(tag)
  collector = @match_cache.get(tag) {
    c = find(tag) || @default_collector
  }
  collector
end

このCollectorを探す部分がどうなっているかと言うと、

event_router#find

こんな感じになっている. つまり、Filterが使われていればPipelineオブジェクトを生成してそこにFilterやOutputを順次追加していく. FilterがなければPipelineの代わりにOutputを直接返す.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def find(tag)
  pipeline = nil
  @match_rules.each_with_index { |rule, i|
    if rule.match?(tag)
      if rule.collector.is_a?(Filter)
        pipeline ||= Pipeline.new
        pipeline.add_filter(rule.collector)
      else
        if pipeline
          pipeline.set_output(rule.collector)
        else
          # Use Output directly when filter is not matched
          pipeline = rule.collector
        end
        return pipeline
      end
    end
  }

  if pipeline
    # filter is matched but no match
    pipeline.set_output(@default_collector)
    pipeline
  else
    nil
  end
end

よって、emitされたレコードはPipelineまたはOutputemitされることになる. そして、Pipelineemitされた場合は以下のコードに辿り着き、順番にFilterを通った後に最終的にOutputにemitされることになる.

Pipeline#emit

1
2
3
4
5
6
7
  def emit(tag, es, chain)
    processed = es
    @filters.each { |filter|
      processed = filter.filter_stream(tag, processed)
    }
    @output.emit(tag, processed, chain)
  end

このようにFilterを実現するためにPipelineという新しい仕組みを導入しているため、tagの書き換えによる多段フィルタをしなくて済むようになっている.

まとめ

  • v0.12のLabel, Filterを実現している部分のコードを読んでみた
  • Labelの部分は、RootAgent(設定ファイルのROOT部分)および各labelディレクティブごとにルーティングテーブル(EventRouter)を分けることにより実現されている
  • Filterは、レコードに対する連続した処理を表現する、Pipelineという新たな仕組みを導入することで実現されている

Comments