こんにちは、朴と申します。
本日はSpark-testing-baseを使ってSpark処理の単体テストの書き方について触れてみたいと思います。
ローカルで並列処理の単体テスト動かすのは少しハードル高く感じるかもしれませんが、
ちょっとした設定でスムーズに動かせたので、設定から簡単なテストまで書いてみたいと思います。
Spark-testing-baseの概要
ローカルでSparkのプログラムの単体テストを書く為のフレームワークです
複雑な設定をせず簡単にSparkプログラムの初期化、起動ができます。
簡単な紹介など、以下のリンクから確認できます。
Spark-testing-base
設定
とりあえず、公式サイトにある通り設定を行います。
- sbtプロジェクトのbuild.sbtに以下のdependencyを追加します。
※2.2.0の部分は実際使うSparkのバージョンに合わせてください。
"com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.8.0" % "test"
- OutOfMemoryError 防止の為以下の設定も追加して置きます、メモリは適宜調整してください。
fork in Test := true javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
- テストでのparallel 実行を無効にする
parallelExecution in Test := false
以上で設定完了です。
書いてみる1
続いて公式サイトのwikiにあるサンプルを一個書いて、動作確認をします。
class test extends FunSuite with DatasetSuiteBase { test("simple test") { val sqlCtx = sqlContext import sqlCtx.implicits._ val input1 = sc.parallelize(List(1, 2, 3)).toDS assertDatasetEquals(input1, input1) // equal val input2 = sc.parallelize(List(4, 5, 6)).toDS intercept[org.scalatest.exceptions.TestFailedException] { assertDatasetEquals(input1, input2) // not equal } } }
IntelliJ IDEA起動して動かしますと以下のエラーが出てました。
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf$ConfVars ... ... Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf$ConfVars
エラー内容でググってみたらどうやらdependencyが足りないようなので、以下を追加します。
sparkVersionはご自身の環境に合わせてください。
"org.apache.spark" %% "spark-hive" % sparkVersion % "test"
今度無事起動するかと思ったら違うエラーが出てました。 どうやらHiveSession起動時にローカルだとうまく行かないみたいですね。
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder': ... ... Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: (null) entry in command string: null ls -F C:\tmp\hive
Spark Hiveをoffにすることでエラーは回避できるみたいなので、
上記のテストケースの一番最初に以下を追加します。
class test extends FunSuite with DatasetSuiteBase { override implicit def enableHiveSupport: Boolean = false // 追加 test("simple test") { val sqlCtx = sqlContext import sqlCtx.implicits._ val input1 = sc.parallelize(List(1, 2, 3)).toDS assertDatasetEquals(input1, input1) // equal val input2 = sc.parallelize(List(4, 5, 6)).toDS intercept[org.scalatest.exceptions.TestFailedException] { assertDatasetEquals(input1, input2) // not equal } } }
これで無事起動できました。
主なクラスの紹介
SharedSparkContext
SparkContextをローカルで使える形で提供RDDComparisons
RDD比較用クラスDataFrameSuiteBase
DataFrameのテスト用DatasetSuitBase
Datasetのテスト用StreamingSuiteBase
streamのテスト用
各クラス使い方は公式サイトのwikiに乗ってる通り直感的で分かりやすいので、ここでは割愛します。
自分のテストを書いてみる
いよいよご自身のテストを書きます。
以下のようなクラウドファンディングデータを読み込んでそのデータが期待とおりとなってるか確認します。
以下データ一部※
{ "raised": { "currency": "USD", "amount": 1685 }, "goal": { "currency": "USD", "amount": 2000 }, "short_description": "to find a better/safer way to diagnose, monitor and guide those with EGID's on their life journey", "name": "EoE Chompers", "project_id": "193451" } { "raised": { "currency": "USD", "amount": 50 }, "goal": { "currency": "USD", "amount": 12000 }, "short_description": "Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!", "name": "A New Roof and Safety Wall for 70 children", "project_id": "193469" }
テストコード
class MyTest1 extends FunSuite with DatasetSuiteBase { override implicit def enableHiveSupport: Boolean = false test("count should be equal") { val ds1 = spark.read.json("c:\\temp\\cf_example_projects.json.gz") assertTrue(ds1.count() == 38080) // 実際Datasetのレコード件数とファイルの行数が一致するか } test("id should be unique") { val ds1 = spark.read.json("c:\\temp\\cf_example_projects.json.gz") val projectIdCount = ds1.select("project_id").distinct().count() assertTrue(projectIdCount == 38080) } }
ちょっと進化させてみる
上のファイルを読み込む部分は以下のようにメソッド化して、このメソッドについてテストを書きます
object MyObject { case class Cf(projectId : String, name : String, shortDesc : String, currency : String, goalAmount: Double,raisedAmount:Double) def json2ds(spark:SparkSession,path:String) : Dataset[Cf] = { import spark.implicits._ val ds1 = spark.read.json(path) ds1.map { r => Cf( r.getAs[String]("project_id"), r.getAs[String]("name"), r.getAs[String]("short_description"), r.getAs[Row]("goal").getAs[String]("currency"), r.getAs[Row]("goal").getAs[Double]("amount"), r.getAs[Row]("raised").getAs[Double]("amount") ) } } } class MyTest2 extends FunSuite with DatasetSuiteBase { override implicit def enableHiveSupport: Boolean = false test("one of selected record should be exist") { val ds1 = MyObject.json2ds(spark,"c:\\temp\\cf_example_projects.json.gz") val result = ds1.where($"projectId".equalTo("193469")) val excepted = Seq(Cf("193469","A New Roof and Safety Wall for 70 children","Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!","USD",15000d,50d)).toDS() assertDatasetEquals(excepted,result) } test("no such record") { val ds1 = MyObject.json2ds(spark,"c:\\temp\\cf_example_projects.json.gz") val result = ds1.where($"projectId".equalTo("xxxxxx")) val excepted = Seq(Cf("xxxxxx","A New Roof and Safety Wall for 70 children","Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!","USD",12000d,50d)).toDS() intercept[org.scalatest.exceptions.TestFailedException] { assertDatasetEquals(result, excepted) } } }
テストデータ作成用のクラス(Generator)も
使う場面がちょっと微妙ですが、テストデータ用のDataset、DataFrameを生成することも出来ます。
サンプルそのまま張り付けておきますが、こんな感じです。
動かすには
class SampleDatasetGeneratorTest extends FunSuite with SharedSparkContext with Checkers { test("test generating Datasets[String]") { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val property = forAll(DatasetGenerator.genDataset[String](sqlContext)(Arbitrary.arbitrary[String])) { dataset => dataset.map(_.length).count() == dataset.count() } check(property) } }
最後に
Sparkの単体テストについてはメジャーなやつがないのが現状だと認識してますので、
ここで紹介したのは一つの選択肢としてみて頂ければと思います
上記以外にもいくつか選択肢があるみたいですので、色々試してみるとご自身に合うものを見つけられるでしょう。
弊社ではエンジニア・デザイナー絶賛大募集中ですので、興味ある方は気軽にお話でも聞いてみませんか?