美文网首页
scala akka模拟spark master worker通信

scala akka模拟spark master worker通信

作者: FredricZhu | 来源:发表于2020-09-03 11:46 被阅读0次

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build for the Akka Master/Worker heartbeat demo (Master.scala, Worker.scala, ...). -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>akkademo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- One Akka version shared by akka-actor and akka-remote below; keep the modules in sync. -->
        <akka.version>2.4.16</akka.version>
    </properties>

    <dependencies>
        <!-- Scala runtime. The "_2.12" suffix on the Akka artifacts below must match this binary version. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.8</version>
        </dependency>

        <!-- Core actor API (Actor, ActorSystem, Props, scheduler). -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Remoting: provides the akka.remote.RemoteActorRefProvider / netty.tcp transport configured in main(). -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- NOTE(review): there is no <build> section, so a plain `mvn package` has no Scala compiler
         configured; presumably the sources are compiled from an IDE. A Scala compiler plugin
         (e.g. scala-maven-plugin) may be needed for command-line builds - confirm. -->
</project>

Master.scala代码

package com.sensetime.actdemo

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.Duration
import scala.collection.mutable.HashSet
import scala.collection.mutable.HashMap

/**
 * Master actor: tracks registered workers and evicts those whose heartbeats have stopped.
 *
 * @param masterHost host this Master's actor system is bound to (echoed back to workers)
 * @param masterPort port this Master's actor system is bound to (echoed back to workers)
 */
class Master(masterHost: String, masterPort: Int) extends Actor {
  import akka.actor.Cancellable

  // Registered workers keyed by worker id.
  val id2Worker = new HashMap[String, WorkerInfo]()
  // The same WorkerInfo instances as id2Worker's values; scanned by the timeout check.
  val workers = new HashSet[WorkerInfo]()
  // Check worker heartbeats every 15 seconds; a worker silent for longer than this is removed.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic CheckTimeoutWorker timer, cancelled in postStop so a stopped or
  // restarted Master does not leave a stray timer behind (restart would otherwise add a second one).
  private var checkTask: Option[Cancellable] = None

  /** Called once when the actor starts: schedules the periodic timeout check. */
  override def preStart(): Unit = {
    import context.dispatcher

    // Every CHECK_INTERVAL ms send ourselves CheckTimeoutWorker to look for dead workers.
    checkTask = Some(context.system.scheduler.schedule(
      Duration(0, TimeUnit.MILLISECONDS),
      Duration(CHECK_INTERVAL, TimeUnit.MILLISECONDS),
      self, CheckTimeoutWorker))
  }

  /** Cancels the timeout-check timer started in preStart. */
  override def postStop(): Unit = {
    checkTask.foreach(_.cancel())
    checkTask = None
  }

  override def receive: Receive = {
    // Worker registration request.
    case RegisterWorker(id, host, port, memory, cores) =>
      // Only handle workers that are not already cached.
      if (!id2Worker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        // Count registration as the first heartbeat; otherwise a timeout check that runs before
        // the worker's first HeartBeat would see lastHeartBeatTime == 0 and evict it at once.
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        id2Worker += (id -> workerInfo)
        workers += workerInfo
        println("A worker registered!")
        // Reply to the registering Worker with this Master's actor URL.
        sender() ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
      }

    // Worker heartbeat: refresh its last-seen time.
    case HeartBeat(id) =>
      id2Worker.get(id) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        case None =>
          // The worker was already evicted or never registered. Ignore it instead of throwing,
          // because an exception would restart this actor and wipe every registration.
          println(s"Ignoring heartbeat from unknown worker: $id")
      }

    // Periodic check: drop every worker whose heartbeat is older than CHECK_INTERVAL.
    case CheckTimeoutWorker =>
      val currentTime = System.currentTimeMillis()
      val toRemoves = workers.filter(w => currentTime - w.lastHeartBeatTime > CHECK_INTERVAL)
      toRemoves.foreach { w =>
        id2Worker -= w.id
        workers -= w
      }

      println(s"Num of alive workers : ${workers.size}")
  }
}


object Master {
  val MASTER_SYSTEM = "MasterSystem"
  val MASTER_ACTOR = "Master"

  /**
   * Starts the remote-enabled Master actor system and blocks until it terminates.
   *
   * @param args args(0) = host to bind, args(1) = port to bind
   */
  def main(args: Array[String]): Unit = {
    import scala.concurrent.Await

    if (args.length < 2) {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem once for this process.
    val actorSystem = ActorSystem(MASTER_SYSTEM, config)
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)
    // Keep the main thread alive until the system shuts down. awaitTermination() is
    // deprecated since Akka 2.4; whenTerminated is its replacement.
    Await.ready(actorSystem.whenTerminated, Duration.Inf)
  }
}

