reactor3 源码分析

  1. 云栖社区>
  2. 博客>
  3. 正文

reactor3 源码分析

快乐崇拜007 2019-03-19 10:13:22 浏览561
展开阅读全文

本文需要知识点

lambda表达式

先写一个lambda表达式的小例子,注意,本文不是讲lambda表达式,所以这里仅仅是能够看到接下来的文章做铺垫而已。

@Test
public void test11() {
    Function<Integer, Integer> mapper = i -> i * i;//定义lambda表达式
    Integer result = mapper.apply(2);  //执行
    System.out.println(result);
}

输出结果是4
自定义lambda

//定义lambda函数式接口
@FunctionalInterface
public interface MyLambda {
    int add(int a, int b);
}

@Test
public void test12() {
    //使用自定义的lambda
    MyLambda myLambda = (a, b) -> a + b;
    result = myLambda.add(2 ,3);
    System.out.println(result);
}

输出结果5

reactor源码分析

分析之前,先看一下reactor提供的顶级抽奖接口
image

发布者
Publisher 是一个可以发送无限序列元素的发布者。他可以根据订阅者的要求来发布他们。

//发布者
public interface Publisher<T> {
    /**
     * 订阅方法
     * 请求发布者启动数据流
     * @param s   消费者
     */
    public void subscribe(Subscriber<? super T> s);
}

org.reactivestreams.Publisher#subscribe是一个工厂方法,允许调用多次,每次调用都会启动一个新的Subscription。每个Subscription只能被一个Subscriber使用;Subscriber消费者只能订阅一次Publisher。如果在执行过程中出错,则会发出error信号。

订阅者(消费者)

public interface Subscriber<T> {
    /**
     * 该方法在调用Publisher#subscribe(Subscriber)后执行
     * 在Subscription#request(long)调用之前不会有数据流消费
     * 如果订阅者想要消费更多的数据,需要调用Subscription#request(long)来请求数据。
     */
    public void onSubscribe(Subscription s);
    /**
     * 消费下一个消息
     * 在调用Subscription#request(long)方法时,Publisher会通过这个方法来通知订阅者消息
     * 
     * @param t 数据元素
     */
    public void onNext(T t);
    /**
     * 出错
     */
    public void onError(Throwable t);
    /**
     * 执行完成
     */
    public void onComplete();
}
  • 通过org.reactivestreams.Subscriber#onSubscribe官方注释可以看出,在调用onSubscribe方法之前,数据流不会消费。如果想要消费必须有消费者调用才行。这种模式是冷数据流模式。
  • org.reactivestreams.Subscriber#onSubscribe只会被调用一次
  • errorComplete是结束终止信号,当发出该信号之后不会再有任何新的信号发出
  • onNext方法可以被调用一次或多次,最多被调用的次数由Subscription#request(long)的参数决定

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • Subscription 的生命周期是订阅者对发布者的一次消费。
  • 一个Subscription只能被一个Subscriber使用
  • Subscription不仅允许请求数据,也允许取消对数据的请求,并且支持资源清理
  • 在通过org.reactivestreams.Subscription#request发送需求信号之前,不会有任何事件产生。该方法请求的数量最大是Long.MAX_VALUE

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
  • Processor实现了一种即是发布者又是消费者的组件。后续再讲map的时候会讲到。

先从最简单的实例开始入手。

@Test
public void test13() {
    Flux.just(1, 2, 3, 4, 5)
            .subscribe(new CoreSubscriber<>() {//这里传入CoreSubscriber对象作为订阅者
                @Override
                public void onSubscribe(Subscription s) {
                    log.info("onSubscribe, {}", s.getClass());
                    s.request(5);
                }
                @Override
                public void onNext(Integer integer) {
                    log.info("onNext: {}", integer);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                    log.info("onComplete");
                }
            });
}

运行程序,输出:

17:24:57.303 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:24:57.308 [main] INFO MyTest - onSubscribe, class reactor.core.publisher.FluxArray$ArraySubscription
17:24:57.310 [main] INFO MyTest - onNext: 1
17:24:57.310 [main] INFO MyTest - onNext: 2
17:24:57.310 [main] INFO MyTest - onNext: 3
17:24:57.310 [main] INFO MyTest - onNext: 4
17:24:57.310 [main] INFO MyTest - onNext: 5
17:24:57.310 [main] INFO MyTest - onComplete

