/* eslint-disable no-restricted-imports */
import {
    ChangeSynergyStateCommand,
    ClearRepositoryCommand,
    GetUsedMemoryCommand,
    OnBatchOperation,
    OnBatchOperationAck,
    OnBridgeStatusChange,
    OnDocIdChange,
    OnDocumentCommand,
    OnSessionIdChange,
    OnUserIdChange,
    OnUserRoleChange,
    SaveRepositoryCommand,
    SendBatchOperationCommand,
    Wukong,
} from '@wukong/bridge-proto'
import { BehaviorSubject } from 'rxjs'
import { getReleaseTag } from '../../../../util/src'
import { TraceableAbortSignal } from '../../../../util/src/abort-controller/traceable-abort-controller'
import { transaction } from '../../../../util/src/abort-controller/traceable-transaction'
import { createCombineLatestEventHandler } from '../../../../util/src/combine-latest-event-handler'
import { benchmarkService } from '../../kernel/benchmark'
import { EmBridge } from '../../kernel/bridge/em-bridge'
import { WkCLog } from '../../kernel/clog/wukong/instance'
import { debugLog } from '../../kernel/debug'
import { DocID } from '../../kernel/interface/type'
import { MetricCollector, MetricName } from '../../kernel/metric-collector'
import { AfterImportFixOperations } from '../../kernel/request/document'
import { featureSwitchManager } from '../../kernel/switch/core'
import { createFileManager } from '../../main/create-file-manager/index'
import { WK } from '../../window'
import { RepositoryStatus } from '../node/node'
import { AutoSaveService } from './auto-save-v2/auto-save-service'
import { BatchOperationManager, MirrorBatchOperationManager } from './batch-operation-manager'
import { GetSchemaVersionByDocId } from './schema-version'
import { UpgradeOperations } from './upgrade-operations'
import { WebSocketBridge, WebSocketStatus } from './web-socket-bridge'
import { WsBatchOperation, WsBatchOperationAck, WsDocument } from './web-socket-message'

/**
 * Discriminant values for {@link SynergyEvent}.
 *
 * The numeric values are spelled out explicitly; they are the same values the
 * compiler would assign implicitly (0, 1).
 */
export enum SynergyEventType {
    DocumentLoaded = 0,
    DocumentSynced = 1,
}

/** Event variant whose `type` discriminant is fixed to `SynergyEventType.DocumentLoaded`. */
export interface SynergyDocumentLoadEvent {
    type: SynergyEventType.DocumentLoaded
}

/** Event variant whose `type` discriminant is fixed to `SynergyEventType.DocumentSynced`. */
export interface SynergyDocumentSyncEvent {
    type: SynergyEventType.DocumentSynced
}

/** Discriminated union (on `type`) of all synergy event variants declared in this file. */
export type SynergyEvent = SynergyDocumentLoadEvent | SynergyDocumentSyncEvent

/**
 * Connects the WebSocket synergy channel (`WebSocketBridge`) with the editor bridge (`EmBridge`).
 *
 * Responsibilities visible in this class:
 * - forwards server messages (document, batch operation, batch operation ack) to the bridge;
 * - forwards bridge commands (send batch operation, save/clear repository, change synergy state)
 *   to the WebSocket bridge / auto-save service;
 * - loads locally persisted offline operations once per instance and hands them to the bridge
 *   together with the next document message;
 * - pushes connection status / user / session / document / role changes to the bridge.
 */
