import {
  Observable,
  BehaviorSubject,
  Subject,
  defer,
  combineLatest,
  concat,
  ignoreElements,
  timer,
  map,
  filter,
  buffer,
  throwError,
  share,
  delay,
  tap,
  startWith,
  exhaustMap,
  retry,
  type RetryConfig,
  type OperatorFunction,
} from "rxjs";

import { captureException } from "util/exception";

/**
 * Unwraps the value type emitted by an Observable — like `Awaited`, but for
 * Observables instead of Promises. Resolves to `never` when `T` is not an
 * Observable.
 */
export type Subscribed<T> = T extends Observable<infer I> ? I : never;
/** Options accepted by `retryBackoff` and `retryBackoffWithCaptureException`. */
type RetryBackoffParams = {
  /**
   * Decides whether a given failure should be retried. Receives the error and
   * the retry count reported by `retry` minus one. Returning `false` makes
   * `retryBackoff` rethrow the error instead of resubscribing. `retryBackoff`
   * defaults this to a function that always returns `true`.
   */
  predicate?: (val: unknown, index: number) => boolean;
  /** Base wait in milliseconds; it is doubled for each successive retry. `retryBackoff` defaults to 5 000. */
  startWait?: number;
  /** Upper bound in milliseconds for any single wait. `retryBackoff` defaults to 60 000. */
  maxWait?: number;
  /** Passed to `retry` as its `count`. `retryBackoff` defaults to 10. */
  maxAttempts?: number;
};
/** Options accepted by `pollWhile`. */
type PollWhileParams<S, T> = {
  /** Pause in milliseconds inserted after each iteration completes, before the next one may start. */
  interval: number;
  /**
   * Gate evaluated before every iteration with the latest source value and that
   * value's position in the source stream. While it returns a truthy result,
   * polling continues; a falsy result skips the iteration.
   */
  predicate: (sourceValue: S, index: number) => boolean;
  /**
   * Factory for the inner observable run on each poll. `index` is the position
   * of `sourceValue` in the source stream; `realIndex` is the index that
   * `exhaustMap` hands to its projection function.
   */
  iteration: (sourceValue: S, index: number, realIndex: number) => Observable<T>;
};

/**
 * Unique sentinel used as the initial value of the `BehaviorSubject` inside
 * `shareLastValue`; it stands for "nothing has been emitted yet".
 */
const SKIP_VALUE = Symbol("Value never to be emitted");

/** Type guard that accepts every value except the `SKIP_VALUE` sentinel. */
function valueIsNotSkipSymbol<T>(value: T | symbol): value is T {
  const isSentinel = value === SKIP_VALUE;
  return !isSentinel;
}

/**
 * Multicasts the source through a `BehaviorSubject` so that a new subscriber is
 * handed the most recently emitted value straight away. The subject is seeded
 * with the `SKIP_VALUE` sentinel, which is filtered out again so subscribers
 * never observe it. The connection is not reset when the source completes or
 * errors; the source is unsubscribed once there are no subscribers left.
 */
export function shareLastValue<T>(): OperatorFunction<T, T> {
  return (source$) => {
    const multicast$ = source$.pipe(
      share<T | symbol>({
        resetOnError: false,
        resetOnComplete: false,
        connector: () => new BehaviorSubject<T | symbol>(SKIP_VALUE),
      }),
    );
    // Hide the seed value: only real source emissions reach subscribers.
    return multicast$.pipe(filter<T | symbol, T>(valueIsNotSkipSymbol));
  };
}

/**
 * Retries the source observable on error, reporting every error to
 * `captureException` first.
 *
 * The reporting `tap` sits upstream of `retry`, so each failed attempt is
 * captured, including the final one that is allowed to propagate.
 *
 * @param retryConfig Either a retry count or a full `RetryConfig`. When
 *   omitted, `retry` is called without a value and applies its own default.
 * @returns An operator that mirrors the source and resubscribes on error.
 */
