import { v4 as uuidv4 } from "uuid";
import {
  ReactNode,
  createContext,
  useContext,
  useEffect,
  useRef,
  useState,
} from "react";
import { Mutex } from "async-mutex";
import { createSharedRecordsAccessor } from "../utils/concurrency";
import { WebSocketAdapter, useWebSocket } from "./use_websocket";
import { useWsSubscribe } from "./use_pubsub";

/** Handler invoked with the `payload` field of an event it is registered for. */
type SubscriptionCallback = (payload: unknown) => void;

/** Shape of the value carried by {@link EventContext}. */
type EventContextValue = {
  /** Registers `cb` under `key` for events named `topic`. */
  subscribe: (topic: string, key: string, cb: SubscriptionCallback) => void;
  /** Removes the callback registered under `key` for `topic`. */
  unsubscribe: (topic: string, key: string) => void;
};

/**
 * React context exposing the event subscription API.
 *
 * The default value holds no-op functions, so components rendered outside an
 * `EventProvider` can call `subscribe`/`unsubscribe` without any effect.
 */
export const EventContext = createContext<EventContextValue>({
  subscribe: () => {},
  unsubscribe: () => {},
});

/** Topic name that `EventProvider` passes to `useWsSubscribe` to receive events. */
const PUBLISHED_TOPIC = "published";

/**
 * Envelope that `EventProvider` expects for each message received on the
 * published topic.
 */
export type EventSubscriptionPayload = {
  /** Event name; used to look up the callbacks subscribed to that topic. */
  event: string;
  /** Value handed unchanged to every callback subscribed to `event`. */
  payload: unknown;
};

/** Callbacks registered for a single topic, indexed by the subscriber's key. */
type SubscriptionCallbackRecord = Record<string, SubscriptionCallback>;

/**
 * Logs a failure raised by one of the provider's detached (fire-and-forget)
 * tasks so it does not surface as an unhandled promise rejection.
 */
function logEventProviderFailure(action: string, error: unknown) {
  console.error(
    `[bentows] events.tsx: EventProvider: ${action} failed: `,
    error
  );
}

/**
 * Runtime check that an incoming message is an object carrying a string
 * `event` field, i.e. that it is safe to treat as an
 * {@link EventSubscriptionPayload}.
 */
function isEventSubscriptionPayload(
  value: unknown
): value is EventSubscriptionPayload {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { event?: unknown }).event === "string"
  );
}

/**
 * Provides {@link EventContext} to `children`.
 *
 * Keeps a registry of callbacks (topic -> key -> callback) in a ref, guarded
 * by a mutex, and dispatches every message received on `PUBLISHED_TOPIC` to
 * the callbacks registered for the message's `event` name.
 *
 * @param children - Subtree that gains access to `subscribe`/`unsubscribe`.
 */
