astamuse Lab

astamuse Labとは、アスタミューゼのエンジニアとデザイナーのブログです。アスタミューゼの事業・サービスを支えている知識と舞台裏の今を発信しています。

Akka HTTP の Graceful な Shutdown の方法 または私は如何にして OSS のコードを読んでいるか

どうもみなさまおはこんばんちわー。アプリケーションエンジニアの池田 (@yukung) です。3 度目の登場になります。

2020 年始まりましたね!時が経つのは早いもので私も入社してはや 1 年が経ちましたが、最近はエンジニアだけに限らず、様々な分野で専門を持つ方々が弊社に入社してきてくださっていて、そんな方々の自己紹介を聞くたびに、世の中には知らない世界がまだまだたくさんあるなぁ、と思う刺激的な毎日です。

2020 年の心配事は、オリンピックによる混雑ですねぇ。あまりに混雑するようなら、どうせなら期間中はリモートワークで切り抜けようかな、とか密かに思っています。(弊社には宮崎からリモートワークで働いてるエンジニアもいます)

lab.astamuse.co.jp

Akka HTTP はいいぞ

f:id:astamuse:20200124182009p:plain

さて、今回は何を書こうかと考えた時に、前々回前回と技術的なことを書いていなかったことを思い出して、今回は素直に技術に関することを書くことにします。

弊社では主に Scala を使ったアプリケーション開発を行っていますが、 Web アプリケーションの開発は Play Framework の他に、 Akka HTTP を使うプロジェクトもちらほら出てきました。私自身も最近初めて Akka HTTP を用いて新規のアプリケーションを構築しましたが、画面を伴ったアプリケーションではなく、 RESTish な API を作るのであれば、 Akka HTTP でも十分開発はできるなぁ、という感触を持ちました。

Akka HTTP は、公式ドキュメントに「 Web アプリケーションフレームワークではなく HTTP ベースのサービスを提供するより汎用的なツールキットである」と記載がある通り、 Play Framework のようなフルスタックなフレームワークではないため、初めから色々揃っているものではありません。ただしその分解決したい問題に対して自分で使う道具を取捨選択できるため、いわゆる「薄い」フレームワークのようなものを好む方にとってはこちらの方が肌に合うように感じます。1

実際使って作ってみた感触としては、「書いたようにしか動かない」という感覚を持ちました。これはどちらかというとポジティブな感覚で、要はハマっても予測可能である、ということです。エラーが出ても自分がそう書いていないからエラーになるのであって、エラーの内容とスタックトレースを元に地道に調べれば必ず答えはそこにあります。個人の感覚ですが、実際他のフレームワークと比べて、フレームワーク固有のお作法などによる原因の分からない問題に振り回されることは少なかった気がしています。

以下はサーバを起動させる部分のコードの最小のサンプルです。公式ドキュメントからの転載となりますが、

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

これだけで、 /hello エンドポイントからレスポンスが返ってくるサーバが立ち上がります。また main メソッドを持つ単なる Scala プログラムですので、ソースコードを追う際もエントリポイントが明確なため調査がしやすいところも気に入っています。

Akka HTTP の Graceful Shutdown ってどうやるの?

そんな Akka HTTP について、サーバを起動させることは簡単なのですが、サーバの終了処理について少し悩みました。上記のサンプルのように基本的には akka.actor.ActorSystem#terminate() を呼び出せば、起動したサーバは停止します。が、これは起動中のプロセスに対して割り込みで停止させるため、通信中のコネクションも問答無用で切断します。いわゆる Graceful shutdown ではありません。

昨今は Docker といったコンテナベースのアプリケーション運用も多くなってきていますし、そうなるとカジュアルにサーバプロセスが停止することもままあります。そんな状況においては、サーバ終了時はアプリケーションに依存しているリソース( DB や接続中の HTTP コネクションなど)は丁寧に解放してから、サーバを終了したくなってきます。では、 Akka HTTP で Graceful shutdown を実現するにはどうしたらよいのでしょうか。

Akka HTTP の公式ドキュメントには、 Graceful termination についてのページがあり、こちらに割と詳細に記載があります。ここではサーバを終了させる方法として 2 つ紹介されています。

これらどちらも、有効な HTTP コネクションを終了させるには有効な方法です。が、その他にもサーバの終了処理では DB などのバックエンドとの通信を閉じるなどリソースの終了処理を挟み込みたくなるのですが、その方法については記載がありませんでした。理想的には、 SIGTERM を送ると Graceful shutdown を実行するように、 JVM の ShutdownHook を仕込むような形にしたいのです。

そこで、自前で JVM の ShutdownHook を挟み込んでみました。

// (途中まで省略)

val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
indingFuture.onComplete {
  case Success(binding) =>
    system.log.info(s"Server online at http://\${binding.localAddress.getHostName}:\${binding.localAddress.getPort}/")
  case Failure(ex) =>
    system.log.error(ex, "An error has occurred!")
}
sys.addShutdownHook {
  bindingFuture
    .flatMap(_.unbind())
    .onComplete { _ =>
      materializer.shutdown()
      system.terminate()
    }
}

