美文网首页
模拟RxJava的实现原理

模拟RxJava的实现原理

作者: vpractical | 来源:发表于2019-10-14 16:21 被阅读0次

[TOC]

GitHub代码地址

使用

Observable
               .create(new ObservableOnSubscribe<List<User>>() {
                   @Override
                   public void subscribe(Emitter<List<User>> emitter) {
                       emitter.next(list);
                       emitter.complete();
                       log("create : " + Thread.currentThread().getName());
                   }
               })
               .map(new Function<List<User>, List<User>>() {
                   @Override
                   public List<User> apply(List<User> users) {
                       for (User u : users) {
                           u.age = 3;
                       }
                       return users;
                   }
               })
               .subscribeOn(ThreadScheduler.IO)
               .observerOn(ThreadScheduler.MAIN)
               .subscribe(new Observer<List<User>>() {
                   @Override
                   public void onSubscribe(Disposable disposable) {
                       log("onSubscribe()");
                       log("onSubscribe() : " + Thread.currentThread().getName());
                   }

                   @Override
                   public void onNext(List<User> val) {
                       log("onNext(): " + val.toString());
                       log("onNext() : " + Thread.currentThread().getName());
                   }

                   @Override
                   public void onError(Throwable t) {
                       log("onError(): " + t.toString());
                   }

                   @Override
                   public void onComplete() {
                       log("onComplete()");
                   }
               });

实现

1.被观察者抽象类

  • rxjava使用过程,每个操作符都是一层,每一层都是独立的,观察上一层,同时被下一层观察。最上层不用向上观察,他持有一个内容分发者,这是一个接口,用户实现数据分发操作。最下层不被观察,他只需要观察上一层。
  • 各个操作符的方法,返回本层被观察者的实现类,如create返回ObservableCreate
  • 各个被观察者继承这个抽象类,实现subscribeImpl,参数是下一层的观察者,由下一层调用,以此建立订阅关系
  • 除最上层,每层持有上一层的被观察者对象,调用subscribeImpl实现订阅,上一层在被订阅时同时订阅更上一层。从最下层的观察者,依次向上层订阅
public abstract class Observable<T> {

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source){
        return new ObservableCreate<>(source);
    }
    public void subscribe(Observer<? super T> observer){
        subscribeImpl(observer);
    }
    public <R> Observable<R> map(Function<? super T,? extends R> function){
        return new ObservableMap<>(this,function);
    }
    public Observable<T> subscribeOn(ThreadScheduler scheduler){
        return new ObservableSubscribeOn<>(this,scheduler);
    }
    public Observable<T> observerOn(ThreadScheduler scheduler){
        return new ObservableObserverOn<>(this,scheduler);
    }
    protected abstract void subscribeImpl(Observer<? super T> observer);
}

2.create操作符:返回这一层的被观察者ObservableCreate
rxjava 有3个主要构成

  • 被观察者
  • 观察者
  • 内容分发者:在第一层的被观察者中分发数据
    需要注意的是,rxjava不止是一层一层叠起来的结构,可能某一层中包含了多个层,多个内容分发源头等
public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter) throws Exception;
}

public interface Emitter<T> {
    void next(T val);
    void error(Throwable t);
    void complete();
}

参数是内容分发者接口对象,用户实现该接口后,源码调用订阅方法subscribe(),并创建发射器emitter,用户使用emitter分发数据。

/**
 * create操作符对应的被观察者
 */
class ObservableCreate<T> extends Observable<T> {
    private ObservableOnSubscribe<T> source;

    ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeImpl(Observer<? super T> observer) {
        EmitterCreate<T> emitter = new EmitterCreate<>(observer);
        observer.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        }catch (Exception e){
            e.printStackTrace();
            observer.onError(e);
        }
    }
    static final class EmitterCreate<T> implements Emitter<T>, Disposable {
        private Observer<? super T> observer;
        private boolean isDisposable;
        EmitterCreate(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void disposable() {
            isDisposable = true;
        }
        @Override
        public boolean isDisposable() {
            return isDisposable;
        }
        @Override
        public void next(T val) {
            if(isDisposable) return;
            observer.onNext(val);
        }
        @Override
        public void error(Throwable t) {
            if(isDisposable) return;
            observer.onError(t);
        }
        @Override
        public void complete() {
            if(isDisposable) return;
            observer.onComplete();
        }
    }
}

