Note: Flink uses Akka for RPC between its internal components (JobManager, TaskManager, ResourceManager), for example for the communication between the JobManager and the TaskManagers. Flink does not use Akka for data transport: data exchanged between operators goes through Netty.
- Sender side
RpcGateway (the interface that declares the remotely callable methods)
RpcEndpoint (bound to an RpcServer)
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");
    this.rpcServer = rpcService.startServer(this);
    // The main thread executor to be used to execute future callbacks in the main thread
    // of the executing rpc server
    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
Note: the main thread executor runs future callbacks in the main thread of the executing RPC server.
All calls to the same Flink RpcEndpoint (actor) are executed serially in that one main thread, so the endpoint's state is never accessed concurrently.
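The single-main-thread rule can be imitated with the plain JDK. The following is a minimal sketch, not Flink code: `CounterEndpoint` and `MainThreadDemo` are hypothetical names, and a single-threaded `ExecutorService` stands in for the endpoint's main thread. Caller threads only enqueue work, so the `count` field is only ever touched by one thread and needs no lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint (not a Flink class): every state change is queued on one
// single-threaded executor, which plays the role of the RpcEndpoint's main thread.
class CounterEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private long count = 0; // only read and written on mainThread, so no lock is needed

    // Comparable to an RPC call: the caller only enqueues work and gets a future back.
    CompletableFuture<Long> increment() {
        return CompletableFuture.supplyAsync(() -> ++count, mainThread);
    }

    CompletableFuture<Long> current() {
        return CompletableFuture.supplyAsync(() -> count, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }
}

class MainThreadDemo {
    // Starts `callers` threads that each issue `perCaller` increments, then returns the final count.
    static long run(int callers, int perCaller) {
        CounterEndpoint endpoint = new CounterEndpoint();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < perCaller; j++) {
                    endpoint.increment();
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Enqueued after every increment above; the executor runs tasks in FIFO order.
        long result = endpoint.current().join();
        endpoint.shutdown();
        return result;
    }
}
```

Calling `MainThreadDemo.run(8, 1000)` returns 8000 on every run, because the eight caller threads never update `count` themselves.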
RpcServer
AkkaRpcService implements RpcService
AkkaRpcService#startServer starts an Akka actor for the endpoint:
CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
    akkaRpcActorProps = Props.create(
        FencedAkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
} else {
    akkaRpcActorProps = Props.create(
        AkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
}
ActorRef actorRef;
synchronized (lock) {
    checkState(!stopped, "RpcService is stopped");
    // Create the Akka actor; actorSystem.actorOf creates it as a child of the /user guardian actor
    actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
    actors.put(actorRef, rpcEndpoint);
}
Depending on the endpoint type, startServer creates either a FencedAkkaRpcActor, which validates the RPC fencing token (an RPC is executed only if the attached fencing token equals the endpoint's own token), or a plain AkkaRpcActor, which receives and executes RPC messages without token validation.
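The fencing check itself is simple. The sketch below assumes a token of type `UUID` and uses the hypothetical classes `FencedCall` and `FencedEndpointSketch`; Flink's real fenced endpoints use their own token types and message classes. A call carrying a stale token, for example one addressed to a previous leader session, is rejected instead of executed.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical message type (not Flink's API): a call plus the fencing token the sender believes is current.
final class FencedCall<T> {
    final UUID fencingToken;
    final Supplier<T> body;

    FencedCall(UUID fencingToken, Supplier<T> body) {
        this.fencingToken = fencingToken;
        this.body = body;
    }
}

// Hypothetical fenced endpoint: runs a call only if its token equals the endpoint's own token.
class FencedEndpointSketch {
    private volatile UUID fencingToken;

    FencedEndpointSketch(UUID fencingToken) {
        this.fencingToken = fencingToken;
    }

    // For example, after a new leader election the endpoint switches to a fresh token.
    void setFencingToken(UUID newToken) {
        this.fencingToken = newToken;
    }

    <T> T handle(FencedCall<T> call) {
        if (!Objects.equals(fencingToken, call.fencingToken)) {
            throw new IllegalStateException(
                    "Fencing token mismatch: expected " + fencingToken + " but got " + call.fencingToken);
        }
        return call.body.get();
    }
}
```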
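AkkaInvocationHandler implements InvocationHandler: it is the handler behind the JDK dynamic proxy that callers use as the gateway.
AkkaInvocationHandler#invoke() carries out the actual remote call in one of two ways:
1. Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()): asynchronous call that returns a result
2. rpcEndpoint.tell(message, ActorRef.noSender()): fire-and-forget call without a return value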
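Gateways are JDK dynamic proxies, and the invocation handler decides between ask and tell. The following sketch replaces the Akka actor with a local single-threaded executor, so no remote call happens; `GreeterGateway`, `GreeterEndpoint` and `LocalInvocationHandler` are hypothetical names. As an assumption modelled loosely on Flink's handler, void methods are sent tell-style and methods returning `CompletableFuture` ask-style; other return types are not supported here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical gateway, standing in for an RpcGateway sub-interface.
interface GreeterGateway {
    CompletableFuture<String> greet(String name); // returns a result -> sent ask-style
    void record(String line);                     // void -> sent tell-style
}

class GreeterEndpoint implements GreeterGateway {
    final StringBuilder journal = new StringBuilder();

    @Override
    public CompletableFuture<String> greet(String name) {
        return CompletableFuture.completedFuture("hello " + name);
    }

    @Override
    public void record(String line) {
        journal.append(line).append('\n');
    }
}

// Hypothetical local handler: each proxy call becomes a task on the endpoint's single thread.
class LocalInvocationHandler implements InvocationHandler {
    private final Object target;
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "endpoint-main-thread");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });

    private LocalInvocationHandler(Object target) {
        this.target = target;
    }

    @SuppressWarnings("unchecked")
    static <T> T connect(Class<T> gateway, T target) {
        return (T) Proxy.newProxyInstance(
                gateway.getClassLoader(), new Class<?>[] {gateway}, new LocalInvocationHandler(target));
    }

    @Override
    @SuppressWarnings("unchecked")
    public Object invoke(Object proxy, Method method, Object[] args) {
        if (method.getDeclaringClass() == Object.class) {
            return call(method, args); // toString/equals/hashCode: run directly
        }
        if (method.getReturnType() == void.class) {
            mailbox.execute(() -> call(method, args)); // tell: enqueue, no reply
            return null;
        }
        if (method.getReturnType() == CompletableFuture.class) {
            // ask: enqueue and return a future that completes with the endpoint's answer
            return CompletableFuture.supplyAsync(() -> call(method, args), mailbox)
                    .thenCompose(r -> (CompletableFuture<Object>) r);
        }
        throw new UnsupportedOperationException("sketch supports only void and CompletableFuture: " + method);
    }

    private Object call(Method method, Object[] args) {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw new CompletionException(e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because both calls land in the same FIFO queue, a tell issued before an ask has already run by the time the ask's future completes.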
- Receiver side
AkkaRpcActor extends AbstractActor (AbstractActor is Akka's abstract base class for actors written against the Java API)
and implements the createReceive method:
@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // handshake messages
        .match(ControlMessages.class, this::handleControlMessage)          // control (start/stop) messages
        .matchAny(this::handleMessage)                                     // everything else, e.g. RPC calls
        .build();
}
private void handleMessage(final Object message) {
    if (state.isRunning()) {
        mainThreadValidator.enterMainThread();
        try {
            handleRpcMessage(message);
        } finally {
            mainThreadValidator.exitMainThread();
        }
    } else {
        log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
            rpcEndpoint.getClass().getName(),
            message.getClass().getName());
        sendErrorIfSender(new AkkaRpcException(
            String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress())));
    }
}
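The receive logic above dispatches on the message class and only executes RPCs while the endpoint is running. The sketch below reproduces that shape without Akka: `MiniReceive` is a hypothetical, much-simplified stand-in for `ReceiveBuilder` (the first matching class wins, `matchAny` is the fallback), and `MiniRpcActor` discards messages until it has received a START control message. The handshake case is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ReceiveBuilder: handlers are tried in registration order.
final class MiniReceive {
    private final List<Class<?>> types = new ArrayList<>();
    private final List<Consumer<Object>> handlers = new ArrayList<>();
    private Consumer<Object> fallback = m -> { throw new IllegalArgumentException("unhandled message: " + m); };

    <T> MiniReceive match(Class<T> type, Consumer<T> handler) {
        types.add(type);
        handlers.add(m -> handler.accept(type.cast(m)));
        return this;
    }

    MiniReceive matchAny(Consumer<Object> handler) {
        fallback = handler;
        return this;
    }

    void apply(Object message) {
        for (int i = 0; i < types.size(); i++) {
            if (types.get(i).isInstance(message)) {
                handlers.get(i).accept(message);
                return;
            }
        }
        fallback.accept(message);
    }
}

// Hypothetical actor-like class mirroring createReceive/handleMessage.
class MiniRpcActor {
    enum Control { START, STOP }

    private boolean running = false;
    final List<String> log = new ArrayList<>();

    final MiniReceive receive = new MiniReceive()
            .match(Control.class, c -> running = (c == Control.START)) // like handleControlMessage
            .matchAny(this::handleMessage);                           // like handleMessage

    private void handleMessage(Object message) {
        if (running) {
            log.add("handled " + message);
        } else {
            // Flink logs the drop and replies with an error if there is a sender
            log.add("discarded " + message);
        }
    }
}
```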
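1. handleMessage is where messages are actually processed. For an RPC call the chain is
handleRpcMessage(message) -> handleRpcInvocation((RpcInvocation) message) ->
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
i.e. the target method is looked up on the endpoint and invoked via Java reflection, the receiving counterpart of the sender's dynamic proxy.
2. mainThreadValidator.enterMainThread() records the actor thread that handles the message as the endpoint's main thread (exitMainThread() clears it again), so main-thread checks such as validateRunsInMainThread() can detect access from any other thread.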
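Both receive-side steps, the main-thread marker and the reflective call, fit in a few lines. In this sketch `Invocation` and `ReflectiveEndpoint` are hypothetical; resolving the method with `getClass().getMethod(name, parameterTypes)` is an assumption about roughly how such a lookup works, not a copy of Flink's code. A method that calls `validateRunsInMainThread()` succeeds when invoked through `handle` and fails when called directly, outside of message handling.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical invocation message: method name, parameter types and arguments.
final class Invocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    Invocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

class ReflectiveEndpoint {
    // The thread currently handling a message; null while the endpoint is idle.
    private final AtomicReference<Thread> currentMainThread = new AtomicReference<>();
    private int total = 0;

    public int add(int x) {
        validateRunsInMainThread();
        total += x;
        return total;
    }

    private void validateRunsInMainThread() {
        if (currentMainThread.get() != Thread.currentThread()) {
            throw new IllegalStateException("not called from the endpoint's main thread");
        }
    }

    // Receiver side: enter the main thread, resolve the method by name and types, invoke it reflectively.
    Object handle(Invocation inv) {
        if (!currentMainThread.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("concurrent access from " + currentMainThread.get());
        }
        try {
            Method method = getClass().getMethod(inv.methodName, inv.parameterTypes);
            return method.invoke(this, inv.args);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("RPC method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot invoke RPC method " + inv.methodName, e);
        } finally {
            currentMainThread.set(null); // exit the main thread again
        }
    }
}
```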
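Summary:
How does Akka deal with concurrency and data-consistency problems in multithreaded code?
- Shared data consistency and locking
The biggest problem concurrency causes is operating on shared data. The usual response is to protect shared data with locks, but locks bring their own problems, such as choosing the lock granularity (a method, a code block, and so on) and the kind of lock (read lock, write lock, and so on). Actors avoid these problems. First, actor messages are immutable, so communicating with an actor involves no shared mutable data. Second, an actor processes its messages serially, which keeps its internal state consistent on the JVM without explicit locking.
- Many threads may send messages to one actor at the same time, yet the actor processes them serially. How is that guaranteed?
Mailbox.scala keeps a messageQueue internally. Senders only enqueue into this queue, which decouples sending from processing and makes message handling asynchronous.
The processMailbox method takes messages off the queue one at a time and processes them, calling itself tail-recursively.
Enqueuing is safe from any number of threads, but messages are processed by at most one thread at a time, so the actor's internal state stays consistent.
Processing mailbox messages recursively (Akka, Mailbox.scala):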
/**
 * Process the messages in the mailbox
 */
@tailrec private final def processMailbox(
    left: Int = java.lang.Math.max(dispatcher.throughput, 1),
    deadlineNs: Long =
      if (dispatcher.isThroughputDeadlineTimeDefined)
        System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
      else 0L): Unit =
  if (shouldProcessMessage) {
    val next = dequeue()
    if (next ne null) {
      if (Mailbox.debug) println(actor.self + " processing message " + next)
      actor.invoke(next)
      if (Thread.interrupted())
        throw new InterruptedException("Interrupted while processing actor messages")
      processAllSystemMessages()
      if ((left > 1) && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0))
        processMailbox(left - 1, deadlineNs)
    }
  }
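The throughput limit means one mailbox run handles at most `throughput` messages before the thread is handed back to the dispatcher. The Java sketch below is an iterative equivalent of the tail-recursive loop, using a `ConcurrentLinkedQueue` as the message queue; `MiniMailbox` is a hypothetical name, and the deadline check and system messages are left out. It assumes only one thread runs `processMailbox` at a time, which Akka guarantees with the Scheduled status bit.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical mailbox (not Akka's class): any thread may enqueue, but each processing run
// handles at most `throughput` messages and then returns, so the thread can serve other actors.
class MiniMailbox<M> {
    private final Queue<M> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<M> actor;
    private final int throughput;

    MiniMailbox(Consumer<M> actor, int throughput) {
        this.actor = actor;
        this.throughput = Math.max(throughput, 1); // same guard as max(dispatcher.throughput, 1)
    }

    void enqueue(M message) {
        queue.offer(message); // lock-free, safe from many sender threads
    }

    // Iterative equivalent of the processMailbox(left - 1, ...) recursion: returns the number handled.
    int processMailbox() {
        int handled = 0;
        for (int left = throughput; left > 0; left--) {
            M next = queue.poll();
            if (next == null) {
                break; // mailbox drained
            }
            actor.accept(next);
            handled++;
        }
        return handled;
    }

    boolean hasMessages() {
        return !queue.isEmpty();
    }
}
```

With a throughput of 3 and seven queued messages, three runs handle 3, 3 and 1 messages, always in FIFO order.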
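setAsScheduled checks whether the mailbox is already scheduled for execution (or closed). If it is not, it tries to set the Scheduled bit with a compare-and-set, retrying until either the update succeeds or another thread has scheduled the mailbox first: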
@tailrec
final def setAsScheduled(): Boolean = {
  val s = currentStatus
  /*
   * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
   * Scheduled bit already set.
   */
  if ((s & shouldScheduleMask) != Open) false
  else updateStatus(s, s | Scheduled) || setAsScheduled()
}
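The compare-and-set loop can be reproduced with `AtomicInteger`. In this sketch, `MailboxStatus` is hypothetical and only loosely modelled on Akka's bit flags (Closed and Scheduled bits, suspension ignored); the `while` loop plays the role of the `@tailrec` retry. However many threads race, exactly one of them wins `setAsScheduled()` until the winner calls `setAsIdle()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical status word, loosely modelled on Akka's Mailbox bit flags.
class MailboxStatus {
    static final int OPEN = 0;
    static final int CLOSED = 1;
    static final int SCHEDULED = 2;
    static final int SHOULD_SCHEDULE_MASK = CLOSED | SCHEDULED; // higher bits (e.g. suspend counts) ignored

    private final AtomicInteger status = new AtomicInteger(OPEN);

    // Claim the mailbox for one processing run; only one thread can win until setAsIdle() is called.
    boolean setAsScheduled() {
        while (true) {
            int s = status.get();
            if ((s & SHOULD_SCHEDULE_MASK) != OPEN) {
                return false; // already scheduled by someone else, or closed
            }
            if (status.compareAndSet(s, s | SCHEDULED)) {
                return true;
            }
            // CAS lost against a concurrent change: re-read and retry
        }
    }

    // Called by the thread that finished a processing run.
    void setAsIdle() {
        status.updateAndGet(s -> s & ~SCHEDULED);
    }

    void close() {
        status.updateAndGet(s -> s | CLOSED);
    }
}
```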
Example code
package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @author shao.hongxiao
*/
public class RpcTest {
private static final Time TIMEOUT = Time.seconds(10L);
private static ActorSystem actorSystem = null;
private static RpcService rpcService = null;
// Define the communication protocol (gateway interfaces)
public interface HelloGateway extends RpcGateway {
String hello();
}
public interface HiGateway extends RpcGateway {
String hi();
}
// Concrete endpoint implementations
public static class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway {
protected HelloRpcEndpoint(RpcService rpcService) {
super(rpcService);
}
@Override
public String hello() {
return "hello";
}
}
public static class HiRpcEndpoint extends RpcEndpoint implements HiGateway {
protected HiRpcEndpoint(RpcService rpcService) {
super(rpcService);
}
@Override
public String hi() {
return "hi";
}
}
@BeforeClass
public static void setup() {
actorSystem = AkkaUtils.createDefaultActorSystem();
// Create the RpcService, backed by Akka
rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
}
@AfterClass
public static void teardown() throws Exception {
final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
FutureUtils
.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}
@Test
public void test() throws Exception {
HelloRpcEndpoint helloEndpoint = new HelloRpcEndpoint(rpcService);
HiRpcEndpoint hiEndpoint = new HiRpcEndpoint(rpcService);
helloEndpoint.start();
// Get the endpoint's self gateway
HelloGateway helloGateway = helloEndpoint.getSelfGateway(HelloGateway.class);
String hello = helloGateway.hello();
System.out.println(hello);
hiEndpoint.start();
// Obtain a proxy (gateway) for the endpoint from its address
HiGateway hiGateway = rpcService.connect(hiEndpoint.getAddress(),HiGateway.class).get();
String hi = hiGateway.hi();
System.out.println(hi);
}
}
import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import akka.pattern.Patterns
import scala.util.{Failure, Success}
trait Action {
  val message: String
  val time: Int
}
case class TurnOnLight(time: Int) extends Action { // "turn on the light" command
  val message = "Turn on the living room light"
}
case class BoilWater(time: Int) extends Action { // "boil water" command
  val message = "Boil a pot of water"
}
class RobotActor extends Actor {
  val log = Logging(context.system, this)
  def receive: Receive = { // the robot receives commands
    case t: TurnOnLight => log.info(s"${t.message} after ${t.time} hour(s)")
    case b: BoilWater => log.info(s"${b.message} after ${b.time} hour(s)")
    case s: String =>
      log.info(s"I can not handle this message: $s")
      // Reply so that an ask() gets an answer instead of timing out
      // (for a plain tell without a sender, the reply goes to dead letters)
      sender() ! s"I can not handle this message: $s"
  }
}
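/**
 * https://scala.cool/tags/Akka/
 * https://cloud.tencent.com/developer/article/1460210
 *
 * Akka raises the level of abstraction of the concurrency model:
 * an asynchronous, non-blocking, high-performance, event-driven programming model
 * with lightweight actors (millions of actors fit in about 1 GB of memory).
 * Actors on the JVM have the following properties:
 *  - every actor has its own mailbox
 *  - an actor processes its messages serially
 *  - messages sent to actors are immutable
 *
 * How Akka addresses multithreading problems
 * 1. Shared data and locks
 * The biggest problem with concurrency is operating on shared data. The usual fix is locking,
 * which raises further questions such as lock granularity (method, code block, ...) and lock type
 * (read lock, write lock, ...). These matter a great deal in concurrent programs, yet they are hard
 * for programmers new to concurrency to get right, which makes programming more complex.
 * Actors avoid these problems: because messages are immutable, communicating with an actor involves
 * no shared mutable data, and because an actor processes messages serially, its internal data cannot
 * be corrupted by concurrent access. Writing concurrent programs with actors therefore greatly
 * reduces coding complexity.
 */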
object Demo {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("flink")
    val robotActor = actorSystem.actorOf(Props(new RobotActor()), "robot") // create a robot
    // tell style: fire-and-forget, no reply expected
    robotActor ! TurnOnLight(1) // send the robot a "turn on the light" command
    robotActor ! BoilWater(2)   // send the robot a "boil water" command
    robotActor ! "who are you"  // send the robot an arbitrary command
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.Duration
    val t = Duration.create(1, TimeUnit.SECONDS)
    // ask style: asynchronous; the actor must reply within the timeout (1 second here),
    // otherwise the future fails with an AskTimeoutException
    val res = Patterns.ask(robotActor, "Hello,world", t)
    import scala.concurrent.ExecutionContext.Implicits.global
    res onComplete { r =>
      r match {
        case Success(result) => println(result)
        case Failure(e) => println("error: " + e.getMessage)
      }
      actorSystem.terminate() // shut down only after the ask has completed
    }
  }
}
References
https://cloud.tencent.com/developer/article/1460210
https://likehui.top/2019/09/05/akka-%E6%A0%B8%E5%BF%83%E7%9F%A5%E8%AF%86%E6%A2%B3%E7%90%86/
https://scala.cool/tags/Akka/










