astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

Spark で機械学習を社内データに適用してみた

山縣です。夏休みの宿題のようにブログの当番が回ってきました。

機械学習が非常に注目を浴びている今日このごろですが、私もデータ関連を扱うソフトエンジニアの端くれとして機械学習について学んだり、機械学習のアルゴリズムを時々試したりしています。

機械学習は面白いとは思うのですが、いざ実際に業務に適用しようとするとなかなか難しいなあと感じることもあります。ちょっと試してみると思ったような精度が出なかったり、機械学習でできないかというような要望と、機械学習できそうなこと(自分自身の知識的な問題も含む)に隔たりがある気がします。

今回は比較的扱いやすそうな課題があったので、ものは試しに機械学習でやってみました的なところを書いてみたいと思います。 また機械学習のプラットフォームとして Spark を使っているのでそのあたりについても書いてみました。

残念ながら機械学習や統計などについての十分な知識や経験があるわけではないので、おかしなことをやっている可能性もありますのでその点ご了承ください。

何をするか

弊社では特許を含めて様々なデータを収集し、Webサービスやコンサルティングに活用しています。その中の一つとしてクラウドファンディングサービスのデータがあります。海外も含めたクラウドファンディングのサービスのデータを集積したデータベースです。

以前、コンサルティングチームとのミーティングでクラウドファンディングのデータは彼らのコンサルティング業務に関連しないデータが多数あるのでそういったものを除去できないか、という話がでました。 その時は学習データを用意するのが難しいということで実際に進めるところまではいきませんでした。

ただ問題はデータを2つに分類する2値分類であることから機械学習を適用するには良い問題かなと思っていました。そこで今回は「クラウドファンディングのデータを自社業務に役立ちそうなデータとそうでないデータに分類する」という問題に取り組んでみました。

対象データの確認をする

対象となるデータベースには、500万件ほどのデータで、クラウドファンディングのサービス名、タイトルや説明文、開始日、目標金額、獲得金額など様々なデータ項目があります。またデータは多言語で格納されています。 サービス名、獲得金額などはデータを分類する上で重要な情報になりそうですが、今回はシンプルに説明文だけを特徴データとして使うことにしました。また対象となる言語は英語のみにして、原文が英語、または英語の翻訳があるデータについてのみを対象としました。なおこのようなデータは500万件のうち400万件程度でした。

分類の定義を決める

データを業務に関連するデータとそうでないデータに分けるのが今回の課題ですが、関連する、しないはどう定義すれば良いでしょうか?本来はきちんと検討して決める必要がありますが、今回は試験的なものでもあるので私自身がざっくりと決めました。

今回は「関連するデータ」を「何らかの製品やサービスなどの開発をしているもの」と定義してみました。ただし芸術系(音楽、映像、ゲーム)などは除きます。 ではそのようなデータはどれくらいあるのでしょうか? ランダムにデータを取得して確認する作業をしました。

その結果 200件中11件の「関連するデータ」を見つけることができました。全体の 5.5% くらいが「関連するデータ」と推測されます。2値分類をする上で分類が大きく偏っているのが気になりますがとりあえずこれで進めました。

学習データを集める

上記で分類を定義したので、それに基づき機械学習に使う学習データを収集する必要があります。一件一件データを見て分類していくのが確実ではありますが、それでは手間がかかりすぎます。そこで別の方法を考えました。

対象データは社内に提供するにあたって BigQuery にデータを入れ、 Re:dash 経由で見れるようにしてあります。

そこで BigQueryで試行錯誤しながらデータを集めることにしました。説明文データにそれっぽいキーワードで検索をかけたり逆にキーワードを除外する、また調達金額などの属性情報を利用して絞り込みました。 このようにしてRe:dash で SQL により集めたデータを CSV でダウンロードして間違っているデータの除外作業を行いました。この作業はなかなか大変でした。

これらの作業により最終的に「関連するデータ」4517件、「関連しないデータ」9257件、計13774件の学習データを作りました。

どのアルゴリズムを使うか

