import grpcWeb from "grpc-web"
import { Message } from "google-protobuf"
import type { AnyRequest, ClientReadableStreamCallback, StreamInvoker } from "./types"
import { ABORTED_STATUS_CODE } from "/src/utils/grpc"

// Value handed to the RequestQueue constructor as its `maxConcurrent` limit
// when the module-level queue is created further down in this file.
const MAX_ACTIVE_REQUESTS = 5

/**
 * FIFO queue that caps how many stream requests are started concurrently.
 *
 * A request occupies a "slot" from the moment it is dequeued until its stream
 * emits "end" or "error", its invoker fails, or `slotTimeoutMs` elapses after
 * the stream was obtained — whichever happens first. Whenever a slot is
 * released the next queued request (if any) is started.
 */
class RequestQueue {
    private queue: {
        invoker: StreamInvoker,
        request: AnyRequest,
        resolve: (arg0: unknown) => unknown,
        reject: (arg0: unknown) => unknown,
    }[] = []
    private concurrentRunning = 0
    private readonly maxConcurrent: number
    private readonly slotTimeoutMs: number

    /**
     * @param maxConcurrent Maximum number of requests that may hold a slot at the same time.
     * @param slotTimeoutMs Milliseconds after which a started stream stops counting against
     *                      the limit even if it has not ended or errored yet.
     */
    constructor(maxConcurrent: number, slotTimeoutMs = 5000) {
        this.maxConcurrent = maxConcurrent
        this.slotTimeoutMs = slotTimeoutMs
    }

    /**
     * Adds a request to the back of the queue and tries to start it.
     *
     * @returns A promise that resolves with the stream returned by `invoker(request)`
     *          once the request has been started, or rejects if the request is
     *          cancelled while still queued or the invoker fails.
     */
    enqueue(invoker: StreamInvoker, request: AnyRequest): Promise<grpcWeb.ClientReadableStream<Message>> {
        const promise = new Promise<grpcWeb.ClientReadableStream<Message>>((resolve, reject) => {
            this.queue.push({ invoker, request, resolve, reject })
        })
        this.processQueue()
        return promise
    }

    /**
     * Removes a request that is still waiting in the queue and rejects its promise
     * with an ABORTED RpcError. Requests that were already dequeued are not affected.
     */
    cancel(request: AnyRequest) {
        const index = this.queue.findIndex((item) => item.request === request)
        if (index > -1) {
            const { reject } = this.queue.splice(index, 1)[0]
            reject(new grpcWeb.RpcError(ABORTED_STATUS_CODE, "Request aborted", {}))
        }
        this.processQueue()
    }

    /** Starts the next queued request if a slot is available. */
    private processQueue() {
        if (this.concurrentRunning >= this.maxConcurrent) {
            return
        }
        const next = this.queue.shift()
        if (next === undefined) {
            return
        }
        const { invoker, request, resolve, reject } = next
        this.concurrentRunning++

        // Defer the invocation to a later task so that enqueue() returns to its
        // caller before the invoker runs.
        setTimeout(async () => {
            let isFinished = false
            let fallbackTimer: ReturnType<typeof setTimeout> | undefined

            // Releases the slot exactly once, no matter how many of the
            // end / error / timeout / failure paths fire.
            const finish = () => {
                if (isFinished) {
                    return
                }
                isFinished = true
                if (fallbackTimer !== undefined) {
                    clearTimeout(fallbackTimer)
                }
                this.concurrentRunning--
                this.processQueue()
            }

            try {
                const stream = await invoker(request)
                resolve(stream)

                // Make sure the slot is always released: if the stream neither ends
                // nor errors within `slotTimeoutMs`, stop counting it against the limit.
                fallbackTimer = setTimeout(finish, this.slotTimeoutMs)

                // Release the slot as soon as the stream ends or errors.
                stream.on("end", finish)
                stream.on("error", finish)
            } catch (e) {
                reject(e)
                // The request never produced a usable stream; without this the slot
                // would stay occupied forever and eventually stall the whole queue.
                finish()
            }
        }, 0)
    }
}

// Single module-level queue instance; every InterceptedStream created in this
// file enqueues into and cancels from it, so the limit applies across all of them.
const requestQueue = new RequestQueue(MAX_ACTIVE_REQUESTS)

/**
 * Stream handle returned to callers while the real request waits in the shared
 * request queue.
 *
 * Listener registrations, removals and cancellation are recorded against the
 * queue promise and applied to the underlying stream once it exists. If the
 * request metadata carries an `AbortSignal` under the `signal` key, it is
 * removed from the metadata and aborting it cancels this stream.
 *
 * If the underlying stream is never created (the request was cancelled while
 * queued, or the invoker failed), registered "error" listeners are called with
 * the failure instead of it being lost as an unhandled promise rejection.
 */
class InterceptedStream implements grpcWeb.ClientReadableStream<Message> {
    private cancelThrottle: () => void
    private promise: Promise<grpcWeb.ClientReadableStream<Message>>
    private signal?: AbortSignal
    private request: grpcWeb.Request<Message, Message>
    // "error" callbacks that are currently registered, so that a failure that
    // happens before the underlying stream exists is not delivered to a
    // listener that has already been removed.
    private errorListeners = new Set<ClientReadableStreamCallback>()

