astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

pythonのArgumentParserような使い心地!picocliのご紹介

f:id:astamuse:20200415113243p:plain

こんにちは、開発部のnishikawaです。本日はpicocliというライブラリを使ってScalaでコマンドを実装する機会があったのでご紹介します。

コマンド作りあるある

コマンドを作る時によく実装されるのが引数のパースです。これはどんな言語で実装されていても多くの人が実装すると思います。よく使われるのがgetoptsですが、これを使っても毎回引数のパースを作り込むのは面倒です。

もちろんjavaも例外ではないのですが、この実装が面倒で時間がない時はバリデーションを省略するような実装をすることも多いと思います。

コマンド作るならやっぱりpythonだよねー

その点pythonだと、デフォルトで高機能な引数のパーサーがあります。それがArgumentParserです。これが存在することや便利なライブラリがあるのでコマンド作りはpythonでという人は多いと思います。

しかし、プロダクトの多くをJVM言語で実装しているようなチームでは資産が流用できないのでやっぱりpythonは・・・というようなところも多いかもしれません。じゃあJVM言語でもArgumentParserのような機能が欲しいということで出てくるのがpicocliです。

picocli とは

picocliはjavaで実装されたコマンドラインインターフェースを提供するライブラリです。とても高機能でJVM言語で利用可能です。

picocli: https://github.com/remkop/picocli

pythonのArgumentParserと比較しながら使ってみる

picocliはArgumentParserのような使い心地です。実際にpythonのArgumentParserと比較してみたいと思います。

以下は引数に数字を列挙すると最大値を返し、オプションで「--sum」をつけると列挙した引数の数字を合計するコマンドをArgumentParserとpicocliを利用して実装した例です。

ArgumentParser

まずはpythonのArgumentParserを使った例です。

test.py

import sys
from argparse import ArgumentParser, _SubParsersAction


import argparse

def main():
    parser = argparse.ArgumentParser(description='Process some integers.')
    parser.add_argument('integers', metavar='N', type=int, nargs='+',
                        help='an integer for the accumulator')
    parser.add_argument('--sum', dest='accumulate', action='store_const',
                        const=sum, default=max,
                        help='sum the integers (default: find the max)')

    args = parser.parse_args()
    print(args.accumulate(args.integers))

    return 0


if __name__ == '__main__':
    sys.exit(main())

サンプルを実装したら、実行してみます。

ヘルプ実行例

$ python3 test.py -h
usage: test.py [-h] [--sum] N [N ...]

Process some integers.

positional arguments:
  N           an integer for the accumulator

optional arguments:
  -h, --help  show this help message and exit
  --sum       sum the integers (default: find the max)
$

通常実行(--sumオプションなし)

$ python3 test.py 1 2 3
3
$

通常実行(--sumオプションあり)

$ python3 test.py 1 2 3 --sum
6
$

picocli

TestCommand.scala

package com.example

import java.util.concurrent.Callable

import picocli.CommandLine
import picocli.CommandLine.{Command, Option, Parameters}

import scala.util.{Failure, Success, Try};



object TestCommand {

  def main(args: Array[String]) = {
    System.exit(new CommandLine(new TestCommand).execute(args: _*))
  }

}

@Command(name = "test", mixinStandardHelpOptions = true, version = Array("test sample command 1.0.0"))
class TestCommand extends Callable[Int] {

  @Parameters(arity = "1..*", paramLabel = "N", description = Array("an integer for the accumulator"))
  var integers: Array[Int] = Array()

  @Option(names = Array("-s", "--sum"), description = Array("sum the integers (default: find the max)"))
  var isSum: Boolean = false

  override def call(): Int = {
    Try { if (isSum) integers.sum else integers.max } match {
      case Success(result) =>
        println(result)
        0
      case Failure(exception) =>
        exception.printStackTrace()
        1
    }
  }

}

サンプルを実装したら、実行してみます。(scalaの例ではassemblyで実行可能なjarにあらかじめパッケージングしております)

ヘルプ実行例

$ java -jar test.jar -h
Usage: test [-hsV] N...
      N...        an integer for the accumulator
  -h, --help      Show this help message and exit.
  -s, --sum       sum the integers (default: find the max)
  -V, --version   Print version information and exit.
$

通常実行(--sumオプションなし)

$ java -jar test.jar 1 2 3
3
$

通常実行(--sumオプションあり)

$ java -jar test.jar 1 2 3 --sum
6
$

まとめ

  • コマンド実装をJVM言語で行う時にはpicocliを利用すると面倒な引数のパースを簡略化することができる
  • 使い勝手はpythonのArgumentParserに近くとても使いやすい

ArgumentParserとpicocliを使ってみての感想

以上、pythonとscalaで二つのライブラリを使用して同じコマンドを実装してみましたが、言語差異によるコンパイルなどの手間はありますが、それ以外はpythonでの実装に近い使い勝手だったかなと思いました。

また、今回は簡単な例を実装しましたが、picocliは機能が豊富でドキュメント量が多いのでもっと色んなことができそうです。

まだまだ盛んに開発が行われているので、コマンドやバッチの実装などがある方は導入を検討してみるのもいいと思います。

それでは。

PythonからDataprocを操作してシームレスに並列処理を実現する

初めまして。2019年6月にAstamuseにjoinした rinoguchi です。

ついに昨日、日本でも緊急事態宣言が出ましたね。小学校の休校も1ヶ月程度延長されましたし、会社もリモートワークにほぼ移行してますし、ここできっちりウイルスの拡散を防ぎたいところです。
ちなみに、妻がドイツに単身赴任中なのですが、ドイツでは感染者は多くて外出自粛モードになっているものの、現地の人たちはせっかくだからと日曜大工したり、庭を改造したりとそれなりに楽しんでいるみたいです。私たちも制限された環境の中ですが、せっかくなので楽しみたいですね!

f:id:astamuse:20200330100826j:plain
屋根瓦. なんとなく並列処理を連想しませんか?

はじめに

それはそうと、私は当社で、特許データなどの名寄せ(同一人物に対してユニークなIDをふる作業)を担当しております。

特許の名寄せには、人物名・組織名・出願日・共同出願人など様々な特徴を利用するのですが、中国人名などでは同姓同名が起こりやすく、同じ名前で数万件の特許が出願されるケースもあるため、別人を判別する処理(名分け)が重要です。その根拠の一つとして特許の類似性を導入することにしたのですが、特許文書をベクトル化に尋常じゃなく時間がかかるため、処理を並列化することにしました。

