Rxjava深入理解之自己动手编写Rxjava

  1. 云栖社区>
  2. 博客>
  3. 正文

Rxjava深入理解之自己动手编写Rxjava

jimmie_yang 2018-11-06 11:41:00 浏览498
展开阅读全文

Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看.

从观察者模式说起

观察者模式,是我们在平时使用的比较多的一种设计模式.
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己. 它的特点是 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

本文的重点不是在介绍 观察者模式,因此这个话题就不展开讨论.

Rxjava 是一种 函数式编程,或者称为链式调用模型,也是使用观察者模式来实现事件的传递与监听.

下面我们来看,Rxjava 和 普通观察者的点区别.

  • 普通的观察者模式通常是 一个主题,多个观察者配合,基本上是属于 一对多的情况.
  • Rxjava的观察者模式通常是 一对一的关系.
  • 普通的观察者模式是 主题数据改变时,通知观察者数据的变动
  • Rxjava的观察者模式是 在被观察者(主题)调用subscribe方法后,触发数据流动和观察者接收事件.

基础知识介绍到这里,接下来我们来自己动手编写Rxjava

编写Rxjava

看一个常见的rxjava的使用示例,(原始数据,数据转换,线程切换,数据接收处理一系列功能):

    public static void main(String[] args) throws InterruptedException {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onComplete();
        })
                .observeOn(Schedulers.io())
                .map(s -> Integer.parseInt(s) * 10)
                .subscribe(System.out::println);
        TimeUnit.SECONDS.sleep(1);
    }
    // 10
    // 20

接下来,我会一步一步带领大家实现上述所有的功能.

一个简单的观察者模式

// Observer.java
// 观察者
public interface Observer<T> {
    void onUpdate(T t);
}
// ObservableSource.java
// 被观察者(主题)接口
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
// Observable.java
// 具体的被观察者(主题)
public class Observable<T> implements ObservableSource<T> {
    private T t;
    public Observable(T t) {
        this.t = t;
    }
    @Override
    public void subscribe(Observer<? super T> observer) {
        // 调用订阅时,触发观察者更新
        observer.onUpdate(t);
    }
}

使用:

    public static void main(String[] args) {
        // 观察者
        Observer<String> observer = s -> System.out.println(s);
        // 被观察者(主题)
        Observable<String> observable = new Observable<>("hello");
        // 调用
        observable.subscribe(observer);
    }
    // hello

这样,算是一个简单的观察模式了,但是这种方式很不灵活,数据在构造中直接传入了.

接下来我们来改造一下 Observable.java类. 可以传入一个接口来定义数据的传递规则,并且为Observable写一个适配器和一个事件分发器,为原始事件的产生提供支持.

  • 添加 Emitter接口,它是一个事件分发的接口;
  • 添加 ObservableOnSubscribe接口,它是创建 Observable实例的桥梁,并且有生产事件的功能,支持lambda方式调用;
  • 添加 ObservableCreate类,它是Observable的适配器,能够根据ObservableOnSubscribe接口,快速创建一个 Observable实例;并且内部类CreateEmitter实现了Emitter接口,用于事件的分发;
  • 修改 Observable类,添加工厂方法,能够根据ObservableOnSubscribe接口,快速构建Observable实例;
// Emitter.java
// 事件分发器接口
public interface Emitter<T> {
    void onUpdate(T t);
}
// ObservableOnSubscribe.java
// Observable的事件分发接口
public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter) throws Exception;
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
    // 工厂方法,生产出一个Observable实例
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<>(source);
    }
    // 真正进行事件分发处理的方法
    abstract void subscribeActual(Observer<? super T> observer) throws Exception;
    @Override // 交给subscribeActual实现,需要子类实现
    public void subscribe(Observer<? super T> observer) throws Exception {
        subscribeActual(observer);
    }
}
// ObservableCreate.java
// Observable的一个适配器,用于快速创建一个可以发送事件的Observable
class ObservableCreate<T> extends Observable<T> {
    // 事件分发接口
    private final ObservableOnSubscribe<T> source;
    ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override // 分发逻辑的具体代码
    void subscribeActual(Observer<? super T> observer) throws Exception {
        CreateEmitter<T> emitter = new CreateEmitter<>(observer);
        source.subscribe(emitter);
    }
    // 内部分发器
    static class CreateEmitter<T> implements Emitter<T> {
        private final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override // 这里只是简单的将observer观察者的事件直接分发出去
        public void onUpdate(T t) {
            observer.onUpdate(t);
        }
    }
}

