import MeCab
import sys
import re
from collections import Counter
withopen("podcast_all.txt") as f: #文字起こししたpodcastデータの読み込み
podcast = f.read()
wakati = MeCab.Tagger("-d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd") #新語辞書を適用
parse = wakati.parse(podcast)
lines = parse.split("\n")
items = (re.split("[\t,]", line) for line in lines)
#「EOS」と「空文字」と「ー」以外
words = [item[0] for item in items if (item[0] notin ("EOS", "", "ー"))]
## 標準出力に出力 ##########################
counter = Counter(words)
for word, count in counter.most_common():
print(f"{word}: {count}")
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn
object WebServer {
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
path("hello") {
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
これだけで、 /hello エンドポイントからレスポンスが返ってくるサーバが立ち上がります。また main メソッドを持つ単なる Scala プログラムですので、ソースコードを追う際もエントリポイントが明確なため調査がしやすいところも気に入っています。
override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = {
val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
val phases = phasesFromConfig(conf)
val coord = new CoordinatedShutdown(system, phases)
initPhaseActorSystemTerminate(system, conf, coord)
initJvmHook(system, conf, coord)
// Avoid leaking actor system references when system is terminated before JVM is #23384// Catching RejectedExecutionException in case extension is accessed first time when// system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem// is started but it might be a race between (failing?) startup and shutdown. def cleanupActorSystemJvmHook(): Unit = {
coord.actorSystemJvmHook match {
case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled =>
cancellable.cancel()
coord.actorSystemJvmHook = OptionVal.None
case _ =>
}
}
try system.registerOnTermination(cleanupActorSystemJvmHook())
catch {
case _: RejectedExecutionException => cleanupActorSystemJvmHook()
}
coord
}
private def initPhaseActorSystemTerminate(
system: ExtendedActorSystem,
conf: Config,
coord: CoordinatedShutdown): Unit = {
coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () =>
val confForReason = confWithOverrides(conf, coord.shutdownReason())
val terminateActorSystem = confForReason.getBoolean("terminate-actor-system")
val exitJvm = confForReason.getBoolean("exit-jvm")
val exitCode = confForReason.getInt("exit-code")
if (exitJvm && terminateActorSystem) {
// In case ActorSystem shutdown takes longer than the phase timeout,// exit the JVM forcefully anyway.// We must spawn a separate thread to not block current thread,// since that would have blocked the shutdown of the ActorSystem.val timeout = coord.timeout(PhaseActorSystemTerminate)
val t = new Thread {
override def run(): Unit = {
if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook)
System.exit(exitCode)
}
}
t.setName("CoordinatedShutdown-exit")
t.start()
}
if (terminateActorSystem) {
system.finalTerminate()
system.whenTerminated.map { _ =>
if (exitJvm && !runningJvmHook) System.exit(exitCode)
Done
}(ExecutionContexts.sameThreadExecutionContext)
} elseif (exitJvm) {
System.exit(exitCode)
Future.successful(Done)
} else
Future.successful(Done)
}
}
// play.core.server.AkkaHttpServer/** * Starts a Play server using Akka HTTP. */class AkkaHttpServer(context: AkkaHttpServer.Context) extends Server {
registerShutdownTasks()
private def registerShutdownTasks(): Unit = {
implicit val exCtx: ExecutionContext = context.actorSystem.dispatcher
// Register all shutdown tasksval cs = CoordinatedShutdown(context.actorSystem)
cs.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "trace-server-stop-request") { () =>
mode match {
case Mode.Test =>
case _ => logger.info("Stopping server...")
}
Future.successful(Done)
}
// Stop listening.// TODO: this can be improved so unbind is deferred until `service-stop`. We could// respond 503 in the meantime.
cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "akka-http-server-unbind") { () =>
def unbind(binding: Option[Http.ServerBinding]): Future[Done] =
binding.map(_.unbind()).getOrElse(Future.successful(Done))
for {
_ <- unbind(httpServerBinding)
_ <- unbind(httpsServerBinding)
} yield Done
}
// Call provided hook// Do this last because the hooks were created before the server,// so the server might need them to run until the last moment.
cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "user-provided-server-stop-hook") { () =>
context.stopHook().map(_ => Done)
}
cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "shutdown-logger") { () =>
Future {
super.stop()
Done
}
}
}
Play Framework バックエンドの AkkaHttpServer でも、 ActorSystem 停止前のフェーズCoordinatedShutdown.PhaseBeforeActorSystemTerminate でユーザー定義の停止時のフックを実行するような実装になっています。
実際に私が新規に構築したアプリケーションでは、 Akka HTTP に加えて DI ライブラリとして Airframe DI を利用しました。 Airframe DI の有効な Session もサーバの停止時に停止したかったため、CoordinatedShutdownの終了処理の外側で Airframe の Session を停止するために、 独自定義の JVM の ShutdownHook を登録する必要がありました。そこで ActorSystem が登録する ShutdownHook は無効にし、自前で ShutdownHook を登録してサーバ停止するように実装しました。
具体的には以下のような実装になりました。該当箇所だけ抜粋して抜粋します。
def run(host: String, port: Int, config: Config, system: ActorSystem) = {
val app = pureconfig.loadConfigOrThrow[AppSettings](config, "app")
val design = newDesign
.bind[ActorSystem].toInstance(system)
.bind[ExecutionContext].toInstance(system.dispatcher)
.add(configure(app))
// Disable the airframe shutdown hook, as it will conflict with the akka-http shutdownhookval session = design.newSessionBuilder.noShutdownHook.create
// Starting a dependency injection enabled scope by airframe
session.start
val server = session.build[AppServer].start(host, port, config, settings = ServerSettings(system))
// Manually register a shutdown hook for stopping the server// The reason is that we want to stop backend data storage and DI container after stopping akka-http server.
sys.addShutdownHook {
server.stop()
}
server
}
// server#start(), server#stop() def start(host: String, port: Int, config: Config, settings: ServerSettings): Server = {
val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings)
val bindTimeout = Duration(config.getString("akka.http.server.bind-timeout"))
val serverBinding = Await.result(bindingFuture, bindTimeout)
log.info(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}/")
// Add task for graceful shutdown, and stop backend data storage
registerShutdownTasks(serverBinding)
this
}
def stop(): Unit = {
val shutdownTimeout = CoordinatedShutdown(system).totalTimeout() + Duration(5, TimeUnit.SECONDS)
Await.result(CoordinatedShutdown(system).run(JvmExitReason), shutdownTimeout)
}
// application.conf
akka {
jvm-shutdown-hooks = off // CoordinatedShutdown による ShutdownHook の追加を止める
coordinated-shutdown {
terminate-actor-system = on // ActorSystem は停止する
exit-jvm = off // ActorSystem と同時に JVM の停止するのは止める
}
}