export function retryWhenWithCaptureException<T>(
  retryConfig?: number | RetryConfig,
): OperatorFunction<T, T> {
  return (source$) => {
    const reported$ = source$.pipe(tap<T>({ error: captureException }));
    // `retry` is overloaded on `number` and `RetryConfig` and does not accept
    // the union directly. Narrow the argument so each overload is type-checked
    // rather than asserting the value to `number`.
    return typeof retryConfig === "object"
      ? reported$.pipe(retry(retryConfig))
      : reported$.pipe(retry(retryConfig));
  };
}

/**
 * Resubscribes to a failing source using the exponential backoff of
 * `retryBackoff`, reporting each error to `captureException` before the retry
 * logic sees it.
 */
export function retryBackoffWithCaptureException<T>(
  retryBackoffConfig: RetryBackoffParams = {},
): OperatorFunction<T, T> {
  return (source$) => {
    // Report first, so every failed attempt is captured.
    const reported$ = source$.pipe(tap<T>({ error: captureException }));
    return reported$.pipe(retryBackoff<T>(retryBackoffConfig));
  };
}

/**
 * Repeatedly runs `iteration` for the latest source value, waiting `interval`
 * milliseconds after each run, for as long as `predicate` passes. Think of it
 * as a repeating (while-loop) version of `exhaustMap`.
 *
 * A source emission cannot interrupt an iteration, or the pause that follows
 * it, once it has started: `exhaustMap` drops values that arrive while the
 * inner observable is active. The most recent source value is still retained
 * by `combineLatest` and is the one used for the next iteration.
 *
 * If `predicate` rejects the current value, the loop goes idle until the source
 * emits again.
 */
export function pollWhile<S, T>(options: PollWhileParams<S, T>): OperatorFunction<S, T> {
  const { predicate, iteration } = options;
  // Emits nothing and completes after `interval` ms; appended to each iteration as the pause.
  const delay$ = timer(options.interval).pipe(ignoreElements());
  return (source$) =>
    new Observable((observer) => {
      // Fires each time an iteration (plus its pause) has completed; drives the loop.
      const completeSignal$ = new Subject<null>();
      const prefixedCompletes$ = completeSignal$.pipe(
        // delay(0) pushes the signal onto a later scheduler tick instead of delivering it
        // synchronously. This is needed because the signal is raised from inside the inner
        // observable's completion, and the next poll must not be attempted until exhaustMap
        // has finished with that inner observable.
        delay(0),
        // Seed value so combineLatest can emit as soon as the source produces its first value.
        startWith(null),
      );
      const sourceWithIndex$ = source$.pipe(
        // When source$ completes, complete the signal too so combineLatest can complete.
        tap({ complete: () => completeSignal$.complete() }),
        // Remember each value's position in the source stream for `predicate` and `iteration`.
        map((value, index) => ({ value, index })),
      );
      return combineLatest([sourceWithIndex$, prefixedCompletes$])
        .pipe(
          // Only the source side matters; the signal side is just a trigger.
          map(([sourceValue]) => sourceValue),
          // Filter after combineLatest so the predicate is re-evaluated on every trigger,
          // whether it came from source$ or from the completion signal.
          filter(({ value, index }) => predicate(value, index)),
          exhaustMap(({ value, index }, realIndex) => {
            // Run the iteration, then the pause, then signal that another poll may start.
            return concat(iteration(value, index, realIndex), delay$).pipe(
              tap({ complete: () => completeSignal$.next(null) }),
            );
          }),
        )
        .subscribe(observer);
    });
}

/**
 * Mirrors `source$`, but holds back any emission until at least `minDelay`
 * milliseconds have passed since subscription.
 */
export function withMinDelay<T>(
  source$: Observable<T> | Promise<T>,
  minDelay = 1000,
): Observable<T> {
  // combineLatest stays silent until the timer has ticked, which enforces the minimum wait.
  const gated$ = combineLatest([timer(minDelay), source$]);
  return gated$.pipe(map((pair) => pair[1]));
}

/**
 * Lazily downloads `url` and emits the response body as a `Blob`.
 *
 * The request is wrapped in `defer`, so nothing is fetched until subscription
 * and each subscription issues its own request. No abort signal is passed to
 * `fetch`, so unsubscribing does not cancel a request that is in flight.
 *
 * @param url Address of the resource to download.
 * @returns An observable that emits the blob once and then completes. It errors
 *   with an `Error` carrying the HTTP status when the response is not OK, or
 *   with whatever `fetch` rejects with.
 */
