山縣です。
前回に引き続き CoreNLP を触っていきたいと思います。 前回までに API の使い方を見てきたので、今回は Spark からの使い方を見ていきたいと思います。
spark-corenlp
セットアップ
spark からCoreNLPを容易に使用する方法として spark-corenlp パッケージがあります。 spark-corenlp は Spark Packages で公開されています。また CoreNLP も Maven Repository で公開されており、下記のように spark shell の引数 --packages で指定するだけで利用することができます。 (弊社では CDH の spark 2.2.0を scala 2.11で利用しています)
$ spark2-shell --packages databricks:spark-corenlp:0.2.0-s_2.11,edu.stanford.nlp:stanford-corenlp:3.6.0
ためしに下記のサンプルを実行してみます。
import org.apache.spark.sql.functions._ import com.databricks.spark.corenlp.functions._ import spark.implicits._ val input = Seq( (1, "Stanford University is located in California. It is a great university.") ).toDF("id", "text") val output = input .select(explode(ssplit('text)).as('sen)) .select('sen, tokenize('sen).as('words), ner('sen).as('nerTags), sentiment('sen).as('sentiment)) output.show(truncate = false)
pasteモードでコピペして Ctrl-D します。
scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions._ import com.databricks.spark.corenlp.functions._ ... // Exiting paste mode, now interpreting. 18/04/20 14:42:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cdh-dn03-dt.c.stg-astamuse-astamus e.internal, executor 1): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$ner$1: (stri ng) => array<string>) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ... Caused by: java.io.IOException: Unable to open "edu/stanford/nlp/models/pos-tagger/english-left3words/english-left3wor ds-distsim.tagger" as class path, filename or URL at edu.stanford.nlp.io.IOUtils.getInputStreamFromURLOrClasspathOrFileSystem(IOUtils.java:485)
エラーになってしまいました。モデルデータが無いのが原因です。Maven にはモデルデータも上がっているのですが、 使用するためには classifier を指定する必要があります。しかし --packages でのclassifier の記述の仕方が不明でうまく指定することが出来ませんでした。仕方がないのでモデルファイルをダウンロードして直接 jar ファイルを指定することで解決しました。
$ spark2-shell --packages databricks:spark-corenlp:0.2.0-s_2.11,edu.stanford.nlp:stanford-corenlp:3.6.0 \ --jars ./libs/stanford-corenlp-3.6.0-models-english.jar
上記のようにダウンロードした英語のモデルデータ stanford-corenlp-3.6.0-models-english.jar へのパスを --jars で指定します。
再度実行してみます。
scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions._ ... // Exiting paste mode, now interpreting. +----------------------------------------------+----------------------------------------------------+--------------------------------------------------+---------+ |sen |words |nerTags |sentiment| +----------------------------------------------+----------------------------------------------------+--------------------------------------------------+---------+ |Stanford University is located in California .|[Stanford, University, is, located, in, California, .]|[ORGANIZATION,ORGANIZATION, O, O, O, LOCATION, O]|1 ||It is a great university . |[It, is, a, great, university, .] |[O, O, O, O, O, O] |4 | +----------------------------------------------+----------------------------------------------------+--------------------------------------------------+---------+
今度は実行されました。これで Spark から CoreNLPが使用できるようになりました。
spark-corenlp の概要
spark-corenlp パッケージは小さいパッケージで、コードは object com.databricks.spark.corenlp.functions があるだけです。このobject に定義されている UDF として CoreNLP の各機能を提供する形になっています。 UDF として以下のものが定義されています。
- cleanxml
- tokenize
- ssplit
- pos
- lemma
- ner
- depparse
- coref
- natlog
- openie
- sentiment
ソースコードを見ると以下のように SimpleAPI を利用して実装されています。
functions.scala
... def tokenize = udf { sentence: String => new Sentence(sentence).words().asScala } ... def pos = udf { sentence: String => new Sentence(sentence).posTags().asScala } ... def lemma = udf { sentence: String => new Sentence(sentence).lemmas().asScala }
なお sentiment だけ、対応する SimpleAPIがないようで StanfordCoreNLP インスタンスを利用していました。
自前でUDFを定義する
spark-corenlp パッケージを使わずに直接 CoreNLP を使用する場合、私は以下のように UDF を定義して使っています。
package test2 { import java.util.Properties import org.apache.spark.sql.functions.udf import edu.stanford.nlp.ling.CoreAnnotations.{SentencesAnnotation, TokensAnnotation} import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP} import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.JavaConverters._ case class ann(word:String, lemma:String, pos:String) object NLProc { val props = new Properties() props.setProperty("annotators", "tokenize,ssplit,pos,lemma") @transient val pipeline = new StanfordCoreNLP(props) val nlproc_f = (text: String) => { val document = new Annotation(text) pipeline.annotate(document) document.get(classOf[SentencesAnnotation]).asScala flatMap { s => s.get(classOf[TokensAnnotation]).asScala map { t => ann(t.word(), t.lemma(), t.tag()) } } }:Seq[ann] val nlproc = udf(nlproc_f) } }
paste -raw モードでコピペします。
scala> :pa -raw // Entering paste mode (ctrl-D to finish) package test2 { ... // Exiting paste mode, now interpreting.
性能劣化を避けるために pipline を UDFの呼び出しの度に作らないようにしています。 以下のように使用します。
import test2._ val df = Seq( (1, "Stanford University is located in California. It is a great university.") ).toDF("id", "Text") val df2 = df.select(NLProc.nlproc($"Text").as("nlp")) df2.show(truncate=false)
scala> :pa // Entering paste mode (ctrl-D to finish) import test2._ ... // Exiting paste mode, now interpreting. +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |nlp | +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |[[Stanford,Stanford,NNP], [University,University,NNP], [is,be,VBZ], [located,located,JJ], [in,in,IN], [California,California,NNP], [.,.,.], [It,it,PRP], [is,be,VBZ], [a,a,DT], [great,great,JJ], [university,university,NN], [.,.,.]]| +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
spark-corenlp の問題点
spark-corenlp は手軽なので、私も当初使っていたのですが最近は上記のように自分でUDFを定義して使用しています。
理由のひとつは対応する CoreNLP のバージョンが古いままな事があげられます。現在の spark-corenlp が対応する CoreNLP のバージョンは 3.6.0 ですが、CoreNLP の最新バージョンは 3.9.1 まで上がっており、かなりバージョンの差が開いてしまっています。
またもう一つの理由は性能面で問題があることに気がついたからです。
性能面での問題を明らかにするため、spark-corenlp を使った場合と、上記のような UDF を定義した場合で処理時間を比較してみました。
spark-corenlp と 自前のUDFの処理時間の比較
データとして Kaggle の Amazon Fine Food Reviews を使ってみます。zip ファイルを展開して CSV ファイルを hdfs 上に上げて下記のように読み込みます。
import com.databricks.spark.corenlp.functions._ val df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true"). load("hdfs://nameservice1/user/y.yamagata/AmazonFineFoodReviews.csv").repartition(4096).persist
scala> df.count res2: Long = 568454 scala> df.printSchema root |-- Id: integer (nullable = true) |-- ProductId: string (nullable = true) |-- UserId: string (nullable = true) |-- ProfileName: string (nullable = true) |-- HelpfulnessNumerator: string (nullable = true) |-- HelpfulnessDenominator: string (nullable = true) |-- Score: string (nullable = true) |-- Time: string (nullable = true) |-- Summary: string (nullable = true) |-- Text: string (nullable = true)
件数は57万件弱、今回は Text カラムのみを利用します。
scala> df.select("Text").show(truncate=false) ... |"This was a serious surprise to us... our cats have always refused wet cat food except for people quality tuna in the can. Soon as we opened these for them ( was a varity pack ) they scarfed one down and everyday wanted more .. it real ly looks, smells like people food! The package says ""people food for cats"" and after reading the ingrediants I belie ve it is!" | |Not too over powering. Lots of powder and not too salty. Comes with pieces of dried veggies and very small beef piece s but adding more beef is not a problem. lol ...
上記のようなテキストデータが入っています。 spark-corenlp で Text カラムのデータに以下のような処理を実行します。
def run(df: DataFrame): DataFrame = { ... val df2 = df.select(tokenize($"Text").as("word"), lemma($"Text").as("lemma"), pos($"Text").as("pos")).persist df2.count df2.unpersist()
tokenize, lemma, pos の処理を呼び出します。 自前のUDFでは以下のような処理を実行します。
def run(df:DataFrame):DataFrame = { val df2 = df.select(NLProc.nlproc($"Text")).as("nlp").persist() df2.count() df2.unpersist()
NLProc.nlproc は前に定義したものになり、こちらもtokenize, lemma, posを呼び出しています。
この2つの処理を3回ずつ実行してJob の実行時間をSparkUI から取得しました。 Spark の実行パラメータは、Executor数:4, Core数:4 Executorメモリ12GB となります。 結果は以下のとおりです。
spark-corenlp(分) | 自前UDF(分) | |
---|---|---|
1回目 | 9.0 | 1.9 |
2回目 | 8.9 | 1.8 |
3回目 | 9.0 | 1.9 |
平均 | 9.0 | 1.9 |
自前のUDFで実行したほうが4倍以上速くなりました。
このような差がついてしまう原因としては、spark-corenlp が tokenize, lemma, pos と各 UDF を呼ぶ毎に別々に CoreNLP の API を呼ぶためではないかと考えています。 たとえば lemma UDF は内部で lemma アノテータ を呼んでおり、lemma アノテータはアノテータの依存関係から tokenize, ssplit, pos を呼びます。そして pos UDF は内部で pos アノテータを呼び、posアノテータは tokenize, ssplit を呼びます。このようにそれぞれのUDF毎に別々に処理を実行すると同じ処理を何度も呼び出すことになり計算量が多くなっているのではと予想しています。実際、呼び出す UDFを lemma だけにすると処理時間は2.4分程度になり、自前UDFに近い処理時間となりました。
終わりに
以上、Spark から CoreNLP を使う方法を見てきました。spark-corenlp パッケージは便利ですが処理速度やCoreNLPのバージョンへの追随に問題があると思われます。 現時点では自前でUDFを使ったほうが良いのではないかと考えています。 Spark で使用する場合は大量のドキュメントを処理したいことが多いかと思います。NLP周りの処理は重く時間がかかることが多いので、性能が落ちないように処理するように注意したほうが良いと思います。