astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

MongoDB 4.0 探検日記

f:id:astamuse:20180918174726j:plain

MongoDBと過ごした8年を振り返りながら、お気に入りのMongoDBマグカップで濃いめのモカを啜っています。

たのしくテンポの良い開発、フラグメンテーションとの長い夜、主語の大きいロック、星の数ほどのMongoDBステッカー、高嶺の花のMongoDB公式Tシャツなどが走馬燈のように目の裏を駆け巡ります。

今年6月、ついに昨年10月のNasdaq上場後初のメジャーバージョンアップであるMongoDB 4.0がリリースされました。

この成長著しいMongoDBにキャッチアップすべく、私fdkはMongoDB 4.0の探検に出かけました。*1

MongoDBについて

MongoDBは2007年より10gen(現MongoDB Inc.)により開発され、2009年の初版リリース以来、継続的にリリースを重ねているドキュメント指向データベースです。階層構造を表現できるドキュメントと呼ばれるデータ構造を、BSONという表向きはJSONによく似たバイナリ形式で格納します。高速なオペレーション、スキーマレス、透過的圧縮のサポート、レプリカセットによる冗長性とスケーラビリティなどが強みとして挙げられます。

一方で、これまでトランザクション機能がない点がアキレスの踵と囁かれていましたが、MongoDB 4.0の登場により、そのような通説は風と共に消え去ろうとしています。

MongoDB 4.0の主な新機能

1. Multi-document ACID transactions

これまでMongoDBでは、ドキュメント単位のアトミックな更新は実現されており、実用上多くの利用ケースでは十分とされていました。MongoDB 4.0では、複数ドキュメントにまたがるトランザクションが実装され、課金、予約、口座間の資金移動などの、トランザクションが求められる領域にも対応できるようになりました。

MongoDB 4.0ではRDB同様にトランザクションの制御をデータベース側でサポートしてくれるので、アプリケーション側での実装コストが削減できます。

トランザクションは複数ステートメントで記述するようになっており、そのためのAPIが用意されています。*2

以下、MongoDB 4.0のトランザクションについてのメモです。

  • 単一もしくは複数のコレクションやデータベースに含まれるドキュメントに対して適用される
  • Snapshot isolationにより、各トランザクションはデータの一貫性ビューを提供し、データの整合性を維持するall-or-nothing実行を強制する*3
  • 実行中のトランザクション内では、自身の未コミットの書き込みは参照できるが、トランザクション外の操作からは一切見えない
  • 未コミットの書き込みはトランザクションがコミットされるまでセカンダリのノードにレプリケーションされず、トランザクションがコミットされると自動的に全てのセカンダリセカンダリに複製され適用される
  • 長いトランザクションや膨大な操作を伴う単一のトランザクションはストレージエンジンであるWiredTigerのキャッシュを圧迫する(スナップショット以降の全ての書き込みの状態を維持しなければならないため)
  • デフォルトでは60秒を超えるとトランザクションは自動的にabortされる
  • トランザクション内で読むことのできるドキュメント数の上限は無いが、1トランザクション内での更新は1000ドキュメント以内であることが推奨される
  • トランザクションは単一のoplogエントリとして記録されるため、16MBのサイズ制限を受け(更新の場合は差分のみ、新規の場合はドキュメント全体がカウント対象となる)、これを超えた場合トランザクションはabortされ、rollbackされる
  • トランザクションがabortされるとドライバに例外が返され完全にrollbackされるが、ネットワーク障害やプライマリ選出中などの一時的な障害を想定し、アプリケーション側で適宜リトライなどの実装をするべきである
  • コミット操作時にエラーが発生した場合、ドライバは1度だけリトライする
  • トランザクションを使用しない場合のオーバーヘッドは全く無い

トランザクション機能が追加されたことで、将来的にトランザクション制御が必要になるかもしれないというケースにもMongoDBは有効な選択肢となりそうです。

2. Aggregation Pipeline Type Conversions

MongoDBはスキーマフリーのため*4、データ構造について柔軟です。それゆえに、収集したデータをBIや機械学習で活用する際に、同一のエレメントで複数の異なる型のデータが存在しているケースがありえます。

そのようなケースで、集計パイプラインにおける型変換を、別途外部のETLプロセスを介することなくデータベース内で行う機能です。*5

具体的には$convertという演算子を使い、型の変換ルールを指定します。

