/* eslint-disable no-restricted-imports */
import { Wukong } from '@wukong/bridge-proto'
import { debounce, DebouncedFunc } from 'lodash-es'
import { BehaviorSubject, Subject } from 'rxjs'
import { intervalWorker } from '../../../../util/src'
import { TraceableAbortSignal } from '../../../../util/src/abort-controller/traceable-abort-controller'
import { transaction } from '../../../../util/src/abort-controller/traceable-transaction'
import { environment, IN_JEST_TEST } from '../../environment'
import { debugError, debugLog } from '../debug'
import { MessageContentType, MessageRequestParamVO } from '../interface/notify'
import {
    NotifyWebSocketMessage,
    WsAuthResult,
    WsBusinessMessage,
    WsError,
    WsPropertyChangeMessage,
    WsSubscription,
    WsUnSubscription,
} from './notify-message'

/**
 * Connection state tracked by `NotifyService` in its `status` field.
 */
export enum WebSocketStatus {
    /** A socket has been created by `connect()` but its `onopen` has not been handled yet. */
    Connecting = 1,
    /** `onopen` was handled while the socket's `readyState` was `OPEN`. */
    Connected = 2,
    /** NOTE(review): never assigned by the code visible in this file — confirm whether it is used elsewhere. */
    Reconnecting = 3,
    /** No live socket: the initial state, and the state set by `disconnect()`. */
    Disconnected = 4,
}

/** Milliseconds in one second; the base unit for the timing constants below. */
export const SECOND = 1000
/** Heartbeat period in milliseconds (10 seconds). */
export const HEARTBEAT_INTERVAL = SECOND * 10
/**
 * Reconnect back-off schedule in milliseconds (1s, 5s, 10s, 30s, 60s).
 * The n-th reconnect attempt waits for the n-th entry; later attempts reuse the last one.
 */
export const RECONNECT_INTERVALS = [1, 5, 10, 30, 60].map((seconds) => seconds * SECOND)

/**
 * The `messageContentType` and `filterParameters` fields of `MessageRequestParamVO`.
 * NOTE(review): presumably the options a caller supplies when subscribing — not used in this file; confirm at call sites.
 */
export type SubscribeOptions = Pick<MessageRequestParamVO, 'messageContentType' | 'filterParameters'>

/**
 * Value emitted on `NotifyService.connect$`.
 * Both fields are absent in the initial value and in the value emitted after a failed (non-401) auth result.
 */
export interface ConnectInfo {
    /** String form of the session id carried by a successful (status 200) auth result. */
    sessionId?: string
    /**
     * Milliseconds between the last `disconnect()` and the last `connect()` call
     * (`lastConnectTime - lastDisconnectTime`); undefined when either timestamp has not been recorded.
     */
    disconnectedDuration?: number
}

/**
 * WebSocket client for the notify channel.
 *
 * Owns a single WebSocket to `environment.notifyWssPrefix`, decodes downstream `MessageProto`
 * frames and dispatches them by message type, publishes auth/connection state on `connect$` and
 * business messages on `businessMessage$`, sends a periodic ping driven by an interval worker,
 * and reconnects using the back-off schedule in `RECONNECT_INTERVALS`.
 */
export class NotifyService {
    /**
     * Id of the most recently created socket. Every `onmessage` handler captures the id that was
     * current when it was installed, so frames delivered by a stale socket are ignored.
     */
    private currentSocketId = 0
    /**
     * Session id taken from the last successful auth result; reset to null by `disconnect()`.
     */
    private sessionId: string | null = null
    private socket?: WebSocket
    protected status = WebSocketStatus.Disconnected
    /** True while the most recent auth result was 401 (not logged in); disables `reconnect()`. */
    private is401 = false
    /** Downstream handlers keyed by message type code, registered through `onProto`. */
    private bindMethods: Record<number, (proto: Wukong.NotifyProto.MessageProto) => void> = {}
    /** Interval worker whose ticks trigger `sendPing`; null when no heartbeat is running. */
    private networkProbeWorker: Worker | null = null
    /**
     * Bumped on every heartbeat start/stop. A worker whose asynchronous creation finishes after
     * the counter has moved on is terminated instead of being installed.
     */
    private networkProbeGeneration = 0
    /** Debounced "no downstream message received" watchdog; re-armed by every incoming frame. */
    private timeoutDebounce: DebouncedFunc<() => void> | null = null
    private userId = 0
    private lastDisconnectTime: number | null = null // time of the last disconnect()
    private lastConnectTime: number | null = null // time of the last connect()
    /** Silence (ms) after the last downstream message before the connection is recycled. */
    private timeoutReconnectTime = HEARTBEAT_INTERVAL * 20