export class SynergyManager {
    // Whether this is the first connection; `false` means we are in a reconnect.
    // Flipped to false after a final document fragment has been applied.
    private firstLoad = true
    // True until the first ack message has been handled by `sendCacheAckMessage`.
    private firstAck = true
    private currentSchemaVersion: number | null = null
    private documentId: DocID | null = null
    private userId: number | null = null
    // Offline data waiting to be handed to the bridge with the next document message.
    private localOperations: Array<Uint8Array> = []
    private localBlobs: { [k: string]: Uint8Array } = {}
    private refNodes: { [k: string]: Uint8Array } = {}
    // Guards `tryLoadLocalOperations` so persisted offline data is loaded at most once.
    private hasLoadDb = false
    // Last values pushed to the bridge by `bridgeOnBridgeStatusChange`.
    private currentWebSocketStatus: WebSocketStatus | null = null
    private currentUserId: number | null = null
    private currentSessionId: number | null = null
    private currentDocId: string | null = null
    private currentRole: number | null = null
    // Schema-version request started as soon as the document id is known (unless a file is being created).
    private preFetchSchemaVersion?: Promise<number>

    private readonly repositoryStatus$$ = new BehaviorSubject<RepositoryStatus>(RepositoryStatus.Unknown)

    /**
     * Subscribes `handler` to repository status changes inside a transaction created from `signal`.
     * The action returns a cleanup that unsubscribes the handler.
     * NOTE(review): presumably `act` runs that cleanup when `signal` aborts — confirm in `transaction`.
     */
    public onRepositoryStatusChangeWithSignal = (
        signal: TraceableAbortSignal,
        handler: (status: RepositoryStatus) => void
    ) => {
        const { act } = transaction(signal)
        act('onRepositoryStatusChange', () => {
            const subscription = this.repositoryStatus$$.subscribe(handler)
            return () => subscription.unsubscribe()
        })
    }

    // Only created for mirror instances (see `init`); when absent, batch operations are applied directly.
    private batchOperationManager?: BatchOperationManager

    // Callbacks executed after the last fragment of the document messages has been applied.
    private afterDocumentFinalLoadedCallbacks: (() => void)[] = []

    constructor(
        private readonly signal: TraceableAbortSignal,
        private readonly webSocketBridge: WebSocketBridge,
        private readonly bridge: EmBridge,
        private readonly isMirror: boolean,
        private readonly autoSaveService: AutoSaveService = new AutoSaveService()
    ) {
        this.init(this.signal)
    }

    /**
     * Wires up all listeners: bridge command bindings, WebSocket message handlers,
     * id/status/role subscriptions and the `WK.getDBStatus` debug hook.
     */
    private init(signal: TraceableAbortSignal) {
        this.bridge.bind(ChangeSynergyStateCommand, this.onChangeSynergyState.bind(this), { signal })

        this.webSocketBridge.onSynergyMessageProto(WsDocument, this.onDocument.bind(this))
        this.webSocketBridge.onSynergyMessageProto(WsBatchOperation, this.onBatchOperation.bind(this))
        this.webSocketBridge.onSynergyMessageProto(WsBatchOperationAck, this.onBatchOperationAck.bind(this))
        this.webSocketBridge.registerTriggerLoadLocalOperations(async (userId) => {
            return await this.tryLoadLocalOperations(userId)
        })

        this.webSocketBridge.onDocumentIdChangeWithSignal(signal, (id) => {
            this.documentId = id
            // Start fetching the schema version early so it is (likely) ready when offline data is loaded.
            if (!createFileManager.isCreatingFile()) {
                this.preFetchSchemaVersion = this.getDocumentCurrentSchemaVersion()
            }
        })

        this.webSocketBridge.onUserIdChangeWithSignal(signal, (id) => {
            this.userId = id
        })

        // Collects the latest value of each source and forwards the full tuple to the bridge.
        // NOTE(review): with `init: false` the handler presumably fires only once every key has
        // received a value — confirm in `createCombineLatestEventHandler`.
        const combineLatestEventHandler = createCombineLatestEventHandler(
            {
                status: { init: false },
                userId: { init: false },
                sessionId: { init: false },
                docId: { init: false },
                role: { init: false },
            },
            (events: { status: WebSocketStatus; userId: number; sessionId: number; docId: string; role: number }) => {
                this.bridgeOnBridgeStatusChange([
                    events.status,
                    events.userId,
                    events.sessionId,
                    events.docId,
                    events.role,
                ])
            }
        )

        this.webSocketBridge.onStatusChangeWithSignal(signal, (status) => {
            combineLatestEventHandler.handleEvent('status', status)
        })

        this.webSocketBridge.onUserIdChangeWithSignal(signal, (userId) => {
            combineLatestEventHandler.handleEvent('userId', userId)
        })

        this.webSocketBridge.onSessionIdChangeWithSignal(signal, (sessionId) => {
            combineLatestEventHandler.handleEvent('sessionId', sessionId)
        })

        this.webSocketBridge.onDocumentIdChangeWithSignal(signal, (docId) => {
            combineLatestEventHandler.handleEvent('docId', docId)
        })

        this.webSocketBridge.onRoleChangeWithSignal(signal, (role) => {
            combineLatestEventHandler.handleEvent('role', role)
        })

        this.bridge.bind(SendBatchOperationCommand, this.bridgeSendBatchOperations.bind(this), { signal })

        this.bridge.bind(
            SaveRepositoryCommand,
            async (arg) => {
                await this.bridgeSaveRespository(arg)
            },
            { signal }
        )

        this.bridge.bind(
            ClearRepositoryCommand,
            async () => {
                await this.bridgeClearRespository()
            },
            { signal }
        )

        // Debug hook exposing a copy of the current repository status; removed again in `destroy`.
        WK.getDBStatus = () => structuredClone(this.repositoryStatus$$.value)

        if (this.isMirror) {
            this.batchOperationManager = new MirrorBatchOperationManager(this.performBatchOperation)
        }
    }