{
   $convert:
      {
         input: <expression>,
         to: <type expression>,
         onError: <expression>,  // Optional
         onNull: <expression>    // Optional
      }
}

この機能を活用すると、データ分析において別途のETL処理を省くことができるため、複雑性やコストの低減が期待できます。

3. The aggregation pipeline builder

MongoDB CompassというGUIツールにより、MongoDB内のデータの構造の可視化、アドホックのクエリ実行、インデックスの作成、バリデーション規則の作成といったことができます。このツールで集計のパイプラインの構築を試行錯誤しながら行えるようになりました。

f:id:astamuse:20180918174824p:plain

画面は、サンプルデータ*6 で前述の型変換を試しつつパイプラインを構築してみた様子。作成したパイプラインを名前付けして保存できます。

コード吐き出し機能があり、作成したパイプラインのコードを出力できます。現時点でPython3, Node, Java, C#の4言語に対応しています。

直感的で使いやすく、データの観察、データ分析、アプリケーション開発の助けとなるツールではないかと思います。

4. MongoDB Charts (β版)

MongoDB Chartsは、MongoDB専用のBIツールです。Docker Swarm上のコンテナで動作します。 Redashなど他のツールと同様にデータソースに接続し、データの美しい可視化、ダッシュボードの作成、共有ができます。

f:id:astamuse:20180918174906p:plain

画面はローカルで起動したMongoDB Chartsで作成したダッシュボード。MongoDB Atlas上に作成したレプリカセットに接続し、テスト用に作成したサンプルデータでチャートを作成してみました。beta版のためチャート作成の細かい部分はまだこれからという印象です。

5. Non-Blocking Secondary Reads

MongoDB 4.0ではセカンダリからの読み出しとレプリケーションの書き込みが同時に実行できるようになりました。

MongoDBでは、プライマリで発生した書き込みはセカンダリに対してバッチ適用するようになっています。

よく言われるようにMongoDBは結果整合では無く、実際には一連の書き込みは各ノードで同じ順序で見えるように制御されます。

4.0より前のバージョンでは、順序を維持するため、セカンダリに対する読み出し要求はブロックされていました。つまり、oplogが適用されるのを待たされるケースが発生しており、書き込み負荷が大きいほど停止時間は長く、セカンダリからの読み出しレイテンシのメトリクスに悪影響を与えていました。

加えて、書き込み側は全ての読み出しの完了を待つロックを必要とし、セカンダリへの読み出し要求が多い時に、書き込みが遅延する原因となっていました。

MongoDB 4.0では、一貫性のあるスナップショットから読むことにより、セカンダリへの読み出し要求はoplogの適用中にブロックされず、またプライマリへのwrite concernがmajorityの書込みはロックの緩和により速く承認されるため、読み出の遅延とセカンダリの遅延を減らすと同時にレプリカセット全体のスループットが最大化されるとのことです。*7

6. SHA-2 Authentication

MongoDB 4.0では、認証メカニズムの強化として、SCRAM SHA-256が追加され、MONGODB-CRは廃止になりました。 MONGODB-CRスキーマでの認証を行なっている場合は、MongoDB 4.0へのアップグレードに先立ってSCRAM-SHA-1にアップグレードする必要があります。

企業のセキュリティポリシーにより、既に脆弱性の見つかっているSHA-1を使わないようにする必要がある場合、SCRAM SHA-1からSCRAM-SHA256へのアップデートが選択肢となります。 *8

7. The improved shard balancer

MongoDB 4.0では、シャード・バランサーの性能が、同時実行により最大40%向上し、より時機を得たスケールができるようになったとのことです。
*9

アップグレードについて

3.6系にアップグレードしてから4.0にアップグレードする必要があります。

これまで同様に、ローリングアップグレードができます。

SCRAM-SHA-256などいくつかの新機能を有効にするためにはfeatureCompatibilityVersionを4.0に変更する必要があります。 当然ですが、テスト環境での事前の検証が推奨されます。

まとめ

MongoDB 4.0の新機能を駆け足で見てきました。木を見て森を見るようにMongoDBの深淵を垣間見ることができました。

トランザクションの実装により守備範囲を広げ、データ分析や機械学習向けデータ処理パイプラインのサポート、BIツールの提供、そしてクラウド、モバイルを含むあらゆる場所での実行をサポートするというホリスティックでダイナミックな舵切りに期待が高まりました。