ObservableSource.javaObserver.java没有修改,固没有贴出,未贴出代码请查看上一步.使用 :

    public static void main(String[] args) throws Exception {
        Observable.create(emitter -> {
                    emitter.onUpdate("hello");
                    emitter.onUpdate("world");
                })
                .subscribe(System.out::println);
    }
    // hello
    // world

哇! 你会发现,到此为止,你已经使用观察模式实现了一个简易的函数式编程的代码了.
如果你完全理解了上述代码是怎么产生的,那么恭喜你,你已经理解rxjava最最核心的原理了.

添加事件结束和异常捕获

上诉的代码,还不能够捕获异常和结束事件,这样使用起来很不方便,接下来我们来改造实现它.
仿造rxjava,我们也将事件分为onNext,onError,onComplete三个事件.

需要修改 分发器接口和观察者接口,以及Observable的适配器.

  • 修改 Observer接口和Emitter接口, 改为onNext,onError,onComplete方法;
  • 修改 ObservableCreate类,添加异常处理和结束的逻辑;
// Observer.java
public interface Observer<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}
// Emitter.java
public interface Emitter<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}
// ObservableCreate.java
class ObservableCreate<T> extends Observable<T> {
    // 事件分发接口
    private final ObservableOnSubscribe<T> source;
    ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override // 分发逻辑代码
    void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> emitter = new CreateEmitter<>(observer);
        try {
            source.subscribe(emitter);
        } catch (Exception e) {
            // 异常接收和处理
            emitter.onError(e);
        }
    }
    // 内部分发器
    static class CreateEmitter<T> implements Emitter<T> {
        private final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }
        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}

修改过这是三个类后,我们就能接收异常和结束了.使用:

    public static void main(String[] args) {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onComplete();
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("end...");
                    }
                });
    }
    // 1
    // 2
    // end...

嗯!? 虽然实现了接收异常和结束的功能,但是有时我们只需要onNext事件时,这样的代码写起来不够优雅.

接下来我们编写一个观察者的适配器,让它能够使用 lambda表达式来优雅的编写代码.

  • 添加 Consumer接口,它是接收一个参数,无返回值的接口,用途是进行lambda方式进行参数传递;
  • 添加 Action接口,它是一个不接受参数,无返回值和的接口,用途也是进行lambda方式进行参数传递;
  • 添加 Functions类,它是一个辅助类,能获取空Consumer和空Action实现;
  • 添加 LambdaObserver类,它会将lambda参数形式,转化为Observer实例,进而实现lambda式的调用;
  • 修改 Observable类, 添加 void subscribe(Consumer<? super T> next, Action complete, Consumer<? super Throwable> error)系列的方法,让subscribe方法真正支持 lambda式调用.