    /**
     * @param isMirror when true, the silence timeout is shortened to three heartbeat periods.
     */
    constructor(isMirror = false) {
        if (isMirror) {
            // The new mirror reconnects after 30s without any message from the server.
            this.timeoutReconnectTime = HEARTBEAT_INTERVAL * 3
        }
    }

    // A BehaviorSubject because comments and preview send their subscriptions at different
    // times; a late subscriber still receives the latest connection info.
    public connect$ = new BehaviorSubject<ConnectInfo>({})

    /**
     * Subscribes `callback` to `connect$` inside a transaction bound to `signal`.
     * The subscription is released by the cleanup function handed to the transaction.
     */
    public onConnectChangeWithSignal = (signal: TraceableAbortSignal, callback: (info: ConnectInfo) => void) => {
        const { act } = transaction(signal)
        act('onConnectChangeWithSignal', () => {
            const subscription = this.connect$.subscribe(callback)
            return () => subscription.unsubscribe()
        })
    }

    // Do not subscribe to businessMessage$ directly from outside this class: every subscriber
    // would decode the same message again. Expose a callback-based method built on top of it
    // instead; see onUserPropertyChangeWithSignal for the pattern.
    public businessMessage$ = new Subject<Wukong.NotifyProto.BusinessMessageProto>()

    /**
     * Subscribes `callback` to `businessMessage$` inside a transaction bound to `signal`.
     * The subscription is released by the cleanup function handed to the transaction.
     */
    public onBusinessMessageChangeWithSignal = (
        signal: TraceableAbortSignal,
        callback: (msg: Wukong.NotifyProto.BusinessMessageProto) => void
    ) => {
        const { act } = transaction(signal)
        act('onBusinessMessageChangeWithSignal', () => {
            const subscription = this.businessMessage$.subscribe(callback)
            return () => subscription.unsubscribe()
        })
    }

    /**
     * Subscribes to property changes of the current user (see `setUserId`).
     *
     * Sends the subscription each time `connect$` emits a session id, and forwards decoded
     * property-change bodies whose business entity type is USER to `callback`.
     *
     * @param signal   bounds the lifetime of the internal rxjs subscriptions
     * @param orgId    organisation id sent with the subscribe request
     * @param callback receives each decoded USER property-change body
     */
    public onUserPropertyChangeWithSignal(
        signal: TraceableAbortSignal,
        orgId: string,
        callback: (msg: Wukong.NotifyProto.BusinessEntityPropertiesChange) => void
    ) {
        const subscribeParam = {
            [MessageContentType.PropertyChange]: {
                filters: [
                    {
                        filterParameters: [
                            // NOTE(review): the filter sends `USER - 1` while received bodies are
                            // compared against `USER` below — presumably the server-side value is
                            // offset by one; confirm.
                            { name: 'entityType', value: Wukong.NotifyProto.EntityType.USER - 1 + '' },
                            { name: 'entityId', value: this.userId.toString() },
                        ],
                    },
                ],
            },
        }

        // Unsubscribing is not possible yet: the workbench holds the same subscription, and
        // cancelling it here would stop the workbench from receiving messages. Re-enable once all
        // subscriptions are consolidated inside notifyService with reference counting.
        // signal.addEventListener('abort', () => {
        //     this.sendUnSubscribeProto(orgId, subscribeParam)
        // })

        this.onConnectChangeWithSignal(signal, ({ sessionId }) => {
            if (sessionId) {
                this.sendSubscribeProto(orgId, subscribeParam)
            }
        })

        this.onBusinessMessageChangeWithSignal(signal, (msg) => {
            if (msg.businessCode !== WsPropertyChangeMessage.code) {
                return
            }

            const body = WsPropertyChangeMessage.bodyType.decode(msg.payload)
            if (body.businessEntity?.entityType !== Wukong.NotifyProto.EntityType.USER) {
                return
            }

            callback(body)
        })
    }

    /** Sets the user id used as the `entityId` filter by `onUserPropertyChangeWithSignal`. */
    setUserId(userId: number) {
        this.userId = userId
    }

    /**
     * Number of reconnect attempts so far; selects the delay from `RECONNECT_INTERVALS`.
     */
    private reconnectTimes = 0
    /** Handle of the pending delayed reconnect, or null when none is scheduled. */
    protected reconnectingTimer: NodeJS.Timeout | null = null

