こんにちは!Spark大好きな朴です。
本日はSpark 2.0で大幅の改善が行われてたSpark SQLについて書こうと思います。
弊社ではCDHのバージョンアップをこまめに行うことでSpark,HBaseなどのプロダクトの最新バージョンを常に試せる環境を作っております。
Spark 2.0についても先日弊社福田のもう待てない、Spark2.0の導入と実践にも書いてたとおり
もう使えるようになりました。
ということで少し乗り遅れた感もありますが、本日はSpark 2.0でSpark SQLの実力を試したいと思います。
Spark 2.0でSpark SQLの主な変更点は以下の3つ
- SparkSession
- 性能改善
- サポートするSQLが増えた
本日は上記3つの改善について触れてみたいと思います。
【変更その1】 SparkSQLのニューエントリポイントSparkSession
Spark 1.6まではユーザは以下のようにSqlContext(SparkSQL)とHiveContext(HiveQL)を使い分ける必要がありました。
// Spark SQL
val sc = new SparkContext(spConf)
val sqlContext = new SQLContext(sc)
sqlContext.sql("select * from hoge").show()
// HiveQL
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'temp/kv1.txt' INTO TABLE src")
hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)
Spark 2.0では以下のようSparkSessionでSparkSQL,HiveQL両方書けるようになりました。
val spark = SparkSession.builder()
.master("yarn")
.appName("Test")
.getOrCreate()
// Spark SQL
spark.sql("select * from hoge").show()
// HiveQL
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("LOAD DATA LOCAL INPATH 'temp/kv1.txt' INTO TABLE src")
spark.sql("FROM src SELECT key, value").collect().foreach(println)
【変更その2】 性能面での改善
SQLおよびDataFrame関連の処理にてwhole stage code generationの新技術を使うことでおよそ2-10倍の性能改善を実現したそうです。
リリースノートの原文はこちら
それでは実際どうなのかちょっと試してみたいと思います。
以下同じ条件でSpark 1.6 と Spark 2.0でいくつか簡単なSQLを試してどれぐらい実行スピードが違うかを実験してみます。
データソースはwikipediaのデータを使います。
こちらのリンクよりenwiki-20170201-abstract.xmlをダウンロードします。
https://dumps.wikimedia.org/enwiki/20170201/
上記データは英語wikiの概要とタイトル、urlを含めたxmlになります。
<doc>
<title>Wikipedia: Anarchism</title>
<url>https://en.wikipedia.org/wiki/Anarchism</url>
<abstract>Anarchism is a political philosophy that advocates self-governed societies based on voluntary institutions.</abstract>
<links>
<sublink linktype="nav"><anchor>External links</anchor><link>https://en.wikipedia.org/wiki/Anarchism#External_links</link></sublink>
</links>
</doc>
<doc>
..
..
</doc>
- 前準備として上記のデータをhdfsにアップします。
hadoop fs -put enwiki-20170201-abstract.xml temp/
- 以下のコマンドでspark2-shellを起動します。
spark2-shell --master yarn \
--driver-memory 2G --executor-memory 4G \
--packages com.databricks:spark-xml_2.10:0.4.1 \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer"
※注 spark2.0ではspark-xmlのバージョンを0.4.1以上に指定する必要があります。
scala> val enwiki=spark.read.format("com.databricks.spark.xml").option("rowTag", "doc").option("inferSchema", "true").load("temp/enwiki-20170201-abstract.xml")
scala> enwiki.printSchema
root
|-- abstract: string (nullable = true)
|-- links: string (nullable = true)
|-- title: string (nullable = true)
|-- url: string (nullable = true)
scala> enwiki.createOrReplaceTempView("enwiki")
scala > def showTiming[T](proc: => T): T = {
val start=System.nanoTime()
val res = proc
val end = System.nanoTime()
val esplatedSec = java.util.concurrent.TimeUnit.SECONDS.convert((end-start), java.util.concurrent.TimeUnit.NANOSECONDS)
println("Time elapsed: " + esplatedSec + " secs")
res
}
性能検証1 件数count
Spark 2.0
scala> showTiming{ enwiki.count() }
Time elapsed: 116 secs
res4: Long = 5328417
結果でました。実行時間は116秒、件数は532万件ちょっとありました。
同じ処理をSpark 1.6で実行してみた結果
scala> showTiming{enwiki.count()}
Time elapsed: 146 secs
res4: Long = 5328417
こちらは実行時間が30秒増えてますね。
劇的ではありませんが、DataFrameの件数カウントではSpark2.0のほうが早いことが分かります。
性能検証2 like検索
Spark 2.0
scala> showTiming{ spark.sql("select count(*) from enwiki where abstract like '%Robot%'").show() }
+--------+
|count(1)|
+--------+
| 769|
+--------+
Time elapsed: 100 secs
Spark 2.0でlike検索の実行時間は116秒、件数は769件ありました
同じ処理をSpark 1.6で実行してみた結果
scala> showTiming{ sqlContext.sql("select count(*) from enwiki where abstract like '%Robot%'").show() }
+---+
|_c0|
+---+
|769|
+---+
Time elapsed: 143 secs
こちらは実行時間が43秒増えてますね。countと同じくSpark2.0のほうが性能的には上ですね
性能検証3 join処理
join系の処理を試してみたいと思います。
enwikiとjoinするためのデータを準備するためにsimplewiki-20170201-abstract.xmlを
以下のリンクよりダウンロードします。
https://dumps.wikimedia.org/enwiki/20170201/
データの構造はenwikiと同じでurlが違うだけです。
<doc>
<title>Wikipedia: April</title>
<url>https://simple.wikipedia.org/wiki/April</url>
<abstract>April is the fourth month of the year, and comes between March and May. It is one of four months to have 30 days.</abstract>
<links>
<sublink linktype="nav"><anchor>The Month</anchor><link>https://simple.wikipedia.org/wiki/April#The_Month</link></sublink>
</links>
</doc>
<doc>
..
..
</doc>
hadoop fs -put simplewiki-20170201-abstract.xml temp/
- simplewikiのデータをロードしてDataFrameに変換します。
scala> val simplewiki=spark.read.format("com.databricks.spark.xml").option("rowTag", "doc").option("inferSchema", "true").load("temp/simplewiki-20170201-abstract.xml")
scala> simplewiki.printSchema
root
|-- abstract: string (nullable = true)
|-- links: string (nullable = true)
|-- title: string (nullable = true)
|-- url: string (nullable = true)
scala> simplewiki.createOrReplaceTempView("simplewiki")
- 下記のSQLでenwikiとsimplewikiで同じタイトルのデータ同士でinner joinをします。
Spark 2.0
scala> showTiming{spark.sql("select count(*) from enwiki en inner join simplewiki simple on (en.title = simple.title)").show()}
+--------+
|count(1)|
+--------+
| 107550|
+--------+
Time elapsed: 128 secs
Spark 2.0では128秒かかりました。
Spark 1.6
scala> showTiming{sqlContext.sql("select count(*) from enwiki en inner join simplewiki simple on (en.title = simple.title)").show()}
+------+
| _c0|
+------+
|107550|
+------+
Time elapsed: 152 secs
Spark 1.6では152秒となり、Spark2.0より遅いことが分かります。
とりあえず、以上で簡単にcount,like検索,joinでSpark 2.0とSpark 1.6でSQLの性能検証を実施してみました。
どの処理もSpark 2.0で性能が改善されてることが分かりました。
【変更その3】 SQL2003をサポート、99個のTPC-DS クエリサポート
主には以下可能になりました。
- ANSI-SQLとHive QLの両方をサポートするネイティブSQLパーサー
- DDLが書けるようになった
- Subqueryが書けるようになった
- in,exists,not in, not existsなどなど(これは嬉しいー)
- View canonicalization support (viewが使えるようになった?)
それではサブクエリ関連ちょっと触ってみたいと思います。
- 給料を管理するテーブル(DataFrame)を定義
scala> val df1 = spark.createDataFrame(Array(("1001",1000),("1002",2000),("1003",1300),("1004",1500),("1005",1600),("1006",1700),("1007",1600),("1008",1700),("1009",1000),("1010",1900)))
df1: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> val salary=df1.withColumnRenamed("_1", "staffId").withColumnRenamed("_2", "salary")
salary: org.apache.spark.sql.DataFrame = [staffId: string, salary: int]
scala> salary.show()
+-------+------+
|staffId|salary|
+-------+------+
| 1001| 1000|
| 1002| 2000|
| 1003| 1300|
| 1004| 1500|
| 1005| 1600|
| 1006| 1700|
| 1007| 1600|
| 1008| 1700|
| 1009| 1000|
| 1010| 1900|
+-------+------+
- 社員情報を管理するテーブル(DataFrame)を定義
scala> val df2 = spark.createDataFrame(Array(("1001","Staff1","Marketing"),("1002","Staff2","Marketing"),("1003","Staff3","Marketing"),("1004","Staff4","Sales"),("1005","Staff5","Sales"),("1006","Staff6","Sales"),("1007","Staff7","Sales"),("1008","Staff8","Development"),("1009","Staff9","Development"),("1010","Staff10","Development")))
df2: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
scala> val depart=df2.withColumnRenamed("_1", "staffId").withColumnRenamed("_2", "staffName").withColumnRenamed("_3", "depart")
depart: org.apache.spark.sql.DataFrame = [staffId: string, staffName: string ... 1 more field]
scala> depart.show
+-------+---------+-----------+
|staffId|staffName| depart|
+-------+---------+-----------+
| 1001| Staff1| Marketing|
| 1002| Staff2| Marketing|
| 1003| Staff3| Marketing|
| 1004| Staff4| Sales|
| 1005| Staff5| Sales|
| 1006| Staff6| Sales|
| 1007| Staff7| Sales|
| 1008| Staff8|Development|
| 1009| Staff9|Development|
| 1010| Staff10|Development|
+-------+---------+-----------+
scala> salary.createOrReplaceTempView("salary")
scala> depart.createOrReplaceTempView("depart")
scala> spark.sql("select * from salary s where exists (select 1 from depart d where s.staffId = d.staffId and d.depart = 'Marketing')").show()
+-------+------+
|staffId|salary|
+-------+------+
| 1001| 1000|
| 1002| 2000|
| 1003| 1300|
+-------+------+
scala> spark.sql("select * from salary s where not exists (select 1 from depart d where s.staffId = d.staffId and d.depart = 'Marketing')").show()
+-------+------+
|staffId|salary|
+-------+------+
| 1004| 1500|
| 1005| 1600|
| 1006| 1700|
| 1007| 1600|
| 1008| 1700|
| 1009| 1000|
| 1010| 1900|
+-------+------+
scala> spark.sql("select * from salary s where staffId in (select staffId from depart d where d.depart = 'Marketing')").show()
+-------+------+
|staffId|salary|
+-------+------+
| 1001| 1000|
| 1002| 2000|
| 1003| 1300|
+-------+------+
scala> spark.sql("select * from salary s where staffId not in (select staffId from depart d where d.depart = 'Marketing')").show()
+-------+------+
|staffId|salary|
+-------+------+
| 1004| 1500|
| 1005| 1600|
| 1006| 1700|
| 1007| 1600|
| 1008| 1700|
| 1009| 1000|
| 1010| 1900|
+-------+------+
エラーなく実行できました!
終わりに
去年(2016年)の12月にSpark2.1がリリースされて、Spark2系も開発が活発に進められてる印象があります。
本日はSpark2系のSQL面での主な改善点を書きましたが、上記以外にもたくさんの改善が行われてますので、
これから既存のSparkジョブをSpark2系向けに移行しながらその良さを実感したいと思います。
ではまた!