    /** Appends `args` to the callbacks run after a final document fragment has been applied. */
    injectAfterDocumentFinalLoadedCallback = (args: (() => void)[]) => {
        this.afterDocumentFinalLoadedCallbacks = this.afterDocumentFinalLoadedCallbacks.concat(args)
    }

    /** Runs all registered "document fully loaded" callbacks in registration order. */
    private afterDocumentFinalLoaded() {
        for (const fn of this.afterDocumentFinalLoadedCallbacks) {
            fn()
        }
    }

    /**
     * Reports benchmark data and metrics after the bridge signalled the first document notify.
     * @param beginApplyTs `performance.now()` timestamp taken right before the document was applied
     */
    private afterDocumentFirstNotify(beginApplyTs: number) {
        benchmarkService.reportUserOnDocumentDuration(
            beginApplyTs - this.webSocketBridge.getStartConnectWsTs(),
            performance.now() - beginApplyTs,
            this.bridge.call(GetUsedMemoryCommand).sceneNodesCount
        )
        MetricCollector.pushMetricToServer(
            MetricName.SYNERGY_LOAD_DOC_DURATION,
            performance.now() - this.webSocketBridge.getStartConnectWsTs(),
            { first: this.firstLoad ? 'true' : 'false' }
        )
        MetricCollector.pushMemoryMetricOnDocumentLoaded()
        MetricCollector.flush()
        if (this.isMirror) {
            benchmarkService.benchmarkMirrorSwitchFrame()
        }
    }

    /**
     * Bridge command handler: forwards the requested synergy state to the WebSocket bridge,
     * defaulting to `SYNERGY_STATE_OFFLINE` when no value is given. Ignored in history mode.
     */
    private onChangeSynergyState = ({ value }: Wukong.DocumentProto.IArg_changeSynergyState) => {
        if (!this.bridge.inHistoryMode) {
            this.webSocketBridge.updateSynergyState(value ?? Wukong.DocumentProto.SynergyState.SYNERGY_STATE_OFFLINE)
        }
    }

    /** Bridge command handler: sends a batch-operation payload to the server. */
    bridgeSendBatchOperations(arg: Wukong.DocumentProto.IArg_sendBatchOperations) {
        // Copy the payload so that later modification of the original data cannot affect what is sent.
        this.webSocketBridge.sendPayload(
            WsBatchOperation,
            new Uint8Array(arg.payload!),
            arg.compressType!,
            arg.beforeCompressSize!
        )
    }

