import { BehaviorSubject, filter, Observable, Subscriber, Subscription, timer } from 'rxjs';

// TODO: move all logic below to common-utils
export class Cancellation {
  private _cancelSubject = new BehaviorSubject(false);
  private _subscriptions = new Subscription();

  public get cancelled() {
    return this._cancelSubject.value;
  }

  public get observable() {
    return this._cancelSubject.pipe(filter(x => x));
  }

  public subscribe(fn: () => void) {
    this._subscriptions.add(this.observable.subscribe({ next: fn }));
  }

  public cancel() {
    this._cancelSubject.next(true);
    this._cancelSubject.complete();
    this._subscriptions.unsubscribe();
  }

  public dispose() {
    this._subscriptions.unsubscribe();
    this._cancelSubject.complete();
  }

  public addTimeout(timeoutMs: number) {
    this._subscriptions.add(timer(timeoutMs).subscribe(_ => this.cancel()));
  }

  public static timeout(timeoutMs: number) {
    const cancellation = new Cancellation();
    cancellation.addTimeout(timeoutMs);
    return cancellation;
  }

  public static combined(...cancellations: Cancellation[]) {
    const combinedCancellation = new Cancellation();
    for (const cancellation of cancellations) {
      combinedCancellation._subscriptions.add(cancellation.observable.subscribe(() => combinedCancellation.cancel()));
    }
    return combinedCancellation;
  }
}

export function fromAsync<T>(
  asyncWork: (subscriber: Subscriber<T>, cancellationToken: Cancellation) => Promise<unknown>,
  asyncWorkCancellation?: Cancellation
): Observable<T> {
  return new Observable(subscriber => {
    const subscriptionCancellation = new Cancellation();
    const promiseCancellation = asyncWorkCancellation ?? new Cancellation();
    asyncWork(subscriber, Cancellation.combined(subscriptionCancellation, promiseCancellation))
      .catch(error => {
        subscriber.error(error);
        subscriptionCancellation.cancel();
      })
      .finally(() => promiseCancellation.dispose());
    return () => subscriptionCancellation.cancel();
  });
}