これからもMongoDBと歩み続けよう、と気持ちを新たにした一日でした。

平成最後の夏、Playとscalaと私の記録。

f:id:astamuse:20170815181256p:plain

おはようございます、chotaroです。
我が家のエアコンは、平成最後のお仕事を終えました。
平成最後の秋は音楽の秋にする予定です(๑•̀ㅂ•́)و✧

さてこの夏、2つほどの機能を持つ小規模なWEBアプリケーションを、Playで組んでリリースしました。 Playに関しての詳細は公式ドキュメントや、弊ブログ内の記事が詳しいので割愛します。

何かとはじめてのことばかりで、初々しく甘酸っぱい経験になりました。

本エントリはその甘酸っぱさを忘れないうちに形にしておくための試みです。

概要

つくるもの

メインの画面は2つ。

  • URLパラメータをもとに検索結果一覧を表示する画面
  • 検索結果の詳細情報を表示する画面 また、詳細情報の画面から、問い合わせを送る事ができる仕組みと、アクセスログを専用のテーブルに保存する仕組みを実装しています。

つくるひと

メイン担当者はこんな感じのスキルセット

  • java 2年半
  • swift3 1年
  • スクラッチでのwebアプリ開発経験なし
  • データベース設計の経験なし

こんなことやりました

DB設計

要件から、エンティティ図を出して、そこからEDMに落とし込んで、というプロセスで設計し、edmを作成。
が、一人ではそもそもエンティティ図の段階で限界があったので経験値豊富な先輩にフォローしてもらってなんとか形に。

Scalaの採用

Javaのリリースサイクルが大幅に変更されるにあたり、既存アプリケーションも大きな影響を受けざるを得ませんでした。
(開発リソース上の問題もあり、モジュールシステムの検証も十分にできてない状況ではありますし。)

せっかく小規模なアプリ開発案件なのだから、実験的にやってみてもいいのではないか、ということでscalaを採用してみました。

Play frameworkの採用

公式で提供されているプロジェクトテンプレートを用いれば動くものが手っ取り早く用意できるので、上述のscalaのトレーニング的な意味も込めて、採用しました。

感想

良かったこと

scala

細かいところを詰めるとわからないことだらけではあるが、javaよりも記法に柔軟性があり、どちらかといえばswiftライクで個人的にとても好印象。
(あと単純に新しい言語を学ぶのはすごく楽しい。)

Play framework

最高です。ドキドキしました。

  • htmlからscalaのfunctionを呼び出せる
  • 実装の柔軟性が高い
  • routesによるurl定義がかなり可読性高くわかりやすい
  • webpackのように常時実装とコンパイル、画面の表示結果を確認しながら開発が可能なので、上記の確認も高速でできる

今までmavenやらtomcatやらjspやらの世界にいたのでなんかこう、アツいものがこみ上げてきます。最高。
自分一人で組むならこれ以外の選択肢はない、気がします(ただし、実装者に関する課題点は多々あるのと、多人数で進めるときのテスト戦略は別途検討が必要な印象。)

しんどかったこと

DB設計

とりあえず議論しながら作ってみよう、で進めたがうまく議論が回らず時間ばかりかかり、失敗。
泣きそうになりながら「楽々ERDレッスン」を読んで考えたりしてみましたが、一向に具体的な正解が見えませんでした・・・

最終的に原理原則を知った上でたたき台を作って、それをベースに議論を突き合わせる、でなんとかEDMまでたどり着きました。
先輩には多謝です。ありがとうございました

保守性の問題

WEB自体はスムーズにできたものの、自分ひとりで組んでしまったので怪しい部分がいくつかあります。
箇条書きするとこんな感じです。

  • 実装の構造がやや煩雑
  • コードに他人の目が入ってないので、冗長なコードになっている可能性がある
  • 動きを確認しながら進めていたので、現状は問題ないが、単体テストの充実度低めで改修負荷がやや高い
  • (私がこっちにほぼかかりっきりになってしまったので、並行しているプロジェクトのコードも同様に属人化してしまっている)

また、playや基盤の問題として次のようなことがあります。

  • テンプレートhtmlへの変数の当て込みがControllerのfunctionを把握してないと書けないので、デザイナーさんやフロントエンドの人にとって難しい印象
  • ユーザも絞られているため、最低限の実装のみ具備。→セキュリティや次フェーズの改修、となったタイミングで検討が必要