    /**
     * Bridge command handler: persists offline operations/blobs and marks the repository as non-empty.
     * Does nothing when both `opMap` and `blobMap` are missing or empty.
     */
    async bridgeSaveRespository(arg: Wukong.DocumentProto.IArg_autoSaveRepository) {
        if (
            (!arg?.opMap || Object.keys(arg.opMap).length === 0) &&
            (!arg?.blobMap || Object.keys(arg.blobMap).length === 0)
        ) {
            return
        }
        await this.bridgeSaveRespositoryV2(arg)
        this.repositoryStatus$$.next(RepositoryStatus.NonNull)
    }

    /**
     * Bridge command handler: drops the in-memory offline buffers, clears the persisted
     * offline operations and marks the repository as empty.
     */
    async bridgeClearRespository() {
        WkCLog.log(`clearRespository`)
        this.localOperations = []
        this.localBlobs = {}
        this.refNodes = {}

        await this.clearHoldingOfflineOperations()
        this.repositoryStatus$$.next(RepositoryStatus.Null)
    }

    /**
     * Asks the auto-save service to clear the held offline operations and reports the
     * duration/success as a metric. Failures are logged, never thrown.
     */
    async clearHoldingOfflineOperations() {
        const startTime = performance.now()
        const firstOpenDoc = this.webSocketBridge.getJoinedSynergyTimes() == 1
        let success = true
        try {
            await this.autoSaveService.clearHoldingOfflineOperations(firstOpenDoc)
        } catch (e) {
            // If deleting the offline data fails we only log for now; handle it specially
            // later if it turns out to cause other problems.
            WkCLog.log(`Failed to clearHoldingOfflineOperations. ${e}`)
            success = false
        }
        MetricCollector.pushMetricToServer(
            MetricName.SYNERGY_CLEAR_OFFLINE_DATA_DURATION,
            performance.now() - startTime,
            {
                success: success,
                firstOpenDoc: firstOpenDoc,
            }
        )
    }

    /** Destroys the auto-save service, removes the `WK` debug hooks and drops registered callbacks. */
    destroy = () => {
        this.autoSaveService.destroy()

        delete WK.getDBStatus
        delete WK.docMessage
        delete WK.allMessages

        this.afterDocumentFinalLoadedCallbacks = []
    }

    /**
     * Handles a document message broadcast by the server. The document may arrive in
     * several fragments (see `proto.fragmentType`).
     * @param proto the received synergy message
     */
    onDocument(proto: Wukong.ServerProto.SynergyMessageProto) {
        const payload = proto.payload

        if (featureSwitchManager.isEnabled('allow-debug-log')) {
            // Expose the received documents on the global object for debugging.
            WK.docMessage = payload
            WK.allMessages = WK.allMessages ?? []
            WK.allMessages.push(payload)
        }

        const beginApplyTs = performance.now()
        const syncDuration = beginApplyTs - this.webSocketBridge.getStartConnectWsTs()

        MetricCollector.pushMetricToServer(MetricName.SYNERGY_GET_SYNC_DURATION, syncDuration)

        if (proto.finalFragment) {
            // The final-fragment document comes from the synergy service, so receiving it
            // means we joined the synergy service successfully.
            this.webSocketBridge.joinedSynergy()
        }

        // The bridge returns a flag telling whether this call completed the first tick.
        // NOTE(review): `finalFragment` defaults to `true` for the bridge when it is nullish, while the
        // JS-side checks in this method use the raw (falsy) value — confirm this asymmetry is intended.
        const isFirstNotify = this.bridge.call(OnDocumentCommand, {
            payload: payload,
            docId: this.documentId,
            localOperations: this.localOperations,
            localBlobs: this.localBlobs,
            finalFragment: proto.finalFragment ?? true,
            checksum: proto.checksum,
            docHash: proto.docHash,
            index: proto.index ?? {},
            opLogs: proto.opLogs ?? [],
            compressType: proto.compressType ?? Wukong.DocumentProto.CompressType.COMPRESS_TYPE_NO_COMPRESS,
            refNodes: this.refNodes ?? {},
            onlyDiff: proto.onlyDiff,
        })

        // No result from the bridge: stop here and keep the offline buffers untouched.
        if (isFirstNotify === undefined) {
            return
        }

        // On the first tick, record some traces.
        if (isFirstNotify.value) {
            this.afterDocumentFirstNotify(beginApplyTs)
        }

        MetricCollector.pushMetricToServer(MetricName.SYNERGY_APPLY_SYNC_DURATION, performance.now() - beginApplyTs, {
            first: this.firstLoad ? 'true' : 'false',
        })

        // A final fragment means the document has been fully synced.
        if (proto.finalFragment) {
            MetricCollector.pushMetricToServer(
                MetricName.SYNERGY_SYNC_DOC_DURATION,
                performance.now() - this.webSocketBridge.getStartConnectWsTs(),
                { first: this.firstLoad ? 'true' : 'false' }
            )
            MetricCollector.flush()
            this.firstLoad = false
            this.afterDocumentFinalLoaded()
        }

        // From here on, offline operations are held on the WASM side, so drop the JS-side buffers.
        this.localOperations = []
        this.localBlobs = {}
        this.refNodes = {}
        this.webSocketBridge.hasLoadedDocumentData = true
    }

