RxJS -- Subscription

简介: Subscription是什么? 当subscribe一个observable的时候, 返回的就是一个subscription. 它是一个一次性对象(disposable), 它有一个非常重要的方法 ubsubscribe(), 它没有参数, 它会dispose掉subscription所持有的资源, 或者叫取消observable的执行.

Subscription是什么?

当subscribe一个observable的时候, 返回的就是一个subscription. 它是一个一次性对象(disposable), 它有一个非常重要的方法 ubsubscribe(), 它没有参数, 它会dispose掉subscription所持有的资源, 或者叫取消observable的执行.

第一个例子:

import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';

const observable = Observable.interval(1000);

const subscription = observable.subscribe(x => console.log(x));

console.log(subscription);

subscription.unsubscribe();

console.log(subscription);

 

运行结果是这样的:

Subscriber {
  closed: false,
  _parent: null,
  _parents: null,
  _subscriptions: 
   [ AsyncAction {
       closed: false,
       _parent: [Circular],
       _parents: null,
       _subscriptions: null,
       scheduler: [AsyncScheduler],
       work: [Function],
       pending: true,
       state: [Object],
       delay: 1000,
       id: [Timeout] } ],
  syncErrorValue: null,
  syncErrorThrown: false,
  syncErrorThrowable: false,
  isStopped: false,
  destination: 
   SafeSubscriber {
     closed: false,
     _parent: null,
     _parents: null,
     _subscriptions: null,
     syncErrorValue: null,
     syncErrorThrown: false,
     syncErrorThrowable: false,
     isStopped: false,
     destination: 
      { closed: true,
        next: [Function: next],
        error: [Function: error],
        complete: [Function: complete] },
     _parentSubscriber: [Circular],
     _context: [Circular],
     _next: [Function],
     _error: undefined,
     _complete: undefined } }
Subscriber {
  closed: true,
  _parent: null,
  _parents: null,
  _subscriptions: null,
  syncErrorValue: null,
  syncErrorThrown: false,
  syncErrorThrowable: false,
  isStopped: true,
  destination: 
   SafeSubscriber {
     closed: false,
     _parent: null,
     _parents: null,
     _subscriptions: null,
     syncErrorValue: null,
     syncErrorThrown: false,
     syncErrorThrowable: false,
     isStopped: false,
     destination: 
      { closed: true,
        next: [Function: next],
        error: [Function: error],
        complete: [Function: complete] },
     _parentSubscriber: [Circular],
     _context: [Circular],
     _next: [Function],
     _error: undefined,
     _complete: undefined } }

注意两次控制台输出的closed属性的值是不同的, true表示已经unsubscribe()了.

在ubsubscribe之后, _subscriptions属性也变成空了, 之前它是一个数组, 说明subscription可以是多个subscriptions的组合.

毁灭函数

如果使用Observable.create方法的话, 它的参数函数可以返回一个function. 而subscription在unsubscribe这个observable的时候, 会调用这个参数函数返回的function, 看例子:

import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';

const observable = Observable.create(observer => {

    let index = 1;
    setInterval(() => {
        observer.next(index++);
    }, 200);

    return () => {
        // 在这可以做清理工作
        console.log('我在Observable.create返回的function里面...');
    };
});

const subscription = observable.subscribe(
    x => console.log(x),
    err => console.error(err),
    () => console.log(`complete..`)
);

setTimeout(() => {
    subscription.unsubscribe();
}, 1100);

 

运行结果:

这个例子很好的解释了我写的那一堆拗口的解释..

 

retry, retryWhen的原理

直接举例:

import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/retry';

const observable = Observable.create(observer => {

    setInterval(() => {
        observer.next('doing...');
        observer.error('error!!!');
    }, 200);

    return () => {
        // 在这可以做清理工作
        console.log('我在Observable.create返回的function里面...');
    };
}).retry(4);

observable.subscribe(
    x => console.log(x),
    err => console.error(err),
    () => console.log(`complete..`)
);

 

可以看到, 每次执行next之后都会有错误, 重试4次.

运行结果:

可以看到, retry/retryWhen其实的原理即是先unsubscribe然后再重新subscribe而已, 所以每次retry都会运行我所称的毁灭函数.

 

操作多个Subscriptions

多个subscriptions可以一起操作, 一个subscription可以同时unsubscribe多个subscriptions, 使用add方法为subscription添加另一个subscription. 对应的还有一个remove方法.

直接举官网的例子:

var observable1 = Observable.interval(400);
var observable2 = Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

 

运行结果:

 

下面是我的关于ASP.NET Core Web API相关技术的公众号--草根专栏:

目录
相关文章
|
6月前
|
缓存 数据处理 数据格式
Rxjs 里 Observable 对象的 tap 操作
Rxjs 里 Observable 对象的 tap 操作
25 0
|
6月前
什么是 Rxjs Observable subscribe 方法的副作用
什么是 Rxjs Observable subscribe 方法的副作用
28 0
|
9月前
|
JavaScript 前端开发 算法
RxJS系列06:测试 Observable
RxJS系列06:测试 Observable
|
C++
Writing a simple publisher and subscriber (C++):编写一个简单的发布者和订阅者(C++)
Writing a simple publisher and subscriber (C++):编写一个简单的发布者和订阅者(C++)
104 0
|
JavaScript 前端开发 调度
你会用RxJS吗?【初识 RxJS中的Observable和Observer】
概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。 RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念。 Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。 Subscription: 表示一个 Observable 的执行,主要用于取消执行。 Operators:** 是纯函数,可以使用函数式编程风格来处理具有map、filter、concat、reduce等操作的集合。
109 0
rxjs里subscribe和tap的区别
rxjs里subscribe和tap的区别
233 0
rxjs里Observable.subscribe(subscriber)的执行示意图
rxjs里Observable.subscribe(subscriber)的执行示意图
89 0
rxjs里Observable.subscribe(subscriber)的执行示意图
rxjs里switchMap operators的用法
rxjs里switchMap operators的用法
170 0
rxjs里switchMap operators的用法
Rxjs BehaviorSuject 和 Observable 的区别
Rxjs BehaviorSuject 和 Observable 的区别
109 0
Rxjs BehaviorSuject 和 Observable 的区别