astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

デザイナーだけどデブサミいってきたよ #devsumi

f:id:astamuse:20200217001338p:plain
#devsumi

こんにちは、アスタミューゼでデザイナーをしている@YojiShirakiです。

今年もデブサミが無事に開催されましたね!運良くブログ順も回ってきたので備忘も兼ねてレポートします。ちなみに、過去にはデブサミについてこんな投稿もしておりますので御覧ください!

lab.astamuse.co.jp

デブサミとは

デブサミは正式名称は Developers Summit 。翔泳社主催のエンジニア向けイベントです。

Developers Summit 2020

3000人規模の集客を誇る国内でも大規模な部類のイベントで、2日間にわたり約100のセッションによって展開されます。内容は、エンジニアリングにとどまらず、組織、ビジネス、キャリアなど、幅広く、二日間で様々なトピックに触れられるのが特徴・魅力的なイベントです。

今年のテーマは「ともにつくる」。チーム内に留まらず、チームを超え、組織を超え、皆で良いサービスを作ろう、というメッセージですね。公式サイトに良いことが書いてあったので引用しておきます。

ともにつくる。それは、さまざまなテクノロジーを組み合わせ、エンジニア同士が協力すること。エンジニアと他のロールのメンバーが手を取り合うこと。プロダクトの先にあるユーザーのことを思うこと。組織を越えた仲間と志を一つにすること。デブサミ2020では、一歩外へ踏み出す勇気を携え、まわりをエンパワーメントしていきたいエンジニアに対して、エールを送ります。

では見ていきましょう!

注目:クリエイター向けのセッション群 Creators MIX 2020

f:id:astamuse:20200217003959j:plain

今年のデブサミの一番の注目はここですね!デザイナー・クリエイター内容に特化した「Creators MIX 2020」なるセッショントラック。ラインナップが迫力モノで、PARTYの伊藤さん、フラクタの河野さん、ベイジの枌谷(そぎたに)さんと馴染みのある方たちが並んでいます。

サービス・プロダクト開発というコンテキストにおいて、こういった方たちが越境登壇して発信されることの貴重さたるや。目の付け所もさることながら、高い質感で体現された翔泳社様並びにスタッフの皆さんには感謝感激しました。

で、実際どうだったの?

二日目のみの参加ではありましたが、例年通り安定した内容だったように思います。特に先程の Creators MIX 2020 のトラック。正直なところ PARTY 伊藤さんのセッションだけでもデブサミ行ってよかったレベルです。

ということでいくつか印象残ったものをご紹介。

印象深いセッション1 : 組織の創造性を高めるために必要なこと

f:id:astamuse:20200217003844j:plain

今回一番印象に残ったセッションがこちら。クリエイティブファーム PARTY伊藤さん のセッション。組織の創造性を高めるという観点でお話いただきましたが、それに留まらず、直近の PARTY の取り組みなども併せて紹介されてました。個人的に刺さったのは以下の3点。

  1. エクイティで業務を受けることがある。そこで強力に自分事化する
  2. アート・サイエンス・デザイン・エンジニアリングを越境をすることが大事
  3. 産学・業種を超えたスペシャリストで構成されたメンバーが必要

PARTY がエクイティで業務うけているとは、不勉強にも知りませんでした(汗。しかも VALU がそうだったんですね。

エクイティでの業務請負は最近よく聞くようになりましたが、当然、リターン相応にリスクもある訳で。他の収益基盤が安定していればこそ取れる選択であり、そのあたり含めてやりきれるかどうかが難しいところです。おそらく下記の点含めていろいろ課題もあるでしょうから、そのあたりのノウハウを蓄積して公開されてくるるともっと面白そうだと感じました。

  • 提供リソースと対価の関係はどうやって勘定しているのか
  • DD/Valuation の妥当性(適当にやると税務怖そう)
  • SOなのか実株なのか
  • どこでEXITするつもりなのか

f:id:astamuse:20200217003906j:plain

また、3の「産学・業種を超えたスペシャリストで構成されたメンバーが必要」という点について、「産学」「大学」というキーワードにグッときました。クリエイティブファームで「産学」や「大学」という観点がそもそも面白いです。また、これは社会的に良い傾向だと感じます。実際、大学発技術系ベンチャーでは取り扱っている技術がわかりにくくても、メッセージ・アウトプットを研ぎ澄まして大化けするものもあります。また、技術の用途展開を新しい切り口から見つけるのはクリエイティブの得意とするところですから、元来この二者は相性は良いはずです。ただ現実では、クリエイティブ側に技術理解のキャパシティが不足していたり、相互認識・共通文化が薄く完成度の高いアウトプットを実現するのが難しいところがある。PARTY のように「産学・業種を超えたスペシャリストで構成されたメンバー」がそういった溝を埋めてくれる可能性は十分にありそうです。また、大学発技術ベンチャーはキャッシュが厳しいので、先程のエクイティと絡ませてデザインファームが支援するというのは大ありですね(リスクあるけど)。

印象深いセッション2 : クリエイティブとブランディングの関係

もともとデザイン出身ではない枌谷さんがどのようにして今の仕事に至り、何を考えてきたのかが垣間見えるセッションでした。

特に印象的だったのは「デザインとビジネスをやる人がいない」というフレーズ。PARTY 伊藤さんの「経営と創造は分離すべき」という言葉と相補的なトピックで、且つ、takram 田川さん進められているデザイン経営とも通じるところがあり面白かったです。

lab.astamuse.co.jp

また、デザイナーのアカウンタビリティや事業との関係性にかかる話も自分の立ち位置を確認する上で非常に有用でした。この辺りの議論は本当によく聞くのですが界隈とは、どうしてもテクニカルな情報に耳目が行きがちなのでもっとマインド変えていかないとなぁとは私も思います。

印象深いセッション3 : 自己組織的な開発チームを如何にして作り上げるか

f:id:astamuse:20200217003928j:plain

タマネギ嫌いに定評のある @kiririmode さんの発表でした。

資料:自己組織的な開発チームを如何にして作り上げるか

アジャイル・スクラムの前提として自己組織化されたチームがありますが、それをどう作り上げているかという発表です。書籍エラスティック・リーダーシップを起点に、@kiririmodeさんの経験を交えて解りやすくまとまっていました。

特に、個々人の相互作用においてのファシリテーターのとらえ方が個人的には刺さりました。なるほどファシリテーションとはそういうことか、と。

全体通して感じたこと

ということで3つほど印象深かったセッション挙げてみました。

他にもテクニカルなセッションをちょいちょい見ましたが、技術トピック系は、 Infrastracture as a Code や DevOps, ML&AI 全盛期よりはだいぶ落ち着いた印象です。やや小康状態なのでしょうね。

更に、参加できなかったセッションの資料も公開されている範囲で全て目を通しましたが、ハイコンテキストなレイヤーでは「越境」の時代が依然続いている印象でした。結局のところ個々の技術が先鋭化する結果として、それをどうオーケストレーションするかが掛け算の肝になるわけで。そのためには一人が全部越境するではなく、個々人が可能な範囲で越境し掛け算を成立させるのが最も効率的なのだと。当座、その方向性は崩れないのでしょう。

では、本日も最後までお読みいただきありがとうございました。

例によって当社では一緒にサービス開発してくれるエンジニア・デザイナー・ディレクターを募集しております。カジュアル面談も随時行っておりますので、「ちょっと話聞きたい」という方は、このブログのサイドバー下にあるアドレスか@YojiShirakiにDMいただければと思います。採用サイトもありますので下の水色のバナーから是非どうぞ!

@YojiShirakiの過去記事)