    /**
     * Loads persisted offline data the first time it is called with a known document id and
     * publishes the resulting repository status. Subsequent calls (e.g. on reconnect) are
     * no-ops because of the `hasLoadDb` guard.
     * @param userId user whose offline operations should be loaded
     */
    async tryLoadLocalOperations(userId: number) {
        if (!this.documentId) {
            return
        }

        if (!this.hasLoadDb) {
            this.hasLoadDb = true
            await this.tryLoadLocalOperationsV2(userId)
            if (this.localOperations.length > 0) {
                this.repositoryStatus$$.next(RepositoryStatus.NonNull)
            } else {
                this.repositoryStatus$$.next(RepositoryStatus.Null)
            }
        }
    }

    /**
     * Sends offline data to the upgrade request so it is migrated from `offlineSchemaVersion`
     * to `targetSchemaVersion`, and decodes the base64 response back into byte arrays.
     *
     * @param offlineSchemaVersion schema version the offline data was recorded with
     * @param targetSchemaVersion schema version to upgrade to
     * @param localOperations offline operations
     * @param localBlobs offline blobs keyed by id (may be nullish at runtime; treated as empty)
     * @param localRefNodes offline referenced nodes keyed by id (may be nullish at runtime; treated as empty)
     * @returns tuple of `[operations, blobs, refNodes]` after the upgrade
     * @throws rethrows the request error after logging it and calling `bridge.crash`
     */
    async upgradeOfflineOperations(
        offlineSchemaVersion: number,
        targetSchemaVersion: number,
        localOperations: Array<Uint8Array>,
        localBlobs: { [k: string]: Uint8Array },
        localRefNodes: { [k: string]: Uint8Array }
    ): Promise<[Array<Uint8Array>, { [k: string]: Uint8Array }, { [k: string]: Uint8Array }]> {
        // The request takes plain number arrays, so convert every Uint8Array.
        const upgradeOperationsReuqest = new UpgradeOperations(
            this.documentId!,
            localOperations!.map((op: Uint8Array) => {
                return Array.from(op)
            }),
            offlineSchemaVersion,
            targetSchemaVersion,
            localBlobs
                ? Object.keys(localBlobs).reduce((acc, i) => {
                      acc[i] = Array.from(localBlobs[i])
                      return acc
                  }, {} as { [k: string]: number[] })
                : {},
            localRefNodes
                ? Object.keys(localRefNodes).reduce((acc, i) => {
                      acc[i] = Array.from(localRefNodes[i])
                      return acc
                  }, {} as { [k: string]: number[] })
                : {}
        )

        WkCLog.log(`开始升级离线数据: ${JSON.stringify(upgradeOperationsReuqest.requestBody())}`)
        try {
            const upgradedOperations = await upgradeOperationsReuqest.start()
            WkCLog.log(`升级离线数据成功: ${JSON.stringify(upgradedOperations)}`)
            return [
                upgradedOperations.synergyOperations.map((base64Str: string) => {
                    return Buffer.from(base64Str, 'base64')
                }),
                Object.keys(upgradedOperations.blobMap || {}).reduce((acc, i) => {
                    acc[i] = Buffer.from(upgradedOperations.blobMap![i], 'base64')
                    return acc
                }, {} as { [k: string]: Uint8Array }),
                Object.keys(upgradedOperations.refNodes || {}).reduce((acc, i) => {
                    acc[i] = Buffer.from(upgradedOperations.refNodes![i], 'base64')
                    return acc
                }, {} as { [k: string]: Uint8Array }),
            ]
        } catch (e: any) {
            WkCLog.log(`升级离线数据失败. ${e}`)
            this.bridge.crash(e)
            throw e
        }
    }