    /**
     * @returns `lastConnectTime - lastDisconnectTime` in milliseconds, or undefined when either
     * timestamp has not been recorded yet.
     */
    private getDisconnectedDuration = (): number | undefined => {
        if (this.lastDisconnectTime && this.lastConnectTime) {
            return this.lastConnectTime - this.lastDisconnectTime
        }
        return undefined
    }

    /**
     * Opens a new socket, installs its handlers and (re)starts the heartbeat.
     * Logs an error and does nothing unless the current status is `Disconnected`.
     */
    connect = () => {
        if (this.status !== WebSocketStatus.Disconnected) {
            debugError(`NotifyService should not connect websocket when status = ${this.status}`)
            return
        }

        this.status = WebSocketStatus.Connecting

        this.socket = new WebSocket(environment.notifyWssPrefix)
        this.socket.binaryType = 'arraybuffer'

        this.currentSocketId++
        const socketId = this.currentSocketId

        this.socket.onerror = this.onError
        this.socket.onclose = this.onClose
        this.socket.onopen = this.onOpen

        this.onProto(WsAuthResult, this.onAuthResult)
        this.onProto(WsError, this.onErrorMessage)
        this.onProto(WsBusinessMessage, this.onBusinessMessage)
        this.socket.onmessage = (e) => this.onMessage(e, socketId)

        this.startNetworkProbe()

        this.lastConnectTime = Date.now()
        debugLog(`NotifyService connected. [socketId=${this.currentSocketId}]`)
    }

    /**
     * Socket `open` handler: marks the service connected and resets the back-off counter.
     */
    private onOpen = () => {
        if (this.socket?.readyState !== WebSocket.OPEN) {
            debugError(`NotifyService onOpen but socket is not open, state=${this.socket?.readyState}`)
            return
        }

        this.status = WebSocketStatus.Connected
        // A successful connection resets the reconnect counter.
        this.reconnectTimes = 0
    }

    /**
     * Socket `close` handler: logs the close event, then disconnects and schedules a reconnect.
     * @param event the close event delivered by the socket
     */
    private onClose = (event: CloseEvent) => {
        debugError(`NotifyService is closed. ${JSON.stringify(event, ['isTrusted', 'reason', 'wasClean', 'code'])}`)
        this.disconnectAndTryReconnect()
    }

    /**
     * Socket `error` handler: logs the event, then disconnects and schedules a reconnect.
     * @param event the error event delivered by the socket
     */
    private onError = (event: Event) => {
        debugError(event)
        debugError(`NotifyService on error. ${JSON.stringify(event, Object.getOwnPropertyNames(event))}`)
        this.disconnectAndTryReconnect()
    }

    /**
     * Socket `message` handler: decodes the frame and dispatches it by message type.
     * @param msg      the raw message event (binary payload)
     * @param socketId id of the socket this handler was installed on
     */
    private onMessage = (msg: MessageEvent, socketId: number) => {
        if (this.currentSocketId !== socketId) {
            // Frame from a socket that has since been replaced.
            return
        }

        // Any downstream traffic postpones the silence watchdog.
        this.timeoutDebounce?.()

        const message = Wukong.NotifyProto.MessageProto.decode(new Uint8Array(msg.data))
        const callback = this.bindMethods[message.messageType]
        callback?.(message)
    }

    /**
     * Handles a downstream error message: logs it, then disconnects and schedules a reconnect.
     * @param proto the decoded error body
     */
    private onErrorMessage = (proto: Wukong.NotifyProto.ErrorProto) => {
        debugError(`NotifyService onErrorMessage ${proto.errorCode} ${proto.subCode}`)
        this.disconnectAndTryReconnect()
    }

    /**
     * Handles the downstream auth result.
     * - 200: stores the session id and publishes it, with the disconnected duration, on `connect$`.
     * - 401: records the not-logged-in state so that `reconnect()` stops retrying.
     * - otherwise: logs the failure and publishes an empty `ConnectInfo`.
     * @param authResult the decoded auth result body
     */
    private onAuthResult = (authResult: Wukong.NotifyProto.AuthResultProto) => {
        // Loose equality is kept on purpose: the generated type of `status` is not visible here.
        if (authResult.status == 200) {
            const sessionId = authResult.sessionId.toString()
            this.sessionId = sessionId
            // A successful auth supersedes an earlier 401; otherwise automatic reconnect would
            // stay disabled for the rest of this instance's lifetime.
            this.is401 = false
            debugLog(`NotifyService onAuthed. [sesionId=${this.sessionId}]`)
            this.connect$.next({ sessionId, disconnectedDuration: this.getDisconnectedDuration() })
        } else if (authResult.status == 401) {
            this.is401 = true
            debugError(
                `NotifyService AuthResult failed 用户未登录. [status=${authResult.status}, sessionId=${authResult.sessionId}]`
            )
        } else {
            debugError(
                `NotifyService AuthResult failed. [status=${authResult.status}, sessionId=${authResult.sessionId}]`
            )
            this.connect$.next({})
        }
    }

