import { decryptData, encryptData } from "@/features/cryptography";
import supabase from "@/features/supabase";
import { store } from "@/store";
import { supabaseApi } from "@/store/services/supabase";
import { PortfolioExportProgressTransformStream } from "@/tools/aggregate/portfolio-export/classes/PortfolioExportProgressTransformStream";
import { PortfolioExportResultsTransformStream } from "@/tools/aggregate/portfolio-export/classes/PortfolioExportResultsTransformStream";
import { PortfolioExportJobSchema } from "@/tools/aggregate/portfolio-export/types";
import { streamProgress, streamResultsErrorsOnly } from "@/tools/aggregate/portfolio-export/utils/stream";
import { KeyPairAndSalt, SecretSalt } from "@/types/cryptography";
import { ProcessStatus } from "@/types/processes";
import { hexToArrayBuffer } from "@/utils/crypto";
import { delay } from "@/utils/promises";
import { cloneDeep } from "lodash/fp";

/** Pairs an asset's `id` (stored as `supabaseId`) with the decrypted value of its `encrypted_id`. */
type DecryptedIdMaps = { supabaseId: string; decryptedId: string }[];

/**
 * Abort reason handed to a tracked job's `AbortController` when that job is no longer
 * present in the tracker's schema list (treated as a cancellation by the user).
 */
const PROGRESS_ABORTED_BY_USER = "PROGRESS_ABORTED_BY_USER";

/**
 * Follows the progress of Portfolio Export jobs whose status is "processing".
 *
 * For every such schema in `schemas` that is not tracked yet, `progress()` opens a
 * progress stream, forwards each progress update through the `updateSchema`
 * callback, persists the final counts and stores per-asset errors. Each tracked
 * schema owns an `AbortController` so its stream can be stopped when the schema
 * disappears from `schemas` or when there is no auth session.
 */
export class PortfolioExportProgressTracker {
  /** Current list of schemas; assigned by the owner of the tracker before calling `progress()`. */
  public schemas: PortfolioExportJobSchema[] = [];

  /**
   * @param updateSchema Callback invoked with an updated copy of a schema on every progress update.
   * @param keys User/org key material. Not read by any method of this class.
   * @param inProcessingSchemas Schemas that already have a progress stream, with their abort controllers.
   */
  constructor(
    private updateSchema: (schema: PortfolioExportJobSchema) => void,
    private keys: {
      user: KeyPairAndSalt;
      org: KeyPairAndSalt | null;
    },
    private inProcessingSchemas: { id: string; controller: AbortController }[] = []
  ) {}

  /**
   * Decrypts encrypted IDs for Portfolio Export assets.
   *
   * @param ids Asset rows carrying the stored `id` and the hex-encoded `encrypted_id`.
   * @param schema Schema whose `secret` is used for decryption.
   * @returns One `{ supabaseId, decryptedId }` entry per input row, in input order.
   */
  private getDecryptedAssetIds = async (
    ids: { id: string; encrypted_id: string }[],
    schema: PortfolioExportJobSchema
  ): Promise<DecryptedIdMaps> => {
    return Promise.all(
      ids.map(async ({ id, encrypted_id }) => ({
        supabaseId: id,
        decryptedId: await decryptData(schema.secret, hexToArrayBuffer(encrypted_id)),
      }))
    );
  };

  /**
   * Abort if there's a mismatch between this.schemas and this.inProcessingSchemas.
   * This is usually due to cancellation by the user.
   */
  private abortCancelledSchemas = () => {
    try {
      const activeIds = new Set(this.schemas.map(({ id }) => id));

      for (const { id, controller } of this.inProcessingSchemas) {
        if (activeIds.has(id)) continue;
        controller.abort(PROGRESS_ABORTED_BY_USER);
        console.info(`Portfolio [${id}] aborted by the user`);
      }

      // Stop tracking everything that was just aborted.
      this.inProcessingSchemas = this.inProcessingSchemas.filter(({ id }) => activeIds.has(id));
    } catch (err) {
      console.error("Error while attempting to abort portfolio: ", err);
    }
  };

  /**
   * Abort requests to /progress if there's no session.
   */
  private abortSchemasProgressIfNoSession = async () => {
    const session = (await supabase.auth.getSession()).data.session;
    if (session) return;
    this.inProcessingSchemas.forEach(({ controller }) => controller.abort());
  };

