山縣です。
新年あけましておめでとうございます。
本年もよろしくお願いいたします。
去年弊社の福田が CDH のアップグレードをしてくれてSpark が1.6系になるとともに、並行してSparkの2.0が使えるようになりました。(Spark2.0の導入については福田の記事をどうぞ→ もう待てない、Spark2.0の導入と実践 - astamuse Lab)
現在弊社の環境でインストールされている Spark2.0 はまだベータということもあり、当初は少し触ってみる程度に留めようと考えていました。しかし1系のSparkが1.5→1.6に上がったことで問題に遭遇したこともあり、いくつかのバッチは2.0で動かしています。
今回は1.6や2.0を触っていて遭遇した問題や気になった点などについて書いていきたいと思います。
Spark 1.6 と Tungsten
社内のSparkが1.6系にバージョンアップされ、自分のジョブの一つが途中で abort してしまう現象が起きてしまいました。ログを見ると複数のDataFrameをJOINする処理で OutOfMemory が出ています。
原因が分からずいろいろ調査をしていたのですが、改めてログを見直すとTungsten が有効になっていることに気が付きました。
実は自分の実行している Job では今まで以下のパラメータ指定で Tungsten を無効にしているはずでした。
--conf spark.sql.tungsten.enabled=false
Tungsten は Spark 1.4 から導入されたSpark の性能を上げるための仕組みです。社内のSpark が 1.3系から1.5系に上がったときに、この Tungsten が原因と思われるエラーが起きたため、上記パラメータで Tungsten を無効にすることで回避していました。
もしやと思ってSpark 1.6 のリリースノートを見てみると以下のような記述がありました。
The flag (spark.sql.tungsten.enabled) that turns off Tungsten mode and code generation has been removed. Tungsten mode and code generation are always enabled (SPARK-11644).
Tungstenモードとコード生成を無効にするフラグ spark.sql.tungsten.enabled は削除されました。Tungstenモードとコード生成は常に有効となります。(SPARK-11644)
つまり Tungsten は1.6から常に有効で無効化することができなくなってしまったようです。
その後エラーを回避するためパラメータの調整などを試みましたがうまくいかず、最終手段として Executor のメモリ量を多くすることで何とか回避はできました。
とりあえず回避はできましたが、このやり方ではデータ量が増えたり、処理がより複雑になったりしたときに、問題が再発する恐れがあります。そこで試しにこのバッチを2.0へ移行してみました。
Spark 2.0 への書き換え
1系から2系へのメジャーバージョンアップということで当初は結構大変かと身構えましたが、やってみると意外と問題は少なく小規模な修正で済みました。具体的には以下のような修正をしました。
- Scala のバージョン変更…Scala が 2.10系から2.11系に変わりました。
- Spark ライブラリの変更…build.sbt で指定するSpark ライブラリのバージョンをSpark2.0のものに変更しました。
- SparkSession 対応…Sparkのプログラミングをする上でのエントリポイントとして新しくSparkSession が導入されました。spark-shellにおいてsqlContext:SqlContext が無くなったので spark:SparkSession からSqlContext を取るようにしました。
- spark-csv の置き換え … CSVファイルの入出力をするためのライブラリspark-csv の機能が2.0から標準で含まれるようになったので spark-csv を依存ライブラリから外しました。合わせて csv の入出力周りの処理を修正しました。
- registerTempTable … Dataframe を SparkSQLから使うときにDataFrameのregisterTempTable メソッドでテーブルとして登録します。2.0からはこのメソッドがDeprecatedになったので createOrReplaceTempView に変えました。 (注 2.0 からDataFrameクラスは無くなりDataset になったので正確には Dataset のメソッドになります)
つらつらと書いてみましたが、どれも大した変更ではなく、予想していたよりも楽に移行ができました。実際にビルドして実行してみると1.6で落ちていたジョブは無事に完了することができました。
ただジョブそのものはtaskのエラーもなく進んでいるのに、driverのコンソールにエラーログが大量に表示されたり、途中で止まった処理の情報がコンソールに残ってしまったり、ベータだからか、まだ挙動が少し安定していない雰囲気もありました。そういうこともあり2.0への移行は様子を見ながら必要に応じてと考えています。
と、これを書いている時点で、Cloudera社から Spark 2.0 Release1 が出ていることに気が付きました。いずれ社内にも導入されると期待しています。
Dataset について
Dataset はSpark 1.6から導入されたSparkの新しいデータ形式です。Dataset は従来のDataFrame を拡張し、型パラメータを持ちます。
scala> import org.apache.spark.sql.{Dataset,Row} scala> case class Msg(id:Int, msg:String) scala> val ds:Dataset[Msg] = Seq(Msg(1, "I have a pen."),Msg(2, "I have an apple."),Msg(3, "Do you have a pen?")).toDS() ds: org.apache.spark.sql.Dataset[Msg] = [id: int, msg: string]
型パラメータを持つことでタイプセーフなプログラミングが可能となりました。従来 DataFrame を RDDに変換したりmap()
やforeach()
などで処理する場合、DataFrame の各レコードは Row というクラスで表され、カラムからデータを取り出すときはRowクラスの getAs[T]()
や getString()
, getInt()
などのメソッドでデータを変換して取り出す必要がありました。
Dataset では型パラメータとしてcase class などを指定することが可能で、各行を取り出すとき case class のインスタンスとして取り出すことが可能です。
scala> ds.map(x => x.msg).collect //x の型は Msg res11: Array[String] = Array(I have a pen., I have an apple., Do you have a pen?)
DataFrame では、例えばテーブルAのDataframe (dfA)もテーブルBのDataframe(dfB) も同じDataFrame のインスタンスなので、dfAに対して処理する関数を間違えてdfBに適用したとしてもコンパイルエラーになりません。
しかしDataset を使えば型パラメータが違うのでコンパイル時にバグを発見することができます。下記のように Dataset[Msg] に対して処理する関数procDSMsg()
に Dataset[Person] を渡すと引数の型が違うのでエラーとなります。
scala> case class Person(id:Int, name:String) scala> val ds2 = Seq(Person(1, "Taro"), Person(2, "Jiro"), Person(3, "Subro")).toDS ds2: org.apache.spark.sql.Dataset[Person] = [id: int, name: string] scala> def procDSMsg(ds:Dataset[Msg]):Unit = println("Hello") scala> procDSMsg(ds) Hello scala> procDSMsg(ds2) // 引数の型が違うのでエラーとなる <console>:35: error: type mismatch; found : org.apache.spark.sql.Dataset[Person] required: org.apache.spark.sql.Dataset[Msg] procDSMsg(ds2)
なお 2.0 からは 従来までのDataFrame はクラスとしては無くなり、Dataset に統合されました。DataFrame は以下の通り定義されています。
type DataFrame = Dataset[Row]
つまりDataFrame は Rowを型パラメータとしてもつDatasetということになります。
Dataset用のCase classを半自動生成する
Dataset によりタイプセーフなプログラミングが可能になりましたが、一方でこの case class 誰が作るの?テーブルごとに作るのめんどくさいので自動生成したい、と思うのは自然なことかと思います。
ということでどうしようかなと思って考えたのですが、はじめに思い付いたのはDBライブラリのクラスの自動生成機能を使うことです。
普段利用させていただいている ScalikeJDBC にも sbt のプラグインとしてclassを生成する scalikejdbcGen があります。試しに一つのテーブル用のclassを生成し、必要なところだけを抜き出して試してみました。しかし結果はうまくいきませんでした。理由はscalaikejdbcGen ではDBのカラム名の記法が sneak(例 abc_def_ghi) の場合に、対応するメンバ変数の名前を camel (例 abcDefGhi) にしてくれるのですが、Dataset 側でそういう変換には対応してくれないからでした。また、よくよく考えるとこの方法だと JDBCに対応したRDBMS以外のデータソースに対応できないという問題がありました。
そこで次に思い付いたのが DataFrame/Datasetが保有しているスキーマ情報から生成することです。
DataFrame/Dataset は schema:StructType に、DataFrame/Dataset を構成するカラム情報を保有しています。 StructType は各カラムのカラム名やデータ型などの情報を表すStructField の配列(Array)をデータとして持っていますのでこのデータを使えば case class のメンバーを定義できるはずです。
scala> ds.schema.fields.foreach(println) StructField(id,IntegerType,false) StructField(msg,StringType,true)
そこで、case class を生成する Schema2CaseClass というクラスを作ってみました。
試すにはリンク先のソースコードをコピーして spark-shell の paste で実行します。
scala> :paste // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.types._ class Schema2CaseClass { ... ... // Exiting paste mode, now interpreting. import org.apache.spark.sql.types._ defined class Schema2CaseClass
使い方は以下のようになります。
scala> val df = Seq(Msg(1, "I have a pen."),Msg(2, "I have an apple."),Msg(3, "Do you have a pen?")).toDF() scala> val s2cc = new Schema2CaseClass scala> import s2cc.implicits._ scala> println(s2cc.schemaToCaseClass(df.schema, "Msg2")) case class Msg2 ( id:Int, msg:Option[String] )
上記のようにDataFrameのスキーマ情報から Msg2 という case class を生成したので実際に試してみます。
scala> :paste // Entering paste mode (ctrl-D to finish) case class Msg2 ( id:Int, msg:Option[String] ) // Exiting paste mode, now interpreting. defined class Msg2 scala> val ds = df.as[Msg2] ds: org.apache.spark.sql.Dataset[Msg2] = [id: int, msg: string] scala> ds.collect res14: Array[Msg2] = Array(Msg2(1,Some(I have a pen.)), Msg2(2,Some(I have an apple.)), Msg2(3,Some(Do you have a pen?)))
上記のように Msg2 を 型パラメータとして指定してDataset を作ることができました。
生成されたMsg2 は msg がOption[String]型になっています。これは元のDataFrameのスキーマ定義で nullable が設定されているからです。
Optionが適切に処理されるのか確認してみます。
scala> val df = Seq(Msg(1, "I have a pen."),Msg(2, "I have an apple."),Msg(3, null)).toDF() df: org.apache.spark.sql.DataFrame = [id: int, msg: string] scala> val ds = df.as[Msg2] ds: org.apache.spark.sql.Dataset[Msg2] = [id: int, msg: string] scala> ds.collect res16: Array[Msg2] = Array(Msg2(1,Some(I have a pen.)), Msg2(2,Some(I have an apple.)), Msg2(3,None)) scala> ds.filter(_.msg.isDefined).collect res17: Array[Msg2] = Array(Msg2(1,Some(I have a pen.)), Msg2(2,Some(I have an apple.)))
上記のように元データが nullの場合は None と変換され、filterなどで処理ができます。
DatasetとDataFrame の関係
Datasetが出たので今後は DataFrame から Dataset に移行が進んでDataFrameは使われなくなっていくのかなと思っていました。しかし実際に使ってみると DataFrame も今後も使われていくという印象を受けました。 Dataset は確かにタイプセーフで良いのですが、例えばJoin した結果などで、それ自体が処理を分割するための中間的なデータであったり、一時的にしか使われないようなデータに対して、わざわざ case class を定義する必要は無いのではと思います。 また SparkSession.sql は DataFrame を返します。 2.0からDataFrameをDataset[Row] としたことでDataFrame/Dataset間がシームレスに使えるようになったことも合わせると今後ともDataFrameとDataset をうまく使い分けていくのが良いのかなと考えています。
終わりに
2016年の Spark は2.0のメジャーバージョンアップも含めて非常に高速に進化していった印象を受けました。年末にはすでに2.1.0も出るなど相変わらずバージョンアップが激しいです。
今年もどのように進化していくのか楽しみなプロダクトです。