社内Podcastはじめました〜文字起こし〜形態素解析

ご挨拶

どうもお久しぶりです、gucciです。
入社してついに3年目に突入しました。信じられません。
まだまだ力不足な私ですが、周りの人に支えられてなんとかここまでやって参りました。
新しい仲間もどんどんと増え、これからは支える立場になれるように日々精進して参ります。

さて、今回は社内コミュニケーション活性化プロジェクトそれを使ったある分析をご紹介したいと思います。

以前axtstarさんが書かれた、この記事にあるような社内イベントが昨年の9月に再び開催されました。

lab.astamuse.co.jp

我がJチームが提案し、見事3位に選ばれ実現したプロジェクトが「Podcast」です。
弊社は今どんどんと新しい人がジョインしてきていて、どんどんと成長してきています。
新しい仲間が増えるのは大変嬉しいことですが、新しい仲間が増えると、
「あの人ってどんな人なんだろう、どんな趣味をお持ちなんだろう。」
「あ〜顔はなんとなく覚えたけど名前がすぐ出てこないな〜」
こんなことはよくあると思います。

そんな課題を全て解決してくれるのが、Podcastです。
f:id:astamuse:20200205131922p:plain 内容としては主に「新人さんいらっしゃい」というメインコーナーを据え、
アスタミューゼに入社された新人さんをゲストにお招きし、根掘り葉掘り話を聞くことでその魅力を引き出し、社内コミュニケーションの活性化を目指すという企画です。
(※注:Podcastとは言っていますが、Podcastのアプリから聞けるわけではなくあくまで社内向けに録音した音声ファイルを社内で発信している現状です)

もちろん任意での参加ですし、事前にちょっとした質問票に回答いただくことで、
軽快なトークが持ち味のDJトッキー切れ味鋭いツッコミが秀逸なMCスミィ(どちらも弊社社員です)がゲストの新人さんを丸裸にし、その人の魅力をどんどん引き出していきます。

なるべく手間をかけず良い物を提供するをモットーにしていますので、 マイクを立ててサウンドチェックして…などはせずに、iPhoneのボイスメモアプリで録音!
本番中ちょっと間違えちゃってもリテイクはなし!録音した音源を少し編集したらはい完成!
それでも内容はすごい充実っぷりです。
「面白過ぎてコーヒーを吹いた」「家族と聞いてたら『いい会社だね』と言ってもらえた」などご紹介しきれないぐらいの高評価をこれまでいただいております。

チームメンバーはいろんな部署から集まった個性派集団たちで、とても楽しく活動できております。
そんなチーム内での私の担当は、ミキサー(いわゆる音源編集)をしておりまして、簡単なBGMを入れたりする程度。
ちなみに作業画面はこんな感じ↓↓

f:id:astamuse:20200205111115p:plain
mixing
ゆくゆくは専用のラジオブースを設け、世界へと発信していけたら・・・という野望を抱いております。

さて、せっかく手元に生の音源があるのならばそれを生かして何かできないだろうかということで、
今回は録音音源を用いて、はじめての「文字起こし」はじめての「形態素解析」をやってみたいと思います!
(ちょいちょいハマったポイントがあるので、ハマった箇所は最後にまとめてあります)

文字起こし

今回使ってみるのは、GoogleのAPI「Google Speech-to-Text」です。
機械学習を利用して音声をテキストに変換してくれちゃうというAPIですね。

cloud.google.com

簡単なトライアルもありますのでよかったら試してみてください。
これがなかなか精度が良いとの噂を聞き、使ってみることにしました。

Google Speech-to-Textを使って、Podcastの音源を文字起こしするのにあたりいくつかの不安点がありました。

  1. お金がかかる(API使用などの費用の問題)
  2. iPhoneの内臓マイクで録音しているため、音質がよくない(音源データの音質の問題)
  3. 複数人が同時に喋ることがある(そもそも複数人は対応できないのではという問題)

1は、Google Cloud Platformの無料枠というのが12 か月間 300ドル分あるので、それを使うことで解決!とてもありがたいですね。使い倒してやりましょう。

cloud.google.com

2は録音環境の問題なので現状では改善のしようがないため、ダメだったら仕方がない!
3も同様に、ダメだったら仕方がない!

それでは、レッツ文字起こし。
初めてのGCEのインスタンス作成も、有識者の方々がたくさん記事を挙げてくれているのであまり苦しむことなく無事に作れました。
GCEのインスタンス作成→Google Speech-to-Text APIの有効化→音源ファイルを配置するGCSバケットの作成 といったことをやりました。

