/* eslint-disable import/no-deprecated */
import { Wukong } from '@wukong/bridge-proto'
import { difference, differenceWith, intersectionWith, isEqual, uniq } from 'lodash-es'
import { BehaviorSubject, NEVER, Observable, ReplaySubject, combineLatest, firstValueFrom, from, merge, of } from 'rxjs'
import {
    catchError,
    debounceTime,
    distinctUntilChanged,
    filter,
    map,
    mergeMap,
    pairwise,
    reduce,
    retryWhen,
    shareReplay,
    switchMap,
    tap,
    withLatestFrom,
} from 'rxjs/operators'
import { delay } from 'signal-timers'
import { ConcurrentScheduler, genericRetryStrategy } from '../../../../../../util/src'
import { createSwitchTask, signalDebounce } from '../../../../../../util/src/abort-controller/signal-task'
import { TraceableAbortSignal } from '../../../../../../util/src/abort-controller/traceable-abort-controller'
import { transaction } from '../../../../../../util/src/abort-controller/traceable-transaction'
import { distinctFn } from '../../../../../../util/src/distinct-fn'
import { BehaviorEventEmitter } from '../../../../../../util/src/event-emitter/behavior-event-emitter'
import { ReplayEventEmitter } from '../../../../../../util/src/event-emitter/replay-event-emitter'
import { WkCLog } from '../../../../kernel/clog/wukong/instance'
import { LibraryId } from '../../../../kernel/interface/component-style-library-id'
import type {
    LibraryContentMetaInfoVO,
    LibraryContentVO,
    LibraryIdAndNameVO,
    LibrarySubscriptionVO,
} from '../../../../kernel/interface/library'
import { MessageContentType } from '../../../../kernel/interface/notify'
import { DocID, OrgID } from '../../../../kernel/interface/type'
import { WsPropertyChangeMessage, WsRelationChangeMessage } from '../../../../kernel/notify/notify-message'
import { ConnectInfo, NotifyService } from '../../../../kernel/notify/notify-service'
import { GetDocRequest } from '../../../../kernel/request/document'
import {
    GetLibraryContentBaseInfoMap,
    GetLibraryIdAndNameByDocIdRequest,
    GetLibraryIdAndNameMapRequest,
} from '../../../../kernel/request/library'
import { GetLibrarySubscription } from '../../../../kernel/request/library-subscription'
import { OssError } from '../../../../kernel/request/oss'
import { featureSwitchManager } from '../../../../kernel/switch'
import { ServiceClass } from '../../../../kernel/util/service-class'
import { transformLibraryContentVO } from '../../../../kernel/util/transform-library-data-struct'
import { createFileManager } from '../../../../main/create-file-manager'
import {
    LibraryResourceOssClientType,
    type LibraryResourceDownloader,
} from '../../../../share/component-style-library/service/library-resource-downloader'
import { isLibraryContentMapEqual } from '../util/equal'

// Tags every value pushed into the remote-library-content subject with the reason it was pushed.
enum SubjectUpdateSourceType {
    // Seed value the subject / emitter is constructed with.
    Init = 0,
    // The content map was cleared (pushed by the re-sync handler before re-subscribing doc ids).
    Reset = 1,
    // The only tag that passes the filter feeding `allRemoteLibraryContentMap$`.
    Fetch = 2,
}

// 处理 notify 上下行消息生成数据源
export class LibraryNotifySyncService extends ServiceClass {
    // orgId of the organization the current document belongs to (used for upstream notify messages)
    private readonly currentOrgId$ = this.buildReplaySubject<OrgID>()
    private readonly currentOrgId$V2 = new ReplayEventEmitter<OrgID>(1)

    // Emits after notify connects for the first time or reconnects after a disconnect
    private readonly notifyConnectRefresher$!: Observable<ConnectInfo>
    private readonly notifyConnectRefresher$V2 = new ReplayEventEmitter<ConnectInfo>(1)

    // Content of all remote libraries.
    // The `$$` variants carry every update together with its source tag; the public `$` stream is derived from them.
    public allRemoteLibraryContentMap$!: Observable<Map<LibraryId, LibraryContentVO>>
    private allRemoteLibraryContentMap$V2 = new ReplayEventEmitter<Map<LibraryId, LibraryContentVO>>(1)
    private readonly allRemoteLibraryContentMap$$ = this.buildBehaviorSubject<{
        updateSourceType: SubjectUpdateSourceType
        value: Map<LibraryId, LibraryContentVO>
    }>({ updateSourceType: SubjectUpdateSourceType.Init, value: new Map<LibraryId, LibraryContentVO>() })
    private readonly allRemoteLibraryContentMap$$V2 = new BehaviorEventEmitter<{
        updateSourceType: SubjectUpdateSourceType
        value: Map<LibraryId, LibraryContentVO>
    }>({ updateSourceType: SubjectUpdateSourceType.Init, value: new Map<LibraryId, LibraryContentVO>() })

    // Resolves with the first remote-library content map accepted by `check` (every map is accepted by default).
    // Never rejects because no value arrived: the legacy path falls back to an empty map through
    // `defaultValue`, and the emitter path maps a rejection to an empty map.
    public async getAllRemoteLibraryContentPromise(
        check: (params: Map<string, LibraryContentVO>) => boolean = () => true
    ) {
        const rxjsRemoved = featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')
        if (!rxjsRemoved) {
            const firstAccepted$ = this.allRemoteLibraryContentMap$.pipe(filter(check))
            return firstValueFrom(firstAccepted$, {
                defaultValue: new Map<LibraryId, LibraryContentVO>(),
            })
        }

        // The emitter promise is returned directly; a rejection is replaced by an empty map.
        return this.allRemoteLibraryContentMap$V2
            .getFirstValueWithFilter(this.signal, check)
            .catch(() => new Map<LibraryId, LibraryContentVO>())
    }
    // Registers `onChange` for every new remote-library content map.
    // The registration is tied to the caller-supplied `signal`: the emitter path passes it to
    // `onWithSignal`, the legacy path hands an unsubscribe callback to the signal's transaction.
    public onAllRemoteLibraryContentChanged = (
        onChange: (newValue: Map<LibraryId, LibraryContentVO>) => void,
        signal: TraceableAbortSignal
    ) => {
        if (!featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            const { act } = transaction(signal)
            act('LibraryNotifySyncService.onAllRemoteLibraryContentChanged', () => {
                const contentSubscription = this.allRemoteLibraryContentMap$.subscribe(onChange)
                // Cleanup callback returned to the transaction.
                return () => {
                    contentSubscription.unsubscribe()
                }
            })
            return
        }

        this.allRemoteLibraryContentMap$V2.onWithSignal(signal, onChange)
    }

    // Remote library content of the current document (excluding libraries it merely subscribes to)
    public currentRemoteLibraryContent$!: Observable<LibraryContentVO | null>
    private currentRemoteLibraryContent$V2 = new ReplayEventEmitter<LibraryContentVO | null>(1)
    // Registers `onChange` for every new value of the current document's remote library content
    // (`null` when the current document has no library content).
    // The registration is tied to the caller-supplied `signal`.
    // Returns whatever `onWithSignal` returns on the emitter path and `undefined` on the legacy path
    // (kept as-is for backward compatibility).
    public onCurrentRemoteLibraryContentChanged = (
        onChange: (newValue: LibraryContentVO | null) => void,
        signal: TraceableAbortSignal
    ) => {
        if (featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            return this.currentRemoteLibraryContent$V2.onWithSignal(signal, onChange)
        } else {
            const { act } = transaction(signal)
            // The label now names this method; it used to be a copy of the
            // `onAllRemoteLibraryContentChanged` label, which made the two transactions indistinguishable.
            act('LibraryNotifySyncService.onCurrentRemoteLibraryContentChanged', () => {
                const subscription = this.currentRemoteLibraryContent$.subscribe(onChange)
                return () => {
                    subscription.unsubscribe()
                }
            })
        }
    }

    // Resolves with the first known remote library content of the current document, or `null`.
    // Both paths resolve to `null` instead of rejecting when no value can be obtained:
    // the legacy path through `defaultValue`, the emitter path by mapping a rejection to `null`
    // (the same fallback strategy `getAllRemoteLibraryContentPromise` applies with an empty map).
    public getCurrentRemoteLibraryContentPromise = () => {
        if (featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            return this.currentRemoteLibraryContent$V2
                .getFirstValue(this.signal)
                .catch((): LibraryContentVO | null => null)
        } else {
            return firstValueFrom(this.currentRemoteLibraryContent$, {
                defaultValue: null,
            })
        }
    }

    // All docIds currently subscribed for sync
    private readonly allSyncDocumentIds$ = this.buildBehaviorSubject(new Array<DocID>())
    private readonly allSyncDocumentIds$V2 = new BehaviorEventEmitter(new Array<DocID>())

    // docIds of the libraries that components of synced documents were moved into
    // (e.g. the current document uses component C1 from document A; C1 is moved into document B,
    // so B's docId has to be added to this set)
    private readonly movedRelatedDocumentIds$ = this.buildBehaviorSubject(new Set<DocID>())
    private readonly movedRelatedDocumentIds$V2 = new BehaviorEventEmitter(new Set<DocID>())

