Observable
May 25, 2022
build your own Observable
Observable 虽然是可观察对象,但是其内部并没有维护一个观察者列表,订阅 Observable 其实就像执行一个函数一样
下面是一个 Observable 的简单实用案例
type Observer = {
  next?: (value: any) => void,
  error?: (error) => void,
  complete?: () => void
}
const simpleObservable = new Observable((observer: Observer) => {
  observer.next(1)
  setTimeout(() => {
    observer.next(2)
    observer.complete()
    observer.next(3) // do not show
  }, 200)
  observer.next(4)
})
const observer: Observer = {
  next: (value: any) => console.info(value),
  error: () => {},
  complete: () => console.info('complete')
}
simpleObservable.subscribe(observer)
// 输出结果:1 4 2从上面代码看出,Observable 接受一个以 observer 为参数的函数,然后在 subscribe 时传入 observer 调用这个函数,因此 Observable 实际上就是如此
type Subscribe = (observer: Observer) => () => void
class Observable {
  private _subscribe: Subscribe
  // 接受一个以 observer 为参数的函数
  constructor(subscribe: Subscribe) {
    this._subscribe = subscribe
  }
  subscribe(observer: Observer) {
    // 在此调用并传入 observer
    this._subscribe(observer)
  }
}但是外部传入的 observer 并不一定按我们的要求来,所以我们需要保护一下
type Subscribe = (observer: Observer) => () => void
class SafeObserver {
  private destination: Observer
  private isUnsubscribed: boolean = false
  constructor(destination: Observer) {
    this.destination = destination
  }
  next(value: any) {
    if (this.destination.next && !this.isUnsubscribed) {
      this.destination.next(value)
    }
  }
  error(err: Error) {
    if (!this.isUnsubscribed) {
      this.isUnsubscribed = true
      if (this.destination.error) {
        this.destination.error(err)
      }
    }
  }
  complete() {
    if (!this.isUnsubscribed) {
      this.isUnsubscribed = true
      if (this.destination.complete) {
        this.destination.complete()
      }
    }
  }
}
class Observable {
  private _subscribe: Subscribe
  constructor(_subscribe: Subscribe) {
    this._subscribe = _subscribe
  }
  subscribe(observer: any) {
    const safeObserver = new SafeObserver(observer)
    return this._subscribe(safeObserver)
  }
}