import { prepareTimeframe, getNextBarTs } from "./helpers";
import { Bar, ResolutionString, SubscribeBarsCallback } from "@src/charting_library";
import { CefiExchangeMarketType } from "@src/store/apis/anbotoApi/types";
import { getMarketDataSubscription } from "@src/subscriptions";
import { MarketDataOHLCVData, MarketDataSubscriptionType, MarketDataTradesData } from "@src/subscriptions/types";

/**
 * Bookkeeping record kept per chart listener in `SubscriptionManager.subscriptions`.
 */
type TSubscriptionItem = {
  /** Stream descriptor handed to `marketDataSubscription.subscribe` / `unsubscribe`. */
  data: {
    symbol: string;
    market_type: CefiExchangeMarketType;
    data_type: MarketDataSubscriptionType;
    exchange: string;
    /** Extra stream parameters; `subscribe` fills in `timeframe` for non-trades streams and leaves it empty otherwise. */
    params: Record<string, string | number>;
  };
  /** Chart resolution; passed to `getNextBarTs` when bars are built from trades. */
  timeframe: ResolutionString;
  /** Latest known bar: seeded with the `lastBar` argument of `subscribe`, then replaced as trades arrive. */
  bar?: {
    time: number;
    open: number;
    high: number;
    low: number;
    close: number;
    volume?: number;
  };
  /** Timestamp of the last trade that was forwarded to the chart callback (trades mode only). */
  lastUpdateTs?: number;
  /** Wrapper registered with the stream; the same reference is needed again to unsubscribe. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload shape depends on `data_type`
  handler: (data: any) => void;
  /** Optional teardown hook; nothing visible in this file assigns or calls it. */
  unsubscribe?: () => void;
};

// Obtained once when this module is evaluated; every SubscriptionManager method below goes through this handle.
const marketDataSubscription = getMarketDataSubscription();

/**
 * Connects chart bar listeners to the market-data stream.
 *
 * Each listener id owns one stream subscription. OHLCV messages are forwarded
 * to the chart callback as-is; trade messages are aggregated into bars here.
 */
export default class SubscriptionManager {
  constructor({ resetData }: { resetData: () => void }) {
    this.resetData = resetData;
  }

  /** Active subscriptions keyed by listener id. */
  subscriptions = new Map<string, TSubscriptionItem>();
  /** Callback from the most recent `subscribe` call; stored here, not invoked by this class. */
  resetCache: (() => void) | undefined;
  /** Callback supplied at construction; stored here, not invoked by this class. */
  resetData: (() => void) | undefined;

  /** Builds a `symbol~exchange~market_type~timeframe` key (timeframe part is empty when omitted). */
  prepareSubscriptionId = (symbol: string, exchange: string, market_type: string, timeframe?: ResolutionString) =>
    `${symbol}~${exchange}~${market_type}~${timeframe ? prepareTimeframe(timeframe) : ""}`;

  /**
   * Opens a stream subscription for `listenerId` and routes its messages to `handler`.
   *
   * @param useTrades - when true, subscribes to trades and builds bars locally;
   *   otherwise subscribes to OHLCV with the prepared timeframe as a stream parameter.
   * @param lastBar - seed bar for trade aggregation.
   * @param onResetCacheNeededCallback - stored on `resetCache`.
   */
  subscribe(
    symbol: string,
    exchange: string,
    market_type: CefiExchangeMarketType,
    timeframe: ResolutionString,
    handler: SubscribeBarsCallback,
    listenerId: string,
    useTrades: boolean,
    lastBar: Bar,
    onResetCacheNeededCallback: () => void
  ) {
    // Re-subscribing under an id that is still active would orphan the old stream
    // handler (it could never be unsubscribed again), so release it first.
    if (this.subscriptions.has(listenerId)) this.unsubscribe(listenerId);

    this.resetCache = onResetCacheNeededCallback;

    const subscriptionItem: TSubscriptionItem = {
      data: {
        symbol,
        market_type,
        data_type: useTrades ? MarketDataSubscriptionType.TRADES : MarketDataSubscriptionType.OHLCV,
        exchange,
        params: useTrades ? {} : { timeframe: prepareTimeframe(timeframe) },
      },
      timeframe,
      bar: lastBar,
      // Kept on the item because the very same function reference is required to unsubscribe.
      handler: (msg) => {
        this.handleMessage(msg, listenerId, handler);
      },
    };

    marketDataSubscription.subscribe(subscriptionItem.data, subscriptionItem.handler);

    console.log("[tv] subscribe to anboto stream market_data", subscriptionItem.data);

    this.subscriptions.set(listenerId, subscriptionItem);
  }

