Go ahead!

Memoization for Everything

Mongoプラグインの仕様

| Comments

分散パフォーマンステスト関係を書こうと思っていたんですが,よくよく考えたらMongoプラグインについて日本語でまともな記事を書いたことなかったので書きます.

このエントリはウィークリーFluentdユースケースエントリリレーの参加エントリです.

概要

MongoプラグインはMongoDBに対するInput/Outputプラグインを提供します.またユーティリティとして,MongoDBのcappedコレクションに対してtailを行うmongo-tailコマンドも付属しています.

リポジトリ: https://github.com/fluent/fluent-plugin-mongo

MongoDBは内部はBSONですが,API的にはJSONでやりとりしており,また明示的なスキーマもいらないため,Fluentd周辺では集計サーバやテンポラリサーバとして広く利用されています. td-agentには既に同梱されているため,td-agentを利用している方は特別なインストールをすることなく使えます.

Outputプラグイン

Outputプラグインには3タイプ用意されています.

  • mongo: 単一のMongoDBに出力
  • mongo_replset: レプリカセットで構築されているMongoDBに出力
  • mongo_backup: ローカルのMongoDBに保存しつつ,他のプラグインを使って出力.利用者もほとんどいないため,今回は割愛.

以降に設定含めもう少し詳しく書いて行きます

mongo

MongoDBにイベントを書き込みます.BufferedOutputを利用しているため,イベントをバッファリングし,フラッシュする時にバルクでMongoDBに書き込みます.

1
2
3
4
5
6
7
8
<match mongo.**>
  type mongo

  database fluentd
  collection log
  host host1
  port 10000
</match>

これが基本的な設定の例です.この設定に対してApacheのログを読み込ませた場合には,例えば以下のようになります.

1
2
3
4
5
$ mongo --host host1 --port 10000
> use fluentd
> db.log.find()
{"host": "127.0.0.1", "method": "GET", "path": "/", "code": "200", ...}
...
  • database (必須)

イベントを書き込むデータベース名を指定します.

  • collection (デフォルト: untagged)

イベントを書き込むコレクション名を指定します.

  • host (デフォルト: localhost)

書き込み先のMongoDBのホストを指定します.

  • port (デフォルト: 27017)

書き込み先のMongoDBのポートを指定します.

  • バッファの設定

BufferedOutputを使用しているため,バッファに関してもいくつか指定出来る項目があります.指定出来る項目については,公式ドキュメントのバッファプラグインを見てください.

  • ミックスイン

SetTagKeyMixinとSetTimeKeyMixinの二つを利用しています.そのため,この設定も利用出来ます.

認証

MongoDBのユーザ認証を使うようにします.以下のように userpassword を設定に書きます.

1
2
3
4
5
6
7
<match mongo.**>
  type mongo
  # ...

  user handa
  password shinobu
</match>

capped

MongoDBの場合,出力先のコレクションにcappedコレクションを使いたくなる時が結構あります.そのため,設定で指定することが出来ます.

1
2
3
4
5
6
7
8
<match mongo.**>
  type mongo
  # ...

  capped
  capped_max  100
  capped_size 100m
</match>
  • capped

これを書くと作られるコレクションがcappedコレクションになります.この項目を指定した場合,以下の二つの設定が使用されます.

  • capped_size (必須)

cappedコレクションの容量を指定します.

  • capped_max (オプション)

cappedコレクションで保存出来るドキュメント数を指定します.

tag_mapped

MongoDBのようにコレクションが勝手に作られるデータベースを使っていると,Fluentdのタグに対応したそれぞれのコレクションに勝手に振り分けて保存したい時が出てきます.その時には tag_mapped を使います.

1
2
3
4
5
6
7
<match mongo.**>
  type mongo
  #...

  tag_mapped
  remove_tag_prefix mongo.
</match>
  • tag_mapped

これを書くとtag_mappedが有効になります.

  • remove_tag_prefix

タグからコレクション名を抽出する時に,この設定を元に整形します.指定された文字列でタグの先頭から正規表現マッチを行い,そのマッチした部分を削除します.

上記の例でmongo.fooというタグが来た場合, remove_tag_prefix がない場合にはmongo.fooというコレクションに書き込むが,”remove_tag_prefix mongo.”があった場合には”mongo.”が削除されfooというコレクションに書き込みます.

mongo_replset

MongoDBのレプリカセットにイベントを書き込みます.mongo_replset タイプは mongo タイプを継承して実装しているため, databasetag_mapped など,基本的な設定は mongo タイプを引き継ぎます.そのため,ここでは mongo_replset だけの設定に関して書きます

1
2
3
4
5
6
7
8
9
10
11
12
<match mongo.**>
  type mongo_replset
  #...

  nodes host1:27017,host2:27018,host3:27019
  num_retries 30

  name replset_name
  read secondary
  refresh_mode sync
  refresh_interval 60
</match>
  • nodes (必須)

mongo タイプではhost/portのペアを指定していましたが, mongo_replset タイプではレプリカセットのクラスタを指定します.現時点でFluentdは一つの設定項目に複数の値を指定する方法がないので,例のようにhost:portのリストを’,’で区切って指定します.

  • num_retries (デフォルト: 60回)

フェイルオーバーなどで一時的に繋がらない時に,何回リトライするかを指定します.この回数を超えた場合にプラグインは例外を投げます.

  • name, read, refresh_mode, refresh_interval (全てオプション)

これらはMongoクライアントのReplsetConnectionにそのまま渡されます.Rubyクライアントでの詳細はRDocレプリカセットの説明を参照してください.

Inputプラグイン

