冷饭热炒——RxJava

简介: 已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go!0. 前言本次源码分析使用的是RxJava2,版本2.1.14。

已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go!

0. 前言

本次源码分析使用的是RxJava2,版本2.1.14。RxJava1和RxJava2区别还是很大的,今天去github上看了下,RxJava1在三月底就停止更新了。

1. 关于RxJava

套用扔物线大神的话就是:a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。就简单说这句吧,详细还是看大神的文章吧。

2. Observable创建

在使用RxJava时,通常使用Observable.just(T t...)/Observable.create(ObservableOnSubscribe<T> source)来创建Observable对象,看其源码如下:

Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 判空
    ObjectHelper.requireNonNull(source, "source is null");
    // 通过RxJavaPlugins
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // 这个与hook有关,默认为null,所以返回值即传入值
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

整个创建过程其实就是返回了一个ObservableCreate对象,该对象如下:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 代理模式的运用
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // 调用了ObservableOnSubscribe的subscribe方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

ObservableCreate作为Observable的子类,实现了其subscribeActual方法,创建时需要传入接口ObservableOnSubscribe,该接口作为连接了观察者与被观察者。通过实现其subscribe方法,来通知观察者事件发生。
当然,我们也可以看到在subscribeActual方法中通过使用CreateEmitter作为观察者的代理类,用于控制观察这事件是否需要通知。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        // 传入值不能为null
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 如果没有被处理则调用
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    ......
}

CreateEmitter作为Observer的代理类,通知观察者时需要判断传入参数是否为空以及是否被处理来判断是否调用ObserveronNext(T t)方法。

同理,我们也可以知道通过just(T... t)也是通过这种方式创建,这里不详细讲了。

3. subscribe订阅

订阅方法作为观察者和被观察者连接的桥梁,通过该方法,我们可以获得被观察者的事件发送,以及接受该事件做出的响应。

Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // hook相关略过
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 判空
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 调用了Observable中的抽象方法subscribeActual,该方法的在ObservableCreate中实现
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}

订阅方法更加的简单,调用了其抽象方法subscribeActual,该方法在ObservableCreate中实现。我们再看下subscribeActual代码:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 代理模式的运用
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        // 调用了ObservableOnSubscribe的subscribe方法
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

到这里整个事件发送就完全走通了:

  • 通过create方法创建Observable,传入匿名内部类ObservableOnSubscribe
  • 调用ObservableOnSubscribesubscribe方法,通过实现类中来发送数据。
  • 通过Observablesubscribe方法来将Observer订阅,在ObservableOnSubscribe调用了Observer的代理类CreateEmitter的方法来通知Observer的事件。
img_6ebaaee9de02aecdae0ee1055eacd2a7.png
调用过程

emmm,用更简单的话说就是ObservableOnSubscribesubscribe方法调用了接口Observer的方法

4. map变换

前面讲了关于Observable的创建以及事件的发送,这些都只是基本操作。RxJava的强大是对事件流的各种操作,比如过滤、变化以及线程切换等。这里我们看看我们常用的map(Function<? super T, ? extends R> mapper)来分析一下吧!

Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // 创建了ObservableMap,这里作为一个新的Observable对象返回,返回的是转换后的泛型R
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    
    // 创建过程是需要传入第一个Observable以及一个Function接口
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    // 调用subscribe时需要重写的方法
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    // 转换的Observer
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        
        // 需要将实际的Observer传入
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            ......

            U v;

            try {
                // 这里调用了Function的apply方法,将T转成U
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 将转换后的U发送给Observer接收
            actual.onNext(v);
        }
        ......
    }
}

OK,源码就这些,代码量很少,理解起来也很容易。这里再总结下:

  • 创建新的Observable对象ObservableMap,并将当前Observable对象传入。
  • 调用新的Observable对象的subscribe方法,在ObservableMap中调用source.subscribe(new MapObserver<T, U>(t, function))
  • MapObserver实现了Observer接口,在其onNext方法中调用了Functionapply()方法,并且最终调用传入的ObserveronNext()方法。

如下图所示:


img_6f97d1beed2d46601987ad7d3864378f.png
流程

5. 线程切换

RxJava的强大之处不只是体现在各个操作符,也体现在线程的切换。RxJava默认工作在当前线程中,如果需要发送事件产生在新的线程,接收并处理事件在另一个线程怎么办?RxJava给我们提供了两个方法subscribeOn()observerOn()
subscribeOn()是指事件产生的线程,该方法只会在第一次设置有效。而observerOn()是指定事件处理的的线程,每调用一次就会切换线程。下面还是分开来说吧。

5.1 subscribeOn分析

上面说到subscribeOn作用于事件产生,并且只有第一个设置有效,我我们分析下源码:

Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    // 这里还是老套路
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    // 新的Observer,最为原始Observer的代理
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        ......
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        ......
    }
    
    // 创建了subscribe任务,这里实现了runnable接口
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // source是上面传入的Observable对象
            source.subscribe(parent);
        }
    }
}

上面的步骤已经很熟悉了,通过该方法创建了一个新的Observable对象,最终执行subscribe()方法时调用了scheduler.scheduleDirect(new SubscribeTask(parent))SubscribeTask作为Runnable的实现类,所以我们最终会调用run()方法。其实到这里我们也就清楚为什么subscribeOn只有第一个设置有效了——不管创建多少个新的Observable对象,最终还会调用第一个Observable对象的subscribe方法,而该方法工作在该线程中!
我们接着看到底是怎么进行线程切换的(这里使用Schedulers.newThread())为例:

Schedulers.java
@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static {
    ......
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}

static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}

NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    // 线程池,可以实现循环或延迟任务。
    private final ScheduledExecutorService executor;

    volatile boolean disposed;
    
    public NewThreadWorker(ThreadFactory threadFactory) {
        // 创建方法
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // 创建一个新的Runnable,最终仍然调用传入的run方法,这里不去看了
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    ......
}

通过上面的代码,我们可以看到最终通过线程池调用SubscribeTaskrun方法,从而达到线程切换的目的。

5.2 observerOn实现

关于observerOn的实现其实和subscribeOn很像,只不过一个是在新的线程中调用source.subscribe(parent)方法;而observerOn是在新的线程调用相应的onNext\onError等方法,所以我们没调用一次observerOn就会切换一次线程,并且下面的操作都会工作在紧挨着该操作的observerOn所指定的线程中。

6. 总结

RxJava的基本原理就是这些,RxJava使用的较多的模式就是代理模式。整个源码不难(没有包括背压以及高级用法),用心看仔细梳理就能很好理解。我所认为的理解可能就是看懂了,然后可以根据它来仿写出一个相似的RxJava,这样我觉得是真正的掌握了。RxJava, Out!

目录
相关文章
|
3月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
62 0
|
20天前
|
JSON Java 数据格式
rxjava2+retrofit2
rxjava2+retrofit2
27 1
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
236 0
|
JSON Java 数据格式
rxjava2+retrofit2 简介
rxjava2+retrofit2 简介
74 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
143 0
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
975 0
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
144 0
RxJava2
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
134 0
XTask与RxJava的使用对比
|
负载均衡 算法 Java
RxJava 并行操作
RxJava 并行操作
375 0
RxJava 并行操作
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
216 0
RxJava 之 ParallelFlowable