import { boolean } from "zod";

export const ReadableStream: typeof import("stream/web").ReadableStream =
  // biome-ignore lint/suspicious/noExplicitAny: hack
  globalThis.ReadableStream as any;
export const TransformStream: typeof import("stream/web").TransformStream =
  // biome-ignore lint/suspicious/noExplicitAny: hack
  globalThis.TransformStream as any;
export type ReadableStream<T = unknown> = import("stream/web").ReadableStream<T>;
export type TransformStream<T = unknown, U = unknown> = import("stream/web").TransformStream<T, U>;

export interface Pipe {
  <A>(value: A): A;
  <A, B>(value: A, fn1: (input: A) => B): B;
  <A, B, C>(value: A, fn1: (input: A) => B, fn2: (input: B) => C): C;
  <A, B, C, D>(value: A, fn1: (input: A) => B, fn2: (input: B) => C, fn3: (input: C) => D): D;
  <A, B, C, D, E>(value: A, fn1: (input: A) => B, fn2: (input: B) => C, fn3: (input: C) => D, fn4: (input: D) => E): E;
  // ... and so on
}

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
// biome-ignore lint/complexity/noBannedTypes: <explanation>
export const pipe: Pipe = (value: any, ...fns: Function[]): unknown => {
  return fns.reduce((acc, fn) => fn(acc), value);
};

export const curry = <R, T1, T2 extends unknown[]>(fn: (t: T1, ...args: T2) => R) => {
  return (t: T1) =>
    (...args: T2) =>
      fn(t, ...args);
};
export const curry2 = <R, T1, T2, T3 extends unknown[]>(fn: (t: T1, t2: T2, ...args: T3) => R) => {
  return (t: T1, t2: T2) =>
    (...args: T3) =>
      fn(t, t2, ...args);
};

export const scan = curry2(<T, TAcc>(f: (acc: TAcc, chunk: T) => TAcc, initial: TAcc, stream: ReadableStream<T>): ReadableStream<TAcc> => {
  const reader = stream.getReader();
  let acc = initial;
  return new ReadableStream<TAcc>({
    start(controller) {
      (async () => {
        while (true) {
          try {
            const { done, value } = await reader.read();
            if (done) {
              controller.close();
              break;
            }
            acc = f(acc, value);
            controller.enqueue(acc);
          } catch (error) {
            controller.error(error);
            break;
          }
        }
      })();
    },
    cancel() {
      reader.cancel();
    },
  });
});

export const onError = curry(<T>(fn: (err: unknown) =>  Promise<undefined | boolean>, stream: ReadableStream<T>) => {
  const reader = stream.getReader();
  return new ReadableStream<T>({
    start(controller) {
      (async () => {
        while (true) {
          try {
            const { done, value } = await reader.read();
            if (done) {
              controller.close();
              break;
            }
            controller.enqueue(value);
          } catch (error) {
            const stopGracefully = await fn(error).catch(() => {
              console.log("Error handling error");
            });
            if (stopGracefully) {
              controller.close();
            } else {
              controller.error(error);
            }
            break;
          }
        }
      })();
    },
    async cancel() {
      await reader.cancel();
    },
  });
});

export const map = curry(<T, U>(fn: (x: T) => U | PromiseLike<U>, stream: ReadableStream<T>) => {
  return stream.pipeThrough(
    new TransformStream<T, U>({
      async transform(chunk, controller) {
        controller.enqueue(await fn(chunk));
      },
    }),
  );
});

export const flatMap = curry(<T, U>(fn: (x: T) => ReadableStream<U>, stream: ReadableStream<T>) => {
  return stream.pipeThrough(
    new TransformStream<T, U>({
      async transform(chunk, controller) {
        for await (const c of fn(chunk)) {
          controller.enqueue(c);
        }
      },
    }),
  );
});

export const filter = curry(<T>(fn: (x: T) => boolean | PromiseLike<boolean>, stream: ReadableStream<T>) => {
  return stream.pipeThrough(
    new TransformStream<T, T>({
      async transform(chunk, controller) {
        if (await fn(chunk)) {
          controller.enqueue(chunk);
        }
      },
    }),
  );
});

export const fromPromise = <T>(p: Promise<T>) => {
  const reader = new ReadableStream({
    async start(controller) {
      (async () => {
        try {
          const value = await p;
          controller.enqueue(value);
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      })();
    },
  });
  return reader;
};

export const tap = curry(<T>(fn: (x: T) => void | PromiseLike<void>, stream: ReadableStream<T>) => {
  return stream.pipeThrough(
    new TransformStream<T, T>({
      async transform(chunk, controller) {
        await fn(chunk);
        controller.enqueue(chunk);
      },
    }),
  );
});

export function of<T>(data: T): ReadableStream<T> {
  return new ReadableStream<T>({
    async start(controller) {
      controller.enqueue(data);
      controller.close();
    },
  });
}

export function from<T>(iterable: Iterable<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    async start(controller) {
      for (const value of iterable) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });
}

export const skip = curry(<T>(n: number, stream: ReadableStream<T>) => {
  return new ReadableStream<T>({
    async start(controller) {
      (async () => {
        let i = 0;
        try {
          for await (const item of stream) {
            if (i++ < n) {
              continue;
            }
            controller.enqueue(item);
          }
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      })();
    },
    cancel() {
      stream.cancel();
    },
  });
});

export const skipUntil = curry(<T>(predicate: (x: T) => boolean, stream: ReadableStream<T>) => {
  return new ReadableStream<T>({
    async start(controller) {
      (async () => {
        let shouldSkip = true;
        try {
          for await (const item of stream) {
            if (shouldSkip) {
              shouldSkip = !predicate(item);
              if (shouldSkip) {
                continue;
              }
            }
            controller.enqueue(item);
          }
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      })();
    },
    cancel() {
      stream.cancel();
    },
  });
});

export async function toPromise<T>(r: ReadableStream<T>): Promise<T[]> {
  const acc: T[] = [];
  for await (const value of r) {
    acc.push(value);
  }
  return acc;
}


export const endsWith = <T, U = T>(value: U, stream: ReadableStream<T>) =>{
  return new ReadableStream<T | U>({
    async start(controller) {
      for await (const item of stream) {
        controller.enqueue(item);
      }
      controller.enqueue(value)
      controller.close();
    },
  });
};

/*
  export function convertNodeTransform<T,U>(s: NodeTransformStream){
      return new TransformStream<T, U>({
        start(controller) {
          s.on("data", (data) => controller.enqueue(data));
        },
        async transform(chunk, controller) {
          s.write(chunk as T);
        },
      });
    }
  */