// Consumer.java
// 接受一个参数,无返回的接口
public interface Consumer<T> {
    void apply(T t) throws Exception;
}
// Action.java
// 不接受参数,无返回的接口
public interface Action {
    void apply() throws Exception;
}
// Functions.java
public class Functions {
    public static final Action EMPTY_ACTION = () -> {};
    public static <T> Consumer<T> emptyConsumer() {
        return t -> {};
    }
}
// LambdaObserver.java
public class LambdaObserver<T> implements Observer<T> {
    private final Consumer<? super T> onNext;
    private final Consumer<? super Throwable> onError;
    private final Action onComplete;
    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }
    @Override
    public void onNext(T t) {
        try {
            onNext.apply(t);
        } catch (Exception e) {
            onError(e);
        }
    }
    @Override
    public void onError(Throwable e) {
        try {
            onError.apply(e);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
    @Override
    public void onComplete() {
        try {
            onComplete.apply();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
    // 工厂方法,生产出一个Observable实例
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<>(source);
    }
    // 真正进行事件分发处理的方法
    abstract void subscribeActual(Observer<? super T> observer);
    @Override // 交给subscribeActual实现,需要子类实现
    public void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    // 接受3个lambda表达式的方法参数
    public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) {
        LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete);
        subscribe(lambdaObserver);
    }
    // 接受2个lambda表达式的方法参数
    public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) {
        subscribe(next, error, Functions.EMPTY_ACTION);
    }
    // 接受1个lambda表达式的方法参数
    public void subscribe(Consumer<? super T> next) {
        subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION);
    }
}

接下来是调用:

public static void main(String[] args) {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onComplete();
        }).subscribe(System.out::println);
    }
    // 1
    // 2

啧啧!?有点激动了,已经看起来有模有样了,但是功能远远还不够.

添加 是否消费事件的功能(中断事件功能)

在上诉的代码中,我们无法主动中断整个事件发生的过程.接下来我们就需要编写 Disposable来实现onCompleteonError的自中断,以及主动取消事件.

Rxjava中,Disposable是使用 枚举类型加上原子引用(AtomicReference)类来实现线程安全(具体可查看DisposableHelper类).这种方式比较繁琐,这里就不用这种方式来演示,而使用 volatile声明的状态变量来做同步安全.

  • 添加 Disposable接口,提供中断和是否中断的方法;
  • 修改Observer接口, 添加onSubscribe方法,让观察者可以在事件传递前,获取Disposable,进而可以在事件传递的任意阶段中断事件;
  • 修改ObservableCreate类,添加观察者回调onSubscribe,此回调需在事件分发前才能起到作用;内部类CreateEmitter实现Disposable接口,在事件分发前先判断是否被中断了;使用volatile变量实现中断判断;
  • 修改LambdaObserver类,让它实现Disposable接口,添加是否中断的判断;
  • 修改Observable类,添加onSubscribelambda调用;以及返回Disposable实例;
