astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

Python が Cloud Functionsで使えるようになったので試してみました

f:id:astamuse:20181030211402j:plain

こんにちは、アスタミューゼでデザイナーをしている@YojiShirakiです。最近、細かい自動化したい処理が増えてきて困っておりまして。そんな折に Cloud Functions で Python が使えるようになったと聞いたので早速触ってみようかと。

そこそこちゃんと書いたので、読む時間が無い方はブックマークすることをおススメします。


目次

Cloud Functions とは

昨今のサーバーレスアーキテクチャ思想に呼応する関数をクラウド上で実行できるサービスです。有名どころでは AWS の Lambda、 Microsoft の Azure Functions があります。GCP にも Cloud Functions というサービスがありこれまでは JavaScript のみに対応していました。最近、これの Python 対応がベータリリースされたので練習がてら一つ作ってみます。

今回の練習アプリ

今回は Search Console のデータを Cloud Functions 上で取得して BigQuery に入れる処理を作ります。Search Console は非常に便利ですが、データ保持期間が16カ月であるため年間比較する際にやや不便です。そこで今回、Search Console のデータを引っこ抜いて保存してしまいます。

構成は下図。途中、JSON 鍵の管理などで Cloud KMS、Cron 処理のために GAE + Cloud Pub/Sub が登場するてんこ盛りの内容ですがどうぞお付き合いください。

f:id:astamuse:20181030184822j:plain

前提

今回のコードについては GCP プロジェクトの選択・作成やAPIの有効化は事前に終わっているものとします。また、今回の機能はすべてベータ版なので仕様変更により動作しなくなることもあります。悪しからずご了承ください。また Cloud のサービスの性質上、様々なタスクを走らせた結果、一定の料金が発生します。今回の構成は無茶なことをしなければ大きな金額は発生しませんが、プログラムの実行は自己責任でお願いします。

JSON キー の発行と管理

まずは今回のプロジェクト用に API キー(JSON) を発行します。Cloud Console のAPIとサービスから認証情報を生成しましょう。鍵のタイプはサービスアカウントキーにします。役割は後で編集しますが、一度「BigQueryのデータ編集者」で良いです。取得した JSON は一度ローカルに保存しておいてください。

f:id:astamuse:20181030184851j:plain サービスアカウントキーを選択。

f:id:astamuse:20181030184901j:plain 権限は BigQuery のデータ編集者で(たぶんジョブユーザーでも良い)

権限の追加

鍵を発行したら権限の追加設定を行います。IAMから、今発行した鍵ユーザーの編集画面を開き、役割に「Pub/Subパブリッシャー」を追加してください。これで Pub/Sub に対してメッセージ投げられます。

f:id:astamuse:20181030184940j:plain

Search Console へのアカウント追加

ついでに、データを取得したい Search Console に今回の JSON キー のアカウントを追加しておきます。これによってOAuth 認証を経ずとも Search Console のデータを取得できます。取得した JSON キー の client_mail の値(メールアドレス形式)を Search Console に追加しましょう。権限はフルで良いかと思います。

f:id:astamuse:20181030185154j:plain JSON キー の client_mail の値を確認します。

f:id:astamuse:20181030185223j:plain 先ほどのアドレスを Search Console のユーザーに追加します。これによって OAuth 認証を回避できます。

なお、Search Console の権限ドキュメントはこちらです。

Search Console の権限資料

KMS を利用して JSON 鍵 を暗号化する

さて次は、取得した JSON 鍵 を暗号化します。JSON 鍵 をそのまま git 管理するのは憚られますし、況やコードに埋め込むのも問題です。そこで今回は Cloud KMS を使って暗号化された状態で保持し、必要に応じて復号する流れにします。

KMSの暗号化・復号化については公式の説明がわかりやすいので、一度ご覧ください。

暗号化されたリソースの使用

JSON 鍵 を暗号化するための暗号鍵を作成

まず暗号化の前に、暗号化のための暗号鍵を準備します。この概念についても公式(オブジェクト階層 )に詳しい説明があるのでそちらをご参照ください。

鍵の作成は以下のコマンドでできます。まずkeyring(鍵束)を作成し、その後、key(鍵)を作成します。

# KEY RING の作成(KEYRING-NAME は任意に決めてください)
gcloud kms keyrings create [KEYRING-NAME] --location=global

# 暗号鍵の作成(KEY-NAME は任意に決めてください)
gcloud kms keys create [KEY-NAME] \
  --location=global \
  --keyring=[KEYRING-NAME] \
  --purpose=encryption

