import { useEffect, useRef } from "react";
import {
  EMPTY,
  NEVER,
  defer,
  from,
  of,
  merge,
  scan,
  startWith,
  filter,
  switchMap,
  retry,
  takeUntil,
  catchError,
  type Observable,
} from "rxjs";
import type { DocumentNode } from "graphql";

import { useApolloClient, type OperationVariables } from "util/graphql";
import { captureException } from "util/exception";
import { pollWhile } from "util/rxjs";

/** Options accepted by `useQueryPoller`. */
type QueryPollerOptions<TVars> = {
  /** Polling rate, in milliseconds */
  interval: number;
  /** Query to poll */
  query: DocumentNode;
  /** Variables sent with the query when reading from the server */
  variables?: TVars;
  /**
   * Optional variables to use when writing the result to the client cache.
   * When omitted, `variables` is used for the write as well.
   */
  writeVariables?: TVars;
  /** If truthy, don't poll */
  skip?: boolean;
  /**
   * Observable emitting `close`/`open` to pause/resume interval polling.
   * It acts as a counting semaphore: each `close` increments the count and each
   * `open` decrements it. Polling only happens while the count is zero, i.e. while
   * the number of `close` emissions equals the number of `open` emissions.
   * An `open` received while the count is already zero is ignored with a warning.
   * A `close` also discards the result of any in-flight query request.
   */
  semaphore$?: Observable<"open" | "close">;
  /** Observable whose emissions discard the result of any in-flight query request */
  void$?: Observable<unknown>;
};

/**
 * Polls `query` at intervals and writes each successful result into the Apollo
 * client cache.
 *
 * The polling subscription is (re)created whenever `skip` (coerced to boolean),
 * `interval`, the Apollo client, `query`, `semaphore$` or `void$` changes, and is
 * torn down on unmount. `variables` / `writeVariables` are read from a ref at the
 * start of every poll, so changing them does not restart the poller.
 *
 * Every request bypasses the cache (`fetchPolicy: "no-cache"`). A failed request
 * is retried once after 2 seconds; if it fails again the error is reported via
 * `captureException` and that poll produces no cache write.
 *
 * @param options - See `QueryPollerOptions`.
 * @throws In non-production builds, when `options.interval` is below 1000ms.
 */
export function useQueryPoller<TData, TVars extends OperationVariables>(
  options: QueryPollerOptions<TVars>,
) {
  const { query, skip, interval, semaphore$ = NEVER, void$ = EMPTY } = options;
  const client = useApolloClient();
  if (process.env.NODE_ENV !== "production") {
    if (interval < 1_000) {
      throw new Error(`${interval} is too fast! Take it easy on the server.`);
    }
  }

  // Keep the latest options reachable from inside the long-lived subscription
  // without listing them as effect dependencies.
  const optionsRef = useRef(options);
  useEffect(() => {
    optionsRef.current = options;
  });

  const stableSkip = Boolean(skip); // always true/false for the equality check on dep array
  useEffect(() => {
    if (stableSkip) {
      return;
    }

    // Anything emitted here cancels the in-flight request so its data is never written.
    const voidOrClose$ = merge(
      void$,
      semaphore$.pipe(filter((semaphoreSignal) => semaphoreSignal === "close")),
    );
    const subscription = semaphore$
      .pipe(
        // Running count of outstanding `close` signals: `close` => +1, `open` => -1.
        scan((accum, semaphoreSignal) => {
          const isOpening = semaphoreSignal === "open";
          if (isOpening && accum === 0) {
            // eslint-disable-next-line no-console
            console.warn("Query poller tried to open an already open semaphore.");
            return 0;
          }
          return accum + (isOpening ? -1 : 1);
        }, 0),
        // Start in the "open" state so polling begins without waiting for a signal.
        startWith(0),
        // NOTE(review): `pollWhile` is defined in util/rxjs; presumably it invokes
        // `iteration` every `interval` ms while `predicate` holds — confirm there.
        pollWhile({
          interval,
          predicate: (count) => count === 0,
          iteration: (_count, _countIndex, pollIndex) => {
            // Don't do a query on the first initial subscription
            if (pollIndex === 0) {
              return EMPTY;
            }

            // Save a reference that is later used in the `switchMap` too so that the
            // server read always uses variables consistent with the later client write.
            const { variables, writeVariables } = optionsRef.current;
            // `defer` makes the request lazy so that `retry` re-issues the query on
            // resubscription. Wrapping an already-created promise with `from` alone
            // would just replay the same rejection without hitting the server again.
            return defer(() =>
              from(client.query<TData, TVars>({ query, variables, fetchPolicy: "no-cache" })),
            ).pipe(
              retry({ count: 1, delay: 2_000 }),
              catchError((error) => {
                captureException(error);
                return EMPTY;
              }),
              switchMap((response) => {
                return response.data
                  ? of({ data: response.data, variables: writeVariables ?? variables })
                  : EMPTY;
              }),
              // Placed last so it also cancels a pending retry delay.
              takeUntil(voidOrClose$),
            );
          },
        }),
      )
      .subscribe(({ data, variables }) => {
        try {
          client.writeQuery<TData, TVars>({ query, variables, data });
        } catch (e) {
          // A failed cache write must not tear down the polling subscription.
          captureException(e);
        }
      });
    return () => subscription.unsubscribe();
  }, [stableSkip, interval, client, query, semaphore$, void$]);
}
