import { createContext, useContext, useCallback, useMemo, useEffect, type ReactNode } from "react";
import {
  EMPTY,
  Observable,
  catchError,
  scan,
  tap,
  defer,
  retry,
  debounceTime,
  type Observer,
} from "rxjs";

import { ProcessingStates } from "graphql_globals";
import { useApolloClient, type ApolloClient } from "util/graphql";
import { pollWhile } from "util/rxjs";
import { useSubject } from "util/rxjs/hooks";

import DocumentsStatusPollQuery, { type DocumentsStatusPoll } from "./document_query.graphql";

/** Fields reported for a document whether or not its processing failed. */
type CommonDocumentStatusFields = {
  name: string | null;
  classification: { category: string | null; languages: string[] } | null;
  metadata: { pagesTotal: number | null } | null;
};
/** Status emitted when the server reports `ProcessingStates.FAILED`. */
type FailedDocumentStatus = CommonDocumentStatusFields & {
  processingState: ProcessingStates.FAILED;
  processingError: string | null;
};
/** Status emitted for every non-failed state (`PENDING` or `DONE`). */
type NormalDocumentStatus = CommonDocumentStatusFields & {
  processingState: ProcessingStates.PENDING | ProcessingStates.DONE;
  processingError?: string | null;
  s3OriginalAsset: { url: string | null } | null;
  splitDocumentIds: string[];
};
/** What a subscriber receives on each successful poll of its document. */
type DocumentStatus = NormalDocumentStatus | FailedDocumentStatus;
/** One downstream observer waiting on the status of one document. */
type DocumentSubscription = {
  documentId: string;
  observer: Observer<DocumentStatus>;
  // Recorded after each poll that returns this document; only used for diagnostics
  // when the document later goes missing from a response.
  lastSeenStatusFromServer?: {
    timestamp: string;
    processingState: DocumentStatus["processingState"];
  };
};
/** A request to start ("add") or stop ("remove") polling on behalf of an observer. */
type StatusOp = DocumentSubscription & { op: "add" | "remove" };
/** Diagnostic record of a single add/remove operation. */
type OpLog = { index: number; timestamp: string; documentId: string; op: StatusOp["op"] };
/** Given a document ID, returns an observable of that document's processing status. */
type PollingCallback = (documentId: string) => Observable<DocumentStatus>;

// Used both as the interval between polls and as the delay between retries of a failed poll.
const POLLING_RATE_MS = 1000;
// Fallback used when no provider is mounted: the returned observable never emits a status.
const emptyPoller: PollingCallback = () => EMPTY;
const PollerCtx = createContext<PollingCallback>(emptyPoller);

/**
 * Raised on a subscriber's observer when a poll response does not contain the
 * document it subscribed to.
 */
export class MissingDocumentError extends Error {
  // Without this the inherited name is "Error", which makes the failure
  // indistinguishable from generic errors in logs and stringified output.
  override name = "MissingDocumentError";
}

/**
 * Writes an error to the console, tagging the first message with the poller's prefix.
 *
 * @param firstMessage Main message; emitted with the "DOCUMENT STATE POLLER: " prefix.
 * @param rest Additional strings passed to `console.error` as separate arguments.
 */
function logError(firstMessage: string, ...rest: string[]) {
  const taggedMessage = "DOCUMENT STATE POLLER: " + firstMessage;
  // eslint-disable-next-line no-console
  console.error(taggedMessage, ...rest);
}

/**
 * Builds a cold observable that fetches the status of the given documents once.
 *
 * The query runs on subscription (and again on each retry), always against the
 * network. Every failure is logged; after a failure the query is retried up to
 * four times, waiting `retryDelay` milliseconds between attempts, before the
 * error is passed downstream.
 *
 * @param client Apollo client used to run the query.
 * @param documentIds IDs sent as the `documentIds` query variable.
 * @param retryDelay Milliseconds to wait before each retry.
 * @returns Observable emitting the `documents` field of the query result.
 */