    /**
     * Import optimisation: when running in the `advance_render` window, after the first server
     * ack run `AfterImportFixOperations` and then post a message to the parent window telling
     * it that caching has finished. Runs at most once per instance.
     */
    private sendCacheAckMessage() {
        if (window.name === 'advance_render' && this.firstAck) {
            const workbenchReleaseVersion = window.parent.wukong.releaseTag as string
            const editorReleaseVersion = window.wukong.releaseTag as string
            if (workbenchReleaseVersion !== editorReleaseVersion) {
                WkCLog.log(
                    '[文件导入-工作台编辑器版本不一致] =>' +
                        '工作台版本: ' +
                        workbenchReleaseVersion +
                        ', 编辑器版本: ' +
                        editorReleaseVersion
                )
            }
            new AfterImportFixOperations(this.documentId!)
                .start()
                // Without a rejection handler the promise returned by `.finally()` would reject
                // unhandled when the request fails; log the failure instead. The parent is still
                // notified below regardless of the outcome.
                .catch((e: unknown) => {
                    WkCLog.log(`Failed to run AfterImportFixOperations. ${e}`)
                })
                .finally(() => {
                    window.parent.postMessage({
                        type: 'advance_render',
                        success: true,
                    })
                })
            this.firstAck = false
        }
    }

    /**
     * Handles an operation ack pushed by the server and forwards it to the bridge.
     * @param proto the received ack message
     */
    onBatchOperationAck(proto: Wukong.ServerProto.SynergyMessageProto) {
        // The first ACK from the server shows that the first-tick cache submission has finished;
        // notify the parent window at that point (import optimisation).
        this.sendCacheAckMessage()
        const payload = proto.payload
        this.bridge.call(OnBatchOperationAck, {
            payload,
            serverPayload: proto.serverPayload,
            checksum: proto.checksum,
            prevChecksum: proto.prevChecksum,
            compressType: proto.compressType ?? Wukong.DocumentProto.CompressType.COMPRESS_TYPE_NO_COMPRESS,
        })
    }

    /**
     * Handles remote operations pushed by the server; these originate from other users
     * collaborating on the same document. Delegates to the batch operation manager when
     * one exists (mirror instances), otherwise applies the operation directly.
     * @param proto the received batch-operation message
     */
    onBatchOperation(proto: Wukong.ServerProto.SynergyMessageProto) {
        if (this.batchOperationManager) {
            this.batchOperationManager.onBatchOperation(proto)
        } else {
            this.performBatchOperation(proto)
        }
        if (this.isMirror) {
            benchmarkService.benchmarkMirrorSwitchFrame()
        }
    }