Spark の MLlib には分類問題を扱う様々なアルゴリズムが実装されていますが、今回はロジスティック回帰を使いました。深い理由は特にないのですが、最も基本的な学習アルゴリズムらしいことと、MLlib の 分類・回帰アルゴリズムのドキュメントでも一番始めに記述されているのでまずはこれで、という感じです。もし十分な精度が出ないなら他のも試そうということで気軽に決めました。

特徴データとしては説明文のテキストデータを使います。CoreNLP を使って形態素解析を行い、Word2Vec により特徴データを作ります。

MLlib で処理を実装する

MLlib で実際に学習するまでの流れは次のとおりです。

まずは学習データを Dataset(DataFrame) にロードします。学習データは関連するものと、そうでないものを同じフォーマットの別 CSV ファイルに保存しています。

val trainTrue = ss.read.format("org.apache.spark.csv").option("header", true)
  .option("inferSchema", true).csv( "関連するデータファイルへのパス").withColumn("label", lit(1))
val trainFalse = ss.read. ...  csv("関連しないデータファイルへのパス").withColumn("label", lit(0))
val train1 = trainTrue.union(trainFalse)

データをロードすると同時に label カラムを定義し、関連するデータには 1 を、そうでないものには 0 を設定し、union で一つの学習データ train1 を作ります。

次にロジスティック回帰に使う特徴ベクトルを作成します。 学習に使うデータは説明文のテキストデータなので、これをWord2Vec の入力データに出来るように単語のリストに変換する必要があります。説明文の形態素解析には CoreNLP を使いました。また記号などのストップワードを除去したり、数字を固定文字列に置き換えるなどの処理をしました。Word2vec への入力としては CoreNLP で取得した lemma を使います。

例えば "Mount Fuji (Fujisan), located on Honshu, is the highest mountain in Japan." という文章は [Mount, Fuji, Fujisan, located, on, Honshu, be, highest, mountain, in, Japan] という単語リストに変換されます。

scala> def nlp(t:DataFrame, ss:SparkSession):DataFrame = ...

scala> val train2 = nlp(train, ss) ...

