Rxjs
一、Concept
RxJS 是一个基于可观测数据流,结合了观察者模式和迭代器模式的一种异步编程的库,用来解决异步事件管理。
tips:
响应式编程 (Reactive Programming)
是一种基于事件的模型。在异步编程模式中,两种获得上一个任务执行结果的方式,一个就是主动轮询,另一个是被动接收反馈(称为 Reactive)。
在 Reactive 方式中,上一个任务的结果的反馈就是一个事件,这个事件的到来将会触发下一个任务的执行。
- Observable (可观察对象):产生数据的对象,是一个可调用的未来值或事件的集合。
- Observer (观察者): 接收数据的对象。一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
- Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
二、Observable
Observables 是使用 Rx.Observable.create 或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理。
Observable 的核心:
创建 Observables
1 | // 创建一个 Observable:每隔一秒会向观察者发送字符串 'hi' |
通常使用创建操作符: of、from、interval等。
订阅 Observables
1 | observable.subscribe(x => console.log(x)); |
subscribe 调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。
执行 Observables
1 | Observable.create(function subscribe(observer) {...}) |
上述…的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。
执行可以传递三种类型的值:
- “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
- “Error” 通知: 发送一个 JavaScript 错误 或 异常。
- “Complete” 通知: 不再发送任何值。
“Error” 和 “Complete” 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
清理 Observables
因为 Observable 的执行可能会是无限的。若要在有限时间内中止执行,就需要一个 API 来取消执行。
1 | var observable = Rx.Observable.from([10, 20, 30]); |
二、Observer
观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete。
典型对象如下:
1 | var observer = { |
当订阅 Observable 时,可能只提供了一个回调函数作为参数,like this
1 | observable.subscribe(x => console.log('Observer got a next value: ' + x)); |
上述这样会创建一个观察者对象,并使用第一个回调函数参数作为 next 的处理方法。
三种类型的回调函数都可以直接作为参数来提供:
1 | observable.subscribe( |
三、Subject
RxJS Subject 是一种特殊类型的 Observable。
每个 Subject 都是 Observable 。
每个 Subject 都是观察者。
可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。
上图中的Subject,是一个代理对象,既是一个 Observable 又是一个 Observer。
它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据,
同时,Subject 会对内部的 observers 清单,进行多播(multicast)。
四、单播与多播
单播
普通的Observable 是单播的。
意思是,
每个普通的 Observables 实例都只能被一个观察者订阅,
当它被其他观察者订阅的时候,会产生一个新的实例。
也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例。
不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。
1 | const Rx = require('rxjs/Rx') |
上述代码:
A从0开始,每隔一秒打印一个递增的数,而B延时了一秒,然后再从0开始打印,由此可见,A与B的执行是完全分开的,也就是每次订阅都创建了一个新的实例。
多播
不论什么时候订阅只会接收到实时的数据。
上面例子中:相当于提供一个中间人来订阅这个源,然后将数据转发给A和B
1 | const source = Rx.Observable.interval(1000).take(3); |
不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。
有如下方法: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。
1 | var subject = new Rx.Subject(); |
五、Subject 特点
| BehaviorSubject | |
|---|---|
| 特点 | 当前值 |
| 应用 | 保存了发送给消费者的最新值。当有新的观察者订阅时,立即接收到“当前值”。 |
| ReplaySubject | |
|---|---|
| 特点 | 回放值 |
| 应用 | 记录 Observable 执行中的多个值,并将其回放给新的订阅者。 |
| 参数 | 可指定缓冲数量,或window time (ms)来确定多久之前的值可以记录。 |
| AsyncSubject | |
|---|---|
| 特点 | 完成值 |
| 应用 | 只有当 Observable 执行完成时(complete()),才会将执行的最后一个单个值发送给观察者。 |
六、Operators
操作符是函数,它基于当前的 Observable, 创建一个新的 Observable。
这是一个无副作用的操作:前面的 Observable 保持不变。
静态操作符是: 附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。
实例操作符是: 使用 this 关键字来指代输入的 Observable 的函数。
根据不同的用途分类为:创建、转换、过滤、组合、错误处理、工具等等。
from
结构对象,依次输出。
输入可遍历的对象(如字符串,数组) string 输出分个开的string.
如 输入’hey hi’ 输出 ‘h’ ‘e’ ‘y’ ‘ ‘ ‘h’ ‘i’
输入 [1,2,3] 输出单个的数字 1 2 3
of
原样输出,可传入可遍历对象或单独的多个参数
七、Schedulers
- 本文作者:JSZ
- 本文链接:blog.vampuck.com/2023/02/06/rxjs/index.html
- 版权声明:本博客所有文章均采用 BY-NC-SA 许可协议,转载请注明出处!