import { match } from "ts-pattern"
import { atomFamily, unwrap, useAtomCallback } from "jotai/utils"
import { startTransition, use, useCallback } from "react"
import type { ResourceStreamTrace } from "shared/data/resource-stream"
import { P } from "ts-pattern"
import { scan, from, pipe, flatMap } from "../../../utils/readable-streams"
import { atom, useAtomValue, useSetAtom, type Setter } from "jotai"
import {
  j_baseResources,
  j_localResources,
  j_playground,
  j_playgroundId,
  j_playgroundLocalState,
  j_remoteResources,
  j_resourceById,
  j_selectedOutputTab,
  j_setOutputTab,
  useSelectOutputTab,
} from "./state"
import type { UIResource } from "shared/data/resource"
import type { ClientToolDef } from "shared/tool"
import { type ReadableStream, ReadableStream as WebReadableStream } from "../../../utils/readable-streams"
import type { Playground } from "shared/data/playground"

/**
 * Hook exposing the write function of `j_applyResourceStream`.
 *
 * @returns a setter that accepts a `ReadableStream<ResourceStreamTrace>` and
 *   resolves once the stream has been fully consumed.
 */
export function useApplyResourceStream() {
  const applyResourceStream = useSetAtom(j_applyResourceStream)
  return applyResourceStream
}

/** Registry of running processes: resource id -> the AbortController that stops it. */
const j_runningProcesses_base = atom<Record<string, AbortController>>({})

/**
 * Per-resource view over the running-process registry.
 *
 * Reading yields the AbortController registered for `id`, or `undefined`
 * when none is registered. Writing a controller registers it under `id`;
 * writing `undefined` removes the entry.
 */
export const j_runningProcesses = atomFamily((id: string) =>
  atom(
    get => get(j_runningProcesses_base)[id] || undefined,
    (_get, set, arg: AbortController | undefined) => {
      set(j_runningProcesses_base, registry => {
        if (arg) {
          return { ...registry, [id]: arg }
        }
        // No controller supplied: drop this id from the registry.
        const { [id]: _removed, ...remaining } = registry
        return remaining
      })
    },
  ),
)

/** Everything associated with one resource WebSocket, as stored in `j_resourceSockets`. */
type WebsocketContext = {
  /** The underlying socket. */
  ws: WebSocket
  /** Stream of trace messages parsed from the socket; "waiting" messages are not enqueued here. */
  resourceStream: ReadableStream<ResourceStreamTrace>
  /**
   * Keyed by `${messageType}_${resource}`. A value is either a payload queued by
   * `resolveWaitingMessage`, or a promise resolver stored while a "waiting"
   * request from the socket is pending.
   */
  messagesToSend: Map<string, any>
  /** Aborting this controller closes the socket. */
  abortController: AbortController
}

/** Registry of socket contexts, keyed by resource id. */
const j_resourceSockets = atom<Record<string, WebsocketContext>>({})

/**
 * Per-resource socket atom.
 *
 * Read: the `WebsocketContext` registered for `id`, or `undefined`.
 *
 * Write:
 *  - with a URL: opens a WebSocket to it, registers an AbortController for the
 *    resource, exposes incoming trace messages as a `ReadableStream`, stores the
 *    resulting context under `id` and returns it;
 *  - with no/empty argument: removes the registry entry for `id` (the socket
 *    itself is not closed here) and returns `undefined`.
 */