export function EventProvider({ children }: { children: ReactNode }) {
  // Registry of callbacks: topic -> subscriber key -> callback.
  const ref = useRef<Record<string, SubscriptionCallbackRecord>>({});

  // Serializes every read and write of the registry above.
  const mutexRef = useRef<Mutex>(new Mutex());

  const { implementation } = useWebSocket();

  // NOTE(review): this runs on every render of the provider.
  console.log(
    "[bentows] events.tsx: EventProvider: Implementation: ",
    implementation
  );

  /**
   * Registers `cb` under `key` for `topic`, replacing any callback already
   * stored under the same topic/key pair. The registry update is asynchronous
   * (it waits for the mutex), so it may complete after this function returns.
   */
  function subscribe(topic: string, key: string, cb: SubscriptionCallback) {
    const sharedRecords = createSharedRecordsAccessor(
      ref.current,
      mutexRef.current
    );

    const func = async () => {
      await sharedRecords.use(async (records) => {
        if (!records[topic]) {
          records[topic] = {};
        }

        records[topic][key] = cb;
      });
    };

    func().catch((error: unknown) =>
      logEventProviderFailure("subscribe", error)
    );
  }

  /**
   * Removes the callback stored under `key` for `topic`; does nothing when
   * the topic has no registrations. Asynchronous like `subscribe`.
   */
  function unsubscribe(topic: string, key: string) {
    const sharedRecords = createSharedRecordsAccessor(
      ref.current,
      mutexRef.current
    );

    const func = async () => {
      await sharedRecords.use(async (records) => {
        if (!records[topic]) return;

        delete records[topic][key];
      });
    };

    func().catch((error: unknown) =>
      logEventProviderFailure("unsubscribe", error)
    );
  }

  /**
   * Delivers one incoming message to the callbacks registered for its event.
   * Messages that are not `{ event: string, ... }` objects are ignored.
   */
  function callSubscribers(payload: unknown) {
    if (!isEventSubscriptionPayload(payload)) {
      console.warn(
        "[bentows] events.tsx: EventProvider: ignoring malformed event message: ",
        payload
      );
      return;
    }

    // Copied into constants so the narrowed types survive inside the closure.
    const { event, payload: eventPayload } = payload;

    const func = async () => {
      // Snapshot the topic's callbacks while holding the mutex, then invoke
      // them after releasing it so a callback that subscribes/unsubscribes
      // does not wait on a lock held by its own dispatch.
      const snapshot = await mutexRef.current.runExclusive(() => {
        const topic = ref.current[event];
        return topic ? { ...topic } : undefined;
      });

      if (!snapshot) return;

      for (const [key, subscriber] of Object.entries(snapshot)) {
        try {
          subscriber(eventPayload);
        } catch (error: unknown) {
          // Isolate failures so the remaining subscribers still get the event.
          logEventProviderFailure(
            `subscriber "${key}" for event "${event}"`,
            error
          );
        }
      }
    };

    func().catch((error: unknown) =>
      logEventProviderFailure("event dispatch", error)
    );
  }

  useWsSubscribe(
    PUBLISHED_TOPIC,
    (payload) => {
      callSubscribers(payload);
    },
    []
  );

  // `subscribe` and `unsubscribe` only touch `ref` and `mutexRef`, whose
  // identities never change, so the functions created on the first render
  // stay valid forever. Keeping that first object gives the context a stable
  // value, so consumers are not re-rendered each time the provider renders.
  const contextValue = useRef({ subscribe, unsubscribe }).current;

  return (
    <EventContext.Provider value={contextValue}>
      {children}
    </EventContext.Provider>
  );
}

/**
 * Subscribes `cb` to `topic` through the nearest {@link EventContext} for as
 * long as the calling component is mounted.
 *
 * The subscription is created in an effect and removed by that effect's
 * cleanup. The effect re-runs only when an entry of `deps` changes, so the
 * `topic`, `key` and `cb` in use are the ones captured the last time it ran;
 * with the default empty `deps` those are the values from the first render.
 *
 * @param topic - Event name to listen for.
 * @param key - Identifier of this subscription within `topic`.
 * @param cb - Callback receiving each event's payload.
 * @param deps - Dependency list forwarded as-is to `useEffect`.
 */
export function useEventSubscribe(
  topic: string,
  key: string,
  cb: SubscriptionCallback,
  deps: unknown[] = []
) {
  const { subscribe: register, unsubscribe: unregister } =
    useContext(EventContext);

  useEffect(() => {
    register(topic, key, cb);

    // Cleanup: drop the registration made above.
    return () => unregister(topic, key);
  }, deps);
}

/**
 * Tracks the most recent payload published for `topic`.
 *
 * @typeParam T - Type the payload is assumed to have; it is cast, not validated.
 * @param topic - Event name to listen for. Changing it moves the subscription
 *   to the new topic.
 * @param wa - Not read by this hook; kept so existing call sites still compile.
 * @returns `{ data }`, where `data` is the latest payload received, or
 *   `undefined` before the first one arrives.
 */
export function useEvent<T>(topic: string, wa: WebSocketAdapter) {
  const [data, setData] = useState<T | undefined>(undefined);

  // Lazy initializer: the id is generated once per hook instance instead of
  // producing (and discarding) a fresh UUID on every render.
  const [subscriberId] = useState(() => uuidv4());

  useEventSubscribe(
    topic,
    `${subscriberId}-${topic}`,
    (payload) => {
      setData(payload as T);
    },
    // Re-subscribe when the topic changes; with an empty list the hook would
    // keep listening on the topic seen during the first render.
    [topic]
  );

  return { data };
}

/**
 * Convenience accessor for the current {@link EventContext} value.
 *
 * @returns The `subscribe`/`unsubscribe` pair supplied by the nearest
 *   provider, or the context's no-op defaults when there is none.
 */
export function useEventContext() {
  const eventContext = useContext(EventContext);
  return eventContext;
}