3.map操作符

  • 这一层有观察上一层的观察者MapObserver,ObservableMap本身是这一层的被观察者,在被下一层的观察者订阅时,本层的MapObserver同时订阅了上一层的被观察者
class ObservableMap<T, R> extends Observable<R> {
    private Observable<T> observable;
    private Function<? super T, ? extends R> function;
    ObservableMap(Observable<T> observable, Function<? super T, ? extends R> function) {
        this.observable = observable;
        this.function = function;
    }
    @Override
    protected void subscribeImpl(Observer<? super R> observer) {
        observable.subscribe(new MapObserver<>(observer,function));
    }
    private static final class MapObserver<T,R> extends Basic2Observer<T,R> {
        Function<? super T,? extends R> function;
        MapObserver(Observer<? super R> observer,Function<? super T,? extends R> function){
            super(observer);
            this.function = function;
        }
        @Override
        public void onNext(T val) {
            R r = function.apply(val);
            observer.onNext(r);
        }
    }
}
  • map操作符实现的数据类型转换,其实是参数传入了一个接口Function,内部定义方法apply()参数是逆变泛型T,返回值是协变泛型R,用于转换数据类型,apply的实现由用户实现
public interface Function<T,R> {
    R apply(T t);
}

4.subscribeOn操作符:

  • 线程调度实现,判断用户传入的线程和当前线程的关系,然后看是直接订阅or放到子线程or主线程
  • 也是有自己层的被观察者和观察者对象
class ObservableSubscribeOn<T> extends Observable<T>{
    private Observable<T> observable;
    private ThreadScheduler scheduler;
    ObservableSubscribeOn(Observable<T> observable, ThreadScheduler scheduler){
        this.observable = observable;
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeImpl(final Observer<? super T> observer) {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                BasicObserver<T> subscribeOnObserver = new BasicObserver<>(observer);
                observable.subscribe(subscribeOnObserver);
            }
        };
        TaskScheduler.run(r,scheduler);
    }
}
public class TaskScheduler {
    private static final Handler HANDLER = new Handler(Looper.getMainLooper());
    private static final ExecutorService SERVICE = Executors.newCachedThreadPool();

    public static void run(Runnable r,ThreadScheduler scheduler){
        boolean isMainThread = Looper.myLooper() == Looper.getMainLooper();
        if(scheduler == ThreadScheduler.DEFAULT){
            r.run();
        }else if(scheduler == ThreadScheduler.MAIN){
            if(isMainThread){
                r.run();
            }else{
                HANDLER.post(r);
            }
        }else{
            SERVICE.submit(r);
        }
    }
}

5.后续的操作符都是同样的道理,在最下层只有一个观察者,他在调用上层的订阅方法时,上层会先回调他的onSubscribe(),参数是Disposable ,由源码实现,当上层回调时,如果他调用Disposable的disposable()方法,上层会中断事件传递

public interface Disposable {
    void disposable();
    boolean isDisposable();
}

相关文章

  • 模拟RxJava的实现原理

    [TOC] GitHub代码地址 使用 实现 1.被观察者抽象类 rxjava使用过程,每个操作符都是一层,每一层...

  • 3、rxjava基本原理研究

    模拟rxjava基本工作原理 输出:

  • RxJava基础

    主要记录RxJava的知识点,不涉及原理和细节 简介 利用RxJava实现函数响应式编程 RxJava = 被观察...

  • RxJava 之 Hello World

    原理介绍## 什么是RxJava## RxJava 就是异步 RxJava 的异步实现,是通过一种扩展的观察者模式...

  • 自己动手实现 RxJava zip

    为了学习 RxJava 的原理,参考其源码,自己动手实现一个简化的 RxJava,代码 LittleRx 本文接上...

  • Android面试复习

    想到什么就记什么吧 java基础篇 HashMap实现原理及源码分析 RXjava RXJava的好处:(1)简洁...

  • RxJava 是如何实现线程切换的(上)

    前言 通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非...

  • RxJava 是如何实现线程切换的(上)

    前言 通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非...

  • RxJava基本使用

    1、RxJava概述 RxJava 是一个基于事件流、实现异步操作的库Rxjava原理基于一种扩展的观察者模式,有...

  • Retrofit2+rxjava2源码解析(二):rxjava2

    上一篇讲了retrofit2的原理,这一篇咱们重点讲讲rxjava2的实现原理。不过呢,由于rxjava2博大精深...

网友评论

      本文标题:模拟RxJava的实现原理

      本文链接:https://www.haomeiwen.com/subject/mrirmctx.html