astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

Spark でUnit Testを書く

こんにちは、朴と申します。

本日はSpark-testing-baseを使ってSpark処理の単体テストの書き方について触れてみたいと思います。
ローカルで並列処理の単体テスト動かすのは少しハードル高く感じるかもしれませんが、
ちょっとした設定でスムーズに動かせたので、設定から簡単なテストまで書いてみたいと思います。

Spark-testing-baseの概要

ローカルでSparkのプログラムの単体テストを書く為のフレームワークです
複雑な設定をせず簡単にSparkプログラムの初期化、起動ができます。
簡単な紹介など、以下のリンクから確認できます。
Spark-testing-base

設定

とりあえず、公式サイトにある通り設定を行います。

  • sbtプロジェクトのbuild.sbtに以下のdependencyを追加します。
    ※2.2.0の部分は実際使うSparkのバージョンに合わせてください。
"com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.8.0" % "test"


  • OutOfMemoryError 防止の為以下の設定も追加して置きます、メモリは適宜調整してください。
fork in Test := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")


  • テストでのparallel 実行を無効にする
parallelExecution in Test := false

以上で設定完了です。

書いてみる1

続いて公式サイトのwikiにあるサンプルを一個書いて、動作確認をします。

class test extends FunSuite with DatasetSuiteBase {
  test("simple test") {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val input1 = sc.parallelize(List(1, 2, 3)).toDS
    assertDatasetEquals(input1, input1) // equal

    val input2 = sc.parallelize(List(4, 5, 6)).toDS
    intercept[org.scalatest.exceptions.TestFailedException] {
        assertDatasetEquals(input1, input2) // not equal
    }
  }
}

IntelliJ IDEA起動して動かしますと以下のエラーが出てました。

java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf$ConfVars
...
...
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf$ConfVars

エラー内容でググってみたらどうやらdependencyが足りないようなので、以下を追加します。
sparkVersionはご自身の環境に合わせてください。

"org.apache.spark" %% "spark-hive" % sparkVersion % "test"

今度無事起動するかと思ったら違うエラーが出てました。 どうやらHiveSession起動時にローカルだとうまく行かないみたいですね。

java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
...
...
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: (null) entry in command string: null ls -F C:\tmp\hive

Spark Hiveをoffにすることでエラーは回避できるみたいなので、
上記のテストケースの一番最初に以下を追加します。

class test extends FunSuite with DatasetSuiteBase {
  override implicit def enableHiveSupport: Boolean = false  // 追加
  test("simple test") {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val input1 = sc.parallelize(List(1, 2, 3)).toDS
    assertDatasetEquals(input1, input1) // equal

    val input2 = sc.parallelize(List(4, 5, 6)).toDS
    intercept[org.scalatest.exceptions.TestFailedException] {
        assertDatasetEquals(input1, input2) // not equal
    }
  }
}

これで無事起動できました。

 主なクラスの紹介

  • SharedSparkContext
    SparkContextをローカルで使える形で提供

  • RDDComparisons
    RDD比較用クラス

  • DataFrameSuiteBase
    DataFrameのテスト用

  • DatasetSuitBase
    Datasetのテスト用

  • StreamingSuiteBase
    streamのテスト用

各クラス使い方は公式サイトのwikiに乗ってる通り直感的で分かりやすいので、ここでは割愛します。

自分のテストを書いてみる

いよいよご自身のテストを書きます。
以下のようなクラウドファンディングデータを読み込んでそのデータが期待とおりとなってるか確認します。
以下データ一部※

{
  "raised": {
    "currency": "USD",
    "amount": 1685
  },
  "goal": {
    "currency": "USD",
    "amount": 2000
  },
  "short_description": "to find a better/safer way to diagnose, monitor and guide those with EGID's on their life journey",
  "name": "EoE Chompers",
  "project_id": "193451"
}
{
  "raised": {
    "currency": "USD",
    "amount": 50
  },
  "goal": {
    "currency": "USD",
    "amount": 12000
  },
  "short_description": "Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!",
  "name": "A New Roof and Safety Wall for 70 children",
  "project_id": "193469"
}