export const j_resourceSocket = atomFamily((id: string) =>
  atom(
    get => {
      const sockets = get(j_resourceSockets)
      if (!sockets[id]) {
        return undefined
      }
      return sockets[id]
    },
    (get, set, arg?: string) => {
      if (!arg) {
        set(j_resourceSockets, g => {
          const { [id]: _, ...rest } = g
          return rest
        })
        return
      }
      const ws = new WebSocket(arg)
      const abortController = registerAbortController(id, set)
      abortController.signal.addEventListener("abort", () => {
        ws.close()
      })

      // Registry cleanup when the socket closes. Registered with addEventListener
      // (rather than assigning `ws.onclose`) so it cannot be overwritten by the
      // stream's own close handling below.
      ws.addEventListener("close", () => {
        abortController.abort()
        // Only remove entries that still belong to THIS socket: after a forced
        // re-create, a late close of the old socket must not unregister its
        // replacement.
        set(j_resourceSockets, g => {
          if (g[id]?.ws !== ws) {
            return g
          }
          const { [id]: _, ...rest } = g
          return rest
        })
        set(j_runningProcesses_base, g => {
          if (g[id] !== abortController) {
            return g
          }
          const { [id]: _, ...rest } = g
          return rest
        })
      })

      const messagesToSend = new Map<string, any>()
      // Resolves with the payload to send for a "waiting" request: immediately if
      // one was queued already, otherwise once `resolveWaitingMessage` supplies it.
      // NOTE(review): if a second "waiting" request for the same key arrives while
      // the first is still pending, the stored resolver itself is returned as the
      // payload — confirm whether the server can send duplicates.
      const handleWaitMessage = async (resource: string, messageType: string) => {
        if (messagesToSend.has(`${messageType}_${resource}`)) {
          return messagesToSend.get(`${messageType}_${resource}`)
        }
        return await new Promise(resolve => {
          messagesToSend.set(`${messageType}_${resource}`, resolve)
        })
      }

      const resourceStream = new WebReadableStream<ResourceStreamTrace>({
        start(controller) {
          ws.onmessage = event => {
            const message = JSON.parse(event.data) as
              | ResourceStreamTrace
              | {
                  type: "waiting"
                  resource: string
                  messageType: string
                }
            if (message.type === "waiting") {
              // The socket asks for client input: answer with the matching payload.
              handleWaitMessage(message.resource, message.messageType)
                .then(x => {
                  messagesToSend.delete(`${message.messageType}_${message.resource}`)
                  return x
                })
                .then(x => ws.send(JSON.stringify(x)))
            } else {
              controller.enqueue(message)
            }
          }

          // End the stream when the socket closes, even if no message was ever
          // received (previously the handler was only installed from onmessage).
          ws.addEventListener("close", () => {
            try {
              controller.close()
            } catch {
              // The stream was already closed or cancelled by its consumer;
              // there is nothing left to end.
            }
          })
        },
      })
      const wsContext = {
        ws,
        resourceStream,
        messagesToSend,
        abortController,
      }

      set(j_resourceSockets, g => ({
        ...g,
        [id]: wsContext,
      }))
      return wsContext
    },
  ),
)

/**
 * Supplies the client payload for a (messageType, resource) pair.
 *
 * If a resolver function is stored under `${messageType}_${resource}` it is
 * called with the payload; otherwise the payload is stored under that key so
 * it can be picked up later.
 *
 * @param wsContext   socket context whose `messagesToSend` map is used
 * @param resource    resource id the message refers to
 * @param messageType kind of message being answered
 * @param data        extra fields spread into the payload (after `messageType`
 *                    and `resource`, so same-named fields in `data` win)
 */
function resolveWaitingMessage(wsContext: WebsocketContext, resource: string, messageType: string, data: any) {
  const key = `${messageType}_${resource}`
  const pending = wsContext.messagesToSend.get(key)
  const payload = { messageType, resource, ...data }

  if (typeof pending === "function") {
    pending(payload)
    return
  }
  wsContext.messagesToSend.set(key, payload)
}

/**
 * Write-only atom that returns the socket context for a resource, opening a
 * new socket when needed.
 *
 * An existing context is reused unless its socket is CLOSED or
 * `opts.forceRecrate` is set (in which case the existing one is aborted first).
 * When a new socket is opened the resource's trace is reset, its stream is
 * piped into `j_applyResourceStream`, and — unless the query contains
 * "regenerate=", "resume" or "release" — the resource's output tab is selected.
 *
 * @throws Error("Failed to create socket") when the socket atom returns nothing.
 */
