title: EventBus源码解析-图床版
date: 2020-03-13 20:48:00
tags: [android 工具源码]
typora-root-url: ./EventBus源码解析
typora-copy-images-to: upload
概述
eventBus 的目的就是简化时间的传递.并且是以对象作为数据传递. 还加入了监听者可以选择在哪个线程监听.
其实也是监听者模式.那么就有.发送信息方, 接受信息方,和信息. 同时由于接收方可以选择线程,并且一个接收者可以接收多中消息.发送者也可以发送种消息.就有常驻的后台线程,并且发送的线程都有消息队列.支持不断的添加消息.
简单看下使用过程
public class EventBusAActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_event_bus_a);
EventBus.getDefault().register(this); //注册为监听者.
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this); //销毁时取消注册.取消监听.
}
public void onEvent(B b) { //监听B对象的到来,方法名都是onEvent开头.通过参数来确定要监听什么对象
Toast.makeText(this, b.toString(), Toast.LENGTH_SHORT).show();
}
public void postEvent(View view) {//发送消息.
new Publish().postEvent();
}
class B { 被发送的对象
String name;
public B(String name) {
this.name = name;
}
}
public class Publish {
void postEvent() {
发送消息.所有监听 B的监听者.都会收到消息.
EventBus.getDefault().post(new B("eventb"));
}
}
}
事件订阅者
.也就是监听消息发生的.我们要通过EventBus.getDefault().register(this)注册事件订阅者.同时.重写onEvent、onEventMainThread、onEventBackgroundThread和onEventAsync.方法表示要在不同线程进行监听.
eventBus 的事件订阅和 事件分发是分开的. 也就是 订阅者和发布者不知道对方是谁,因此就通过订阅的消息类型.也就是onEvent方法的参数. 来确定 public void onEvent(B b) { } 这样写后, 所有发送B类对象的消息. 这个订阅者都会收到. 因此.eventbus 中有一个 事件类型和订阅者的绑定关系. 一个事件可能有多个订阅者.因此关系如下
Map<Event, ArrayList<subscriber>> subscriberByEventType;
这里的event就是消息类型.这里进行 而subscriber则是订阅者. 同时.一个消息的订阅者的处理方法可能有onEvent、onEventMainThread、onEventBackgroundThread和onEventAsync 这几种.因此需要进行一下抽象.
把订阅者对象.和订阅者的方法.封装成一个类.
Subscription {
final Object subscriber; 订阅者对象
final SubscriberMethod subscriberMethod; 订阅者监听的方法
final int priority; 优先级
}
而订阅消息 则通过他的class类型来标识,因此在代码中是
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
因此通过消息类型找到消息的订阅者.
同时.一个时间订阅者可能订阅多中消息类型事件,
消息重载
假设A extends B. 此时有两个事件订阅者类, SubScriberA订阅了A. SubscriberB订阅了B. 那么当我们post(A)时,SubscribeA和SubscribeB都应该收到消息. 而我们post(b) 时.只有SubscribeB能收到消息.也就是子类型的消息应该可以通知监听他父类型的订阅者.
因此我们要保存一个消息类型和他对应的所有父类型的匹配关系
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>();
key是一个消息的类型. value 是他的所有父类行和所有接口类型组成的集合,当添加一个消息类型时,会迭代他所有的父类和接口来找出全部父类型.
private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { evevntClass是消息类型
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<Class<?>>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces()); //遍历接口类型
clazz = clazz.getSuperclass(); //遍历父类型
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
/** Recurses through super interfaces. */
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
线程切换
因为监听者可以选择在什么线程得到消息.onEvent 表示在事件发送的线程监听、onEventMainThread表示在主线程监听事件、onEventBackgroundThread 表示在后台线程. onEventAsync表示在单独的线程.这里可以看到.是监听者指定线程.发送者需要参考监听者要求.那么肯定是能有个变量保存监听者的请求线程.
final class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
}
这是对订阅者方法的抽象 method代表方法. threadMode代表监听者要求在哪个线程接收事件. eventType表示消息的类型,也就是订阅者方法形参的类型
public void onEventMainThred(B b) {}
对于这个方法 SubscriberMethod如下
method = onEventMainThread
threadMode =mainThread
eventType = B.class
main线程消息
HandlerPoster 是main线程 handler.
HandlerPoster mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
他里边还有个链表结构的队列.PendingPostQueue.把消息交给他后.他先把消息入队,然后在handlerMessage中遍历PendingPostQueue队列.执行里边的消息. 就达到了把消息发送到main线程执行.
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//消息入队
queue.enqueue(pendingPost);
//到主线程执行
sendMessage(obtainMessage()
}
}
@Override
public void handleMessage(Message msg) {
while (true) {
//循环拿出消息.通过eventBus执行. 那么就执行在住线程了.
PendingPost pendingPost = queue.poll();
eventBus.invokeSubscriber(pendingPost);
}
}
后台线程消息
BackgroundPoster 继承自Runnable. 利用也有一个消息队列PendingPostQueue,消息入栈时,他是通过把消息加入到消息队列.再把自己加入ExecutorService线程池,然后在run里执行消息的处理.就在后台执行了.
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//消息入队. 把自己加入到线程池中执行
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
}
@Override 线程池中执行eventBus 的处理消息函数.
public void run() {
while (true) {
PendingPost pendingPost = queue.poll(1000);
eventBus.invokeSubscriber(pendingPost);
}
}
}
异步消息
AsyncPoster 也是集成自runnable. 里边也有链表结构的PendingPostQueue消息队列, 他也是通过ExecutorService线程池来执行.不过他是每次加入一个消息就把自己加入到线程池中执行,也就是每次只执行一个消息.
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost); //消息入队. 自己加入线程池
eventBus.getExecutorService().execute(this);
}
@Override
public void run() { 取出一个消息,执行
PendingPost pendingPost = queue.poll();
eventBus.invokeSubscriber(pendingPost);
}
这里我们看到.这三个线程的选择.只设置把消息放到哪里执行.但是最后还都是eventBus.invokeSubscriber(pendingPost);来执行的消息.
消息处理对象池
PendingPost 是 一个对象池. 他封装了要处理的消息类型event和每次的一个订阅者subscription, 因为一次消息可能要把一个event传递给多个subscription,在上边的几个线程中, 加入队列的就是PendingPost ,他链接event和一个subscription. 用完后在把pendingPost回收.
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
//获取的方法
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
//拿到一个pendingpost.用事件和订阅者组装他
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
//用完后清空事件和订阅者,然后再把pendingpost给回收,最大支持1万个.pendingpost
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
消息最终处理
上边线程哪里看到 通过eventBus.invokeSubscriber(pendingPost);来执行的消息. 来执行事件.其实就是从pendingpost 取出消息event对象. 和一个订阅者.Subscription包装类,上边我们记录了Subscription包装了订阅者对象和订阅的方法.因此通过反射来执行订阅者的方法.
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//回收pendingpost
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
//订阅者方法通过反射.来执行. method.invoke(class,parameter)
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
发现事件订阅者
每个类通过EventBus.getDefault().register(this);就成了事件订阅者. 然后就可以自行指定要监控的消息类和订阅方法.那么势必要找到订阅者订阅了哪些消息类型,和对应的订阅方法.
这是通过反射实现的.SubscriberMethodFinder 类.遍历所有注册的订阅者的method. 找到onEvent开头的.取出参数,就是消息类型,通过方法名取出 订阅者要求的线程模式.封装成 SubscriberMethod
class SubscriberMethod {
final Method method; 反射的方法
final ThreadMode threadMode; 改方法要求的线程模式
final Class<?> eventType; 消息类型.也就是方法形参的class类
}
看看如何反射订阅者封装成 SubScriberMethod, 这里看到.使用缓存.所以不需要多次遍历一个订阅者.同时还迭代的拿到该订阅者所有父类的订阅事件,
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
String key = subscriberClass.getName();
List<SubscriberMethod> subscriberMethods;//该订阅者的所有定义方法的集合
synchronized (methodCache) {
subscriberMethods = methodCache.get(key); //先查找缓存,通过订阅者类名
}
if (subscriberMethods != null) {
return subscriberMethods;
}
//该订阅者的所有定阅方法
subscriberMethods = new ArrayList<SubscriberMethod>();
Class<?> clazz = subscriberClass;
HashSet<String> eventTypesFound = new HashSet<String>();
StringBuilder methodKeyBuilder = new StringBuilder();
while (clazz != null) {
Method[] methods = clazz.getDeclaredMethods(); //拿到订阅者类的所有方法.遍历
for (Method method : methods) {
String methodName = method.getName();
//注册的订阅方法只有一个参数,就是要订阅的消息
if (parameterTypes.length == 1) { //截取方法名 onEvent之后的字符串,
// onEventMainThrad 就 截取出 MainThread.因此确认是要求正在主线程监听,
//这里就是订阅者通过方法名来觉得在哪个线程监听.
String modifierString = methodName.substring("onEvent".length());
ThreadMode threadMode; //这就是觉得在哪个线程的模式.
if (modifierString.length() == 0) {
threadMode = ThreadMode.PostThread;
} else if (modifierString.equals("MainThread")) {
threadMode = ThreadMode.MainThread;
} else if (modifierString.equals("BackgroundThread")) {
threadMode = ThreadMode.BackgroundThread;
} else if (modifierString.equals("Async")) {
threadMode = ThreadMode.Async;
}
//拼装方法名, 通过方法名确定方法唯一性.
Class<?> eventType = parameterTypes[0];
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(methodName);
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
if (eventTypesFound.add(methodKey)) {
//确保方法子类重写父类方法后,只有子类订阅会被触发
subscriberMethods.add(new SubscriberMethod(method, threadMode, eventType));
}
}
}
//同时遍历了子类和父类,
clazz = clazz.getSuperclass();
}
//加到缓存中,下次就不在查找这个订阅者了.
synchronized (methodCache) {
methodCache.put(key, subscriberMethods);
}
return subscriberMethods;
}
这样.一个订阅方法的抽象subscriberMethod就完成了. 这里的List<SubscriberMethod> subscriberMethods是该订阅者的所有订阅方法集合,我们还要把这个集合和该订阅者绑定起来.
这里注意.一个订阅者类可能有多个订阅方法. 而每个订阅方法又监听不同的 消息类型.并且 同一个消息类型,可能有多个订阅者, 而我们发送消息 是只有指定消息类型. 所以我们要建立起消息类型到订阅者的 一对多映射.
image-20200317111831057
绑定订阅者和订阅消息
看eventBus里最主要的两个方法, 第一个是通过反正找到订阅者的所有订阅方法,抽象为SubscriberMethod.
然后遍历.为没给个订阅者和抽象方法匹配到一个消息类型.
Subscription是一个订阅者对一个消息类型的抽象. 因此我们的一个订阅者,如果订阅了多个消息.就会有多个Subscription,而每个Subscription则绑定了这个对象自己和他的一个订阅方法.
image-20200317112854191
这里主要是通过订阅方发的消息类型,来创建Subcsription.并加入到这个消息类型对应的集合中.
private synchronized void register(Object subscriber, boolean sticky, int priority) {
//subscriber是订阅者. 这里已经通过反射, 拿到该订阅者所有的订阅方法了.组成一个集合
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
//遍历订阅方法,因为订阅方法里有消息类型,可以加入对应消息类型的检测队列中.
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod, sticky, priority);
}
}
// 注意这是同步方法.subscriptionsByEventType代表消息类型和该消息类型的所有订阅者的一对多映射.这个是全局唯一的.要同步执行.
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {
//从订阅方法中得到 要检测的消息类型.
Class<?> eventType = subscriberMethod.eventType;
//改消息类型的所有订阅者对象集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
//根据订阅者类和订阅者的一个方法.新建的一个订阅者对象.
Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<Subscription>();
subscriptionsByEventType.put(eventType, subscriptions);
}
//根据优先级.插入到改消息的订阅者对象的集合中,
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 找到改订阅者的所有订阅的消息类型. 并添加本次的消息类型.
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<Class<?>>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (sticky) { //如果是sticky消息. 就立刻发送消息让订阅者执行
Object stickyEvent;
synchronized (stickyEvents) {
stickyEvent = stickyEvents.get(eventType);
}
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}
}
其实主要是量点,通过消息类型.创建Subscription,建立消息和订阅者的联系. 然后把消息类型加入该订阅者对应的消息类型集合中.typesBySubscriber
取消订阅者绑定
在看看取消绑定的逻辑. 就是找到这个订阅者定义的所有消息类型. 然后依次删除消息类型对应的这个订阅者的Subscription
public synchronized void unregister(Object subscriber) {
//这个订阅者的所有消息类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//遍历这个消息类型.和这个订阅者的绑定关系
unubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
}
}
private void unubscribeByEventType(Object subscriber, Class<?> eventType) {
//根据消息类型.遍历订阅者Subscription, 如果和传入的要删除的订阅者一致,就删除掉
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
发送消息
常规的发送消息就是EventBus.getDefault().post(new C());
然后就通过消息类型 C的class.找到注册的订阅者. 然后在根据订阅者要求的线程,把消息加入到不同线程的队列中,然后线程在执行订阅者的注册方法.
消息加入队列
因为发送消息可以在任何地方.event为每个线程创建PostingThreadState.里边有个eventQuene来保存这个线程要发送的消息.
private final ThreadLocal<EventBus.PostingThreadState> currentPostingThreadState = new ThreadLocal<EventBus.PostingThreadState>() {
@Override
protected EventBus.PostingThreadState initialValue() {
return new EventBus.PostingThreadState();
}
};
currentPostingThreadState 通过 ThreadLocal 保证每个线程只能拿到自己线程的 PostingThreadState,而PostingThreadState里有个eventQueue承载要发送的消息.
消息加入PostingThreadState的队列.遍历发送
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
//消息加入队列
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
try {
/遍历发送消息
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
}
}
}
拿到所有消息类型
拿到该消息的所有类型.包括他的类型和他所有父类的类型,这是为了订阅父类型消息的订阅者,可以收到子类型的消息.也就是 子消息可以通知他父类型消息的订阅者
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
if (eventInheritance) {
找到该消息类型及所有父类 的class
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//用所有的消息类型.进行消息执行.
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
}
消息及订阅者封装
这里就是通过消息类型,找到所有的消息订阅者,订阅者指定了线程模式,然后遍历,把消息和消息订阅者发送到对应的线程队列来执行.
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 通过消息类型找到所有的订阅者,封装在一个PostingState中
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//封装成消息后.发送到对应线程执行.
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
}
return true;
}
return false;
}
分发消息到不同线程
isMainThread是通过Looper.getMainLooper() == Looper.myLooper()当前looper是否是主线程looper来判断是否是主线程,然后把消息和订阅者发送到不同的线程队列中.这和之前的线程切换就连起来了.
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case PostThread:
invokeSubscriber(subscription, event);
break;
case MainThread:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BackgroundThread:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case Async:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
eventBus 流程概述
- 初始化eventBus,包括一个后台线程池, 向主线程发消息的handler.异步执行消息的runnable.
- 注册EventBus.getDefault().register(this);
注册后.通过subscriberMethodFinder.findSubscriberMethods找到该类和他的父类所有的onEvent开头的监听方法.绑定方法和消息参数类型,形成SubscriberMethod集合,通过参数名指定接收消息的线程
遍历SubscriberMethod集合,通过消息类型找到 监听改消息的所有 订阅者抽象.Subscription,并把本次的订阅方法和订阅者形成新的Subscription.加入到对应的消息类型集合中, 此时,消息类型和订阅者对象,订阅者方法的绑定关系就完成
找到改订阅者的所有订阅的消息类型.保存在typesBySubscriber .此时订阅者和他订阅 的消息建立绑定关系
- 发送消息.
先把消息加入到线程唯一的PostingThreadState的队列eventQueue中.遍历发送
通过lookupAllEventTypes(eventClass); 找到消息类型的所有类及所有父类,接口类型.遍历这些类型发送消息.
通过 subscriptionsByEventType 找到这些消息类型对应的 订阅者Subscription,遍历进行消息发送,
通过订阅者指定的线程表示.把订阅者和消息加入到指定的线程的消息队列PendingPostQueue中.
如果是主线程.就用handler往主线程发消息.在handlemessage中处理加入的消息.
如果是后台线程,就把线程加入到runnable中.在run中取出消息队列的消息
- 执行消息
最后通过subscription.subscriberMethod.method.invoke(subscription.subscriber, event); 用反射的方式,执行订阅者的订阅方法. 参数是订阅者对象和消息对象.












网友评论