  /**
   * Persists the progress counts once every asset has either completed or errored.
   *
   * @returns The dispatch result when the counts were persisted, otherwise `undefined`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- shape is produced by the progress transform stream
  private updateAssetValues = async ({ value, schema }: { value: any; schema: PortfolioExportJobSchema }) => {
    const shouldUpdate = value.completed + value.errors === value.total;
    if (!shouldUpdate) return undefined;

    const action = supabaseApi.endpoints.updatePortfolioProgressCounts.initiate({
      id: schema.id,
      pending: value.pending || 0,
      success: value.completed,
      error: value.errors,
      total: value.total,
    });

    return await store.dispatch(action);
  };

  /**
   * Opens the progress stream and returns a reader over the transformed stream.
   * Failures are retried every 5 seconds until the request succeeds or the
   * controller is aborted.
   *
   * @returns A reader, or `undefined` when the controller has been aborted.
   */
  private getReadableProgressStream = async ({
    encryptedIds,
    controller,
  }: {
    encryptedIds: string[];
    controller: AbortController;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- chunk type is defined by the transform stream
  }): Promise<ReadableStreamDefaultReader<any> | undefined> => {
    // Loop rather than recurse so a long outage does not build an ever-growing promise chain.
    while (!controller.signal.aborted) {
      try {
        const res = await streamProgress(encryptedIds, controller.signal);
        const stream: ReadableStream = res.stream;
        return stream.pipeThrough(new PortfolioExportProgressTransformStream()).getReader();
      } catch (err) {
        // Any abort (user cancellation or missing session) must end the retries;
        // retrying with an aborted signal can never succeed.
        if (err === PROGRESS_ABORTED_BY_USER || controller.signal.aborted) return undefined;
        console.log("Retrying fetching progress in 5s: ", err);
        await delay(5000);
      }
    }

    return undefined;
  };

  /**
   * Shows a browser notification for a completed schema when the Notification
   * API exists and permission has been granted; otherwise does nothing.
   */
  private notifyCompleted = (schema: PortfolioExportJobSchema) => {
    if (typeof Notification === "undefined" || Notification.permission !== "granted") return;

    new Notification("Portfolio Export | Hub", {
      body: `${schema.name} is complete`,
      icon: "/favicon.ico",
    });
  };

  /**
   * Reads the progress stream of a single schema until it completes or is aborted,
   * reconnecting after read errors or when the stream closes early.
   */
  private trackSchemaProgress = async ({
    schema,
    idx,
    allIds,
    decryptedIds,
    controller,
  }: {
    schema: PortfolioExportJobSchema;
    idx: number;
    allIds: DecryptedIdMaps;
    decryptedIds: string[];
    controller: AbortController;
  }): Promise<void> => {
    // Tracked per schema: sharing it between schemas would mix up their error counts.
    let errorCount = 0;

    for (;;) {
      const reader = await this.getReadableProgressStream({ encryptedIds: decryptedIds, controller });
      if (!reader) return; // aborted before a stream could be opened

      let streamOpen = true;
      while (streamOpen) {
        await this.abortSchemasProgressIfNoSession();

        let shouldReconnect = false;
        try {
          const { value, done } = await reader.read();

          if (value) {
            let status: ProcessStatus = "processing";

            // A truthy result means the final counts were persisted, i.e. nothing is left to process.
            if (await this.updateAssetValues({ schema, value })) status = "completed";

            if (errorCount !== value.errors) {
              errorCount = value.errors;
              await handleResultsError(allIds, decryptedIds, schema.secret, value.errors);
            }

            if (value.pending === 0) {
              status = "completed";
              store.dispatch(supabaseApi.util.invalidateTags([{ type: "Org", id: "ToolUsage" }]));
            }

            this.updateSchema({
              ...cloneDeep(schema),
              assets: {
                errorCount: value.errors,
                processedCount: value.completed,
                totalCount: value.total,
                unprocessedCount: value.pending,
              },
              status,
            });

            logStream({ schema, value, idx });

            if (status === "completed") {
              this.notifyCompleted(schema);
              // Best-effort: stop the underlying stream, nothing more will be read from it.
              await reader.cancel().catch(() => undefined);
              return;
            }
          }

          // The stream closed before the job completed: open a new one instead of
          // reading an exhausted reader forever.
          shouldReconnect = done;
        } catch (err: unknown) {
          const aborted =
            controller.signal.aborted || (err instanceof Error && err.message === "BodyStreamBuffer was aborted");
          if (aborted) {
            reader.releaseLock();
            return;
          }

          console.error("Error reading stream: ", err);
          shouldReconnect = true;
        }

        if (shouldReconnect) {
          streamOpen = false;
          reader.releaseLock();
          console.log("Retrying reading stream in 2s");
          await delay(2000);
        }
      }
    }
  };