分散処理用のクラスタを準備・管理していくのがハードルが高いので、フルマネージドな分散コンピューティングサービスであるDataprocを利用することにしたのですが、Dataprocであっても普通にコンソールやCLIから利用すると、手作業やbashスクリプトが登場してしまいます。

巨大な名寄せアプリケーションは全てPythonで実装され、一連の処理は自動化されていて、特許文書のベクトル化というごく一部の処理のために世界観を崩したくありません。なので、PythonからDataprocを操作してクラスタを構築しPythonのアプリケーションを送り込んでジョブを実行する形にすることで、他のプログラムと同様に全てをPythonコードでコントロールするように工夫しました。

この記事ではトラブル対応も含めPythonからDataprocを操作してシームレスに並列処理を行う方法を紹介していきたいと思います。

アーキテクチャ構成および処理の流れ

以下の図のように、Batch Server上のmain.pyを起点に、①から順番に処理が実行されていきます。

f:id:astamuse:20200407222756p:plain

Dataproc周りの環境準備

Dataprocを有効化

こちらに従ってDataprocを有効化します。書いてある通りに進めれば大丈夫です。

  1. GCP Console のプロジェクト セレクタのページで、GCP プロジェクトを選択または作成する
  2. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認する
    • クレジットカードを登録する必要があるため個人でやるときは少しハードルがあるかも
    • 一定時間アイドル状態のクラスタを自動削除する設定で払い過ぎリスクを削減します(後述)
  3. Cloud DataprocAPI を有効にする

権限設定

PythonからDataprocを操作する際に利用するサービスアカウントを作成し、権限を与えます。与えるロールは二つです。

  • Dataproc 編集者(roles/dataproc.editor)
    • Dataprocを操作するための権限です。詳細はこちらを参照
  • ストレージ管理者(roles/storage.admin)
    • DataprocではGCS上のソースコードを読み込んだり、GCS上にログを出力したりします。そのため、ストレージに対する権限がないとdoes not have storage.buckets.getというエラーが発生します。本来もう少し弱いロールを与えたいのですが、こちらを見る限りstorage.buckets.get権限はストレージ管理者ロールにしかないようです。

サービスアカウントを作成したら、キーを作成し保管しておきます。

GCSバケットを作成

実行対象のpythonソースコードや実行対象データ、結果データを保存するために利用します。

Dataprocクラスタを作成するリージョンと同じロケーションを指定し、ストレージ クラスにStandardを指定しておけば問題ないと思います。

実行状況やログの確認

クラスタやジョブの状況は、コンソール画面上で確認しますので、個人アカウントに以下のロールを割り当てる必要があります。

  • DataProc 閲覧者
    • クラスタやジョブの一覧・詳細表示に必要です
  • ストレージ オブジェクト閲覧者
    • ジョブの実行ログを参照するために必要です
  • モニタリング 閲覧者
    • リソース使用状況のグラフを閲覧する際に必要です

クラスタやジョブの状況はこちらから確認できます。
ジョブのタブでは、ジョブの実行状況やログを確認できます。ただし、このログはmasterノードの標準出力ジョブだけで、workerノードのログは確認するすることができません。

f:id:astamuse:20200330131601p:plain

workerノード側のログを確認したい場合は、ログビューアを利用します。
ノード名がlabels.compute.googleapis.com/resource_nameに出力されているので、ノード別にフィルタリングすることも可能です。

f:id:astamuse:20200330132321p:plain

Python実装

フォルダ構成

以下のようなフォルダ構成にしてあります。

├── main.py            # 処理の起点
├── master.py          # ドライバプログラム
├── worker.py          # 並列処理の実体
└── module
    ├── dataproc.py    # Dataprocを操作するモジュール
    └── storage.py     # ストレージを操作するモジュール

それぞれ、以下のようなことをやっています。

  1. main.py は、処理の起点となるプログラムです。PySparkで実行させたいPythonプログラムや処理対象データをGCSにアップロードして、Dataprocクラスタを作成し、ジョブをsubmitします。
  2. master.pyは、ドライバプログラムです。GCSから処理対象データを抽出し、SparkContextを生成、RDDを生成し、並列タスクを実行します。
  3. worker.pyは、並列タスクの実体です。文書のベクトル化を行います。
  4. moduleフォルダには、共通モジュールを配置してあります。

パッケージインストール

必要なパッケージをインストールしておきます。

pip install google-cloud-dataproc
pip install google-cloud-storage

ソースコード説明

公式サイトgithubにサンプルソースが書いてあるものの、正直分かりにくかったので、実際に動いているソースコードを全部晒して、詳細に説明していきたいと思います。

main.py

名前からも想像がつくように、処理の起点となるプログラムです。クラスタの外側で、クラスタ内で分散処理を実行するための準備を行い、処理を開始しています。具体的には、以下のような流れです。

  1. 実行対象のPythonファイル(master.pyworker.pystorage.py)をGCSにアップロードする
  2. 処理対象のデータをGCSにアップロードする
  3. with文でDataprocのクラスタを作成し、ジョブをsubmitして、処理が完了したらクラスタを自動で削除する
    • with文で扱えるようにする具体的な実装は後述のdataproc.pyを参照のこと
    • クラスタ作成時に、pip installするパッケージを指定したり、masterノードに定義される環境変数を指定したりしている

サービスアカウントのjsonキーのファイルパスなどは、環境変数で渡します。
なお、サンプルではベクトル化対象の文書には、英語のことわざを利用しています。

from typing import List
import os
from os import environ as env
from google.protobuf.duration_pb2 import Duration
from module.dataproc import DataprocCluster
from module.storage import StorageClient

SENTENCES: List[str] = [
    "Good words cool more than c",
    "Great talkers are like leaky pitchers, everything runs out of them.",
    "Every path has a puddle.",
    "Fools grow without watering.",
    "We never know the worth of water till the well is dry.",
    "Don't go near the water until you learn how to swim.",
    "Fire and water may be good servants, but bad masters.",
    "Let the past drift away with the water.",
    "Fish must swim thrice; once in the water, a second time in the sauce, and a third time in wine in the stomach.",
    "A sieve will hold water better than a woman's mouth a secret.",
    "If the lad go to the well against his will, either the can will break or the water will spill.",
    "Don't throw out your dirty water until you get in fresh.",
    "A straight stick is crooked in the water.",
    "You never miss the water till the well runs dry.",
    "Drinking water neither makes a man sick, nor in debt, nor his wife a widow.",
    "Mills will not grind if you give them not water.",
    "The pitcher goes so often to the well that it is broken at last.",
    "The mill cannot grind with the water that is past.",
    "The mill gets by going.",
    "A monk out of his cloister is like a fish out of water.",
    "Standing pools gather filth.",
]


