みなさんこんにちは。ご無沙汰しておりますKJです。
前回投稿したのは、昨年2020年春の緊急事態宣言が発令される前だったので、ほぼ1年ぶりになります。 昨年春の時は保育園に預けることができずにお家に自粛していた娘はすっかりFireTVにハマってしまい、 2歳半近くになった今はFireTVのリモコンを使いこなすようになりました。。
最近はEテレの幼児向け番組である、いないいないばあっ!のDVD ワンツー!パンツー! にハマっています。
「わん!ちゅー!! ぱん!ちゅー!!!」と言いながら毎日無限リピート再生しています。
一方、その頃私は何をしているかというと、
弊社内に蓄えられているデータから、目的によって様々な集計を行い事業チームに連携しております。 この際、データの集計にはBigQueryをよく利用しており、 事業チームはExcel等で更にアドホックな分析や顧客提案用のグラフに落とし込んでいたりしています。
もちろん、日次や週次など定期的に決まった手段で集計が必要なものや、頻繁に利用される集計であるならば、 データスタジオを利用したり、もしくはダッシュボードを用意するのが便利でしょう。
一方で、顧客提案に利用する場合だと、データの抽出条件や集計条件が案件ごとに異なったりするため、 同一のクエリを再利用できることはほとんどありません。 こういった場合は、データスタジオにビューを定義したり、ダッシュボードを作成する手間をかけるよりも、 CSV等のファイルにエクスポートしたものを渡すほうが手っ取り早いです。 また、受け取る事業チームも使い慣れたツールにすぐに取り込むことができるので、ファイルで連携するのが便利だったりします。
このため、データ集計や抽出の業務は大まかに以下のような流れで運用していました。
- 本番環境セグメントでBigQueryで集計
- 上記の結果をGCSにCSV形式でエクスポートし、それを本番環境セグメントに配置されているGCEにダウンロード
- 本番環境セグメントからローカルマシンにファイルを移す
- GoogleDriveにアップロードして事業チームに連携
もしくは
- ブラウザUIにてBigQueryで集計
- ブラウザからローカルマシンにダウンロード
- GoogleDriveにアップロードして事業チームに連携
上記の手順でしばらく運用していたのですが、次第に 「ファイルをダウンロードしてGoogleDriveにアップロードするのが地味に手間」 という風に思うようになりました。 定型的な業務でありながら時間が取られるため、なんとか簡略化できないかと考えたわけです。
結論として、「BigQueryで集計した結果を、GoogleDrive上のスプレッドシートに直にエクスポートする」環境を整えたので、 それをコード例含めて紹介したいと思います。
そもそもですが。。。
実は、BigQueryとスプレッドシートの連携についてですが、 今回対象となる「BigQueryのデータをスプレッドシートで閲覧する」ための機能はGoogleから提供されています。
しかし、この機能を利用するには Google Workspace (旧GSuite) のEnterpriseプランが前提となります。 Enterpriseアカウントがなければこの機能は利用できませんが、 既にEnterpriseプランを利用しているのでしたら、この機能を積極的に利用しましょう。
今回紹介するのは、Enterpriseプランなしで、BigQueryとGoogleDrive、およびスプレッドシートから提供されているAPIを利用して実現します。
本題
以下の流れをプログラムで処理することで、BigQueryのクエリ結果をスプレッドシートで閲覧できるようにします。
- BigQueryにて集計したいクエリを実行し、結果をBigQuery上のテーブルに保存する。
- GoogleDrive APIを利用して、エクスポート先のスプレッドシートをGoogleDrive上に作成する。
- BigQueryのAPIを利用して、上記1で作成したテーブルのレコードを順次取得する。
- SheetAPIを利用して、上記3で取得したレコードをスプレッドシートに順次書き込む。
- 最後にスプレッドシートのURLを事業チームに連携するだけで、データの連携は完了。
上記のフローのメリットは以下のとおりです。
- GCPへの接続のみで、本番環境セグメントを意識することなく、データをGoogleDriveに出力できる。
- 上記1のクエリ実行による課金コスト、およびAPI実行によるデータ取得等のネットワーク通信による課金コストのみ意識すればよい。
- スプレッドシートへの保存については、現時点ではGoogleDriveの利用容量を消費しませんが、Googleのストレージポリシー変更により、2021/06/01 以降はスプレッドシートの保存にもGoogleDriveの容量を消費するようになります。
- プログラムで全て解決できるので、手作業の手間がかからない。
- 本プログラムを実行する環境をGCP内に配置すれば、通信はGCP内で完結するので、ローカルのインターネット速度制約に縛れれない
一方で、以降の実装例を見れば分かると思いますが、下記のようなデメリットもあるので、この点はご了承ください。
- API実行で、レコード単位での取得および送信が発生するので、レコード件数に比例して時間がかかる。
SheetAPIの時間あたりの実行可能数がかなり抑えられているので、時間がかかる。
また、以下の実装例はPython3.7+になります。
実装詳細
利用するライブラリのインストール
まずは予め利用するライブラリをインストールしてください。 なお、以下ではpipを利用していますが、仮想環境を利用する場合は、適宜ツールに合ったコマンドに置き換えてください。
$ pip install google-cloud-bigquery $ pip install google-api-python-client $ pip install gspread
今回、スプレッドシートを操作するにあたり、gspreadを利用しています。 Googleが提供するSheetAPIを直接利用してもよいのですが、こちらは柔軟な操作ができる分、API仕様が複雑です。 今回のようにスプレッドシートを作成して、シートにデータを記入するだけなら、gspreadで簡単に実装できます。
また、GCPのプロジェクト設定にて、スプレッドシートAPIを有効にしてください。
BigQuery APIを利用して取得対象のテーブル情報を取得
まずはBigQueryにアクセスするためのclientを作成のうえ、テーブル情報を取得します。 この時点ではテーブル情報までで、実レコードは取得していません。
import google.auth from google.cloud import bigquery table_name: str = ... # 取得対象のテーブル名をデータセット名含めて指定する credentials, project_id = google.auth.default() bq_client: bigquery.Client = bigquery.Client(project=project_id, credentials=credentials) bq_table: bigquery.Table = bq_client.get_table(table_name)
clientを作成するにあたり、ハマりがちになるのがクレデンシャルの取得でしょう。
環境によってユーザアカウントやサービスアカウントを直接利用していたり、
もしくは、Application Default Credential を利用していたりなど様々かと思います。
私が観測している範囲のみになりますが、 google.auth.default()
により大方問題なくクレデンシャルを取得できるかと思います。
詳細はこちらをご参照ください。
(今回、エンドユーザ認証は想定しておりません!)
エクスポート先のスプレッドシートを作成
では次にgspreadを利用して、スプレッドシートを作成します。 以下のコードではエクスポート先のシートの作成も合わせて行っています。
import gspread from gspread.models import Spreadsheet, Worksheet file_name: str = ... # エクスポート先となるスプレッドシートのファイル名 credentials, _ = google.auth.default( scopes=[ 'https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive' ] ) gspread_client: gspread.client.Client = gspread.authorize(credentials) # スプレッドシートを作成する spread_file: Spreadsheet = gspread_client.create(file_name) default_sheet: Worksheet = spread_file.get_worksheet(0) # レコード出力先のシートを作成し、スプレッドシート作成時にデフォルトで作成されるシートを削除する to_sheet: Worksheet = spread_file.add_worksheet( title=bq_table.table_id, rows=(bq_table.num_rows + 1), cols=len(bq_table.schema) ) spread_file.del_worksheet(default_sheet)
gspreadのクライアントを作成するにあたり、bigqueryのクライアントと同様クレデンシャルが必要になります。
ここでは再度、新規にクレデンシャルを取得していますが、bigquery.Client作成時のクレデンシャルを流用しても問題ないと思います。
ただし、scopesに https://spreadsheets.google.com/feeds
, https://www.googleapis.com/auth/drive
の指定を
忘れないようにしてください。
次に、スプレッドシート作成後、エクスポート先となるシートを作成しているのですが、
to_sheet: Worksheet = spread_file.add_worksheet( title=bq_table.table_id, rows=(bq_table.num_rows + 1), cols=len(bq_table.schema) )
シート名はbqテーブルのテーブル名(bq_table.table_id
)とし、
さらにエクスポート元のテーブルのレコード数とカラム数に合わせて、シートの行数と列数を決めています。
カラム名の行を初めに入れるため、rows
パラメータに指定する値を「レコード数 + 1」としています。
(もちろん、シート名はご自由に指定しても問題ありません)
なお、スプレッドシートの制約上、1ファイルにつき最大500万セルまでしか作成できません。 このため、「(レコード数 + 1) * カラム数」が500万以上になるとエラーが発生します。 また、スプレッドシートを作成した際、初期シートとして「1000行 * 27列(AからZまで) = 2万7千セル」が作られているため、 シート追加時には、この初期シートのセル数を含めて500万セルを越えないよう、注意する必要があります。
スプレッドシートを特定のGoogleDrive上に移動
実は上記の処理でスプレッドシートが作成される場所は、 利用したクレデンシャルに紐づくアカウントのGoogleDriveホーム直下になります。 このままだと、ファイルの共有が難しいので、 「ファイルを共有する」もしくは「共有状態に予め設定している場所に移動する」必要があります。 今回は後者の方針をとります。
import googleapiclient.discovery as discovery gdrive_service: discovery.Resource = discovery.build('drive', 'v3', cache_discovery=False) gdrive_files: discovery.Resource = gdrive_service.files() spreadsheet_file: dict = gdrive_files.get(fileId=spread_file.id, fields='parents').execute() previous_parents: str = ",".join(spreadsheet_file.get('parents')) directory_id: str = ... # 移動先となるGoogleDriveのディレクトリID # 親を変更することで、ファイル移動を実現する gdrive_files.update( fileId=spread_file.id, fields='id, parents', addParents=directory_id, removeParents=previous_parents ).execute()
ここで、directory_id
は移動先となるGoogleDriveのディレクトリIDとなります。
ディレクトリIDには、URLの https://drive.google.com/drive/folders/
以降の文字列を指定してください。
ファイルを移動するにあたり、移動先のディレクトリに対して、
クレデンシャルが紐づくアカウントに編集権限が予め付与されている必要があります。
シートの1行目にカラム名を記載する
テーブルをレコードを出力する前に、各カラム名を先に出力します。 これは既に取得しているテーブル情報から対応できます。
from typing import Dict, List from gspread.models import Cell # カラム名->列番号 の対応関係を作成する column_mapping: Dict[str, int] = {} column: int = 1 for field in bq_table.schema: if field.mode.upper() == "REPEATED": raise Exception() if len(field.fields) > 0: raise Exception() column_mapping[field.name] = column column = column + 1 # スプレッドシートの1行目に各列のタイトルを追加 sheet_header: List[Cell] = [ Cell(row=1, col=column_number, value=header_name) for (header_name, column_number) in column_mapping.items() ] to_sheet.update_cells(sheet_header) to_sheet.freeze(rows=1)
ここで、カラムが配列もしくは構造体の場合は例外を発行しています。
配列および構造体が絡むと出力時の形式が煩雑になるので、今回は対象外にしています(処理としては例外が投げられることでプログラムが中断されます)。
また、ここで作成した column_mapping
は最後のエクスポート処理でも利用しています。
bqテーブルよりレコードを取得してスプレッドシートに出力
前置きが長くなりましたが、実レコードをbqテーブルより取得して、シートに書き込むのは以下のようになります。
row_iterator: bigquery.table.RowIterator = bq_client.list_rows( bq_table, start_index=0, max_results=bq_table.num_rows ) row_number: int = 1 sheet_cells: List[Cell] = [] for record in row_iterator: row_number += 1 for target_key, target_value in record.item(): column_number: int = column_mapping[target_key] sheet_cells.append( Cell(row=row_number, col=column_number, value=target_value) ) to_sheet.update_cells(sheet_cells)
list_rows()
は bq head
コマンドに該当するもので、このAPIでレコードを取得する分にはクエリコストはかかりません。
スプレッドシートに書き込む際は、 Cell
オブジェクトに行番号と列番号、およびセルの値を指定した上で、
Worksheet.update_cells()
の引数に指定するリストに渡します。
このAPIの呼び出しは、たとえ1セルだけであっても1秒程度時間かかるので、
更新するセルをある程度の件数まとめた上で呼び出すようにするとよいでしょう。
これで、bqテーブルのレコードをスプレッドシートにエクスポートするプログラムが作成できました。
その他、気をつけること
既に記載したことも含まれますが、実運用時は以下の点を考慮する必要があります。
- スプレッドシートは最大500万セルまでしか作成できません。500万セルを超過するような場合は、bqテーブルのレコード数を絞る、もしくは、スプレッドシートの出力先を複数ファイルに分割する必要があります。
- 1セルあたり最大文字列長は5万文字となりますので、超過しないよう制御する必要があります。
- SheetAPIの流量制限はかなり厳しく設定されています。スプレッドシート書き込み時にエラーコード429が返ってきたらAPIリソース超過になります。しばらく待ってから処理を再開するよう、制御する必要があります。(半年ほど前はリソース超過時に500エラーが返却されることもありましたが、最近は安定しているように思えます)
- 上記の実装例ではbqテーブルのレコードを全て取得してからスプレッドシートに書き込んでいますが、レコード数が多いとメモリが足りなくなる可能性があります。適宜、ページングしながらスプレッドシートに書き込むようにしてください。
データの連携というのは単純作業で地味な上、データサイズが大きいと時間がかかったりで、チリツモなストレス要因だったりします。 今回紹介したコードがみなさまのデータ連携の一助になれば幸いです。
最後になりますが、アスタミューゼでは、引き続きエンジニアを積極募集中です。是非、下記バナーよりお尋ねください。