  /**
   * Iterates through schemas with a processing status and checks progress.
   *
   * Resolves once every newly tracked schema has completed or been aborted.
   *
   * @throws Error when the asset IDs of a schema cannot be fetched.
   */
  public progress = async () => {
    this.abortCancelledSchemas();

    // 'processing' schemas yet to be included in this.inProcessingSchemas
    const newProcessingSchemas = this.schemas.filter(
      (schema) => schema.status === "processing" && !this.inProcessingSchemas.some(({ id }) => id === schema.id)
    );

    await Promise.all(
      newProcessingSchemas.map(async (schema, idx) => {
        // Register synchronously so a concurrent progress() call does not track the schema twice.
        const controller = new AbortController();
        this.inProcessingSchemas.push({ id: schema.id, controller });

        const { data: ids } = await store.dispatch(supabaseApi.endpoints.getAllPortfolioAssetIds.initiate(schema.id));
        if (!ids) throw new Error("Failed to get asset IDs for portfolio.");

        const allIds = await this.getDecryptedAssetIds(ids, schema);
        const decryptedIds = allIds.map(({ decryptedId }) => decryptedId);

        await this.trackSchemaProgress({ schema, idx, allIds, decryptedIds, controller });
      })
    );
  };
}

/**
 * Fetches the error results for the given asset ids and stores each one,
 * encrypted with `secret`, on the matching asset.
 *
 * @param allIds Mapping between stored asset ids and their decrypted ids.
 * @param uuids Ids passed to the errors-only results stream.
 * @param secret Secret used to encrypt each error payload before it is stored.
 * @param errors Current error count/indicator; nothing is done when it is falsy.
 * @throws Error when the errors-only results stream cannot be opened.
 */
const handleResultsError = async (allIds: DecryptedIdMaps, uuids: string[], secret: SecretSalt, errors: any) => {
  if (!errors) return;
  console.log("[handleResultsError]");
  const res = await streamResultsErrorsOnly(uuids).catch((err) => {
    console.error("Unknown error while fetching errors in results: ", err);
    throw new Error("Unknown error while fetching errors in results.");
  });

  // Index once for the lookup by decrypted id; the first entry wins, like Array.prototype.find.
  const byDecryptedId = new Map<string, DecryptedIdMaps[number]>();
  for (const entry of allIds) {
    if (!byDecryptedId.has(entry.decryptedId)) byDecryptedId.set(entry.decryptedId, entry);
  }

  const stream: ReadableStream = res.stream;
  const reader = stream.pipeThrough(new PortfolioExportResultsTransformStream()).getReader();

  try {
    for (;;) {
      const { value, done } = await reader.read();

      if (value) {
        // match up with supabase id
        const current = byDecryptedId.get(value.id);
        console.log("[handleResultsError][current]", current);

        // An unmatched result is skipped; it must not discard the results that follow it.
        if (current) {
          const action = supabaseApi.endpoints.updatePortfolioAssetsWithErrors.initiate({
            errors: await encryptData(secret, JSON.stringify(value)),
            id: current.supabaseId,
          });
          const result = await store.dispatch(action);
          if ("error" in result) {
            console.error("[handleResultsError][sb.updatePortfolioAssetsWithErrors]", result.error);
          }
        }
      }

      if (done) break;
    }
  } finally {
    // Always give the stream lock back, including when a read or a store update throws.
    reader.releaseLock();
  }
};

/**
 * Logs a progress update to the console with a coloured badge per stream.
 *
 * @param schema Schema being tracked; the first 7 characters of its id are printed.
 * @param value Progress payload, printed as JSON.
 * @param idx Zero-based position of the stream; printed one-based and used to pick the badge colour.
 */
const logStream = ({ schema, value, idx }: { schema: { id: string }; value: unknown; idx: number }) => {
  const colors = ["#fb923c", "#2987D9", "#84cc16", "#97CE81", "#DAD356"];
  // Wrap around so streams beyond the palette size still get a valid colour.
  const color = colors[idx % colors.length];

  console.log(
    `%c [PortfolioExport]: ${idx + 1} `,
    `background: ${color}; color: black`,
    `${schema.id.slice(0, 7)}: ${JSON.stringify(value)}`
  );
};
