Go ahead!

Memoization for Everything

Fluentd v0.12でのFilterとLabel

| Comments

Fluentd,最近だと海外でも露出が増えてきていて,軽量・柔軟・ロバストという所で, 新規の他,既存のログコレクタのリプレース含め,採用する所が増えてたりします.

より改善するため色々とユーザにヒアリングした結果,「フィルタ機能が欲しい」というのが一番多い意見でした. Fluentdは元々Treasure Dataへロバストにデータを転送するためのミドルウェアで,「ETLとかはTreasure Dataで」 というのもあり,組み込みでフィルタ機能はありませんでした.

今現在のOutputプラグインによるフィルタ実装は,タグの書き換えが必要だったりして少し慣れが必要で,初心者にはちと難しい. ということで,より簡単に効率よくデータストリームを扱えるフィルタ機能を入れることにしました!

前置きが長くなりましたが,次のバージョンであるv0.12ではFilterとLabelの導入が目玉機能になります. これらは二つともデータストリームの処理をより楽にするための機能です.

後,Fluentd v0.10.53のリリースアナウンスでも書きましたが, 今現在のmasterブランチはすでにv0.12用になっています.また,以下の機能はまだmasterにはマージされていません.今週中にはマージ予定ですが, 試して見たい方は,PR #416をチェックしてください.

Filter

Outputプラグインで出力する前に,イベント群に処理を適用するための仕組みです. greprecord_refomergeoipプラグインなどは,Filterとして実装することでより効率良く処理出来るようになります.

最初に実装,その後に設定の例を見せます.

実装

FilterのAPIは以下のようになっています.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
module Fluent
  class SomeFilter < Filter
    Plugin.register_filter('some_filter', self)

    def filter(tag, time, record)
      # レコードを弄るプラグインは,ここを実装すればOK
      # レコードを返す必要がある
      modified_record
    end

    def filter_stream(tag, es)
      # 以下はFilterのデフォルト実装.grepなどはこちらを置き換えればOK
      new_es = MultiEventStream.new
      es.each { |time, record|
        new_es.add(time, filter(tag, time, record))
      }
      new_es
    end
  end if defined?(Filter) # v0.10とかでエラーにならないための回避策
end

filter(tag, time, record)

SetXXXMixinを置き換えるadd_metadataフィルタを見て貰えればわかりやすいと思いますが, 引数で渡ってくるレコードを弄って,それを返り値にするだけです
filterとは違いfilter_streamは上記のデフォルト実装があるので,変更する必要はありません.

大抵のプラグインはこのAPIをオーバーライドして処理を実装すれば,その結果が次のフィルタに渡されるようになります.

filter_stream(tag, es)

EventStreamを直接処理するFilterを書くためのAPIです.例えばgrepはレコードを弄らず,EventStream全体に対して処理を行うプラグインなので, このAPIをオーバーライドします.filter_streamを変更する場合,filterを変更する必要はありません.
filterとは違い,返り値としてEventStreamを返す必要があります.Fluentdは,このfilter_streamの返り値を次のFilterに渡し,最終結果をOutputプラグインに渡します.

また,record_reformerrenew_recordのように,新しくレコードを生成する系のプラグインもこちら側で処理を実装することになります. greprecord_reformerのFilter例は以下にあります.

設定

<filter>セクションが追加されています.たとえば,add_metadatagrepを使う場合には以下のようになります.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<source>
  type forward
</source>

<filter> # <filter>と<filter **>は同じ
  type grep
  input_key message
  regexp WARN
</filter>
<filter>
  type add_metadata
  include_tag_key
</filter>

<match **>
  type stdout
</match>

Fluentdの設定ファイルの流儀にしたがって,上から順番にmatchにマッチしたところまでのFilterが適用されます. この例では,まずgrepが適用され,その次にadd_metadata,そしてこれらの結果のEventStreamがstdoutに渡されます.
例えば以下のデータが入ってきたとして:

{"message":"INFO"}
{"message":"WARN"}

stdoutには,grepで最初のレコードがフィルタリングされ,add_metadataでタグが追加された以下のデータが渡されます:

{"message":"WARN", "tag":"filter.test"}

タグを書き換えてmatchの条件を調整して〜という煩わしさから解放されるようになります.

注意点としては,マッチしたmatchの下にあるfilterは,たとえtagがマッチしようが適用されない所です.

パフォーマンス

タグの書き換えや再emitが発生しないため,速度やメモリ効率は多少よくなるはずです.

手元で約450MBのファイルをin_tailで読み込み,record_reformerでホスト名などを付加して転送する, というのをv0.10とv0.12で測ってみた所,v0.10は181秒,v0.12は149秒で,約30秒短縮されました.

ユーザによってはもっとFilterがチェーンしている所もあるはずなので,そういう場合には恩恵が大きいと思います.

Label

これはFilterやOutputをまとめるための機能になります.制限として,InputはLabelの中に含めることは出来ません. こちらは先に設定から見てみます.

設定

以下が簡単な例になります.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<source>
  type forward
  @label @forward
</source>

<source>
  type tail
  path /path/to/file
  tag tail
</source>

<filter **>
  type grep
  # ...
</filter>

<match **>
  type mongo
  # ...
</match>

<label @forward>
  <filter **>
    type add_metadata
    include_tag_key
  </filter>
  <match **>
    type file
  </match>
</label>

forwardの中に@label @forwardというパラメータがあります.これがLabelの指定になっていて, @labelがあるとその指定されたLabelにイベント群が流れて行きます.なので,この例ではforwardで 受け付けたイベント群は,add_metadataフィルタを通り,その下のfileに到達します.
一方,@labelのないInputは今までと同じ挙動となり,トップにイベント群を流します. この例では,grepフィルタを通り,その下のmongoに到達します.

Labelの機能によって,各プラグイン間の関係がわかりやすくなり,またルーティングのためにタグを調整する必要もなくなります. out_relabelプラグインを使えばデータを別のLabelに飛ばせるので,入力によって違うフィルタを適用したいが,書き込み先は一緒にしたい, みたいな処理も簡単にできるようになります.

実装

v0.12から,routerというのがInput/Outputに増えてます.今までEngine.emitと書いていた所を,router.emitに書き換えるだけで, Labelの機能が使えるようになります.
言い換えれば,この書き換えをしていないInput/Outputプラグインは,@labelが使えないということになります.

まとめ

v0.12の主要機能であるFilterとLabelについて簡単に説明しました.まだ実装としていくつか細かな改善はあるのですが, 基本機能は上記のままで行くと思います.
何かフィードバックがあれば@repeatedlyにmentionをするか,上記の該当PRにコメントをして貰えればと思います.

Comments