def main():
    # 実行するPythonファイルをGCSにアップロード
    storage_client: StorageClient = StorageClient(env['BUCKET_NAME'], env['PROJECT_ID'], env['STORAGE_CREDENTIAL_PATH'])
    main_python_file_uri: str = storage_client.upload_to_gcs('./master.py', 'dataproc/src')
    python_file_uris: List[str] = [
        storage_client.upload_to_gcs('./worker.py', 'dataproc/src'),
        storage_client.upload_to_gcs('./module/storage.py', 'dataproc/src/module'),
    ]

    # 処理対象データをGCSにアップロード
    data_file_path: str = './data.txt'
    with open(data_file_path, 'w') as f:
        for sentence in SENTENCES:
            f.write(sentence + '\n')
    storage_client.upload_to_gcs(data_file_path, 'dataproc/input')
    os.remove(data_file_path)

    # クラスタ生成
    with DataprocCluster(
        env['PROJECT_ID'], env['DATAPROC_CREDENTIAL_PATH'],
        idle_delete_ttl=Duration(seconds=1000),
        pip_packages='more-itertools==5.0.0 nltk==3.4.5 gensim==3.8.1 google-cloud-storage==1.20.0',
        environment_variables={'PROJECT_ID': env['PROJECT_ID'], 'BUCKET_NAME': env['BUCKET_NAME']}
    ) as cluster:
        # ジョブ登録
        cluster.submit_pyspark_job(main_python_file_uri, python_file_uris)
        print('do something')


if __name__ == "__main__":
    main()
master.py

Dataprocクラスタのmasterノードで実行されるいわゆるドライバープログラムです。処理の流れは以下の通りです。

  1. GCSから処理対象のデータを取得する
  2. 文書データを5件毎にグルーピングする
    • 実際には約2億件のデータをTF-IDFやLSIで処理するが、これらの処理は件数の増加に対して非線形に処理時間が増加するため、処理時間を抑えるために一回の処理件数を数万件程度に分割している
    • 名分けにおいては、同一の可能性がある候補群の中でベクトル化すれば良いので、イニシャルが同じ(実際にはもう少し複雑だが)候補でグルーピングするようにしている
  3. SparkContextを生成する
  4. グルーピングした文書をRDDに変換して、並列タスクを行う
    • 実際には文書そのものはブロードキャスト変数でノード間で共有し、文書インデックスをRDDに変換して処理している
    • このmap関数がworkerノード上のExecuterで実行される並列タスクとなる
from os import environ as env
from pyspark import SparkContext, RDD, Broadcast
from typing import List, Tuple
from decimal import Decimal
from itertools import chain
from more_itertools import chunked

from google.cloud.storage.blob import Blob

import worker
try:
    from module.storage import StorageClient
except ModuleNotFoundError:  # pysparkノード上、全てのソースはGCS上の見かけ上のフォルダ構成は無視されて、同一フォルダ配下に配備されるため
    from storage import StorageClient


def main():
    # GCSからデータを取得
    storage_client: StorageClient = StorageClient(env['BUCKET_NAME'])
    sentence_blob: Blob = storage_client.download_from_gcs('dataproc/input/data.txt')[0]  # 1件目取得
    sentences: List[str] = sentence_blob.download_as_string().decode().split('\n')  # 改行で区切った文書リスト

    # 5件毎にリストを分割
    chuncked_sentences: List[List[str]] = list(chunked(sentences, 5))
    chuncked_sentences_indexes: List[int] = range(len(chuncked_sentences))

    # 分散処理
    sc: SparkContext = SparkContext()
    broadcasted_chuncked_sentences: Broadcast = sc.broadcast(chuncked_sentences)
    rdd: RDD = sc.parallelize(chuncked_sentences_indexes)
    vectors: List[List[Tuple[int, Decimal]]] = list(chain.from_iterable(
        rdd.map(lambda index: worker.vectorize(broadcasted_chuncked_sentences.value[index])).collect()
    ))

    print(vectors)


if __name__ == "__main__":
    main()
worker.py

vectorize()がworkerノードのExecuterで実行されるアプリケーションコードの実体です。
前処理してword2bowして、TF-IDFで重み付けし、LSIで次元圧縮しているだけです。
今回の本筋と離れるので説明は省きます。

from typing import List, Tuple
import string
from decimal import Decimal

from gensim import models
from gensim.corpora import Dictionary
from gensim.interfaces import TransformedCorpus
from gensim.models import LsiModel

import nltk
from nltk import tokenize
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords

nltk_data_dir: str = '/tmp/nltk_data'
nltk.download('punkt', download_dir=nltk_data_dir)
nltk.download('stopwords', download_dir=nltk_data_dir)
nltk.data.path.append(nltk_data_dir)


def vectorize(sentences: List[str]) -> List[List[Tuple[int, Decimal]]]:

    words_list: List[List[str]]
    # 単語に分割
    words_list = list(tokenize.word_tokenize(sentence) for sentence in sentences)

    # 単語の語頭・語尾の記号を除去
    words_list = list(list(word.strip(f'{"".join(string.punctuation)}') for word in words) for words in words_list)

    # ステミング(goes, pencils => go, pencilのように、語幹部分に変換する)
    ps: PorterStemmer = PorterStemmer()
    words_list = list(list(ps.stem(word) for word in words) for words in words_list)

    # ストップワード除去(the, a, ofなどの特徴にならない単語を除く)
    stop_words: List[str] = stopwords.words('english')
    exclude_words: List[str] = stop_words + list(string.ascii_lowercase) + list(string.digits) + list(string.punctuation) + ['``']
    words_list = list(list(word for word in words if word not in exclude_words) for words in words_list)

    # 辞書を作成する(単語 -> ID)
    dictionary: Dictionary = Dictionary(words_list)

    # コーパス化(各文書に含まれる単語群を、Tuple[ID, 出現回数]のリストに変換)
    corpus: List[List[Tuple[int, int]]] = list(map(dictionary.doc2bow, words_list))

    # TF-IDF modelの生成
    tfidf_model: models.TfidfModel = models.TfidfModel(corpus)

    # corpusへのモデル適用
    tfidf_corpus: TransformedCorpus = tfidf_model[corpus]

    # LSI modelの生成
    lsi_model: LsiModel = LsiModel(corpus=tfidf_corpus, id2word=dictionary, num_topics=300)

    # LSIモデルを適用
    lsi_corpus: TransformedCorpus = lsi_model[tfidf_corpus]

    # リスト形式に変更
    sentence_vectors: List[List[Tuple[int, Decimal]]] = list(
        c for c in lsi_corpus
    )

    return sentence_vectors