Flux是一个发布者,他实现了Publisher接口。我们看just方法

public static <T> Flux<T> just(T... data) {
    return fromArray(data);
}
public static <T> Flux<T> fromArray(T[] array) {
    if (array.length == 0) {
        return empty();
    }
    if (array.length == 1) {
        return just(array[0]);
    }
    return onAssembly(new FluxArray<>(array));
}

可以看到,它返回的是一个FluxArray对象,将我们传进去的参数封装为一个数组。

final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {

    final T[] array;//保存数据

    public FluxArray(T... array) {
        this.array = Objects.requireNonNull(array, "array");
    }
    
    public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
        if (array.length == 0) {
            Operators.complete(s);
            return;
        }
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
        }
        else {//上面demo会走这个分支
             //这里的onSubscribe调用的就是demo中我们自定义的CoreSubscriber中的onSubscribe方法,方法内会打印日志并且调用request请求数据
            s.onSubscribe(new ArraySubscription<>(s, array));
        }
    }

//上面示例demo中调用的subscribe方法就是这个;actual是我们new的CoreSubscriber,array是我们初始化的1,2,3,4,5数组
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        subscribe(actual, array);
    }

    static final class ArraySubscription<T>
            implements InnerProducer<T>, SynchronousSubscription<T> {
    //省略
    }
    //省略
}

FluxArray继承了Flux也是一个发布者。内部有一个final 的属性array用于存储数据。
publisher#subscribe(Subscriber<? super T>)方法中调用Subscriber#onSubscribe()是固定流程。在上面org.reactivestreams.Subscriber#onSubscribe有介绍到。
接下来我们看一下Subscription。这里使用的是ArraySubscription

static final class ArraySubscription<T>
            implements InnerProducer<T>, SynchronousSubscription<T> {
        final CoreSubscriber<? super T> actual;//我们自定义的CoreSubscriber
        final T[] array;//存储数据
        int index;
        //下面这两个都是volatile ,保证了可见性
        volatile boolean cancelled;//流是否已经取消
        volatile long requested;//记录请求了多少个
        //使用AtomicLongFieldUpdater将requested进行了封装,保证了原子性更新
        @SuppressWarnings("rawtypes")
        static final AtomicLongFieldUpdater<ArraySubscription> REQUESTED =
                AtomicLongFieldUpdater.newUpdater(ArraySubscription.class, "requested");

        ArraySubscription(CoreSubscriber<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                if (Operators.addCap(REQUESTED, this, n) == 0) {//每次请求request都会更新requested属性的值
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    }
                    else {
                        slowPath(n);
                    }
                }
            }
        }

        void slowPath(long n) {
            final T[] a = array;
            final int len = a.length;
            final Subscriber<? super T> s = actual;

            int i = index;
            int e = 0;

            for (; ; ) {
                if (cancelled) {//每次都会判断事件流是否已经取消
                    return;
                }

                while (i != len && e != n) {
                    T t = a[i];

                    if (t == null) {
                        s.onError(new NullPointerException("The " + i + "th array element was null"));
                        return;
                    }
                     //调用我们自定义的CoreSubscriber.onNext()方法来处理消息。
                    s.onNext(t);

                    if (cancelled) {
                        return;
                    }

                    i++;
                    e++;
                }

                if (i == len) {//数据已经全部被消费,发送完成信号。我们这个demo中一次性取了5个数据,直接取完,所以到了这里就直接发送完成信号结束了。
                    s.onComplete();
                    return;
                }

                n = requested;

                if (n == e) {
                    index = i;
                    n = REQUESTED.addAndGet(this, -e);
                    if (n == 0) {
                        return;
                    }
                    e = 0;
                }
            }
        }
        //取消
        @Override
        public void cancel() {
            cancelled = true;
        }
    }