Worker.scala代码

package com.sensetime.actdemo

import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.Duration

/**
 * Worker actor: registers with the Master on start, then sends periodic heartbeats.
 *
 * @param host       host of this worker's actor system (sent to the Master on registration)
 * @param port       port of this worker's actor system (sent to the Master on registration)
 * @param masterHost host of the Master's actor system
 * @param masterPort port of the Master's actor system
 * @param memory     memory value reported to the Master
 * @param cores      core count reported to the Master
 */
class Worker(val host: String, val port: Int, val masterHost: String, val masterPort: Int,
             memory: Int, cores: Int) extends Actor {
  import akka.actor.Cancellable

  val workerId = UUID.randomUUID().toString
  // Master URL received in RegisteredWorker; null until registration succeeds.
  var masterUrl: String = _
  val HEART_BEAT_INTERVAL = 10000
  var master: ActorSelection = _

  // Periodic SendHeartBeat timer. It is kept so it can be cancelled on stop, and so a repeated
  // RegisteredWorker does not start a second timer.
  private var heartBeatTask: Option[Cancellable] = None

  /** Connects to the Master and sends the registration request. */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  /** Stops the heartbeat timer so it does not outlive this actor instance. */
  override def postStop(): Unit = {
    heartBeatTask.foreach(_.cancel())
    heartBeatTask = None
  }

  override def receive: Receive = {
    // Registration acknowledged by the Master.
    case RegisteredWorker(masterUrl) =>
      this.masterUrl = masterUrl
      println(s"注册到: $masterUrl 成功!")
      import context.dispatcher
      // Every HEART_BEAT_INTERVAL ms send ourselves SendHeartBeat, which forwards a heartbeat
      // to the Master. Cancel any earlier timer first so heartbeats are never doubled.
      heartBeatTask.foreach(_.cancel())
      heartBeatTask = Some(context.system.scheduler.schedule(
        Duration(0, TimeUnit.MILLISECONDS),
        Duration(HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS),
        self,
        SendHeartBeat))

    // Timer tick: send a heartbeat to the Master.
    case SendHeartBeat =>
      master ! HeartBeat(workerId)
  }
}


object Worker {
  val WORKER_SYSTEM = "WorkerSystem"
  val WORKER_ACTOR = "Worker"

  /**
   * Starts a remote-enabled Worker actor system and blocks until it terminates.
   *
   * @param args host, port, masterHost, masterPort, memory, cores (in that order)
   */
  def main(args: Array[String]): Unit = {
    import scala.concurrent.Await

    if (args.length < 6) {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt

    val masterHost = args(2)
    val masterPort = args(3).toInt

    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin

    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem(WORKER_SYSTEM, config)
    actorSystem.actorOf(Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)
    // Keep the main thread alive until the system shuts down. awaitTermination() is
    // deprecated since Akka 2.4; whenTerminated is its replacement.
    Await.ready(actorSystem.whenTerminated, Duration.Inf)
  }
}

RemoteMessages.scala

package com.sensetime.actdemo


// Messages that cross the process boundary between Worker and Master must inherit Serializable.
// Case classes already are; extending RemoteMsg marks the remote protocol explicitly.
trait RemoteMsg extends Serializable

// Worker --> Master: register a worker with its id, address and resources.
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg


// Master --> Worker: registration acknowledged; carries the Master's actor URL.
case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker --> Worker (self): local timer tick telling the Worker to send a heartbeat.
// It never leaves the Worker's process.
case object SendHeartBeat

// Worker --> Master: heartbeat that refreshes the worker's last-seen time.
// It crosses processes, so it is a RemoteMsg like the other remote messages.
case class HeartBeat(workerId: String) extends RemoteMsg


// Master --> Master (self): local timer tick that checks for expired workers.
case object CheckTimeoutWorker

WorkerInfo.scala

package com.sensetime.actdemo

/**
 * Master-side record of one registered worker.
 *
 * @param id     worker id
 * @param host   worker host
 * @param port   worker port
 * @param memory memory value the worker reported
 * @param cores  core count the worker reported
 */
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {
  // Time of the last heartbeat, in System.currentTimeMillis() units. It starts at creation time
  // rather than 0 (the epoch), so a new worker is not treated as timed out before its first heartbeat.
  var lastHeartBeatTime: Long = System.currentTimeMillis()
}

程序输出
Master输出


(图片:Master 运行输出截图,原图未保留)

Worker输出


(图片:Worker 运行输出截图,原图未保留)

相关文章

网友评论

      本文标题:scala akka模拟spark master worker通信

      本文链接:https://www.haomeiwen.com/subject/onmnsktx.html