テストコード

class MyTest1 extends FunSuite with DatasetSuiteBase {
  override implicit def enableHiveSupport: Boolean = false

  test("count should be equal") {
    val ds1 = spark.read.json("c:\\temp\\cf_example_projects.json.gz")
    assertTrue(ds1.count() == 38080) // 実際Datasetのレコード件数とファイルの行数が一致するか
  }

  test("id should be unique") {
    val ds1 = spark.read.json("c:\\temp\\cf_example_projects.json.gz")
    val projectIdCount = ds1.select("project_id").distinct().count()
    assertTrue(projectIdCount == 38080)
  }
}

ちょっと進化させてみる

上のファイルを読み込む部分は以下のようにメソッド化して、このメソッドについてテストを書きます

object MyObject {
  case class Cf(projectId : String, name : String, shortDesc : String, currency : String, goalAmount: Double,raisedAmount:Double)
  def json2ds(spark:SparkSession,path:String) : Dataset[Cf] = {
    import spark.implicits._
    val ds1 = spark.read.json(path)
    ds1.map { r =>
      Cf(
        r.getAs[String]("project_id"),
        r.getAs[String]("name"),
        r.getAs[String]("short_description"),
        r.getAs[Row]("goal").getAs[String]("currency"),
        r.getAs[Row]("goal").getAs[Double]("amount"),
        r.getAs[Row]("raised").getAs[Double]("amount")
      )
    }
  }
}


class MyTest2 extends FunSuite with DatasetSuiteBase {
  override implicit def enableHiveSupport: Boolean = false

  test("one of selected record should be exist") {
    val ds1 = MyObject.json2ds(spark,"c:\\temp\\cf_example_projects.json.gz")
    val result = ds1.where($"projectId".equalTo("193469"))
    val excepted = Seq(Cf("193469","A New Roof and Safety Wall for 70 children","Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!","USD",15000d,50d)).toDS()
    assertDatasetEquals(excepted,result)
  }

  test("no such record") {
    val ds1 = MyObject.json2ds(spark,"c:\\temp\\cf_example_projects.json.gz")
    val result = ds1.where($"projectId".equalTo("xxxxxx"))
    val excepted = Seq(Cf("xxxxxx","A New Roof and Safety Wall for 70 children","Help provide 70 children with a new roof to keep them healthy and a boundary wall to keep them safe!","USD",12000d,50d)).toDS()
    intercept[org.scalatest.exceptions.TestFailedException] {
      assertDatasetEquals(result, excepted)
    }
  }
}

テストデータ作成用のクラス(Generator)も

使う場面がちょっと微妙ですが、テストデータ用のDataset、DataFrameを生成することも出来ます。

サンプルそのまま張り付けておきますが、こんな感じです。
動かすにはhttps://github.com/rickynils/scalacheck/blob/master/doc/UserGuide.mdというものが必要らしいです。

class SampleDatasetGeneratorTest extends FunSuite with SharedSparkContext with Checkers {
  test("test generating Datasets[String]") {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val property =
      forAll(DatasetGenerator.genDataset[String](sqlContext)(Arbitrary.arbitrary[String])) {
        dataset => dataset.map(_.length).count() == dataset.count()
      }

    check(property)
  }

}

最後に

Sparkの単体テストについてはメジャーなやつがないのが現状だと認識してますので、
ここで紹介したのは一つの選択肢としてみて頂ければと思います
上記以外にもいくつか選択肢があるみたいですので、色々試してみるとご自身に合うものを見つけられるでしょう。

弊社ではエンジニア・デザイナー絶賛大募集中ですので、興味ある方は気軽にお話でも聞いてみませんか?

Copyright © astamuse company, ltd. all rights reserved.