Google Speech-to-TextのAPIは、以下の3種類があります。

  • 短い音声ファイルの同期音声認識
  • 長い音声ファイルの非同期音声認識
  • ストリーミング入力のリアルタイム音声認識

今回はPodcast音声が1回あたり約30分なので非同期音声認識「長い音声ファイルの文字変換」を行います。

cloud.google.com

このあたりもネットに良い記事がいくつかあるのでそちらを参考にして、

# !/usr/bin/env python
# coding: utf-8
import argparse
import io
import sys
import codecs
import datetime
import locale

def transcribe_gcs(gcs_uri):
    from google.cloud import speech
    from google.cloud.speech import enums
    from google.cloud.speech import types
    client = speech.SpeechClient()

    audio = types.RecognitionAudio(uri=gcs_uri)
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, # 拡張子wavの設定
        sample_rate_hertz=44100, # 音声ファイルのヘルツ
        language_code='ja-JP') # 日本語の場合

    operation = client.long_running_recognize(config, audio)

    print('Waiting for operation to complete...')
    operationResult = operation.result()

    d = datetime.datetime.today()
    today = d.strftime("%Y%m%d-%H%M%S")
    fout = codecs.open('output{}.txt'.format(today), 'a', 'utf-8')

    for result in operationResult.results:
      for alternative in result.alternatives:
          fout.write(u'{}\n'.format(alternative.transcript))
    fout.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        'path', help='GCS path for audio file to be recognized')
    args = parser.parse_args()
    transcribe_gcs(args.path)

いざ!

$ python3 transcribe.py gs://[GCSのバケット名]/[音声データファイル名].wav

10分ほど待って、出力されたファイルをおそるおそる見てみると・・・ ※ハマりポイント1

新人さんいらっしゃいはアスタミューゼに入社された新人さんに根掘り葉掘り話を聞くことでその魅力を引き出し社内コミュニケーションの活性化を目指すオーナーです・・・

キタ━━━(゚∀゚)━━━ !!!!!
最後の部分、正しくは「コーナー」が「オーナー」になっているけど、きてますきてます。
30分のファイルでだいたい変換にかかる時間は10分ちょいってところです。いいですね。

ただ、やはり・・・不安点2と3の予想は的中しました。
変換されたテキストをみると、

  • 抜けている箇所がある
  • 変換がおかしくなっている
  • 複数人で重なって喋っているとゴソッとその部分が抜けている

というのが結構見受けられます。 また、あだ名のような固有名詞はそもそも登録されていないからか、DJトッキーがDJポッキーになったりしていますね。
だいたい文字起こしの精度としては2〜3割程度でしょうか。
元の会話を知っていれば理解できますが、知らないとちょっと何言ってるかわからないという文になっています。

今回来たって言わどうですかねもう秋にスーツ屋さんにしか見えない放射能と三宅のシグマの笑顔が見えのが新人さん

よくわかりませんね。
スーツも放射能も三宅もシグマも話していませんが、このような面白い結果の箇所がありました。

ただそれでもすごい!もっとぐちゃぐちゃになるかと思いました。さすがGoogleさん。
音質にも原因があると思いますので、精度の検証はまた別の機会にさせていただきたく思います。
もっと文字起こしの結果をご紹介したいのですが、パーソナルな内容が多々あるのでご理解くださいませ。
それじゃあ何のために文字起こししたんだい、と言われてしまうと思います。
そこで今回は、主に使われている単語は何だろなチェックをしてみたいと思います!

いわゆる、文章を形態素解析して頻出単語順に並べるってやつです。
これでDJトッキーとMCスミィがよく使う単語が明らかになれば・・・これを学ぶことであなたもラジオパーソナリティになれるかも!?

それではやってみましょう。

形態素解析

私は自然言語処理の知識も何もないど素人ですが、よろしくお願いします!
日本語の形態素解析といえば、そう、MeCabを使います。
MeCabには追加で新語辞書もあるみたいなのでそちらを使いたいと思います。

MeCabのインストール

$ sudo apt-get install mecab libmecab-dev mecab-ipadic mecab-ipadic-utf8

入ったか確認してみよう

$ mecab -h

新語辞書を入れてみよう

$ git clone --depth 1 https://github.com/neologd/mecab-ipadic-neologd.git
$ cd mecab-ipadic-neolog
$ sudo ./bin/install-mecab-ipadic-neologd -n -a

-aオプションで全ての追加辞書をインストール(2GBくらいあるので注意) ※ハマりポイント2

[install-mecab-ipadic-NEologd] : Do you want to install mecab-ipadic-NEologd? Type yes or no.

と聞かれるのでyesと答えてください。

[install-mecab-ipadic-NEologd] : Usage of mecab-ipadic-NEologd is here.
Usage:
    $ mecab -d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd ...

と出ればインストール完了!さっそくやってみましょう!
まずは新語辞書を使わずに普通にmecabコマンド

$ mecab
おはようございます、今日はいい天気ですね #入力文がこちら
おはよう    感動詞,*,*,*,*,*,おはよう,オハヨウ,オハヨー
ござい   助動詞,*,*,*,五段・ラ行特殊,連用形,ござる,ゴザイ,ゴザイ
ます  助動詞,*,*,*,特殊・マス,基本形,ます,マス,マス
、 記号,読点,*,*,*,*,、,、,、
今日  名詞,副詞可能,*,*,*,*,今日,キョウ,キョー
は 助詞,係助詞,*,*,*,*,は,ハ,ワ
いい  形容詞,自立,*,*,形容詞・イイ,基本形,いい,イイ,イイ
天気  名詞,一般,*,*,*,*,天気,テンキ,テンキ
です  助動詞,*,*,*,特殊・デス,基本形,です,デス,デス
ね 助詞,終助詞,*,*,*,*,ね,ネ,ネ
EOS

うおおおおお形態素解析されてるうう。感動です。
続いて新語辞書に最近追加された「鬼滅の刃」という単語を使ってみます。
まずは新語辞書を使わずに・・・

