山縣です。
前回はSpark について書きましたが今回は最近良く使うようになっているEmbulk について書きたいと思います。
Embulkとは?
Embulk はオープンソースのバルクローダーです。あるデータストアから別のデータストアにデータを転送するためのツールです。たとえば CSV形式のデータをPostgreSQLやMySQLのようなRDBMS や Google のBigQueryなどにロードしたい、あるいはRDBMS上のデータを CSV 形式で gzipで圧縮して取得したいなどというような使い方ができます。
Embulk では様々なデータストアへの対応をプラグインという形でサポートします。Embulk のサイトから持ってきた下図のように、入力プラグインから取り込まれたデータはEmbulk のデータ型に変換され、出力プラグインに渡されます。このように入力と出力が独立したプラグインとして実装可能なため、利用者側は様々な入力出力を組み合わせて利用することが可能です。
例えば今まで CSVのテキストファイルを PostgreSQL に取り込むという処理を Embulk で行っていたとして、分析のために BigQuery にも入れたいとします。この場合、入力側で行っていた細かい設定はそのままで出力側の設定だけ BigQuery にするだけで対応することが可能になります。
Embulk では入力、出力プラグインだけでなく、フィルタープラグインなど様々な種類のプラグインが実装可能となっており、すでに多くのプラグインが公開されています。Embulk のプラグイン一覧はこちらから確認することができます。
私が Embulk に興味を持ったのはこの拡張性の高いアーキテクチャと、すでに多くのプラグインが提供されており、今後 Embulk がデファクトスタンダードとなっていくのではと感じたからでした。
Embulkの導入
私が感じているEmbulk の魅力のもう一つはその導入の容易さです。Java が動いている環境であれば、jarファイルを取得してくるだけで済みます。
サイトでは Unix環境(Mac含む),Windows環境でのインストール手順が書かれていますが、その手順はとても簡単です。
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
上記はUnix環境での手順になりますが、見れば分かるように embulk 本体の jar ファイルを取得して path を通しているだけです。この手順をコピペして実行するだけで簡単に embulk が使えるようになります。
$ embulk --help
Embulk v0.8.13
Usage: embulk [-vm-options] <command> [--options]
Commands:...
ちなみにインストール手順では embulk の jar ファイルに実行権限を設定して直接実行していますが、これはヘッダ部分がスクリプトとなっており、そのスクリプトの中で java -jar コマンドで自分自身を実行するという方法で実現されています。
$ cat .embulk/bin/embulk |less
: <<BAT
@echo off...
BAT
java_args=""
...
if test ...; then
java_args="-XX:+AggressiveOpts -XX:+UseConcMarkSweepGC $java_args"
Windows のBATと Unix系の shell script の両方が書かれており、どちらでもシングルバイナリで動くようにしているのが興味深いです。
Embulk を試す
少し話がずれてしまいましたが、Embulk はインストールが簡単なだけでなく、初めての利用者が手軽に試すことができるようにサンプルデータも含んでおり、 "embulk example" というコマンドを実行することでサンプルのデータと設定ファイルを生成できます。またチュートリアルとしてこのサンプルの使い方が出力されます。
$ embulk example ./try1
2016-09-12 15:53:13.122 +0900: Embulk v0.8.13
Creating ./try1 directory...
Creating ./try1/
Creating ./try1/csv/
Creating ./try1/csv/sample_01.csv.gz
Creating ./try1/seed.ymlRun following subcommands to try embulk:
1. embulk guess ./try1/seed.yml -o config.yml
2. embulk preview config.yml
3. embulk run config.yml
上記のコマンドで try1 というディレクトリ配下にテスト用のデータ csv/sample_01.csv.gz と設定ファイル seed.yml が作られています。
$ cat seed.yml
in:
type: file
path_prefix: "/home/xxxx/try1/csv/sample_"
out:
type: stdout
seed.yml は YAML の形式となっており、設定内容は大きく分けて入力プラグインの設定(in:)と出力プラグインの設定(out:) に分かれています。type フィールドでは入力、出力それぞれで使用するプラグインを指定しています。この場合、入力側の 'file' は Local file input plugin 、出力の 'stdout' はStdout out plugin を指定していることになります。いずれも本体にビルトインされているプラグインで、プラグインのインストールの必要がありません。ビルトインプラグインの設定はこちらに記載されています。Local file input plugin の情報では path_prefix で指定したプレフィック以下のファイルを読み込むようになっています。
Embulk を実行するには "embulk run" コマンドを使います。しかしこのseed.yml を使って実行してもエラーになります。
$ embulk run ./seed.yml
2016-09-13 17:53:30.887 +0900: Embulk v0.8.13
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Field 'parser' is required but not set...
これは Local file input plugin の設定で、必須パラメータ parser が指定されていないからです。今回の入力データは CSV形式なので parser には "csv" を指定しその設定を、またgzipで圧縮されているので decoders の指定が必要となります。幸いこれらの設定は "embulk guess" コマンドを使うことで自動生成することが可能です。
$ embulk guess seed.yml -o config.yml
...
in:
type: file
path_prefix: /home/youy/try1/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','....
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}out: {type: stdout}
上記のように必要な設定を入力データに基づき生成してくれます。各カラムのデータ型も指定されています。あくまでも推測なので、常に期待したフォーマットになるわけではないと思いますが、このファイルをベースに修正をかければ一から書くよりは手間が省けるのではないかと思います。
最後に生成した config.yml を実行すると以下のようになります。
$ embulk run ./config.yml
...
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,...
この設定では標準出力にデータを出力するので上記のようになります。なお "embulk preview" コマンドを使うと出力する前にデータの中身を確認することができます。
$ embulk preview ./config.yml
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | |
+---------+--------------+-------------------------+-------------------------+----------------------------+
プラグイン・更新
以上Embulkのドキュメントに従って Embulk のコマンドの使い方を簡単に見てみましたが、実際に使う上では自分が使うデータストアのプラグインをインストールして利用することになります。プラグインのインストールは簡単で、"embulk gem install <plugin name>" でインストールできます。
$ embulk gem install embulk-output-sqlite3
2016-09-14 11:40:54.196 +0900: Embulk v0.8.13
Fetching: jdbc-sqlite3-3.8.11.2.gem (100%)
Successfully installed jdbc-sqlite3-3.8.11.2
Fetching: embulk-output-sqlite3-0.0.1.gem (100%)
Successfully installed embulk-output-sqlite3-0.0.1
2 gems installed
上記は sqlite3 へ出力するembulk-output-sqlite3 プラグインのインストール例になります。インストール時に指定する名称は embulk-<プラグインの種別>-<プラグインの名前> という形をしています。プラグインの一覧から利用したいプラグインを調べて、上記の名前を指定してインストールすると良いと思います。
sqlite3 へロードするように config.yml を変更します。
out:
type: sqlite3
database: '/tmp/try1.db'
table: 'example1'
変更箇所は out: の部分だけです。それ以外のところを変更する必要はありません。/tmp/try1.db というデータベースの example1 というテーブルにロードします。
$ embulk run config.yml
2016-09-14 11:44:52.807 +0900: Embulk v0.8.13
2016-09-14 11:44:57.071 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-sqlite3 (0.0.1)...
2016-09-14 11:44:59.913 +0900 [INFO] (main): Committed....
データを確認してみます。
$ sqlite3 /tmp/try1.db
SQLite version 3.6.20sqlite> select * from example1;
id|account|time|purchase|comment
1|32864|2015-01-27 19:23:49 UTC|2015-01-27 00:00:00 UTC|embulk
2|14824|2015-01-27 19:01:23 UTC|2015-01-27 00:00:00 UTC|embulk jruby
3|27559|2015-01-28 02:20:02 UTC|2015-01-28 00:00:00 UTC|Embulk "csv" parser plugin
4|11270|2015-01-29 11:54:36 UTC|2015-01-29 00:00:00 UTC|
確かにデータがロードされていることがわかります。
embulk はバージョンアップもコマンドで簡単にできます。
$ embulk selfupdate
2016-09-14 11:57:07.549 +0900: Embulk v0.8.13
Checking the latest version...
Already up-to-date. 0.8.13 is the latest version.
特定のバージョンの Embulk をインストールすることも可能です。
$ embulk selfupdate 0.8.12
2016-09-14 12:14:31.023 +0900: Embulk v0.8.13
Checking version 0.8.12...
Found version 0.8.12.
Downloading https://dl.bintray.com/embulk/maven/embulk-0.8.12.jar ...
Updated to 0.8.12.
きちんと自分で検証したバージョンを使いたい場合に便利です。
以上、Embulk 本体の example を元に使い方を見てきましたが、非常に手軽に試せることが分かるかと思います。
アスタミューゼでの活用
Embulk は弊社においていくつかのデータ処理に活用されています。(現状使用しているの私だけかもしれませんが...)
私が Embulk を使いはじめたのは Spark を使ったデータ処理においてPostgreSQL上のデータを HDFS上にCSVなどにダンプしたいと思ったのがきっかけでした。Spark は様々なデータソースにアクセス出来るので、はじめは直接PostgreSQLへアクセスして利用していました。しかしDB自体の性能やネットワークの負荷などを考えるとHadoopクラスタ内にデータを置いておいたほうがレスポンスもよく、開発がしやすいと判断しました。この処理のため embulk-input-jdbc, embulk-output-hdfs などのプラグインを利用させてもらっています。
また別のところでは業務担当者から受け取った CSVファイル を加工してHDFS に上げるため、embulk-filter-column なども利用しています。それ以外では GCP(Google Cloud Platform)の課金データを BigQuery に入れて集計するのに使ってもいます。
データのローディングは個別にプログラムを書いていくこともできますが、そういったバッチプログラムは往々にしてメンテが滞りレガシーとなってしまうことがよくあります。そのため複雑なロジックを必要としないデータロードについては今後は embulk を活用し、変換処理を embulk の設定ファイルに落としこむようにしていきたいと思っています。
embulk を利用していて感じたことは、embulk はプラグインがすでに豊富に揃っており多くのデータストアに対応できている一方、実際に利用しようとすると機能面で欲しいと思うものがいろいろ出てくるということです。
そういうこともあり機能追加のためにいくつかのプラグインにパッチをコントリビュートさせてもらったりしています。
データのロードに関する要求は企業やプロジェクトによって様々で、そういったニーズを拾い上げていくには、より利用者と開発者が増えていくことが重要なのかと思っています。自分としても利用者、開発者として貢献していければと思っています。