美文网首页
RxJava 中的设计模式(三)代理模式之切换线程实现

RxJava 中的设计模式(三)代理模式之切换线程实现

作者: 蓝笔头 | 来源:发表于2021-06-27 13:11 被阅读0次

代理模式介绍

在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。

  • 意图:为其他对象提供一种代理以控制对这个对象的访问。
  • 何时使用:想在访问一个类时做一些控制。
  • 如何解决:增加中间层。

注意事项:

实现

UML 类图
public interface Service {
    void handle();
}

@Slf4j
public class RealService implements Service{
    @Override
    public void handle() {
        log.info("I am the RealService");
    }
}

@Slf4j
public class ProxyService implements Service {
    private Service realService;

    public ProxyService(Service realService) {
        this.realService = realService;
    }

    @Override
    public void handle() {
        new Thread(() -> {
            this.realService.handle();
        }).start();
    }
}

测试:

public class Main {

    public static void main(String[] args) {
        Service realService = new RealService();
        realService.handle();

        ProxyService proxyService = new ProxyService(realService);
        proxyService.handle();
    }
}

输出结果:

10:53:10.375 [main] INFO org.company.pattern.proxy.RealService - I am the RealService
10:53:10.425 [Thread-0] INFO org.company.pattern.proxy.RealService - I am the RealService

RxJava 切换线程实现

通过代理模式实现 RxJava 中的线程切换代码如下文所示。

订阅时(SubscribeOn)

订阅时,即在调用 subscribe() 方法时,切换线程。

1)实现调度器 Schedulers

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Schedulers {
    private static Scheduler IO = new IoScheduler("IO");
    private static Scheduler NEW_THREAD = new NewThreadScheduler();

    public static Scheduler io() {
        return IO;
    }

    public static Scheduler newThread() {
        return NEW_THREAD;
    }

    interface Scheduler {
        void schedule(Runnable run);
    }

    static class NamedThreadFactory extends AtomicInteger implements ThreadFactory {
        private String prefix;

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
            String name = nameBuilder.toString();
            return new Thread(r, name);
        }
    }

    static class NewThreadScheduler implements Scheduler {
        private static final NamedThreadFactory threadFactory = new NamedThreadFactory("New-Thread");

        @Override
        public void schedule(Runnable run) {
            Thread thread = threadFactory.newThread(run);
            thread.start();
        }
    }

    static class IoScheduler implements Scheduler {
        private static final NamedThreadFactory threadFactory = new NamedThreadFactory("IO");
        private ExecutorService executorService;

        public IoScheduler(String tag) {
            executorService = Executors.newFixedThreadPool(1, threadFactory);
        }

        @Override
        public void schedule(Runnable run) {
            executorService.submit(run);
        }
    }
}

2)实现 ObservableSubscribeOn

@Slf4j
public class ObservableSubscribeOn<T> extends Observable<T>{
    private Observable<T> upstream;
    private Schedulers.Scheduler scheduler;

    public ObservableSubscribeOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
        this.upstream = upstream;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        scheduler.schedule(() -> {
            log.info("schedule is called.");
            this.upstream.subscribe(observer);
        });
    }
}

代理模式分析:

  • upstream 字段所引用的 Observable 对象可以看作是上文的 RealService
  • ObservableSubscribeOn 可以看作是上文的 ProxyService

注意:这里代理的的是 Observable

观察时(ObserveOn)

观察时,即事件处理时(调用 ObserveronNext() 方法)。
这个时候也可以切换线程。

1)Schedulers.Scheduler 新增 createWorker() 方法。

public class Schedulers {
    ...
    interface Scheduler {
        ...
        ExecutorService createWorker();
    }

    static class NewThreadScheduler implements Scheduler {
        ...
        @Override
        public ExecutorService createWorker() {
            return Executors.newFixedThreadPool(1, threadFactory);
        }
    }

    static class IoScheduler implements Scheduler {
        ...
        @Override
        public ExecutorService createWorker() {
            return Executors.newFixedThreadPool(1, threadFactory);
        }
    }
}

因为一个事件流中可以包含多个事件数据,所以要使用 Worker(线程池)的形式执行事件监听处理(ObserveronNext() 方法)。

