どうもみなさまおはこんばんちわー。アプリケーションエンジニアの池田 (@yukung) です。3 度目の登場になります。
2020 年始まりましたね!時が経つのは早いもので私も入社してはや 1 年が経ちましたが、最近はエンジニアだけに限らず、様々な分野で専門を持つ方々が弊社に入社してきてくださっていて、そんな方々の自己紹介を聞くたびに、世の中には知らない世界がまだまだたくさんあるなぁ、と思う刺激的な毎日です。
2020 年の心配事は、オリンピックによる混雑ですねぇ。あまりに混雑するようなら、どうせなら期間中はリモートワークで切り抜けようかな、とか密かに思っています。(弊社には宮崎からリモートワークで働いてるエンジニアもいます)
Akka HTTP はいいぞ
さて、今回は何を書こうかと考えた時に、前々回、前回と技術的なことを書いていなかったことを思い出して、今回は素直に技術に関することを書くことにします。
弊社では主に Scala を使ったアプリケーション開発を行っていますが、 Web アプリケーションの開発は Play Framework の他に、 Akka HTTP を使うプロジェクトもちらほら出てきました。私自身も最近初めて Akka HTTP を用いて新規のアプリケーションを構築しましたが、画面を伴ったアプリケーションではなく、 RESTish な API を作るのであれば、 Akka HTTP でも十分開発はできるなぁ、という感触を持ちました。
Akka HTTP は、公式ドキュメントに「 Web アプリケーションフレームワークではなく HTTP ベースのサービスを提供するより汎用的なツールキットである」と記載がある通り、 Play Framework のようなフルスタックなフレームワークではないため、初めから色々揃っているものではありません。ただしその分解決したい問題に対して自分で使う道具を取捨選択できるため、いわゆる「薄い」フレームワークのようなものを好む方にとってはこちらの方が肌に合うように感じます。1
実際使って作ってみた感触としては、「書いたようにしか動かない」という感覚を持ちました。これはどちらかというとポジティブな感覚で、要はハマっても予測可能である、ということです。エラーが出ても自分がそう書いていないからエラーになるのであって、エラーの内容とスタックトレースを元に地道に調べれば必ず答えはそこにあります。個人の感覚ですが、実際他のフレームワークと比べて、フレームワーク固有のお作法などによる原因の分からない問題に振り回されることは少なかった気がしています。
以下はサーバを起動させる部分のコードの最小のサンプルです。公式ドキュメントからの転載となりますが、
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import scala.io.StdIn object WebServer { def main(args: Array[String]) { implicit val system = ActorSystem("my-system") implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end implicit val executionContext = system.dispatcher val route = path("hello") { get { complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>")) } } val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") StdIn.readLine() // let it run until user presses return bindingFuture .flatMap(_.unbind()) // trigger unbinding from the port .onComplete(_ => system.terminate()) // and shutdown when done } }
これだけで、 /hello
エンドポイントからレスポンスが返ってくるサーバが立ち上がります。また main
メソッドを持つ単なる Scala プログラムですので、ソースコードを追う際もエントリポイントが明確なため調査がしやすいところも気に入っています。
Akka HTTP の Graceful Shutdown ってどうやるの?
そんな Akka HTTP について、サーバを起動させることは簡単なのですが、サーバの終了処理について少し悩みました。上記のサンプルのように基本的には akka.actor.ActorSystem#terminate()
を呼び出せば、起動したサーバは停止します。が、これは起動中のプロセスに対して割り込みで停止させるため、通信中のコネクションも問答無用で切断します。いわゆる Graceful shutdown ではありません。
昨今は Docker といったコンテナベースのアプリケーション運用も多くなってきていますし、そうなるとカジュアルにサーバプロセスが停止することもままあります。そんな状況においては、サーバ終了時はアプリケーションに依存しているリソース( DB や接続中の HTTP コネクションなど)は丁寧に解放してから、サーバを終了したくなってきます。では、 Akka HTTP で Graceful shutdown を実現するにはどうしたらよいのでしょうか。
Akka HTTP の公式ドキュメントには、 Graceful termination についてのページがあり、こちらに割と詳細に記載があります。ここではサーバを終了させる方法として 2 つ紹介されています。
これらどちらも、有効な HTTP コネクションを終了させるには有効な方法です。が、その他にもサーバの終了処理では DB などのバックエンドとの通信を閉じるなどリソースの終了処理を挟み込みたくなるのですが、その方法については記載がありませんでした。理想的には、 SIGTERM
を送ると Graceful shutdown を実行するように、 JVM の ShutdownHook を仕込むような形にしたいのです。
そこで、自前で JVM の ShutdownHook を挟み込んでみました。
// (途中まで省略) val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) indingFuture.onComplete { case Success(binding) => system.log.info(s"Server online at http://\${binding.localAddress.getHostName}:\${binding.localAddress.getPort}/") case Failure(ex) => system.log.error(ex, "An error has occurred!") } sys.addShutdownHook { bindingFuture .flatMap(_.unbind()) .onComplete { _ => materializer.shutdown() system.terminate() } }
この実装で、サーバに対して SIGTERM
を送ってみるとプロセスは終了してくれるものの、 debug ログを見るとやはり途中でバッサリ切れているように見えました。
[DEBUG] [07/26/2019 12:48:58.015] [main] [EventStream(akka://app-server)] logger log1-Logging$DefaultLogger started [DEBUG] [07/26/2019 12:48:58.016] [main] [EventStream(akka://app-server)] Default Loggers started 7月 26, 2019 12:48:59 午後 wvlet.log.Logger log 情報: [session:58015e56] Starting a new lifecycle ... 7月 26, 2019 12:48:59 午後 wvlet.log.Logger log 情報: [session:58015e56] ======== STARTED ======== [DEBUG] [07/26/2019 12:48:59.597] [main] [AkkaSSLConfig(akka://app-server)] Initializing AkkaSSLConfig extension... [DEBUG] [07/26/2019 12:48:59.599] [main] [AkkaSSLConfig(akka://app-server)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@5f0bab7e 7月 26, 2019 12:48:59 午後 wvlet.log.Logger log 情報: [session:58015e56] Stopping the lifecycle ... 7月 26, 2019 12:48:59 午後 wvlet.log.Logger log 情報: [session:58015e56] The lifecycle has stopped. [DEBUG] [07/26/2019 12:48:59.915] [app-server-akka.actor.default-dispatcher-3] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000 [INFO] [07/26/2019 12:48:59.927] [app-server-akka.actor.default-dispatcher-3] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/ [DEBUG] [07/26/2019 12:49:01.471] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted Process finished with exit code 143 (interrupted by signal 15: SIGTERM)
これでは Graceful shutdown にはなりません。
公式ドキュメントの Graceful termination のページには、もう一つ Akka の Coordinated Shutdown についての記載があります。ただし、これについては Akka HTTP ではまだ未実装という記載があるのみですが、 issue へのリンク (#1210) があるのでこれを見てみると、 issue のやり取りの中で CoordinatedShutdown#addTask
を Akka の各終了フェーズ (CoordinatedShutdown
のコンパニオンオブジェクト内に定義があります) ごとに任意のタスクを挟み込むことができることが分かりました。
以下のようなコードでそれが実現できます。
val shutdown = CoordinatedShutdown(system) shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () => binding.unbind().map(_ => Done) } shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () => binding.terminate(10.seconds).map(_ => Done) } shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () => Http().shutdownAllConnectionPools().map(_ => Done) }
これによって、新しい HTTP 接続を拒否し、現存している HTTP 接続の終了を待ってから、バックエンドリソースへの接続を終了する処理を挟み込むことができそうです。この方針で終了処理を実装してログを見てみました。
shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () => bindingFuture.map(_.unbind()).map(_ => Done)} shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-graceful-terminate") { () => bindingFuture.map(_.terminate(10.seconds)).map(_ => Done)} shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "http-shutdown") { () => Http(system).shutdownAllConnectionPools().map(_ => Done)} shutdown.addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "app-shutdown") { () => system.terminate().map(_ => Done)}
7月 26, 2019 12:35:02 午後 wvlet.log.Logger log 情報: [session:58015e56] Stopping the lifecycle ... 7月 26, 2019 12:35:02 午後 wvlet.log.Logger log 情報: [session:58015e56] The lifecycle has stopped. [DEBUG] [07/26/2019 12:35:02.883] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000 [INFO] [07/26/2019 12:35:02.893] [app-server-akka.actor.default-dispatcher-5] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/ [DEBUG] [07/26/2019 12:35:09.445] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted [DEBUG] [07/26/2019 12:35:17.835] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook [DEBUG] [07/26/2019 12:35:17.837] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks [DEBUG] [07/26/2019 12:35:17.839] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [1] tasks: [http-unbind] [DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000 [DEBUG] [07/26/2019 12:35:17.845] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [1] tasks: [http-graceful-terminate] [DEBUG] [07/26/2019 12:35:17.847] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [1] tasks: [http-shutdown] [DEBUG] [07/26/2019 12:35:17.848] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener [DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks [DEBUG] [07/26/2019 12:35:17.850] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [0] tasks [DEBUG] [07/26/2019 12:35:17.851] [app-server-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [2] tasks: [terminate-system, app-shutdown] [DEBUG] [07/26/2019 12:35:17.856] [app-server-akka.actor.default-dispatcher-7] [GracefulTerminatorStage(akka://app-server)] [terminator] Initializing termination of server, deadline: 10.00 s [DEBUG] [07/26/2019 12:35:17.859] [app-server-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger Process finished with exit code 143 (interrupted by signal 15: SIGTERM)
うまく終了フェーズごとにタスクが実行されている様が確認できますね。
本当にこれでいいんだっけ?
ここまで見てきた方法で Graceful shutdown が実装できそうな気がしてきましたが、まだ確信が持てなかったので、もうちょっと当たりをつけたく、直接ソースコードも見に行きました。
確認したのは以下の 2 つです。
akka.actor.ActorSystem
の終了処理部分- Play Framework の Akka バックエンドの終了処理
- Play Framework も内部で Akka HTTP を使っているので同じようなことをしているはず、という予測を立てました
akka.actor.ActorSystem
の終了処理部分を見てみる
ActorSystem
のコードを読んでみて知りましたが、実は、 ActorSystem
のインスタンスが作られるときには、 JVM の ShutdownHook に ActorSystem
の終了処理が追加されています。CoordinatedShutdown#createExtension
にて、終了処理のフックを登録しています。
override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = { val conf = system.settings.config.getConfig("akka.coordinated-shutdown") val phases = phasesFromConfig(conf) val coord = new CoordinatedShutdown(system, phases) initPhaseActorSystemTerminate(system, conf, coord) initJvmHook(system, conf, coord) // Avoid leaking actor system references when system is terminated before JVM is #23384 // Catching RejectedExecutionException in case extension is accessed first time when // system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem // is started but it might be a race between (failing?) startup and shutdown. def cleanupActorSystemJvmHook(): Unit = { coord.actorSystemJvmHook match { case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled => cancellable.cancel() coord.actorSystemJvmHook = OptionVal.None case _ => } } try system.registerOnTermination(cleanupActorSystemJvmHook()) catch { case _: RejectedExecutionException => cleanupActorSystemJvmHook() } coord }
CoordinatedShutdown#initPhaseActorSystemTerminate
の実装は以下のようになっています。
private def initPhaseActorSystemTerminate( system: ExtendedActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () => val confForReason = confWithOverrides(conf, coord.shutdownReason()) val terminateActorSystem = confForReason.getBoolean("terminate-actor-system") val exitJvm = confForReason.getBoolean("exit-jvm") val exitCode = confForReason.getInt("exit-code") if (exitJvm && terminateActorSystem) { // In case ActorSystem shutdown takes longer than the phase timeout, // exit the JVM forcefully anyway. // We must spawn a separate thread to not block current thread, // since that would have blocked the shutdown of the ActorSystem. val timeout = coord.timeout(PhaseActorSystemTerminate) val t = new Thread { override def run(): Unit = { if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook) System.exit(exitCode) } } t.setName("CoordinatedShutdown-exit") t.start() } if (terminateActorSystem) { system.finalTerminate() system.whenTerminated.map { _ => if (exitJvm && !runningJvmHook) System.exit(exitCode) Done }(ExecutionContexts.sameThreadExecutionContext) } else if (exitJvm) { System.exit(exitCode) Future.successful(Done) } else Future.successful(Done) } }
`CoordinatedShutdown#initJvmHook
は以下のような実装になっています。
private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { val runByJvmShutdownHook = system.settings.JvmShutdownHooks && conf.getBoolean("run-by-jvm-shutdown-hook") if (runByJvmShutdownHook) { coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook { runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task if (!system.whenTerminated.isCompleted) { coord.log.debug("Starting coordinated shutdown from JVM shutdown hook") try { // totalTimeout will be 0 when no tasks registered, so at least 3.seconds val totalTimeout = coord.totalTimeout().max(3.seconds) Await.ready(coord.run(JvmExitReason), totalTimeout) } catch { case NonFatal(e) => coord.log.warning("CoordinatedShutdown from JVM shutdown failed: {}", e.getMessage) } } }) } }
initPhaseActorSystemTerminate
で ActorSystem
に対する終了処理が、 initJvmHook
で JVM の終了処理が登録されます。これによって先に試した自前で JVM の ShutdownHook で終了させようとしても、CoordinatedShutdown
が登録したフックで、タイミングによっては ActorSystem
が先に終了してしまい自前の処理が実行される前に JVM が終了してしまうようだ、ということが分かりました。
ということは HTTP 接続が全て開放され、 ActorSystem
が終了する直前、 つまりCoordinatedShutdown#PhaseBeforeActorSystemTerminate
フェーズで各種リソースの開放処理をすれば良さそうだ、ということが分かります。
実際に、 CoordinatedShutdown#PhaseBeforeActorSystemTerminate
フェーズで終了の確認をしてみました。
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () => implicit val ec = system.dispatcher bindingFuture .flatMap(_.unbind()) .onComplete { _ => system.log.info("ちゃんと止まる?") } Future.successful(Done) }
7月 26, 2019 12:36:26 午後 wvlet.log.Logger log 情報: [session:4d3c6593] Stopping the lifecycle ... 7月 26, 2019 12:36:26 午後 wvlet.log.Logger log 情報: [session:4d3c6593] The lifecycle has stopped. [DEBUG] [07/26/2019 12:36:26.053] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000 [INFO] [07/26/2019 12:36:26.065] [app-server-akka.actor.default-dispatcher-2] [SearchServer(akka://app-server)] Server online at http://0:0:0:0:0:0:0:0:9000/ [DEBUG] [07/26/2019 12:36:30.507] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] New connection accepted [DEBUG] [07/26/2019 12:36:40.595] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Starting coordinated shutdown from JVM shutdown hook [DEBUG] [07/26/2019 12:36:40.596] [app-server-shutdown-hook-1] [CoordinatedShutdown(akka://app-server)] Performing phase [before-service-unbind] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-unbind] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-requests-done] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [service-stop] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-cluster-shutdown] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-sharding-shutdown-region] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-leave] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-exiting-done] with [0] tasks [DEBUG] [07/26/2019 12:36:40.598] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [cluster-shutdown] with [0] tasks [DEBUG] [07/26/2019 12:36:40.599] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [before-actor-system-terminate] with [1] tasks: [app-shutdown] [DEBUG] [07/26/2019 12:36:40.604] [app-server-akka.actor.default-dispatcher-5] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbinding endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000 [DEBUG] [07/26/2019 12:36:40.605] [app-server-akka.actor.default-dispatcher-4] [CoordinatedShutdown(akka://app-server)] Performing phase [actor-system-terminate] with [1] tasks: [terminate-system] [DEBUG] [07/26/2019 12:36:40.608] [app-server-akka.actor.default-dispatcher-2] [akka://app-server/system/IO-TCP/selectors/$a/0] Unbound endpoint 0:0:0:0:0:0:0:0/0:0:0:0:0:0:0:0:9000, stopping listener [INFO] [07/26/2019 12:36:40.609] [app-server-akka.actor.default-dispatcher-9] [akka.actor.ActorSystemImpl(app-server)] ちゃんと止まる? [DEBUG] [07/26/2019 12:36:40.611] [app-server-akka.actor.default-dispatcher-3] [EventStream] shutting down: StandardOutLogger Process finished with exit code 143 (interrupted by signal 15: SIGTERM)
良さそうですね。
Play Framework の Akka バックエンドの終了処理を見てみる
実は Play Framework にも、 Coordinated Shutdown について記載されているページがあります。バックエンドが Akka HTTP ですので当たり前といえば当たり前ですね。
このページによれば、 Play Framework は CoordinatedShutdown
を利用はしているが、完全には依存していないとも記載があります。Play Framework には CoordinatedShutdown
の他に従来の ApplicationLifecycle
での終了の方法もあり、移行も可能だそうです。
実際に、ソースコードも確認してみました。 Play Framework で Akka HTTP のサーバを使っているのは、 play.core.server.AkkaHttpServer
クラスです。
// play.core.server.AkkaHttpServer /** * Starts a Play server using Akka HTTP. */ class AkkaHttpServer(context: AkkaHttpServer.Context) extends Server { registerShutdownTasks()
private def registerShutdownTasks(): Unit = { implicit val exCtx: ExecutionContext = context.actorSystem.dispatcher // Register all shutdown tasks val cs = CoordinatedShutdown(context.actorSystem) cs.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "trace-server-stop-request") { () => mode match { case Mode.Test => case _ => logger.info("Stopping server...") } Future.successful(Done) } // Stop listening. // TODO: this can be improved so unbind is deferred until `service-stop`. We could // respond 503 in the meantime. cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "akka-http-server-unbind") { () => def unbind(binding: Option[Http.ServerBinding]): Future[Done] = binding.map(_.unbind()).getOrElse(Future.successful(Done)) for { _ <- unbind(httpServerBinding) _ <- unbind(httpsServerBinding) } yield Done } // Call provided hook // Do this last because the hooks were created before the server, // so the server might need them to run until the last moment. cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "user-provided-server-stop-hook") { () => context.stopHook().map(_ => Done) } cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdown-logger") { () => Future { super.stop() Done } } }
Play Framework バックエンドの AkkaHttpServer
でも、 ActorSystem
停止前のフェーズCoordinatedShutdown.PhaseBeforeActorSystemTerminate
でユーザー定義の停止時のフックを実行するような実装になっています。
ここまでの確認で、このタイミングで Graceful shutdown の処理を実行すればよい、ということに自信を持つことができました。
CoordinatedShutdown
についての補足
なお、 CoordinatedShutdown
の振る舞い自体は設定である程度変えることができます。 CoordinatedShutdown
自体は Akka HTTP の仕組みというよりは主に akka-cluster のための機能のようなので、設定に関する情報も Akka HTTP のドキュメントではなく Akka Actors 本体のドキュメントに記載があります。
ここの設定で、CoordinatedShutdown
が登録する JVM の ShutdownHook を無効にしたり、 ActorSystem
の終了とともに JVM 自体の終了するデフォルトの振る舞いを止めて、 ActorSystem
だけを停止し JVM
のみ生かしたままにする、といった振る舞いにすることもできます。
実際に私が新規に構築したアプリケーションでは、 Akka HTTP に加えて DI ライブラリとして Airframe DI を利用しました。 Airframe DI の有効な Session もサーバの停止時に停止したかったため、CoordinatedShutdown
の終了処理の外側で Airframe の Session
を停止するために、 独自定義の JVM の ShutdownHook を登録する必要がありました。そこで ActorSystem が登録する ShutdownHook は無効にし、自前で ShutdownHook を登録してサーバ停止するように実装しました。
具体的には以下のような実装になりました。該当箇所だけ抜粋して抜粋します。
def run(host: String, port: Int, config: Config, system: ActorSystem) = { val app = pureconfig.loadConfigOrThrow[AppSettings](config, "app") val design = newDesign .bind[ActorSystem].toInstance(system) .bind[ExecutionContext].toInstance(system.dispatcher) .add(configure(app)) // Disable the airframe shutdown hook, as it will conflict with the akka-http shutdownhook val session = design.newSessionBuilder.noShutdownHook.create // Starting a dependency injection enabled scope by airframe session.start val server = session.build[AppServer].start(host, port, config, settings = ServerSettings(system)) // Manually register a shutdown hook for stopping the server // The reason is that we want to stop backend data storage and DI container after stopping akka-http server. sys.addShutdownHook { server.stop() } server } // server#start(), server#stop() def start(host: String, port: Int, config: Config, settings: ServerSettings): Server = { val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings) val bindTimeout = Duration(config.getString("akka.http.server.bind-timeout")) val serverBinding = Await.result(bindingFuture, bindTimeout) log.info(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}/") // Add task for graceful shutdown, and stop backend data storage registerShutdownTasks(serverBinding) this } def stop(): Unit = { val shutdownTimeout = CoordinatedShutdown(system).totalTimeout() + Duration(5, TimeUnit.SECONDS) Await.result(CoordinatedShutdown(system).run(JvmExitReason), shutdownTimeout) }
// application.conf akka { jvm-shutdown-hooks = off // CoordinatedShutdown による ShutdownHook の追加を止める coordinated-shutdown { terminate-actor-system = on // ActorSystem は停止する exit-jvm = off // ActorSystem と同時に JVM の停止するのは止める } }
まとめ
Akka HTTP での Graceful shutdown の実装を行うにあたり、調査したことをまとめてみました。Akka HTTP は薄い HTTP Toolkit ライブラリという位置づけもあって何から何までよしなにやってくれるフレームワークではありませんが、その分ソースコードを読めば何をやっているか把握しやすく、ハマった時に対処しやすいというメリットも感じています。
また、 OSS のソースコードを読む場合、公式のドキュメントを読んで概念やキーワードを把握した上で、実際にテストコードやデバッグログを出して挙動を把握し、問題に関連しそうなクラス名に当たりをつけて、起動スクリプトのエントリポイントやそのクラス名で grep したところからコードリーディングをしていく、といったことを私はよくやります。
こういったコードリーディングの方法は、体系的な方法や情報源はあまりないとは思いますが、もし問題にぶつかった時の切り抜け方として参考になれば幸いです。
最後に、お約束ではありますが、弊社では上記のように OSS を使って問題解決が開発がしたいエンジニアや、デザイナー、プロダクトマネージャーなど絶賛大募集中です。少しでもご興味を持っていただけたら、お気軽にカジュアルランチからでも構いませんので、下のバナーや @yukung へ DM 等でご連絡いただければと思います。
最後までお読みいただいてありがとうございました!!
-
Play Framework もバージョン 2.6 系から Akka HTTP をデフォルトのバックエンドとして利用していますし、単純に比較できるものではありませんが↩