    // Libraries the current document subscribes to
    public readonly subscribedLibraryInfoList$ = this.buildBehaviorSubject<LibrarySubscriptionVO[]>([])
    private readonly subscribedLibraryInfoList$V2 = new BehaviorEventEmitter<LibrarySubscriptionVO[]>([])
    // Registers `onChange` for every new list of libraries the current document subscribes to.
    // The registration is tied to the caller-supplied `signal` on both paths.
    public readonly onSubscribedLibraryInfoListChanged = (
        onChange: (newValue: LibrarySubscriptionVO[]) => void,
        signal: TraceableAbortSignal
    ) => {
        if (featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            // Bind to the caller's signal, not the service-wide one; otherwise the listener would stay
            // registered (and keep firing) after the caller has aborted, until the whole service is torn down.
            this.subscribedLibraryInfoList$V2.onWithSignal(signal, onChange)
        } else {
            const { act } = transaction(signal)
            // The label names this method (it used to be a copy of the cached-map listener's label).
            act('LibraryNotifySyncService.onSubscribedLibraryInfoListChanged', () => {
                const subscription = this.subscribedLibraryInfoList$.subscribe(onChange)
                return () => {
                    subscription.unsubscribe()
                }
            })
        }
    }
    // Synchronously returns the latest list of subscribed libraries from whichever store is active.
    public getLatestSubscribedLibraryInfoList = () => {
        const rxjsRemoved = featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')
        return rxjsRemoved ? this.subscribedLibraryInfoList$V2.getValue() : this.subscribedLibraryInfoList$.getValue()
    }
    // docIds the wasm side of the current document needs synced
    // (including docIds involved in in-use items, the replace dialog, the remote detail page, etc.)
    private readonly wasmNeedSyncDocumentIds$ = this.buildBehaviorSubject(new Array<DocID>())
    private readonly wasmNeedSyncDocumentIds$V2 = new BehaviorEventEmitter(new Array<DocID>())

    // Cache of docId -> basic document & library info
    public readonly cachedDocumentId2LibraryIdInfoMap$ = this.buildBehaviorSubject(new Map<DocID, LibraryIdAndNameVO>())
    private readonly cachedDocumentId2LibraryIdInfoMap$V2 = new BehaviorEventEmitter(
        new Map<DocID, LibraryIdAndNameVO>()
    )
    // Registers `onChange` for every new docId -> library-info cache map.
    // The registration is tied to the caller-supplied `signal` on both paths.
    public readonly onCachedDocumentId2LibraryIdInfoMapChanged = (
        onChange: (newValue: Map<DocID, LibraryIdAndNameVO>) => void,
        signal: TraceableAbortSignal
    ) => {
        if (featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            // Bind to the caller's signal, not the service-wide one; otherwise the listener would stay
            // registered (and keep firing) after the caller has aborted, until the whole service is torn down.
            this.cachedDocumentId2LibraryIdInfoMap$V2.onWithSignal(signal, onChange)
        } else {
            const { act } = transaction(signal)
            act('LibraryNotifySyncService.onCachedDocumentId2LibraryIdInfoMapChanged', () => {
                const subscription = this.cachedDocumentId2LibraryIdInfoMap$.subscribe(onChange)
                return () => {
                    subscription.unsubscribe()
                }
            })
        }
    }
    // Synchronously returns the latest docId -> library-info cache map from whichever store is active.
    public getLatestCachedDocumentId2LibraryIdInfoMap = () => {
        const rxjsRemoved = featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')
        return rxjsRemoved
            ? this.cachedDocumentId2LibraryIdInfoMap$V2.getValue()
            : this.cachedDocumentId2LibraryIdInfoMap$.getValue()
    }
    // WebSocket push: the content of a single library changed
    private singleLibraryContentChange$!: Observable<LibraryId>
    private singleLibraryContentChange$V2 = new ReplayEventEmitter<LibraryId>(1)

    // WebSocket push: another document changed (only name changes are of interest)
    private otherDocNameChange$!: Observable<DocID>
    private otherDocNameChange$V2 = new ReplayEventEmitter<DocID>(1)

    // WebSocket push: the current document's library subscriptions changed
    private currentDocLibrarySubscriptionChange$!: Observable<Wukong.NotifyProto.IBusinessEntityRelationChange>
    private currentDocLibrarySubscriptionChange$V2 =
        new ReplayEventEmitter<Wukong.NotifyProto.IBusinessEntityRelationChange>(1)
    // WebSocket push: the current document's authorization changed
    private currentDocAuthorizationChange$!: Observable<Wukong.NotifyProto.IBusinessEntityRelationChange>
    private currentDocAuthorizationChange$V2 = new ReplayEventEmitter<Wukong.NotifyProto.IBusinessEntityRelationChange>(
        1
    )
    // WebSocket push: a default-library subscription of any team / drafts box in the current document's org changed
    private orgDefaultLibrarySubscriptionChange$!: Observable<Wukong.NotifyProto.IBusinessEntityRelationChange>
    private orgDefaultLibrarySubscriptionChange$V2 =
        new ReplayEventEmitter<Wukong.NotifyProto.IBusinessEntityRelationChange>(1)

    // Wires the service up on one of two code paths, selected by the
    // 'library-notify-sync-service-rxjs-removal' feature switch:
    //  - enabled: event-emitter (`...V2`) implementation, with bridges that keep the public RxJS members fed;
    //  - disabled: original RxJS implementation.
    // The two steps passed to `injectCreateFileCallBack` may be deferred while a file is being created.
    constructor(
        private readonly docId: DocID,
        private readonly libraryResourceDownloader: LibraryResourceDownloader,
        private readonly notifyService: NotifyService,
        private readonly signal: TraceableAbortSignal
    ) {
        super()
        if (featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')) {
            // NOTE(review): the legacy branch below drops falsy connect infos (`filter((sessionId) => !!sessionId)`);
            // this branch forwards every value. Confirm `onConnectChangeWithSignal` never emits a falsy value.
            notifyService.onConnectChangeWithSignal(this.signal, (sessionId) => {
                console.info('Running notify session id', sessionId)
                this.notifyConnectRefresher$V2.next(sessionId)
            })
            // Bridge: mirror emitter values into the public BehaviorSubjects so RxJS consumers keep working.
            this.subscribedLibraryInfoList$V2.onWithSignal(this.signal, (list) => {
                this.subscribedLibraryInfoList$.next(list)
            })

            this.cachedDocumentId2LibraryIdInfoMap$V2.onWithSignal(this.signal, (value) => {
                this.cachedDocumentId2LibraryIdInfoMap$.next(value)
            })

            // Bridge: expose the emitter-backed content map through the public observable.
            const allRemoteLibraryContentMap$Subject = new ReplaySubject<Map<LibraryId, LibraryContentVO>>(1)
            this.allRemoteLibraryContentMap$ = allRemoteLibraryContentMap$Subject.asObservable()
            this.allRemoteLibraryContentMap$V2.onWithSignal(this.signal, (value) => {
                allRemoteLibraryContentMap$Subject.next(value)
            })

            // Bridge: same for the current document's library content.
            const currentRemoteLibraryContent$Subject = new ReplaySubject<LibraryContentVO | null>(1)
            this.currentRemoteLibraryContent$ = currentRemoteLibraryContent$Subject.asObservable()
            this.currentRemoteLibraryContent$V2.onWithSignal(this.signal, (value) => {
                currentRemoteLibraryContent$Subject.next(value)
            })

            this.injectCreateFileCallBack(this.initCurrentOrgId$V2.bind(this))
            this.initNotifyChangeListenersV2()
            this.syncCurrentDocV2()
            this.initRemoteLibraryContentMapV2()
            this.initMovedRelatedDocIdsV2()
            this.injectCreateFileCallBack(this.initCurrentDocLibrarySubscriptionV2.bind(this))
        } else {
            // Log every connect info, then keep only truthy ones and replay the latest to late subscribers.
            this.notifyConnectRefresher$ = notifyService.connect$.pipe(
                tap((sessionId) => console.info('Running notify session id', sessionId)),
                filter((sessionId) => !!sessionId),
                shareReplay({ refCount: true, bufferSize: 1 })
            )
            this.injectCreateFileCallBack(this.initCurrentOrgId$.bind(this))
            this.initNotifyChangeListeners()
            this.syncCurrentDoc()
            this.initRemoteLibraryContentMap()
            this.initMovedRelatedDocIds()
            this.injectCreateFileCallBack(this.initCurrentDocLibrarySubscription.bind(this))
        }
    }

    // Runs `fn` right away unless `createFileManager` reports that a file is being created;
    // in that case `fn` is handed to `createFileManager.injectCreateFileCallBack` instead.
    private injectCreateFileCallBack(fn: () => void) {
        if (!createFileManager.isCreatingFile()) {
            fn()
            return
        }
        createFileManager.injectCreateFileCallBack(fn)
    }