    /**
     * Publishes a downstream business message on `businessMessage$`.
     * @param businessMessage the decoded business message
     */
    private onBusinessMessage = (businessMessage: Wukong.NotifyProto.BusinessMessageProto) => {
        this.businessMessage$.next(businessMessage)
    }

    /**
     * Registers the handler for one downstream message type. The payload is decoded with the
     * message's `bodyType` before `callback` is invoked; a later registration for the same code
     * replaces the earlier one.
     * @param message  descriptor providing the type code and the body codec
     * @param callback receives the decoded body
     */
    private onProto = <T>(message: NotifyWebSocketMessage<T>, callback: (arg: T) => void) => {
        this.bindMethods[message.code] = (proto) => {
            const body = message.bodyType.decode(proto.payload)
            callback(body)
        }
    }

    /**
     * Encodes and sends one upstream message.
     * @param message descriptor providing the type code and the body codec
     * @param arg     body fields to encode
     * @returns false when the socket is not connected (nothing is sent), true otherwise
     */
    private sendProto = <T>(message: NotifyWebSocketMessage<T>, arg: Partial<T>): boolean => {
        if (!this.webSocketConnected()) {
            return false
        }

        this.socket?.send(
            Wukong.NotifyProto.MessageProto.encode({
                messageType: message.code,
                payload: message.bodyType.encode(arg).finish(),
            }).finish()
        )
        return true
    }

    /** @returns true when the status is `Connected` and the socket's `readyState` is `OPEN`. */
    private webSocketConnected(): boolean {
        return this.status === WebSocketStatus.Connected && this.socket?.readyState === WebSocket.OPEN
    }

    /**
     * Sends an upstream subscribe request. Nothing is sent while the socket is not connected.
     */
    public sendSubscribeProto = (
        orgId: string,
        subscribeFilters: { [k in MessageContentType]?: Wukong.NotifyProto.ISubscriptionFilterCollectionProto }
    ) => {
        this.sendProto(WsSubscription, {
            orgId,
            subscribeFilters,
        })
    }

    /**
     * Sends an upstream unsubscribe request. Nothing is sent while the socket is not connected.
     */
    public sendUnSubscribeProto = (
        orgId: string,
        subscribeFilters: { [k in MessageContentType]?: Wukong.NotifyProto.ISubscriptionFilterCollectionProto }
    ) => {
        this.sendProto(WsUnSubscription, {
            orgId,
            subscribeFilters,
        })
    }

    /**
     * Starts the heartbeat: an interval worker that triggers `sendPing`, plus the debounced
     * watchdog that recycles the connection after `timeoutReconnectTime` without downstream
     * messages. Skipped entirely under Jest.
     */
    private startNetworkProbe = () => {
        // Remove this guard once integration tests support loading wasm.
        if (IN_JEST_TEST) {
            return
        }

        // connect() runs on every reconnect and disconnect() does not stop the probe, so release
        // the previous worker and invalidate any creation still in flight; otherwise heartbeat
        // workers would accumulate, each one sending its own pings.
        this.networkProbeWorker?.terminate?.()
        this.networkProbeWorker = null
        this.networkProbeGeneration++
        const generation = this.networkProbeGeneration

        intervalWorker(HEARTBEAT_INTERVAL, environment.isDev).then(
            (worker) => {
                if (generation !== this.networkProbeGeneration) {
                    // Superseded by a later start, or stopped while the worker was being created.
                    worker.terminate?.()
                    return
                }
                this.networkProbeWorker = worker
                worker.onmessage = () => {
                    this.sendPing()
                }
            },
            (error: unknown) => {
                debugError(`NotifyService failed to start heartbeat worker: ${String(error)}`)
            }
        )

        if (!this.timeoutDebounce) {
            // Proactively drop the connection when the server has been silent for too long.
            this.timeoutDebounce = debounce(() => {
                if (this.status === WebSocketStatus.Connected) {
                    debugError(`NotifyService timeout disconnect`)
                    this.disconnectAndTryReconnect()
                }
            }, this.timeoutReconnectTime)
            debugLog('NotifyService start timeout debounce')
        }
    }