dataproc.py

Dataprocを操作するためのモジュールです。ここは説明したいポイントがたくさんあります。

  • __init__ では、クラスタの作成をしています
    • クラスタ設定は、cluster_dataで指示しています
    • 設定内容は、こちらを参考に、プロパティ名を lower snake case に読み替えればOKです(ここを発見するまで時間かかりました...)
    • 知っておいた方が良さそうな設定をいくつか紹介します
      • software_config.image_version: こちらを参考に仮想マシンのイメージを設定します。defaultのイメージだとPython 2.7がインストールされていますw
      • software_config. properties: sparkの場合、環境変数はspark-env.shという構成ファイルを経由して設定します。こちらに書いてあるのですが、プロパティ名にspark-env:のようなプレフィックスをつけるどの構成ファイル向けの設定かを指定しています
      • lifecycle_config.idle_delete_ttl: 一定時間クラスタがIDLE状態の場合にクラスタを削除してくれる安全設定です
      • initialization_actions. executable_file: ここにpip-install.shを指定して、gce_cluster_config. metadata.PIP_PACKAGESにパッケージ名とバージョンを指定することで、クラスタ内に指定したパッケージがpip installされます
    • クラスタ作成は非同期で実行されます。そのため、クラスタ作成が完了した時に呼び出されるcallback関数を設定しておき、callback関数が呼び出されるまで待ち合わせる必要があります
  • __enter____exit__with文で扱うために定義する関数です
    • Dataprocはクラスタ生存期間分だけ課金されるので、クラスタの削除し忘れが一番コスト的には怖いです。なので、万が一にも削除し忘れが発生しないように、with文で勝手にクラスタが削除されるように工夫しています
    • __enter__はwith文の入り口で呼び出される関数で、インスタンスをそのまま返しています
    • __exit__はwith文の出口で呼び出される関数で、正常終了時も異常終了時も呼び出されます
      • 正常終了時も異常発生時もどちらもクラスタを削除して、削除完了を待ち合わせます
      • 異常終了時はクラスタ削除完了後、例外が伝播されます
  • submit_pyspark_jobはpysparkのジョブ実行を登録するための関数です
    • 設定はこちらを参照してください
    • ジョブも非同期で実行されるのですが、こちらはcallback関数を受け取ってくれないため、job.statusを監視することでジョブの完了を待ち合わせます
import traceback
import time
import random
import string
from typing import List, Dict

from google.cloud.dataproc_v1 import ClusterControllerClient, JobControllerClient
from google.cloud.dataproc_v1.gapic.transports.cluster_controller_grpc_transport import ClusterControllerGrpcTransport
from google.cloud.dataproc_v1.gapic.transports.job_controller_grpc_transport import JobControllerGrpcTransport
from google.oauth2.service_account import Credentials
from google.api_core.operation import Operation
from google.protobuf.duration_pb2 import Duration


class DataprocCluster:
    project_id: str
    region: str
    zone: str
    cluster_client: ClusterControllerClient
    cluster_name: str
    dataproc_credentials: Credentials
    creates_cluster: bool
    waiting_callback: bool

    def __init__(
        self,
        project_id: str,
        dataproc_credential_path: str,
        region: str = 'asia-east1',
        zone: str = 'asia-east1-a',
        cluster_name='cluster-' + ''.join(random.choices(string.ascii_lowercase, k=10)),
        creates_cluster: bool = True,
        master_machine_type: str = 'n1-standard-1',
        num_master_instances: int = 1,
        worker_machine_type: str = 'n1-standard-1',
        num_worker_instances: int = 2,
        idle_delete_ttl: Duration = Duration(seconds=3600),  # defaultは1時間
        pip_packages: str = '',
        environment_variables: Dict[str, str] = dict()
    ):
        self.project_id = project_id
        self.region = region
        self.zone = zone
        self.dataproc_credentials = Credentials.from_service_account_file(dataproc_credential_path)
        client_transport: ClusterControllerGrpcTransport = ClusterControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(self.region),
            credentials=self.dataproc_credentials
        )
        self.cluster_client = ClusterControllerClient(client_transport)
        self.cluster_name = cluster_name
        self.creates_cluster = creates_cluster
        if not self.creates_cluster:
            return

        print(f'create_cluster {self.cluster_name} started.')

        properties: Dict[str, str] = dict()
        properties['yarn:yarn.nodemanager.vmem-check-enabled'] = 'false'  # yarn-site.xmlの形式
        for item in environment_variables.items():
            properties[f'spark-env:{item[0]}'] = item[1]  # spark-env.shの形式

        cluster_data = {
            'project_id': self.project_id,
            'cluster_name': self.cluster_name,
            'config': {
                'software_config': {
                    'image_version': '1.4-ubuntu18',
                    'properties': properties
                },
                'lifecycle_config': {
                    'idle_delete_ttl': idle_delete_ttl
                },
                'initialization_actions': [{
                    'executable_file': 'gs://dataproc-initialization-actions/python/pip-install.sh'
                }],
                'gce_cluster_config': {
                    'zone_uri': f'https://www.googleapis.com/compute/v1/projects/{self.project_id}/zones/{self.zone}',
                    'metadata': {
                        'PIP_PACKAGES': pip_packages
                    }
                },
                'master_config': {
                    'num_instances': num_master_instances,
                    'machine_type_uri': master_machine_type,
                    'disk_config': {
                        'boot_disk_size_gb': 512
                    }
                },
                'worker_config': {
                    'num_instances': num_worker_instances,
                    'machine_type_uri': worker_machine_type,
                    'disk_config': {
                        'boot_disk_size_gb': 512
                    }
                }
            }
        }

        response: Operation = self.cluster_client.create_cluster(self.project_id, self.region, cluster_data)
        response.add_done_callback(self.__callback)
        self.waiting_callback = True
        self.__wait_for_callback()
        print(f'create_cluster {self.cluster_name} finished.')

    def __callback(self, operation_future):
        print('callback called.')
        print(operation_future.result())
        self.waiting_callback = False

    def __wait_for_callback(self):
        print('waiting for callback call...')
        while True:
            if not self.waiting_callback:
                break
            time.sleep(1)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if tb is not None:
            print(''.join(traceback.format_tb(tb)))
        if not self.creates_cluster:
            return

        print(f'delete_cluster {self.cluster_name} started.')
        response: Operation = self.cluster_client.delete_cluster(self.project_id, self.region, self.cluster_name)
        response.add_done_callback(self.__callback)
        self.waiting_callback = True
        self.__wait_for_callback()
        print(f'delete_cluster {self.cluster_name} finished.')

    def submit_pyspark_job(self, main_python_file_uri: str, python_file_uris: List[str]):
        print(f'submit pyspark job started.')
        job_details = {
            'placement': {
                'cluster_name': self.cluster_name
            },
            'pyspark_job': {
                'main_python_file_uri': main_python_file_uri,
                'python_file_uris': python_file_uris
            }
        }

        job_transport: JobControllerGrpcTransport = JobControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(self.region),
            credentials=self.dataproc_credentials
        )
        dataproc_job_client = JobControllerClient(job_transport)

        result = dataproc_job_client.submit_job(
            project_id=self.project_id, region=self.region, job=job_details)
        job_id = result.reference.job_id
        print(f'job {job_id} is submitted.')

        print(f'waiting for job {job_id} to finish...')
        while True:
            time.sleep(1)
            job = dataproc_job_client.get_job(self.project_id, self.region, job_id)
            if job.status.State.Name(job.status.state) == 'ERROR':
                raise Exception(job.status.details)
            elif job.status.State.Name(job.status.state) == 'DONE':
                print(f'job {job_id} is finished.')
                break