これで暗号化に必要な鍵ができました。鍵は KMS 内にあり、私たちは直接触ることはありません。

暗号鍵を用いて JSON 鍵 を暗号化

先ほど作成した暗号鍵(@Cloud KMS)を用いて JSON 鍵 を暗号化します。

gcloud kms encrypt \
  --plaintext-file=secrets.json \
  --ciphertext-file=secrets.json.enc \
  --location=global \
  --keyring=[KEYRING-NAME] \
  --key=[KEY-NAME]

secret.jsonというのが先ほど取得した JSON 鍵 だとお考え下さい。これを KMS の暗号鍵を使って暗号化するという寸法です。そして、このコマンドで作成されたsecret.json.encというバイナリファイルが暗号化された JSON 鍵です。基本はこれをgit管理し必要に応じて復号して利用します。

バイナリファイルを文字に変換する

さて、出来上がったsecret.json.encについてはこのまま利用してもよいのですが、Cloud Function の性質上バイナリでやり取りするのが面倒なので文字列変換しておきます。ターミナルから Python でデコードします。

$ python
> import io
> import base64
> with io.open('secret.json.enc', 'rb') as f:
>     base64.b64encode(f.read()).decode('ascii')

結果は

CiQA6mnLKL032sEfR3rnCEocT8dbZxHAS03RsiAN6uxVHSA......3a5ckdgNQ==

といったようなよく見る文字列が返ってきます。これを今後使っていきます。

Cloud Functions のコード

鍵周りの下準備が整ったので次に Cloud Functions に移ります。

構成

Cloud Functions 側を作るためにまずは3つのファイルを用意します。

project_dir(任意にお作りください)
├ main.py
├ requirements.txt
└ env.yaml

main.py は実際にコードを書くファイルです。requirements.txt にはインストール必要なモジュールを記載します。env.yaml には実行時に与えたい環境変数を定義します。後者2ファイルは以下のようなものです。

google-api-python-client
google-cloud-bigquery

requirements.txt

CIPHERTEXT : CiQA6mnLKL032sEfR3rnCEocT8dbZxHAS03RsiAN6uxVHSA......3a5ckdgNQ==
PROJECT_ID : [YOUR_PROJECT_ID]
LOCATION : [KMS_LOCATION]
KEY_RING : [KMS_KEYRING_NAME}
KEY_NAME : [KMS_KEY_NAME]
DATASET : [BIGQUERY_DATASET_NAME]
TABLE : [BIGQUERY_TABLE_NAME]
SITE_URL : [YOUR_SITE_URL] # 取得したサイトのURL。プロトコルから記載する(例:https://astamuse.co.jp/

env.yaml

CIPHERTEXTには、先ほど KMS で 暗号済の JSON 鍵 を入れます。 Cloud Functions は外部ファイルを持てないのでこういった形にせざるを得ません。

その他PROJECT_IDなどはご自身のプロジェクトのIDなど適宜記入してください。

main.py

main.py はやや長いですが、以下のようなコードです。