    constructor(invoker: StreamInvoker, request: grpcWeb.Request<Message, Message>) {
        this.request = request

        // Pull the signal out of the metadata before the request is queued, so it
        // is gone from the metadata regardless of when the invoker runs.
        const metadata = request.getMetadata()
        this.signal = metadata.signal as unknown as AbortSignal | undefined
        if (this.signal) {
            delete metadata.signal
            // addEventListener (rather than assigning `onabort`) keeps any handler
            // the caller has already installed on its own signal.
            this.signal.addEventListener("abort", () => this.cancel(), { once: true })
        }

        this.promise = requestQueue.enqueue(invoker, request)
        this.cancelThrottle = () => requestQueue.cancel(request)
    }

    on(eventType: "error", callback: (err: grpcWeb.RpcError) => void): grpcWeb.ClientReadableStream<Message>
    on(eventType: "status", callback: (status: grpcWeb.Status) => void): grpcWeb.ClientReadableStream<Message>
    on(eventType: "metadata", callback: (status: grpcWeb.Metadata) => void): grpcWeb.ClientReadableStream<Message>
    on(eventType: "data", callback: (response: Message) => void): grpcWeb.ClientReadableStream<Message>
    on(eventType: "end", callback: () => void): grpcWeb.ClientReadableStream<Message>

    /**
     * Registers a listener that is attached to the underlying stream once it exists.
     *
     * @throws grpcWeb.RpcError with the ABORTED status code if the signal is already
     *         aborted at the time of the call (the stream is cancelled first).
     */
    on(eventType: "error" | "status" | "metadata" | "data" | "end", callback: ClientReadableStreamCallback): grpcWeb.ClientReadableStream<Message> {
        if (this.signal?.aborted) {
            this.cancel()
            throw this.abortedError()
        }
        if (eventType === "error") {
            this.errorListeners.add(callback)
        }
        this.promise.then(
            (stream: grpcWeb.ClientReadableStream<Message>) => {
                if (this.signal?.aborted) {
                    // Aborted between registration and the stream becoming available:
                    // cancel instead of attaching. Throwing here could never reach the
                    // caller, so report through the "error" listener instead.
                    this.cancel()
                    this.notifyError(eventType, callback, this.abortedError())
                    return
                }
                // eslint-disable-next-line
                // @ts-ignore
                stream.on(eventType, callback)
            },
            (reason: unknown) => {
                // No underlying stream was created (cancelled while queued, or the
                // invoker failed).
                this.notifyError(eventType, callback, reason)
            },
        )
        return this
    }

    removeListener(eventType: "error", callback: (err: grpcWeb.RpcError) => void): void
    removeListener(eventType: "status", callback: (status: grpcWeb.Status) => void): void
    removeListener(eventType: "metadata", callback: (status: grpcWeb.Metadata) => void): void
    removeListener(eventType: "data", callback: (response: Message) => void): void
    removeListener(eventType: "end", callback: () => void): void

    /** Removes a listener from the underlying stream once it exists. */
    removeListener(eventType: "error" | "status" | "metadata" | "data" | "end", callback: ClientReadableStreamCallback) {
        if (eventType === "error") {
            this.errorListeners.delete(callback)
        }
        this.promise.then(
            (stream: grpcWeb.ClientReadableStream<Message>) => {
                // eslint-disable-next-line
                // @ts-ignore
                stream.removeListener(eventType, callback)
            },
            () => {
                // No underlying stream was created, so there is nothing to detach from.
            },
        )
    }

    /**
     * Cancels the request: drops it from the queue if it is still waiting there,
     * and cancels the underlying stream if/when one exists.
     */
    cancel() {
        this.cancelThrottle()
        this.promise.then(
            (stream: grpcWeb.ClientReadableStream<Message>) => stream.cancel(),
            () => {
                // The request never produced a stream, so there is nothing left to cancel.
            },
        )
    }

    /** Builds the ABORTED error reported when the signal has been aborted. */
    private abortedError(): grpcWeb.RpcError {
        return new grpcWeb.RpcError(ABORTED_STATUS_CODE, this.signal?.reason, this.request.getMetadata())
    }

    /** Calls `callback` with `reason` if it is a still-registered "error" listener. */
    private notifyError(eventType: string, callback: ClientReadableStreamCallback, reason: unknown) {
        if (eventType === "error" && this.errorListeners.has(callback)) {
            (callback as (err: grpcWeb.RpcError) => void)(reason as grpcWeb.RpcError)
        }
    }
}

/**
 * Stream interceptor that wraps each intercepted call in an InterceptedStream,
 * which queues the call instead of invoking it directly.
 */
export class ThrottledStreamInterceptor implements grpcWeb.StreamInterceptor<Message, Message> {
    /**
     * @param request The outgoing request being intercepted.
     * @param invoker The function that actually starts the request.
     * @returns The InterceptedStream standing in for the real stream.
     */
    intercept(
        request: grpcWeb.Request<Message, Message>,
        invoker: StreamInvoker,
    ): grpcWeb.ClientReadableStream<Message> {
        const throttledStream: grpcWeb.ClientReadableStream<Message> = new InterceptedStream(invoker, request)
        return throttledStream
    }
}
