CoffeeScript(1.7+)で書いたテストをmochaで実行する

mocha

メモ。
検索しても古いやりかたしか出てこなかったので。

mochaはnode.js製のテストフレームワークです。
CoffeeScriptはJavaScriptのトランスパイラです。

CoffeeScriptで書いたテストをmochaで実行できるのですが ドキュメント によると、 CoffeeScript 1.7からそのコマンドが変わっています。

coffee-script is no longer supported out of the box. CS and similar transpilers may be used by mapping the file extensions (for use with –watch) and the module name. For example –compilers coffee:coffee-script with CoffeeScript 1.6- or –compilers coffee:coffee-script/register with CoffeeScript 1.7+.

というわけで、以下のように実行します:

$ mocha --compilers coffee:coffee-script/register

古いやり方で出るエラー

$ mocha --compilers coffee:coffee-script
/.../node_modules/coffee-script/lib/coffee-script/coffee-script.js:195
          throw new Error("Use CoffeeScript.register() or require the coffee-s
                ^
Error: Use CoffeeScript.register() or require the coffee-script/register module to require .coffee.md files.
  at Object._base.(anonymous function) [as .coffee] (/.../node_modules/coffee-script/lib/coffee-script/coffee-script.js:195:17)
  at Module.load (module.js:355:32)
  at Function.Module._load (module.js:310:12)
  at Module.require (module.js:365:17)
  at require (module.js:384:17)
  at /.../node_modules/mocha/lib/mocha.js:184:27
  at Array.forEach (native)
  at Mocha.loadFiles (/.../node_modules/mocha/lib/mocha.js:181:14)
  at Mocha.run (/.../node_modules/mocha/lib/mocha.js:393:31)
  at Object.<anonymous> (/.../node_modules/mocha/bin/_mocha:380:16)
  at Module._compile (module.js:460:26)
  at Object.Module._extensions..js (module.js:478:10)
  at Module.load (module.js:355:32)
  at Function.Module._load (module.js:310:12)
  at Function.Module.runMain (module.js:501:10)
  at startup (node.js:124:16)
  at node.js:842:3

koaをCoffeeScriptで書く

tl;dr

  • koaは、JavaScript(ES6)のgeneratorを活用したnode.js向けのwebフレームワークです
  • CoffeeScriptの開発版で、最近generatorの構文がサポートされました
  • 早速、試しにCoffeeScriptでkoaを使ってみたらちゃんと動きました

Generatorとは?

JavaScriptの機能の一つです。
JavaScriptでは非同期的な処理の結果はコールバック関数で受け取るのが通例です。
しかし、例えばネットワーク通信系の処理などでは、コールバックが多くなりすぎてコードがとても複雑になってしまう問題がありました。
Generatorは、このコールバック地獄を解決できると期待されている機能です。

以下に、node.jsによるコールバック関数を使った非同期処理を記します:

var fs = require('fs');

// カレント ディレクトリーのファイル一覧を取得する
fs.readdir('.', function(err, files) {
    // 先頭のファイルの中身を読み取る
    fs.readFile(files[0], 'utf-8', function(err, data) {
        // 読み取った結果を出力する
        console.log(data);
    });
});

みづらいですね。
しかしGeneratorを使うと:

var co = require('co');
var fs = require('fs');

co(function *() {
  var files = yield co.wrap(fs.readdir)('.');
  var data = yield co.wrap(fs.readFile)(files[0], 'utf-8');
  console.log(data);
});

シンプル!
構文的には、function*yieldキーワードが新しい要素です。

詳しくは下記の記事が分かりやすいので読んでみて下さい。

CoffeeScriptでgeneratorがサポートされた

ECMAScript 6でサポートされるgeneratorは、V8で実装されました。
V8を使っているnode.jsでも、unstableバージョンで使用出来ます。
先述の通り、このgeneratorは新しい構文が必要なため、CoffeeScriptでは使えませんでした。
しかし長い議論の末、開発版でついにgeneratorが対応になりました!finally!

上記Pull Requestによると、function内にyieldキーワードがあると、自動的にgenerator(function*)に変換されるようです。

さっそく使ってみる

node.jsはバージョン 0.11.9 以上を用意して下さい。

開発版のCoffeeScriptをインストールします:

$ npm install --save git://github.com/jashkenas/coffeescript.git

以下に簡単な例を示します(a2z.coffee):

a2z = ->
  c = 97

  while (c < = 'z'.charCodeAt(0))
    yield String.fromCharCode(c++)

g = a2z()
i = 0
console.log ++i, g.next().value
console.log ++i, g.next().value
console.log ++i, g.next().value

以下のコマンドで実行します:

$ coffee --nodejs --harmony a2z.coffee
1 'a'
2 'b'
3 'c'

使えました!

koaをCoffeeScriptで書く

koa は、node.js用の次世代Webフレームワークです。
generatorの仕組みを活用しています。
CoffeeScriptのgeneratorサポートによって、koaもCoffeeScriptで書けるようになりました。
さっそく書いてみます。

app.coffee:

koa = require 'koa'
app = module.exports = koa()

app.use (next)->
  this.body = yield (cb)->
    cb null, 'hello, world'

app.listen 3000 if !module.parent

以下のコマンドで実行します:

$ coffee --nodejs --harmony app.coffee

http://localhost:3000/ を開いてみましょう。
hello, worldと表示されたら成功です!

今後の進化が楽しみですね。

Cayleyでお手軽グラフデータ操作入門

Cayley logo

Cayleyとは

Cayleyは、最近リリースされたばっかりのグラフエンジンです。
グラフエンジンとは、グラフ構造を扱うために特化したNOSQLのDBMSです。グラフデータベースなどと呼ばれたりもします。
Cayleyの他には、neo4jが有名です。
Cayleyの作者のBarak Michener氏はGoogleナレッジチームのエンジニアです。
特徴は以下の通り:

  • Googleナレッジグラフにインスパイアされている
  • Go言語で実装
  • バックエンドストアはMongoDBとLevelDB、エフェメラルな用途でのインメモリに対応
  • Gremlinに似たオブジェクトを備えるJavaScript、MQLに対応

Google Open Source Blogによると、Cayleyが作られた経緯には、Freebaseの膨大なグラフデータを素早くフリーで簡単に使えるようにする事が挙げられています。

本エントリでは、付属のサンプルデータセットを使って基本的な使い方を把握したいと思います。

グラフ構造の基本

Cayleyを使う前に、それが扱うデータ構造についての理解が必要です。
Cayleyが扱うグラフ構造とは、以下のような要素間関係構造をもつデータのことです。

Screen Shot 2014-08-13 at 8.20.56 PM

データは、Vertex(頂点)とEdge(枝)の群で構成されます。
これで表される情報は、例えば友達関係、親子関係、所属、ネットワーク、部品構成などです。
巷で人間関係が「ソーシャルグラフ」と呼ばれるゆえんは、このデータ構造の名称にあります。

Screen Shot 2014-08-13 at 8.21.06 PM

上図は、Steveの親はJohnである事を示すグラフデータです。
このようなエッジに方向性を含むグラフ構造を有向グラフと呼びます。
Cayleyは有向グラフを扱うエンジンです。

インストールと開始

Release Binaryからプラットフォームに合ったバイナリをダウンロードして任意の場所に展開します。

$ wget https://github.com/google/cayley/releases/download/v0.3.1/cayley_0.3.1_linux_amd64.tar.gz
$ cd cayley_0.3.1_linux_amd64

次にサンプルデータのパスを指定して起動します。

$ ./cayley http --dbpath=30kmoviedata.nq.gz
Cayley now listening on 0.0.0.0:64210

起動すると、ポート64210でウェブサーバが立ち上がります。
ブラウザからアクセスすると以下のような画面が表示されます。

Screen Shot 2014-08-13 at 8.45.18 PM

そのほかの起動方法

起動オプションは他にもいろいろあります。詳しくはこちらを参照。

httpの代わりにreplを指定するとREPLプロンプトが立ち上がります。

$ ./cayley repl --dbpath=30kmoviedata.nq.gz
cayley>

基本的なクエリ

クエリはJavaScriptで書けます。
読み込んだサンプルデータはFreebaseの映画データです。
まずは女優の「Uma Thurman」のVertexを探してみます。

graph.Vertex("Uma Thurman").All()

結果:

{
 "result": [
  {
   "id": "Uma Thurman"
  }
 ]
}

このように、graphオブジェクトのメソッドを呼び出してグラフデータにアクセスします。

ここで、Uma Thurmanは名前でありエンティティではありません。
nameUma Thurmanを持つエンティティを取得してみます。

graph.Vertex("Uma Thurman").In("name").All()

結果:

{
 "result": [
  {
   "id": "/en/uma_thurman"
  }
 ]
}

また、graphにはgVertexにはVというエイリアスがあります。
以下の式は等価です:

graph.Vertex("Uma Thurman").In("name").All()
g.V("Uma Thurman").In("name").All()

では、Uma Thurmanのエンティティが取得できたので、更に主演した映画の一覧を取得してみます:

graph.V().Has("name", "Uma Thurman").In("/film/performance/actor").In("/film/film/starring").Out("name").All()

結果:

{
 "result": [
  {
   "id": "The Golden Bowl"
  },
  {
   "id": "Les Miserables - The Dream Cast in Concert"
  },
  {
   "id": "Vatel"
  },

...

  {
   "id": "Kill Bill"
  },

...

  {
   "id": "The Life Before Her Eyes"
  },
  {
   "id": "The Avengers"
  }
 ]
}

みなさんご存知「キル・ビル」に主演している事が分かりました!
では逆に、Kill Billの出演者一覧を取得してみます:

g.V().Has("name","Kill Bill").Out("/film/film/starring").Out("/film/performance/actor").Out("name").All()

結果:

{
 "result": [
  {
   "id": "Uma Thurman"
  },
  {
   "id": "David Carradine"
  },
  {
   "id": "Daryl Hannah"
  },

...

  {
   "id": "Sonny Chiba"
  },
  {
   "id": "Julie Dreyfus"
  }
 ]
}

使用したメソッドが In から Out に変化した事にお気づきでしょうか?
これは、起点とするVertexが逆になり、それに伴ってエッジの方向も逆になったためです。

クエリのビジュアライズ

ウェブUIのQuery Shapeを選択して、下記の主演映画一覧のクエリを実行してみます。

graph.V().Has("name", "Uma Thurman").In("/film/performance/actor").In("/film/film/starring").Out("name").All()

すると、下記のような図が表示されます。なんかぐねぐねしてます。

Screen Shot 2014-08-13 at 9.10.40 PM

モーフィズムとタグを使う

graph.Morphism(Alias: graph.M)は、パスオブジェクトを作ります。
以下のようによく使うパスを定義すれば、後から使い回せます。

var shorterPath = graph.Morphism().Out("foo").Out("bar")

以下のようにMorphismを呼び出します:

var p = graph.M().Has("name", "Uma Thurman").In("/film/performance/actor").In("/film/film/starring").Out("name");
graph.V().Follow(p).All();

タグを使うと、辿ったパスの任意の頂点を保存できます。
例えば、Uma Thurmanが主演した全ての映画で、共演した俳優・女優の一覧を取得するには:

costar = g.M().In("/film/performance/actor").In("/film/film/starring").Out("name")
graph.V().Has("name", "Uma Thurman").Follow(costar).As("Movie")
.FollowR(costar).Out("name").As("Costar").All()

結果:

{
 "result": [
  {
   "Costar": "Jeremy Northam",
   "Movie": "The Golden Bowl",
   "id": "Jeremy Northam"
  },
  {
   "Costar": "Peter Eyre",
   "Movie": "The Golden Bowl",
   "id": "Peter Eyre"
  },
  {
   "Costar": "Madeleine Potter",
   "Movie": "The Golden Bowl",
   "id": "Madeleine Potter"
  },
  {
   "Costar": "Nickolas Grace",
   "Movie": "The Golden Bowl",
   "id": "Nickolas Grace"
  },

...

  {
   "Costar": "Uma Thurman",
   "Movie": "The Avengers",
   "id": "Uma Thurman"
  },
  {
   "Costar": "Sean Connery",
   "Movie": "The Avengers",
   "id": "Sean Connery"
  },
  {
   "Costar": "Eddie Izzard",
   "Movie": "The Avengers",
   "id": "Eddie Izzard"
  },
  {
   "Costar": "Fiona Shaw",
   "Movie": "The Avengers",
   "id": "Fiona Shaw"
  },
  {
   "Costar": "Patrick Macnee",
   "Movie": "The Avengers",
   "id": "Patrick Macnee"
  },
  {
   "Costar": "Jim Broadbent",
   "Movie": "The Avengers",
   "id": "Jim Broadbent"
  }
 ]
}

クエリ結果のビジュアライズ

Uma Thurmanと過去に共演した事のある主演者同士の関係を取得します:

costar = g.M().In("/film/performance/actor").In("/film/film/starring")


function getCostars(x) {
  return g.V(x).As("source").In("name")
          .Follow(costar).FollowR(costar)
          .Out("name").As("target")
}


function getActorNeighborhood(primary_actor) {
  actors = getCostars(primary_actor).TagArray()
  seen = {}
  for (a in actors) {
    g.Emit(actors[a])
    seen[actors[a].target] = true
  }
  seen[primary_actor] = false
  actor_list = []
  for (actor in seen) {
    if (seen[actor]) {
      actor_list.push(actor)
    }
  }
  getCostars(actor_list).Intersect(g.V(actor_list)).ForEach(function(d) 
{
    if (d.source < d.target) {
      g.Emit(d)
    }
  })
}

getActorNeighborhood("Uma Thurman")

ウェブUIでVisualizeを選択して、上記クエリを実行すると、下図のような画像が得られます:

Screen Shot 2014-08-13 at 9.45.56 PM

おお、かっこいいですね!

参考

fluentd + MongoDB + Elasticsearch + Kibanaでログを可視化する

elasticsearch+kibana

SaaSは利用料が高いのでOSSを使う

サーバのログを可視化するSaaSは沢山あり、手軽にBusiness Intelligenceを施行できます。
DataDogとかKeen IOとかlibratoLogglyなどなど。
とても便利そうですね。でも価格が高い!
なんでもかんでもSaaSに頼ってたら毎月数十万とかになりそうです。貧乏にはつらいです。
だから、OSSで無料でやりましょう。
もちろんインフラ代は最低限かかります。
でも、クラウドリソースはいずれ水みたいになりますから、そこはしばらくの辛抱です。

要件

独自フォーマットのログを扱いたい

今回はApacheのアクセスログみたいな一般的なものではなく、独自のアプリケーションログを集積して可視化します。
ログデータのフォーマットは以下のようなイメージです:

{
    "logid": "Log ID",
    "level": "Log level (DBG/MSG/WRN/ERR)",
    "message": "Message",
    "module": "Name of the application module",
    "hostname": "Hostname sent the log",
    "time": "timestamp of the log",
    <Further data..>
}

このフォーマットは昔に設計したもので、解析するのを意識していなかった事情があります。

アプリケーション特化の情報も一緒に格納したい

上記フォーマットに書いてある<further data..>の部分は、アプリケーション特化の情報です。
エラーログならコールスタックとか、ユーザ登録ならユーザIDとかです。
そういう特別な情報は、可視化までしなくとも個別に見られるようにしておきたい。
また、今回の構成が将来的に変更になっても対応しやすいように、フォーマットをシステム特化で縛りたくない。

グラフ設定を簡単に柔軟に変えられるようにしたい

Graphiteとかすごく良いんですが、細かい調整がやりにくい。
タイムスケールをもうちょっと細かくみたいな、とか、ヒストグラムの分割単位を変えたいとか、そこはパイグラフで見たいとか、トレンドで見たいとか、このシリーズだけ抜き出したいとか。
あと、上記でも挙げた特化データを個別にすぐ表示できると便利です。
GrowthForecastもいいんだけど、イマドキRRDtoolかよ・・JSでグリグリ動くのがいい。

システム構成

上記の要件を満たす構成は、以下のようなイメージです。
タイトルの通り、fluentd + MongoDB + Elasticsearch + Kibanaとなりました。

log_visualization_system

以下、それぞれ概要と用途を説明します。

fluentd

ログ集積のためのツール。
PHP/node.js/Ruby/Python製などのさまざまなプログラムと連携させられます。
syslogやApache/nginxログにも対応。
それらのログを集めて、フィルタリングしたり整形した後、集積先へと転送してくれます。

MongoDB

NOSQLのDBMSです。
構造体のデータを格納できます。
スキーマレスなので、アプリケーション特化の情報をどんどん追加格納できます。
Capped collectionで高速にログを蓄積できます。

今回は、ログの一次格納先として使用します。
のちのち可視化部分のシステム構成が変更になった場合に、元のデータをオリジナルで保持しておけば対応がいくらか容易になるからです。

Elasticsearch

Luceneベースの全文検索サーバ。
MongoDBと同じく、スキーマレスで構造体が扱えます。
MongoDBからデータを流し込んで、可視化用にインデックスを作成してくれます。

Kibana

タイムスタンプがついたデータならなんでも可視化できるというツール。
Elasticsearchと密結合になっています。
UIがよくできていて、柔軟に設定を変えられるので、今回の用途にもってこいです。

Chefを使ったセットアップ手順

基本的にサーバはChefで管理したいので、Cookbookを拾ってきて使います。
以下を前提に進めます

  • MongoDBサーバはすでに立ててある
  • サーバはDebian/Ubuntu
  • インフラはEC2

手順内にsspeと出てきますが、プロジェクトのコードネームです。
ご自身のプロジェクト名と読み替えてください。

fluentdの設定

アプリケーションは、td-agent(fluentd agent)に対してhttp経由でログを投げます。
td-agentは、受け取ったログを指定のmongodbに転送します。
mongodbはreplicasetなので、typeをmongo_replsetとしています。

<source>
  type http
  port 8888
</source>

<match sspe.log.*>
  type mongo_replset
  database fluent
  collection log

  # Following attibutes are optional
  nodes mongodb-server-1:27017,mongodb-server-2:27017

  # Set 'capped' if you want to use capped collection
  capped
  capped_size 1000m

  # Other buffer configurations here
  flush_interval 10s
</match>

ElasticsearchとKibanaのインストール

1つのサーバにElasticsearchとKibanaの両方を入れます。
Elasticsearchには、以下の4つのプラグインもインストールします。

  • elasticsearch-mapper-attachments
    • elasticsearch-river-mongodbから依存
    • GridFSを取り扱うためのもの
  • elasticsearch-river-mongodb
    • MongoDBからElasticsearchにデータを流し込むためのもの
  • elasticsearch-head
    • 簡易管理UI
  • elasticsearch-HQ
    • headよりリッチな管理UI

まずはEC2ノードを立てる

Knifeコマンドでサクっと立てます。

  • OS: Ubuntu Server 14.04 LTS (HVM), SSD Volume Type
$ knife ec2 server create 
    -I ami-a1124fa0 
    -f t2.small 
    --node-name server-name 
    --security-group-ids sg-hogehoge 
    --subnet subnet-111111111 
    --associate-public-ip 
    --ebs-size 50 
    --ssh-key ssh-key-name 
    --identity-file ~/.ssh/id_rsa 
    --ssh-user ubuntu 
    --ssh-gateway ubuntu@hogehuga 
    --verbose

CookbookをChef serverにアップロードする

Berksfileに以下の行を追記します。

cookbook 'elasticsearch'
cookbook 'java'
cookbook 'kibana', git: 'git@github.com:lusis/chef-kibana.git'

なぜかKibanaだけ同名の想定とは別のものが入ってしまったので、repos uriを指定します。
インストールしてアップロードします。

$ berks install
$ berks upload

Chefノードを設定する

ノードの設定を編集します。

$ knife node edit server-name
{
  "name": "server-name",
  "chef_environment": "_default",
  "normal": {
    "tags": [

    ],
    "java": {
      "install_flavor": "openjdk",
      "jdk_version": "7"
    },
    "elasticsearch": {
      "version": "1.2.2",
      "cluster": {
        "name": "sspe"
      },
      "plugins": {
        "elasticsearch/elasticsearch-mapper-attachments": {
          "version": "2.3.0"
        },
        "com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb": {
          "version": "2.0.1"
        },
        "mobz/elasticsearch-head": {
        },
        "royrusso/elasticsearch-HQ": {
        },
        "elasticsearch/elasticsearch-lang-javascript": {
          "version": "2.1.0"
        }
      }
    },
    "kibana": {
      "config": {
        "elasticsearch": "window.location.protocol+"//"+window.location.hostname+":"+9200",
        "default_route": "/dashboard/file/guided.json"
      },
      "kibana": {
        "web_dir": "/opt/kibana/current"
      }
    },
  },
  "run_list": [
    "recipe[java]",
    "recipe[elasticsearch]",
    "recipe[elasticsearch::plugins]",
    "recipe[kibana::install]"
  ]
}

設定出来たら、chef-clientを実行します。

動作確認

以下のように結果が返ってきたら成功です。

$ curl "http://localhost:9200/_cluster/health?pretty"
{
  "cluster_name" : "sspe",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0
}

Elasticsearchの設定

インデックスの作成

Elasticsearchのインデックスとは、RDBMSでいうデータベースみたいなもんです。
下記コマンドで作成します。

curl -XPUT 'http://localhost:9200/sspe/'

マッピングの作成

マッピングとは、RDBMSにおけるスキーマ定義みたいなもんです。
明示しなくても自動で作成させる事もできますが、今回は勉強も兼ねて指定します。

$ curl -XPUT 'http://localhost:9200/sspe/log/_mapping' -d '
{
    "log" : {
        "properties" : {
            "logid" : {"type" : "string" },
            "level" : {"type" : "string" },
            "message" : {"type" : "string", "store" : true },
            "module" : {"type" : "string" },
            "hostname" : {"type" : "string" },
            "time" : {"type" : "date" },
            "userinfo": { "type": "object", "enabled": false, "include_in_all": true }
        }
    }
}
'

MongoDB riverの設定

riverとは、Elasticsearchにデータを流し込む経路設定のことです。
elasticsearch-river-mongodbをインストールしてあるので、これを使う設定をします。

$ curl -XPUT "localhost:9200/_river/mongodb/_meta" -d '
{
  "type": "mongodb",
  "mongodb": { 
    "servers":
    [
      { "host": "mongodb-server-1", "port": 27017 },
      { "host": "mongodb-server-2", "port": 27017 }
    ],
    "options": { 
      "secondary_read_preference" : true
    },
    "db": "fluent", 
    "collection": "log"
  }, 
  "index": { 
    "name": "sspe", 
    "type": "log"
  }
}'

実行すると、データの抽出が開始されるはずです。

データの流入確認

下記URLから、headプラグインの管理画面を開きます。

http://<elasticsearch-server>:9200/_plugin/head/

Screen Shot 2014-08-05 at 3.43.45 AM

ドキュメント数が増えていれば成功です。

トラブルシューティング

Cannot start river mongodb. Current status is INITIAL_IMPORT_FAILED

もし設定がうまく動かなくて、Elasticsearchを再起動してもログに上記が吐かれて止まってしまう場合。
Riverのステータスをリセットしてやれば再稼働します。

$ curl -XDELETE 'http://localhost:9200/_river/mongodb/_riverstatus'

実行後、Elasticsearchを再起動します。

Kibanaの設定

ElasticSearchとの連動確認

ESとKibanaをインストールしたサーバにブラウザからアクセスします。

http://<elasticsearch-server>/#/dashboard/file/guided.json

すると、下記のような画面が表示されるはずです。

Screen Shot 2014-08-05 at 3.40.39 AM

自分用のダッシュボードを設定する

以下にアクセスすると、空っぽのダッシュボードにアクセスできます。

http://<elasticsearch-server>/#/dashboard/file/blank.json

ADD A ROWというボタンをクリックして、グラフを表示する領域の行を追加します。

Screen Shot 2014-08-05 at 3.46.27 AM

Rowの名前を入力後、Create Rowします。

Screen Shot 2014-08-05 at 3.47.23 AM

次に、パネルを追加します。

Screen Shot 2014-08-05 at 3.48.53 AM

Screen_Shot_2014-08-05_at_3.51.10_AM

  1. パネルタイプをhistogramにします
  2. 幅を12に設定
  3. タイムフィールドにtimeを指定
  4. Auto-intervalのチェックを外す
  5. Saveボタンをクリック

すると・・・

Screen Shot 2014-08-05 at 3.51.19 AM

やった!出ましたね!
エラーログのみを表示したい場合は、QUERYの欄に level:ERR と打ちます。
クエリ構文はLuceneと基本同じものが使えるようです。以下が参考になります。

参考リンク

Node.jsにおける大量のデータ処理の際の非同期処理コールバックがスタックしすぎる問題への対処

ストリーミング処理の落とし穴

Mongoose(MongoDBのODM)を使って、コレクションからほんの10万ほどのドキュメントを処理するスクリプトを実行した時でした。処理が固まってしまうのです。なぜでしょう?

各ドキュメント処理では、コールバックを伴う非同期的な処理を含みます。
問題のCoffeeScriptのコードは以下のようなイメージです:

Foo.find()
    .sort value:-1
    .stream()
    .on 'data', (d)->
        someTask d

someTask = (d)->
    doAwesomeJob (result)->
        console.log "Great!"

モデルFooのドキュメントを全て読み込み、各ドキュメントをストリームに流し込んでいます。
メモリを節約するためにストリームの手法を採用しています。
doAwesomeJobは、例えばAPIを叩くとか、そういう非同期な処理です。
しかし、doAwesomeJobのコールバック関数がなかなか呼ばれない。遅い。ついには全く呼ばれなくなる。

イベントループのキューが溢れている

Node.jsアプリケーションはシングルスレッドです。
下図のように、イベントループ機構が備わっていて、Event Queueに溜まった処理キューを順次消化していく仕組みになっています。

event-loop

(via http://misclassblog.com/interactive-web-development/node-js/)

つまり、Node.jsはひとつずつしかキューを処理できないのです。
上記スクリプトで、ストリームのイベントのInvocationが、大量にこのイベントキューに押し寄せている事を想像してください。
doAwesomeJobのコールバックは、その非同期処理の完了時点で最後列にエンキューされます。
しかし、非同期処理の間にドキュメント読み出しイベントの処理キューが大量に溜まります。
その結果、doAwesomeJobのコールバックが呼ばれるまでに時間がかかってしまっているのでした。

似た問題に直面している人がいました。

RabbitMQのメッセージキューがいちどに大量に来た時に、イベントループのキューが溢れるようです。
問題は僕のケースより少し深刻なようです。見なかったことにしましょう

ストリームを分割しよう

溢れるのなら、分割すればいいですね。
以下のように変更を加えました:

processNext = (offset, num, on_completion)->
    i = 0, j = 0, closed = false
    Foo.find()
        .sort value:-1
        .skip offset
        .limit num
        .stream()
        .on 'data', (d)->
            ++j
            someTask d, ->
                ++i
                if i==j && closed
                    processNext offset+i, num, on_completion
        .on 'close', ->
            closed = true
            on_completion() if j==0

someTask = (d, callback)->
    doAwesomeJob (result)->
        console.log "Great!"
        callback()

processNext 0, 100, ->
    console.log "done!"

ドキュメントを100個ずつ読み出して、処理して、終わったら次の100個・・というフローに変更しました。
今のところ、これで上手く動いています。

でもこのコードは少し汚いので、よりエレガントに書くためにasyncを使うことをお薦めします。
drainで完了をフックするといいと思います。

このストリームの分割では、ストリームを処理している間にドキュメントが追加・削除される事を想定していません。
コレクションの変更まで考慮する場合は、想定される変更内容によって変わると思います。

ご参考まで!

[iOS] Swiftの隠し機能:数学記号をカスタムオペレータとして使う

swift-book

新言語Swiftでは、C++のようにカスタムのオペレータを定義できます。
このオペレータには、Unicodeの文字が使えるとのこと!
下記のツイートのような感じで使えるらしいです。

let r = ¬(a ∧ b)

詳しい例はこちら:

https://gist.github.com/mattt/f457625af116721ffb57.js