山縣です。
今回は Spark を無料で手軽に触れる Databricks Community Edition について書きたいと思います。
Databricks Community Editionとは
Databricks は Databricks 社が提供しているSparkサービスで、Community Edition はその無料版にあたります。
無料にもかかわらずメモリ6GBのAWSのインスタンスとNotebook が使えるというなかなか太っ腹なサービスです。
弊社では Cloudera社の CDH のクラスタがありSparkもその上で動かしていますが、個人的にSpark そのものの動作を確認したり、異なるバージョンを触りたいときに時々使わせてもらっています。
Sparkの学習用環境としては十分な機能が備わっていると思いますので、簡単に紹介したいと思います。
ユーザ登録
Community Editionを使用するためにはユーザ登録が必要です。
ユーザ登録はTRY DATABRICKS画面の右側の Community Edition の下にある
START TODAY ボタンを押してフォームに情報を入力します。
クレジットカードなどの課金情報の入力は必要ありません。
フォームを登録後、登録したアドレスにメールが来るのでリンクをクリックしてメールアドレスの確認ができれば完了です。
一般的な Web サービスのユーザ登録と変わらないと思います。
登録が完了したら ログイン画面からログインします。
ログインすると下記のような画面になります。
左縦にメニューがあります。
クラスタの起動
まずは Spark クラスタを作成・起動します。
左側のメニューの Clusters ボタンを押すと、クラスタの一覧表示の画面になります。 画面上部にある + Create Cluster ボタンを押して新規にクラスタを作成します。
Cluster Name に適当に名前を入れ、Databricks Runtime Version で起動したい Sparkを選択します。
選択するSparkの一覧は大まかに言ってSpark のバージョンそのものと、Scala のバージョンを 2.10 にするか 2.11 にするかの組み合わせの一覧となっています。
Sparkのバージョン名は Spark <バージョン>-dbN という形式になっており、<バージョン>の部分は元となるApache Spark のバージョンを、 dbN の部分はdatabricks release番号を示しています。 同じSparkのバージョンに対してDatabricks がbackport や独自機能を加えてリリースしています。
また (Auto-updating) となっているものはdatabricks release が常に最新のものを自動的に選択するようになります。
なおこの記事を書いている時点で Spark2.2 のRC2 が利用可能になっていました。 折角なので “Spark 2.2 RC2 (Experimental, Scala 2.11)” を指定します。
Community Edition の場合、Instance の細かいカスタマイズ等は出来ませんので、あとは AWS の AZ の選択とSpark の Config を指定するのみです。 ここでは何も指定せず、Create Cluster ボタンを押して起動します。
Cluster 一覧画面に戻り、しばらくすると起動が完了します。
起動したクラスタは Spark のWebUIである、Spark UI にアクセスすることができます。SparkUIを見ると起動したクラスタの詳細が分かります。
Environment を見てみると spark.master が “local[8]” となっておりlocalモードで起動していることが分かります。
またログを確認することもできます。 起動したクラスタはアイドル状態がしばらくすると自動的に停止します。停止したクラスタは一覧に Terminated Cluster として残っていますので再起動可能です。
Workspace
Workspace はルートフォルダでNotebook やライブラリなどはすべて Workspace 以下に保管することになります。サブフォルダを作ることもできます。 標準で共有用の Shared というフォルダとユーザのホームフォルダである Users/<アカウント> があります。Shared は企業などが複数のアカウント間でNotebook などを共有するためのフォルダのようです。とりあえず今回は Users/<アカウント>以下にNotebook やライブラリを置いていきます。
Notebook
コードはNotebookに書いていきます。 Databricks の NotebookはIPython と互換性があるそうなのでIpython ベースなのかもしれません。
左メニューの Workspace から Users → <メールアドレス>を選択して Create→Notebook を選択します。
Name を適当に入れ、 Language で使いたいプログラミング言語を選択します。 言語は Scala, Python, SQL, R が選べます。 ここでは Scala を選びます。
またNotebookを実行するにはSparkクラスタにアタッチする必要がありますので、接続する Spark クラスタとして先ほど作成したクラスタを選択します。
作成後、空のセルが一つだけあるノートブックが表示されます。
空のセルに以下の内容をコピペして Shift + Enter でセルを実行します。
case class T1(id:Int, message:String) val ds1 = Seq(T1(1, "Hello"), T1(2, "World"), T1(3, "Hello"), T1(4, "Spark")).toDS display(ds1)
display() によって結果を表示することができます。
上記のセル実行後に下に空のセルができますので、さらにコードを追加します。
ds1.createOrReplaceTempView("t1") val ds2 = spark.sql("select message, count(*) as num from t1 group by message order by num desc") display(ds2)
Notebook には以下の変数が事前に定義されています。
- sc:SparkContext
- sqlContext:HiveContext
- spark:SparkSession (2.0以降)
結果はテーブルだけでなくグラフとして表示することもできます。 結果の左下のグラフボタンを押すとグラフに変えることができます。
このNotebook は Scala 用として作成しましたが、他の言語のセルを追加することも出来ます。 上記の Scala で書いたSQLクエリは以下のように直接書くことも可能です。
%sql select message, count(*) as num from t1 group by message order by num desc
%sql 以外に %python, %r, %scala, %sh(shell), %fs(Databricks Utilities) の指定もできます。
DBFS
Databricks の固有機能としてDBFS(Databricks File System)が使えます。 これは AWS S3上に作れらたファイルシステムでSSDにキャッシュすることで高速化しているとのことです。DBFS は Scala と Python から利用することが可能で、dbutils (Databricks Utilities)というライブラリ経由でアクセスします。dbutils は Notebook にビルトインされているので簡単に利用可能です。
display(dbutils.fs.ls("/"))
fsのヘルプは以下のようにしてみることが可能です。
dbutils.fs.help()
sparkのデータの保存先としても利用可能です。 下記ではDataset ds1 を parquet フォーマットで書き込んでいます。
ds1.write.parquet("dbfs:/tmp/ds1") display(dbutils.fs.ls("/tmp/ds1"))
Table
Databricks ではテーブルを利用することができます。 テーブルの作成はデータのインポートにより行います。データのインポート元してはファイルのアップロード、AWS S3などいくつかを選ぶことが可能です。
ここでは総務省が運用している統計データダッシュボードサイト のAPI を使って国内の都道府県別、年代別、西暦別の人口データををCSVとしてダウンロードしてインポートしてみます。また都道府県マスタもダウンロードします。
$ wget -O - 'http://data.e-stat.go.jp/dashboard/api/1.0/Csv/getData?Lang=JP&ParentRegionCode=00000&IndicatorCode=0201010010000010010,0201010010000010020,0201010010000010030' |tail -n +31 > jp_population.csv $ wget -O - 'http://data.e-stat.go.jp/dashboard/api/1.0/Csv/getRegionInfo?Lang=JP&ParentRegionCode=00000'|tail -n +18 > jp_region.csv $ head jp_population.csv "indicatorCd","unitCd","statCd","regionCd","timeCd","cycle","regionRank","isSeasonal","isProvisional","value","cellAnnotations" "0201010010000010010","090","20020101","01000","1975CY00","3","3","1","0","1312611", "0201010010000010010","090","20020101","02000","1975CY00","3","3","1","0","380218", ... $ head jp_region.csv "upAreaCd","upAreaNm","upHiragana","areaCd","areaNm","areaLevel","hiragana","fromDate","toDate" "00000","全国","ぜんこく","01000","北海道","3","ほっかいどう","190001","999912" "00000","全国","ぜんこく","02000","青森県","3","あおもりけん","190001","999912" ...
上記の方法でダウンロードしたCSVをインポートします。 左メニュー → Tables→ +Create Tables でインポート画面が出ます。インポートのソースとしてFile を選択して jp_popluation.csv をアップロードを行います。
Preview Table を押して Table name に “jp_population” を入力、First row is headerをチェックします。カラムのデータ型を選べますが、面倒なので String のままにしておきます。 (この辺データを見て自動で推測してくれると楽なのですが。)
最後に Create Table を押します。 なぜか画面にエラーメッセージが一瞬出ますが、問題なく登録できます。 作成したテーブルはSQLからその名前でアクセス可能です。
%sql select * from jp_population
同様に都道府県データを jp_region という名前でテーブルに登録します。
インポートした二つのテーブルをJoinして使いやすいようにしてみます。
val pop = spark.sql(""" select regexp_replace(p.timeCd, 'CY00', '') as Year, r.areaNm as Prefecture, case p.indicatorCd when '0201010010000010010' then '0~14歳' when '0201010010000010020' then '15~64歳' when '0201010010000010030' then '65歳以上' else 'その他' END as Age, cast(p.value as Int) as Population from jp_population p join jp_region r on p.regionCd = r.areaCd """) pop.createOrReplaceTempView("pop") display(pop)
上記処理によって “prefecture”, “year”, “population” をフィールドとして持つテーブル “pop” を作ることができました。
例えばこのテーブルを使って東京都の西暦、年齢層のグラフを作ると以下のようになります。
外部ライブラリの利用
外部ライブラリの利用はScala/Javaの場合は JAR ファイルをアップロードするか、Maven Central/Spark Pcakages からダウンロードするかになります。
まず Maven から 自然言語処理ライブラリであるStandford CoreNLPをダウンロードしてみます。
ライブラリの取り込みもNotebook と同じようにWorkspace のフォルダ内のメニューで Create → Library と実行します。
Source として Maven Coordinate を選択し、Search Spark Packages and Maven Central を押します。
検索ボックスに corenlp と入れ、spark-corenlp を、Rleases は利用している Scala のバージョンに合うものを選択します。ここでは “0.2.0_s_2.11” を選択しました。
元の画面に戻るので Create Library を押すと、選択したフォルダ内にライブラリが作られます。
早速利用してみます。
import org.apache.spark.sql.functions._ import com.databricks.spark.corenlp.functions._ val input = Seq( (1, "<xml>Stanford University is located in California. It is a great university.</xml>") ).toDF("id", "text") val output = input .select(cleanxml('text).as('doc)) .select(explode(ssplit('doc)).as('sen)) .select( 'sen, tokenize('sen).as('words) ) display(output)
とりあえずトークナイズはできました。他の機能も使ってみます。
val output = input .select(cleanxml('text).as('doc)) .select(explode(ssplit('doc)).as('sen)) .select( 'sen, tokenize('sen).as('words), ner('sen).as('nerTags), sentiment('sen).as('sentiment), lemma('sen).as('lemma) ) display(output)
ner(), sentiment(), lemma() というメソッドを使おうとしましたがエラーとなりました。 language model データが無いのが原因なのですが、Maven経由ではlanguage modelデータを入れられませんでした。 調べてみると Databricks社のほうで、対応方法を記したNotebookが出ていました。
こちらは model の jar ファイルをダウンロードしてSprkContext に直接jarを追加するというやり方です。このやり方を試してみましたが、残念ながらうまく動きませんでした。さらに調べているとクラスローダを取得してそちらに追加する方法を書かれている人もいましたがこちらもうまく行きませんでした。 仕方ないので model の jar ファイルをライブラリとして直接アップロードを試みました。
これは先ほどの Maven経由と同じ手順で方法で “Create” → “Library” としてSource を “Upload Java/Scala JAR” にしてファイルを指定する方法となります。
残念ながらmodel のJARファイルは1GB近くあり、アップロードの途中でエラーとなってしまいました。最終的には model jarファイルを3つに分割してアップロードすることで何とか対応できました。
ライブラリ登録後、再度実行すると今度は結果を返すことができました。
APIによる操作
Databricks はREST API を提供しており多くの操作がAPI経由で可能です。
APIの認証はBASIC認証で行われ、ユーザとパスワードはWebUIにログインするときと同じものとなります。
APIへのアクセスは以下のURLになります。
https://community.cloud.databricks.com/api/2.0/…
例えばクラスタの一覧を得るには以下のような URL でアクセスします。
https://community.cloud.databricks.com/api/2.0/clusters/list
APIに関する情報はこちらのドキュメントにあるので参照してください。
終わりに
以上、Databricks Community Edition で使える機能について見てみました。無料で使える割にはクラスタの作成、Notebook、データストアと必要な機能が一通りそろっておりSparkの学習をしたい人にはかなりおススメです。
最後に弊社では Spark をはじめとして様々なデータ処理や基盤構築などを行うソフトウェアエンジニアを募集中です。興味のある方はこちらのサイトをご参照ください。