この実装で、サーバに対して SIGTERM を送ってみるとプロセスは終了してくれるものの、 debug ログを見るとやはり途中でバッサリ切れているように見えました。

[DEBUG] [07/26/2019 12:48:58.015] [main] [EventStream(akka://app-server)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/26/2019 12:48:58.016] [main] [EventStream(akka://app-server)] Default Loggers started
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] Starting a new lifecycle ...
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] ======== STARTED ========
[DEBUG] [07/26/2019 12:48:59.597] [main] [AkkaSSLConfig(akka://app-server)] Initializing AkkaSSLConfig extension...
[DEBUG] [07/26/2019 12:48:59.599] [main] [AkkaSSLConfig(akka://app-server)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@5f0bab7e
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] Stopping the lifecycle ...
7月 26, 2019 12:48:59 午後 wvlet.log.Logger log
情報: [session:58015e56] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:48:59.915] [app-server-akka.actor.default-dispatcher-3] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:48:59.927] [app-server-akka.actor.default-dispatcher-3] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:49:01.471] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

これでは Graceful shutdown にはなりません。

公式ドキュメントの Graceful termination のページには、もう一つ Akka の Coordinated Shutdown についての記載があります。ただし、これについては Akka HTTP ではまだ未実装という記載があるのみですが、 issue へのリンク (#1210) があるのでこれを見てみると、 issue のやり取りの中で CoordinatedShutdown#addTask を Akka の各終了フェーズ (CoordinatedShutdown のコンパニオンオブジェクト内に定義があります) ごとに任意のタスクを挟み込むことができることが分かりました。

以下のようなコードでそれが実現できます。

val shutdown = CoordinatedShutdown(system)

shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () =>
  binding.unbind().map(_ => Done)
}

shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () =>
  binding.terminate(10.seconds).map(_ => Done)
}

shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () =>
  Http().shutdownAllConnectionPools().map(_ => Done)
}

これによって、新しい HTTP 接続を拒否し、現存している HTTP 接続の終了を待ってから、バックエンドリソースへの接続を終了する処理を挟み込むことができそうです。この方針で終了処理を実装してログを見てみました。

shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () => bindingFuture.map(_.unbind()).map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () => bindingFuture.map(_.terminate(10.seconds)).map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () => Http(system).shutdownAllConnectionPools().map(_ => Done)}

