Observable

May 25, 2022

build your own Observable

Observable 虽然是可观察对象,但是其内部并没有维护一个观察者列表,订阅 Observable 其实就像执行一个函数一样

下面是一个 Observable 的简单实用案例

type Observer = {
  next?: (value: any) => void,
  error?: (error) => void,
  complete?: () => void
}

const simpleObservable = new Observable((observer: Observer) => {
  observer.next(1)
  setTimeout(() => {
    observer.next(2)
    observer.complete()
    observer.next(3) // do not show
  }, 200)
  observer.next(4)
})

const observer: Observer = {
  next: (value: any) => console.info(value),
  error: () => {},
  complete: () => console.info('complete')
}

simpleObservable.subscribe(observer)
// 输出结果:1 4 2

从上面代码看出,Observable 接受一个以 observer 为参数的函数,然后在 subscribe 时传入 observer 调用这个函数,因此 Observable 实际上就是如此

type Subscribe = (observer: Observer) => () => void

class Observable {
  private _subscribe: Subscribe
  // 接受一个以 observer 为参数的函数
  constructor(subscribe: Subscribe) {
    this._subscribe = subscribe
  }

  subscribe(observer: Observer) {
    // 在此调用并传入 observer
    this._subscribe(observer)
  }
}

但是外部传入的 observer 并不一定按我们的要求来,所以我们需要保护一下

type Subscribe = (observer: Observer) => () => void

class SafeObserver {
  private destination: Observer
  private isUnsubscribed: boolean = false
  constructor(destination: Observer) {
    this.destination = destination
  }

  next(value: any) {
    if (this.destination.next && !this.isUnsubscribed) {
      this.destination.next(value)
    }
  }

  error(err: Error) {
    if (!this.isUnsubscribed) {
      this.isUnsubscribed = true
      if (this.destination.error) {
        this.destination.error(err)
      }
    }
  }

  complete() {
    if (!this.isUnsubscribed) {
      this.isUnsubscribed = true
      if (this.destination.complete) {
        this.destination.complete()
      }
    }
  }
}

class Observable {
  private _subscribe: Subscribe
  constructor(_subscribe: Subscribe) {
    this._subscribe = _subscribe
  }

  subscribe(observer: any) {
    const safeObserver = new SafeObserver(observer)
    return this._subscribe(safeObserver)
  }
}