EventBus Post & UnRegister
Post
EventBus是通过Post()和postSticky()来发送事件的,先来看post()方法
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
public void post(Object event) {
//得到当前线程的PostingThreadState,其中currentPostingThreadState是一个ThreadLocal对象
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);//塞入eventQueue
//如果postingState没有处于分发事件Event的流程中,就会启动事件Event的分发
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();//是否是主线程
postingState.isPosting = true;//状态改为正在分发
if (postingState.canceled) {//是否取消分发
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {//一直分发事件,知道eventQueue为空
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//推出while循环,说明没有事件Event还需要分发,所以状态复位
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
postSticky会把当前event存入stickyEvents里面,stickyEvents以event的Class对象为key,stickyEvents只保存当前Class对象最新的event值,然后接着就执行Post();下面只分析post方法了。
- 得到当前线程的PostingThreadState对象,currentPostingThreadState是一个ThreadLocal对象。
- 把event塞进postingThreadState对象
- 如果当前线程的postingThreadState没有处于分发事件的状态,那么就启动分发事件的流程。直到eventQueue为空
这里有个新的类PostingThreadState,比较简单,就不分析了。继续看postSingleEvent()
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();//得到event的Class对象
boolean subscriptionFound = false;//是否找到订阅者Subscriber来响应event
if (eventInheritance) {//如果响应继承关系的话
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//会找到当前event的所有父类和接口的Class类型。简单理解一下,响应继承关系的话,如果发送当前event的类型,那么也相当于发送了当前event的所有父类的Class的对象,订阅了Event父类的订阅者就也会得到响应
int countTypes = eventTypes.size();
//分发当前event的所有父类型和当前event的类型
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//如果不响应继承关系的话,那就只发送当前event的Class
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有订阅者订阅event,可以打log和发送没有订阅者事件NoSubscriberEvent
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
- 得到event的Class对象
- 如果需要响应继承关系的话,发送了当前类型的event,即发送当前event的所有父类型,所以就需要找到当前的event的所有父类和当前类的列表,遍历类型列表,分别发送
- 如果不响应继承关系的话,就直接发送当前event
- 如果没有订阅方法响应event,可以选择打log,或者发送NoSubscriberEvent
下面再继续往下追踪,继续看postSingleEventForEventType()
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根据subscriptionsByEventType得到订阅当前event类型的所有订阅者对象subscriptions
subscriptions = subscriptionsByEventType.get(eventClass);
}
//如果有响应当前event的订阅者列表,那么就把当前事件分别发送给每个订阅者
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送事件给订阅者
postToSubscription(subscription, event, postingState.isMainThread);
//是否调用了cancelEventDelivery方法,如果调用了,那么当前event就不再分发
aborted = postingState.canceled;
} finally {
//对postingState复位
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
//如果不需要继续分发当前event,那么就跳出循环
if (aborted) {
break;
}
}
return true;
}
return false;
}
public void cancelEventDelivery(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
if (!postingState.isPosting) {
throw new EventBusException(
"This method may only be called from inside event handling methods on the posting thread");
} else if (event == null) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}
postingState.canceled = true;
}
上面的方法其实也响应了cancelEventDelivery()方法,如果在事件分发的过程中调用了cancelEventDelivery(),那么就表示当前事件不再分发给后面的订阅者,跳出遍历订阅者的操作。
cancelEventDelivery()比较简单,主要是在响应事件的方法中调用,将取消事件的进一步传递。后面的订阅者将不会收到该事件。通常优先级较高的订阅者会掉用此方法取消事件分发给低优先级的响应者。取消事件的分发仅限在ThreadMode.POSTING的事件响应方法中,不然取消不掉的.
在postSingleEventForEventType()中梳理了取消分发的逻辑,再继续看分发的逻辑,再追踪postToSubscription()
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
//利用反射,调用订阅者subscription.subscriber的订阅方法subscription.subscriberMethod.method,并且传入event
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
- 如果是POSTING,就在当前线程直接调用
- 如果是MAIN,如果当前线程是主线程,就直接调用。如果当前线程不是主线程,就交给mainThreadPoster.enqueue()
- 如果是MAIN_ORDERED,如果mainThreadPoster不为空,直接交给mainThreadPoster.enqueue();否者直接在当前线程调用
- 如果是BACKGROUND,如果不是主线程,直接调用;否者交给backgroundPoster.enqueue()
- 如果是ASYNC,直接交给asyncPoster.enqueue()
invokeSubscriber()方法是利用反射,调用订阅者subscription.subscriber的订阅方法subscription.subscriberMethod.method并且传入event。
到这里事件的分大概的流程基本结束了,最终调用invokeSubscriber()。还有线程的调度操作没分析。下面再继续分析线程的调度
先介绍PendingPost
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();//PendingPost 的池子
Object event;//发送的事件event
Subscription subscription;//订阅者的封装subscription
PendingPost next;//用于PendingPostQueue,队列中的下一个
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
//如果池子pendingPostPool里面有PendingPost,就从池子里面取,没有的话就新建PendingPost
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
//把pendingPost放入池子pendingPostPool
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
//池子最大10000个pendingPost
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPost类比较简单,里面有两个静态方法,一个是取包括event和subscription的pendingPost,一个是释放pendingPost,放入池子pendingPostPool中。
PendingPostQueue 是获取PendingPost的队列,提供了阻塞方法,每个方法都加了synchronized锁,保证线程安全,比较简单不再分析。
Poster是个接口,主要有AsyncPoster,BackgroundPoster和HandlerPoster三个实现类
先看下HandlerPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;//PendingPost 队列
private final int maxMillisInsideHandleMessage;//默认值10
private final EventBus eventBus;
private boolean handlerActive;//是否正在处理事件
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//如果没有正在处理event,就发送一个message,并且handlerActive设为true
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//循环取队列里面的PendingPost
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;//队列为空的话,就跳出循环,置为false
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果event处理事件大于等于10ms的话,暂时退出while循环,重新发送message
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
//如果是大于10ms退出的话,handlerActive会置为true,上面enqueue就不用再发对于的message
handlerActive = rescheduled;
}
}
}
- handlerActive表示是否正在处理eventQueue里面的事件,调用enqueue()方法时,先把对应的pendinPost存入PendingPostQueue。如果handlerActive为false,就发送消息,调用主线程来处理eventQueue。如果handlerActive为true,表示当前正在处理eventQueue,就不要再管
在handleMessage()方法里
- 先记录进入当前的方法时间,rescheduled初始化为false
- 进入whlie循环,eventQueue里面取出pendingPost,如果pendingPost为null,那么handlerActive设为false,当前所有事件已经处理完成
- 如果处理事件分发的事件已经大于10ms,此时由于主线程一直在处理事件分发,那么主线程的其他任务可能得不到执行,所以需要暂停处理事件分发,重新发送message,退出while循环,rescheduled设为true
- 在finally代码块内,如果是因为事件分发处理的时间大于10ms的话,因为在while循环中已经发送了开启下一轮事件分发处理的消息message,所以handlerActive还是为true,防止在enqueue()方法中再次发送message
BackgroundPoster和AsyncPoster就不再分析了,类似。BackgroundPoster只会开启一个线程,事件分发都在这个线程内;AsyncPoster是每个event都会开启一个线程
UnRegister
解除注册方法比较简单
public synchronized void unregister(Object subscriber) {
//取出当前订阅者的所有event列表,遍历event列表,分别移除当前订阅者
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//从响应当前eventType的订阅者列表里,移除当前订阅者
unsubscribeByEventType(subscriber, eventType);
}
//从typesBySubscriber移除当前subscriber
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);//获取订阅当前eventType的所有订阅者
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//移除当前订阅者subscriber
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
unregister()
- 首先从typesBySubscriber中获取当前订阅者响应的所有事件类型列表subscribedTypes
- 遍历subscribedTypes,分别对每一个eventType移除当前订阅者
- 从typesBySubscriber移除当前subscriber
unsubscribeByEventType()
- 获取当前eventType的所有订阅者subscriptions
- 遍历订阅者列表,移除当前订阅者
好了,EventBus分析完成









网友评论