storage.py

GCSを操作する処理をまとめたモジュールです。
この共通モジュールは、Dataprocクラスタ内でも利用されますし、クラスタの外側(名寄せアプリケーションのその他大勢の処理)でも利用されます。
内容は本筋と関係ないので説明は割愛します。

import os
from typing import Optional, List

from google.oauth2.service_account import Credentials
from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from google.cloud.storage.blob import Blob


class StorageClient:
    bucket_name: str
    storage_client: storage.Client
    bucket: Bucket

    def __init__(self, bucket_name: str, project_id: Optional[str] = None, storage_credential_path: Optional[str] = None):
        self.bucket_name = bucket_name
        if project_id is None or storage_credential_path is None:
            self.storage_client = storage.Client()
        else:
            storage_credentials = Credentials.from_service_account_file(storage_credential_path)
            self.storage_client = storage.Client(project=project_id, credentials=storage_credentials)
        self.bucket = self.storage_client.get_bucket(self.bucket_name)

    def upload_to_gcs(self, file_path: str, gcs_base_path: str) -> str:
        blob: Blob = self.bucket.blob(f'{gcs_base_path}/{os.path.basename(file_path)}')
        blob.upload_from_filename(file_path)
        return f'gs://{self.bucket_name}/{blob.name}'

    def download_from_gcs(self, prefix: str) -> List[Blob]:
        return list(self.bucket.list_blobs(prefix=prefix))

実行結果

main.pyを実行すると、クラスタが作成され分散処理が実行されます。

python main.py

ドライバープログラムのログは以下のような感じになりました。
無事に、並列処理で作成したベクトルデータがログに出力されました!(下から3行目)

