RxJava2学习笔记(3)

简介: 接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.

上回继续,今天来学习下zip(打包)操作

一、zip操作

    @Test
    public void zipTest() {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(100 + i);
            }
        }), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(new Character((char) (65 + i)));
            }
        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));
    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

100A
101B
102C
103D
104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

 

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

    @Test
    public void zipTest1() throws InterruptedException {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; ; i++) { //一直不停的发
                emitter.onNext(i);
            }
        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(0); //这里技巧性的处理:发1个0过去
            }
        }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值
                .subscribe(integer -> System.out.println(integer));

        Thread.sleep(200);
    }

  输出:

0
1
2
3
4

  如果是字符串,可以参考下面这样处理:

        Observable.zip(Observable.create(emitter -> {
                    for (int i = 0; ; i++) {
                        emitter.onNext("A" + i);
                    }
                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
                    for (int i = 0; i < 5; i++) {
                        emitter.onNext("");
                    }
                }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2)
                .subscribe(s -> System.out.println(s));
        Thread.sleep(200);

  输出:

A0
A1
A2
A3
A4

 

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来

作者: 菩提树下的杨过
出处: http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
目录
相关文章
|
3月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
72 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
236 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
143 0
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
982 0
|
Java 数据库 UED
RxJava的简介
RxJava的简介
239 0
RxJava的简介
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
144 0
RxJava2
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
135 0
XTask与RxJava的使用对比
|
负载均衡 算法 Java
RxJava 并行操作
RxJava 并行操作
377 0
RxJava 并行操作
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
218 0
RxJava 之 ParallelFlowable
|
Java 调度 安全
Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看. 从观察者模式说起 观察者模式,是我们在平时使用的比较多的一种设计模式.观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。
1146 0