// Disposable.java
public interface Disposable {
    void dispose();
    boolean isDisposed();
}
// Observer.java
public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}
// ObservableCreate.java
// Observable的一个适配器,用于快速创建一个可以发送事件的Observable
final class ObservableCreate<T> extends Observable<T> {
    // 事件分发接口
    private final ObservableOnSubscribe<T> source;
    ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override // 分发逻辑代码
    void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> emitter = new CreateEmitter<>(observer);
        // 将中断器回调给observer
        observer.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Exception e) {
            // 异常接收和处理
            emitter.onError(e);
        }
    }
    // 内部分发器
    static class CreateEmitter<T> implements Emitter<T>, Disposable {
        private final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            // 如果事件没被消费,则进行操作
            if (!isDisposed())
                observer.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            if (!isDisposed()) {
                try {
                    observer.onError(e);
                } finally {
                    // 触发消费,后续不再处理事件
                    dispose();
                }
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    // 触发消费,后续不再处理事件
                    dispose();
                }
            }
        }
        private volatile boolean isDisposed = false;
        @Override
        public void dispose() {
            isDisposed = true;
        }
        @Override
        public boolean isDisposed() {
            return isDisposed;
        }
    }
}
// LambdaObserver.java
public class LambdaObserver<T> implements Observer<T>, Disposable {
    private final Consumer<? super T> onNext;
    private final Consumer<? super Throwable> onError;
    private final Action onComplete;
    private final Consumer<? super Disposable> onSubscribe;
    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                          Action onComplete, Consumer<? super Disposable> onSubscribe) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
    @Override
    public void onSubscribe(Disposable d) {
        try {
            onSubscribe.apply(d);
        } catch (Exception e) {
            onError(e);
        }
    }
    @Override
    public void onNext(T t) {
        if (!isDisposed())
            try {
                onNext.apply(t);
            } catch (Exception e) {
                onError(e);
            }
    }
    @Override
    public void onError(Throwable e) {
        if (!isDisposed())
            try {
                onError.apply(e);
            } catch (Exception e1) {
                e1.printStackTrace();
            } finally {
                dispose();
            }
    }
    @Override
    public void onComplete() {
        if (!isDisposed())
            try {
                onComplete.apply();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                dispose();
            }
    }
    private volatile boolean isDisposed = false;
    @Override
    public void dispose() {
        isDisposed = true;
    }
    @Override
    public boolean isDisposed() {
        return isDisposed;
    }
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
    // 工厂方法,生产出一个Observable实例
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<>(source);
    }
    // 真正进行事件分发处理的方法
    abstract void subscribeActual(Observer<? super T> observer);
    @Override // 交给subscribeActual实现,需要子类实现
    public void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    // 接受4个lambda表达式的方法参数
    public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error,
                                Action complete, Consumer<? super Disposable> onSubscribe) {
        LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete, onSubscribe);
        subscribe(lambdaObserver);
        return lambdaObserver;
    }
    // 接受3个lambda表达式的方法参数
    public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) {
        return subscribe(next, error, complete, Functions.emptyConsumer());
    }
    // 接受2个lambda表达式的方法参数
    public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) {
        return subscribe(next, error, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    // 接受1个lambda表达式的方法参数
    public Disposable subscribe(Consumer<? super T> next) {
        return subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
}

到此,我们就能够控制事件的中断了.我们来看使用:

    private Disposable mDisposable = null;
    void disposableTest() {
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        })
        .subscribe(integer -> {
            System.out.println(integer);
            // 3 事件将不再传递和接收
            if (integer == 2 && mDisposable != null)
                mDisposable.dispose();
            },
            Functions.emptyConsumer(), Functions.EMPTY_ACTION,
             d -> mDisposable = d);
    }
    // 1
    // 2
    
    // 这种方式只在,异步的情况下使用,由于示例中目前还不支持异步,因此以下代码起不了作用.
    void disposableTest2() {
    Disposable disposable = Observable
    .create((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    })
    .subscribe(System.out::println);
    disposable.dispose();
    }

像这样,在事件分发前,拿到Disposable对象,这样你能在任意阶段中断 这个过程.

至此, 我们实现了基本的事件发送和lambda调用,以及中断功能.接下来我们需要开始添加 操作符了, 让它真的能达到函数式调用的模样!

添加 操作符(Map)

操作符的重点是, 你需要处理好上游传递下来的Disposable对象,以及下游待传递的Observer.

下面我们来实现map操作符的功能, 它可以将一个类型转化为另一个类型.

  • 添加 Function接口,该接口接收一个类型的参数,并且返回另一个类型的值;
  • 添加 BasicObserver类,该类实现ObserverDisposable接口,用于传递上下游的数据;
  • 添加 ObservableMap类,该类继承Observable,并且使用Function接口,实现类型转换;内部类MapObserver 继承自BasicObserver实现具体转换逻辑;
  • 修改 Observable类,添加map操作符;