    /** Forwards a batch-operation message to the bridge. */
    performBatchOperation = (proto: Wukong.ServerProto.SynergyMessageProto) => {
        const payload = proto.payload
        // When debugging, log the synergy operation here:
        // debugLog(Wukong.DocumentProto.SynergyBatchOperation.decode(payload))
        this.bridge.call(OnBatchOperation, {
            payload,
            serverPayload: proto.serverPayload,
            checksum: proto.checksum,
            prevChecksum: proto.prevChecksum,
            docHash: proto.docHash,
            compressType: proto.compressType ?? Wukong.DocumentProto.CompressType.COMPRESS_TYPE_NO_COMPRESS,
        })
    }

    /**
     * Pushes the latest `[status, userId, sessionId, docId, role]` tuple to the bridge.
     * User id, role, doc id and session id are only forwarded when they differ from the last
     * forwarded value; the WebSocket status is always forwarded. Call order is:
     * user id, role, doc id, status, session id.
     */
    private bridgeOnBridgeStatusChange(value: [WebSocketStatus, number, number, string, number]) {
        const webSocketStatus = value[0]
        const userId = value[1]
        const sid = value[2]
        const docId = value[3]
        const role = value[4]
        // Loose `!=` is deliberate-looking here: it treats `null` and `undefined` as equal.
        if (userId != this.currentUserId) {
            this.bridgeOnUserIdChange(userId)
            this.currentUserId = userId
        }
        if (role != this.currentRole) {
            this.bridgeOnUserRoleChange(role)
            this.currentRole = role
        }
        if (docId != this.currentDocId) {
            this.bridgeOnDocIdChange(docId)
            this.currentDocId = docId
        }
        // After a disconnect, clear the batch operation queue used in the preview (mirror) scenario.
        if (webSocketStatus === WebSocketStatus.Disconnected) {
            this.batchOperationManager?.clearBatchOperationQueueWhenDisconnected()
        }
        // WK-26104: to keep the WASM state consistent with the JS state, do not skip the
        // WASM call even when webSocketStatus equals this.currentWebSocketStatus.
        this.bridgeOnWebsocketStatusChange(webSocketStatus)
        this.currentWebSocketStatus = webSocketStatus
        if (sid != this.currentSessionId) {
            this.bridgeOnSessionIdChange(sid)
            this.currentSessionId = sid
        }
    }

    /** Forwards the WebSocket status to the bridge. */
    private bridgeOnWebsocketStatusChange(value: WebSocketStatus) {
        this.bridge.call(OnBridgeStatusChange, { value })
    }

    /** Forwards the document id to the bridge. */
    private bridgeOnDocIdChange(value: string) {
        this.bridge.call(OnDocIdChange, { value })
    }

    /** Forwards the user id to the bridge. */
    private bridgeOnUserIdChange(value: number) {
        this.bridge.call(OnUserIdChange, { value })
    }

    /** Forwards the session id to the bridge. */
    private bridgeOnSessionIdChange(value: number) {
        this.bridge.call(OnSessionIdChange, { value })
    }

    /** Forwards the user role to the bridge. */
    private bridgeOnUserRoleChange(value: number) {
        this.bridge.call(OnUserRoleChange, { value })
    }

    /**
     * Persists the given offline operations through the auto-save service, tagged with the
     * current document/user/session and release tag.
     */
    private async bridgeSaveRespositoryV2(arg: Wukong.DocumentProto.IArg_autoSaveRepository) {
        const offlineSession = {
            docId: this.documentId!,
            // Falls back to the version carried by the command when no (truthy) version is known yet.
            schemaVersion: this.currentSchemaVersion ? this.currentSchemaVersion! : arg.schemaVersion!,
            // Falls back to 1 when no (truthy) session id is known yet.
            sid: this.currentSessionId || 1,
            userId: this.userId!,
            releaseTag: getReleaseTag(),
        }
        await this.autoSaveService.saveOfflineOperationsOfSession({
            offlineSession: offlineSession,
            offlineOperations: { operations: arg.opMap!, blobs: arg.blobMap!, refNodes: arg.refNodes ?? {} },
        })
    }