鬼滅の刃が面白いらしいですね
鬼 名詞,一般,*,*,*,*,鬼,オニ,オニ
滅 名詞,一般,*,*,*,*,滅,メツ,メツ
の 助詞,連体化,*,*,*,*,の,ノ,ノ
刃 名詞,一般,*,*,*,*,刃,ハ,ハ
が 助詞,格助詞,一般,*,*,*,が,ガ,ガ
面白い   形容詞,自立,*,*,形容詞・アウオ段,基本形,面白い,オモシロイ,オモシロイ
らしい   助動詞,*,*,*,形容詞・イ段,基本形,らしい,ラシイ,ラシイ
です  助動詞,*,*,*,特殊・デス,基本形,です,デス,デス
ね 助詞,終助詞,*,*,*,*,ね,ネ,ネ
EOS

なるほど。ぶった切られている。 続いて新語辞書を使ってリトライ!

$ mecab -d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd
鬼滅の刃が面白いらしいですね
鬼滅の刃    名詞,固有名詞,一般,*,*,*,鬼滅の刃,キメツノヤイバ,キメツノヤイバ
が 助詞,格助詞,一般,*,*,*,が,ガ,ガ
面白い   形容詞,自立,*,*,形容詞・アウオ段,基本形,面白い,オモシロイ,オモシロイ
らしい   助動詞,*,*,*,形容詞・イ段,基本形,らしい,ラシイ,ラシイ
です  助動詞,*,*,*,特殊・デス,基本形,です,デス,デス
ね 助詞,終助詞,*,*,*,*,ね,ネ,ネ
EOS

すごいやん。ちゃんと一つの固有名詞として解析してくれました。
このように新語辞書は新しい単語を随時追加してくれているようです。ありがたや。
それではpythonでpodcastのテキスト文を形態素解析してみましょう!

python用のmecabライブラリのインストール ※ハマりポイント3

$ pip3 install mecab-python3

第6回配信までのPodcastを文字起こししたものを1つのファイルにまとめて、それをインプットに頻出単語順に並べてみます。

import MeCab
import sys
import re
from collections import Counter

with open("podcast_all.txt") as f: #文字起こししたpodcastデータの読み込み
  podcast = f.read()  

wakati = MeCab.Tagger("-d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd") #新語辞書を適用
parse = wakati.parse(podcast)
lines = parse.split("\n")
items = (re.split("[\t,]", line) for line in lines)

#「EOS」と「空文字」と「ー」以外
words = [item[0] for item in items if (item[0] not in ("EOS", "", "ー"))]

## 標準出力に出力 ##########################
counter = Counter(words)
for word, count in counter.most_common():
  print(f"{word}: {count}")
の: 564
て: 489
に: 338
た: 311
です: 304
で: 278
は: 275
が: 256
と: 232
ね: 224
・
・
・

うおおおお・・・おぉぉ?ようわからんですね。
品詞の指定ができるのでしてみましょう。
名詞を指定し、さらに名詞の中の「一般」を指定してみます。以下に書き換えてもう一度。

#「EOS」と「空文字」と「ー」以外の「名詞」で「一般」のもの
words = [item[0] for item in items if (item[0] not in ("EOS", "", "ー") and item[1] == "名詞" and item[2] == "一般")]
人: 27
ゲスト: 25
皆さん: 24
番組: 24
コーナー: 17
新人: 15
自分: 15
魅力: 12
感じ: 12
家: 12
会社: 12
楽しみ: 9
社内: 9
次: 9
気: 9
感想: 9
コーヒー: 8
地元: 8
人生: 8
最後: 8
店: 8
理由: 7
気持ち: 7
音: 7
いらっしゃい: 6
コミュニケーション: 6
曲: 6
アスタミューゼ: 6
・
・
・

求めてやつ!そうそうこんなのが欲しかった! ついにたどり着きました。
せっかくなのでもう一歩だけ踏み込みます。

形態素解析して頻出単語を出せたので、それを使ってワードクラウドを作ってみましょう。

ワードクラウド とは
文章中で出現頻度が高い単語を複数選び出し、その頻度に応じた大きさで図示する手法。 ウェブページやブログなどに頻出する単語を自動的に並べることなどを指す。 文字の大きさだけでなく、色、字体、向きに変化をつけることで、文章の内容をひと目で印象づけることができる。

事前準備として、形態素解析した結果をテキストファイルに出しておきましょう。
wordcloudは文字列をスペース区切りで分割してカウントしていくようなので、さきほどの最後のコードを以下に書き換えて出力しておきます。

## ファイルに出力 #########################
out_path = "./wakati.txt"
with open(out_path, mode='w') as f:
  f.write(" ".join(words))

それではいざワードクラウドへ。
pythonならwordcloudのライブラリをインストールすれば簡単にできちゃいます。
(最終的にはpngファイルで出力するのでここからはmacで作業しています)

$ pip3 install wordcloud
from wordcloud import WordCloud

with open("wakati.txt") as f:  #形態素解析したデータを読み込む
  podcast = f.read() 

wordcloud = WordCloud(background_color="white",
    font_path="/System/Library/Fonts/ヒラギノ角ゴシック W3.ttc",
    width=800,height=600, colormap="jet", regexp=r"[\w']+").generate(podcast)

wordcloud.to_file("[お好きなパスにどうぞ]/wordcloud.png")

wordcloudは様々な設定ができるそうです。
フォントのパスはお使いのPCの日本語フォントのパスを指定してください。
幅や高さ、colormapまで変えられます。
wordcloudのデフォルトだと1文字はスキップしてしまうので、regexp=r"[\w']+"と指定しています。

そして出力した結果がこちら・・・

f:id:astamuse:20200205120408p:plain
podcast_wordcloud
とても見やすい。とてもそれっぽい!

ついにここまでたどり着きました。
我が社のPodcastは「人」という単語が一番登場するということが可視化されましたね。
やはり「人から着火しよう」というアスタミューゼの行動指針が根付いていますね。
さすがはDJトッキーとMCスミィです。

まとめ

いかがだったでしょうか。
文字起こしと形態素解析をはじめて挑戦してみましたが、とっても楽しいですね!
私のようにはじめてでも簡単にアウトプットを出すことができました。
GoogleAPIのすごさとpythonライブラリの素晴らしさを実感しました。
簡単に分析ができる環境が整っているからこそ、何をどう分析するのかが大事ということですね。