def import_gsc(event, context):
    import os
    import json
    import datetime
    import base64
    import logging
    import tempfile

    from google.oauth2 import service_account
    from googleapiclient.discovery import build
    from google.cloud import bigquery

    ## 固定変数
    PROJECT_ID = os.environ['PROJECT_ID']
    LOCATION   = os.environ['LOCATION']
    KEY_RING   = os.environ['KEY_RING']
    KEY_NAME   = os.environ['KEY_NAME']
    DATASET    = os.environ['DATASET']
    TABLE      = os.environ['TABLE']
    SITE_URL   = os.environ['SITE_URL']

    date_format = "%Y-%m-%d"

    ## 日付のジェネレータ(後で使います)
    def date_range(start_date, end_date, delta=datetime.timedelta(days=1)):
        current_date = start_date
        while current_date <= end_date:
            yield current_date
            current_date += delta

    ## 引数取得(実際は引数チェックなど入れます)
    messages   = json.loads(base64.b64decode(event['data']).decode('ascii'))
    start_date = datetime.datetime.strptime(messages['start_date'], date_format)
    end_date   = datetime.datetime.strptime(messages['end_date'], date_format)


    ## KMSからJSONキーの復元
    logging.info("Fetch encrypted key form kms and decrypt it.")
    kms_client   = build('cloudkms', 'v1')
    key_path     = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
                        PROJECT_ID, LOCATION, KEY_RING, KEY_NAME)
    crypto_keys  = kms_client.projects().locations().keyRings().cryptoKeys()
    kms_request  = crypto_keys.decrypt(
                          name = key_path
                        , body = {'ciphertext': os.environ['CIPHERTEXT']})
    kms_response = kms_request.execute()
    json_key     = json.loads(base64.b64decode(kms_response['plaintext'].encode('ascii')))


    ## GSCからデータ取得, GBQへインサート
    logging.info("Fetch data from gsc.")
    scope        = ['https://www.googleapis.com/auth/webmasters.readonly']
    credentials  = service_account.Credentials.from_service_account_info(json_key, scopes=scope)
    gsc_client   = build('webmasters', 'v3', credentials=credentials)

    for target_date in date_range(start_date, end_date): # 先ほどのジェネレーターを利用して日付ごとにデータ取得
        target_date_str = target_date.strftime("%Y-%m-%d")
        gsc_request = {
                      'startDate'  : target_date_str
                    , 'endDate'    : target_date_str
                    , 'dimensions' : ["page","query","device"]
                    , 'rowLimit'   : 1000
                }

        gsc_response = gsc_client.searchanalytics().query(
                                siteUrl   = SITE_URL
                                , body    = gsc_request).execute()
        print("Fetched data of {}".format(target_date_str))

        ## GBQへインサート
        output_rows = []

        if 'rows' in gsc_response:
            for row in gsc_response['rows']:
                output_rows.append([
                    row['keys'][0]
                    , row['keys'][1]
                    , row['keys'][2]
                    , row['clicks']
                    , row['impressions']
                    , row['ctr']
                    , row['position']
                    , target_date_str
                    , SITE_URL
                ])

            if len(output_rows) > 0:
                # JSON KEY を一時的にファイルで利用したいのでtempfileで書き出します
                with tempfile.NamedTemporaryFile(mode='w+t', encoding='utf-8') as tf:
                    json.dump(json_key, tf)
                    tf.seek(0)

                    bq_client   = bigquery.Client.from_service_account_json(tf.name, project=PROJECT_ID)
                    dataset_ref = bq_client.dataset(DATASET)
                    table_ref   = dataset_ref.table(TABLE)
                    table_obj   = bq_client.get_table(table_ref)
                    error       = bq_client.insert_rows(table_obj, output_rows) 

                    # TODO : error 内容に応じた処理