Inputプラグインは今のところ mongo_tail タイプのみです.

mongo_tail

このInputプラグインは,in_tailと同じような動作をMongoDBのcappedコレクションに対して行います(通常のコレクションではソートされた状態で,位置を覚えて取得することが出来ないためです).また,MongoDBから取得するドキュメントの’_id’フィールドの値はObjectIDになっておりJSONやMessagePackで扱えないため,’_id_str’フィールドに文字列としてセットされます.

1
2
3
4
5
6
7
8
9
10
11
12
<source>
  type mongo_tail

  database fluentd
  collection capped_log
  host host1
  port 10000

  tag mongo.log
  time_key time
  id_store_file /path/to/last_id
</source>
  • tag (必須 or tag_keyがあるときはオプション)

出力先のタグを指定する.tag_key が指定されていた場合には,そちらが優先される.

  • tag_key (必須 or tagがあるときはオプション)

入力のドキュメントから tag_key で指定されたフィールドの値を取得し,その値をイベントのタグとする.見つからない場合は’mongo.missing_tag’がタグとなる.

  • time_key (オプション)

入力のドキュメントから time_key で指定されたフィールドを取得し,イベントの時間とする.なければ入力を取得した時を,イベントの時間とする.

  • id_store_file (オプション)

最後に取得したドキュメントのObjectIDを保存し,tailのポジションを記憶する.再起動した場合にはこのObjectIDを元にtailを再開する.指定しない場合には保存せず,再起動したら再度末尾からtailするようになる.

  • database, collection, host, port (必須,必須,デフォルト: localhost, デフォルト: 27017)

Outputプラグインの mongo タイプと指定は同じですが, collection が必須になっている所が違います.

認証

Outputプラグインと同じです.

気をつけること

以下では利用する際に注意する所と,なんでそうなっているかを説明します.

buffer_chunk_limitの制限

mongoのOutputプラグインでは, buffer_chunk_limit に上限が設けられています.上限は

  • MongoDBのバージョンが1.8未満だと2MB
  • MongoDBのバージョンが1.8以上だと10MB

という感じに分けられています.これは,MongoDBで有名なドキュメント制限に由来しており,制限より大きなドキュメントや操作はクライアントやMongoDB本体で弾かれます.

ドキュメント制限ギリギリではない理由は,MessagePackとBSONのサイズの違いによります.JSONと較べるとMessagePackは小さくなる傾向にあり,BSONはあまり変わらないか少し大きくなる傾向にあります.特に整数などが項目に多いと顕著になるため,このように制限の約半分くらいにしています.しかしこれもヒューリスティックな制限であるため,実際のログの傾向などを調べて,適切な buffer_chunk_limit を指定する必要があります.

コレクションの設定チェック

今のところはcappedのチェックしかしてませんが,違う設定で既に存在しているコレクションに書き込もうとしていた場合には,以下のような例外を投げるようにしています.これは設定の使い回しなどで,うっかり誤ったコレクションに書き込むなどのミスを防ぐためにつけています.

1
"New configuration is different from existing collection: new = foo, old = bar"

また,tag_mappedの場合は現在細かな指定が出来ないため,デフォルトではこのチェックはしないようになっています. この動作のon/offを切り替えるには disable_collection_check で指定します.

1
2
3
4
<match mongo.**>
  #...
  disable_collection_check true
</match>

壊れているドキュメントの扱い

様々なアプリケーションのログをFluentdに流し込んでいると,たまにBSONとしてシリアライズ出来ないログが混じったりすることがあります(文字コードが壊れている,MongoDBとしてInvalidなフォーマットを持っている).この場合に,Fluentdとしてはログを勝手に捨てることは出来ません.なので,mongoのOutputプラグインでは,そのようなログはバイナリにしてMongoDBに保存するようにしています. 例えば以下のようなログが来たとします:

1
{"key1": "invalid value", "key2": "valid value", "time": ISODate("2012-01-15T21:09:53Z") }

この時key1の値がBSONに出来ないので,その壊れているフィールドと,Mixinで特別扱いされるtimeなどのフィールドを除くその他のフィールドを丸ごとバイナリに変換します(なぜ丸ごとするのかというと,RubyのBSONライブラリが失敗したフィールドを教えてくれないからです).その結果,以下のようなフォーマットになります.

1
{"__broken_data": BinData(0, Marshal.dump result of {"key1": "invalid value", "key2": "valid value"}), "time": ISODate("2012-01-15T21:09:53Z") }

バイナリになった壊れたデータは’__broken_data’という特別なフィールドにセットされ,再度insertが行われます.後でこの壊れたデータをチェックしたい場合には,以下のようなRubyスクリプトでチェックすることが出来ます

1
2
3
collection.find({'__broken_data' => {'$exists' => true}}).each do |doc|
  p Marshal.load(doc['__broken_data'].to_s) #=> {"key1": "invalid value", "key2": "valid value"}
end

ここまで書いてなんですが,そもそもログの種類によっては壊れているものは捨てても良い場合があると思います.その時は, ignore_invalid_record を使うことで捨てることが出来ます

1
2
3
4
<match mongo.**>
  #...
  ignore_invalid_record true
</match>

MongoDBそのものの制限

これはプラグインではどうしもようないので,使う前にチェックしてください.

まとめ

図を書く元気がなかったので文字が多くなりましたが,MongoDBをFluentdで使う時に利用出来るプラグインとその設定について書きました.MongoDBを使った事例に関してはスライドやブログが既にいくつかあるので,興味をある方は検索してみてください.

また,使っていてなんか不具合があった場合にはissueやpull requestをして頂ければと思います.

Comments