    /**
     * Loads all persisted offline sessions for the current document and `userId`, upgrades
     * sessions recorded with an older schema version, and appends everything to the in-memory
     * offline buffers. If sessions exist but none contributed an operation, the persisted
     * offline data is cleared. Reports several count metrics.
     */
    private async tryLoadLocalOperationsV2(userId: number) {
        // Normalise a nullish result once so that every use below sees an array.
        const offlineOps = (await this.autoSaveService.getAllOfflineOperations(this.documentId!, userId)) ?? []
        MetricCollector.pushMetricToServer(MetricName.SYNERGY_GET_OFFLINE_SESSION_COUNT, offlineOps.length)
        const offlineOperationsCountBeforeLoadV2 = this.localOperations.length
        // Prefer the request started when the document id arrived; otherwise fetch now.
        this.currentSchemaVersion = await (this.preFetchSchemaVersion ?? this.getDocumentCurrentSchemaVersion())
        for (const offlineData of offlineOps) {
            if (
                // TODO(wuhuajia): handle offline data whose version is newer than the document's current version
                offlineData.offlineSession.schemaVersion &&
                offlineData.offlineSession.schemaVersion < this.currentSchemaVersion!
            ) {
                WkCLog.log(
                    `离线数据与当前文档版本不匹配 v2. offlineSchemaVersion:${offlineData.offlineSession.schemaVersion}, currentSchemaVersion: ${this.currentSchemaVersion}`
                )

                const [upgradedOperations, upgradedBlobs, upgradedRefNodes] = await this.upgradeOfflineOperations(
                    offlineData.offlineSession.schemaVersion,
                    this.currentSchemaVersion!,
                    Object.values(offlineData.offlineOperations.operations || {}),
                    offlineData.offlineOperations.blobs,
                    offlineData.offlineOperations.refNodes
                )

                // Append to localOperations instead of overwriting it, so data loaded from the
                // old offline database is not lost.
                upgradedOperations.forEach((op) => {
                    this.localOperations.push(op)
                })
                Object.entries(upgradedBlobs).forEach(([blobId, blob]) => {
                    this.localBlobs[blobId] = blob
                })
                Object.entries(upgradedRefNodes).forEach(([nodeId, refNode]) => {
                    this.refNodes[nodeId] = refNode
                })
                continue
            }

            Object.values(offlineData.offlineOperations.operations || {}).forEach((op) => {
                this.localOperations.push(op)
            })

            Object.entries(offlineData.offlineOperations.blobs || {}).forEach(([blobId, blob]) => {
                this.localBlobs[blobId] = blob
            })

            Object.entries(offlineData.offlineOperations.refNodes || {}).forEach(([nodeId, refNodesVec]) => {
                this.refNodes[nodeId] = refNodesVec
            })
        }

        // Sessions exist but none of them added an operation: clear the persisted offline data.
        if (offlineOps.length > 0 && this.localOperations.length == offlineOperationsCountBeforeLoadV2) {
            await this.clearHoldingOfflineOperations()
        }

        MetricCollector.pushMetricToServer(MetricName.SYNERGY_GET_OFFLINE_OP_COUNT, this.localOperations.length)
        MetricCollector.pushMetricToServer(
            MetricName.SYNERGY_GET_OFFLINE_BLOB_COUNT,
            Object.keys(this.localBlobs).length
        )
        MetricCollector.pushMetricToServer(
            MetricName.SYNERGY_GET_OFFLINE_REFERENCED_NODES_COUNT,
            Object.keys(this.refNodes).length
        )

        debugLog(`load offline date v2 ${JSON.stringify(this.localOperations)}`)
    }

    /** Requests the schema version of the current document (`documentId` must be set). */
    private async getDocumentCurrentSchemaVersion() {
        const getSchemaVersionRequest = new GetSchemaVersionByDocId(this.documentId!)
        const schemaVersionResponse = await getSchemaVersionRequest.start()
        return schemaVersionResponse.schemaVersion
    }
}