export const j_connectResourceWS = atom(
  null,
  (
    get,
    set,
    resourceId: string,
    opts?: {
      query?: string
      forceRecrate?: boolean
    },
  ) => {
    const socketAtom = j_resourceSocket(resourceId)
    const known = get(socketAtom)
    // A context whose socket is already CLOSED is treated as absent.
    const live = known && known.ws.readyState !== WebSocket.CLOSED ? known : undefined

    if (live) {
      if (!opts?.forceRecrate) {
        return live
      }
      live.abortController.abort()
    }

    const query = opts?.query
    const querySuffix = query ? `?${query}` : ""
    const created: WebsocketContext | undefined = set(
      socketAtom,
      `/api/playgrounds/${get(j_playgroundId)}/resources/${resourceId}/ws${querySuffix}`,
    )
    if (!created) {
      throw new Error("Failed to create socket")
    }

    // Start from an empty trace for the new connection.
    set(j_resourceById(resourceId), g => ({
      ...g,
      trace: [],
    }))

    set(j_applyResourceStream, created.resourceStream).catch(e => {
      console.error("Failed to apply resource stream", e)
      created.abortController.abort()
      set(socketAtom, undefined)
    })

    // These query kinds do not switch the selected output tab.
    const nonSelectingMarkers = ["regenerate=", "resume", "release"]
    const keepsCurrentTab = nonSelectingMarkers.some(marker => query?.includes(marker))
    if (!keepsCurrentTab) {
      set(j_setOutputTab, resourceId)
    }

    return created
  },
)

/**
 * Hook bundling the operations that talk to a resource's WebSocket.
 *
 * @returns
 *  - `connect(resourceId, opts?)`: returns the socket context for the resource,
 *    (re)opening the socket via `j_connectResourceWS`; with `forceRecrate` the
 *    resource is first reset to status "init" with no output and an empty trace.
 *  - `create(opts)`: resets the resource with a new generator, connects, and
 *    supplies the "create" payload for the socket.
 *  - `release(ids, msg)`: connects to the root resource and supplies the
 *    "release" payload for the triggering resource.
 *  - `resume(resourceId)`: connects to the resource.
 *  - `regenerateFromCheckpoint(rootResourceId, checkpointId)`: force-reconnects
 *    with `regenerate=<checkpointId>` as the query.
 */
export function useResourceWSApi() {
  // Memoized so that the callbacks below, which close over `connect`, always
  // hold the current function instead of the one from the first render.
  const connect = useAtomCallback(
    useCallback(
      (
        get,
        set,
        resourceId: string,
        opts?: {
          query?: string
          forceRecrate?: boolean
        },
      ) => {
        if (opts?.forceRecrate) {
          startTransition(() => {
            //@ts-ignore
            set(j_resourceById(resourceId), g => ({
              ...g,
              status: "init",
              output: undefined,
              trace: [],
            }))
          })
        }
        return set(j_connectResourceWS, resourceId, opts)
      },
      [],
    ),
  )

  const create = useAtomCallback(
    useCallback(
      (
        get,
        set,
        opts: {
          resourceId: string
          tool: Pick<ClientToolDef, "name" | "outputType">
          toolArgs: any
          regenerate?: boolean
        },
      ) => {
        const generator = {
          tool: opts.tool.name,
          args: opts.toolArgs,
          outputType: opts.tool.outputType ?? "document",
        }
        // Drop the `__partial` marker and reset the resource for a fresh run.
        set(j_resourceById(opts.resourceId), ({ __partial, ...g }) => ({
          ...g,
          generator: generator,
          status: "init",
          output: undefined,
          trace: [],
        }))

        const wsContext = connect(opts.resourceId, opts.regenerate ? { query: "regenerate", forceRecrate: true } : undefined)

        resolveWaitingMessage(wsContext, opts.resourceId, "create", {
          id: opts.resourceId,
          // Use the defaulted output type so the name never starts with "undefined_".
          friendlyName: `${generator.outputType}_${opts.resourceId}`,
          generator: generator,
        })
      },
      [connect],
    ),
  )

  const release = useAtomCallback(
    useCallback(
      (
        get,
        set,
        { rootResourceId, triggeringResourceId }: { rootResourceId: string; triggeringResourceId: string },
        msg: { args: unknown[]; result: unknown },
      ) => {
        const wsContext = connect(rootResourceId)
        resolveWaitingMessage(wsContext, triggeringResourceId, "release", msg)
      },
      [connect],
    ),
  )

  const resume = useAtomCallback(
    useCallback(
      (get, set, resourceId: string) => {
        connect(resourceId)
      },
      [connect],
    ),
  )

  const regenerateFromCheckpoint = useAtomCallback(
    useCallback(
      (get, set, rootResourceId: string, checkpointId: string) => {
        connect(rootResourceId, {
          forceRecrate: true,
          query: `regenerate=${checkpointId}`,
        })
      },
      [connect],
    ),
  )

  return {
    connect,
    create,
    resume,
    release,
    regenerateFromCheckpoint,
  }
}

