astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

簡易的にデータリネージを試してみる

はじめまして。データチームのKimy(@yuu_kimy)です。
日々、各種データの整備に関わる開発を行っています。
早いもので、アスタミューゼにジョインしてから、1年が過ぎました。

ジョインしてから、グラント(研究助成)や特許データの整備開発、各種案件対応を行ってきましたが、最近では、特許の概念や用語の勉強の日々です。
だいぶ慣れてきたかなと感じていますが、やっぱり、難しいですね..

さて、案件対応では、蓄積されたデータを活用して、各種データの加工・集計等を行っていますが、様々なデータを利用することで、データのインプット・アウトプットの流れが分かり辛くなってくることがあります。
利用するテーブルが数個であれば、そのデータを処理するソースコードを見れば、パッと流れが掴めますが、数多くのテーブルを利用して、集計して、集計した結果を別に処理をした結果とジョインして、更に、フィルターをかけて..etc となってくると、処理が複雑となり、大まかな流れが掴み辛くなってきます。

データマネジメント領域で言う「データリネージ」(どのデータを利用して、どう生成されているかが明らかになっていること)ができると何かと便利そうなので、簡易的なデータリネージにトライしてみたいと思います。

今回は、SQLに対するデータリネージをやってみます。
(SQLであれば、何でも大丈夫だと思います。)

データリネージとは?

前述の通り、「どのデータを使って、どういう風に生成しているか」といったデータの流れが整理・可視化されることを指します。また、データの処理における変換も対象となります。
データリネージにより、データガバナンスにおける色々な検証を可能にするかと思いますが、それだけで話が多岐に渡りそうなので、詳細は割愛させて頂きます。

Pythonモジュールの「sqllineage」について

Pythonのモジュールに、sqllineage なるツールを聞きつけたので、試しに使ってみたいと思います。
このツールは、SQLを解析して、ソーステーブルやターゲットテーブルの情報を教えてくれる便利なものです。(裏側では、sqlparseというSQLパーサが利用されているようです。)

早速、使ってみる

インストールは、非常に簡単です。以下を実行するだけです。

pip install sqllineage

インストール後は、ドキュメントにあるように、以下のようにコマンドを実行すれば、SQLの解析結果を表示してくれます。

sqllineage -e "insert into table_foo 
    select * from table_bar union select * from table_baz"

結果は、以下のように表示されます。