shutdown.addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "app-shutdown") { () => system.terminate().map(_ => Done)}
7月 26, 2019 12:35:02 午後 wvlet.log.Logger log
情報: [session:58015e56] Stopping the lifecycle ...
7月 26, 2019 12:35:02 午後 wvlet.log.Logger log
情報: [session:58015e56] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:35:02.883] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:35:02.893] [app-server-akka.actor.default-dispatcher-5] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:35:09.445] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [07/26/2019 12:35:17.835] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook
[DEBUG] [07/26/2019 12:35:17.837] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.839] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [1] tasks: [http-unbind]
[DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000
[DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [1] tasks: [http-graceful-terminate]
[DEBUG] [07/26/2019 12:35:17.847] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [1] tasks: [http-shutdown]
[DEBUG] [07/26/2019 12:35:17.848] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener
[DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [0] tasks
[DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [2] tasks: [terminate-system, app-shutdown]
[DEBUG] [07/26/2019 12:35:17.856] [app-server-akka.actor.default-dispatcher-7] [GracefulTerminatorStage(akka://app-server)] [terminator] Initializing termination of server, deadline: 10.00 s
[DEBUG] [07/26/2019 12:35:17.859] [app-server-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

うまく終了フェーズごとにタスクが実行されている様が確認できますね。

本当にこれでいいんだっけ?

ここまで見てきた方法で Graceful shutdown が実装できそうな気がしてきましたが、まだ確信が持てなかったので、もうちょっと当たりをつけたく、直接ソースコードも見に行きました。

確認したのは以下の 2 つです。

  • akka.actor.ActorSystem の終了処理部分
  • Play Framework の Akka バックエンドの終了処理
    • Play Framework も内部で Akka HTTP を使っているので同じようなことをしているはず、という予測を立てました

akka.actor.ActorSystem の終了処理部分を見てみる

ActorSystem のコードを読んでみて知りましたが、実は、 ActorSystem のインスタンスが作られるときには、 JVM の ShutdownHook に ActorSystem の終了処理が追加されています。CoordinatedShutdown#createExtension にて、終了処理のフックを登録しています。

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala

  override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = {
    val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
    val phases = phasesFromConfig(conf)
    val coord = new CoordinatedShutdown(system, phases)
    initPhaseActorSystemTerminate(system, conf, coord)
    initJvmHook(system, conf, coord)
    // Avoid leaking actor system references when system is terminated before JVM is #23384
    // Catching RejectedExecutionException in case extension is accessed first time when
    // system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem
    // is started but it might be a race between (failing?) startup and shutdown.
    def cleanupActorSystemJvmHook(): Unit = {
      coord.actorSystemJvmHook match {
        case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled =>
          cancellable.cancel()
          coord.actorSystemJvmHook = OptionVal.None
        case _ =>
      }
    }
    try system.registerOnTermination(cleanupActorSystemJvmHook())
    catch {
      case _: RejectedExecutionException => cleanupActorSystemJvmHook()
    }
    coord
  }

CoordinatedShutdown#initPhaseActorSystemTerminate の実装は以下のようになっています。

  private def initPhaseActorSystemTerminate(
      system: ExtendedActorSystem,
      conf: Config,
      coord: CoordinatedShutdown): Unit = {
    coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () =>
      val confForReason = confWithOverrides(conf, coord.shutdownReason())
      val terminateActorSystem = confForReason.getBoolean("terminate-actor-system")
      val exitJvm = confForReason.getBoolean("exit-jvm")
      val exitCode = confForReason.getInt("exit-code")

      if (exitJvm && terminateActorSystem) {
        // In case ActorSystem shutdown takes longer than the phase timeout,
        // exit the JVM forcefully anyway.
        // We must spawn a separate thread to not block current thread,
        // since that would have blocked the shutdown of the ActorSystem.
        val timeout = coord.timeout(PhaseActorSystemTerminate)
        val t = new Thread {
          override def run(): Unit = {
            if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook)
              System.exit(exitCode)
          }
        }
        t.setName("CoordinatedShutdown-exit")
        t.start()
      }

      if (terminateActorSystem) {
        system.finalTerminate()
        system.whenTerminated.map { _ =>
          if (exitJvm && !runningJvmHook) System.exit(exitCode)
          Done
        }(ExecutionContexts.sameThreadExecutionContext)
      } else if (exitJvm) {
        System.exit(exitCode)
        Future.successful(Done)
      } else
        Future.successful(Done)
    }
  }

`CoordinatedShutdown#initJvmHook は以下のような実装になっています。

  private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
    val runByJvmShutdownHook = system.settings.JvmShutdownHooks && conf.getBoolean("run-by-jvm-shutdown-hook")
    if (runByJvmShutdownHook) {
      coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook {
        runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
        if (!system.whenTerminated.isCompleted) {
          coord.log.debug("Starting coordinated shutdown from JVM shutdown hook")
          try {
            // totalTimeout will be 0 when no tasks registered, so at least 3.seconds
            val totalTimeout = coord.totalTimeout().max(3.seconds)
            Await.ready(coord.run(JvmExitReason), totalTimeout)
          } catch {
            case NonFatal(e) =>
              coord.log.warning("CoordinatedShutdown from JVM shutdown failed: {}", e.getMessage)
          }
        }
      })
    }
  }

initPhaseActorSystemTerminateActorSystem に対する終了処理が、 initJvmHook で JVM の終了処理が登録されます。これによって先に試した自前で JVM の ShutdownHook で終了させようとしても、CoordinatedShutdown が登録したフックで、タイミングによっては ActorSystem が先に終了してしまい自前の処理が実行される前に JVM が終了してしまうようだ、ということが分かりました。

ということは HTTP 接続が全て開放され、 ActorSystem が終了する直前、 つまりCoordinatedShutdown#PhaseBeforeActorSystemTerminate フェーズで各種リソースの開放処理をすれば良さそうだ、ということが分かります。

実際に、 CoordinatedShutdown#PhaseBeforeActorSystemTerminate フェーズで終了の確認をしてみました。

  CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () =>
    implicit val ec = system.dispatcher
    bindingFuture
      .flatMap(_.unbind())
      .onComplete { _ =>
        system.log.info("ちゃんと止まる?")
      }
    Future.successful(Done)
  }
7月 26, 2019 12:36:26 午後 wvlet.log.Logger log
情報: [session:4d3c6593] Stopping the lifecycle ...
7月 26, 2019 12:36:26 午後 wvlet.log.Logger log
情報: [session:4d3c6593] The lifecycle has stopped.
[DEBUG] [07/26/2019 12:36:26.053] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
[INFO] [07/26/2019 12:36:26.065] [app-server-akka.actor.default-dispatcher-2] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/
[DEBUG] [07/26/2019 12:36:30.507] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [07/26/2019 12:36:40.595] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook
[DEBUG] [07/26/2019 12:36:40.596] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks
[DEBUG] [07/26/2019 12:36:40.599] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [1] tasks: [app-shutdown]
[DEBUG] [07/26/2019 12:36:40.604] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000
[DEBUG] [07/26/2019 12:36:40.605] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [1] tasks: [terminate-system]
[DEBUG] [07/26/2019 12:36:40.608] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener
[INFO] [07/26/2019 12:36:40.609] [app-server-akka.actor.default-dispatcher-9] [akka.actor.ActorSystemImpl(app-server)] ちゃんと止まる?
[DEBUG] [07/26/2019 12:36:40.611] [app-server-akka.actor.default-dispatcher-3] [EventStream] shutting down: StandardOutLogger

Process finished with exit code 143 (interrupted by signal 15: SIGTERM)

良さそうですね。

Play Framework の Akka バックエンドの終了処理を見てみる

実は Play Framework にも、 Coordinated Shutdown について記載されているページがあります。バックエンドが Akka HTTP ですので当たり前といえば当たり前ですね。

このページによれば、 Play Framework は CoordinatedShutdown を利用はしているが、完全には依存していないとも記載があります。Play Framework には CoordinatedShutdown の他に従来の ApplicationLifecycleでの終了の方法もあり、移行も可能だそうです。

実際に、ソースコードも確認してみました。 Play Framework で Akka HTTP のサーバを使っているのは、 play.core.server.AkkaHttpServer クラスです。

https://github.com/playframework/playframework/blob/master/transport/server/play-akka-http-server/src/main/scala/play/core/server/AkkaHttpServer.scala

// play.core.server.AkkaHttpServer
/**
 * Starts a Play server using Akka HTTP.
 */
class AkkaHttpServer(context: AkkaHttpServer.Context) extends Server {

  registerShutdownTasks()
  private def registerShutdownTasks(): Unit = {

    implicit val exCtx: ExecutionContext = context.actorSystem.dispatcher

    // Register all shutdown tasks
    val cs = CoordinatedShutdown(context.actorSystem)
    cs.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "trace-server-stop-request") { () =>
      mode match {
        case Mode.Test =>
        case _         => logger.info("Stopping server...")
      }
      Future.successful(Done)
    }

    // Stop listening.
    // TODO: this can be improved so unbind is deferred until `service-stop`. We could
    // respond 503 in the meantime.
    cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "akka-http-server-unbind") { () =>
      def unbind(binding: Option[Http.ServerBinding]): Future[Done] =
        binding.map(_.unbind()).getOrElse(Future.successful(Done))

      for {
        _ <- unbind(httpServerBinding)
        _ <- unbind(httpsServerBinding)
      } yield Done
    }

    // Call provided hook
    // Do this last because the hooks were created before the server,
    // so the server might need them to run until the last moment.
    cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "user-provided-server-stop-hook") { () =>
      context.stopHook().map(_ => Done)
    }
    cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdown-logger") { () =>
      Future {
        super.stop()
        Done
      }
    }

  }

Play Framework バックエンドの AkkaHttpServer でも、 ActorSystem 停止前のフェーズCoordinatedShutdown.PhaseBeforeActorSystemTerminate でユーザー定義の停止時のフックを実行するような実装になっています。

ここまでの確認で、このタイミングで Graceful shutdown の処理を実行すればよい、ということに自信を持つことができました。

CoordinatedShutdown についての補足

なお、 CoordinatedShutdown の振る舞い自体は設定である程度変えることができますCoordinatedShutdown 自体は Akka HTTP の仕組みというよりは主に akka-cluster のための機能のようなので、設定に関する情報も Akka HTTP のドキュメントではなく Akka Actors 本体のドキュメントに記載があります。

ここの設定で、CoordinatedShutdown が登録する JVM の ShutdownHook を無効にしたり、 ActorSystem の終了とともに JVM 自体の終了するデフォルトの振る舞いを止めて、 ActorSystem だけを停止し JVM のみ生かしたままにする、といった振る舞いにすることもできます。

実際に私が新規に構築したアプリケーションでは、 Akka HTTP に加えて DI ライブラリとして Airframe DI を利用しました。 Airframe DI の有効な Session もサーバの停止時に停止したかったため、CoordinatedShutdownの終了処理の外側で Airframe の Session を停止するために、 独自定義の JVM の ShutdownHook を登録する必要がありました。そこで ActorSystem が登録する ShutdownHook は無効にし、自前で ShutdownHook を登録してサーバ停止するように実装しました。

具体的には以下のような実装になりました。該当箇所だけ抜粋して抜粋します。

  def run(host: String, port: Int, config: Config, system: ActorSystem) = {
    val app = pureconfig.loadConfigOrThrow[AppSettings](config, "app")
    val design = newDesign
      .bind[ActorSystem].toInstance(system)
      .bind[ExecutionContext].toInstance(system.dispatcher)
      .add(configure(app))

    // Disable the airframe shutdown hook, as it will conflict with the akka-http shutdownhook
    val session = design.newSessionBuilder.noShutdownHook.create
    // Starting a dependency injection enabled scope by airframe
    session.start

    val server = session.build[AppServer].start(host, port, config, settings = ServerSettings(system))
    // Manually register a shutdown hook for stopping the server
    // The reason is that we want to stop backend data storage and DI container after stopping akka-http server.
    sys.addShutdownHook {
      server.stop()
    }
    server
  }

// server#start(), server#stop()
  def start(host: String, port: Int, config: Config, settings: ServerSettings): Server = {
    val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings)
    val bindTimeout = Duration(config.getString("akka.http.server.bind-timeout"))
    val serverBinding = Await.result(bindingFuture, bindTimeout)
    log.info(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}/")

    // Add task for graceful shutdown, and stop backend data storage
    registerShutdownTasks(serverBinding)
    this
  }

  def stop(): Unit = {
    val shutdownTimeout = CoordinatedShutdown(system).totalTimeout() + Duration(5, TimeUnit.SECONDS)
    Await.result(CoordinatedShutdown(system).run(JvmExitReason), shutdownTimeout)
  }
// application.conf
akka {
  jvm-shutdown-hooks = off      // CoordinatedShutdown による ShutdownHook の追加を止める
  coordinated-shutdown {
    terminate-actor-system = on // ActorSystem は停止する
    exit-jvm = off              // ActorSystem と同時に JVM の停止するのは止める
  }
}

まとめ

Akka HTTP での Graceful shutdown の実装を行うにあたり、調査したことをまとめてみました。Akka HTTP は薄い HTTP Toolkit ライブラリという位置づけもあって何から何までよしなにやってくれるフレームワークではありませんが、その分ソースコードを読めば何をやっているか把握しやすく、ハマった時に対処しやすいというメリットも感じています。

また、 OSS のソースコードを読む場合、公式のドキュメントを読んで概念やキーワードを把握した上で、実際にテストコードやデバッグログを出して挙動を把握し、問題に関連しそうなクラス名に当たりをつけて、起動スクリプトのエントリポイントやそのクラス名で grep したところからコードリーディングをしていく、といったことを私はよくやります。

こういったコードリーディングの方法は、体系的な方法や情報源はあまりないとは思いますが、もし問題にぶつかった時の切り抜け方として参考になれば幸いです。

最後に、お約束ではありますが、弊社では上記のように OSS を使って問題解決が開発がしたいエンジニアや、デザイナー、プロダクトマネージャーなど絶賛大募集中です。少しでもご興味を持っていただけたら、お気軽にカジュアルランチからでも構いませんので、下のバナーや @yukung へ DM 等でご連絡いただければと思います。

最後までお読みいただいてありがとうございました!!


  1. Play Framework もバージョン 2.6 系から Akka HTTP をデフォルトのバックエンドとして利用していますし、単純に比較できるものではありませんが

BERTの事前学習済みモデルを使って特許検索に挑戦してみる

f:id:astamuse:20200108191002p:plain

ご挨拶

新年、明けましておめでとうございます
本年もよろしくお願いいたします

データエンジニアのaranです
昨年の6月以来、2回目の登場になります

去年の1月に入社して、ちょうど1年経ち
月日の流れの早さを感じています

いきなりですが、皆様は年末年始をどのように過ごされましたでしょうか?
私は、家族の1人が、1月2日誕生日なこともあり
年始は、家族全員が本家に集まります

月日の経過と共に、甥っ子・姪っ子も増え、彼らも歳を重ねて行くので
年始の出費が、年々厳しくなっています
(財布から諭吉がいっぱい消えてゆく・・・涙)

年始の出費が気にならないぐらい稼ぎたいって心に誓い、新年を過ごしました

前書き

前回はコレスポンデンス分析についてお話しさせて頂きましたが
今回は、BERTを使った文章検索についてお話しさせて頂きます

何でまた?

弊社では、特許情報を取り扱っていて
ある特定の業界・分野の特許群を検索することがあります

目的の特許を検索するには
J-PlatPatと同じように単語マッチングで検索することが多いのですが
見つけたい特許を検索するには
どの単語を選択し、どのような組み合わせ条件にするかが重要で
これには、高度なスキルと経験を必要とします

このスキルに依存する状況を、少しでも改善させるベく
ある分野に関する1つ特許の要約・請求項・詳細説明等の文章から
その分野に関連する特許一覧を検索できれば
検索品質の平準化と作業工数の削減が期待できます

そこで、文章分類・質問応答・固有表現抽出等で
公開当時の最高性能(SOTA: State of the Art)を達成した
BERTに目をつけ、特許検索に使えるかどうか遊んで検証してみました
BERTを触っていると言えると、カッコつけられるとかでは・・

BERTとは

BERT(Bidirectional Encoder Representations from Transformers)は
2018年10月にGoogleの研究チームが公開した言語表現モデルです

論文:BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding

今回、BERTを利用するに辺り、こちらのサイトで勉強させて頂きました www.ogis-ri.co.jp

BERTの特徴を1つあげるとすれば
事前学習(ラベルが不要)、ファインチューニング(ラベルが必要)と
2段階の学習ができる構成になります

事前学習は、大量データが必要のため、学習時間もリソース(お金)も必要としますが
事前学習済みモデルがGitにて公開されています
お試し段階では、これは非常に助かります!!
今回は、こちらの学習済みモデルを利用しました

何のデータを使うの?

弊社は国内特許のみならず、海外特許も取り扱います
検索対象にする特許情報は、EPO(欧州特許庁)のPATSTAT データベースを利用しました

PATSTATは、100ヶ国以上の特許庁において公開された特許が
収録されている特許データベースになります

どんな事をやるの?

PATSTATは英語の特許が多いので
特許の要約(英文)で検索できるようにして、類似した順に表示することを目的とし
複数の事前学習済みモデルで精度を比較してみました

やってみる

  1. 制約
    まず、BERTのREADME を読みますと、検索する文章を512トークン以下にする制約がありました
    (トークンについては、リンクを参照願います)

  2. 前提条件
    PATSATにある特許の要約(英文のみ)で
    512トークン以下になるだろう割合は99.5%以上を占めるため
    今回は、上記制約を考慮していません

  3. 評価データを用意
    1分野につき約20件、9分野で約180件データを用意してもらいました
    (お忙しい中、他部署の方にご協力頂きました!ありがとうございます)

  4. 評価データの妥当性チェック
    評価データ約180件を、プログラム上で9つに分類できないと
    特徴がない(同じような)文章になり、評価が難しくなります
    今回用意した評価データだけ、正しく分類できるかどうかを確認しました

  5. 非評価データの作成
    評価データ20件とは違う分野の特許を1,000件を作成しました
    非評価データの抽出方法は、 評価データに付与されている国際特許分類のクラスが違う特許を
    ランダムサンプリングしました

  6. 特許の要約をベクトル化
    評価データ20件、非評価データ1,000件の要約を
    BERTを使い、768次元のベクトルに変換しました

  7. 変換したベクトルデータを近傍探索
    評価データ20件+非評価データ1,000件を対象として
    全評価データで検索しました

  8. 検索結果を評価
    検索結果TOP20に、評価データが何件含まれるかでモデルを評価しました
    全評価データ20件を検索しているので、精度はアベレージになります

どうやって?

特許の要約をベクトル化するには bert-as-serviceを使いました

import os
import csv
from bert_serving.client import BertClient

# CSVファイル(アブスト一覧)を読み込む
with open('./abstracts.csv', encoding="utf_8") as f:
    abstracts = list(csv.reader(f, delimiter=",", doublequote=True, lineterminator="\r\n", quotechar='"', skipinitialspace=True))

# 要約をベクトル化する
with BertClient() as bc:
    vectors = bc.encode(abstracts)

評価データの妥当性チェックには
K-Meansでクラスタ分析しました

import pandas as pd
from sklearn.cluster import KMeans

# TSVファイル(ベクトル化データ+正解ラベル付)を読み込む
vec = pd.read_csv('./vec.tsv', sep='\t', header=None)

# クラスタ分析
km = KMeans(n_clusters=9, init='k-means++', n_init=10, max_iter=10000, tol=1e-06, n_jobs=-1)
pred = km.fit_predict(vec.iloc[:, :-1])

尚、評価データ(要約)に対してデータクレンジングすると
F1スコアが0.75から0.93まで上がりました

これにより、評価データを十分な精度でクラスタリングできるため
有効なデータと判断しました

文章の類似度判定には
単純に検索する文章ベクトルとのユーグリッド距離で
類似度をスコアリングしました

import pandas as pd

# 評価データ20件+非評価データ1,000件のベクトルデータを読み込む
vectors = pd.read_csv('./vectors.tsv', sep='\t', header=None)

# 評価データ各20件を文章検索する
# ※search_keyは、検索するINDEXを任意に設定しています
search_abstracts = vectors.iloc[search_key, :768].values
target_abstracts = vectors.iloc[:, :768].values
# 類似度スコアはユーグリッド距離とする
result ={i:np.linalg.norm(search_abstracts - target_abstracts[i]) for i in range(len(target_abstracts))}

まとめ

最終的に3つの学習済モデルで、精度検証しました
学習済みモデル別の各9分野の精度結果は、こちらになります

f:id:astamuse:20200109012715p:plain

具体的な分野名を明示できず、申し訳ないのですが
各分野で精度にバラツキがあり、類似する特許を検索しやすい分野と
しにくい分野があることがわかりました

また、本来なら 要約だけではなく タイトル、本文、請求項、出願者等の要素も考慮した上で
類似度スコアリングをしますが
要約だけでも、ある程度類似する特許を検索できたことで
BERTのポテンシャルの高さを感じました
(今回は評価データが少ないため、偏った結果になっているかもしれませんが)

最後に

今回のお話で
俺・私・ミーの方がもっとできるよって思う人はたくさんいると思います
そんな方は是非、弊社で一緒に働きませんか?

ここにジョインするまで、自然言語処理は未経験でしたが
手を挙げればチャレンジさせてもらえる弊社は
エンジニアライフを楽しめる会社だと、勝手に自負しています!!

アスタミューゼでは、エンジニア・デザイナーを募集中です
ご興味のある方は遠慮なく採用サイトからご応募ください。お待ちしています

GitLab CIを四倍速にした話

f:id:astamuse:20191220191229p:plain

chotaroです。豊洲PITでたくさんの人と頭の中のOMOIDEを共有してから一週間、興奮冷めやらぬ日々です。 astamuse lab年内最後の記事になります!

さて先日、ICPチームで利用しているGitLab CIの実行時間を4分の1にしました。 一度CIを動かすと1時間強待ちが発生する状況になっていたため、改善が急務でした。改善の結果と、どんなことをしたか自分の備忘も兼ねて書いておこうと思います。
悩める人の参考になれば嬉しいです。

before/after

10月に改善を実施したので、9月と11月の比較になります。

pipeline実行回数 合計実行時間(時間[分]) 平均実行時間
9月 164回 114時間[6880分] 0:41:57
11月 106回 15時間[945分] 0:08:55

1回のpipelineにかかる時間がなんと4分の1以下になりました(๑•̀ㅂ•́)و✧
せめて平均30分以内、を目指していたので大成功です!

改善ポイント

以下が主なポイントになります。

  • 無駄なjobは動かさないようにしましょう
  • 一度pullしたdocker imageを再利用するようにしましょう。
  • executorのスペックを見直しましょう。
  • 毎回利用する外部リソース(依存するjarとか)が存在する場合、cacheを活用して使いまわしましょう。

当たり前ですね!(※できてなかった

基本的には公式documentの記述をもとにそれぞれの環境に当てはめていけばいい感じになります。

それでは良いCIライフを!アデュー!

......というのもあんまりなのでかなり泥臭い内容ですが、私の闘いの記録を残します。

改善する前の状況

  • 言語は java / scala
  • マルチプロジェクトな構成のリポジトリ
  ~/pjroot
.
├── .git
├── .gitlab-ci.yml
├── api
│   ├── src
│   └── test
├── batch
│   ├── src
│   └── test
├── commonLibrary
│   ├── src
│   └── test
└── web
    ├── src
    └── test
  • 各PJで自動testを実施
  • どんな小さな変更でもすべてのテストを実施(README変更でもpipelineが実行されてしまう)
  • 一度のpipelineですべてのプロジェクトについて順次テストを実行
  • コミットのたび毎回全部のjobが実行される

->諸々の結果、一回あたり1時間強かかる上、pipelineの実行回数が無駄に多い

GitLab CIについて

利用方法

GitLab上で利用できるCIです。
ドキュメントはこちら
紙面の都合から詳細は省きますm( )m

ざっくり概要

  • ymlでjobや環境の設定を管理することができる
  • gitlab-runnerというCIランナーをgitlab上のGUIで管理して利用する。
  • executorがjobを実際に実行する。
    • executorには様々なものを設定できる。dockerや仮想マシン インスタンスなど。(インスタンスについてはawsやgcpが利用可能。)

我々の利用方法

  • GCPインスタンス上でgitlab-runnerが動いている
  • executorはdocker-machine形式により実行。
    • インスタンスを立ち上げ, dockerコンテナをそれ上で動かしてjobを実行する。

現状を確認してボトルネックを明確にする

jobの実行をgitlab上で観測する

まずどう動いているのかがわからなかったので、ひたすら眺めてみました。

  • gitlab-ciのjobのログをひたすら眺める
    • どこで詰まっているのかを調査するため、timeコマンドを挟むなどして実行時間を確認する
  • gitlab-runnerにモニタリングツールを挿してもらう
    • オートスケールの動き方など、どのようにexecutorがjobを処理していくのかをGUIで観測する

手元でrunnerを立てて観測してみる

リソースの状況などもっと詳細を確認したかったので手元でrunner、およびexecutorを立てて眺めてみました。

  • gitlab-runnerは手元で実行することが可能
    • gitlab-runner registerで対話形式で登録
  • executorをdockerにして実行
    • remote上で動かすときも実体はdockerコンテナ上でtestを実行しているだけなので、それをローカルで再現
    • 手元であればdocker psdocker statsで観測が可能

で、結局何がボトルネックになっていそうなのか

  • pipeline一本あたりに実行されるjobが多すぎる(再確認

複数PJが一つのリポジトリに乗っかっていて、一つのPJ修正でも全部のテストが起動している

  • 一度に並列に実行されるjobが少なく、待ちが発生している

jobが2並列しか実行できないのにpiplineの同じステージに5個jobが積まれている、、、

  • docker imageをpullするのに毎回時間がかかっている

前回実行時に取得したimageなどを使えないのか?

  • 依存するライブラリの取得に時間がかかっている

cacheで共有化できないか?

  • CPUの使用率がコンパイルのタイミングで跳ね上がる

最小のインスタンスを使っている現状ではスペック不足なのでは?

ボトルネックをそれぞれ対処してみる

pipelineで実行されるjobの制御

更新されているものが何か、によって実行JOBを制御します。

  • タイミングの制御:MRを立てている場合だけ実行する
  • 実行JOBの制御:該当directoryを変更している場合のみ実行する

各JOBについて、以下のようにonlyを記載することで制御します。

job:
    only:
        changes:
            - batch/**/*
        refs:
            - merge_request

並列実行数を増やす

これはgitlab-runnerのconcurrent設定で実現できます
ただし、このconcurrentを増やすほどexecutorが並列実行される=リソースを食い合うので、設定には注意しましょう

/[config_directory]>/config.toml

concurrent = 5

docker imageを使い回す

利用しようとしているimageがすでにpullされている場合、新たにpullするのをやめるようにします。
これはgitlab-runnerのpull_policy設定で実現できます

※公式documentに記載有り

/[config_directory]>/config.toml

[[runners]]
  [runners.docker]
    pull_policy = "if-not-present"

cacheの仕組みを利用して、一度取得したjarを別のpipelineでも使い回す

cacheを利用して、scalaの依存解決速度を爆速にします
※cacheについての公式ドキュメントはこちら

cacheの保存場所の設定を行わない場合、実行する環境でしかそのcacheを使い回せません。
そのため、gcs上にcacheを保存するbucketを作成して、そこに保存するようにします。runnerの設定にて指定することができます。

/[config_directory]>/config.toml

  [runners.cache]
    Type = "gcs"
    Path = "cache"
    Shared = true
      [runners.cache.gcs]
        CredentialsFile = "<credential fileを配置したpath>"
        BucketName = "<cacheを保存先のbucketのname>"

cacheの保存先の設定ができたら、次にcacheを利用するようjobの設定を行います。
ivyでの依存解決に時間がかかっていることがわかったので、.ivy2配下のcacheをpipelineを跨いで使い回すように設定します。

# 環境変数でsbtの参照先を上書きします
variables:
    JAVA_TOOL_OPTIONS: "-Dsbt.ivy.home=${CI_PROJECT_DIR}/.ivy2/ -Dsbt.global.base=./.sbtboot -Dsbt.boot.directory=./.boot"
    # coursierを利用する場合はこちらも指定してあげましょう
    COURSIER_CACHE: "${CI_PROJECT_DIR}/.cache"

...

job:
    cache:
        # jobの名前単位でkeyを設定することで、pipelineを跨いで同じjobでcacheを再利用させます
        key: app-dependencies-cache-${CI_JOB_NAME}
        paths:
            - ${CI_PROJECT_DIR}/.ivy2/

設定したら何度かjobを実行してみます。

cacheを利用できている場合、開始時にDownloading cache.zip from ~~, jobの終了時にUploading cache.zip to ~~~と表示されるので確認しましょう。

注意事項

cacheが効いてない?と感じる場合、以下のことを疑いましょう

  • cacheするディレクトリは間違っていませんか?
  • 使いまわしたいリソースを上書きしていませんか?
    • keyを元にpathを作成し、そのpathにzipファイルを保存しにいくため、同じkeyを用いてしまうと、cacheの内容が上書きされてしまいます
    • 改善策:jobごとにcache-keyを作成して、jobごとに保存するようにする(上記のkey設定を参考にしてください)

executorのスペックを見直してコンパイル速度を改善する 金で殴るともいう

ローカルでdockerに4コア使わせたところ、300%ほど利用していることがわかったので、スペックの調整をします。
executorが起動するインスタンスの設定を修正します。

docker-machineのオートスケールはDockerMachineの仕組みに依存するとのことなので、DockerMachineのgce向けのドキュメントを参考にインフラ担当に設定してもらいます(ありがとうございました!)

/[config_directory]>/config.toml

[[runners]]
    [runners.machine]
        MachineOptions = [
        ...

job設定に環境のスペックを確認するコマンドを入れるなどして反映されていることを確認しましょう。

まとめとして

これらの結果が前述の成果になります(再掲

pipeline実行回数 合計実行時間(時間[分]) 平均実行時間
9月 164回 114時間[6880分] 0:41:57
11月 106回 15時間[945分] 0:08:55

とにかく改善しないと開発のスピードがバリヤバいという危機感から、一つ一つ調べたり、仮説を立てて実践を繰り返しました。

一番効果があったのは何かと言われると明確ではないのですが、

  • executorのスペックを見直してコンパイル速度を改善する

が一つの鍵になり、そこ以外の地味な部分がすべて活用されるようになった結果劇的にjob時間が短くなった印象です。

成果だけみると月間100時間短縮して素晴らしい改善効果なんですが、一方で「改善をあとに回した結果、数十ないし数百時間を無駄にしてしまっていた」という事実も浮き彫りに。。。

もっと早く問題意識を持って改善すべきだった事案です。反省。

gitlab-ciはまだ不慣れな面も多々あるのですが、今回の件でだいたいお友達になれたような気がします。 来年はDeployフローの改善に着手したいところです。

次回更新は年明け予定です。それでは皆様、良いお年を!

Copyright © astamuse company, ltd. all rights reserved.