    /**
     * Stops the heartbeat worker, the silence watchdog and any pending delayed reconnect.
     */
    private stopNetworkProbe = () => {
        // Invalidate a worker creation that may still be in flight.
        this.networkProbeGeneration++
        this.networkProbeWorker?.terminate?.()
        this.networkProbeWorker = null

        this.timeoutDebounce?.cancel()
        this.timeoutDebounce = null
        // Clear the pending reconnect timer.
        if (this.reconnectingTimer !== null) {
            clearTimeout(this.reconnectingTimer)
            this.reconnectingTimer = null
        }
    }

    /**
     * Sends an upstream ping carrying the current timestamp.
     * @returns false when the socket is not connected (nothing is sent), true otherwise
     */
    private sendPing = (): boolean => {
        if (!this.webSocketConnected()) {
            return false
        }
        debugLog('NotifyService ping')
        this.socket?.send(
            Wukong.NotifyProto.MessageProto.encode({
                messageType: Wukong.NotifyProto.MessageTypeCode.TC_Ping,
                payload: Wukong.NotifyProto.PingProto.encode({
                    timestamp: Date.now(),
                }).finish(),
            }).finish()
        )
        return true
    }

    /**
     * Starts reacting to browser connectivity changes: reconnect immediately on `online`,
     * disconnect on `offline`.
     */
    startHandleNetworkChanged = () => {
        window.addEventListener('online', this.reconnectImmediately)
        window.addEventListener('offline', this.disconnect)
    }

    /**
     * Stops reacting to browser connectivity changes.
     */
    stopHandleNetworkChanged = () => {
        window.removeEventListener('online', this.reconnectImmediately)
        window.removeEventListener('offline', this.disconnect)
    }

    /**
     * Connects right away unless already connected or connecting. A pending delayed reconnect is
     * cancelled and, in that case, the back-off counter restarts from zero before being
     * incremented for this attempt.
     */
    public reconnectImmediately = () => {
        if (this.status === WebSocketStatus.Connected || this.status === WebSocketStatus.Connecting) {
            return
        }
        if (this.reconnectingTimer) {
            clearTimeout(this.reconnectingTimer)
            this.reconnectTimes = 0
            this.reconnectingTimer = null
        }
        this.reconnectTimes++
        this.connect()
    }

    /**
     * Schedules a delayed reconnect using the back-off schedule in `RECONNECT_INTERVALS`.
     * Does nothing after a 401 auth result, while connected or connecting, or while a reconnect
     * is already scheduled.
     */
    public reconnect = () => {
        if (this.is401) {
            // No reconnect while the user is not logged in.
            return
        }
        if (this.status === WebSocketStatus.Connected || this.status === WebSocketStatus.Connecting) {
            return
        }

        if (this.reconnectingTimer !== null) {
            return
        }

        const reconnectDelayTime = RECONNECT_INTERVALS[Math.min(this.reconnectTimes, RECONNECT_INTERVALS.length - 1)]
        debugError(`NotifyService will try ${this.reconnectTimes}th reconnecting in ${reconnectDelayTime}ms`)
        this.reconnectingTimer = setTimeout(() => {
            // Clear the handle first: if connect() throws, a stale handle would make every later
            // reconnect() return early.
            this.reconnectingTimer = null
            debugError(`NotifyService start reconnect ${this.reconnectTimes}`)
            this.reconnectTimes++
            this.connect()
        }, reconnectDelayTime)
    }

    /**
     * Closes the current socket, if any, after detaching its handlers so that closing does not
     * trigger the close/error handlers, and records the disconnect time.
     */
    public disconnect = () => {
        if (!this.socket) {
            return
        }
        this.status = WebSocketStatus.Disconnected
        this.socket.onclose = null
        this.socket.onopen = null
        this.socket.onmessage = null
        this.socket.onerror = null
        this.socket.close()
        this.socket = undefined

        this.sessionId = null
        this.lastDisconnectTime = Date.now()
        debugLog('NotifyService disconnected')
    }

    /**
     * Disconnects, then schedules a reconnect.
     */
    private disconnectAndTryReconnect = () => {
        this.disconnect()
        this.reconnect()
    }

    /**
     * Tears the service down: completes both subjects, drops the registered handlers, removes the
     * connectivity listeners, stops the heartbeat and closes the socket.
     */
    destroy() {
        debugLog('NotifyService destroyed')
        // Release every subscriber.
        this.connect$.complete()
        this.businessMessage$.complete()
        this.bindMethods = {}
        // Close the connection.
        this.stopHandleNetworkChanged()
        this.stopNetworkProbe()
        this.disconnect()
    }
}