もう少しうまく作ることでそれぞれ解消できるかもしれず、自分の経験値的な壁を感じました。

今後のこと

この夏の経験を踏まえた学びとしてはこんな感じです。

個人

とにかく、より多く0から作ってみる事が必要

  • セキュリティ観点
  • 例外のハンドリング
  • 実装の責務配置、手スタビリティを考慮した構造化

より美しく、かつ強固なアプリケーションを組めるように経験や知識を得る必要がありそうです。

チーム的な話

  • 技術的な負債の解消が前提になる
  • 改修決定時のタイミングなどでにペアプロでテストコードを充実させるなど、コモンセンスを広げる施策を打つ必要がある
  • いくつか輻輳している状況であっても、レビューとテストファーストの文化を大事にしていく必要があるかも(git-flowとか、サイクルに沿った運営を徹底していくべき)

課題点も含め、得られたものが多く、平成最後の夏は良い夏でした。
いざ実践。

それではこのあたりで失礼いたしますm(_ _)m

Spark で機械学習を社内データに適用してみた

山縣です。夏休みの宿題のようにブログの当番が回ってきました。

機械学習が非常に注目を浴びている今日このごろですが、私もデータ関連を扱うソフトエンジニアの端くれとして機械学習について学んだり、機械学習のアルゴリズムを時々試したりしています。

機械学習は面白いとは思うのですが、いざ実際に業務に適用しようとするとなかなか難しいなあと感じることもあります。ちょっと試してみると思ったような精度が出なかったり、機械学習でできないかというような要望と、機械学習できそうなこと(自分自身の知識的な問題も含む)に隔たりがある気がします。

今回は比較的扱いやすそうな課題があったので、ものは試しに機械学習でやってみました的なところを書いてみたいと思います。 また機械学習のプラットフォームとして Spark を使っているのでそのあたりについても書いてみました。

残念ながら機械学習や統計などについての十分な知識や経験があるわけではないので、おかしなことをやっている可能性もありますのでその点ご了承ください。

何をするか

弊社では特許を含めて様々なデータを収集し、Webサービスやコンサルティングに活用しています。その中の一つとしてクラウドファンディングサービスのデータがあります。海外も含めたクラウドファンディングのサービスのデータを集積したデータベースです。

以前、コンサルティングチームとのミーティングでクラウドファンディングのデータは彼らのコンサルティング業務に関連しないデータが多数あるのでそういったものを除去できないか、という話がでました。 その時は学習データを用意するのが難しいということで実際に進めるところまではいきませんでした。

ただ問題はデータを2つに分類する2値分類であることから機械学習を適用するには良い問題かなと思っていました。そこで今回は「クラウドファンディングのデータを自社業務に役立ちそうなデータとそうでないデータに分類する」という問題に取り組んでみました。

対象データの確認をする

対象となるデータベースには、500万件ほどのデータで、クラウドファンディングのサービス名、タイトルや説明文、開始日、目標金額、獲得金額など様々なデータ項目があります。またデータは多言語で格納されています。 サービス名、獲得金額などはデータを分類する上で重要な情報になりそうですが、今回はシンプルに説明文だけを特徴データとして使うことにしました。また対象となる言語は英語のみにして、原文が英語、または英語の翻訳があるデータについてのみを対象としました。なおこのようなデータは500万件のうち400万件程度でした。

分類の定義を決める

データを業務に関連するデータとそうでないデータに分けるのが今回の課題ですが、関連する、しないはどう定義すれば良いでしょうか?本来はきちんと検討して決める必要がありますが、今回は試験的なものでもあるので私自身がざっくりと決めました。

今回は「関連するデータ」を「何らかの製品やサービスなどの開発をしているもの」と定義してみました。ただし芸術系(音楽、映像、ゲーム)などは除きます。 ではそのようなデータはどれくらいあるのでしょうか? ランダムにデータを取得して確認する作業をしました。

その結果 200件中11件の「関連するデータ」を見つけることができました。全体の 5.5% くらいが「関連するデータ」と推測されます。2値分類をする上で分類が大きく偏っているのが気になりますがとりあえずこれで進めました。

学習データを集める

上記で分類を定義したので、それに基づき機械学習に使う学習データを収集する必要があります。一件一件データを見て分類していくのが確実ではありますが、それでは手間がかかりすぎます。そこで別の方法を考えました。