[nltk_data] Downloading package punkt to /tmp/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /tmp/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
20/03/30 04:14:41 INFO org.spark_project.jetty.util.log: Logging initialized @9200ms
20/03/30 04:14:41 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/03/30 04:14:41 INFO org.spark_project.jetty.server.Server: Started @9410ms
20/03/30 04:14:41 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@xxxxxxxxxx{HTTP/1.1,[http/1.1]}{0.0.0.0:xxxx}
20/03/30 04:14:41 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/03/30 04:14:42 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at test-cluster-m/xx.xx.xx.xx:xxxx
20/03/30 04:14:42 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at test-cluster-m/xx.xx.xx.xx:xxxx
20/03/30 04:14:46 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1585541507498_0001
[[(2, 1.0)], [(0, -0.14448800681796353), (1, -0.6778027557186347), (3, -0.7205046986043693), (4, -0.02407528847231786)], [(0, -0.10353187224533954), (1, -0.7187360523274335), (3, 0.6873792958176269), (4, -0.014468662530294052)], [(0, -0.7229380237059977), (1, 0.09970102003471652), (3, 0.01737657210670717), (4, 0.6834605879097376)], [(0, -0.7190388298211174), (1, 0.13944837996685944), (3, 0.028338438513956645), (4, -0.6802457228360109)], [(0, -0.7205843071984989), (4, -0.6933673313758443)], [(1, 0.7437427088440981), (2, -0.6684659924343523)], [(1, -0.302843196389016), (2, -0.33694671350015426), (3, -0.8914891534182169)], [(0, -0.7205843071984968), (4, 0.6933673313758468)], [(1, -0.5959301816841694), (2, -0.6630385578683955), (3, 0.45304203926089587)], [(0, 0.7313550517447567), (4, 0.6819969122271922)], [(1, 0.19120874026768891), (2, 0.34759578248858897), (3, -0.9179413868223778)], [(1, -0.6530879900683576), (2, -0.6530879900683579), (3, -0.38334338973946774)], [(0, 0.5711616686116605), (1, -0.45765536984922495), (2, 0.42021204383576954), (3, 0.06379081309898492), (4, -0.5326147586543657)], [(0, 0.4567872152559851), (1, 0.5722471994880232), (2, -0.5254284798523805), (3, -0.07976332531831758), (4, -0.4259592787473683)], [(0, -0.8020751369226582), (3, 0.1701089001275885), (4, -0.5724844424330511)], [(1, 0.5061042429570382), (2, -0.8624723156489627)], [(0, -0.8020751369226586), (3, 0.17010890012758795), (4, 0.5724844424330502)], [(0, -0.24550844631891117), (1, -0.4365008983829189), (2, -0.25614150473912134), (3, -0.8267914477345281)], [(0, -0.1440659185336258), (1, 0.743858495260885), (2, 0.436500898382919), (3, -0.4851664826182654)], [(0, 0.9999999999999998)], []]
20/03/30 04:15:15 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@7516b90b{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
ジョブの出力が完了しました

余談ですが、nltkデータのdownloadがドライバープログラムのログに出力されていることから、ドライバープログラムでdownloadされたntlkデータがリモートサーバに共有されてリモートサーバ側でntlkの処理が実行されていることがわかります。

このdownload処理は、worker.pyのグローバルスコープに実装されているので、ドライバープログラム(master.py)でimportされた時点で実行され、そのデータはグローバルスコープの変数として保持されます。この変数をrdd.map()に渡したアプリケーションコード(lambda関数)で利用しているため、sparkがその変数をリモートサーバに共有して、リモートサーバでも利用できています。

トラブルシューティング

実装していると、色々なトラブルが起きました。解決に時間がかかったものもありますので、参考までに原因と対処方法を紹介しておきます。

Pythonのバージョンが2.7になってしまう

原因

クラスタ内のノードを構築する際の仮想VMのイメージのバージョンは、2020/3/30時点ではdefaultだと1.3-debian9なのですが、Pythonのバージョンが2.7になっているためです

対処

Pythonのバージョンが3.6の1.4-debian9や1.4-ubuntu18をimage_versionに指定しました。

実装サンプル
cluster_data = {
    'config': {
        'software_config': {
            'image_version': '1.4-ubuntu18',
        },
    },
}

google.api_core.exceptions.InvalidArgument: 400 Insufficient 'DISKS_TOTAL_GB' quota. Requested xxxx.x, available xxxx.x.

原因

利用可能なトータルDISKサイズを超えてしまっているためのようです。 こちらには、各ノードのDISKサイズはdefaultだと500GBと書いてあるのですが、実際に試した結果、VMのイメージによってdefault値は異なるようです。1.4-ubuntu18だと1000GBでした。

対処

合計のDISKサイズがDISKS_TOTAL_GB quotaを超えないように、disk_config.boot_disk_size_gbを設定しました。

実装サンプル
cluster_data = {
    'config': {
        'master_config': {
            'disk_config': {
                'boot_disk_size_gb': 128
            }
        },
        'worker_config': {
            'disk_config': {
                'boot_disk_size_gb': 128
            }
        },
    },
}

workerノードを10台用意しているのに、1台しか使われない

原因

sparkのdocumentには Normally, Spark tries to set the number of partitions automatically based on your clusterと書いてあるので、RDDはいい感じにパーティショニングされることを期待していたのですが、全てのデータが一つのパーティションに含まれてしまい、1台のworkerノードで順番に処理されていました。

対処

numSlicesパラメータで、明示的パーティションを区切るようにしました。
numSlicesには、ノード数 × vCPU数 を指定すると良いようです。

実装サンプル
rdd: RDD = sc.parallelize(range(len(grouped_sentences)), numSlices=40) 

org.apache.spark.scheduler.TaskSetManager: Stage 0 contains a task of very large size (xxxx KB). The maximum recommended task size is xxx KB.

分散処理しようとするRDDのサイズが大きすぎるとこのような警告が出ます。警告が出る上、一向に応答が返ってこなくなります。

対処

文書そのものはサイズが大きいので、ブロードキャスト変数として、ノード間で共有しました。

実装サンプル

文書のindexだけをRDD化して並列タスクを起動し、並列タスクの中で、ブロードキャスト変数から対象データをindexで取り出して処理するようにしています。
mapに渡しているlambda式は、workerノードのexecuterで実行される並列タスクのアプリケーションコードになります。

sc: SparkContext = SparkContext()
rdd: RDD = sc.parallelize(chuncked_sentences_indexes)
broadcasted_chuncked_sentences: Broadcast = sc.broadcast(chuncked_sentences)
vectors: List[List[Tuple[int, Decimal]]] = list(chain.from_iterable(
    rdd.map(lambda index: worker.vectorize(broadcasted_chuncked_sentences.value[index])).collect()
))

org.apache.hadoop.hdfs.DataStreamer: DataStreamer Exception java.io.IOException: Broken pipe

原因

collect()はmap処理で変換したRDD全体をmasterノードにfetchする処理ですので、大きすぎるサイズのRDDをcollect()した際に発生します。ずっと応答がなく、約1000秒経過してから本警告が出力され、最終的には異常終了しました。

対処

本文中のサンプルコードでは、コードを簡素にするためにベクトルデータをそのままcollect()していますが、実際には、collect()せず、workerノード側でベクトルをGCSに直接出力するようにしました。
GCSに出力するだけなので、実装サンプルは割愛しますが、workerノード内で利用されるデフォルトのサービスアカウントもGCSへのwrite権限は持っているので、特に詰まることもないと思います。

workerノードのExecuterで実行されるタスク内で環境変数を参照できない

原因

spark-env.shで設定する環境変数は、ドライバプログラムからしか参照できないようです。つまりExecuterには環境変数はコピーされません。

対処

並列タスクのアプリケーションコードに含まれる変数はコピーされるため、masterノード内で取得した環境変数を変数や引数として並列タスクのアプリケーションに渡します。

実装サンプル

このケースでは、並列タスク中のvectorize()の引数に環境変数を渡しています。

sc: SparkContext = SparkContext()
rdd: RDD = sc.parallelize(chuncked_sentences)
vectors: List[List[Tuple[int, Decimal]]] = list(chain.from_iterable(
    rdd.map(lambda index: worker.vectorize(chuncked_sentences, os.environ['BUCKET_NAME'])).collect()
))

クラスタ作成中にCtrl+Cなどでプログラムを強制終了すると、意図せずクラスタが残ってしまうことがある

対処

クラスタを作成する際に、IDLE状態で一定時間経過したクラスタを削除する設定を行います。
この設定をしておけば、クラスタを削除し忘れて必要以上に課金が必要になってしまう事故も発生しえないので、安心です。

実装サンプル

lifecycle_configidle_delete_ttlで指定します。
Durationクラスを探し出すのが地味に大変でした...

from google.protobuf.duration_pb2 import Duration

cluster_data = {
    'config': {
        'lifecycle_config': {
            'idle_delete_ttl': Duration(seconds=3600)
        },
    },
}

Unable to allocate xxx. MiB for an array with shape (xxxxxxxx,) and data type float64 at org.apache.spark.api.python.BasePythonRunner

原因

物理メモリが足りていない場合に発生します。

対処

RDDを出来るだけ分割して1サーバの処理件数を減らしたり、使わなくなった変数をdelで解放したりしたものの、どうしてもエラー発生を抑えられなかったので、最終的にはworkerノードのマシンタイプを変更しメモリを増強しました。

Container killed by YARN for exceeding memory limits. x.x GB of x GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

原因

こちらを参照すると、仮想メモリが足りない場合に発生するようです。

対処

物理メモリ利用量をモニタリングしている限り、問題なさそうに見えたので、エラーログに出ているように"disabling yarn.nodemanager.vmem-check-enabled"の対処を行いました。

実装サンプル

software_configpropertiesyarn-site.xmlyarn.nodemanager.vmem-check-enabledの値がfalseになるように設定しています。

cluster_data = {
    'config': {
        'software_config': {
            'properties': {'yarn:yarn.nodemanager.vmem-check-enabled': 'false'}
        },
    },
}

クラスタ内で実行されるPythonファイルにおいて、別フォルダのPythonモジュールを読み込めない

原因

今回、GCS上でも共通モジュールをmoduleフォルダに配置していたのですが、

├── master.py          # masterノードでの処理
├── worker.py          # workerノードでの分散処理
└── module
    ├── dataproc.py    # Dataprocを操作するモジュール
    └── storage.py     # ストレージを操作するモジュール

ジョブをsubmitすると、GCSからPythonのソースコードがmasterノードに配備されるのですが、この際ソースコードはGCS上の論理的なフォルダ構成を無視して一つのフォルダに全て配置されます。
そのため、from module import storageのように階層構造を意識した構成だとModuleNotFoundErrorが発生してしまいます。

対処

元から同一の階層に全てのPythonソースコードを配置するのも手ですが、今回はそもそも巨大な名寄せアプリケーションがあり、その中でたくさんの共通モジュールを作成しており、その共通モジュールを分散処理でも利用したかったので、その手は取りたくありませんでした。

なので、まず階層ありimportして、もしエラーが発生したら階層無視でimportするような作りにしました。

実装サンプル
try:
    from module.storage import StorageClient
    from module import logger
except ModuleNotFoundError:
    from storage import StorageClient
    import logger

PermissionError: [Errno 13] Permission denied: '/home/nltk_data'

原因

nltk_dataをダウンロードする際のデフォルトフォルダである/homeに対して書き込み権限がないためです。

対処

書き込み可能なフォルダに対して、nltk_dataをダウンロードするように変更しました。

実装サンプル

`/tmp/nltk_data'をダウンロード先にう設定し、さらに、nltkが参照する先のパスに加えています。

import nltk

nltk_data_dir: str = '/tmp/nltk_data'
nltk.download('punkt', download_dir=nltk_data_dir)
nltk.download('stopwords', download_dir=nltk_data_dir)
nltk.data.path.append(nltk_data_dir)

なぜDataProc?

参考までにDataprocを採用した理由も書いておきます。

Google Cloud Dataprocは、オープンソースのデータツールを利用してバッチ処理、クエリ実行、ストリーミング、機械学習を行えるフルマネージドな Spark / Hadoop クラウドサービスです。

フルマネージドなので、Linuxの環境構築やSparkクラスタ構成用の設定など何も知らなくても分散処理を行うことができることが最大のメリットだと思います。

また、Dataprocは料金もかなり安いです。Dataprocではクラスタ生存期間に対して課金されるのですが、仮想CPU毎に1時間あたり1セント($0.01)です。

N1標準マシンタイプ(n1-standard-1)10台 × 1週間
で分散処理を行なったとしても、Dataprocの料金は
$0.010 × 10 × (24 × 7) = $16.8 = 1,848円
です。

ただし、GCEの料金が別途かかることに注意してください。

さいごに

今回、文書のベクトル化にDataprocを利用してみましたが、ある程度工夫することで、他のPythonプログラムと同様に、分散処理についてもPython起点でシームレスに実行することができました。

実行時間も1ヶ月->1週間まで短縮できましたし、学習コストも実際の料金もそれほどかからなかったので、とても満足しています。今後も活用していきたいと感じました。

弊社では、Dataprocに限らず新しい技術を取り入れることに対して個々人の裁量が大きく、組織としても後押ししてくれる文化があります。個人的にはそこがすごく気に入っていて、新しい技術を実務の中でチャレンジしていきたい人にはとても良い職場だと思います。

少しでも弊社に興味を持った方は↓↓↓のリンクから気軽にご連絡ください。

KeycloakとKeycloak Gatekeeperを用いてWordPressにログイン画面を作成する

初めまして、7月にAstamuseにjoinしました、植木です。
ICPのエンジニアを担当しております。
私は、普段なら桜が咲いて楽しみが増える季節ですが、コロナの影響もあり外出を控えないといけないと思うと残念で仕方がない今日この頃でございます。

はじめに

今回は、Keycloakについて紹介する記事です。 https://www.keycloak.org/resources/images/keycloak_logo_480x108.png
弊社のICPで導入に向けて作業を進めており、非常に便利でしたので筆をとるに至りました! 下記のような方は是非見ていただいて、参考にしていただけると幸いです。

  • Keycloakを知らない方
  • ざっくり概要を知りたい方
  • 導入を検討する方

概要

Keycloakとは、 OpenID / SAML の認証方式を用いて、「認証」「認可」を行うことができるOSS のミドルウェアです。 安全性をKeycloakが管理を担ってくれるため、アプリケーションはユーザーアクセス管理を意識することから解放され、余計な処理が不要になるため、コードがシンプルになります。 その他、シングルサインオン(1アカウントで複数のアプリケーションへのログイン)、ソーシャルログイン(FacebookやGoogle+などの、外部のIdPを使用してのログイン)にも対応しており、既存のアプリケーションにも容易に導入できます。

ICPで導入に至った経緯

機能開発や新規のプランごとに、ユーザや所属する組織に対する権限情報が増え行くことは、火を見るよりも明らかであると言えるでしょう。 例えば、API開発であれば、一般ユーザーAさんがアクセスできるエンドポイントは参照権限のみで、管理者ユーザーのBさんは参照加えて、作成/更新/削除も可能にする、といったような仕様を常に心に止めて開発する必要があります。
ICPでも同じ悩みを抱えており、1. (既存もしくは新規に)認証/認可サーバを作成する, 2. 認証/認可ミドルウェアを使用することを検討し、結論としては2を選択しました。
1に関してはSilhouette(シルエットと読みます)というライブラリを検討し、見送りました。理由は、SilhouetteはPlay Frameworkをベースとしており、リクエストが届いた際にエントリポイント単位で、認証ロジックを挟むといった形になっているため、Play Framework以外での実装となっているアプリケーションは対応できなかったためです。

起動方法

まずはローカル環境で試したいという方には、jboss/keycloakのdockerを用いて起動するが手軽でおすすめです。後の解説ではこれを使用します

# 例:最初の管理者ユーザを id=admin, password=adminで作成する場合。今回は8888ポートにしました。
$ docker run --rm --name keycloak -p 8888:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin jboss/keycloak

直接サーバで動作させる場合は、tarボールをダウンロードしてきて、./standalone.sh を実行します。 Keycloak - Downloads

Keycloakの管理ページ、設定項目について

http://localhost:8888/にアクセスすると、KeycloakのWelcomeページをはじめに確認できます。

KeycloakのWelcomeページ画像
KeycloakのWelcomeページ
「Administration Console」から、管理者ユーザでログイン(今回の例ではadmin/admin)すると管理者用ページが現れ、下記のようなメニューがあることが確認できます。
Keycloakの管理者用ページ画像
Keycloakの管理者用ページ

  • 1. Realm Settings
    Realmというのはプロジェクトのワークスペースのようなものです。Masterというのがはじめに作成されています。ユーザのログイン方法やテーマのなどの設定ができます

  • 2. Clients
    Keycloakへアクセスするクライアント(アプリケーション)の設定を行います。

  • 3. Roles
    権限の名前がRoleになります。このMapping(RoleとGroups/Usersを組み合わせる)設定することで、権限制御ができます RealmにおけるRoleを設定できます。先ほどの2.ClientsでもRole設定があり、スコープが少し違います

  • 4. Groups
    ユーザーをひとまとまりにする単位です

  • 5. Users
    ユーザーの管理を行います

アプリケーション(Clients)の管理方法

Keycloakがアプリケーションの管理する方法として、大きく分類して2つの方法があります。
アプリケーションにKeycloak用のライブラリを組み込むクライアントアダプターパターンと、
Keycloak Gatekeeperを使用するクライアントプロキシパターンがあります。
クライアントアダプターパターンの、詳しい設定内容についてはこちらがわかりやすいです。 www.atmarkit.co.jp クライアントプロキシパターンには、リバースプロキシ(Apache、Nginx)にOpenID Connectのモジュールを組み込む方法もあります Keycloakで実用的なリバースプロキシ型構成を構築してみよう (1/4):Keycloak超入門(7) - @ITwww.atmarkit.co.jp

ICPでは、

  • クライアントアダプターの場合は、対応されていないアダブターを自作する必要があることがわかったため(ICPの場合は、Play Framework用)
  • クライアントプロキシパターン(Nginx)の場合は、Nginx Plus(有償)である必要がある
  • クライアントプロキシパターン(Apache)の場合は、NginxからApacheへの切り替えコストがかかる といった理由から、Keycloak Gatekeeperを選びました。Keycloak Gatekeeperはマイクロサービスを意識してシングルバイナリで動作し、Dockerコンテナ内で動かすことも想定して設計されていると感じており、期待の意味も込めて、クライアントプロキシ(Keycloak Gatekeeper)を選択しました。

この記事では、以降、Keycloak Gatekeeperを中心に書いていきます。

Keycloak Gatekeeperを用いてクライアントプロキシを試してみよう

Keycloakを用いる場合ユーザーはKeycloakで用意しているのログインページを使用するようになります。
Keycloak Gatekeeperのバックエンドにアプリケーションが配置されるようにし、認証/認可を判断したものだけがアプリケーションへ到達できるようなハンズオンをやってみましょう。
下記に、ハンズオンのために必要最低限の設定を書いたdocker-composeを用意しました。Mac OSでのみで動作します(Docker for Mac向けのhost.docker.internalを使用しているため)
github.com 下記を打つとWordPress、Keycloak、Keycloak Gatekeeperが起動します。 ※ 今回はWordPressへ到達までが目的なので、DBの設定などは入れていません

$ docker-compose up

f:id:astamuse:20200326165148p:plain
docker-compose後の起動イメージ

WordPressは読者の認証認可を想定するアプリケーションに読み替えていただけるとわかりやすいと思います

次に、テストユーザを追加しましょう

ユーザ追加ページ画像
ユーザ追加ページ
パスワードの初期値を適当に決め、設定します。
テストのためパスワードリセット画像
テストのためパスワードリセット

設定ができたら、Keycloak Gatekeeperのアドレスhttp://localhost:3000/へアクセスしてみましょう。初回なので、ログインページへリダイレクトされることが確認できます。

Keycloakのログインページ画像
Keycloakのログインページ
先ほど登録したユーザでログインしてみると、OIDC認証されWordPressのページ http://host.docker.internal:8880/ へリダイレクトされてきたことが確認できたと思います。
このような手軽さから、既存のアプリケーションに対して使用する場合でも、プロキシパスをKeycloak Gatekeeper経由に変更さえすれば、認証を担ってくれることがメリットと言えます。
リダイレクトされWordPressへ到達した様子の画像
リダイレクトされWordPressへ到達した様子

テーマについて

今回試したログインの際、テーマはKeycloakのデフォルトでしたが、Freemakerで作成したテーマを配置すれば、切り替えが可能です。
テーマはログインテーマだけでなく、アカウント管理、管理コンソール、電子メール、ウェルカムページのカスタマイズが可能です
新規に配置する際にはすぐに反映されるのですが、更新の場合はデフォルトでテーマファイルのキャッシュが効いているため、注意が必要です。
テーマに関しては、下記が参考になりました。 keycloak-documentation.openstandia.jp qiita.com

認証後の付与情報について

Keycloak Gatekeeperを通過しアプリケーションへ向かう際に、X-Auth系のヘッダが付与されるため、アプリケーションレベルでユーザを扱うことができるようにするため、Username(ログイン時のID), Email, 姓名などKeycloakで設定したユーザ情報が送られるようになっています。
また、上記以外にも、独自で定義したフィールドに値を詰めて渡すといったこともできるため自由度は高いと言えるでしょう。

最後に

ここまで読んでみていかがでしたでしょうか?
Keycloakは多機能で、今回紹介できた部分はほんの一握りでしたが、ざっくり分かっていただけていたら幸いです。 弊社では上記のように OSS を使って問題解決が開発がしたいエンジニアや、デザイナー、プロダクトマネージャーなど絶賛大募集中です。少しでもご興味を持っていただけたら、お気軽にカジュアルランチからでも構いませんので、下のバナーや @yoshixmk へ DM 等でご連絡いただければと思います。

Copyright © astamuse company, ltd. all rights reserved.