最後に、アスタミューゼではアプリエンジニア、デザイナー、プロダクトマネージャー、データエンジニア、機械学習エンジニアなどなど絶賛大募集中です! どしどしご応募ください!お待ちしております!

最後までお読みいただき、ありがとうございました!


ハマったポイント
ポイント1:モノラル音源データじゃないとだめ

ステレオ音源を文字起こししようとすると、

google.api_core.exceptions.InvalidArgument: 400 Must use single channel (mono) audio, but WAV header indicates 2 channels.

のエラーが基本的にはでます。
ただ、30分のpodcastデータを最初ステレオのまま文字起こししようとスクリプトを実行したら特にエラーがでることもなく 変換ファイルが出力されました。

ゲンタシン軟膏
ごめんね
真実への鍵をポストに入れて欲しい
シュメール文明
1070人組のソフト女の子
普通38中古
東照宮の行き方
香港ランニングウェア
ナガンヌ島シュノーケル
視聴率ランキング


文にすらなっていない・・・しかも、まったくこんな話していないのです。
「終わった。他にブログネタない、どうしよう。」と大慌てになったのはいい思い出です。
モノラル音源に変換して文字起こしをし直したところ、想定通りの結果になり一安心でした。
ある程度長いファイルになるとエラーが返せないのですかね?このあたりは謎でした。

ポイント2:メモリ不足

GCEの無料インスタンスであるfi-microではメモリ不足になりました。
g1-smallでもだめ。
n1-standard-1でようやくインストールできました。
2GBほどはメモリが必要ということですね。

ポイント3:swigが入ってないよ

error: command 'swig' failed with exit status 1

とでたら、

$ sudo apt install swig

と叩いてswigをインストールしてあげてください。


とっても参考になったURLたち
GCE無料枠での設定

これから始めるGCP(GCE) 安全に無料枠を使い倒せ - Qiita
GCE の無料枠のサーバを立るときに、初見でハマりそうなところ - Qiita

文字起こし

Google Cloud Speech API を使った音声の文字起こし手順 - Qiita
Google APIを使って音声ファイルを自動でテキストに文字起こしする - Qiita
Google Cloud Speech-to-Textのために音声をいろいろ加工して解析させてみた | ハックノート

MeCab

【Python3】MeCabでテキストファイルから名詞を頻出順に抽出 - Qiita
MecabをPythonで使うまで - Qiita
[Python]MeCabで誰でも簡単に分かち書きをする方法 | エンジニアの眠れない夜

WordCloud

Word Cloudで文章の単語出現頻度を可視化する。[Python] - Qiita
Pythonでテキストマイニング ②Word Cloudで可視化 - Qiita
Python - Word Cloud を作成する方法について - Pynote
【日本語文章の表示サンプルあり】Pythonのワードクラウドで使えるカラーマップ一覧✰ matplotlib/WordCloud✰ - なろう分析記録

Akka HTTP の Graceful な Shutdown の方法 または私は如何にして OSS のコードを読んでいるか

どうもみなさまおはこんばんちわー。アプリケーションエンジニアの池田 (@yukung) です。3 度目の登場になります。

2020 年始まりましたね!時が経つのは早いもので私も入社してはや 1 年が経ちましたが、最近はエンジニアだけに限らず、様々な分野で専門を持つ方々が弊社に入社してきてくださっていて、そんな方々の自己紹介を聞くたびに、世の中には知らない世界がまだまだたくさんあるなぁ、と思う刺激的な毎日です。

2020 年の心配事は、オリンピックによる混雑ですねぇ。あまりに混雑するようなら、どうせなら期間中はリモートワークで切り抜けようかな、とか密かに思っています。(弊社には宮崎からリモートワークで働いてるエンジニアもいます)

lab.astamuse.co.jp

Akka HTTP はいいぞ

f:id:astamuse:20200124182009p:plain

さて、今回は何を書こうかと考えた時に、前々回前回と技術的なことを書いていなかったことを思い出して、今回は素直に技術に関することを書くことにします。

弊社では主に Scala を使ったアプリケーション開発を行っていますが、 Web アプリケーションの開発は Play Framework の他に、 Akka HTTP を使うプロジェクトもちらほら出てきました。私自身も最近初めて Akka HTTP を用いて新規のアプリケーションを構築しましたが、画面を伴ったアプリケーションではなく、 RESTish な API を作るのであれば、 Akka HTTP でも十分開発はできるなぁ、という感触を持ちました。

Akka HTTP は、公式ドキュメントに「 Web アプリケーションフレームワークではなく HTTP ベースのサービスを提供するより汎用的なツールキットである」と記載がある通り、 Play Framework のようなフルスタックなフレームワークではないため、初めから色々揃っているものではありません。ただしその分解決したい問題に対して自分で使う道具を取捨選択できるため、いわゆる「薄い」フレームワークのようなものを好む方にとってはこちらの方が肌に合うように感じます。1

実際使って作ってみた感触としては、「書いたようにしか動かない」という感覚を持ちました。これはどちらかというとポジティブな感覚で、要はハマっても予測可能である、ということです。エラーが出ても自分がそう書いていないからエラーになるのであって、エラーの内容とスタックトレースを元に地道に調べれば必ず答えはそこにあります。個人の感覚ですが、実際他のフレームワークと比べて、フレームワーク固有のお作法などによる原因の分からない問題に振り回されることは少なかった気がしています。

以下はサーバを起動させる部分のコードの最小のサンプルです。公式ドキュメントからの転載となりますが、

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

これだけで、 /hello エンドポイントからレスポンスが返ってくるサーバが立ち上がります。また main メソッドを持つ単なる Scala プログラムですので、ソースコードを追う際もエントリポイントが明確なため調査がしやすいところも気に入っています。

Akka HTTP の Graceful Shutdown ってどうやるの?

そんな Akka HTTP について、サーバを起動させることは簡単なのですが、サーバの終了処理について少し悩みました。上記のサンプルのように基本的には akka.actor.ActorSystem#terminate() を呼び出せば、起動したサーバは停止します。が、これは起動中のプロセスに対して割り込みで停止させるため、通信中のコネクションも問答無用で切断します。いわゆる Graceful shutdown ではありません。