対象データは社内に提供するにあたって BigQuery にデータを入れ、 Re:dash 経由で見れるようにしてあります。

そこで BigQueryで試行錯誤しながらデータを集めることにしました。説明文データにそれっぽいキーワードで検索をかけたり逆にキーワードを除外する、また調達金額などの属性情報を利用して絞り込みました。 このようにしてRe:dash で SQL により集めたデータを CSV でダウンロードして間違っているデータの除外作業を行いました。この作業はなかなか大変でした。

これらの作業により最終的に「関連するデータ」4517件、「関連しないデータ」9257件、計13774件の学習データを作りました。

どのアルゴリズムを使うか

Spark の MLlib には分類問題を扱う様々なアルゴリズムが実装されていますが、今回はロジスティック回帰を使いました。深い理由は特にないのですが、最も基本的な学習アルゴリズムらしいことと、MLlib の 分類・回帰アルゴリズムのドキュメントでも一番始めに記述されているのでまずはこれで、という感じです。もし十分な精度が出ないなら他のも試そうということで気軽に決めました。

特徴データとしては説明文のテキストデータを使います。CoreNLP を使って形態素解析を行い、Word2Vec により特徴データを作ります。

MLlib で処理を実装する

MLlib で実際に学習するまでの流れは次のとおりです。

まずは学習データを Dataset(DataFrame) にロードします。学習データは関連するものと、そうでないものを同じフォーマットの別 CSV ファイルに保存しています。

val trainTrue = ss.read.format("org.apache.spark.csv").option("header", true)
  .option("inferSchema", true).csv( "関連するデータファイルへのパス").withColumn("label", lit(1))
val trainFalse = ss.read. ...  csv("関連しないデータファイルへのパス").withColumn("label", lit(0))
val train1 = trainTrue.union(trainFalse)

データをロードすると同時に label カラムを定義し、関連するデータには 1 を、そうでないものには 0 を設定し、union で一つの学習データ train1 を作ります。

次にロジスティック回帰に使う特徴ベクトルを作成します。 学習に使うデータは説明文のテキストデータなので、これをWord2Vec の入力データに出来るように単語のリストに変換する必要があります。説明文の形態素解析には CoreNLP を使いました。また記号などのストップワードを除去したり、数字を固定文字列に置き換えるなどの処理をしました。Word2vec への入力としては CoreNLP で取得した lemma を使います。

例えば "Mount Fuji (Fujisan), located on Honshu, is the highest mountain in Japan." という文章は [Mount, Fuji, Fujisan, located, on, Honshu, be, highest, mountain, in, Japan] という単語リストに変換されます。

scala> def nlp(t:DataFrame, ss:SparkSession):DataFrame = ...

scala> val train2 = nlp(train, ss) ...