export function blobFromUrl(url: string): Observable<Blob> {
  return defer(async () => {
    const res = await fetch(url);
    if (!res.ok) {
      // Include the status so the failure can be diagnosed. The URL is left out
      // of the message because it may carry credentials in its query string.
      throw new Error(`Failed to fetch blob: HTTP ${res.status} ${res.statusText}`.trim());
    }
    return res.blob();
  });
}

/**
 * Like `bufferTime`, but lazy: no timer runs while the source is idle.
 *
 * The first value to arrive while no timer is pending starts a `timeMs`
 * timeout. When it fires, everything collected so far is emitted as one array,
 * and the next value to arrive starts a fresh timeout.
 *
 * @param timeMs How long, in milliseconds, to keep collecting after the first
 *   value of a batch before flushing.
 * @returns An operator that emits batches of source values.
 */
export function lazyBufferTime<T>(timeMs: number): OperatorFunction<T, T[]> {
  return (source$) =>
    new Observable<T[]>((observer) => {
      // Closing notifier for `buffer`: each emission flushes the current batch.
      const fire$ = new Subject<null>();
      let timeoutId: undefined | number;
      const buffered$ = source$.pipe(
        tap(() => {
          // Only start a timer when none is pending for the current batch.
          if (timeoutId === undefined) {
            timeoutId = window.setTimeout(() => {
              // Clear the handle BEFORE flushing. A consumer may push a new value into the
              // source synchronously while handling the flushed batch; that value must see
              // "no timer pending" and schedule its own flush, otherwise it would sit in the
              // buffer until some later, unrelated emission happened to start a timer.
              timeoutId = undefined;
              fire$.next(null);
            }, timeMs);
          }
        }),
        buffer(fire$),
      );
      const sub = buffered$.subscribe(observer);
      return () => {
        sub.unsubscribe();
        fire$.complete();
        // Avoid a stray timer firing after teardown.
        if (timeoutId !== undefined) {
          window.clearTimeout(timeoutId);
        }
      };
    });
}

/**
 * Wraps a `MutationObserver` as an observable that emits each `MutationRecord`
 * individually. Observation of `target` starts on subscription and the
 * observer is disconnected on unsubscription.
 */
export function fromMutationRecords(
  target: Node,
  options?: MutationObserverInit,
): Observable<MutationRecord> {
  return new Observable<MutationRecord>((observer) => {
    // Flatten each delivered batch into one emission per record.
    const forwardRecords: MutationCallback = (records) => {
      for (const record of records) {
        observer.next(record);
      }
    };
    const mutationObserver = new MutationObserver(forwardRecords);
    mutationObserver.observe(target, options);
    return () => {
      mutationObserver.disconnect();
    };
  });
}

/**
 * Like the `retry` operator, but waits an exponentially growing amount of time
 * before each resubscription.
 *
 * The wait before a retry is `startWait * 2 ** index`, capped at `maxWait`,
 * where `index` is the retry count minus one. With the defaults this gives
 * 5s, 10s, 20s, 40s and then 60s for every further retry. If `predicate`
 * returns `false` for an error, that error is rethrown instead of retried.
 */
export function retryBackoff<T>({
  predicate = () => true,
  startWait = 5_000,
  maxWait = 60_000,
  maxAttempts = 10,
}: RetryBackoffParams = {}) {
  // Notifier factory handed to `retry`: resolves to a timer to retry, or an error to stop.
  const scheduleResubscribe = (error: unknown, retryCount: number) => {
    const attemptIndex = retryCount - 1;
    if (predicate(error, attemptIndex)) {
      const backoffMs = Math.min(startWait * 2 ** attemptIndex, maxWait);
      return timer(backoffMs);
    }
    return throwError(() => error);
  };
  return (source: Observable<T>) =>
    source.pipe(retry({ count: maxAttempts, delay: scheduleResubscribe }));
}