// Function.java
// 接收一个类型的参数,返回一个类型
public interface Function<T, R> {
    R apply(T t) throws Exception;
}
// BasicObserver.java
public abstract class BasicObserver<T, R> implements Observer<T>, Disposable {
    // 上游传递的Disposable对象
    private Disposable          upstream;
    // 下游接收的观察者对象
    final   Observer<? super R> downstream;
    // 如果已经中断,则无需下传
    boolean done;
    BasicObserver(Observer<? super R> downstream) {
        this.downstream = downstream;
    }
    @Override
    public void onSubscribe(Disposable d) {
        // 接收上游的Disposable
        this.upstream = d;
        downstream.onSubscribe(this);
    }
    @Override
    public void onError(Throwable e) {
        if (done) return;
        done = true;
        downstream.onError(e);
    }
    @Override
    public void onComplete() {
        if (done) return;
        done = true;
        downstream.onComplete();
    }
    @Override
    public void dispose() {
        upstream.dispose();
    }
    @Override
    public boolean isDisposed() {
        return upstream.isDisposed();
    }
}
// ObservableMap.java
final class ObservableMap<T, U> extends Observable<U> {
    private final ObservableSource<T>              source;
    private final Function<? super T, ? extends U> function;
    ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        this.source = source;
        this.function = function;
    }
    @Override
    void subscribeActual(Observer<? super U> observer) {
        source.subscribe(new MapObserver<T, U>(observer, function));
    }
    static class MapObserver<T, U> extends BasicObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) return;
            try {
                // 这里实现具体的转化,下游接收到转化后的类型变量
                downstream.onNext(mapper.apply(t));
            } catch (Exception e) {
                onError(e);
            }
        }
    }
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
    ...
    // map操作符
    public <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return new ObservableMap<>(this, mapper);
    }
}

实现好了map功能,我们来验证一下:

    void mapTest() {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onComplete();
        })
        .map(s -> Integer.parseInt(s) * 10)
        .subscribe(System.out::println);
    }
    // 10
    // 20

哇!! 有点不可思议,已经做到这一步了,我们还差什么呢? 没错,就是线程切换!!

PS : Rxjava中有很多的操作符, 我用其中比较典型的map来做示范,其他操作符,有兴趣的可以自己手动来实现.

线程切换

rxjava中,自己实现了一套功能强大的线程池.配合操作符 observeOn,subscribeOn来进行线程切换.这里就不对其进行展开,我们的重点是自己实现.

由于rxjava的线程池调度相当的复杂, 这里为了方便演示,将只采用jdk自带的线程池来做线程调度.下面我们来实现, rxjava中 observeOn操作符,以及Schedulers.io()的线程调度.

  • 添加Scheduler接口,定义了调度的方法,提交任务,移除任务,停止线程池;
  • 添加IOScheduler类,是Scheduler的具体实现,采用newCachedThreadPool提交任务和中断任务;
  • 添加ObservableObserveOn类,继承Observable,为observeOn操作符提供支持;内部类ObserveOnObserver,实现Observer,DisposableRunnable接口,run方法将拦截所有事件,将其作为任务提交给线程池运行,达到异步的效果;
  • 修改Observable类,添加observeOn操作符;