/**
 * Creates an AbortController and registers it as the running process of
 * `resourceId`.
 *
 * When the controller is aborted, every local resource whose status is
 * "generating" (not only `resourceId`) is switched to "paused", and the
 * running-process entry for `resourceId` is cleared.
 *
 * @param resourceId id under which the controller is registered
 * @param set        jotai setter used for all state writes
 * @returns the newly created controller
 */
const registerAbortController = (resourceId: string, set: Setter) => {
  const controller = new AbortController()
  const processAtom = j_runningProcesses(resourceId)

  const handleAbort = async () => {
    set(j_localResources, locals =>
      locals.map(resource => (resource.status === "generating" ? { ...resource, status: "paused" } : resource)),
    )
    set(processAtom, undefined)
  }

  set(processAtom, controller)
  controller.signal.addEventListener("abort", handleAbort)
  return controller
}

/**
 * Write-only atom that consumes a stream of resource traces and folds each
 * message into the resource atoms.
 *
 * For every message:
 *  - a "create-subresource" message adds its `data` to the local resources,
 *    unless a resource with that id is already "done" or "error";
 *  - the target resource (`message.resource`) is awaited first when it was not
 *    part of the base resources at the time the stream started;
 *  - the message is appended to the target's trace (consecutive log/data/progress
 *    messages of the same type are merged unless `flush` is set) and the
 *    resource's status/output fields are updated according to the message type.
 *
 * The returned promise settles when the stream ends or fails.
 */
const j_applyResourceStream = atom(null, async (get, set, data: ReadableStream<ResourceStreamTrace>) => {
  // Snapshot taken once up front; only used to decide whether a resource must be awaited below.
  const resources = get(j_baseResources)
  for await (const message of data) {
    if (message.type === "create-subresource") {
      const current = get(j_baseResources)
      startTransition(() => {
        set(j_localResources, g => {
          const existing = current.find(x => x.id === message.data.id)
          // Never replace a resource that has already finished.
          if (existing?.status === "done" || existing?.status === "error") {
            return g
          }
          return [...g.filter(x => x.id !== message.data.id), message.data as UIResource]
        })
      })
    }
    if (!resources.some(x => x.id === message.resource)) {
      await get(j_resourceById(message.resource))
    }
    startTransition(() => {
      set(j_resourceById(message.resource), s => {
        const newResource = {
          ...s,
          status: "generating",
          trace: s.trace ?? [],
        } as UIResource

        if (message.type === "log" || message.type === "data" || message.type === "progress") {
          const [prevTrace] = newResource.trace.slice(-1)
          if (prevTrace?.type === message.type && !message.flush) {
            // Merge into the previous entry by replacing it with a copy. The
            // previous state's trace objects must not be mutated from inside an
            // updater: that would silently change state already handed out.
            newResource.trace = [
              ...newResource.trace.slice(0, -1),
              { ...prevTrace, data: prevTrace.data + message.data } as typeof prevTrace,
            ]
          } else {
            newResource.trace = [...newResource.trace, message]
          }
        } else {
          newResource.trace = [...newResource.trace, message]
        }

        return (
          match(message)
            .with({ type: "update-resource-name" }, x => ({
              ...newResource,
              friendlyName: x.data,
            }))
            .with(
              {
                type: "create-subresource",
                data: { id: P.nonNullable },
              },
              x =>
                // A sub-resource that is already finished is not registered as a
                // child; the parent is returned as-is (previously the message
                // object itself was returned and stored as the resource).
                x.data.status === "done" || x.data.status === "error"
                  ? newResource
                  : {
                      ...newResource,
                      output: {
                        ...newResource.output,
                        childResources: [...new Set([...(newResource.output?.childResources ?? []), x.data.id])],
                      },
                    },
            )
            /*
            .with({ type: "data" }, (x) => ({
              ...newResource,
              output: {
                ...newResource.output,
                data:  (x.flush ? "" : (newResource.output?.data ?? "")) + x.data,
              },
            }))*/
            .with({ type: "error" }, x => ({
              ...newResource,
              status: "error",
              output: {
                ...newResource.output,
                data: x.data,
              },
            }))
            .with({ type: "done" }, x => ({
              ...newResource,
              status: "done",
              output: {
                ...newResource.output,
                data: x.data,
              },
            }))
            .with({ type: "suspend" }, x => ({
              ...newResource,
              status: "suspended",
            }))
            .with(
              {
                type: "abort",
                cause: "cache-invalidation",
              },
              x => {
                console.log("cache invalidation happened")
                return {
                  ...newResource,
                  frozen: true,
                  __partial: true,
                }
              },
            )
            .with({ type: "abort" }, x => ({
              ...newResource,
              __partial: true,
              status: "paused",
            }))
            .otherwise(() => newResource) as UIResource
        )
      })
    })
  }
})

