import { Observable, takeUntil } from "rxjs";

import { shareLastValue, retryWhenWithCaptureException } from "util/rxjs";
import Channel from "socket/channel";

/** Get an observable for socket creation */
export function socketChannel({ channelName }: { channelName: string }): Observable<Channel> {
  return new Observable<Channel>((observer) => {
    const channel = new Channel(channelName);
    channel.onOpen(observer.next.bind(observer, channel));
    channel.subscribe();
    return channel.teardown.bind(channel);
  }).pipe(shareLastValue());
}

export function safeSubscribeToChannel<T>(
  endNotifier$: Observable<unknown>,
  root$: Observable<T>,
  next: (value: T) => void,
) {
  return root$.pipe(retryWhenWithCaptureException(), takeUntil(endNotifier$)).subscribe(next);
}

/** Observable factory function for socket messages */
export function fromSocketEvent<T>(channel: Channel, eventName: string): Observable<T> {
  return new Observable((observer) => {
    const callback = observer.next.bind(observer);
    channel.on(eventName, callback);
    return channel.off.bind(channel, eventName, callback);
  });
}
