pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the Akka remoting demo (Master / Worker actors). -->
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>akkademo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Single place to set the version shared by both Akka artifacts below. -->
<akka.version>2.4.16</akka.version>
</properties>
<dependencies>
<!-- Scala standard library; its 2.12 line matches the _2.12 suffix of the Akka artifacts. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<!-- Akka actors (Actor, ActorSystem, Props). -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- Akka remoting, needed for the akka.remote.RemoteActorRefProvider the demo configures. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
</project>
Master.scala代码
package com.sensetime.actdemo
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.Duration
import scala.collection.mutable.HashSet
import scala.collection.mutable.HashMap
/**
 * Master actor: keeps a registry of workers and periodically evicts the ones
 * whose last heartbeat is older than `CHECK_INTERVAL` milliseconds.
 *
 * @param masterHost host used to build the master URL returned to workers
 * @param masterPort port used to build the master URL returned to workers
 */
class Master(masterHost: String, masterPort: Int) extends Actor {
  // Registered workers keyed by worker id.
  val id2Worker = new HashMap[String, WorkerInfo]()
  // The same registered workers as a set; scanned by the timeout sweep.
  val workers = new HashSet[WorkerInfo]()
  // Period of the timeout sweep and maximum tolerated heartbeat age, in milliseconds.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic sweep, kept so it can be cancelled in postStop.
  private var checkTask: Option[akka.actor.Cancellable] = None

  /** Starts the periodic sweep: sends CheckTimeoutWorker to this actor every CHECK_INTERVAL ms. */
  override def preStart(): Unit = {
    import context.dispatcher
    checkTask = Some(
      context.system.scheduler.schedule(
        Duration(0, TimeUnit.MILLISECONDS),
        Duration(CHECK_INTERVAL, TimeUnit.MILLISECONDS),
        self,
        CheckTimeoutWorker))
  }

  /**
   * Cancels the periodic sweep. Without this, every run of preStart after the
   * first (e.g. on an actor restart) would add one more timer on top of the old one.
   */
  override def postStop(): Unit = {
    checkTask.foreach(_.cancel())
    checkTask = None
  }

  override def receive: Receive = {
    // A worker asks to be registered.
    case RegisterWorker(id, host, port, memory, cores) =>
      // Only workers that are not in the registry yet are added and answered.
      if (!id2Worker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        // Count the registration itself as the first sign of life; otherwise the
        // timestamp stays 0 and a sweep running before the first heartbeat
        // would evict the worker immediately.
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        id2Worker += (id -> workerInfo)
        workers += workerInfo
        println("A worker registered!")
        // Reply to the worker, telling it the URL of this master.
        sender() ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
      }

    // A worker reports that it is alive: refresh its heartbeat time.
    case HeartBeat(id) =>
      // Heartbeats from ids that are not (or no longer) registered are ignored
      // instead of throwing NoSuchElementException and failing the actor.
      id2Worker.get(id).foreach { workerInfo =>
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
      }

    // Periodic sweep: drop every worker whose heartbeat is too old.
    case CheckTimeoutWorker =>
      val currentTime = System.currentTimeMillis()
      // filter returns a new set, so removing from `workers` below is safe.
      val toRemoves = workers.filter(w => currentTime - w.lastHeartBeatTime > CHECK_INTERVAL)
      toRemoves.foreach { w =>
        id2Worker -= w.id
        workers -= w
      }
      println(s"Num of alive workers : ${workers.size}")
  }
}
/** Constants shared with the worker side, and the entry point of the master process. */
object Master {
  // Name of the master's ActorSystem; part of the master's actor URL.
  val MASTER_SYSTEM = "MasterSystem"
  // Name of the master actor; last segment of the master's actor URL.
  val MASTER_ACTOR = "Master"

  /**
   * Starts the master process and blocks until its ActorSystem terminates.
   *
   * @param args `<host> <port>`: the address the master's remoting binds to.
   *             Prints a usage line and exits with status 1 when an argument
   *             is missing or the port is not an integer.
   */
  def main(args: Array[String]): Unit = {
    def usageAndExit(): Nothing = {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }
    if (args.length < 2) usageAndExit()

    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse(usageAndExit())
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem for this process, then the master actor inside it.
    val actorSystem = ActorSystem(MASTER_SYSTEM, config)
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)
    // awaitTermination() is deprecated in Akka 2.4; block on whenTerminated instead.
    scala.concurrent.Await.result(actorSystem.whenTerminated, Duration.Inf)
  }
}
Worker.scala代码
package com.sensetime.actdemo
import java.util.UUID
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.Duration
/**
 * Worker actor: registers itself with the master on start and, once the
 * registration is confirmed, sends the master a heartbeat every
 * `HEART_BEAT_INTERVAL` milliseconds.
 *
 * @param host       host of this worker, reported to the master on registration
 * @param port       port of this worker, reported to the master on registration
 * @param masterHost host of the master to register with
 * @param masterPort port of the master to register with
 * @param memory     memory figure reported to the master on registration
 * @param cores      core count reported to the master on registration
 */
