Converting Asynchronous Work into an RxJava Observable

Author: 蒸汽飞船 | Published: 2018-03-18 16:18 | Read 105 times

How to turn a method that performs an asynchronous, callback-based operation into an RxJava Observable.

1. Define the Async2Rx interface

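/** Adapter contract: wraps one asynchronous operation that yields a single result of type T. */
public interface Async2Rx<T> {
    /** Starts the asynchronous work and reports the outcome through the callback. */
    void start(AsyncCallBack<T> callback);

    /** Cancels the asynchronous work, for example when the subscriber disposes. */
    void cancel();

    /** Receives the outcome: either a result or an error. */
    interface AsyncCallBack<T> {
        void onSucc(T t);
        void onError(Throwable throwable);
    }
}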

2. Have the class that performs the asynchronous work implement Async2Rx:

public static class Test implements Async2Rx<String>{ 

    @Override
    public void start(final AsyncCallBack<String> callback) {
        // The asynchronous operation, for example making a purchase
        buySomeThing(new OnPayListener(){
            @Override
            public void onPaySuccess(String success) {
                callback.onSucc(success);
            }

            @Override
            public void onPayFailed(Throwable throwable) {
                callback.onError(throwable);
            }
        });
    }

    @Override
    public void cancel() {
        // Cancel the asynchronous task when a cancellation arrives, for example after the subscriber calls Disposable.dispose()
        cancelBuy();
    }
}
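
The `buySomeThing`, `cancelBuy`, and `OnPayListener` names above belong to the author's project and are not shown in the article. As a minimal, self-contained sketch of the same start/cancel contract, the block below uses only the JDK. `DelayedEcho`, `EchoDemo`, the delay values, and the executor are all hypothetical names and choices made for illustration, and the `Async2Rx` interface is repeated only so the block compiles on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Copy of the interface from step 1, repeated only so this block compiles on its own.
interface Async2Rx<T> {
    void start(AsyncCallBack<T> callback);
    void cancel();

    interface AsyncCallBack<T> {
        void onSucc(T t);
        void onError(Throwable throwable);
    }
}

// Hypothetical implementation: reports `message` after `delayMillis` unless cancelled first.
final class DelayedEcho implements Async2Rx<String> {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final String message;
    private final long delayMillis;

    DelayedEcho(String message, long delayMillis) {
        this.message = message;
        this.delayMillis = delayMillis;
    }

    @Override
    public void start(final AsyncCallBack<String> callback) {
        if (executor.isShutdown()) {
            return; // cancel() was called before start(); do no work
        }
        executor.schedule(() -> {
            try {
                callback.onSucc(message);
            } finally {
                executor.shutdown(); // one-shot: release the worker thread
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void cancel() {
        // Discards the scheduled task if it has not run yet.
        executor.shutdownNow();
    }
}

// Hypothetical driver: starts a DelayedEcho, optionally cancels it at once,
// then returns whatever callbacks arrived within a bounded wait.
final class EchoDemo {
    static List<String> run(long delayMillis, boolean cancelAtOnce) {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        DelayedEcho echo = new DelayedEcho("paid", delayMillis);
        echo.start(new Async2Rx.AsyncCallBack<String>() {
            @Override
            public void onSucc(String s) {
                events.add("onSucc:" + s);
                finished.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError");
                finished.countDown();
            }
        });
        if (cancelAtOnce) {
            echo.cancel();
        }
        try {
            // A cancelled task never counts the latch down, so this wait is bounded.
            finished.await(delayMillis + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }
}
```

In this sketch, `cancel()` only has to guarantee that no callback fires afterwards. Any real implementation would map it to whatever cancellation mechanism the underlying asynchronous API offers.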

3. Create the Observable:

Test mTest = xxx;
Observable<String> observable = AsyncObservable.create(mTest);

The create method also accepts a timeout in seconds; if none is given, it defaults to 40 seconds.

Core class:

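// Imports assume RxJava 2.x package names
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class AsyncObservable<T> extends Observable<T> {
    private static final int DEFAULT_TIME_OUT = 40;  // default timeout, in seconds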
    private final Async2Rx<T> originalCall;

    AsyncObservable(Async2Rx<T> originalCall) {
        this.originalCall = originalCall;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
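        CallCallback<T> callback = new CallCallback<>(originalCall, observer);
        observer.onSubscribe(callback);
        // Skip the work entirely if the observer disposed inside onSubscribe
        if (!callback.isDisposed()) {
            originalCall.start(callback);
        }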
    }

    private static final class CallCallback<T> implements Disposable, Async2Rx.AsyncCallBack<T> {
        private final Observer<? super T> observer;
        private Async2Rx<T> originalCall;
        boolean terminated = false;
        // volatile: dispose() may be called from a different thread than the callbacks
        private volatile boolean mIsDisposed = false;

        CallCallback(Async2Rx<T> originalCall, Observer<? super T> observer) {
            this.originalCall = originalCall;
            this.observer = observer;
        }

        @Override
        public void dispose() {
            mIsDisposed = true;
            this.originalCall.cancel();
        }

        @Override
        public boolean isDisposed() {
            return mIsDisposed;
        }

        @Override
        public void onSucc(T t) {
            if (!isDisposed()) {
                try {
                    this.observer.onNext(t);
                    if (!isDisposed()) {
                        this.terminated = true;
                        this.observer.onComplete();
                    }
                } catch (Throwable var6) {
                    Exceptions.throwIfFatal(var6);
                    if (this.terminated) {
                        // onComplete() threw after onNext() had already finished
                        RxJavaPlugins.onError(var6);
                    } else if (!isDisposed()) {
                        try {
                            this.observer.onError(var6);
                        } catch (Throwable var5) {
                            Exceptions.throwIfFatal(var5);
                            RxJavaPlugins.onError(new CompositeException(var6, var5));
                        }
                    }
                }

            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!isDisposed()) {
                try {
                    this.observer.onError(throwable);
                } catch (Throwable var4) {
                    Exceptions.throwIfFatal(var4);
                    RxJavaPlugins.onError(new CompositeException(throwable, var4));
                }

            }
        }
    }

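    /** Creates the Observable with the default timeout (40 seconds). */
    public static <T> Observable<T> create(Async2Rx<T> originalCall) {
        return create(originalCall, DEFAULT_TIME_OUT);
    }

    /** Creates the Observable with a timeout given in seconds. */
    public static <T> Observable<T> create(Async2Rx<T> originalCall, int timeout) {
        return new AsyncObservable<T>(originalCall)
                .timeout(timeout, TimeUnit.SECONDS)
                .observeOn(Schedulers.newThread());
    }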
}
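
The dispose bookkeeping in `CallCallback` can also be looked at in isolation. The sketch below is not the author's class and does not use RxJava. `GuardedCallback` is a hypothetical stand-in in which two `Consumer` sinks play the role of the observer and a `Runnable` plays the role of `Async2Rx.cancel()`. It also swaps the boolean flags for a single `AtomicBoolean`, an assumption made here so that at most one event gets through and nothing is delivered after `dispose()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for CallCallback: forwards one result or one error,
// and forwards nothing once dispose() has been called.
final class GuardedCallback<T> {
    private final Consumer<? super T> onValue;           // role of onNext + onComplete
    private final Consumer<? super Throwable> onFailure; // role of onError
    private final Runnable cancelAction;                 // role of Async2Rx.cancel()
    private final AtomicBoolean finished = new AtomicBoolean(false);

    GuardedCallback(Consumer<? super T> onValue,
                    Consumer<? super Throwable> onFailure,
                    Runnable cancelAction) {
        this.onValue = onValue;
        this.onFailure = onFailure;
        this.cancelAction = cancelAction;
    }

    // Stops delivery and cancels the underlying work, but only if nothing was delivered yet.
    void dispose() {
        if (finished.compareAndSet(false, true)) {
            cancelAction.run();
        }
    }

    // True after dispose() or after an event has been delivered.
    boolean isFinished() {
        return finished.get();
    }

    void onSucc(T value) {
        if (finished.compareAndSet(false, true)) {
            onValue.accept(value);
        }
    }

    void onError(Throwable error) {
        if (finished.compareAndSet(false, true)) {
            onFailure.accept(error);
        }
    }
}
```

One difference from the class above is deliberate: here a `dispose()` that arrives after the result was delivered does not run the cancel action, whereas `CallCallback.dispose()` always calls `cancel()`. Which behavior is preferable depends on whether the wrapped operation tolerates being cancelled after it has finished.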