/**
 * Async atom resolving to the resource that causes `resourceId` to be suspended.
 *
 * Resolves to `null` when the resource is not "suspended". Otherwise descends
 * through suspended child resources: a suspended resource with no suspended
 * children is the cause; when there are several suspended children, the result
 * for the first one is returned.
 */
export const j_suspendedCauseResource = atomFamily((resourceId: string) =>
  atom(get => {
    const findCause = async (currentId: string): Promise<UIResource | null> => {
      const resource = await get(j_resourceById(currentId))
      if (resource.status !== "suspended") {
        return null
      }

      const childIds = resource.output?.childResources ?? []
      const children = await Promise.all(childIds.map(childId => get(j_resourceById(childId))))
      const suspendedChildren = children.filter(child => child?.status === "suspended")
      if (suspendedChildren.length === 0) {
        return resource
      }

      // All suspended children are resolved, but only the first result is used.
      const causes = await Promise.all(suspendedChildren.map(child => findCause(child.id)))
      return causes[0]
    }

    return findCause(resourceId)
  }),
)

/**
 * Async atom resolving to the top-most ancestor of `resourceId`: follows
 * `parentResource` links until a resource without a parent is reached.
 */
const j_rootParent = atomFamily((resourceId: string) =>
  atom(get => {
    const findRoot = async (startId: string): Promise<UIResource | null> => {
      let current = await get(j_resourceById(startId))
      while (current.parentResource) {
        current = await get(j_resourceById(current.parentResource))
      }
      return current
    }

    return findRoot(resourceId)
  }),
)

/** Hook returning the value of `j_rootParent` for `resourceId` (its top-most ancestor). */
export const useGetRootParentResource = (resourceId: string) => useAtomValue(j_rootParent(resourceId))

/** Hook returning the value of `j_suspendedCauseResource` for `resourceId`. */
export const useGetSuspendCauseResource = (resourceId: string) => useAtomValue(j_suspendedCauseResource(resourceId))

/**
 * Hook returning a callback that renames a resource.
 *
 * The trimmed name is applied to local state optimistically and then sent to
 * the server with a PATCH request. If the request fails (network error or
 * non-OK response), the remote resources are refreshed, the local copy of the
 * resource is dropped, and the callback rejects.
 *
 * @throws Error("Failed to update name") on a non-OK response; a rejected
 *   `fetch` is re-thrown as-is.
 */