scala> train2.select("lemma").show
+--------------------+
|               lemma|
+--------------------+
|[Whisper, Noise, ...|
|[special, dedicat...|
|[Legion, Meter, c...|
...

次に以下のように Word2Vec インスタンスを生成します。

  import org.apache.spark.ml.feature.Word2Vec
  val word2Vec = new Word2Vec()
    .setInputCol("lemma")
    .setOutputCol("features")
    .setVectorSize(100)

Scalaの 機械学習ライブラリはRDDベースの org.apache.spark.mllib と DataFrame ベースのorg.apache.spark.mlの2つのパッケージがありますが、すでに DataFrame ベースの API がメインとなっているのでこちらを使います。

単語のリストは DataFrame の lemma というカラムに保存されています。そして Word2Vec の結果は features というカラムに保存します。それ以外に Word2Vec のハイパーパラメータを適当に決めています。

  > val word2vecModel = word2Vec.fit(train2)
  > val train3 = word2vecModel.transform(train2)
  > train3.select($"features").show
+--------------------+
|            features|
+--------------------+
|[-0.0451879731548...|
|[-0.0263045642996...|

実際に学習をするのは fit() メソッドになります。学習することで Word2VecModel のインスタンスであるモデルデータを得ることが出来ます。そしてこのモデルデータに対して transform() メソッドを呼ぶことで特徴ベクトルを生成することが出来ます。

ちなみに Word2VecModel に対しては類似語を表示させたりすることが出来ます。

scala> word2vecModel.findSynonyms("accident", 5).show(truncate=false)
+-------+------------------+
|word   |similarity        |
+-------+------------------+
|injure |0.6633260250091553|
|acident|0.6160710453987122|
|wreck  |0.604138970375061 |
|icu    |0.604096531867981 |
|rte    |0.5960714817047119|
+-------+------------------+

特徴ベクトルが出来たのでロジスティック回帰を定義します。

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(1000)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

ロジスティック回帰が学習する特徴ベクトルのカラムは features になります。そして対応するラベルは labelになります。学習は繰り返し行われますが、その最大数を 1000 に指定します。regParam は過学習を避けるための正則項の重みになりデフォルトでは 0(正則項無し)です。elasticNetParam は正則項として第一正則項と第二正則項というものがあり、その割合を指定します。デフォルトは0で第二正則項のみを使う設定になっています。どう指定するのが良いのかわからないのでとりあえずドキュメントのサンプルの値をそのまま指定します。 学習は以下のように行われます。Word2Vec と同じく fit() を呼び出します。

val lrModel = lr.fit(train3)

なお上記までに各アルゴリズムを独立して実行する方法を見ましたが、 MLlib では 学習の流れを Pipelineとして定義して一括して実行することができます。

val pipeline = new Pipeline().setStages(
  Array(word2Vec, lr)
)

val pmodel = pipeline.fit(train2)

学習とその結果を見る

実際に上記のpipeline を使って実行してみます。 実行環境は以下のようになっています。

  • CDH 5.13
  • Spark 2.3.0 (YARN)
  • Executor数 6
  • Core数 25 (total)
  • Memory 76GB (total)

実行時間は5分以下でした。

pmodel は PipelineModel のインスタンスであり、 pipeline で定義したアルゴリズムのモデルを保存しています。 以下のようにすることで、Word2Vec, LogisticRegression の学習モデルを取得することが出来ます。

val w2vModel = pmodel.stages(0).asInstanceOf[Word2VecModel]
val lrModel = pmodel.stages(1).asInstanceOf[LogisticRegressionModel]

F値等の値は以下のようになりました。

scala> lrModel.summary.labels
res19: Array[Double] = Array(0.0, 1.0)

scala> lrModel.summary.fMeasureByLabel
res2: Array[Double] = Array(0.8035621198957428, 0.0013250883392226147)

scala> lrModel.summary.recallByLabel
res3: Array[Double] = Array(0.9991357891325483, 6.641576267434137E-4)


scala> lrModel.summary.precisionByLabel
res4: Array[Double] = Array(0.6720191818644191, 0.2727272727272727)

label 1.0 が、「関連するデータ」になりますが、F値がかなり低くこれでは使い物になりません。

交差検証でハイパーパラメータを決める

上記の学習では各アルゴリズムに使うハイパーパラメータを適当に固定したものを使っていました。 しかし実際にはどういう値が適切なのかはわかりません。そこで性能を改善するため交差検証によりパラメータの最適化を試みます。 Spark では CrossValidator という交差検証をするクラスが提供されています。 交差検証で調べたいパラメータのリストを渡すことでそれらのパラメータの組み合わせについて学習をして性能を評価し、最適なパラメータを決めてくれます。

val paramGrid = new ParamGridBuilder()
  .addGrid(w2v.vectorSize, Array(100, 300))
  .addGrid(w2v.minCount, Array(3, 5))
  .addGrid(lr.regParam, Array(0.01, 0.05, 0.1, 0.3))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setParallelism(8)

まず、 ParamGridBuilder を使って、交差検証で検証したいハイパーパラメータとその値を指定します。 上記では Word2Vec の vectorSize(次元数)、minCount(単語の最小頻度数)、LogisticRegression の regParam, elasticNetParam を指定しています。

そして CrossValidator クラスには、学習する pipeline と上記のパラメータの組み合わせ等を指定します。evaluator を設定する必要がありますが今回は2値分類なので BinaryClassificationEvaluator を設定します。numFolds はデータの分割数です。交差検証では、与えられた学習データを numFolds で指定した数に分割して、そのうちの一つを検証用に残りを学習用に使います。parallelism は並列処理数です。

numFoldsは今回5にしたので検証用と学習用のデータの組み合わせは5個あります。また paramGrid で指定したパラメータの組み合わせは 2x2x4x2=32 です。したがって全部で160回学習が行われます。

交差検証の実行は以下のように行います。

val cvModel = cv.fit(train2)

並行実行したとしても今回は 160回学習を行うため処理時間は長くなります。 今回は約90分ほどかかりました。160回の学習を順番にしていた場合、ひとつ4分ほどで終了したとしても640分かかっていたと予想されるので並列化の効果はかなりあると考えられます。

cvModel は CrossValidationModel のインスタンスです。以下のようにベストの学習モデルを取得することができます。

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestLrModel = bestModel.stages(1).asInstanceOf[LogisticRegressionModel]
val bestW2vModel = bestModel.stages(0).asInstanceOf[Word2VecModel]

ベストのモデルのF値等を見てみます。

printf("labels: %s\n", bestLrModel.summary.labels.mkString(", "))
printf("fMeasure: %s\n", bestLrModel.summary.fMeasureByLabel.mkString(", "))
printf("precision: %s\n", bestLrModel.summary.precisionByLabel.mkString(", "))
printf("recall: %s\n", bestLrModel.summary.recallByLabel.mkString(", "))


labels: 0.0, 1.0
fMeasure: 0.9499597747385358, 0.8952038638661126
precision: 0.9433319130805283, 0.9085727314181486
recall: 0.9566814302689857, 0.8822227141908346

ラベル1.0 「関連するデータ」のF値が大きく改善していることがわかります。 ベストモデルのハイパーパラメータは以下のように確認することが出来ます。

scala> bestW2vModel.getVectorSize
res12: Int = 300

scala> bestW2vModel.getMinCount
res13: Int = 3

scala> bestLrModel.getRegParam
res14: Double = 0.01

scala> bestLrModel.getElasticNetParam
res15: Double = 0.0

Word2Vec についてはベクトルの次元数が多いほうが良いことがわかりました。100次元だと十分に表現できていないのでしょうか?また minCount は小さい方が良いようです。これはあまり単語を除去しすぎると文章のコンテキストが失われるとかがあるのでしょうか? LogisticRegression の regParam は指定した中で一番小さい値が選択されました。 一方で elasticNetParam は 0.0 で第2正則項のみのほうが良い値が出たようです。

最後に再利用可能なようにモデルを保存します。

cvModel.write.overwrite.save("保存先のパス")

上記では上書きOKで保存しています。 もし ベストなものだけで良いのなら bestModel について同様に write() で保存することもできます。 ちなみに LogisticRegressionModel の summary は保存されないので F値等を残しておきたい場合は自前で記録しておく必要があるようです。

予測をしてみる

交差検証を行うことでそこそこの精度のモデルを作ることができました。 このモデルを使って実際に予測を行うには以下のようにします。

val target = ...  //予測したいDataFrame
val cvModel = CrossValidatorModel.load(cvModelPath)
val pipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val w2vModel = pipeline.stages(0).asInstanceOf[Word2VecModel]
val lrModel = pipeline.stages(1).asInstanceOf[LogisticRegressionModel]

上記のように CrossValidatorModel を読み込んでベストなモデルを取得するか、ベストなモデルを保存しておき、それを利用することで pipeline を作ります。

val predicted = pipeline.transform(target)

predicted.select("description", "prediction").show
+--------------------+----------+
|         description|prediction|
+--------------------+----------+
|Finnley Michael P...|       0.0|
|We are Russian mu...|       1.0|
|The above video i...|       1.0|
|Greetings! I'm Ka...|       0.0|
|Our mother, Dalla...|       0.0|
|Friend,brother,so...|       0.0|
|I am a full time ...|       0.0|
|Hello,my name is ...|       0.0|

そして pipeline.transform(入力DataFrame) を実行することで予測結果のカラム "prediction" が追加されたDataFrame predicted を取得することが出来ます。

処理時間ですが、400万弱のデータに対して上記の予測する部分については10分以下で実行できました。 ただし説明文を単語リストに変換する部分には7,8時間程度かかっていました。形態素解析の処理は重いですね。

予測結果を評価してみる

さて、予測データとして「関連するデータ」を何件抽出することができたでしょうか?

val predictedTrue = predicted.filter("prediction > 0.0").persist
predictedTrue.count

res13: Long = 170967

17万件ほどのデータを取ることが出来ました。 これらのうち真の「関連するデータ」はどれくらいあるのでしょうか?ランダムに20件を抽出して目で見て評価してみました。 すると 20件中15件が「関連するデータ」と判断できました。したがって適合率は75%程度と考えられます。

再現率、すなわち全体の「関連するデータ」のうちどれくらいをカバーしているでしょうか? 事前に行ったデータのランダム抽出とその評価で全データのうち5.5%くらいが「関連するデータ」ではないかと予測していたので、400万件だと22万件の「関連するデータ」があると推測されます。一方でモデルによって抽出された件数は17万件で適合率75%だと大体13万件弱の真の「関連するデータ」を抽出していると考えられます。したがって大体60%弱程度の再現率と判断しました。 期待していたより少し低いかなという印象です。

F値が高いにもかかわらず実データの結果があまり良くない原因としては過学習が考えられるかもしれません。また学習データの取得方法から考えて学習データが十分ではない、偏りがある、また学習データを特徴ベクトルに変換する部分で不十分なところもあるのかもしれません。

Spark での機械学習の実行について

今回、Spark を使って機械学習を試してみましたが、実行しているとジョブが落ちたり性能が極端に落ちたり安定して実行させるのが意外と手間でした。ここでは気がついたポイントなどを簡単に書いておきたいと思います。

なお以下は、弊社のクラスタの環境や使用した学習アルゴリズムに依存している可能性があります。

コードを修正しながらトライ&エラーで実行していて気がついたのですが、速度が大きく異なる場合がありました。何が違うのかJVMのオプションや Spark のパラメータを変えながら調べていたのですが、学習データのDataFrame のパーティション数が大きく関係していることがわかりました。 パーティション数が多くなるにつれ性能が大幅に落ちていきました。 実際にどの程度影響するか確認するため、学習データの数を減らすなどして比較的短い時間で終わるようにしたコードで処理時間を計測してみました。

パーティション数 10 100 500
1回目(秒) 582 521 1504
2回目(秒) 424 523 1170
3回目(秒) 426 510 2193

実行しているクラスタが他のジョブを実行している場合もあるため厳密なものではありませんが、傾向としてパーティション数が増えると処理時間が大きく増えていることがわかります。自分の場合、安定稼働を期待してパーティション数を大きくすることが多いのですが、少なくとも今回使ったアルゴリズムでは安定して動く範囲内でパーティション数を小さくしたほうが良いようです。

実行当初、長く実行している途中で Executor の OutOfMemory などで落ちることがよくありました。そこで こちら やその他のサイトを参考に Java のオプションを Executor, Driver 両側に GCまわりのオプションを追加するなどして調整をしてみました。 また Exectuor, Driver ともにメモリを多めに取るようにしています。 とくに弊社は Spark を YARN で実行していますがコンテナがメモリ不足で kill されることが頻発したため spark.executor.memoryOverhead を多く取るようにしています。今回 Word2Vec を特徴ベクトルに使用していますが、Spark の Word2Vec の実装はメモリをかなり必要とするようでその辺が原因なのかもしれません。

感想

機械学習ではよく言われるように学習データの収集がとても重要だなということを実感しました。実はこのブログの内容は部内勉強会の発表内容をベースにしているのですが、勉強会のときは Re:dashで集めた学習データの精査をしていなかったため、実際にモデルで予測してみると思っていた以上にノイズが除去できていない感じでした。したがって手間はかかっても学習データの品質向上は重要だと思います。

さらにデータの事前調査、予測結果の精査などデータを見ていくという作業がどうしても避けられないので泥臭い根気がいる作業だと思いました。

また学習結果の評価やさらなる改善には学習アルゴリズムや統計などの知識が必要でありその辺が自分は不十分で難しさを感じました。

Sparkで機械学習をする観点では、機械学習というと Python で scikit-learn などを使うのがメジャーなようで、情報が少ないようにも思えます。また学習アルゴリズムのカバー範囲ももう少し広がるとうれしいです。また特定の学習アルゴリズムやあるアルゴリズムの特定のメソッドが大量にメモリを必要としたりするようで分散システムを有効に使えてない感じもありその辺を改善してほしいと思いました。一方で交差検証などを並列化で高速化出来きたりするところは分散システムの大きなメリットな気がします。

参考文献

様々な書籍やサイトなどを参照させてもらっていますが、いくつか参考に載せておきます。

  1. 仕事ではじめる機械学習
  2. Machine Learning Library (MLlib) Guide
  3. Apache Sparkにおけるメモリ - アプリケーションを落とさないメモリ設計手法 -
  4. はじめての word2vec with Spark

Copyright © astamuse company, ltd. all rights reserved.