この関数は、想定として Pub/Sub から送られてくるメッセージの中にstart_dateend_date`%Y-%m-%dのフォーマットで渡ってくる仕様にしています。その日付に基づいてrange_dateで日付をジェネレートして一日ごとに Search Console からデータ取得し BigQuery にインサートするという流れです。

関数の引数となっているeventcontextは Pub/Sub から送られてくるデータを受ける引数です。この辺りは公式仕様を確認しておくとよいです。

引数チェックや、同一レコードチェック、例外処理などは割愛しているので適宜追加してください。

BigQuery のテーブルスキーマ

BigQuery のテーブルスキーマーも掲載しておきます。先ほどの関数をデプロイする前にテーブルは作っておきましょう。

[
      {
        "name": "page",
        "type": "STRING"
      },
      {
        "name": "words",
        "type": "STRING"
      },
      {
        "name" : "device",
        "type" : "STRING"
      },
      {
        "name": "clicks",
        "type": "FLOAT"
      },
      {
        "name": "impressions",
        "type": "FLOAT"
      },
      {
        "name": "ctr",
        "type": "FLOAT"
      },
      {
        "name": "position",
        "type": "FLOAT"
      },
      {
        "name": "data_date",
        "type": "DATE"
      },
      {
        "name": "site_url",
        "type": "STRING"
      }
]

このスキーマ―については、Search Console から取得するデータのディメンジョンによってはカラムを追加する必要があるかもしれません(countryとか?・・使わないと思うけど)。

function のデプロイ

さて一通り準備できたので関数をデプロイします。デプロイはコマンドから実行します。今回は Pub/Sub からトリガーを引くことができるように下記のコマンドでデプロイします。

gcloud beta functions deploy [YOUR_FUNCTION_NAME] --trigger-resource [YOUR_TOPIC_NAME] --trigger-event google.pubsub.topic.publish --runtime=python37 --env-vars-file env.yaml 

上記コマンドを実行すると、自動的に、実行ディレクトリのmain.py内にある対象関数をデプロイするようになっています。ですので YOUR_FUNCTION_NAMEmain.pyに記述しているデプロイしたい関数名を入れてください(今回の場合はimport_gsc)。YOUR_TOPIC_NAMEは今回利用する Pub/Sub 内のトピック名を指定します。おそらく既存のトピックを使うことにはならないと思うので、このタイミングで任意に決めて問題ありません(例:import_gsc_topic など)。デプロイすると自動で Cloud Pub/Sub にトピックが作成されています(下記のリンク先で確認できます)。

Cloud Pub/Sub トピック一覧

デプロイした関数の実行

先ほどデプロイした関数を実行します。gcloudコマンドには Pub/Sub 経由でトリガーを引くコマンドがあるのでデバッグとしてこちらを利用します。

gcloud beta pubsub topics publish [YOUR_TOPIC_NAME] --message '{"start_date" : "2018-10-01", "end_date" : "2018-10-02"}'

実行ログの確認

実行ログについては Stackdriver から確認できます。利用している module の関係で WARNING が出ていますが気にしないでください。

GAE と Cloud Pub/Sub でスケジューリングする

Cloud Functions で Search Console のデータを BigQuery に格納できるようになったので、これを定期実行します。が、Cloud Function には定期実行する仕組みありません。そこで Google App Engine の Cron 機能を利用します。Pub/Sub にメッセージを送るだけなのでそこまで複雑ではありませんが、面倒だと思う場合は Cloud Function を HTTP トリガーできるようにデプロイして、その HTTPエンドポイントを 外部サービスから叩くという手もありだと思います(誰でも叩けてしまいますが)。

構成がやや複雑に見えますが、Google 中の人も同じ方法を謳ってるので仕方ありません。

Cloud Functions for Firebase でジョブをスケジューリング(cron)する

構成

GAE 側は4つのファイルを用意します。

project_dir(任意)
├ app.yaml
├ cron.yaml
├ main.py
└ requirements.txt

app.yaml

これは GAE に乗せるアプリケーションの設定と環境変数を記載しておきます。KMS の鍵情報や Pub/Sub のトピック情報。また JSON 鍵 の暗号化文字列を持たせています。

runtime: python37
env_variables:
  PROJECT_ID: [YOUR_PROJECT_ID]
  LOCATION: [KMS_LOCATION]
  KEY_RING: [KMS_KEY_RING]
  KEY_NAME: [KMS_KEY_NAME]
  CIPHERTEXT: XXXXXXXXXXXX(JSON 鍵の暗号化済文字列)XXXXXXXXXXXXXXXXXXX...
  PUBSUB_TOPIC: [YOUR_TOPIC_NAME]

cron.yaml

次に cron の設定を記載します。ここで定期的に GAE 上のURLを叩く仕組みにします。

cron:
- description: "daily summary job"
  url: /import_gsc
  schedule: every monday 09:00
  timezone: Asia/Tokyo

timezone 設定は忘れずにね。

requirements.txt

requirement.txt は Cloud Function と同様に必要な外部モジュールを記載します。今回はアプリケーションプレームワークに Bottle を利用しました。Flask でも問題ありません。お好みでどうぞ。

bottle
google-cloud-pubsub
google-api-python-client

main.py

main.py では実行するトリガーのためのURLの処理を記述します。

from google.oauth2 import service_account
from googleapiclient.discovery import build
from google.cloud import pubsub
from bottle import Bottle, debug, route, run, default_app, request
from pprint import pprint
import datetime
import os
import json
import base64
import logging

logging.getLogger().setLevel( logging.DEBUG )

@route('/')
def index():
    return "Hi, Georgie! Aren't you gonna say hello?"


@route('/import_gsc')
def import_gsc():

    ## アクセス制御 
    if request.headers.get('X-Appengine-Cron') != 'true':
        logging.error('Invalid access.')
        return 'Error'
    
    ## KMSからキーの復元
    logging.info("Fetch encrypted key form kms. And decrypt.")
    kms_client   = build('cloudkms', 'v1')
    key_path     = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
                          os.environ['PROJECT_ID']
                        , os.environ['LOCATION']
                        , os.environ['KEY_RING']
                        , os.environ['KEY_NAME']
                    )
    crypto_keys  = kms_client.projects().locations().keyRings().cryptoKeys()
    kms_request  = crypto_keys.decrypt(
                        name = key_path,
                        body = {'ciphertext': os.environ['CIPHERTEXT']})
    response     = kms_request.execute()
    json_key     = json.loads(base64.b64decode(response['plaintext'].encode('ascii')))

    ## PUB/SUBにメッセージを投げる
    logging.info("Send message to pub/sub.")
    
    credentials = service_account.Credentials.from_service_account_info(json_key)
    pub_client  = pubsub.PublisherClient(credentials=credentials)
    topic_path  = 'projects/{}/topics/{}'.format(os.environ['PROJECT_ID'], os.environ['PUBSUB_TOPIC'])

    target_date = datetime.datetime.today() - datetime.timedelta(2)
    target_date = target_date.strftime("%Y-%m-%d")

    logging.info("Target date is {}".format(target_date))
    message     = {"start_date" : target_date, "end_date" : target_date }
    message     = json.dumps(message)
    
    response    = pub_client.publish(topic_path, message.encode('utf-8'))
    pprint(response)

    return 'Success'

app = default_app()

流れとしては、まずアクセス制御。ヘッダーにX-Appengine-Cron:Trueが無いものは弾きます。このヘッダーは GAE の Cron アクセスに付与されるものです。外部からこのヘッダーを持ったアクセスが来た場合が Google 側がそのヘッダーを削除する仕組みになっています(参考:cron.yaml リファレンス )。また Google App Engine の Cron は IP アドレス 0.1.0.1 からのアクセスするらしいのでそれで制御しても良さそうです(未検証)。

アクセス制御が終わったら KMS でJSON 鍵を復号します。その鍵を利用して Pub/Sub にメッセージを送り処理は終了します。データの取得対象日は2日前を指定します。前日だとAPI経由でデータが取得できないことがあるためです。

GAE にデプロイ

コードができたら GAE にデプロイします。cron.yamlもデプロイします。

gcloud app deploy app.yaml cron.yaml

cron を強制実行してテスト

デプロイが完了したら cron を強制実行してテストします。先ほどのデプロイで GAE の Cron Job にジョブが登録されているので、そこから強制実行します。

Google App Engin Cron Job

実行した際のログについては Stackdriver から確認できます。利用している module の関係で WARNING が出ていますが気にしないでください(二回目)。

BigQuery のデータを確認

特にエラーがなければ BigQuery にデータがインサートされていると思います。確認してみましょう。

Big Query (コマンドで確認してもいいです)

特に問題なければ、Pub/Sub 経由で過去分のデータを取得するのもいいかもしれません。先に載せたコマンドを再掲しますので適宜ご利用ください。

gcloud beta pubsub topics publish [YOUR_TOPIC_NAME] --message '{"start_date" : "2018-10-01", "end_date" : "2018-10-02"}'

Data Studio で表示してみる

さて最後に余興として BigQuery のデータを Data Studio で取得してダッシュボードにしてみます。ダッシュボードはいろいろ載せすぎると却ってよく分からなくなるので2,3グラフを載せるのに留めます。 下記のリンクから Data Studio を開いてください。

Google Data Studio

開いたら新しいレポートの開始をします。開いたら右側のデータソースの追加のところから、一番下にある「新しいデータソースを追加」をクリックします。遷移先で BigQuery を選択しウィザードを進んでください。途中、BigQuery のデータセットを選ぶ場面があると思うので、そこで今回作成したデータセットを選択すれば後は自由にデータを抽出できます。

私は自分のダッシュボードとして以下のようなものを作りました。

f:id:astamuse:20181030210745j:plain

全体の検索データを俯瞰しつつ、特に注力したいキーワードを含む検索クエリの数値をグラフ化してます。もう少し本気で作りこめばまぁまぁ使えそうです。 なお、当然ながら BigQuery のデータの取得には料金が発生しますので、誤ってクラウド破産などしないように十分にお気を付けください(自己責任ですお願いします)。

BigQueryで150万円溶かした人の顔

まとめ

ちょっと興味がのってしまいややゴツイ構成になってしまいましたが、サーバーレスアーキテクチャというのは往々にしてこうなるものなのかと思います。ネット上でそこまで情報がなく、手探りだったこともあり土日を潰しましたが非常にいい勉強になりました。もし本稿について何かご質問があれば@YojiShirakiにDMお願いします。

では、本日も最後までお読みいただきありがとうございました。

………あ、そうだ。

例によって当社では一緒にサービス開発してくれるエンジニア・デザイナー・ディレクターを超絶賛募集しております。

カジュアル面談も随時行っておりますので、「ちょっと話聞きたい」という方は、このブログのサイドバー下にあるアドレスか@YojiShirakiにDMいただければと思います。いつでもウェルカムです。採用サイトもありますので下の水色のバナーから是非どうぞ!ではまた:- )

@YojiShirakiの過去記事)

参考

google cloud functions

Cloud KMS

Cloud Pub/Sub

GAE Cron Job

Data Studio

Copyright © astamuse company, ltd. all rights reserved.