1,什么是RxBus
rxbus不是什么框架,它只是一个通过rxjava实现eventbus的类
在android中使用时,它还还可以引用AndroidLifecycle来解决内存溢出问题
它是观察者模式的一种应用,方便了我们在不同页面与不同线程间的通信
2,代码
RxBus的代码实现
public class RxBus {
private volatile static RxBus mDefaultInstance;
//事件总线
private final Subject<Object> mBus;
//粘性事件存储
private final Map<Class<?>, Object> mStickyEventMap;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
public static RxBus getInstance() {
if (mDefaultInstance == null) {
synchronized (RxBus.class) {
if (mDefaultInstance == null) {
mDefaultInstance = new RxBus();
}
}
}
return mDefaultInstance;
}
/**
* 发送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
/**
* 使用Rxlifecycle解决RxJava引起的内存泄漏
*/
public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
}
/**
* 判断是否有订阅者
*/
public boolean hasObservers() {
return mBus.hasObservers();
}
public void reset() {
mDefaultInstance = null;
}
/**
* 发送一个新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* 使用Rxlifecycle解决RxJava引起的内存泄漏
*/
public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
synchronized (mStickyEventMap) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
} else {
return observable;
}
}
}
/**
* 根据eventType获取Sticky事件
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.get(eventType));
}
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
}
事件实体类
public class MsgEvent {
private String msg;
public MsgEvent(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
接收事件(观察者)
RxBus.getInstance()
.toObservable(this, MsgEvent.class)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MsgEvent>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(MsgEvent msgEvent) {
text.setText("one " + msgEvent.getMsg());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
事件发送
RxBus.getInstance().post(new MsgEvent("Java"));
RxBus实现原理
初始化分析
首先,RxBus是一个单利模式,这没什么可以说的,毕竟后面使用到RxBus需要是公共唯一的类。
事件总线,现在来说一下Subject<Object> mBus,这是一个事件总线,什么是事件总线呢?
在简单观察模式中,观察者订阅被观察者,单被观察者状态或者数据发生变化时通知观察者,这是一对一的关系。
但当观察者和被观察者是多个或者不确定数量的时候,这就需要一个总线来存储这些观察者和被观察者,方便在发送通知的时候找到对应的观察者。
public class RxBus {
...
//事件总线
private final Subject<Object> mBus;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
...
}
这里是通过Rxjava中的PublishSubject.create().toSerialized() 来创建总线用来存储观察者。简单的就把它当做集合吧。
观察者的创建分析
public class RxBus{
...
/**
* 使用Rxlifecycle解决RxJava引起的内存泄漏
*/
public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
}
/**
* 发送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
...
}
回顾观察者创建代码
RxBus.getInstance()
.toObservable(this, MsgEvent.class)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MsgEvent>() { ... });
在观察者创建时,先调用RxBus中的toObservable获取一个回调事件类型为MsgEvent的被观察者Observable,其中传入了LifecycleOwner,
这个是为了防止当页面关闭以后,订阅事件还没有结束。
然后再执行.subscribe(),并传入一个新建的观察者Observer,在subscribe的作用就是让观察者订阅被观察者
事件发送
执行Subject的next进行发送
3,RxBus粘性事件
什么是粘性事件
一般情况都是先创建观察者并加入到总线中,然后在执行事件发送,观察者就可以收到相应的事件
但是有时候也出现先发送事件,然后再创建观察者,这个时候就收不到之前的事件了,使用粘性事件就可以做到后创建的观察者也可以收到之前的事件。
class RxBus{
...
/**
* 发送一个新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* 使用Rxlifecycle解决RxJava引起的内存泄漏
*/
public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
synchronized (mStickyEventMap) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
} else {
return observable;
}
}
}
...
}
粘性事件实际就是创建一个Map<Class<?>, Object> mStickyEventMap,用于存储所有发送过的粘性事件,当创建粘性观察者时,会从这map中知道对应的EventType类型的被观察者Observable,并返回
observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event)))),这是返回Observable时代码发送了从map中对应的事件,这样新创建的观察者也可以能马上收到之前的事件










网友评论