function singlePoll(
  client: ApolloClient<unknown>,
  documentIds: string[],
  retryDelay: number,
): Observable<DocumentsStatusPoll["documents"]> {
  const fetchDocuments = async () => {
    const result = await client.query<DocumentsStatusPoll>({
      query: DocumentsStatusPollQuery,
      variables: { documentIds },
      // Statuses change server-side, so the local cache must never answer this query.
      fetchPolicy: "network-only",
    });
    return result.data.documents;
  };
  const reportFailure = (error: Error | null) => {
    logError(`Polling query retrying because of ${error?.message}`);
  };

  const attempt$ = defer(fetchDocuments);
  return attempt$.pipe(
    tap({ error: reportFailure }),
    // A network failure is the most likely cause, so allow a few more attempts. A cache
    // update problem is another possibility.
    retry({ count: 4, delay: retryDelay }),
  );
}

/**
 * Serializes the operation log entries that belong to one document.
 *
 * @param logs Full operation log.
 * @param documentId Document whose entries should be included.
 * @returns JSON array of the matching entries, or "unknown" if filtering or
 *   serialization throws.
 */
function getLogsForDocumentId(logs: OpLog[], documentId: string) {
  try {
    const belongsToDocument = (entry: OpLog) => entry.documentId === documentId;
    const relevantEntries = logs.filter(belongsToDocument);
    return JSON.stringify(relevantEntries);
  } catch {
    // This only feeds a diagnostic message, so a failure here must not escalate.
    return "unknown";
  }
}

/**
 * Runs one poll for all subscribed documents and delivers the result to each subscriber.
 *
 * For every subscription the matching document is looked up in the response:
 * - not found: the subscriber's observer receives a `MissingDocumentError`;
 * - found but not a `Document` node: the observer receives a generic `Error`;
 * - otherwise: the observer receives the document's status via `next`, and the
 *   subscription's `lastSeenStatusFromServer` is updated.
 *
 * A problem with one subscriber's document does not prevent the remaining
 * subscribers from receiving their statuses from the same response.
 *
 * If the poll itself fails (after `singlePoll`'s retries), or anything throws while
 * delivering results, every subscriber's observer receives that error and the returned
 * observable completes without erroring.
 *
 * @param client Apollo client used for the query.
 * @param subscriptions Subscriptions to serve with this poll.
 * @param logs Operation log, used only for diagnostics when a document is missing.
 * @param pollingRate Passed to `singlePoll` as its retry delay, in milliseconds.
 * @returns Observable that performs the poll when subscribed; its emissions are not meant to be consumed.
 */
function signalSubscribers(
  client: ApolloClient<unknown>,
  subscriptions: DocumentSubscription[],
  logs: OpLog[],
  pollingRate: number,
): Observable<unknown> {
  // Several subscribers may watch the same document; request each ID only once.
  const uniqueDocumentIds = Array.from(new Set(subscriptions.map((sub) => sub.documentId)));
  return singlePoll(client, uniqueDocumentIds, pollingRate).pipe(
    tap((responseDocuments) => {
      const timestamp = new Date().toISOString();
      for (const subscription of subscriptions) {
        const { documentId, observer } = subscription;
        const foundDocument = responseDocuments.find((doc) => doc?.id === documentId);
        if (!foundDocument) {
          // If we can't find the document in the response, we think this is most likely because of document
          // deletion. We give up and call terminal error on the downstream observer (we can't call next with
          // "DONE" because we don't know all the data, like the S3 URL). The downstream user can choose to
          // restart if they think this is not correct.
          const error = `Could not find ${documentId} in response`;
          const lastSeen = subscription.lastSeenStatusFromServer;
          logError(
            error,
            `Last server status for this subscriber was ${lastSeen?.processingState} at ${lastSeen?.timestamp}`,
            `Logs for all doc subs: ${getLogsForDocumentId(logs, documentId)}`,
          );
          observer.error(new MissingDocumentError(error));
          // Only this subscriber is affected: move on so the others still get their status.
          continue;
        }
        if (foundDocument.__typename !== "Document") {
          const error = `Unexpected node of type ${foundDocument.__typename} for ${documentId}`;
          logError(error);
          observer.error(new Error(error));
          // Same as above: a bad node for one subscriber must not starve the rest.
          continue;
        }

        const { name, processingState, splitDocumentIds, metadata } = foundDocument;
        subscription.lastSeenStatusFromServer = { processingState, timestamp };
        const isFailed = processingState === ProcessingStates.FAILED;
        const status: DocumentStatus = isFailed
          ? {
              processingState,
              name,
              processingError: foundDocument.processingError,
              classification: foundDocument.classification,
              metadata,
            }
          : {
              processingState,
              name,
              s3OriginalAsset: foundDocument.s3OriginalAsset,
              classification: foundDocument.classification,
              splitDocumentIds,
              metadata,
            };
        observer.next(status);
      }
    }),
    catchError((error) => {
      // If for any reason we crash in a loop of the poll (most likely the query's network request),
      // we must not end the subscription to `observerOperations$` in the root context -- otherwise no
      // user of this infrastructure would ever be able to poll again until a refresh. Report the
      // failure to each subscriber and complete quietly instead.
      for (const subscription of subscriptions) {
        subscription.observer.error(error);
      }
      return EMPTY;
    }),
  );
}