ArraySubscription定义了订阅消费数据的过程,在一次订阅的生命周期中,通过不断回调onNext方法来消费消息。消费完成后,通过回调onComplete()方法结束整个流程,也就是通常说的发出完成信号。
整个流程是采用的观察者模式,通过对象的传递与回调来实现的发布与消费,时序图与类图如下:
image

好了,上面介绍了关键的Publisher,Subscriber及Subscription。接下来看一下map这种即是订阅者又充当发布者的角色。
我们把上面的demo修改一下:

@Test
publicvoid test13() {
    Flux.just(1, 2, 3, 4, 5)
            .map(i -> i * i)
            .subscribe(new CoreSubscriber<>() {
                //省略,与之前相同
            });
}

直接跟进到map方法:

//这里mapper是map的参数,lambda表达式 : i->i*i
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
    if (this instanceof Fuseable) {
        //this是FluxArray, 他实现了Fuseable接口,所以走这一步
        return onAssembly(new FluxMapFuseable<>(this, mapper));
    }
    return onAssembly(new FluxMap<>(this, mapper));
}

看一下FluxMapFuseable<T, R>类, T 是源数据, R是经过计算后的结果数据。

final class FluxMapFuseable<T, R> extends FluxOperator<T, R> implements Fuseable {
    //lambda表达式:需要执行的计算
    final Function<? super T, ? extends R> mapper;
    
    FluxMapFuseable(Flux<? extends T> source,
            Function<? super T, ? extends R> mapper) {
        super(source);//这里source是fluxArray对象,
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(CoreSubscriber<? super R> actual) {
        if (actual instanceof ConditionalSubscriber) {
            ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
            source.subscribe(new MapFuseableConditionalSubscriber<>(cs, mapper));
            return;
        }
        //这里source是fluxArray对象,
        source.subscribe(new MapFuseableSubscriber<>(actual, mapper));
    }

    static final class MapFuseableSubscriber<T, R>
            implements InnerOperator<T, R>,
                       QueueSubscription<R> {

        final CoreSubscriber<? super R>        actual;
        final Function<? super T, ? extends R> mapper;

        boolean done;
        QueueSubscription<T> s;
        int sourceMode;

        MapFuseableSubscriber(CoreSubscriber<? super R> actual,
                Function<? super T, ? extends R> mapper) {
            this.actual = actual;//这个actual是demo实例中我们自己创建的CoreSubscriber对象
            this.mapper = mapper;//map中的lambda表达式
        }

        //当调用FluxArray的subscribe后会调用这个onSubscribe方法
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = (QueueSubscription<T>) s;
                actual.onSubscribe(this);
            }
        }
        
        //调用FluxArray的request方法后,会调用这里的OnNext方法,这里充当了消费者的角色
        @Override
        public void onNext(T t) {
            if (sourceMode == ASYNC) {
                actual.onNext(null);
            }
            else {
                if (done) {
                    Operators.onNextDropped(t, actual.currentContext());
                    return;
                }
                R v;

                try {//这里使用mapper.apply(t)来执行demo中的lambda表达式,v是执行结果。这里充当了消费者的角色
                    v = Objects.requireNonNull(mapper.apply(t),
                            "The mapper returned a null value.");
                }
                catch (Throwable e) {
                    Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
                    if (e_ != null) {
                        onError(e_);
                    }
                    else {
                        s.request(1);
                    }
                    return;
                }
                //调用demo中我们自己创建的CoreSubscriber的onNext方法,这里其实充当了发布者的角色
                actual.onNext(v);
            }
        }
        //省略部分代码

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            
            actual.onComplete();
        }
        
        @Override
        public void request(long n) {
        //这个s是
            s.request(n);
        }
        @Override
        public void cancel() {
            s.cancel();
        }
        //省略部分代码
    }
}

上面可以看到,map的执行过程中,并不会产生数据,只是传送数据过程中对其执行一些计算。map即是消费者也是发布者。这个map的调用链路比较长,画个图清晰一下:
image

好了,相必有上面的图,再看这段代码就清晰多了。这里有很多对象的传递,回调,比较复杂。

网友评论

登录后评论
0/500
评论
快乐崇拜007
+ 关注