Statements(#): 1
Source Tables:
    <default>.table_bar
    <default>.table_baz
Target Tables:
    <default>.table_foo

SQLに書かれている通り、ソーステーブルの"table_bar" と "table_baz"、ターゲットテーブルの"table_foo"が表示されています。

SQLを解析してみる

それでは、SQLを解析してみます。コマンドに、SQLの全てを書くのは面倒なので、クエリファイルに保存してから、先ほどと同様に、コマンドを実行します。

# 検証用SQL: target_sql_1.sqlとして保存
# 注意: 以下のSQLは、あくまで、今回のサンプル用に用意したものです。(以下同様)

CREATE TABLE output.final_result_table AS
WITH intermediate_table AS (
    SELECT
        input_table_1.original_col_1,
        input_table_2.original_col_2,
        LEFT(input_table_3.original_col_3, 4) AS converted_col_3
    FROM input.input_table_1 AS input_table_1
        INNER JOIN input.input_table_2 AS input_table_2
            ON input_table_1.dummy_common_col_1
                = input_table_2.dummy_common_col_1
                AND input_table_2.dummiy_flag = '1'
        INNER JOIN input.input_table_3 AS input_table_3
            ON input_table_2.dummy_common_col_2
                = input_table_3.dummy_common_col_2
)

SELECT
    input_table_5.original_col_5,
    input_table_6.original_col_6,
    input_table_7.original_col_7,
    ROUND(input_table_8.original_col_8 /
        input_table_9.original_col_9) AS round_result_1,
    intermediate_table.original_col_1,
    intermediate_table.original_col_2,
    intermediate_table.converted_col_3
FROM input.input_table_5 AS input_table_5
    LEFT JOIN input.input_table_6 AS input_table_6
        ON input_table_5.dummy_common_col_5
            = input_table_6.dummy_common_col_5
    LEFT JOIN input.input_table_7 AS input_table_7
        ON input_table_6.dummy_common_col_6
            = input_table_7.dummy_common_col_6
    LEFT JOIN input.input_table_8 AS input_table_8
        ON input_table_7.dummy_common_col_7
            = input_table_8.dummy_common_col_7
    LEFT JOIN input.input_table_9 AS input_table_9
        ON input_table_8.dummy_common_col_8
            = input_table_9.dummy_common_col_8
    LEFT JOIN intermediate_table
        ON input_table_9.dummy_common_col_9
            = intermediate_table.dummy_common_col_9

上記のSQLを解析してみます。クエリファイルを指定する場合は、-f のオプションを指定します。

sqllineage -f target_sql_1.sql

結果は以下の通りです。

Statements(#): 1
Source Tables:
    input.input_table_1
    input.input_table_2
    input.input_table_3
    input.input_table_5
    input.input_table_6
    input.input_table_7
    input.input_table_8
    input.input_table_9
Target Tables:
    output.final_result_table

WITH句で利用したテーブルのinput_table_1〜3も、きちんと、ソーステーブルとして表示されているようです。

更に、詳細な情報が欲しい場合は、-v オプションも指定して、実行します。(-v -fの順番で指定する必要があるようです。)

次は、以下のSQLで解析を実行してみます。

## 検証用SQL: target_sql_2.sqlとして保存
INSERT INTO output.final_result_table_2
SELECT
  input_table_1.original_col_1,
  converted_table.common_col_2_1,
  converted_table.converted_rank,
  input_table_3.original_col_3
FROM
  input.input_table_1 AS input_table_1
INNER JOIN
   (
        SELECT DISTINCT
            common_col_2_1,
            common_col_2_2,
            RANK() OVER (
                PARTITION BY common_col_2_1 ORDER BY common_col_2_2
            ) AS converted_rank
        FROM input.input_table_2
    ) AS converted_table
    ON
      input_table_1.common_col_1 = converted_table.common_col_1
INNER JOIN
  input.input_table_3 AS input_table_3
    ON input_table_1.common_col_2 = input_table_3.common_col_2
WHERE
    input_table_3.dummy_flag = '1'
    AND input_table_3.dummy_date <= '2021-09-21'

以下のように実行します。

sqllineage -v -f target_sql_2.sql

結果は、以下の通りです。

Statement #1: INSERT INTO output.final_result_table_2SELECT  inp...
    table read: [Table: input.input_table_1, Table: input.input_table_2, Table: input.input_table_3]
    table write: [Table: output.final_result_table_2]
    table rename: []
    table drop: []
    table intermediate: []
==========
Summary:
Statements(#): 1
Source Tables:
    input.input_table_1
    input.input_table_2
    input.input_table_3
Target Tables:
    output.final_result_table_2

サブクエリの中で呼び出している input_table_2 も、きちんと、ソーステーブルに含まれているようです。 -v オプションを付けることで、SQLのステートメントごとのREAD/WRITE等を表示してくれるようですね。

更には、SQLの解析結果を可視化することも可能です。

# -gオプションをつけて実行する
sqllineage -g -f target_sql/target_sql_5.sql

実行すると、リンクが表示されますので、ブラウザからアクセスすると、以下のような描画が表示されます。
f:id:astamuse:20210921230253p:plain

描画されたテーブルのノードにカーソルを当てると、ハイライトしてくれるようですね。 f:id:astamuse:20210921230602p:plain

SQLのテーブルのインプットとアウトプットの流れが、パッと見て、分かる状態になりました。

まとめ

sqllineageを利用することで、SQLの流れを容易に掴むことができました。今回のサンプルでは、シンプルなSQLでしたが、更に、複雑なSQLだと、その威力を発揮してくれそうです。
ただ、あくまで、テーブルのインプットとアウトプットの流れを掴むことがメインのため、カラムレベルまで踏み込んだデータリネージではないようです。

また、注意ですが、sqllineage (その裏側で動いているsqlparse)は、SQL自体の妥当性をチェックしているわけではないので、本来は、エラーが発生するようなSQLでも、リネージが表示されてしまいます。その場合、想定した解析結果にならないケースもあり得るため、注意が必要なようです。

アスタミューゼでは、エンジニア・デザイナーを募集中です。ご興味のある方は、弊社の採用サイトからご応募頂けたらと思います。

ご応募をお待ちしておりますm( )m

データベースを使わずに、有効期限付きURLを生成したい

自称データエンジニアのaranです。 月日の流れは早いもので、去年の9月以来の再登板になります

私ごとですが
先月に健康診断があり、試しに1日1食の生活を3ヶ月ほど実施してみました。

ストイックに毎日続けるのは無理なので、以下の条件下で試しました

  • 1日1食だが、好きなだけ食べる

  • 土日は食事制限しない

  • ガマンできない時はチョコレート(一口分)を食べてよい。ただし3口分まで

  • みんなと食事するときは、お昼を食べてよい

開始当初は、お腹がなりまくっていたのですが、しばらくするとお腹はならなくなります。 また、一番の体の変化は、昼以降に眠くならないことです。
(前日の睡眠時間次第のところはありますが)

友人によく、朝食べないと体にエネルギーがなく、パフォーマンスが...的なことを言われました。
ただ、私に限ったことですが、まったく問題なかったです。
むしろ、昼以降に眠くならず、集中力が続くので、作業効率はUPしたような気がします

健康診断の結果は改善した項目があり、私にとって1日3食は食べ過ぎなんだなって思いました。

はじめに

わたしのここ最近の業務はデータ整備で
構造化データをパースし、データ整形してデータ登録する いわゆるETL作業を行っております。

楽しく作業している傍ら、データソース元は公開データでかつ、閉じた環境で作業していることもあり
セキュリティに無頓着になっています。。

こんな状況の中とある案件で
簡単に改ざんできないような有効期限付きURLが必要になり
久しぶりにセキュリティを意識することになりました。

今回は、有効期限付きURLをどう生成したかについてお話したいと思います。

有効期限付きURLについて

有効期限付きURLが必要の際
URLにパラメータを入れて、そのURLと有効期限をデータベースに登録する方法を
主に採用すると思います。

当案件の要件(制約)は、データベース利用せずに有効期限付きURLを生成することで
これって賢人が既にやっているだろうと思い、ちょっと調べていたら
賢人のブログを発見しました。

このブログのサンプルソースは PHP なのですが
当案件では、python(ver 3.8)を利用しているので、一部移植してみました。

手始めに

今回は参考サイトの手法以外も試しています。

AES方式を使って有効期限付きURLを生成

まず、AES方式を使って有効期限付きURLを生成してみます。

簡単な手順は以下になります

  1. 秘密鍵を事前に生成する

  2. 有効期限を暗号化

  3. 暗号化した有効期限をURLパラメータ化

  4. 受け取ったパラメータを復号化

  5. 有効期限のチェック

暗号化・復号化には PyCryptodome ライブラリを利用しました。

pycryptodome.readthedocs.io

最初は PyCrypto ライブラリを利用していましたが
ドキュメントを読むと2013年10月以降アップデートが止まっているので、
PyCryptoの利用は控えました。

尚、暗号化・復号化の処理はこちらのソースを参考しています

有効期限付きURL生成するサンプルコードです。

サンプルコード

import datetime
import base64
from urllib import parse

from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util import Padding

SECRET_KEY: bytes = b'秘密鍵を生成し、適宜管理して下さい'
BASE_URL: str = 'http://example.com'


def generate_url() -> str:
    def __encrypt(raw_data: str) -> str:
        iv: bytes = Random.get_random_bytes(AES.block_size)
        cipher = AES.new(SECRET_KEY, AES.MODE_CBC, iv)
        pad_data = Padding.pad(raw_data.encode('utf-8'), AES.block_size, 'pkcs7')
        return base64.b64encode(iv + cipher.encrypt(pad_data)).decode()

    # 有効期限の設定
    expiry: datetime.datetime = datetime.datetime.now() + datetime.timedelta(minutes=5)
    expiry_string: str = str(int(expiry.timestamp()))
    # 有効期限の暗号化
    param_expiry: str = __encrypt(expiry_string)
    # 有効期限付きURL生成
    return f"{BASE_URL}?expiry={param_expiry}"

こちらを実行すると、以下のように有効期限つきURLを生成できます

>>> generate_url()
https://example.com?expiry=N934g0hrNZp4weCvIpYsTw1psEgGIwW6T4NMSVjDI6E=

生成した有効期限付きURLをパースしてみます。

サンプルコード

# import文は上記と同じなので、省略

def validate_url(url: str):
    def __parse_url(url: str) -> str:
        qs: str = parse.urlparse(url).query
        q: dict = parse.parse_qs(qs)

        expiry_from_request: str = q['expiry'][0]
        return expiry_from_request

    def __decrypt(enc_data: str) -> str:
        enc: bytes = base64.b64decode(enc_data)
        iv: bytes = enc[:AES.block_size]
        cipher = AES.new(SECRET_KEY, AES.MODE_CBC, iv)
        unpad_data: bytes = Padding.unpad(cipher.decrypt(enc[AES.block_size:]), AES.block_size, 'pkcs7')
        return unpad_data.decode('utf-8')

    def __now_ts() -> int:
        return int(datetime.datetime.now().timestamp())

    # URLパラメータのパース
    expiry_from_request: str = __parse_url(url)
    # 有効期限の復号化
    expiry_ts: int = int(__decrypt(expiry_from_request))

    if expiry_ts < __now_ts():
        print('期限切れURL')

有効期限付きURLをチェックしてみます。

>>> url: str = 'https://example.com?expiry=N934g0hrNZp4weCvIpYsTw1psEgGIwW6T4NMSVjDI6E='
>>> validate_url(url)
期限切れURL

有効期限の時間経過後にチェックしたところ、期限切れを確認できました。

ハッシュ(HMAC)を使って有効期限付きURLを生成

次にブログにある
ハッシュ(HMAC)を使って、有効期限付きURLを生成したいと思います。

簡単な手順は以下になります

  1. 秘密鍵を事前に生成する

  2. 有効期限をシリアライズ

  3. 共有鍵を生成する

  4. 導出鍵を生成する

  5. 有効期限、共有鍵、導出鍵をURLパラメータ化

  6. 受け取ったパラメータを復号化

  7. 改ざんチェック

  8. 有効期限をデシリアライズ

  9. 有効期限チェック

尚、ブログでPythonにはない関数を利用していますので、完全な移植ではありませんし、
一部処理を簡素化しています。

サンプルコード

有効期限付きURLを生成するサンプルコードです。

import base64
import datetime
import hashlib
import hmac
import secrets
import pickle
from typing import Tuple
from urllib import parse

def generate_url() -> str:
    def __generate_salt() -> bytes:
        return secrets.token_hex(16).encode('utf-8')

    def __generate_context(expiry: str) -> bytes:
        return pickle.dumps((base_url, expiry))

    def __generate_derived_key(salt: bytes, context: bytes) -> bytes:
        prk: bytes = hmac.new(SECRET_KEY, salt, hashlib.sha256).digest()
        return hmac.new(prk, context, hashlib.sha256).digest()

    # 有効期限の設定
    expiry: datetime.datetime = datetime.datetime.now() + datetime.timedelta(minutes=5)
    expiry_string: str = str(int(expiry.timestamp()))
    # コンテクスト(URL, 有効期限)
    context: bytes = __generate_context(expiry=expiry_string)
    # 公開鍵の生成
    salt: bytes = __generate_salt()
    # 導出鍵
    derived_key: bytes = __generate_derived_key(salt=salt, context=context)

    param_key1: str = base64.b64encode(derived_key).decode()
    param_key2: str = base64.b64encode(salt).decode()
    param_context: str = base64.b64encode(context).decode()
    # 有効期限付きURL生成
    return f"{BASE_URL}?key1={param_key1}&key2={param_key2}&context={param_context}"

生成した有効期限付きURLをチェックするサンプルコードです。

サンプルコード

# import文は上記と同じため、省略

def validate_url(url: str):
    def __parse_url(url: str) -> Tuple[bytes, bytes, bytes]:
        qs: str = parse.urlparse(url).query
        q: dict = parse.parse_qs(qs)

        # TODO: デコード時にエラーが発生する場合がありますが、今回はその対策を省略しています

        # 送られてきた導出鍵
        encoding_derived_key = q['key1'][0]
        derived_key_from_request: bytes = base64.b64decode(encoding_derived_key)

        # 送られてきた共有鍵
        encoding_salt = q['key2'][0]
        salt_from_request: bytes = base64.b64decode(encoding_salt)

        # 送られてきたコンテクスト(URL, 有効期限)
        encoding_context = q['context'][0]
        context_from_request: bytes = base64.b64decode(encoding_context)

        return (derived_key_from_request, salt_from_request, context_from_request)

    def __validate_request(derived_key_from_request: bytes, salt_from_request: bytes, context_from_request: bytes) -> bool:
        validate_prk: bytes = hmac.new(SECRET_KEY, salt_from_request, hashlib.sha256).digest()
        validate_derived_key: bytes = hmac.new(validate_prk, context_from_request, hashlib.sha256).digest()

        return hmac.compare_digest(derived_key_from_request, validate_derived_key)

    def __now_ts() -> int:
        return int(datetime.datetime.now().timestamp())

    def __extracted_expiry_ts(context_from_request: bytes) -> int:
        deserialize_contents: Tuple['str', 'str'] = pickle.loads(context_from_request)
        return int(deserialize_contents[1])

    # リクエストパラメータのパース
    derived_key_from_request, salt_from_request, context_from_request = __parse_url(url)

    if not __validate_request(derived_key_from_request, salt_from_request, context_from_request):
        raise Exception("不正なリクエスト")

    # 有効期限の抽出
    expiry_ts: int = __extracted_expiry_ts(context_from_request)
    if expiry_ts < __now_ts():
        print('期限切れURL')

最後に

参考にしたブログに書いてある通り、データベースを使わないので、大量の有効期限付きURLを発行しても
アクセス管理ができることがメリットだとわかりました。
要件次第になりますが、選択肢のひとつになるかと思います。

アスタミューゼでは、エンジニア・デザイナーを募集中です。 ご興味のある方は遠慮なく採用サイトからご応募願います。是非、お待ちしています。

Google Apps Scriptで脱Excel化!?

f:id:astamuse:20210901103932p:plain お久しぶりでございます。scala等でバックエンドを開発しているaxtstarでございます。

はじめに

DX*1が叫ばれて久しい昨今ですが、日々作成されるデータが json だったり yaml だったりにもともとなっていて、簡単にプログラムで取り込めることはあまりないことだと思います。 人が作るデータ形式は、大体はExcelやcsvまたはスプレッドシートの形式が多いと思います。

私もDXの流れに乗って日々スプレッドシートに向かっております。

当然、人の作成したものですので誤りがあるのはある程度は仕方ないので、できる範囲のチェックを実施していることもあるかと思います。

例えば、文字列を範囲の中から選ぶようにするとか、

f:id:astamuse:20210901080608p:plain
範囲から選択

計算式を埋め込んでチェックをしていたり、

f:id:astamuse:20210901080023p:plain
計算式を埋め込んでチェック

関数を一元管理したい

上記以外にも関数などをスプレッドシートにApp Scriptで追加して効率化していました

例 URLエンコード、URLデコード

/**
 * URLエンコード
 * @param value 対象文字列
 */
function encode(value) {
    return encodeURIComponent(value);
}
/**
 * URLデコード
 * @param value 対象文字列
 */
function decode(value) {
    return decodeURIComponent(value);
}

↑こういうのをスプレッドシートで開いて、コピペで張り付けていました。

f:id:astamuse:20210901090744p:plain
コピペで張り付ける前に開くスクリプトエディタ

チェックすべきスプレッドシートが少なかったころは、このようなやり方でも問題は無かったのですが、多くなってくると、どんどんどんどん、どのスプレッドシートに関数があったかよくわからなくなってきます。 また一度作った関数を改良した場合などはもっとカオスで。。。
せっかく 脱Excel をスプレッドシートで実現したのに、App Scriptがいわゆる メンテ不能なExcelマクロ と同様になってしまうという罠に陥りかけました。

そこで目を付けたのが、Google製の clasp というツール。

github.com

このツールはローカルで作られたApp ScriptのプロジェクトからターゲットのスプレッドシートにApp Scriptを埋め込むことができます。

下記で認証して認可してしまえば、あとはスクリプトのIDを準備すればOKでした。*2

clasp login

許可

f:id:astamuse:20210831171420p:plain
claspに認可を与える図

こういうのを用意(xxxxxxxxxxxxxxxxxxxxxxはスクリプトのID*3が入ります)
.clasp.json

{
    "scriptId":"xxxxxxxxxxxxxxxxxxxxxx",
    "rootDir": "./src"
}

フォルダ構成

.
├── .clasp.json
├── clasp
└── src
     └── Code.js

App Scriptのアップロード

clasp push

f:id:astamuse:20210901092610p:plain

おおすごい、楽だ!!!

何度もやってるバリデーションを汎用化してみた

不思議なもので 、こうやってバージョン管理が簡単にできるようになってくると、関数の修正にもモチベーションが出てきます。

App Scriptで必須チェックや文字列チェック、数値範囲チェックなどの関数を作って、チェック列で視覚的にわかるようにしてみました。

f:id:astamuse:20210901004850p:plain
record_check() 関数を作って入力の不備をお知らせしている図

ダウンロード機能も追加してみた

また、バリデーションの列が追加されてしまったために、ダウンロード時に Excel化 してその列を削除するみたいなことをするのは、本末転倒なのでスプレッドシートから直接CSVへのダウンロードも合わせて作りました。

f:id:astamuse:20210831181140p:plain
ダウンロードしている図

この変更のせいで初回?実行時にGドライブへの権限が求められるようになってしまいました。

どうやったか?

こちらにあげてますのでよろしければ参考にしていただければと思います

github.com

まとめ

  • 関数を一元管理できるのは、イマドキとしては当然なのですが大変すばらしことだった。
  • 認証認可の部分はちょっとややこしいが、イマドキとしては仕方ないことだった。
  • どこにあるかよくわからないものはモチベーションを下げるということがよくわかった。

それではハッピーなDXライフを!!

注意点

AppScriptを実際に実行するには結構認証認可手順が必要です。

  • AppSscriptAPIの実行許可
  • デプロイツール自体の認証認可(これはclaspの話です)
  • AppScriptの認証(初回)
  • 実行時の認可(使用する権限によって)

これだけ必要なのがちょっとつらいところかもしれません。

<<参考>>

AppSscriptAPIの実行許可

f:id:astamuse:20210831170959p:plain
AppSscriptの実行許可

*1:デジタルトランスフォーメション

*2:App Scriptを全部書き換えてしまうハズなのでその辺は最初注意が必要です。

*3:スプレッドシートのIDではなくスクリプトのIDです

Copyright © astamuse company, ltd. all rights reserved.