class Worker(val host: String, val port: Int, val masterHost: String, val masterPort: Int,
             memory: Int, cores: Int) extends Actor {
  // Random id that identifies this worker instance to the master.
  val workerId = UUID.randomUUID().toString
  // URL of the master, filled in when the registration is confirmed.
  var masterUrl: String = _
  // Period between two heartbeats, in milliseconds.
  val HEART_BEAT_INTERVAL = 10000
  // Selection pointing at the master actor; set in preStart.
  var master: ActorSelection = _

  // Handle of the periodic heartbeat tick, kept so it can be cancelled.
  private var heartBeatTask: Option[akka.actor.Cancellable] = None

  /** Resolves the master's actor path and sends it the registration request. */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  /** Stops the heartbeat tick so no timer outlives this actor instance. */
  override def postStop(): Unit = cancelHeartBeat()

  override def receive: Receive = {
    // The master confirmed the registration.
    case RegisteredWorker(masterUrl) =>
      this.masterUrl = masterUrl
      println(s"注册到: $masterUrl 成功!")
      import context.dispatcher
      // Drop any earlier tick first, so a repeated confirmation cannot stack timers.
      cancelHeartBeat()
      // Tick ourselves immediately and then every HEART_BEAT_INTERVAL ms.
      heartBeatTask = Some(
        context.system.scheduler.schedule(
          Duration(0, TimeUnit.MILLISECONDS),
          Duration(HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS),
          self,
          SendHeartBeat))

    // Timer tick: time to tell the master we are alive.
    case SendHeartBeat =>
      master ! HeartBeat(workerId)
  }

  // Cancels the current heartbeat tick, if any.
  private def cancelHeartBeat(): Unit = {
    heartBeatTask.foreach(_.cancel())
    heartBeatTask = None
  }
}
/** Constants and the entry point of the worker process. */
object Worker {
  // Name of the worker's ActorSystem.
  val WORKER_SYSTEM = "WorkerSystem"
  // Name of the worker actor inside that system.
  val WORKER_ACTOR = "Worker"

  /**
   * Starts a worker process and blocks until its ActorSystem terminates.
   *
   * @param args `<host> <port> <masterHost> <masterPort> <memory> <cores>`.
   *             Prints a usage line and exits with status 1 when an argument
   *             is missing or a numeric argument is not an integer.
   */
  def main(args: Array[String]): Unit = {
    def usageAndExit(): Nothing = {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }
    // Reads args(index) as an Int, falling back to the usage message.
    def intArg(index: Int): Int =
      scala.util.Try(args(index).toInt).getOrElse(usageAndExit())

    if (args.length < 6) usageAndExit()

    val host = args(0)
    val port = intArg(1)
    val masterHost = args(2)
    val masterPort = intArg(3)
    val memory = intArg(4)
    val cores = intArg(5)
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem(WORKER_SYSTEM, config)
    actorSystem.actorOf(
      Props(new Worker(host, port, masterHost, masterPort, memory, cores)),
      WORKER_ACTOR)
    // awaitTermination() is deprecated in Akka 2.4; block on whenTerminated instead.
    scala.concurrent.Await.result(actorSystem.whenTerminated, Duration.Inf)
  }
}
RemoteMessages.scala
package com.sensetime.actdemo
// Marker for the messages that travel between the Master and Worker processes;
// it extends Serializable because those messages leave the sending process.
trait RemoteMsg extends Serializable
// Worker --> Master: request to register the worker
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg
// Master --> Worker: the worker has been registered; carries the master's URL
case class RegisteredWorker(masterUrl: String) extends RemoteMsg
// Worker --> Worker (self): timer tick telling the worker to send a heartbeat
case object SendHeartBeat
// Worker --> Master: heartbeat that refreshes the worker's last-seen time.
// Marked as RemoteMsg like the other cross-process messages.
case class HeartBeat(workerId: String) extends RemoteMsg
// Master --> Master (self): timer tick telling the master to look for expired workers
case object CheckTimeoutWorker
WorkerInfo.scala
package com.sensetime.actdemo
/**
 * Record describing one worker.
 *
 * @param id     identifier of the worker
 * @param host   host of the worker
 * @param port   port of the worker
 * @param memory memory figure of the worker
 * @param cores  core count of the worker
 */
class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val memory: Int,
    val cores: Int
) {
  // Time of the most recent heartbeat; 0 until a value is assigned.
  var lastHeartBeatTime: Long = 0L
}
程序输出
Master输出

Worker输出

网友评论