    // Fetches the orgId of the current document (RxJS path) and publishes it on `currentOrgId$`.
    // The request is retried through `genericRetryStrategy`; an empty orgId is treated as a failure.
    // Errors are swallowed once the service is destroyed and re-thrown otherwise.
    private initCurrentOrgId$ = async () => {
        try {
            const docRequest$ = from(new GetDocRequest(this.docId).start()).pipe(
                retryWhen(genericRetryStrategy({ maxRetryAttempts: 2, scalingDuration: 1000 }))
            )
            const { orgId } = await firstValueFrom(docRequest$, { defaultValue: { orgId: '-1' } })
            if (!orgId) {
                throw new Error('[LibraryWsService] 获取当前文档 orgId 失败')
            }
            this.currentOrgId$.next(orgId)
        } catch (e) {
            if (!this.isDestroy) {
                throw e
            }
        }
    }
    // Requests the current document and returns its orgId, making up to `maxRetryAttempts` attempts.
    // Between attempts it waits `scalingDuration * attempt` ms (linear back-off); the wait is abortable.
    // Throws the request error when the signal is aborted or the last attempt fails.
    private async getDocOrgIdWithRetry() {
        const maxRetryAttempts = 2
        const scalingDuration = 1000
        for (let attempt = 1; attempt <= maxRetryAttempts; attempt++) {
            try {
                return (await new GetDocRequest(this.docId).startWithSignal(this.signal)).orgId
            } catch (e) {
                if (this.signal.aborted || attempt === maxRetryAttempts) {
                    throw e
                }
                // `attempt` is 1-based so the first retry really backs off. With the previous 0-based
                // index the only delay ever executed was `scalingDuration * 0`, i.e. an immediate retry.
                await delay(scalingDuration * attempt, { signal: this.signal })
            }
        }
    }
    // Fetches the orgId of the current document (emitter path) and publishes it on `currentOrgId$V2`.
    // An empty orgId is treated as a failure. Errors are swallowed once the service is destroyed
    // and re-thrown otherwise.
    private initCurrentOrgId$V2 = async () => {
        try {
            const resolvedOrgId = await this.getDocOrgIdWithRetry()
            if (!resolvedOrgId) {
                throw new Error('[LibraryWsService] 获取当前文档 orgId 失败')
            }
            this.currentOrgId$V2.next(resolvedOrgId)
        } catch (e) {
            if (!this.isDestroy) {
                throw e
            }
        }
    }
    // Sets up the observables derived from downstream WebSocket (notify) business messages (RxJS path).
    private initNotifyChangeListeners = () => {
        // Decoded property-change messages
        const propertyChange$ = this.notifyService.businessMessage$.pipe(
            filter((message) => message.businessCode == WsPropertyChangeMessage.code),
            map((message) => WsPropertyChangeMessage.bodyType.decode(message.payload)),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // Decoded relation-change messages
        const relationChange$ = this.notifyService.businessMessage$.pipe(
            filter((message) => message.businessCode == WsRelationChangeMessage.code),
            map((message) => WsRelationChangeMessage.bodyType.decode(message.payload)),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // A LIBRARY entity changed and its DOC ancestor is one of the synced documents -> emit the library id.
        this.singleLibraryContentChange$ = propertyChange$.pipe(
            withLatestFrom(this.allSyncDocumentIds$),
            filter(([change, syncedDocIds]) => {
                const entity = change.businessEntity
                if (entity?.entityType != Wukong.NotifyProto.EntityType.LIBRARY || !entity?.entityId) {
                    return false
                }
                const ownerDocId = entity?.ancestors?.find(
                    (ancestor) => ancestor.entityType == Wukong.NotifyProto.EntityType.DOC
                )?.entityId
                return !!ownerDocId && syncedDocIds.includes(ownerDocId)
            }),
            map(([change]) => change.businessEntity!.entityId!),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // The name of a synced document other than the current one changed -> emit its doc id.
        this.otherDocNameChange$ = propertyChange$.pipe(
            withLatestFrom(this.allSyncDocumentIds$),
            filter(([change, syncedDocIds]) => {
                const entity = change.businessEntity
                return (
                    !!change.changedProperties.name &&
                    entity?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                    !!entity?.entityId &&
                    entity?.entityId !== this.docId &&
                    syncedDocIds.includes(entity?.entityId!)
                )
            }),
            map(([change]) => change.businessEntity?.entityId!),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // LIBRARY_SUBSCRIPTION relation between the current document and a library with an id.
        this.currentDocLibrarySubscriptionChange$ = relationChange$.pipe(
            filter((change) => {
                const relation = change.relation
                return (
                    relation?.relation == Wukong.NotifyProto.Relation.LIBRARY_SUBSCRIPTION &&
                    relation?.one?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                    relation.one.entityId === this.docId &&
                    relation?.another?.entityType === Wukong.NotifyProto.EntityType.LIBRARY &&
                    !!relation!.another.entityId
                )
            }),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // DEFAULT_LIBRARY_SUBSCRIPTION relation whose library side has an id and an ORG ancestor.
        this.orgDefaultLibrarySubscriptionChange$ = relationChange$.pipe(
            filter((change) => {
                const relation = change.relation
                return (
                    relation?.relation == Wukong.NotifyProto.Relation.DEFAULT_LIBRARY_SUBSCRIPTION &&
                    relation?.another?.entityType === Wukong.NotifyProto.EntityType.LIBRARY &&
                    !!relation!.another.entityId &&
                    !!relation?.another?.ancestors?.find(
                        (entity) => entity.entityType === Wukong.NotifyProto.EntityType.ORG
                    )
                )
            }),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // UPDATE of an AUTHORIZATION relation on the current document.
        this.currentDocAuthorizationChange$ = relationChange$.pipe(
            filter((change) => {
                const relation = change.relation
                return (
                    change.changeType === Wukong.NotifyProto.RelationChangeType.UPDATE &&
                    relation?.relation == Wukong.NotifyProto.Relation.AUTHORIZATION &&
                    relation?.one?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                    relation.one.entityId === this.docId
                )
            }),
            shareReplay({ refCount: true, bufferSize: 1 })
        )
    }
    // Sets up the emitters derived from downstream WebSocket (notify) business messages (emitter path).
    private initNotifyChangeListenersV2 = () => {
        const propertyChangeEmitter = new ReplayEventEmitter<Wukong.NotifyProto.BusinessEntityPropertiesChange>(1)
        const relationChangeEmitter = new ReplayEventEmitter<Wukong.NotifyProto.BusinessEntityRelationChange>(1)

        // Decode each business message into the matching emitter.
        this.notifyService.onBusinessMessageChangeWithSignal(this.signal, (message) => {
            if (message.businessCode == WsPropertyChangeMessage.code) {
                propertyChangeEmitter.next(WsPropertyChangeMessage.bodyType.decode(message.payload))
            }
            if (message.businessCode == WsRelationChangeMessage.code) {
                relationChangeEmitter.next(WsRelationChangeMessage.bodyType.decode(message.payload))
            }
        })

        // singleLibraryContentChange: a LIBRARY entity changed and its DOC ancestor is a synced document.
        propertyChangeEmitter.onWithSignal(this.signal, (change) => {
            const entity = change.businessEntity
            if (entity?.entityType != Wukong.NotifyProto.EntityType.LIBRARY || !entity?.entityId) {
                return
            }
            const changedLibraryId = entity!.entityId!
            const ownerDocId = entity?.ancestors?.find(
                (ancestor) => ancestor.entityType == Wukong.NotifyProto.EntityType.DOC
            )?.entityId
            if (!!ownerDocId && this.allSyncDocumentIds$V2.getValue().includes(ownerDocId)) {
                this.singleLibraryContentChange$V2.next(changedLibraryId)
            }
        })

        // otherDocNameChange: the name of a synced document other than the current one changed.
        propertyChangeEmitter.onWithSignal(this.signal, (change) => {
            const entity = change.businessEntity
            if (
                !!change.changedProperties.name &&
                entity?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                !!entity?.entityId &&
                entity?.entityId !== this.docId &&
                this.allSyncDocumentIds$V2.getValue().includes(entity?.entityId!)
            ) {
                this.otherDocNameChange$V2.next(entity?.entityId!)
            }
        })

        // currentDocLibrarySubscriptionChange: LIBRARY_SUBSCRIPTION between the current doc and a library with an id.
        relationChangeEmitter.onWithSignal(this.signal, (change) => {
            const relation = change.relation
            if (
                relation?.relation == Wukong.NotifyProto.Relation.LIBRARY_SUBSCRIPTION &&
                relation?.one?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                relation.one.entityId === this.docId &&
                relation?.another?.entityType === Wukong.NotifyProto.EntityType.LIBRARY &&
                !!relation!.another.entityId
            ) {
                this.currentDocLibrarySubscriptionChange$V2.next(change)
            }
        })

        // orgDefaultLibrarySubscriptionChange: DEFAULT_LIBRARY_SUBSCRIPTION whose library side has an ORG ancestor.
        relationChangeEmitter.onWithSignal(this.signal, (change) => {
            const relation = change.relation
            if (
                relation?.relation == Wukong.NotifyProto.Relation.DEFAULT_LIBRARY_SUBSCRIPTION &&
                relation?.another?.entityType === Wukong.NotifyProto.EntityType.LIBRARY &&
                !!relation!.another.entityId &&
                !!relation?.another?.ancestors?.find(
                    (entity) => entity.entityType === Wukong.NotifyProto.EntityType.ORG
                )
            ) {
                this.orgDefaultLibrarySubscriptionChange$V2.next(change)
            }
        })

        // currentDocAuthorizationChange: UPDATE of an AUTHORIZATION relation on the current document.
        relationChangeEmitter.onWithSignal(this.signal, (change) => {
            const relation = change.relation
            if (
                change.changeType === Wukong.NotifyProto.RelationChangeType.UPDATE &&
                relation?.relation == Wukong.NotifyProto.Relation.AUTHORIZATION &&
                relation?.one?.entityType == Wukong.NotifyProto.EntityType.DOC &&
                relation.one.entityId === this.docId
            ) {
                this.currentDocAuthorizationChange$V2.next(change)
            }
        })
    }

    // Subscribes the current document for sync (RxJS path).
    private syncCurrentDoc = () => {
        // Whenever the orgId is known and notify has (re)connected, subscribe to org-level
        // relation and property changes.
        const orgAndConnection$ = combineLatest([this.currentOrgId$, this.notifyConnectRefresher$])
        this.subscribe(orgAndConnection$, ([orgId]) => {
            const orgEntityFilter = {
                filterParameters: [
                    { name: 'entityType', value: Wukong.NotifyProto.EntityType.ORG - 1 + '' },
                    { name: 'entityId', value: orgId },
                ],
            }
            const filters = [orgEntityFilter]
            this.notifyService.sendSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: { filters },
                [MessageContentType.PropertyChange]: { filters },
            })
        })

        // On every (re)connect or authorization change of the current document, (re)register the synced doc ids.
        const resyncTrigger$ = merge(this.notifyConnectRefresher$, this.currentDocAuthorizationChange$)
        this.subscribe(resyncTrigger$, () => {
            const previouslySyncedDocIds = [...this.allSyncDocumentIds$.getValue()]
            if (!previouslySyncedDocIds.includes(this.docId)) {
                // First connection: only the current document needs to be synced.
                this.addSyncDocIds([this.docId])
                return
            }
            // The current docId is already registered, so this is a reconnect:
            // clear the state and re-subscribe every previously synced docId.
            this.allSyncDocumentIds$.next([])
            this.allRemoteLibraryContentMap$$.next({
                updateSourceType: SubjectUpdateSourceType.Reset,
                value: new Map(),
            })
            this.addSyncDocIds(previouslySyncedDocIds)
        })
    }

    // Subscribes the current document for sync (emitter path).
    private syncCurrentDocV2 = () => {
        // Subscribes to org-level relation and property changes for `orgId`.
        const subscribeOrgChanges = (orgId: OrgID) => {
            const filters = [
                {
                    filterParameters: [
                        {
                            name: 'entityType',
                            value: Wukong.NotifyProto.EntityType.ORG - 1 + '',
                        },
                        {
                            name: 'entityId',
                            value: orgId,
                        },
                    ],
                },
            ]
            this.notifyService.sendSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: {
                    filters,
                },
                [MessageContentType.PropertyChange]: {
                    filters,
                },
            })
        }
        this.currentOrgId$V2.onWithSignal(this.signal, subscribeOrgChanges)
        this.notifyConnectRefresher$V2.onWithSignal(this.signal, () => {
            this.currentOrgId$V2
                .getFirstValue(this.signal)
                .then((orgId) => {
                    subscribeOrgChanges(orgId)
                })
                .catch((e) => {
                    // Without a handler a rejected promise surfaces as an unhandled rejection.
                    // A rejection after the service signal is aborted (presumably how `getFirstValue`
                    // reports an abort) is expected; anything else is logged.
                    if (!this.signal.aborted) {
                        console.error('[LibraryNotifySyncService] failed to subscribe org changes', e)
                    }
                })
        })

        // (Re)registers the synced doc ids on every (re)connect or authorization change.
        const resyncDocIds = () => {
            const allSyncDocumentIds = [...this.allSyncDocumentIds$V2.getValue()]
            // The current docId is already registered, so this is a reconnect:
            // clear the state and re-subscribe every previously synced docId.
            if (allSyncDocumentIds.includes(this.docId)) {
                this.allSyncDocumentIds$V2.next([])
                this.allRemoteLibraryContentMap$$V2.next({
                    updateSourceType: SubjectUpdateSourceType.Reset,
                    value: new Map(),
                })
                this.addSyncDocIdsV2(allSyncDocumentIds)
            } else {
                this.addSyncDocIdsV2([this.docId])
            }
        }

        this.notifyConnectRefresher$V2.onWithSignal(this.signal, resyncDocIds)
        this.currentDocAuthorizationChange$V2.onWithSignal(this.signal, resyncDocIds)
    }

    // Adds docIds whose document authorization / libraryContent changes must be synced (RxJS path).
    // Only the ids reported as added by `addItemsInTargetArray$` are sent in a notify subscribe request.
    public addSyncDocIds = (docIds: DocID[]) => {
        if (docIds.length === 0) return

        const newlyAddedDocIds = this.addItemsInTargetArray$(this.allSyncDocumentIds$, docIds)
        if (newlyAddedDocIds.length === 0) return

        const filters = this.getSendNotifyFilterParamByDocIds(newlyAddedDocIds)
        this.subscribe(this.currentOrgId$, (orgId) => {
            this.notifyService.sendSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: { filters },
                [MessageContentType.PropertyChange]: { filters },
            })
        })
    }
    // Adds docIds whose document authorization / libraryContent changes must be synced (emitter path).
    // Only the ids reported as added by `addItemsInTargetArray$V2` are sent in a notify subscribe request.
    public addSyncDocIdsV2 = (docIds: DocID[]) => {
        if (docIds.length === 0) return

        const newlyAddedDocIds = this.addItemsInTargetArray$V2(this.allSyncDocumentIds$V2, docIds)
        if (newlyAddedDocIds.length === 0) return

        const filters = this.getSendNotifyFilterParamByDocIds(newlyAddedDocIds)
        this.currentOrgId$V2.onWithSignal(this.signal, (orgId) => {
            this.notifyService.sendSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: { filters },
                [MessageContentType.PropertyChange]: { filters },
            })
        })
    }
    // Removes docIds whose document authorization / libraryContent changes no longer need syncing (RxJS path).
    // Ids that must stay synced are filtered out first; only ids actually removed are unsubscribed.
    private removeSyncDocIds = (docIds: DocID[]) => {
        if (docIds.length === 0) return

        const removableDocIds = this.filterNoNeedRemovedDocIds(docIds)
        const removedDocIds = this.removeItemsInTargetArray$(this.allSyncDocumentIds$, removableDocIds)
        if (removedDocIds.length === 0) return

        const filters = this.getSendNotifyFilterParamByDocIds(removedDocIds)
        this.subscribe(this.currentOrgId$, (orgId) => {
            this.notifyService.sendUnSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: { filters },
                [MessageContentType.PropertyChange]: { filters },
            })
        })
    }
    // Removes docIds whose document authorization / libraryContent changes no longer need syncing (emitter path).
    // Ids that must stay synced are filtered out first; only ids actually removed are unsubscribed.
    private removeSyncDocIdsV2 = (docIds: DocID[]) => {
        if (docIds.length === 0) return

        const removableDocIds = this.filterNoNeedRemovedDocIdsV2(docIds)
        const removedDocIds = this.removeItemsInTargetArray$V2(this.allSyncDocumentIds$V2, removableDocIds)
        if (removedDocIds.length === 0) return

        const filters = this.getSendNotifyFilterParamByDocIds(removedDocIds)
        this.currentOrgId$V2.onWithSignal(this.signal, (orgId) => {
            this.notifyService.sendUnSubscribeProto(orgId, {
                [MessageContentType.RelationChange]: { filters },
                [MessageContentType.PropertyChange]: { filters },
            })
        })
    }
    // Drops the docIds that must not be un-synced (RxJS path): move-related documents,
    // documents of subscribed libraries, and documents the wasm side still needs.
    private filterNoNeedRemovedDocIds = (removeDocIds: DocID[]): DocID[] => {
        const pinnedDocIds = new Set([
            ...this.movedRelatedDocumentIds$.getValue(),
            ...this.subscribedLibraryInfoList$.getValue().map((subscription) => subscription.libraryDocId),
            ...this.wasmNeedSyncDocumentIds$.getValue(),
        ])
        return removeDocIds.filter((candidate) => !pinnedDocIds.has(candidate))
    }
    // Returns the subset of `removeDocIds` that may actually be un-synced (V2 state).
    //
    // A docId is kept in sync (and therefore filtered out here) when it is still referenced by
    // `movedRelatedDocumentIds$V2`, by a subscribed library's `libraryDocId`, or by
    // `wasmNeedSyncDocumentIds$V2`.
    private filterNoNeedRemovedDocIdsV2 = (removeDocIds: DocID[]): DocID[] => {
        const stillNeededDocIds = new Set([
            ...this.movedRelatedDocumentIds$V2.getValue(),
            ...this.subscribedLibraryInfoList$V2.getValue().map(({ libraryDocId }) => libraryDocId),
            ...this.wasmNeedSyncDocumentIds$V2.getValue(),
        ])
        return removeDocIds.filter((docId) => !stillNeededDocIds.has(docId))
    }

    // Builds the notify filter list for a set of documents: one filter per distinct docId, each
    // made of an `entityType` parameter and an `entityId` parameter.
    // NOTE(review): the entityType value is `EntityType.DOC - 1` rendered as a string; the reason
    // for the `- 1` offset is not visible here — confirm against the notify protocol.
    private getSendNotifyFilterParamByDocIds = (docIds: string[]) => {
        const entityTypeValue = Wukong.NotifyProto.EntityType.DOC - 1 + ''
        return uniq(docIds).map((entityId) => ({
            filterParameters: [
                { name: 'entityType', value: entityTypeValue },
                { name: 'entityId', value: entityId },
            ],
        }))
    }

    // Handles every libraryContent change (RxJS implementation).
    //
    // Sets up two derived streams and two subscriptions:
    //  1. `allRemoteLibraryContentMap$`  - Fetch-sourced values of `allRemoteLibraryContentMap$$`, de-duplicated.
    //  2. `currentRemoteLibraryContent$` - the content belonging to the current document, or null.
    //  3. a subscription that re-fetches a single library when its document name or content changes.
    //  4. a subscription that rebuilds the whole map whenever the set of synced document ids changes.
    private initRemoteLibraryContentMap = () => {
        // Only values whose update source is Fetch are exposed; equal consecutive maps are dropped.
        this.allRemoteLibraryContentMap$ = this.allRemoteLibraryContentMap$$.pipe(
            filter(({ updateSourceType }) => updateSourceType === SubjectUpdateSourceType.Fetch),
            map(({ value }) => value),
            distinctUntilChanged(isLibraryContentMapEqual),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // Content of the library that belongs to the current document, resolved through the
        // docId -> library info cache; null when the cache has no entry or the map has no content.
        this.currentRemoteLibraryContent$ = this.allRemoteLibraryContentMap$.pipe(
            map((allRemoteLibraryContentMap) => {
                const optCurrentLibraryInfo = this.cachedDocumentId2LibraryIdInfoMap$.getValue().get(this.docId)
                if (!optCurrentLibraryInfo) return null
                return allRemoteLibraryContentMap.get(optCurrentLibraryInfo.id) ?? null
            }),
            shareReplay({ refCount: true, bufferSize: 1 })
        )

        // Single-library refresh: triggered either by another document's name change (the library
        // id is looked up first) or directly by a library content change.
        this.subscribe(
            merge(
                this.otherDocNameChange$.pipe(
                    switchMap((docId) =>
                        from(new GetLibraryIdAndNameByDocIdRequest(docId, this.docId).start()).pipe(
                            // A failed lookup is dropped without terminating the outer stream.
                            catchError(() => NEVER)
                        )
                    ),
                    filter((ret) => !!ret?.id),
                    map((ret) => ret?.id as LibraryId)
                ),
                this.singleLibraryContentChange$
            ).pipe(
                mergeMap((libraryId) =>
                    from(new GetLibraryContentBaseInfoMap([libraryId], this.docId).start()).pipe(
                        map((baseInfoVOMap) => baseInfoVOMap[libraryId]),
                        switchMap((baseInfoVO) =>
                            baseInfoVO
                                ? from(
                                      this.libraryResourceDownloader
                                          .fetchFile(LibraryResourceOssClientType.Library, baseInfoVO.resourceId)
                                          .then((fetchedResponse) => fetchedResponse.json())
                                          // Download/parse failures resolve to undefined; only the
                                          // 'NoSuchKey' case is logged.
                                          .catch((e: OssError) => {
                                              if (e.code === 'NoSuchKey') {
                                                  WkCLog.log('RemoteLibrary', {
                                                      traceEventName: 'INFO_SCHEMA_VERSION_NOT_MATCH',
                                                      traceEventKey: 1386768,
                                                      scenario: 'RemoteLibrary',
                                                      docId: this.docId,
                                                      libraryId: baseInfoVO.library.id,
                                                      resourceId: baseInfoVO.resourceId,
                                                      requestId: e.requestId,
                                                  })
                                              }
                                          })
                                  ).pipe(
                                      map((metaInfo: LibraryContentMetaInfoVO | null) =>
                                          metaInfo ? transformLibraryContentVO(baseInfoVO, metaInfo) : null
                                      )
                                  )
                                : of(null)
                        ),
                        map(
                            (libraryContent) =>
                                [libraryId, libraryContent ?? undefined] as [LibraryId, LibraryContentVO | undefined]
                        ),
                        // Any error while refreshing this library is dropped.
                        catchError(() => NEVER)
                    )
                ),
                withLatestFrom(this.allRemoteLibraryContentMap$$)
            ),
            ([[libraryId, optVO], { value }]) => {
                // Copy-then-publish: set the entry when content exists, otherwise delete it.
                const newVoMap = new Map(value)
                if (optVO) {
                    newVoMap.set(libraryId, optVO)
                    // Keep the docId -> library info cache in step with the fetched content.
                    this.updateCachedDocumentId2LibraryInfoMap$({
                        [optVO.library.document!.id]: {
                            id: libraryId,
                            documentName: optVO.library.document!.name,
                            shared: !!optVO.library.shared,
                            docId: optVO.library.document!.id,
                        },
                    })
                } else {
                    newVoMap.delete(libraryId)
                }
                this.allRemoteLibraryContentMap$$.next({
                    updateSourceType: SubjectUpdateSourceType.Fetch,
                    value: newVoMap,
                })
            }
        )

        // Full rebuild: whenever the synced document ids change (debounced by 300ms), resolve the
        // library ids of those documents and fetch the contents that are not yet in the map.
        this.subscribe(
            this.allSyncDocumentIds$.pipe(
                debounceTime(300),
                // allSyncDocumentIds can never really be empty (it always contains at least the current docId);
                // empty arrays are filtered out to avoid wasted work when it is empty during a reconnect.
                filter((allSyncDocumentIds) => !!allSyncDocumentIds.length),
                switchMap((allSyncDocumentIds) => {
                    const unCachedDocumentIds = allSyncDocumentIds.filter(
                        (id) => !this.cachedDocumentId2LibraryIdInfoMap$.getValue().has(id)
                    )

                    // Library ids of the synced documents, in document order; documents without
                    // a cached library info are skipped.
                    const combineLibraryIds = (documentId2LibraryInfoMap: Map<DocID, LibraryIdAndNameVO>) =>
                        allSyncDocumentIds.reduce((ret, docId) => {
                            const libraryInfo = documentId2LibraryInfoMap.get(docId)
                            if (libraryInfo) {
                                return [...ret, libraryInfo.id]
                            }
                            return ret
                        }, [] as LibraryId[])

                    if (unCachedDocumentIds.length) {
                        return from(new GetLibraryIdAndNameMapRequest(unCachedDocumentIds, this.docId).start()).pipe(
                            map((voMap) => {
                                this.updateCachedDocumentId2LibraryInfoMap$(voMap)
                                return combineLibraryIds(this.cachedDocumentId2LibraryIdInfoMap$.getValue())
                            }),
                            catchError(() => NEVER)
                        )
                    }

                    return of(combineLibraryIds(this.cachedDocumentId2LibraryIdInfoMap$.getValue()))
                }),
                withLatestFrom(this.allRemoteLibraryContentMap$$),
                switchMap(([allLibraryIds, { value: voMap }]) => {
                    const unFetchedLibraryIds = allLibraryIds.filter((libraryId) => !voMap.has(libraryId))

                    // Restricts a content map to `allLibraryIds`, so libraries that are no longer
                    // synced are not carried over.
                    const combineLibraryContentMap = (remoteLibraryContentMap: Map<LibraryId, LibraryContentVO>) =>
                        allLibraryIds.reduce((ret, libraryId) => {
                            const vo = remoteLibraryContentMap.get(libraryId)
                            if (vo) {
                                ret.set(libraryId, vo)
                            }
                            return ret
                        }, new Map<LibraryId, LibraryContentVO>())

                    if (unFetchedLibraryIds.length) {
                        return from(new GetLibraryContentBaseInfoMap(unFetchedLibraryIds, this.docId).start())
                            .pipe(
                                switchMap((newBaseInfoVOMap) =>
                                    from(Object.values(newBaseInfoVOMap)).pipe(
                                        // Download the meta info files with at most 20 in flight.
                                        mergeMap(
                                            (baseInfoVO) =>
                                                from(
                                                    this.libraryResourceDownloader
                                                        .fetchFile(
                                                            LibraryResourceOssClientType.Library,
                                                            baseInfoVO.resourceId
                                                        )
                                                        .then((fetchedResponse) => fetchedResponse.json())
                                                        // Download/parse failures resolve to undefined;
                                                        // only the 'NoSuchKey' case is logged.
                                                        .catch((e: OssError) => {
                                                            if (e.code === 'NoSuchKey') {
                                                                WkCLog.log('RemoteLibrary', {
                                                                    traceEventName: 'INFO_SCHEMA_VERSION_NOT_MATCH',
                                                                    traceEventKey: 1386768,
                                                                    scenario: 'RemoteLibrary',
                                                                    libraryId: baseInfoVO.library.id,
                                                                    resourceId: baseInfoVO.resourceId,
                                                                    requestId: e.requestId,
                                                                })
                                                            }
                                                        })
                                                ).pipe(
                                                    map((metaInfo: LibraryContentMetaInfoVO | null) =>
                                                        metaInfo
                                                            ? transformLibraryContentVO(baseInfoVO, metaInfo)
                                                            : null
                                                    )
                                                ),
                                            20
                                        ),
                                        // Collect the successfully built contents keyed by library id.
                                        reduce(
                                            (res, cur) => (cur ? { ...res, [cur.library.id]: cur } : res),
                                            {} as Record<LibraryId, LibraryContentVO>
                                        )
                                    )
                                )
                            )
                            .pipe(
                                // Newly fetched entries override existing ones with the same id.
                                map((newVoMap) =>
                                    combineLibraryContentMap(new Map([...voMap.entries(), ...Object.entries(newVoMap)]))
                                ),
                                catchError(() => NEVER)
                            )
                    }

                    return of(combineLibraryContentMap(voMap))
                })
            ),
            (voMap) => {
                this.allRemoteLibraryContentMap$$.next({
                    updateSourceType: SubjectUpdateSourceType.Fetch,
                    value: voMap,
                })
            }
        )
    }

    // Resolves the library ids that belong to `allSyncDocumentIds`, in document order.
    //
    // Documents missing from `cachedDocumentId2LibraryIdInfoMap$V2` are looked up with a single
    // `GetLibraryIdAndNameMapRequest` (bound to `signal`) and merged into the cache first.
    // Documents that still have no library info afterwards are skipped.
    // If the request fails, the error is swallowed and the returned promise never settles.
    private fetchAllLibraryIds = async (signal: TraceableAbortSignal, allSyncDocumentIds: string[]) => {
        return new Promise<LibraryId[]>((resolve) => {
            // Reads the cache at call time, so it sees entries merged in by the request below.
            const resolveFromCache = () => {
                const infoByDocId: Map<DocID, LibraryIdAndNameVO> = this.cachedDocumentId2LibraryIdInfoMap$V2.getValue()
                const libraryIds: LibraryId[] = []
                for (const docId of allSyncDocumentIds) {
                    const info = infoByDocId.get(docId)
                    if (info) {
                        libraryIds.push(info.id)
                    }
                }
                resolve(libraryIds)
            }

            const missingDocIds = allSyncDocumentIds.filter(
                (docId) => !this.cachedDocumentId2LibraryIdInfoMap$V2.getValue().has(docId)
            )
            if (missingDocIds.length === 0) {
                resolveFromCache()
                return
            }

            new GetLibraryIdAndNameMapRequest(missingDocIds, this.docId)
                .startWithSignal(signal)
                .then((voMap) => {
                    this.updateCachedDocumentId2LibraryInfoMap$V2(voMap)
                    resolveFromCache()
                })
                .catch(() => {})
        })
    }

    // Builds the library-content map for exactly `allLibraryIds`.
    //
    // Contents already present in `voMap` are reused; the rest are fetched: one
    // `GetLibraryContentBaseInfoMap` request (bound to `signal`) followed by one meta-info download
    // per library, scheduled through a ConcurrentScheduler configured with `limitCount: 20`.
    // A failed download drops only that library (a 'NoSuchKey' failure is additionally logged).
    //
    // Any other failure — the base-info request, or a task rejecting while its content is being
    // built — is swallowed and leaves the returned promise pending forever. The download batch is
    // returned from the `then` callback so that such rejections reach the trailing `catch` instead
    // of surfacing as unhandled promise rejections.
    private fetchAllRemoteLibraryContentMap = async (
        signal: TraceableAbortSignal,
        allLibraryIds: LibraryId[],
        voMap: Map<LibraryId, LibraryContentVO>
    ) => {
        return new Promise<Map<LibraryId, LibraryContentVO>>((resolve) => {
            const unFetchedLibraryIds = allLibraryIds.filter((libraryId) => !voMap.has(libraryId))

            // Restricts a content map to `allLibraryIds`, so libraries that are no longer synced
            // are not carried over.
            const combineLibraryContentMap = (remoteLibraryContentMap: Map<LibraryId, LibraryContentVO>) =>
                allLibraryIds.reduce((ret, libraryId) => {
                    const vo = remoteLibraryContentMap.get(libraryId)
                    if (vo) {
                        ret.set(libraryId, vo)
                    }
                    return ret
                }, new Map<LibraryId, LibraryContentVO>())

            if (!unFetchedLibraryIds.length) {
                resolve(combineLibraryContentMap(voMap))
                return
            }
            new GetLibraryContentBaseInfoMap(unFetchedLibraryIds, this.docId)
                .startWithSignal(signal)
                .then((baseInfoVOMap) => {
                    const concurrentScheduler = ConcurrentScheduler({ delayTime: 0, limitCount: 20 })
                    const allTasks = Object.values(baseInfoVOMap).map((baseInfoVO) => {
                        return concurrentScheduler.add(async () =>
                            this.libraryResourceDownloader
                                .fetchFile(LibraryResourceOssClientType.Library, baseInfoVO.resourceId)
                                .then((fetchedResponse) => fetchedResponse.json())
                                // Download/parse failures resolve to undefined; only the
                                // 'NoSuchKey' case is logged.
                                .catch((e: OssError) => {
                                    if (e.code === 'NoSuchKey') {
                                        WkCLog.log('RemoteLibrary', {
                                            traceEventName: 'INFO_SCHEMA_VERSION_NOT_MATCH',
                                            traceEventKey: 1386768,
                                            scenario: 'RemoteLibrary',
                                            libraryId: baseInfoVO.library.id,
                                            resourceId: baseInfoVO.resourceId,
                                            requestId: e.requestId,
                                        })
                                    }
                                })
                                .then((metaInfo: LibraryContentMetaInfoVO | null) =>
                                    metaInfo ? transformLibraryContentVO(baseInfoVO, metaInfo) : null
                                )
                        )
                    })
                    // Returned (not fire-and-forget) so a rejected task is handled by the catch below.
                    return Promise.all(allTasks).then((ret) => {
                        const newVoMap = ret.reduce(
                            (res, cur) => (cur ? { ...res, [cur.library.id]: cur } : res),
                            {} as Record<LibraryId, LibraryContentVO>
                        )
                        // Newly fetched entries override existing ones with the same id.
                        resolve(combineLibraryContentMap(new Map([...voMap.entries(), ...Object.entries(newVoMap)])))
                    })
                })
                .catch(() => {})
        })
    }

    // Handles every libraryContent change (signal / event-emitter implementation).
    //
    // Wires up, all bound to `this.signal`:
    //  - `allRemoteLibraryContentMap$V2`: Fetch-sourced values of `allRemoteLibraryContentMap$$V2`,
    //    with consecutive equal maps (per `isLibraryContentMapEqual`) suppressed.
    //  - `currentRemoteLibraryContent$V2`: the content belonging to the current document, or null.
    //  - a single-library refresh when another document's name or a library's content changes.
    //  - a full rebuild (debounced by 300ms) whenever `allSyncDocumentIds$V2` changes.
    //
    // The helper closures are declared before any listener is attached, so a listener that fires
    // synchronously during registration can never touch a `const` that is not initialised yet.
    private initRemoteLibraryContentMapV2 = () => {
        // Fetches base info plus meta info for one library and resolves `[libraryId, content]`;
        // content is undefined when the library has no base info or its meta info cannot be loaded.
        // Any other failure is swallowed and leaves the promise pending.
        const fetchLibraryContentBaseInfoMap = async (libraryId: string) => {
            return new Promise<[LibraryId, LibraryContentVO | undefined]>((resolve) => {
                new GetLibraryContentBaseInfoMap([libraryId], this.docId)
                    .startWithSignal(this.signal)
                    .then((baseInfoVOMap) => {
                        return baseInfoVOMap[libraryId]
                    })
                    .then((baseInfoVO) => {
                        if (!baseInfoVO) {
                            resolve([libraryId, undefined])
                            return undefined
                        }
                        // Returned so that a rejection inside this chain reaches the catch below
                        // instead of becoming an unhandled promise rejection.
                        return this.libraryResourceDownloader
                            .fetchFile(LibraryResourceOssClientType.Library, baseInfoVO.resourceId)
                            .then((fetchedResponse) => fetchedResponse.json())
                            // Download/parse failures resolve to undefined; only the 'NoSuchKey'
                            // case is logged.
                            .catch((e: OssError) => {
                                if (e.code === 'NoSuchKey') {
                                    WkCLog.log('RemoteLibrary', {
                                        traceEventName: 'INFO_SCHEMA_VERSION_NOT_MATCH',
                                        traceEventKey: 1386768,
                                        scenario: 'RemoteLibrary',
                                        docId: this.docId,
                                        libraryId: baseInfoVO.library.id,
                                        resourceId: baseInfoVO.resourceId,
                                        requestId: e.requestId,
                                    })
                                }
                            })
                            .then((metaInfo) => (metaInfo ? transformLibraryContentVO(baseInfoVO, metaInfo) : null))
                            .then((libraryContent) => {
                                resolve([libraryId, libraryContent ?? undefined])
                            })
                    })
                    .catch(() => {})
            })
        }

        // Copy-then-publish: sets the entry when content exists (also refreshing the
        // docId -> library info cache), otherwise deletes it.
        const buildNewVoMapAndUpdate = (
            libraryId: LibraryId,
            optVO: LibraryContentVO | undefined,
            value: Map<LibraryId, LibraryContentVO>
        ) => {
            const newVoMap = new Map(value)
            if (optVO) {
                newVoMap.set(libraryId, optVO)
                this.updateCachedDocumentId2LibraryInfoMap$V2({
                    [optVO.library.document!.id]: {
                        id: libraryId,
                        documentName: optVO.library.document!.name,
                        shared: !!optVO.library.shared,
                        docId: optVO.library.document!.id,
                    },
                })
            } else {
                newVoMap.delete(libraryId)
            }
            this.allRemoteLibraryContentMap$$V2.next({
                updateSourceType: SubjectUpdateSourceType.Fetch,
                value: newVoMap,
            })
        }

        // Refreshes one library and merges the result into the latest published map.
        const updateLibraryContentBaseInfoMap = (currentLibraryId: LibraryId) => {
            fetchLibraryContentBaseInfoMap(currentLibraryId).then(([libraryId, optVO]) => {
                buildNewVoMapAndUpdate(libraryId, optVO, this.allRemoteLibraryContentMap$$V2.getValue().value)
            })
        }

        const updateAllRemoteLibraryContentMap = distinctFn(
            (value) => {
                this.allRemoteLibraryContentMap$V2.next(value)
            },
            (prev: Map<string, LibraryContentVO>, curr: Map<string, LibraryContentVO>) =>
                isLibraryContentMapEqual(prev, curr)
        )

        // Only Fetch-sourced updates are forwarded to `allRemoteLibraryContentMap$V2`.
        this.allRemoteLibraryContentMap$$V2.onWithSignal(this.signal, ({ updateSourceType, value }) => {
            if (updateSourceType !== SubjectUpdateSourceType.Fetch) {
                return
            }
            updateAllRemoteLibraryContentMap(value)
        })

        // Content of the library that belongs to the current document, resolved through the
        // docId -> library info cache; null when either lookup misses.
        this.allRemoteLibraryContentMap$V2.onWithSignal(this.signal, (allRemoteLibraryContentMap) => {
            const convert = () => {
                const optCurrentLibraryInfo = this.cachedDocumentId2LibraryIdInfoMap$V2.getValue().get(this.docId)
                if (!optCurrentLibraryInfo) return null
                return allRemoteLibraryContentMap.get(optCurrentLibraryInfo.id) ?? null
            }
            const value = convert()
            this.currentRemoteLibraryContent$V2.next(value)
        })

        const getLibraryIdAndNameByDocIdRequest = createSwitchTask(
            this.signal,
            async (
                signal: TraceableAbortSignal,
                docId: DocID,
                callback: (ret: LibraryIdAndNameVO | undefined) => void
            ) => {
                new GetLibraryIdAndNameByDocIdRequest(docId, this.docId)
                    .startWithSignal(signal)
                    .then((ret) => {
                        callback(ret)
                    })
                    // A failed lookup is ignored, like the other requests in this method.
                    // NOTE(review): presumably this also covers the rejection produced when a
                    // newer call aborts `signal` — confirm against createSwitchTask.
                    .catch(() => {})
            }
        )
        this.otherDocNameChange$V2.onWithSignal(this.signal, (docId) => {
            getLibraryIdAndNameByDocIdRequest(docId, (ret) => {
                if (!ret?.id) {
                    return
                }
                updateLibraryContentBaseInfoMap(ret.id as LibraryId)
            })
        })
        this.singleLibraryContentChange$V2.onWithSignal(this.signal, (libraryId) => {
            updateLibraryContentBaseInfoMap(libraryId)
        })

        // Full rebuild, step 2: fetch the contents that are missing and publish the new map.
        const fetchLatestAllRemoteLibraryContentMap = createSwitchTask(
            this.signal,
            async (
                signal: TraceableAbortSignal,
                allLibraryIds: LibraryId[],
                voMap: Map<LibraryId, LibraryContentVO>
            ) => {
                await this.fetchAllRemoteLibraryContentMap(signal, allLibraryIds, voMap).then((newVoMap) => {
                    this.allRemoteLibraryContentMap$$V2.next({
                        updateSourceType: SubjectUpdateSourceType.Fetch,
                        value: newVoMap,
                    })
                })
            }
        )
        // Full rebuild, step 1: resolve the library ids of the synced documents.
        const fetchLatestAllLibraryIds = createSwitchTask(
            this.signal,
            async (signal: TraceableAbortSignal, allSyncDocumentIds: string[]) => {
                await this.fetchAllLibraryIds(signal, allSyncDocumentIds).then((allLibraryIds) => {
                    fetchLatestAllRemoteLibraryContentMap(
                        allLibraryIds,
                        this.allRemoteLibraryContentMap$$V2.getValue().value
                    )
                })
            }
        )

        const onAllSyncDocumentIds$V2Changed = signalDebounce(
            this.signal,
            (allSyncDocumentIds: string[]) => {
                // An empty list is skipped, matching the RxJS implementation.
                if (!allSyncDocumentIds.length) {
                    return
                }
                fetchLatestAllLibraryIds(allSyncDocumentIds)
            },
            300
        )
        this.allSyncDocumentIds$V2.onWithSignal(this.signal, (allSyncDocumentIds) => {
            onAllSyncDocumentIds$V2Changed(allSyncDocumentIds)
        })
    }
    // When a synced library contains move operations, the documents involved in those moves
    // must be synced as well.
    private initMovedRelatedDocIds = () => {
        // Start syncing move-related documents whose library content has not been loaded yet.
        this.subscribe(this.allRemoteLibraryContentMap$, (contentMap) => {
            const pendingDocIds: DocID[] = []
            for (const vo of contentMap.values()) {
                const movedInfoByLibraryId = vo.libraryMovedInfos.libraryId2LibraryInfo
                // Every moved-library info is also merged into the docId -> library info cache.
                this.updateCachedDocumentId2LibraryInfoMap$(movedInfoByLibraryId)
                for (const movedInfo of Object.values(movedInfoByLibraryId)) {
                    if (!contentMap.has(movedInfo.id)) {
                        pendingDocIds.push(movedInfo.docId)
                    }
                }
            }
            this.addSyncDocIds(pendingDocIds)
        })

        // Collect the docIds of every move-related document across all loaded libraries.
        this.subscribe(this.allRemoteLibraryContentMap$, (contentMap) => {
            const movedDocIds = [...contentMap.values()].flatMap((vo) =>
                Object.values(vo.libraryMovedInfos.libraryId2LibraryInfo).map((movedInfo) => movedInfo.docId)
            )
            this.movedRelatedDocumentIds$.next(new Set(movedDocIds))
        })
    }
    // When a synced library contains move operations, the documents involved in those moves
    // must be synced as well (signal / event-emitter implementation).
    private initMovedRelatedDocIdsV2 = () => {
        // Start syncing move-related documents whose library content has not been loaded yet.
        this.allRemoteLibraryContentMap$V2.onWithSignal(this.signal, (contentMap) => {
            const pendingDocIds: DocID[] = []
            for (const vo of contentMap.values()) {
                const movedInfoByLibraryId = vo.libraryMovedInfos.libraryId2LibraryInfo
                // Every moved-library info is also merged into the docId -> library info cache.
                this.updateCachedDocumentId2LibraryInfoMap$V2(movedInfoByLibraryId)
                for (const movedInfo of Object.values(movedInfoByLibraryId)) {
                    if (!contentMap.has(movedInfo.id)) {
                        pendingDocIds.push(movedInfo.docId)
                    }
                }
            }
            this.addSyncDocIdsV2(pendingDocIds)
        })

        // Collect the docIds of every move-related document across all loaded libraries.
        this.allRemoteLibraryContentMap$V2.onWithSignal(this.signal, (contentMap) => {
            const movedDocIds = [...contentMap.values()].flatMap((vo) =>
                Object.values(vo.libraryMovedInfos.libraryId2LibraryInfo).map((movedInfo) => movedInfo.docId)
            )
            this.movedRelatedDocumentIds$V2.next(new Set(movedDocIds))
        })
    }

    // Merges `newMapValue` into `cachedDocumentId2LibraryIdInfoMap$` and publishes the merged map;
    // incoming entries override cached ones with the same docId.
    //
    // Both accepted shapes are honoured: a Map contributes its own entries, a plain record its
    // own enumerable properties. (`Object.entries` on a Map yields nothing, so a Map has to be
    // read through `entries()`.)
    private updateCachedDocumentId2LibraryInfoMap$ = (
        newMapValue: Map<DocID, LibraryIdAndNameVO> | Record<DocID, LibraryIdAndNameVO>
    ) => {
        const incomingEntries =
            newMapValue instanceof Map ? [...newMapValue.entries()] : Object.entries(newMapValue)
        this.cachedDocumentId2LibraryIdInfoMap$.next(
            new Map([...this.cachedDocumentId2LibraryIdInfoMap$.getValue().entries(), ...incomingEntries])
        )
    }
    // Merges `newMapValue` into `cachedDocumentId2LibraryIdInfoMap$V2` and publishes the merged
    // map; incoming entries override cached ones with the same docId.
    //
    // Both accepted shapes are honoured: a Map contributes its own entries, a plain record its
    // own enumerable properties. (`Object.entries` on a Map yields nothing, so a Map has to be
    // read through `entries()`.)
    private updateCachedDocumentId2LibraryInfoMap$V2 = (
        newMapValue: Map<DocID, LibraryIdAndNameVO> | Record<DocID, LibraryIdAndNameVO>
    ) => {
        const incomingEntries =
            newMapValue instanceof Map ? [...newMapValue.entries()] : Object.entries(newMapValue)
        this.cachedDocumentId2LibraryIdInfoMap$V2.next(
            new Map([...this.cachedDocumentId2LibraryIdInfoMap$V2.getValue().entries(), ...incomingEntries])
        )
    }

    // Tracks which libraries the current document has subscribed to (reference switch turned on)
    // and keeps the synced document ids in step with that list.
    private initCurrentDocLibrarySubscription = () => {
        // Reload the whole subscription list on a notify reconnect, on a change of the current
        // document's authorization, or on a change of the org's default library subscription.
        // A failed reload is dropped; a newer trigger replaces a reload that is still in flight.
        const reloadTrigger$ = merge(
            this.notifyConnectRefresher$,
            this.currentDocAuthorizationChange$,
            this.orgDefaultLibrarySubscriptionChange$
        )
        const reloadedList$ = reloadTrigger$.pipe(
            switchMap(() => from(new GetLibrarySubscription(this.docId).start()).pipe(catchError(() => NEVER)))
        )
        this.subscribe(reloadedList$, (list) => {
            this.subscribedLibraryInfoList$.next(list)
        })

        // Apply incremental ADD / REMOVE notifications to the list.
        this.subscribe(this.currentDocLibrarySubscriptionChange$, (body) => {
            // The library's owning document is the first DOC ancestor that carries an entity id.
            const docAncestor = body.relation?.another?.ancestors?.find(
                (ancestor) => ancestor.entityType === Wukong.NotifyProto.EntityType.DOC && !!ancestor.entityId
            )
            const libraryDocId = docAncestor?.entityId
            if (!libraryDocId) {
                return
            }

            const changedItem: LibrarySubscriptionVO = {
                docId: this.docId,
                libraryId: body.relation?.another?.entityId!,
                libraryDocId,
            }

            const { changeType } = body
            if (changeType == Wukong.NotifyProto.RelationChangeType.ADD) {
                this.addItemsInTargetArray$(this.subscribedLibraryInfoList$, [changedItem])
            } else if (changeType == Wukong.NotifyProto.RelationChangeType.REMOVE) {
                this.removeItemsInTargetArray$(this.subscribedLibraryInfoList$, [changedItem])
            }
        })

        // Start syncing the documents of all subscribed libraries.
        this.subscribe(this.subscribedLibraryInfoList$, (subscribedList) => {
            const libraryDocIds = subscribedList.map((vo) => vo.libraryDocId)
            this.addSyncDocIds(libraryDocIds)
        })

        // Stop syncing the documents of libraries that are no longer subscribed.
        this.subscribe(this.subscribedLibraryInfoList$.pipe(pairwise()), ([previousList, currentList]) => {
            const previousDocIds = new Set(previousList.map((vo) => vo.libraryDocId))
            const currentDocIds = new Set(currentList.map((vo) => vo.libraryDocId))
            const droppedDocIds = difference([...previousDocIds], [...currentDocIds])
            if (droppedDocIds.length > 0) {
                this.removeSyncDocIds(droppedDocIds)
            }
        })
    }
    // Tracks which libraries the current document has subscribed to (reference switch turned on)
    // and keeps the synced document ids in step with that list (signal / event-emitter
    // implementation).
    private initCurrentDocLibrarySubscriptionV2 = () => {
        // Reloads the whole subscription list. The request is bound to the signal handed to this
        // invocation of the switch task rather than to `this.signal`, so that a newer reload can
        // cancel one that is still in flight and a stale response cannot overwrite a fresher
        // list — the same guarantee `switchMap` gives the RxJS implementation.
        // NOTE(review): assumes createSwitchTask aborts the per-call signal when a newer call
        // starts, as the other switch tasks in this file rely on — confirm.
        const updateSubscribedLibraryInfoList = createSwitchTask(
            this.signal,
            async (signal: TraceableAbortSignal) => {
                try {
                    const list = await new GetLibrarySubscription(this.docId).startWithSignal(signal)
                    this.subscribedLibraryInfoList$V2.next(list)
                } catch {
                    // A failed (or cancelled) reload is deliberately ignored; the list keeps its
                    // previous value until the next trigger.
                }
            }
        )
        this.notifyConnectRefresher$V2.onWithSignal(this.signal, updateSubscribedLibraryInfoList)
        this.currentDocAuthorizationChange$V2.onWithSignal(this.signal, updateSubscribedLibraryInfoList)
        this.orgDefaultLibrarySubscriptionChange$V2.onWithSignal(this.signal, updateSubscribedLibraryInfoList)

        // Apply incremental ADD / REMOVE notifications to the list.
        this.currentDocLibrarySubscriptionChange$V2.onWithSignal(
            this.signal,
            (body: Wukong.NotifyProto.IBusinessEntityRelationChange) => {
                // The library's owning document is the first DOC ancestor that carries an entity id.
                const libraryDocId = body.relation?.another?.ancestors?.find(
                    (ancestor) => ancestor.entityType === Wukong.NotifyProto.EntityType.DOC && !!ancestor.entityId
                )?.entityId

                if (!libraryDocId) {
                    return
                }

                const item: LibrarySubscriptionVO = {
                    docId: this.docId,
                    libraryId: body.relation?.another?.entityId!,
                    libraryDocId,
                }

                if (body.changeType == Wukong.NotifyProto.RelationChangeType.ADD) {
                    this.addItemsInTargetArray$V2(this.subscribedLibraryInfoList$V2, [item])
                } else if (body.changeType == Wukong.NotifyProto.RelationChangeType.REMOVE) {
                    this.removeItemsInTargetArray$V2(this.subscribedLibraryInfoList$V2, [item])
                }
            }
        )
        // Start syncing the documents of all subscribed libraries.
        this.subscribedLibraryInfoList$V2.onWithSignal(this.signal, (subscribedLibraryInfoList) => {
            this.addSyncDocIdsV2(subscribedLibraryInfoList.map((vo) => vo.libraryDocId))
        })

        // Stop syncing the documents of libraries that are no longer subscribed. `prev` holds the
        // list seen by the previous invocation (initially empty).
        let prev: LibrarySubscriptionVO[] = []
        this.subscribedLibraryInfoList$V2.onWithSignal(this.signal, (current) => {
            const prevSet = new Set(prev.map((vo) => vo.libraryDocId))
            const currentSet = new Set(current.map((vo) => vo.libraryDocId))
            const removeIds = difference([...prevSet], [...currentSet])
            if (removeIds.length) {
                this.removeSyncDocIdsV2(removeIds)
            }
            prev = current
        })
    }

    // Appends to the given BehaviorSubject's array every item of `addItems` that is not already
    // present (deep equality via lodash `isEqual`) and returns the items that were appended.
    // Nothing is emitted when there is nothing new to append.
    private addItemsInTargetArray$ = <T>(targetArray$: BehaviorSubject<T[]>, addItems: T[]): T[] => {
        if (addItems.length === 0) {
            return []
        }

        const existingItems = targetArray$.getValue()
        const newItems = differenceWith(addItems, existingItems, isEqual)
        if (newItems.length > 0) {
            targetArray$.next([...existingItems, ...newItems])
        }

        return newItems
    }
    /**
     * Appends items to the array held by a BehaviorEventEmitter and returns the items that were added.
     *
     * Candidates that are already present (compared with lodash `isEqual`) are skipped.
     * A new array is emitted only when at least one candidate is actually new.
     *
     * @param targetArray$ emitter holding the array to extend
     * @param addItems candidate items to append
     * @returns the subset of `addItems` that was not yet present and has been appended
     */
    private addItemsInTargetArray$V2 = <T>(targetArray$: BehaviorEventEmitter<T[]>, addItems: T[]): T[] => {
        if (addItems.length === 0) {
            return []
        }

        const existingItems = targetArray$.getValue()
        const missingItems = differenceWith(addItems, existingItems, isEqual)
        if (missingItems.length === 0) {
            // Nothing new: leave the emitter untouched so listeners are not notified.
            return missingItems
        }

        targetArray$.next([...existingItems, ...missingItems])
        return missingItems
    }
    /**
     * Removes items from the array held by a BehaviorSubject and returns the items that were removed.
     *
     * Items are matched with lodash `isEqual`. A new array is emitted only when at least one
     * of `removeItems` is currently present.
     *
     * @param targetArray$ subject holding the array to shrink
     * @param removeItems items that should no longer be present
     * @returns the subset of `removeItems` that was present and has been removed
     */
    private removeItemsInTargetArray$ = <T>(targetArray$: BehaviorSubject<T[]>, removeItems: T[]): T[] => {
        if (!removeItems.length) {
            return []
        }

        const currentTargetArrayValue = targetArray$.getValue()
        const changedItems = intersectionWith(removeItems, currentTargetArrayValue, isEqual)
        if (changedItems.length) {
            // Use `some`, not `find`: `find` returns the matched element itself, so a falsy match
            // ('' / 0 / false / null) would be read as "not found" and the item would be kept
            // in the array even though it is reported as removed.
            targetArray$.next(
                currentTargetArrayValue.filter((item) => !changedItems.some((vo) => isEqual(vo, item)))
            )
        }

        return changedItems
    }
    /**
     * Removes items from the array held by a BehaviorEventEmitter and returns the items that were removed.
     *
     * Items are matched with lodash `isEqual`. A new array is emitted only when at least one
     * of `removeItems` is currently present.
     *
     * @param targetArray$ emitter holding the array to shrink
     * @param removeItems items that should no longer be present
     * @returns the subset of `removeItems` that was present and has been removed
     */
    private removeItemsInTargetArray$V2 = <T>(targetArray$: BehaviorEventEmitter<T[]>, removeItems: T[]): T[] => {
        if (!removeItems.length) {
            return []
        }

        const currentTargetArrayValue = targetArray$.getValue()
        const changedItems = intersectionWith(removeItems, currentTargetArrayValue, isEqual)
        if (changedItems.length) {
            // Use `some`, not `find`: `find` returns the matched element itself, so a falsy match
            // ('' / 0 / false / null) would be read as "not found" and the item would be kept
            // in the array even though it is reported as removed.
            targetArray$.next(
                currentTargetArrayValue.filter((item) => !changedItems.some((vo) => isEqual(vo, item)))
            )
        }

        return changedItems
    }
    /**
     * Maintains the ids of the source documents that hold the components/styles used by the
     * current document.
     *
     * The 'library-notify-sync-service-rxjs-removal' feature switch selects which implementation
     * is updated: the BehaviorEventEmitter-based (V2) one when enabled, the BehaviorSubject-based
     * one otherwise. In both paths the tracked id list is updated first (add, then remove) and
     * the sync set afterwards (remove, then add); this call order is kept as-is.
     *
     * @param addDocIds document ids to start tracking
     * @param removeDocIds document ids to stop tracking
     */
    public handleWasmNeedSyncDocumentIds = (addDocIds: DocID[], removeDocIds: DocID[]) => {
        const useV2 = featureSwitchManager.isEnabled('library-notify-sync-service-rxjs-removal')

        if (!useV2) {
            // Legacy rxjs-based path.
            this.addItemsInTargetArray$(this.wasmNeedSyncDocumentIds$, addDocIds)
            this.removeItemsInTargetArray$(this.wasmNeedSyncDocumentIds$, removeDocIds)
            this.removeSyncDocIds(removeDocIds)
            this.addSyncDocIds(addDocIds)
            return
        }

        this.addItemsInTargetArray$V2(this.wasmNeedSyncDocumentIds$V2, addDocIds)
        this.removeItemsInTargetArray$V2(this.wasmNeedSyncDocumentIds$V2, removeDocIds)
        this.removeSyncDocIdsV2(removeDocIds)
        this.addSyncDocIdsV2(addDocIds)
    }
}
