Observable
May 25, 2022
build your own Observable
Observable 虽然是可观察对象,但是其内部并没有维护一个观察者列表,订阅 Observable 其实就像执行一个函数一样
下面是一个 Observable 的简单实用案例
type Observer = {
next?: (value: any) => void,
error?: (error) => void,
complete?: () => void
}
const simpleObservable = new Observable((observer: Observer) => {
observer.next(1)
setTimeout(() => {
observer.next(2)
observer.complete()
observer.next(3) // do not show
}, 200)
observer.next(4)
})
const observer: Observer = {
next: (value: any) => console.info(value),
error: () => {},
complete: () => console.info('complete')
}
simpleObservable.subscribe(observer)
// 输出结果:1 4 2
从上面代码看出,Observable 接受一个以 observer 为参数的函数,然后在 subscribe 时传入 observer 调用这个函数,因此 Observable 实际上就是如此
type Subscribe = (observer: Observer) => () => void
class Observable {
private _subscribe: Subscribe
// 接受一个以 observer 为参数的函数
constructor(subscribe: Subscribe) {
this._subscribe = subscribe
}
subscribe(observer: Observer) {
// 在此调用并传入 observer
this._subscribe(observer)
}
}
但是外部传入的 observer 并不一定按我们的要求来,所以我们需要保护一下
type Subscribe = (observer: Observer) => () => void
class SafeObserver {
private destination: Observer
private isUnsubscribed: boolean = false
constructor(destination: Observer) {
this.destination = destination
}
next(value: any) {
if (this.destination.next && !this.isUnsubscribed) {
this.destination.next(value)
}
}
error(err: Error) {
if (!this.isUnsubscribed) {
this.isUnsubscribed = true
if (this.destination.error) {
this.destination.error(err)
}
}
}
complete() {
if (!this.isUnsubscribed) {
this.isUnsubscribed = true
if (this.destination.complete) {
this.destination.complete()
}
}
}
}
class Observable {
private _subscribe: Subscribe
constructor(_subscribe: Subscribe) {
this._subscribe = _subscribe
}
subscribe(observer: any) {
const safeObserver = new SafeObserver(observer)
return this._subscribe(safeObserver)
}
}