// Scheduler.java
public interface Scheduler {
    void submit(Runnable runnable);
    void remove(Runnable runnable);
    void shutdown();
}
// IOScheduler
public class IOScheduler implements Scheduler {
    // 线程池
    private final ExecutorService       executor  = Executors.newCachedThreadPool();
    // 保存Future对象,为了能够中断指定线程
    private final Map<Runnable, Future> futureMap = new ConcurrentHashMap<>();
    @Override
    public void submit(Runnable runnable) {
        Future future = futureMap.get(runnable);
        // 如果对应的任务正在执行,则无需再提交
        if (future != null && !future.isDone()) return;
        if (executor.isShutdown()) return;
        futureMap.put(runnable, executor.submit(runnable));
    }
    @Override
    public void remove(Runnable runnable) {
        Future future = futureMap.get(runnable);
        if (future == null) return;
        try {
            future.cancel(true);
        } catch (Exception ignored) {
        } finally {
            futureMap.remove(runnable);
        }
    }
    @Override
    public void shutdown() {
        if (!executor.isShutdown())
            executor.shutdown();
    }
}
// ObservableObserveOn.java
public final class ObservableObserveOn<T> extends Observable<T> {
    private final ObservableSource<T> source;
    private final Scheduler           scheduler;

    ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new ObserveOnObserver<>(observer, scheduler));
    }
    static final class ObserveOnObserver<T> implements Observer<T>, Disposable, Runnable {
        private final    Observer<? super T> downstream;
        private final    Scheduler           scheduler;
        private          Disposable          upstream;
        private volatile boolean             done;
        private volatile boolean             disposed;
        private          Queue<T>            queue = new LinkedList<>();
        private          Throwable           error;

        ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }
        @Override
        public void onSubscribe(Disposable d) {
            upstream = d;
            downstream.onSubscribe(this);
        }
        @Override
        public void onNext(T t) {
            if (done) return;
            queue.offer(t);
            schedule();
        }
        @Override
        public void onError(Throwable t) {
            if (done) return;
            done = true;
            error = t;
            schedule();
        }
        @Override
        public void onComplete() {
            if (done) return;
            done = true;
            schedule();
        }
        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                scheduler.remove(this);
                queue.clear();
            }
        }
        @Override
        public boolean isDisposed() {
            return disposed;
        }
        // 提交任务
        void schedule() {
            scheduler.submit(this);
        }
        // 检查事件是否已中断,并作出相应的反馈
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (e != null) {
                    disposed = true;
                    queue.clear();
                    a.onError(e);
                    scheduler.remove(this);
                    return true;
                } else if (empty) {
                    disposed = true;
                    a.onComplete();
                    scheduler.remove(this);
                    return true;
                }
            }
            return false;
        }
        @Override // 拦截事件传递,到run方法,run方法将由线程池运行
        public void run() {
            final Queue<T>            q = queue;
            final Observer<? super T> a = downstream;
            for (; ; ) {
                if (checkTerminated(done, q.isEmpty(), a)) return;
                for (; ; ) {
                    boolean d = done;
                    T       v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        scheduler.remove(this);
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) return;
                    if (empty) break;
                    a.onNext(v);
                }
            }
        }
    }
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
    ...
    // 线程调度操作符
    public final Observable<T> observeOn(Scheduler scheduler) {
        return new ObservableObserveOn<>(this, scheduler);
    }
}

接下来我们来看示例:

    public void schedulerTest() throws InterruptedException {
        Observable.create((ObservableOnSubscribe<String>) emmit -> {
            System.out.println("emmit : " + Thread.currentThread().getName());
            emmit.onNext("1");
            emmit.onNext("2");
            emmit.onComplete();
        })
                .observeOn(Schedulers.io())
                .map(it -> {
                    int r = Integer.parseInt(it) * 10;
                    System.out.println(r + " : map : " + Thread.currentThread().getName());
                    return r;
                }).subscribe(it -> System.out.println(it + " : observer : " + Thread.currentThread().getName()),
                Functions.emptyConsumer(),
                () -> System.out.println("onCompleted.... " + Thread.currentThread().getName()));
        TimeUnit.SECONDS.sleep(1);
    }
emmit : main
10 : map : pool-1-thread-1
10 : observer : pool-1-thread-1
20 : map : pool-1-thread-1
20 : observer : pool-1-thread-1
onCompleted.... pool-1-thread-1

可以看到 observeOn的所有下游事件,都在新的线程中运行了!!至此,线程调度的部分功能,我们也粗略的实现了!

总结

如果,你从开始看到现在, 我们已经自己实现了rxjava的一个基本使用操作了,编写了10来个类,其中大部分都是接口,写了500多行的代码.其中涉及rxjava中事件分发,lambda调用,取消和中断,map操作符,io线程切换,这一完整的流程.

rxjava 提供了丰富的 操作符, 和 各种的线程切换模型, 我们在理解其原理的情况下,都可以自己来实现.

rxjava中 RxJavaPlugins使用代理的思想来插入全局资源管理,以及使用Backpressure(背压)来控制数据流的思想,我们都可以学习和借鉴!

我们在学习过程中,可以根据源码,分解其中的知识点,逐步消化,甚至自己动手来实现它, 来达到深入理解的目的.

最后,赶紧动手编写吧!!!

引用

  1. RxJava: Reactive Extensions for the JVM

网友评论

登录后评论
0/500
评论
jimmie_yang
+ 关注