scala> train2.select("lemma").show
+--------------------+
|               lemma|
+--------------------+
|[Whisper, Noise, ...|
|[special, dedicat...|
|[Legion, Meter, c...|
...

次に以下のように Word2Vec インスタンスを生成します。

  import org.apache.spark.ml.feature.Word2Vec
  val word2Vec = new Word2Vec()
    .setInputCol("lemma")
    .setOutputCol("features")
    .setVectorSize(100)

Scalaの 機械学習ライブラリはRDDベースの org.apache.spark.mllib と DataFrame ベースのorg.apache.spark.mlの2つのパッケージがありますが、すでに DataFrame ベースの API がメインとなっているのでこちらを使います。

単語のリストは DataFrame の lemma というカラムに保存されています。そして Word2Vec の結果は features というカラムに保存します。それ以外に Word2Vec のハイパーパラメータを適当に決めています。

  > val word2vecModel = word2Vec.fit(train2)
  > val train3 = word2vecModel.transform(train2)
  > train3.select($"features").show
+--------------------+
|            features|
+--------------------+
|[-0.0451879731548...|
|[-0.0263045642996...|

実際に学習をするのは fit() メソッドになります。学習することで Word2VecModel のインスタンスであるモデルデータを得ることが出来ます。そしてこのモデルデータに対して transform() メソッドを呼ぶことで特徴ベクトルを生成することが出来ます。

ちなみに Word2VecModel に対しては類似語を表示させたりすることが出来ます。

scala> word2vecModel.findSynonyms("accident", 5).show(truncate=false)
+-------+------------------+
|word   |similarity        |
+-------+------------------+
|injure |0.6633260250091553|
|acident|0.6160710453987122|
|wreck  |0.604138970375061 |
|icu    |0.604096531867981 |
|rte    |0.5960714817047119|
+-------+------------------+

特徴ベクトルが出来たのでロジスティック回帰を定義します。

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(1000)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

ロジスティック回帰が学習する特徴ベクトルのカラムは features になります。そして対応するラベルは labelになります。学習は繰り返し行われますが、その最大数を 1000 に指定します。regParam は過学習を避けるための正則項の重みになりデフォルトでは 0(正則項無し)です。elasticNetParam は正則項として第一正則項と第二正則項というものがあり、その割合を指定します。デフォルトは0で第二正則項のみを使う設定になっています。どう指定するのが良いのかわからないのでとりあえずドキュメントのサンプルの値をそのまま指定します。 学習は以下のように行われます。Word2Vec と同じく fit() を呼び出します。

val lrModel = lr.fit(train3)

なお上記までに各アルゴリズムを独立して実行する方法を見ましたが、 MLlib では 学習の流れを Pipelineとして定義して一括して実行することができます。

val pipeline = new Pipeline().setStages(
  Array(word2Vec, lr)
)

val pmodel = pipeline.fit(train2)

学習とその結果を見る

実際に上記のpipeline を使って実行してみます。 実行環境は以下のようになっています。

  • CDH 5.13
  • Spark 2.3.0 (YARN)
  • Executor数 6
  • Core数 25 (total)
  • Memory 76GB (total)

実行時間は5分以下でした。

pmodel は PipelineModel のインスタンスであり、 pipeline で定義したアルゴリズムのモデルを保存しています。 以下のようにすることで、Word2Vec, LogisticRegression の学習モデルを取得することが出来ます。

val w2vModel = pmodel.stages(0).asInstanceOf[Word2VecModel]
val lrModel = pmodel.stages(1).asInstanceOf[LogisticRegressionModel]

F値等の値は以下のようになりました。

scala> lrModel.summary.labels
res19: Array[Double] = Array(0.0, 1.0)

scala> lrModel.summary.fMeasureByLabel
res2: Array[Double] = Array(0.8035621198957428, 0.0013250883392226147)

scala> lrModel.summary.recallByLabel
res3: Array[Double] = Array(0.9991357891325483, 6.641576267434137E-4)


scala> lrModel.summary.precisionByLabel
res4: Array[Double] = Array(0.6720191818644191, 0.2727272727272727)

label 1.0 が、「関連するデータ」になりますが、F値がかなり低くこれでは使い物になりません。

交差検証でハイパーパラメータを決める

上記の学習では各アルゴリズムに使うハイパーパラメータを適当に固定したものを使っていました。 しかし実際にはどういう値が適切なのかはわかりません。そこで性能を改善するため交差検証によりパラメータの最適化を試みます。 Spark では CrossValidator という交差検証をするクラスが提供されています。 交差検証で調べたいパラメータのリストを渡すことでそれらのパラメータの組み合わせについて学習をして性能を評価し、最適なパラメータを決めてくれます。

val paramGrid = new ParamGridBuilder()
  .addGrid(w2v.vectorSize, Array(100, 300))
  .addGrid(w2v.minCount, Array(3, 5))
  .addGrid(lr.regParam, Array(0.01, 0.05, 0.1, 0.3))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setParallelism(8)

まず、 ParamGridBuilder を使って、交差検証で検証したいハイパーパラメータとその値を指定します。 上記では Word2Vec の vectorSize(次元数)、minCount(単語の最小頻度数)、LogisticRegression の regParam, elasticNetParam を指定しています。

そして CrossValidator クラスには、学習する pipeline と上記のパラメータの組み合わせ等を指定します。evaluator を設定する必要がありますが今回は2値分類なので BinaryClassificationEvaluator を設定します。numFolds はデータの分割数です。交差検証では、与えられた学習データを numFolds で指定した数に分割して、そのうちの一つを検証用に残りを学習用に使います。parallelism は並列処理数です。

numFoldsは今回5にしたので検証用と学習用のデータの組み合わせは5個あります。また paramGrid で指定したパラメータの組み合わせは 2x2x4x2=32 です。したがって全部で160回学習が行われます。

交差検証の実行は以下のように行います。

val cvModel = cv.fit(train2)

並行実行したとしても今回は 160回学習を行うため処理時間は長くなります。 今回は約90分ほどかかりました。160回の学習を順番にしていた場合、ひとつ4分ほどで終了したとしても640分かかっていたと予想されるので並列化の効果はかなりあると考えられます。

cvModel は CrossValidationModel のインスタンスです。以下のようにベストの学習モデルを取得することができます。

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestLrModel = bestModel.stages(1).asInstanceOf[LogisticRegressionModel]
val bestW2vModel = bestModel.stages(0).asInstanceOf[Word2VecModel]

ベストのモデルのF値等を見てみます。

printf("labels: %s\n", bestLrModel.summary.labels.mkString(", "))
printf("fMeasure: %s\n", bestLrModel.summary.fMeasureByLabel.mkString(", "))
printf("precision: %s\n", bestLrModel.summary.precisionByLabel.mkString(", "))
printf("recall: %s\n", bestLrModel.summary.recallByLabel.mkString(", "))


labels: 0.0, 1.0
fMeasure: 0.9499597747385358, 0.8952038638661126
precision: 0.9433319130805283, 0.9085727314181486
recall: 0.9566814302689857, 0.8822227141908346

ラベル1.0 「関連するデータ」のF値が大きく改善していることがわかります。 ベストモデルのハイパーパラメータは以下のように確認することが出来ます。

scala> bestW2vModel.getVectorSize
res12: Int = 300

scala> bestW2vModel.getMinCount
res13: Int = 3

scala> bestLrModel.getRegParam
res14: Double = 0.01

scala> bestLrModel.getElasticNetParam
res15: Double = 0.0

Word2Vec についてはベクトルの次元数が多いほうが良いことがわかりました。100次元だと十分に表現できていないのでしょうか?また minCount は小さい方が良いようです。これはあまり単語を除去しすぎると文章のコンテキストが失われるとかがあるのでしょうか? LogisticRegression の regParam は指定した中で一番小さい値が選択されました。 一方で elasticNetParam は 0.0 で第2正則項のみのほうが良い値が出たようです。

最後に再利用可能なようにモデルを保存します。

cvModel.write.overwrite.save("保存先のパス")

上記では上書きOKで保存しています。 もし ベストなものだけで良いのなら bestModel について同様に write() で保存することもできます。 ちなみに LogisticRegressionModel の summary は保存されないので F値等を残しておきたい場合は自前で記録しておく必要があるようです。

予測をしてみる

交差検証を行うことでそこそこの精度のモデルを作ることができました。 このモデルを使って実際に予測を行うには以下のようにします。

val target = ...  //予測したいDataFrame
val cvModel = CrossValidatorModel.load(cvModelPath)
val pipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val w2vModel = pipeline.stages(0).asInstanceOf[Word2VecModel]
val lrModel = pipeline.stages(1).asInstanceOf[LogisticRegressionModel]

上記のように CrossValidatorModel を読み込んでベストなモデルを取得するか、ベストなモデルを保存しておき、それを利用することで pipeline を作ります。

val predicted = pipeline.transform(target)

predicted.select("description", "prediction").show
+--------------------+----------+
|         description|prediction|
+--------------------+----------+
|Finnley Michael P...|       0.0|
|We are Russian mu...|       1.0|
|The above video i...|       1.0|
|Greetings! I'm Ka...|       0.0|
|Our mother, Dalla...|       0.0|
|Friend,brother,so...|       0.0|
|I am a full time ...|       0.0|
|Hello,my name is ...|       0.0|

そして pipeline.transform(入力DataFrame) を実行することで予測結果のカラム "prediction" が追加されたDataFrame predicted を取得することが出来ます。

処理時間ですが、400万弱のデータに対して上記の予測する部分については10分以下で実行できました。 ただし説明文を単語リストに変換する部分には7,8時間程度かかっていました。形態素解析の処理は重いですね。

予測結果を評価してみる

さて、予測データとして「関連するデータ」を何件抽出することができたでしょうか?

val predictedTrue = predicted.filter("prediction > 0.0").persist
predictedTrue.count

res13: Long = 170967

17万件ほどのデータを取ることが出来ました。 これらのうち真の「関連するデータ」はどれくらいあるのでしょうか?ランダムに20件を抽出して目で見て評価してみました。 すると 20件中15件が「関連するデータ」と判断できました。したがって適合率は75%程度と考えられます。

再現率、すなわち全体の「関連するデータ」のうちどれくらいをカバーしているでしょうか? 事前に行ったデータのランダム抽出とその評価で全データのうち5.5%くらいが「関連するデータ」ではないかと予測していたので、400万件だと22万件の「関連するデータ」があると推測されます。一方でモデルによって抽出された件数は17万件で適合率75%だと大体13万件弱の真の「関連するデータ」を抽出していると考えられます。したがって大体60%弱程度の再現率と判断しました。 期待していたより少し低いかなという印象です。

F値が高いにもかかわらず実データの結果があまり良くない原因としては過学習が考えられるかもしれません。また学習データの取得方法から考えて学習データが十分ではない、偏りがある、また学習データを特徴ベクトルに変換する部分で不十分なところもあるのかもしれません。

Spark での機械学習の実行について

今回、Spark を使って機械学習を試してみましたが、実行しているとジョブが落ちたり性能が極端に落ちたり安定して実行させるのが意外と手間でした。ここでは気がついたポイントなどを簡単に書いておきたいと思います。

なお以下は、弊社のクラスタの環境や使用した学習アルゴリズムに依存している可能性があります。

コードを修正しながらトライ&エラーで実行していて気がついたのですが、速度が大きく異なる場合がありました。何が違うのかJVMのオプションや Spark のパラメータを変えながら調べていたのですが、学習データのDataFrame のパーティション数が大きく関係していることがわかりました。 パーティション数が多くなるにつれ性能が大幅に落ちていきました。 実際にどの程度影響するか確認するため、学習データの数を減らすなどして比較的短い時間で終わるようにしたコードで処理時間を計測してみました。

パーティション数 10 100 500
1回目(秒) 582 521 1504
2回目(秒) 424 523 1170
3回目(秒) 426 510 2193

実行しているクラスタが他のジョブを実行している場合もあるため厳密なものではありませんが、傾向としてパーティション数が増えると処理時間が大きく増えていることがわかります。自分の場合、安定稼働を期待してパーティション数を大きくすることが多いのですが、少なくとも今回使ったアルゴリズムでは安定して動く範囲内でパーティション数を小さくしたほうが良いようです。

実行当初、長く実行している途中で Executor の OutOfMemory などで落ちることがよくありました。そこで こちら やその他のサイトを参考に Java のオプションを Executor, Driver 両側に GCまわりのオプションを追加するなどして調整をしてみました。 また Exectuor, Driver ともにメモリを多めに取るようにしています。 とくに弊社は Spark を YARN で実行していますがコンテナがメモリ不足で kill されることが頻発したため spark.executor.memoryOverhead を多く取るようにしています。今回 Word2Vec を特徴ベクトルに使用していますが、Spark の Word2Vec の実装はメモリをかなり必要とするようでその辺が原因なのかもしれません。

感想

機械学習ではよく言われるように学習データの収集がとても重要だなということを実感しました。実はこのブログの内容は部内勉強会の発表内容をベースにしているのですが、勉強会のときは Re:dashで集めた学習データの精査をしていなかったため、実際にモデルで予測してみると思っていた以上にノイズが除去できていない感じでした。したがって手間はかかっても学習データの品質向上は重要だと思います。

さらにデータの事前調査、予測結果の精査などデータを見ていくという作業がどうしても避けられないので泥臭い根気がいる作業だと思いました。

また学習結果の評価やさらなる改善には学習アルゴリズムや統計などの知識が必要でありその辺が自分は不十分で難しさを感じました。

Sparkで機械学習をする観点では、機械学習というと Python で scikit-learn などを使うのがメジャーなようで、情報が少ないようにも思えます。また学習アルゴリズムのカバー範囲ももう少し広がるとうれしいです。また特定の学習アルゴリズムやあるアルゴリズムの特定のメソッドが大量にメモリを必要としたりするようで分散システムを有効に使えてない感じもありその辺を改善してほしいと思いました。一方で交差検証などを並列化で高速化出来きたりするところは分散システムの大きなメリットな気がします。

参考文献

様々な書籍やサイトなどを参照させてもらっていますが、いくつか参考に載せておきます。

  1. 仕事ではじめる機械学習
  2. Machine Learning Library (MLlib) Guide
  3. Apache Sparkにおけるメモリ - アプリケーションを落とさないメモリ設計手法 -
  4. はじめての word2vec with Spark

Copyright © astamuse company, ltd. all rights reserved.