最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

关于RxJS Subject的学习笔记

来源:动视网 责编:小采 时间:2020-11-27 22:02:48
文档

关于RxJS Subject的学习笔记

关于RxJS Subject的学习笔记:Observer Pattern 观察者模式定义 观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。 我们可以使
推荐度:
导读关于RxJS Subject的学习笔记:Observer Pattern 观察者模式定义 观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。 我们可以使


Observable subscribe

在介绍RxJS - Subject 之前,我们先来看个示例:

const interval$ = Rx.Observable.interval(1000).take(3);

interval$.subscribe({
 next: value => console.log('Observer A get value: ' + value);
});

setTimeout(() => {
 interval$.subscribe({
 next: value => console.log('Observer B get value: ' + value);
 });
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  • Observable 对象可以被重复订阅
  • Observable 对象每次被订阅后,都会重新执行
  • 上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

    function interval() {
     setInterval(() => console.log('..'), 1000);
    }
    
    interval();
    
    setTimeout(() => {
     interval();
    }, 1000);

    Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。

    自定义 Subject

    Subject 类定义

    class Subject { 
     constructor() {
     this.observers = [];
     }
     
     addObserver(observer) { 
     this.observers.push(observer);
     }
     
     next(value) { 
     this.observers.forEach(o => o.next(value)); 
     }
     
     error(error){ 
     this.observers.forEach(o => o.error(error));
     }
     
     complete() {
     this.observers.forEach(o => o.complete());
     }
    }

    使用示例

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Subject();
    
    let observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.addObserver(observerA); // 添加观察者A
    interval$.subscribe(subject); // 订阅interval$对象
    setTimeout(() => {
     subject.addObserver(observerB); // 添加观察者B
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 0
    Observer A get value: 1
    Observer B get value: 1
    Observer A get value: 2
    Observer B get value: 2
    Observer A complete!
    Observer B complete!

    通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。

    RxJS Subject

    首先我们通过 RxJS Subject 来重写一下上面的示例:

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Rx.Subject();
    
    let observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA); // 添加观察者A
    interval$.subscribe(subject); // 订阅interval$对象
    setTimeout(() => {
     subject.subscribe(observerB); // 添加观察者B
    }, 1000);

    RxJS Subject 源码片段

    /**
     * Suject继承于Observable 
     */
    export class Subject extends Observable {
     constructor() {
     super();
     this.observers = []; // 观察者列表
     this.closed = false;
     this.isStopped = false;
     this.hasError = false;
     this.thrownError = null;
     }
     
     next(value) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     if (!this.isStopped) {
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
     copy[i].next(value);
     }
     }
     }
     
     error(err) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     this.hasError = true;
     this.thrownError = err;
     this.isStopped = true;
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者error方法
     copy[i].error(err);
     }
     this.observers.length = 0;
     }
     
     complete() {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     this.isStopped = true;
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
     copy[i].complete();
     }
     this.observers.length = 0; // 清空内部观察者列表
     }
    }

    通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:

  • Subject 既是 Observable 对象,又是 Observer 对象
  • 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)
  • Angular 2 RxJS Subject 应用

    在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:

    message.service.ts

    import { Injectable } from '@angular/core';
    import {Observable} from 'rxjs/Observable';
    import { Subject } from 'rxjs/Subject';
    
    @Injectable()
    export class MessageService {
     private subject = new Subject<any>();
    
     sendMessage(message: string) {
     this.subject.next({ text: message });
     }
    
     clearMessage() {
     this.subject.next();
     }
    
     getMessage(): Observable<any> {
     return this.subject.asObservable();
     }
    }

    home.component.ts

    import { Component } from '@angular/core';
    
    import { MessageService } from '../_services/index';
    
    @Component({
     moduleId: module.id,
     templateUrl: 'home.component.html'
    })
    
    export class HomeComponent {
     constructor(private messageService: MessageService) {}
     
     sendMessage(): void { // 发送消息
     this.messageService.sendMessage('Message from Home Component to App Component!');
     }
    
     clearMessage(): void { // 清除消息
     this.messageService.clearMessage();
     }
    }

    app.component.ts

    import { Component, OnDestroy } from '@angular/core';
    import { Subscription } from 'rxjs/Subscription';
    
    import { MessageService } from './_services/index';
    
    @Component({
     moduleId: module.id,
     selector: 'app',
     templateUrl: 'app.component.html'
    })
    
    export class AppComponent implements OnDestroy {
     message: any;
     subscription: Subscription;
    
     constructor(private messageService: MessageService) {
     this.subscription = this.messageService.getMessage()
     .subscribe(message => { this.message = message; });
     }
    
     ngOnDestroy() {
     this.subscription.unsubscribe();
     }
    }

    以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:

    Subject 存在的问题

    因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:

    next(value) {
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     if (!this.isStopped) {
     const { observers } = this;
     const len = observers.length;
     const copy = observers.slice();
     for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
     copy[i].next(value);
     }
     }
    }

    这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();
    
    const example = subject.map(x => {
     if (x === 1) {
     throw new Error('oops');
     }
     return x;
    });
    subject.subscribe(x => console.log('A', x));
    example.subscribe(x => console.log('B', x));
    subject.subscribe(x => console.log('C', x));
    
    source.subscribe(subject);

    以上代码运行后,控制台的输出结果:

    A 0
    B 0
    C 0
    A 1
    Rx.min.js:74 Uncaught Error: oops

    JSBin - Subject Problem Demo

    在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();
    
    const example = subject.map(x => {
     if (x === 1) {
     throw new Error('oops');
     }
     return x;
    });
    
    subject.subscribe(
     x => console.log('A', x),
     error => console.log('A Error:' + error)
    );
     
    example.subscribe(
     x => console.log('B', x),
     error => console.log('B Error:' + error)
    );
    
    subject.subscribe(
     x => console.log('C', x),
     error => console.log('C Error:' + error)
    );
    
    source.subscribe(subject);

    JSBin - RxJS Subject Problem Solved Demo

    RxJS Subject & Observable

    Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

    Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next - 每当 Subject 对象接收到新值的时候,next 方法会被调用
  • error - 运行中出现异常,error 方法会被调用
  • complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用
  • subscribe - 添加观察者
  • unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)
  • BehaviorSubject

    BehaviorSubject 定义

    BehaviorSubject 源码片段

    export class BehaviorSubject extends Subject {
     constructor(_value) { // 设置初始值
     super();
     this._value = _value;
     }
     get value() { // 获取当前值
     return this.getValue();
     }
     _subscribe(subscriber) {
     const subscription = super._subscribe(subscriber);
     if (subscription && !subscription.closed) {
     subscriber.next(this._value); // 为新的订阅者发送当前最新的值
     }
     return subscription;
     }
     getValue() {
     if (this.hasError) {
     throw this.thrownError;
     }
     else if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     else {
     return this._value;
     }
     }
     next(value) { // 调用父类Subject的next方法,同时更新当前值
     super.next(this._value = value);
     }
    }

    BehaviorSubject 应用

    有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:

    var subject = new Rx.Subject();
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3

    通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

    BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:

    var subject = new Rx.BehaviorSubject(0); // 设定初始值
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 0
    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 3

    JSBin - BehaviorSubject

    ReplaySubject

    ReplaySubject 定义

    ReplaySubject 源码片段

    export class ReplaySubject extends Subject {
     constructor(bufferSize = Number.POSITIVE_INFINITY, 
     windowTime = Number.POSITIVE_INFINITY, 
     scheduler) {
     super();
     this.scheduler = scheduler;
     this._events = []; // ReplayEvent对象列表
     this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小
     this._windowTime = windowTime < 1 ? 1 : windowTime;
     }
     
     next(value) {
     const now = this._getNow();
     this._events.push(new ReplayEvent(now, value));
     this._trimBufferThenGetEvents();
     super.next(value);
     }
     
     _subscribe(subscriber) {
     const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表
     let subscription;
     if (this.closed) {
     throw new ObjectUnsubscribedError();
     }
     ...
     else {
     this.observers.push(subscriber);
     subscription = new SubjectSubscription(this, subscriber);
     }
     ...
     const len = _events.length;
     // 重新发送设定的最后bufferSize个值
     for (let i = 0; i < len && !subscriber.closed; i++) {
     subscriber.next(_events[i].value);
     }
     ...
     return subscription;
     }
    }
    
    class ReplayEvent {
     constructor(time, value) {
     this.time = time;
     this.value = value;
     }
    }

    ReplaySubject 应用

    有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:

    var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值
    
    var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
    };
    
    var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
    };
    
    subject.subscribe(observerA);
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    
    setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
    }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 1
    Observer A get value: 2
    Observer A get value: 3
    Observer B get value: 2
    Observer B get value: 3

    可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

    JSBin - ReplaySubject

    AsyncSubject

    AsyncSubject 定义

    AsyncSubject 源码片段

    export class AsyncSubject extends Subject {
     constructor() {
     super(...arguments);
     this.value = null;
     this.hasNext = false;
     this.hasCompleted = false; // 标识是否已完成
     }
     _subscribe(subscriber) {
     if (this.hasError) {
     subscriber.error(this.thrownError);
     return Subscription.EMPTY;
     }
     else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
     subscriber.next(this.value);
     subscriber.complete();
     return Subscription.EMPTY;
     }
     return super._subscribe(subscriber);
     }
     next(value) {
     if (!this.hasCompleted) { // 若未完成,保存当前的值
     this.value = value;
     this.hasNext = true;
     }
     }
    }

    AsyncSubject 应用

    AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

    var subject = new Rx.AsyncSubject();
    
     var observerA = {
     next: value => console.log('Observer A get value: ' + value),
     error: error => console.log('Observer A error: ' + error),
     complete: () => console.log('Observer A complete!')
     };
    
     var observerB = {
     next: value => console.log('Observer B get value: ' + value),
     error: error => console.log('Observer B error: ' + error),
     complete: () => console.log('Observer B complete!')
     };
    
     subject.subscribe(observerA);
    
     subject.next(1);
     subject.next(2);
     subject.next(3);
    
     subject.complete();
    
     setTimeout(() => {
     subject.subscribe(observerB); // 1秒后订阅
     }, 1000);

    以上代码运行后,控制台的输出结果:

    Observer A get value: 3
    Observer A complete!
    Observer B get value: 3
    Observer B complete!

    JSBin - AsyncSubject

    文档

    关于RxJS Subject的学习笔记

    关于RxJS Subject的学习笔记:Observer Pattern 观察者模式定义 观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。 我们可以使
    推荐度:
    标签: 关于 subjects subject
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top