  /** Closes the stream subscription registered for `listenerId`, if any, and forgets it. */
  unsubscribe(listenerId: string) {
    const subscriptionItem = this.subscriptions.get(listenerId);

    if (subscriptionItem?.handler) {
      marketDataSubscription.unsubscribe(subscriptionItem.data, subscriptionItem.handler);
    }

    this.subscriptions.delete(listenerId);

    console.log("[tv] unsubscribe from anboto stream market_data", {
      listenerId,
      subscriptionItem,
      subscriptions: this.subscriptions,
    });
  }

  /**
   * Processes one stream message for `listenerId`.
   *
   * - OHLCV: every tuple is forwarded to `handler` as a bar.
   * - Trades: each trade either opens a new bar or is merged into the current one;
   *   merged updates are forwarded at most once per 500 timestamp units
   *   (presumably milliseconds — TODO confirm against the stream's timestamp unit).
   *
   * @returns `false` when a trades message arrives but no seed bar is stored;
   *   `undefined` in every other case.
   */
  handleMessage = (
    data: MarketDataOHLCVData | MarketDataTradesData,
    listenerId: string,
    handler: SubscribeBarsCallback
  ) => {
    const subscriptionItem = this.subscriptions.get(listenerId);

    if (!subscriptionItem || !["trades", "ohlcv"].includes(subscriptionItem.data.data_type)) return;

    if (subscriptionItem.data.data_type !== "trades") {
      (data as MarketDataOHLCVData).forEach(([time, open, high, low, close, volume]) =>
        handler({ time, open, high, low, close, volume })
      );
      return;
    }

    if (!subscriptionItem.bar?.time) {
      console.error("initial history bar not found");
      return false;
    }

    (data as MarketDataTradesData).forEach(({ timestamp, price, amount }) => {
      // Ignore trades older than the last one already pushed to the chart.
      if (subscriptionItem.lastUpdateTs && subscriptionItem.lastUpdateTs > timestamp) return;

      const currentBar = subscriptionItem.bar;
      const nextBarTime = getNextBarTs(currentBar?.time || +new Date(), subscriptionItem.timeframe);

      if (timestamp >= nextBarTime) {
        // The trade may be several intervals past the stored bar (quiet market);
        // stamp the new bar with the interval that actually contains the trade.
        const freshBar: Bar = {
          time: this.alignBarTime(nextBarTime, timestamp, subscriptionItem.timeframe),
          open: price,
          high: price,
          low: price,
          close: price,
          volume: amount,
        };

        subscriptionItem.bar = freshBar;
        subscriptionItem.lastUpdateTs = timestamp;
        handler(freshBar);
        return;
      }

      const mergedBar: Bar = {
        time: currentBar?.time || 0,
        open: currentBar?.open || 0,
        high: Math.max(price || 0, currentBar?.high || price || 0),
        low: Math.min(price || 0, currentBar?.low || price || 0),
        close: price || 0,
        volume: (currentBar?.volume || 0) + amount,
      };

      // Always keep the aggregate current, even when the chart update below is throttled.
      subscriptionItem.bar = mergedBar;

      if (timestamp >= (subscriptionItem.lastUpdateTs || 0) + 500) {
        subscriptionItem.lastUpdateTs = timestamp;
        handler(mergedBar);
      }
    });
  };

  /** Steps `barTime` forward one interval at a time until it is the start of the bar containing `timestamp`. */
  private alignBarTime(barTime: number, timestamp: number, timeframe: ResolutionString): number {
    let start = barTime;
    let following = getNextBarTs(start, timeframe);

    // `following > start` stops the walk if the helper ever fails to advance (no endless loop).
    while (timestamp >= following && following > start) {
      start = following;
      following = getNextBarTs(start, timeframe);
    }

    return start;
  }
}