2)实现 ObservableObserveOn

public class ObservableObserveOn<T> extends Observable<T> {
    private Observable<T> upstream;
    private Schedulers.Scheduler scheduler;

    public ObservableObserveOn(Observable<T> upstream, Schedulers.Scheduler scheduler) {
        this.upstream = upstream;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer observer) {
        ObserveOnObserver<T> observeOnObserver = new ObserveOnObserver(observer, this.scheduler);
        this.upstream.subscribe(observeOnObserver);
    }

    public static class ObserveOnObserver<T> implements Observer<T> {
        private Observer<T> downstream;
        private ExecutorService worker;

        public ObserveOnObserver(Observer<T> downstream, Schedulers.Scheduler scheduler) {
            this.downstream = downstream;
            this.worker = scheduler.createWorker();
        }

        @Override
        public void onNext(T value) {
            this.worker.submit(() -> {
                this.downstream.onNext(value);
            });
        }
    }
}

代理模式分析:

  • downstream 字段所引用的 Observer 对象可以看作是上文的 RealService
  • ObserveOnObserver 可以看作是上文的 ProxyService

注意:这里代理的的是 Observer

测试代码如下所示:

@Slf4j
public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(Emitter<Integer> emitter) {
                        log.info("emitter begin");
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                    }
                })
                .map(v -> v + 10) // 11 12 13
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onNext(Integer o) {
                        log.info("onNext {}", o);
                    }
                });
    }
}

输出如下所示:

17:17:51.623 [IO-1] INFO org.company.rxjava.pattern.ObservableSubscribeOn - schedule is called.
17:17:51.639 [IO-1] INFO org.company.rxjava.pattern.Main2 - emitter begin
17:17:51.642 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 11
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 12
17:17:51.649 [New-Thread-1] INFO org.company.rxjava.pattern.Main2 - onNext 13

从输出中可以得知,订阅(subscribe()) 在 [IO-0] 线程中执行,事件处理(onNext())在 [New-Thread-[worker]-0] 线程中执行。

说明实现符合预期。

总结

  • ObservableSubscribeOn 中用代理模式代理了 upstream 字段引用的 Observable 对象。
  • ObserveOnObserver 中用代理模式代理了 downstream 字段所引用的 Observer 对象。

参考

相关文章

  • RxJava 中的设计模式(三)代理模式之切换线程实现

    代理模式介绍 在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模...

  • 设计模式之代理模式

    设计模式之代理模式 10分钟看懂动态代理设计模式(升级篇)-对这篇动态代理模式的思路整理 仿JDK实现动态代理逻辑...

  • 设计模式之代理

    设计模式之代理模式 一、定义 在Java中代理的实现一般分为三种:JDK静态代理、JDK动态代理以及CGLIB动态...

  • Delegate的基本使用

    代理的基本使用 代理是一种通用的设计模式,在iOS中对代理设计模式支持的很好,有特定的语法来实现代理模式,OC语言...

  • iOS之代理笔记

    代理的基本使用 代理是一种通用的设计模式,在iOS中对代理设计模式支持的很好,有特定的语法来 实现代理模式,OC语...

  • 设计模式-代理

    代理的基本使用 代理是一种通用的设计模式,在iOS中对代理设计模式支持的很好,有特定的语法来实现代理模式,OC语言...

  • 代理

    代理 概念:一种通用的设计模式,在iOS中对代理设计模式支持的很好,有特定的语法来实现代理模式,OC语言可以通过@...

  • 代理模式(Proxy Pattern)

    1. 简介 代理模式属于设计模式中的结构型模式,有静态代理和动态代理两种实现方式。代理模式使用代理类来控制访问委托...

  • java设计模式之代理模式(静态代理)

      今天给大家分享的是java设计模式之代理模式中的静态代理模式,动态代理模式将在后面文章中给出。如有不足,敬请指...

  • 反射与代理设计模式

      代理设计模式是在程序开发中使用最多的设计模式,代理设计模式的核心是有真实业务实现类和代理业务实现类,并且代理类...

网友评论

      本文标题:RxJava 中的设计模式(三)代理模式之切换线程实现

      本文链接:https://www.haomeiwen.com/subject/fecxultx.html