RxJava2学习笔记(1)

简介: 作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录: 前提依赖: compile 'io.reactivex.rxjava2:rxjava:2.1.9' 一、Observable 1.1 hello world rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。

作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录:

前提依赖:

compile 'io.reactivex.rxjava2:rxjava:2.1.9'

一、Observable

1.1 hello world

rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。不过要注意的是:rxjava中,能接收子弹的靶子,可以同时有多个。

        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onNext("c");
            emitter.onComplete();
        });

        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("subscribe=>");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s + " ");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        };

        observable.subscribe(observer1);

 输出:

subscribe=>
a 
b 
c 
complete

注:最后一行,也可以改成

observable.subscribe(observer1);
observable.subscribe(observer1);

这样就相当于2个靶子在接子弹了。 上面这是最正统的写法,官方推荐使用链式编程写法:

        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onNext("c");
            emitter.onComplete();
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("subscribe=>");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s + " ");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });

1.2 onComplete事件

emitter发送onComplete消息后,挨打的靶子(消费者),就不再继续处理了,不管后面emitter是否还继续发送。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    emitter.onNext("c");
    emitter.onComplete(); //这里主动通知消费者complete
    System.out.println("complete后,emitter还继续发射...");
    emitter.onNext("d");
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println(e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 输出:

subscribe=>
a 
b 
c 
complete
complete后,emitter还继续发射...

注:onComplete之后,emitter再次发送的"d",消费者已经不再处理了。

1.3 onError事件

onError即可以在emitter(生产者)端报错,也可以在靶子(消费者)上报错,不管哪一端发生error,消费者就停止处理了。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onError(new Throwable("emitter报了个错!"));
    System.out.println("准备发送c");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println(e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 输出:

subscribe=>
a 
emitter报了个错!
准备发送c

下面模拟下消费者处理时,发生异常:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        if (s.equals("b")) {
            int a = 0;
            int b = 1;
            System.out.println((b / a));
        }
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("error:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

输出:

subscribe=>
a
error:/ by zero

1.4 disposable

如果消费者主动dispose()后,相当于就解除了生产者-消费者的关系。

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("a");
    emitter.onNext("b");
    System.out.println("准备发送c");
    emitter.onNext("c");
    emitter.onComplete();
}).subscribe(new Observer<String>() {

    Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        disposable = d;
        System.out.println("subscribe=>");
    }

    @Override
    public void onNext(@NonNull String s) {
        if (s.equals("b")) {
            disposable.dispose();
        }
        System.out.println(s + " ");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("error:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("complete");
    }
});

 上面的代码,消费者在遇到b时,主动切断了与生产者的关联,emitter再发送的d,消费者就不处理了,输出:

subscribe=>
a 
b 
准备发送c

1.5 大道至简

如果消费者只关心onNext的处理部分,其它无所谓,上面这一堆代码,可以简化为一行:

Observable.fromArray("a", "b", "c").subscribe(c -> System.out.println(c + " "));

 输出:

a 
b 
c 

最后再来一个示例:把3个单词拼成一句话,而且每个单词处理成“首字母大写”的风格。

Observable.fromArray("I", "AM", "CHINESE")
        .map(c -> c.substring(0, 1).toUpperCase() + c.substring(1).toLowerCase())
        .subscribe(c -> System.out.print(c + " "));

输出:

I Am Chinese 

 

参考:

http://www.vogella.com/tutorials/RxJava/article.html

http://www.cnblogs.com/aademeng/articles/7462540.html

https://www.jianshu.com/c/299d0a51fdd4

作者: 菩提树下的杨过
出处: http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
目录
相关文章
|
3月前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
62 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
236 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
143 0
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
975 0
|
Java 数据库 UED
RxJava的简介
RxJava的简介
236 0
RxJava的简介
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
144 0
RxJava2
|
设计模式 存储 Java
XTask与RxJava的使用对比
XTask与RxJava的使用对比
134 0
XTask与RxJava的使用对比
|
负载均衡 算法 Java
RxJava 并行操作
RxJava 并行操作
375 0
RxJava 并行操作
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
216 0
RxJava 之 ParallelFlowable
|
Java 调度 安全
Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看. 从观察者模式说起 观察者模式,是我们在平时使用的比较多的一种设计模式.观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。
1144 0