昨今は Docker といったコンテナベースのアプリケーション運用も多くなってきていますし、そうなるとカジュアルにサーバプロセスが停止することもままあります。そんな状況においては、サーバ終了時はアプリケーションに依存しているリソース( DB や接続中の HTTP コネクションなど)は丁寧に解放してから、サーバを終了したくなってきます。では、 Akka HTTP で Graceful shutdown を実現するにはどうしたらよいのでしょうか。

Akka HTTP の公式ドキュメントには、 Graceful termination についてのページがあり、こちらに割と詳細に記載があります。ここではサーバを終了させる方法として 2 つ紹介されています。

これらどちらも、有効な HTTP コネクションを終了させるには有効な方法です。が、その他にもサーバの終了処理では DB などのバックエンドとの通信を閉じるなどリソースの終了処理を挟み込みたくなるのですが、その方法については記載がありませんでした。理想的には、 SIGTERM を送ると Graceful shutdown を実行するように、 JVM の ShutdownHook を仕込むような形にしたいのです。

そこで、自前で JVM の ShutdownHook を挟み込んでみました。

// (途中まで省略)

val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
indingFuture.onComplete {
  case Success(binding) =>
    system.log.info(s"Server online at http://\${binding.localAddress.getHostName}:\${binding.localAddress.getPort}/")
  case Failure(ex) =>
    system.log.error(ex, "An error has occurred!")
}
sys.addShutdownHook {
  bindingFuture
    .flatMap(_.unbind())
    .onComplete { _ =>
      materializer.shutdown()
      system.terminate()
    }
}

この実装で、サーバに対して SIGTERM を送ってみるとプロセスは終了してくれるものの、 debug ログを見るとやはり途中でバッサリ切れているように見えました。

[DEBUG] [07/26/2019 12:48:58.015] [main] [EventStream(akka://app-server)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/26/2019 12:48:58.016] [main] [EventStream(akka://app-server)] Default Loggers started
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] Starting a new lifecycle ...
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] ======== STARTED ========
[DEBUG] [07/26/2019 12:48:59.597] [main] [AkkaSSLConfig(akka://app-server)] Initializing AkkaSSLConfig extension...
[DEBUG] [07/26/2019 12:48:59.599] [main] [AkkaSSLConfig(akka://app-server)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@5f0bab7e
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] Stopping the lifecycle ...
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:48:59.915] [app-server-akka.actor.default-dispatcher-3] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:48:59.927] [app-server-akka.actor.default-dispatcher-3] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:49:01.471] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

これでは Graceful shutdown にはなりません。

公式ドキュメントの Graceful termination のページには、もう一つ Akka の Coordinated Shutdown についての記載があります。ただし、これについては Akka HTTP ではまだ未実装という記載があるのみですが、 issue へのリンク (#1210) があるのでこれを見てみると、 issue のやり取りの中で CoordinatedShutdown#addTask を Akka の各終了フェーズ (CoordinatedShutdown のコンパニオンオブジェクト内に定義があります) ごとに任意のタスクを挟み込むことができることが分かりました。

以下のようなコードでそれが実現できます。

val shutdown = CoordinatedShutdown(system)

shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () =>
  binding.unbind().map(_ => Done)
}

shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () =>
  binding.terminate(10.seconds).map(_ => Done)
}

shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () =>
  Http().shutdownAllConnectionPools().map(_ => Done)
}

これによって、新しい HTTP 接続を拒否し、現存している HTTP 接続の終了を待ってから、バックエンドリソースへの接続を終了する処理を挟み込むことができそうです。この方針で終了処理を実装してログを見てみました。

shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () => bindingFuture.map(_.unbind()).map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () => bindingFuture.map(_.terminate(10.seconds)).map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () => Http(system).shutdownAllConnectionPools().map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "app-shutdown") { () => system.terminate().map(_ => Done)}
7月 26, 2019 12:35:02 午後 wvlet.log.Logger log
情報: [session:58015e56] Stopping the lifecycle ...
7月 26, 2019 12:35:02 午後 wvlet.log.Logger log
情報: [session:58015e56] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:35:02.883] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:35:02.893] [app-server-akka.actor.default-dispatcher-5] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:35:09.445] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [07/26/2019 12:35:17.835] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook
[DEBUG] [07/26/2019 12:35:17.837] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.839] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [1] tasks: [http-unbind]
[DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000
[DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [1] tasks: [http-graceful-terminate]
[DEBUG] [07/26/2019 12:35:17.847] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [1] tasks: [http-shutdown]
[DEBUG] [07/26/2019 12:35:17.848] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener
[DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [2] tasks: [terminate-system, app-shutdown]
[DEBUG] [07/26/2019 12:35:17.856] [app-server-akka.actor.default-dispatcher-7] [GracefulTerminatorStage(akka://app-server)] [terminator] Initializing termination of server, deadline: 10.00 s
[DEBUG] [07/26/2019 12:35:17.859] [app-server-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

うまく終了フェーズごとにタスクが実行されている様が確認できますね。

本当にこれでいいんだっけ?

ここまで見てきた方法で Graceful shutdown が実装できそうな気がしてきましたが、まだ確信が持てなかったので、もうちょっと当たりをつけたく、直接ソースコードも見に行きました。

確認したのは以下の 2 つです。

  • akka.actor.ActorSystem の終了処理部分
  • Play Framework の Akka バックエンドの終了処理
    • Play Framework も内部で Akka HTTP を使っているので同じようなことをしているはず、という予測を立てました

akka.actor.ActorSystem の終了処理部分を見てみる

ActorSystem のコードを読んでみて知りましたが、実は、 ActorSystem のインスタンスが作られるときには、 JVM の ShutdownHook に ActorSystem の終了処理が追加されています。CoordinatedShutdown#createExtension にて、終了処理のフックを登録しています。

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala

  override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = {
    val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
    val phases = phasesFromConfig(conf)
    val coord = new CoordinatedShutdown(system, phases)
    initPhaseActorSystemTerminate(system, conf, coord)
    initJvmHook(system, conf, coord)
    // Avoid leaking actor system references when system is terminated before JVM is #23384
    // Catching RejectedExecutionException in case extension is accessed first time when
    // system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem
    // is started but it might be a race between (failing?) startup and shutdown.
    def cleanupActorSystemJvmHook(): Unit = {
      coord.actorSystemJvmHook match {
        case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled =>
          cancellable.cancel()
          coord.actorSystemJvmHook = OptionVal.None
        case _ =>
      }
    }
    try system.registerOnTermination(cleanupActorSystemJvmHook())
    catch {
      case _: RejectedExecutionException => cleanupActorSystemJvmHook()
    }
    coord
  }

CoordinatedShutdown#initPhaseActorSystemTerminate の実装は以下のようになっています。

  private def initPhaseActorSystemTerminate(
      system: ExtendedActorSystem,
      conf: Config,
      coord: CoordinatedShutdown): Unit = {
    coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () =>
      val confForReason = confWithOverrides(conf, coord.shutdownReason())
      val terminateActorSystem = confForReason.getBoolean("terminate-actor-system")
      val exitJvm = confForReason.getBoolean("exit-jvm")
      val exitCode = confForReason.getInt("exit-code")

      if (exitJvm && terminateActorSystem) {
        // In case ActorSystem shutdown takes longer than the phase timeout,
        // exit the JVM forcefully anyway.
        // We must spawn a separate thread to not block current thread,
        // since that would have blocked the shutdown of the ActorSystem.
        val timeout = coord.timeout(PhaseActorSystemTerminate)
        val t = new Thread {
          override def run(): Unit = {
            if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook)
              System.exit(exitCode)
          }
        }
        t.setName("CoordinatedShutdown-exit")
        t.start()
      }

      if (terminateActorSystem) {
        system.finalTerminate()
        system.whenTerminated.map { _ =>
          if (exitJvm && !runningJvmHook) System.exit(exitCode)
          Done
        }(ExecutionContexts.sameThreadExecutionContext)
      } else if (exitJvm) {
        System.exit(exitCode)
        Future.successful(Done)
      } else
        Future.successful(Done)
    }
  }

`CoordinatedShutdown#initJvmHook は以下のような実装になっています。

  private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
    val runByJvmShutdownHook = system.settings.JvmShutdownHooks && conf.getBoolean("run-by-jvm-shutdown-hook")
    if (runByJvmShutdownHook) {
      coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook {
        runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
        if (!system.whenTerminated.isCompleted) {
          coord.log.debug("Starting coordinated shutdown from JVM shutdown hook")
          try {
            // totalTimeout will be 0 when no tasks registered, so at least 3.seconds
            val totalTimeout = coord.totalTimeout().max(3.seconds)
            Await.ready(coord.run(JvmExitReason), totalTimeout)
          } catch {
            case NonFatal(e) =>
              coord.log.warning("CoordinatedShutdown from JVM shutdown failed: {}", e.getMessage)
          }
        }
      })
    }
  }

initPhaseActorSystemTerminateActorSystem に対する終了処理が、 initJvmHook で JVM の終了処理が登録されます。これによって先に試した自前で JVM の ShutdownHook で終了させようとしても、CoordinatedShutdown が登録したフックで、タイミングによっては ActorSystem が先に終了してしまい自前の処理が実行される前に JVM が終了してしまうようだ、ということが分かりました。

ということは HTTP 接続が全て開放され、 ActorSystem が終了する直前、 つまりCoordinatedShutdown#PhaseBeforeActorSystemTerminate フェーズで各種リソースの開放処理をすれば良さそうだ、ということが分かります。

実際に、 CoordinatedShutdown#PhaseBeforeActorSystemTerminate フェーズで終了の確認をしてみました。

  CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () =>
    implicit val ec = system.dispatcher
    bindingFuture
      .flatMap(_.unbind())
      .onComplete { _ =>
        system.log.info("ちゃんと止まる?")
      }
    Future.successful(Done)
  }
7月 26, 2019 12:36:26 午後 wvlet.log.Logger log
情報: [session:4d3c6593] Stopping the lifecycle ...
7月 26, 2019 12:36:26 午後 wvlet.log.Logger log
情報: [session:4d3c6593] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:36:26.053] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:36:26.065] [app-server-akka.actor.default-dispatcher-2] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:36:30.507] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [07/26/2019 12:36:40.595] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook
[DEBUG] [07/26/2019 12:36:40.596] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.599] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [1] tasks: [app-shutdown]
[DEBUG] [07/26/2019 12:36:40.604] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000
[DEBUG] [07/26/2019 12:36:40.605] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [1] tasks: [terminate-system]
[DEBUG] [07/26/2019 12:36:40.608] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener
[INFO] [07/26/2019 12:36:40.609] [app-server-akka.actor.default-dispatcher-9] [akka.actor.ActorSystemImpl(app-server)] ちゃんと止まる?
[DEBUG] [07/26/2019 12:36:40.611] [app-server-akka.actor.default-dispatcher-3] [EventStream] shutting down: StandardOutLogger

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

良さそうですね。

Play Framework の Akka バックエンドの終了処理を見てみる

実は Play Framework にも、 Coordinated Shutdown について記載されているページがあります。バックエンドが Akka HTTP ですので当たり前といえば当たり前ですね。

このページによれば、 Play Framework は CoordinatedShutdown を利用はしているが、完全には依存していないとも記載があります。Play Framework には CoordinatedShutdown の他に従来の ApplicationLifecycleでの終了の方法もあり、移行も可能だそうです。

実際に、ソースコードも確認してみました。 Play Framework で Akka HTTP のサーバを使っているのは、 play.core.server.AkkaHttpServer クラスです。

https://github.com/playframework/playframework/blob/master/transport/server/play-akka-http-server/src/main/scala/play/core/server/AkkaHttpServer.scala

// play.core.server.AkkaHttpServer
/**
 * Starts a Play server using Akka HTTP.
 */
class AkkaHttpServer(context: AkkaHttpServer.Context) extends Server {

  registerShutdownTasks()
  private def registerShutdownTasks(): Unit = {

    implicit val exCtx: ExecutionContext = context.actorSystem.dispatcher

    // Register all shutdown tasks
    val cs = CoordinatedShutdown(context.actorSystem)
    cs.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "trace-server-stop-request") { () =>
      mode match {
        case Mode.Test =>
        case _         => logger.info("Stopping server...")
      }
      Future.successful(Done)
    }

    // Stop listening.
    // TODO: this can be improved so unbind is deferred until `service-stop`. We could
    // respond 503 in the meantime.
    cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "akka-http-server-unbind") { () =>
      def unbind(binding: Option[Http.ServerBinding]): Future[Done] =
        binding.map(_.unbind()).getOrElse(Future.successful(Done))

      for {
        _ <- unbind(httpServerBinding)
        _ <- unbind(httpsServerBinding)
      } yield Done
    }

    // Call provided hook
    // Do this last because the hooks were created before the server,
    // so the server might need them to run until the last moment.
    cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "user-provided-server-stop-hook") { () =>
      context.stopHook().map(_ => Done)
    }
    cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdown-logger") { () =>
      Future {
        super.stop()
        Done
      }
    }

  }

Play Framework バックエンドの AkkaHttpServer でも、 ActorSystem 停止前のフェーズCoordinatedShutdown.PhaseBeforeActorSystemTerminate でユーザー定義の停止時のフックを実行するような実装になっています。

ここまでの確認で、このタイミングで Graceful shutdown の処理を実行すればよい、ということに自信を持つことができました。

CoordinatedShutdown についての補足

なお、 CoordinatedShutdown の振る舞い自体は設定である程度変えることができますCoordinatedShutdown 自体は Akka HTTP の仕組みというよりは主に akka-cluster のための機能のようなので、設定に関する情報も Akka HTTP のドキュメントではなく Akka Actors 本体のドキュメントに記載があります。

ここの設定で、CoordinatedShutdown が登録する JVM の ShutdownHook を無効にしたり、 ActorSystem の終了とともに JVM 自体の終了するデフォルトの振る舞いを止めて、 ActorSystem だけを停止し JVM のみ生かしたままにする、といった振る舞いにすることもできます。

実際に私が新規に構築したアプリケーションでは、 Akka HTTP に加えて DI ライブラリとして Airframe DI を利用しました。 Airframe DI の有効な Session もサーバの停止時に停止したかったため、CoordinatedShutdownの終了処理の外側で Airframe の Session を停止するために、 独自定義の JVM の ShutdownHook を登録する必要がありました。そこで ActorSystem が登録する ShutdownHook は無効にし、自前で ShutdownHook を登録してサーバ停止するように実装しました。

具体的には以下のような実装になりました。該当箇所だけ抜粋して抜粋します。

  def run(host: String, port: Int, config: Config, system: ActorSystem) = {
    val app = pureconfig.loadConfigOrThrow[AppSettings](config, "app")
    val design = newDesign
      .bind[ActorSystem].toInstance(system)
      .bind[ExecutionContext].toInstance(system.dispatcher)
      .add(configure(app))

    // Disable the airframe shutdown hook, as it will conflict with the akka-http shutdownhook
    val session = design.newSessionBuilder.noShutdownHook.create
    // Starting a dependency injection enabled scope by airframe
    session.start

    val server = session.build[AppServer].start(host, port, config, settings = ServerSettings(system))
    // Manually register a shutdown hook for stopping the server
    // The reason is that we want to stop backend data storage and DI container after stopping akka-http server.
    sys.addShutdownHook {
      server.stop()
    }
    server
  }

// server#start(), server#stop()
  def start(host: String, port: Int, config: Config, settings: ServerSettings): Server = {
    val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings)
    val bindTimeout = Duration(config.getString("akka.http.server.bind-timeout"))
    val serverBinding = Await.result(bindingFuture, bindTimeout)
    log.info(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}/")

    // Add task for graceful shutdown, and stop backend data storage
    registerShutdownTasks(serverBinding)
    this
  }

  def stop(): Unit = {
    val shutdownTimeout = CoordinatedShutdown(system).totalTimeout() + Duration(5, TimeUnit.SECONDS)
    Await.result(CoordinatedShutdown(system).run(JvmExitReason), shutdownTimeout)
  }
// application.conf
akka {
  jvm-shutdown-hooks = off      // CoordinatedShutdown による ShutdownHook の追加を止める
  coordinated-shutdown {
    terminate-actor-system = on // ActorSystem は停止する
    exit-jvm = off              // ActorSystem と同時に JVM の停止するのは止める
  }
}

まとめ

Akka HTTP での Graceful shutdown の実装を行うにあたり、調査したことをまとめてみました。Akka HTTP は薄い HTTP Toolkit ライブラリという位置づけもあって何から何までよしなにやってくれるフレームワークではありませんが、その分ソースコードを読めば何をやっているか把握しやすく、ハマった時に対処しやすいというメリットも感じています。

また、 OSS のソースコードを読む場合、公式のドキュメントを読んで概念やキーワードを把握した上で、実際にテストコードやデバッグログを出して挙動を把握し、問題に関連しそうなクラス名に当たりをつけて、起動スクリプトのエントリポイントやそのクラス名で grep したところからコードリーディングをしていく、といったことを私はよくやります。

こういったコードリーディングの方法は、体系的な方法や情報源はあまりないとは思いますが、もし問題にぶつかった時の切り抜け方として参考になれば幸いです。

最後に、お約束ではありますが、弊社では上記のように OSS を使って問題解決が開発がしたいエンジニアや、デザイナー、プロダクトマネージャーなど絶賛大募集中です。少しでもご興味を持っていただけたら、お気軽にカジュアルランチからでも構いませんので、下のバナーや @yukung へ DM 等でご連絡いただければと思います。

最後までお読みいただいてありがとうございました!!


  1. Play Framework もバージョン 2.6 系から Akka HTTP をデフォルトのバックエンドとして利用していますし、単純に比較できるものではありませんが

Copyright © astamuse company, ltd. all rights reserved.