Go ahead!

Memoization for Everything

Released fluent-plugin-mongo 0.6.0

| Comments

Gem page is here.

This version requires mongo gem version 1.5.2 for Replica Set.

New features

Replica Set support

You can use mongo_replset for connecting to Replica Set cluster.

Example configuration is below:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<match mongo.**>
  type mongo_replset
  database fluent
  collection logs

  # each node separated by ','
  nodes localhost:27017,localhost:27018,localhost:27019

  # num_retries is threshold at failover, default is 60.
  # If retry count reached this threshold, mongo plugin raises an exception.
  num_retries 30

  # following optional parameters passed to ReplSetConnection of mongo-ruby-driver.
  # See mongo-ruby-driver docs for more detail.
  #name replset_name
  #read secondary
  #refresh_mode sync
  #refresh_interval 60
</match>

Handling invalid records

Fluentd is an event collector, so Mongo plugin should handle an invalid record as a BSON.

Mongo plugin approach marshals an invalid record when mongo-ruby-driver detects such record. And Mongo plugin inserts marshaled record as a broken data to same collection.

If passed following invalid record:

1
{"key1": "invalid value", "key2": "valid value", "time": ISODate("2012-01-15T21:09:53Z")}

then Mongo plugin converts this record to following format:

1
{"__broken_data": Marshal.dump result of {"key1": "invalid value", "key2": "valid value"}, "time": ISODate("2012-01-15T21:09:53Z")}

In the result, we can rescue and analyze a broken data later.

NOTE

Mongo-ruby-driver cannot detect an invalid attribute, so Mongo plugin marshals all attributes excluding Fluentd keys(“tag_key” and “time_key”).

Ignore an invalid record

If you want to ignore an invalid record, set ignore_invalid_document parameter in match.

1
2
3
4
5
6
7
8
<match forward.*>
  ...

  # ignore invalid documents at write operation
  ignore_invalid_document true

  ...
</match>

Tag mapped mode in mongo type

0.6.0 merges mongo_tag_collection type into mongo type. You can use tag_mapped parameter in mongo type for enabling tag mapped mode.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<match forward.*>
  type mongo
  database fluent

  # You use 'tag_mapped', then tag mapped mode enabled.
  tag_mapped

  # If tag is "forward.foo.bar", then prefix "forward." is removed.
  # Collection name to insert is "foo.bar".
  remove_tag_prefix forward.

  # This configuration is used if tag not found. Default is 'untagged'.
  collection misc

  # Other configurations here
</match>

TODO

  • Support multi-process processing using DetachMultiProcessMixin
  • Support authentication if needed

Released MessagePack RPC Python 0.2.0

| Comments

Source package is here.

The main feature is MessagePack RPC Server support.
Example is below:

1
2
3
4
5
6
7
8
9
import msgpackrpc

class SumServer:
  def sum(x, y):
    return x + y

server = msgpackrpc.Server(SumServer())
server.listen(msgpackrpc.Address("localhost", 10000))
server.start()

You can use Client to access this server:

1
2
client = msgpackrpc.Client(msgpackrpc.Address("localhost", 10000))
client.call("sum", 1, 2) # => 3

Other improvements

  • Support Python 3 (tested with Python 3.2.2)
  • Add notify request to Client
  • Add (un)pack_encoding option for MessagePack (de)serializer

TODO

Current implementaion lacks some features compared to Ruby implementaion. I will add new features, e.g. advanced return, async return, etc. In addition, I will refactor the internal architecture.