はじめまして山縣です。
私のチームでは弊社のサービスやコンサルティングなどで必要となる特許やその他さまざまなデータを収集・解析しています。特許情報などのデータはデータサイズとして大規模であったり複雑だったりすることもあり Hadoop なども含めたいろいろなミドルウェア・ツールを使っています。
そこで私の記事ではそういった業務に活用しているツールについて書いていきたいと思います。
今回は Spark についてです。(なお Spark が何かなどの解説はいたるところにあると思うので省略します。)
弊社でのSparkの導入は去年の春くらいからはじまりました。はじめは試験的に使っていましたが徐々に利用範囲を広げ、今ではデータの処理には欠かせないものとなっています。 弊社には Cloudera社のCDHを使った Hadoopクラスタがあり、Spark はそのクラスタのYARN上で稼働しています。YARN上で動かすことで Spark専用にリソースを割く必要がなかったことは Spark 導入に大きく寄与したと思います。
Sparkの活用が進む一方で、実際に利用していると運用が難しいと感じることもあります。例えばメモリ周りの調節をしないとExecutor が落ちてしまったり、SparkSQLで複雑なクエリを実行すると処理がこけたりと、いろいろな現象に出くわしました。この辺の問題はSpark自身が急速に進化していく中で改善されてきていると思います。しかしまだまだ何も考えずとも全部Sparkにお任せというところまではきていないかなという印象を受けます。
今回も開発していたSparkのジョブがメモリの問題で落ちるという現象に出くわしましたのでそのことについて書きたいと思います。
なお以下の内容については実際に行ったことをメモして書いていますが、試行錯誤しながら行ったこともあり不正確、曖昧な点もあるかもしれません。ご了承下さい。
問題となった Spark のジョブは HDFS上にある複数種のCSVを読み込み、集計やJoinなどを行い中間的なCSVを出力しながら最終的なCSVを生成するものです。弊社の小さいクラスタでは3時間ほどかかります。 このジョブを実行すると処理が2,30%進んだところでジョブが失敗するという現象に出くわしました。
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-37] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:3236)
コンソールのエラー出力を見ると、OutOfMemoryError が起きていましたが、Executor 側ではなく Driver 側が原因のようでした。実行環境の都合もあり Driver へのメモリ割り当ては2GB程度と小さくしていましたが、とりあえずの対策として spark-submit の引数で Driver のメモリを4GBまで増やしてみました。 この対応により今まで落ちていたあたりより先まで処理を進めることができました。 しかしそれでも処理が70-80%くらいまで進んだところで同じようにエラーとなり落ちてしまいます。 解決を急ぐならさらにメモリを足すか、Jobを分割すれば対応はできそうでした。しかしそもそも自分としては Driver 側でメモリを消費するような処理(たとえば巨大な DataFrame に対して collect()を行うなどをすると driver 側でメモリを消費します)を入れたつもりがありませんでした。 それにも関わらずこのようにメモリを喰うのはなぜなのか?さすがに気になったので調べてみることにしました。
当初はパラメータをいろいろいじったりしていたのですが、なかなか状況の改善につながりませんでした。そこで実行中の Driver のプロセスのメモリダンプを取ってメモリの利用状況を確認してみました。 メモリのダンプはジョブを実行中の Driver のプロセスのPIDを調べ jmap コマンドで取得しました。 取得したダンプファイルを Eclipse Memory Analyzer(MAT) に読み込ませたところ以下のような結果となりました。
MATではメモリリークの疑いのあるものを自動的に調べて教えてくれます。 これを見るとヒープメモリの7割以上を “org.apache.spark.ui.jobs.JobProgressListner"が占めていることが分かりました。 "spark.ui"というパッケージの下にあること、 JobProgress… という言葉がクラスに使われていることから SparkUI の Job の実行状況を管理するクラスであることが想像できます。 そこで実際にSparkのソースコードを確認してみしました。
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { ... val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() ... val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo]
このクラスは scala の mutable な collection である HashMap や ListBuffer のメンバを多数持っています。 したがってジョブが長時間走るとメンバー変数のデータ量が増え、それがメモリを圧迫していると想像されます。 もう少しコードを見ていたら以下の様な記述を見つけました。
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
UIまわりのパラメータですね。 ドキュメントを見ると以下のように、ジョブ数、ステージ数をいくつまで保存しておくかを指定するパラメータで、デフォルトで1000となっています。
Property Name | Default | Meaning |
---|---|---|
spark.ui.retainedJobs | 1000 | How many jobs the Spark UI and status APIs remember before garbage collecting. |
spark.ui.retainedStages | 1000 | How many stages the Spark UI and status APIs remember before garbage collecting. |
今回のジョブでは落ちるところまでで、ステージ数で2、300、ジョブ数はもっと少なかったと記憶しています。 つまり実行中の Job, Stage 情報が削除されずどんどんメモリに追加されていっていたと考えられます。 したがってこのパラメータを小さくすれば安定して動くのではないかと考えました。
spark-submit \ ... --conf spark.ui.retainedJobs=10 \ --conf spark.ui.retainedStages=20 \ ...
そこで上記のように極端に少ないジョブ数、ステージ数を保持するようにして実行してみました。 その結果、無事にジョブを最後まで実行することができました。 念のため、7,80%処理が進んだ状況でメモリダンプを取ってみました。
これを見ると先ほどと異なり JobProgressListner は出てきていません。期待通りにメモリ使用量を減らせたと考えられます。
今回は結果をわかりやすく確認するためパラメータをかなり小さくしましたが、実際には Driver 側のメモリ量に応じて調整するとよいのではないかと思います。
今回のトラブルは、はじめからこれらのパラメータを理解していればパラメータ調整だけで済む話でした。ですが手間をかけたおかげでメモリの使用状況を確認したりSparkのソースを見たりしたことで、多少とはいえ Sparkへの理解も深めることができたと思っています。
Sparkはここ当面は大規模データ処理関連ではホットなプロダクトとなると考えています。 今後も折を見て内部構造なども理解できるようにしていきたいと思います。