export function useUpdateFriendlyName() {
  const playground = useAtomValue(j_playgroundId)

  return useAtomCallback(
    useCallback(
      async (get, set, { resourceId, newName }: { resourceId: string; newName: string }) => {
        const friendlyName = newName.trim()

        // Update the local state optimistically
        startTransition(() => {
          set(j_resourceById(resourceId), resource => ({
            ...resource,
            friendlyName,
          }))
        })

        // Discards the optimistic change in favour of the server's state.
        const rollback = () => {
          set(j_remoteResources)
          set(j_localResources, x => x.filter(y => y.id !== resourceId))
        }

        let response: Response
        try {
          response = await fetch(`/api/playgrounds/${playground}/resources/${resourceId}`, {
            method: "PATCH",
            headers: {
              "Content-Type": "application/json",
            },
            body: JSON.stringify({
              friendlyName,
            }),
          })
        } catch (e) {
          // Network-level failure: undo the optimistic update as well.
          rollback()
          throw e
        }

        if (!response.ok) {
          rollback()
          throw new Error("Failed to update name")
        }
      },
      // `playground` is part of the request URL, so the callback must follow it.
      [playground],
    ),
  )
}

/**
 * Hook returning a callback that sets a resource's `archived` flag.
 *
 * The flag is applied to local state optimistically and then sent to the
 * server with a PATCH request. If the request fails (network error or non-OK
 * response), the remote resources are refreshed, the local copy of the
 * resource is dropped, and the callback rejects.
 *
 * @throws Error("Failed to update archived state") on a non-OK response; a
 *   rejected `fetch` is re-thrown as-is.
 */
export const useUpdateResourceArchived = () => {
  const playground = useAtomValue(j_playgroundId)

  return useAtomCallback(
    useCallback(
      async (get, set, { resourceId, newArchived }: { resourceId: string; newArchived: boolean }) => {
        // Update the local state optimistically
        startTransition(() => {
          set(j_resourceById(resourceId), resource => ({
            ...resource,
            archived: newArchived,
          }))
        })

        // Discards the optimistic change in favour of the server's state.
        const rollback = () => {
          set(j_remoteResources)
          set(j_localResources, x => x.filter(y => y.id !== resourceId))
        }

        let response: Response
        try {
          response = await fetch(`/api/playgrounds/${playground}/resources/${resourceId}`, {
            method: "PATCH",
            headers: {
              "Content-Type": "application/json",
            },
            body: JSON.stringify({
              archived: newArchived,
            }),
          })
        } catch (e) {
          // Network-level failure: undo the optimistic update as well.
          rollback()
          throw e
        }

        if (!response.ok) {
          rollback()
          throw new Error("Failed to update archived state")
        }
      },
      // `playground` is part of the request URL, so the callback must follow it.
      [playground],
    ),
  )
}

/**
 * Hook returning a callback that applies a partial update to the current
 * playground.
 *
 * The merged playground is written to `j_playgroundLocalState` while the PATCH
 * request is in flight and the local state is cleared again once the request
 * has settled — whether it succeeded, returned a non-OK response, or rejected.
 *
 * @throws Error("Failed to update playground") on a non-OK response; a rejected
 *   `fetch` is re-thrown as-is.
 */
export const useUpdatePlayground = () => {
  return useAtomCallback(
    useCallback(async (get, set, update: Partial<Playground>) => {
      const playgroundId = get(j_playgroundId)
      const playground = await get(j_playground)
      startTransition(() => {
        set(j_playgroundLocalState, {
          ...playground,
          ...update,
        })
      })

      try {
        const response = await fetch(`/api/playgrounds/${playgroundId}`, {
          method: "PATCH",
          headers: {
            "Content-Type": "application/json",
          },
          body: JSON.stringify(update),
        })

        if (!response.ok) {
          throw new Error("Failed to update playground")
        }
      } finally {
        // Always drop the optimistic override, otherwise a failed request
        // would leave the unsaved values showing indefinitely.
        startTransition(() => {
          set(j_playgroundLocalState, null)
        })
      }
    }, []),
  )
}