初识RxJava(四)组合类 操作符

  1. 云栖社区>
  2. 博客>
  3. 正文

初识RxJava(四)组合类 操作符

吾乃韩小呆 2018-11-09 22:53:17 浏览459
展开阅读全文

前言:

前面已经记录了 三种操作符,下面开始第四种操作符的相关使用笔记,每天学一点没什么坏处,而且现在 RxJava 并不是什么新鲜玩意,都到现在了,还不知道 RxJava 怎么使用,那么 笔者请你 打开 Boss直聘 app 看看 Android 的招聘信息,你就知道是什么了。不墨迹了,开始写笔记。

正文:

1、zip 操作符

1)、作用

将多个 被观察者对象 通过一定的 骚操作 组合到一起,发射给 观察者的;
观察者接收到的数据是 按照严格的顺序;
结束点是 数据少的被被观察者对象 的数据队列。

2.1)、代码
 /**
     * zip  操作符
     */
    private void zipMethod() {
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                logDUtils("Integer:"+1);
                emitter.onNext(1);
                logDUtils("Integer:"+2);
                emitter.onNext(2);
                logDUtils("Integer:"+3);
                emitter.onNext(3);
                logDUtils("Integer:"+4);
                emitter.onNext(4);
                emitter.onComplete();
            }

        });

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                logDUtils("String: aaa");
                emitter.onNext("aaa");
                logDUtils("String: bbb");
                emitter.onNext("bbb");
                logDUtils("String: ccc");
                emitter.onNext("ccc");
                emitter.onComplete();
            }
        });

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });
    }
2.2)、效果

主线程运行效果

仔细观察上图输出结果,有没有发现感觉好像很奇异,所有组合全是 第一个 被观察者发射数据之后,第二个被观察者每发射一个,组合一个 ,观察者接收一个。为什么会产生这样的原因呢,因为大家都运行在主线程啊,得排队。具体原因 建议去看 笔者崇拜的大佬的文章 给初学者的RxJava2.0教程(四)

如何解决这个尴尬的问题呢,那就是在不同线程执行咯,show code

3.1)、代码

    /**
     * zip  操作符
     */
    private void zipMethod() {
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                logDUtils("Integer:"+1);
                emitter.onNext(1);
                Thread.sleep(1000);
                logDUtils("Integer:"+2);
                emitter.onNext(2);
                Thread.sleep(1000);
                logDUtils("Integer:"+3);
                emitter.onNext(3);
                Thread.sleep(1000);
                logDUtils("Integer:"+4);
                emitter.onNext(4);
                Thread.sleep(1000);
                logDUtils("Integer: onComplete");
                emitter.onComplete();
            }

        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                logDUtils("String: aaa");
                emitter.onNext("aaa");
                Thread.sleep(1000);
                logDUtils("String: bbb");
                emitter.onNext("bbb");
                Thread.sleep(1000);
                logDUtils("String: ccc");
                emitter.onNext("ccc");
                Thread.sleep(1000);
                logDUtils("String: onComplete");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });

    }
3.2)、效果

效果

2、combineLatest 操作符

1)、作用

从两个 被观察对象内 将先发射数据的 被观察对象的最新一个数据与另一个 被观察对象每个数据进行结合,之后发射给 观察者

2)、代码
  /**
     * combineLatest 操作符
     */
    private void combineLatestMethod() {
        Observable.combineLatest(Observable.just("a", "b", "c"), Observable
                //从1 开始发射数据 连续发射 4个 开始发射延迟1秒  发射开始后时间间隔为1秒
                .intervalRange(1, 4, 1, 1, TimeUnit.SECONDS), new
                BiFunction<String, Long, String>() {
                    @Override
                    public String apply(String s, Long l) throws Exception {
                        return s + l;
                    }
                }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete");
            }
        });
    }
3)、效果

效果

3、reduce 操作符

1)、作用

将发射来的数据进行组合,一个被观察对象发射 的数据。