/**
 * Provider that owns the shared polling pipeline and exposes a per-document
 * subscribe function through `PollerCtx`.
 *
 * Subscribing to an observable returned by the context callback pushes an "add"
 * operation; unsubscribing pushes a "remove". The pipeline folds those operations
 * into the list of live subscriptions and hands that list to `pollWhile`, which
 * invokes `signalSubscribers` for it.
 */
function DocumentStatePollerProvider(props: { children: ReactNode }) {
  const apolloClient = useApolloClient();
  const observerOperations$ = useSubject<StatusOp>();
  // One array instance for the provider's lifetime; it records every add/remove for diagnostics.
  const logs = useMemo<OpLog[]>(() => [], []);

  useEffect(() => {
    // Folds one add/remove operation into the list of live subscriptions and logs it.
    const applyOperation = (
      active: DocumentSubscription[],
      operation: StatusOp,
      index: number,
    ): DocumentSubscription[] => {
      const { documentId, observer, op } = operation;
      logs.push({ timestamp: new Date().toISOString(), op, documentId, index });
      if (op === "add") {
        return active.concat({ documentId, observer });
      }
      // Subscriptions are identified by their observer, not by document ID.
      return active.filter((existing) => existing.observer !== observer);
    };

    const pollingSubscription = observerOperations$
      .pipe(
        scan<StatusOp, DocumentSubscription[]>(applyOperation, []),
        // Doc subscriptions tend to arrive a few at a time within a single execution stack.
        // Debouncing batches them together for throughput.
        debounceTime(0),
        // NOTE(review): `pollWhile` is a project helper; presumably it runs `iteration` every
        // `interval` ms for as long as `predicate` holds -- confirm against util/rxjs.
        pollWhile({
          interval: POLLING_RATE_MS,
          predicate: (active) => active.length > 0,
          iteration: (active) => signalSubscribers(apolloClient, active, logs, POLLING_RATE_MS),
        }),
      )
      .subscribe();

    return () => {
      pollingSubscription.unsubscribe();
    };
  }, []);

  const getStatusForDocument = useCallback<PollingCallback>(
    (documentId) =>
      new Observable((observer) => {
        const announce = (op: StatusOp["op"]) => {
          observerOperations$.next({ op, documentId, observer });
        };
        announce("add");
        // Teardown: withdraw this observer from the polled set.
        return () => announce("remove");
      }),
    [],
  );

  return <PollerCtx.Provider value={getStatusForDocument}>{props.children}</PollerCtx.Provider>;
}

/**
 * Low-level hook giving access to the batched document processing status poller from context.
 * Most callers should not use this directly and should prefer `useDocumentPoller`.
 *
 * @returns A callback that, given a document ID, returns an observable of that document's status.
 */
export function useBatchedDocumentStatusPoller(): PollingCallback {
  const pollingCallback = useContext(PollerCtx);
  return pollingCallback;
}

// The provider component is this module's default export.
export default DocumentStatePollerProvider;