2)、代码
 /**
     * reduce 操作符
     */
    @SuppressLint("CheckResult")
    private void reduceMethod() {
        
        String[] str = {"a", "b", "c", "d"};
        Observable.fromArray(str).reduce(new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                logDUtils(s+" 与 "+s2+" 组合");
                return s + s2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                logDUtils("accept:" + s);
            }
        });
    }
3)、效果

效果

4、collect 操作符

1)、作用

将多个 被观察对象发射的数据 进行组合,添加到一个指定的容器内,有一点像 list 的 add

2)、代码

    /**
     * collect 修饰符
     */
    @SuppressLint("CheckResult")
    private void collectMethod() {
        Observable.just("a", "b", "ccc", "d").collect(new Callable<ArrayList<String>>() {
            @Override
            public ArrayList<String> call() throws Exception {
                //创建收集容器
                return new ArrayList<>();
            }
        }, new BiConsumer<ArrayList<String>, String>() {
            @Override
            public void accept(ArrayList<String> strings, String s) throws Exception {
                //将数据 添加到容器  发射
                strings.add(s);
            }
        }).subscribe(new Consumer<ArrayList<String>>() {
            @Override
            public void accept(ArrayList<String> strings) throws Exception {
                logDUtils(Arrays.toString(strings.toArray()));
            }
        });
    }
3)、效果

效果

5、concat 操作符 和 concatArray 操作符

1)、作用

组合多个被观察者数据然后一起发送,合并后 按发送数据有序

concat 只能组合 小于等于 4 组被观察者的数据、concatArray 可以组合大于 4 组被观察者对象

2)、代码
   /**
     * concat 操作符 和  concatArray 操作符
     */
    private void concatMethod() {
        Observable
                .concat(Observable.just(1, 2, 3, 4), Observable.just("a", "b", "c"),
                        Observable.just(0, 9, 55, 99), Observable.just("一", "二"))
                .subscribe(new Observer<Serializable>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        logDUtils("onSubscribe:" + d);
                    }

                    @Override
                    public void onNext(Serializable serializable) {
                        logDUtils("onNext:" + serializable.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        logDUtils("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        logDUtils("onComplete:");
                    }
                });
    }
3)、效果

效果

6、merge操作符 和 mergeArray 操作符

1)、作用

使用时间为节点 同一个时间节点 内数据一同发射;
执行完最长的 被观察者 终止发射;

2)、代码
/**
     * merge 操作符 和 mergeArray 操作符
     */
    private void mergeMethod() {
        Observable.merge(Observable.intervalRange(2, 5, 1, 1, TimeUnit.SECONDS)
                , Observable.intervalRange(0, 8, 2, 3, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        logDUtils("onSubscribe:");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        logDUtils("onNext:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        logDUtils("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        logDUtils("onComplete:");
                    }
                });
    }
3)、效果

效果图

7、startWith 操作符 和 startWithArray 操作符

1)、作用

在一个被观察者发射事件前,发射一些数据 / 一个新的被观察者对象

2)、代码
  /**
     * startWith 操作符 和  startWithArray 操作符
     */
    private void startMethod() {
        Observable.just("a", "b", "c").startWith("11")
                .startWithArray("99", "666").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe:");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });
    }

3)、效果

效果图

8、count 操作符

1)、作用

统计被观察对象 发送数据的 数量

2)、代码
 /**
     * count 操作符
     */
    @SuppressLint("CheckResult")
    private void countMethod() {
        Observable.just(1, 2, 3, 4).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                logDUtils("发射数据数量:" + aLong);
            }
        });
    }
3)、效果

效果

9、concatDelayError 操作符 和 mergeDelayError 操作符 和 combineLatestDelayError 操作符

作用:

保证合并中的 代码一方出现错误,另一位的合并项科研继续运行 分别用于 concat 、merge 、combineLatest操作符。

网友评论

登录后评论
0/500
评论
吾乃韩小呆
+ 关注