diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts index 42267285..f22cff9d 100644 --- a/src/state/messages/events/agent.ts +++ b/src/state/messages/events/agent.ts @@ -3,316 +3,51 @@ import EventEmitter from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' import {logger} from '#/logger' +import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const' import { MessagesEventBusDispatch, MessagesEventBusDispatchEvent, MessagesEventBusError, MessagesEventBusErrorCode, + MessagesEventBusEvents, MessagesEventBusParams, - MessagesEventBusState, MessagesEventBusStatus, } from '#/state/messages/events/types' const LOGGER_CONTEXT = 'MessagesEventBus' -const DEFAULT_POLL_INTERVAL = 60e3 - export class MessagesEventBus { private id: string private agent: BskyAgent private __tempFromUserDid: string - private emitter = new EventEmitter() + private emitter = new EventEmitter() - private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized + private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing private error: MessagesEventBusError | undefined private latestRev: string | undefined = undefined private pollInterval = DEFAULT_POLL_INTERVAL private requestedPollIntervals: Map = new Map() - snapshot: MessagesEventBusState | undefined - constructor(params: MessagesEventBusParams) { this.id = nanoid(3) this.agent = params.agent this.__tempFromUserDid = params.__tempFromUserDid - this.subscribe = this.subscribe.bind(this) - this.getSnapshot = this.getSnapshot.bind(this) - this.init = this.init.bind(this) - this.suspend = this.suspend.bind(this) - this.resume = this.resume.bind(this) - this.requestPollInterval = this.requestPollInterval.bind(this) - this.trail = this.trail.bind(this) - this.trailConvo = this.trailConvo.bind(this) - } - - private commit() { - this.snapshot = undefined - this.subscribers.forEach(subscriber => subscriber()) - } - - private subscribers: (() => void)[] = [] - - subscribe(subscriber: () => void) { - if (this.subscribers.length === 0) this.init() - - this.subscribers.push(subscriber) - - return () => { - this.subscribers = this.subscribers.filter(s => s !== subscriber) - if (this.subscribers.length === 0) this.suspend() - } - } - - getSnapshot(): MessagesEventBusState { - if (!this.snapshot) this.snapshot = this.generateSnapshot() - // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo) - return this.snapshot - } - - private generateSnapshot(): MessagesEventBusState { - switch (this.status) { - case MessagesEventBusStatus.Initializing: { - return { - status: MessagesEventBusStatus.Initializing, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Ready: { - return { - status: this.status, - rev: this.latestRev!, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Suspended: { - return { - status: this.status, - rev: this.latestRev, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Error: { - return { - status: MessagesEventBusStatus.Error, - rev: this.latestRev, - error: this.error || { - code: MessagesEventBusErrorCode.Unknown, - retry: () => { - this.init() - }, - }, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - default: { - return { - status: MessagesEventBusStatus.Uninitialized, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - } - } - - dispatch(action: MessagesEventBusDispatch) { - const prevStatus = this.status - - switch (this.status) { - case MessagesEventBusStatus.Uninitialized: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Init: { - this.status = MessagesEventBusStatus.Initializing - this.setup() - break - } - } - break - } - case MessagesEventBusStatus.Initializing: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Ready: { - this.status = MessagesEventBusStatus.Ready - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Background: { - this.status = MessagesEventBusStatus.Backgrounded - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Suspend: { - this.status = MessagesEventBusStatus.Suspended - break - } - case MessagesEventBusDispatchEvent.Error: { - this.status = MessagesEventBusStatus.Error - this.error = action.payload - break - } - } - break - } - case MessagesEventBusStatus.Ready: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Background: { - this.status = MessagesEventBusStatus.Backgrounded - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Suspend: { - this.status = MessagesEventBusStatus.Suspended - this.stopPoll() - break - } - case MessagesEventBusDispatchEvent.Error: { - this.status = MessagesEventBusStatus.Error - this.error = action.payload - this.stopPoll() - break - } - } - break - } - case MessagesEventBusStatus.Backgrounded: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Resume: { - this.status = MessagesEventBusStatus.Ready - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Suspend: { - this.status = MessagesEventBusStatus.Suspended - this.stopPoll() - break - } - case MessagesEventBusDispatchEvent.Error: { - this.status = MessagesEventBusStatus.Error - this.error = action.payload - this.stopPoll() - break - } - } - break - } - case MessagesEventBusStatus.Suspended: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Resume: { - this.status = MessagesEventBusStatus.Ready - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Background: { - this.status = MessagesEventBusStatus.Backgrounded - this.resetPoll() - break - } - case MessagesEventBusDispatchEvent.Error: { - this.status = MessagesEventBusStatus.Error - this.error = action.payload - this.stopPoll() - break - } - } - break - } - case MessagesEventBusStatus.Error: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Resume: - case MessagesEventBusDispatchEvent.Init: { - this.status = MessagesEventBusStatus.Initializing - this.error = undefined - this.latestRev = undefined - this.setup() - break - } - } - break - } - default: - break - } - - logger.debug( - `${LOGGER_CONTEXT}: dispatch '${action.event}'`, - { - id: this.id, - prev: prevStatus, - next: this.status, - }, - logger.DebugContext.convo, - ) - - this.commit() - } - - private async setup() { - logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo) - - try { - await this.initializeLatestRev() - this.dispatch({event: MessagesEventBusDispatchEvent.Ready}) - } catch (e: any) { - logger.error(e, { - context: `${LOGGER_CONTEXT}: setup failed`, - }) - - this.dispatch({ - event: MessagesEventBusDispatchEvent.Error, - payload: { - exception: e, - code: MessagesEventBusErrorCode.InitFailed, - retry: () => { - this.init() - }, - }, - }) - } - } - - init() { - logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Init}) - } - - background() { - logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Background}) - } - - suspend() { - logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) - } - - resume() { - logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + this.init() } requestPollInterval(interval: number) { const id = nanoid() this.requestedPollIntervals.set(id, interval) - this.resetPoll() + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) return () => { this.requestedPollIntervals.delete(id) - this.resetPoll() + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) } } @@ -346,30 +81,226 @@ export class MessagesEventBus { } } - private async initializeLatestRev() { + getLatestRev() { + return this.latestRev + } + + onConnect(handler: () => void) { + this.emitter.on('connect', handler) + + if ( + this.status === MessagesEventBusStatus.Ready || + this.status === MessagesEventBusStatus.Backgrounded || + this.status === MessagesEventBusStatus.Suspended + ) { + handler() + } + + return () => { + this.emitter.off('connect', handler) + } + } + + onError(handler: (payload?: MessagesEventBusError) => void) { + this.emitter.on('error', handler) + + if (this.status === MessagesEventBusStatus.Error) { + handler(this.error) + } + + return () => { + this.emitter.off('error', handler) + } + } + + background() { + logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Background}) + } + + suspend() { + logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) + } + + resume() { + logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + } + + private dispatch(action: MessagesEventBusDispatch) { + const prevStatus = this.status + + switch (this.status) { + case MessagesEventBusStatus.Initializing: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Ready: { + this.status = MessagesEventBusStatus.Ready + this.resetPoll() + this.emitter.emit('connect') + break + } + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.resetPoll() + this.emitter.emit('connect') + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.emitter.emit('error', action.payload) + break + } + } + break + } + case MessagesEventBusStatus.Ready: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.resetPoll() + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + this.stopPoll() + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() + break + } + } + break + } + case MessagesEventBusStatus.Backgrounded: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: { + this.status = MessagesEventBusStatus.Ready + this.resetPoll() + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + this.stopPoll() + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() + break + } + } + break + } + case MessagesEventBusStatus.Suspended: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: { + this.status = MessagesEventBusStatus.Ready + this.resetPoll() + break + } + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.resetPoll() + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + } + break + } + case MessagesEventBusStatus.Error: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: { + // basically reset + this.status = MessagesEventBusStatus.Initializing + this.error = undefined + this.latestRev = undefined + this.init() + break + } + } + break + } + default: + break + } + logger.debug( - `${LOGGER_CONTEXT}: initialize latest rev`, - {}, + `${LOGGER_CONTEXT}: dispatch '${action.event}'`, + { + id: this.id, + prev: prevStatus, + next: this.status, + }, logger.DebugContext.convo, ) + } - const response = await this.agent.api.chat.bsky.convo.listConvos( - { - limit: 1, - }, - { - headers: { - Authorization: this.__tempFromUserDid, + private async init() { + logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) + + try { + const response = await this.agent.api.chat.bsky.convo.listConvos( + { + limit: 1, }, - }, - ) + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + // throw new Error('UNCOMMENT TO TEST INIT FAILURE') - const {convos} = response.data + const {convos} = response.data - for (const convo of convos) { - if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { - this.latestRev = convo.rev + for (const convo of convos) { + if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { + this.latestRev = convo.rev + } } + + this.dispatch({event: MessagesEventBusDispatchEvent.Ready}) + } catch (e: any) { + logger.error(e, { + context: `${LOGGER_CONTEXT}: init failed`, + }) + + this.dispatch({ + event: MessagesEventBusDispatchEvent.Error, + payload: { + exception: e, + code: MessagesEventBusErrorCode.InitFailed, + retry: () => { + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + }, + }, + }) } } @@ -430,6 +361,8 @@ export class MessagesEventBus { }, ) + // throw new Error('UNCOMMENT TO TEST POLL FAILURE') + const {logs: events} = response.data let needsEmit = false @@ -473,7 +406,7 @@ export class MessagesEventBus { exception: e, code: MessagesEventBusErrorCode.PollFailed, retry: () => { - this.init() + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) }, }, }) diff --git a/src/state/messages/events/const.ts b/src/state/messages/events/const.ts new file mode 100644 index 00000000..921557ce --- /dev/null +++ b/src/state/messages/events/const.ts @@ -0,0 +1 @@ +export const DEFAULT_POLL_INTERVAL = 20e3 diff --git a/src/state/messages/events/index.tsx b/src/state/messages/events/index.tsx index 2de6286e..08ec7750 100644 --- a/src/state/messages/events/index.tsx +++ b/src/state/messages/events/index.tsx @@ -5,13 +5,13 @@ import {BskyAgent} from '@atproto-labs/api' import {useGate} from '#/lib/statsig/statsig' import {isWeb} from '#/platform/detection' import {MessagesEventBus} from '#/state/messages/events/agent' -import {MessagesEventBusState} from '#/state/messages/events/types' import {useAgent} from '#/state/session' import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' import {IS_DEV} from '#/env' -const MessagesEventBusContext = - React.createContext(null) +const MessagesEventBusContext = React.createContext( + null, +) export function useMessagesEventBus() { const ctx = React.useContext(MessagesEventBusContext) @@ -37,12 +37,13 @@ export function Temp_MessagesEventBusProvider({ __tempFromUserDid: getAgent().session?.did!, }), ) - const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot) - if (isWeb && IS_DEV) { - // @ts-ignore - window.messagesEventBus = service - } + React.useEffect(() => { + if (isWeb && IS_DEV) { + // @ts-ignore + window.bus = bus + } + }, [bus]) React.useEffect(() => { const handleAppStateChange = (nextAppState: string) => { @@ -61,7 +62,7 @@ export function Temp_MessagesEventBusProvider({ }, [bus]) return ( - + {children} ) diff --git a/src/state/messages/events/types.ts b/src/state/messages/events/types.ts index 6959b4f0..c6be522a 100644 --- a/src/state/messages/events/types.ts +++ b/src/state/messages/events/types.ts @@ -6,7 +6,6 @@ export type MessagesEventBusParams = { } export enum MessagesEventBusStatus { - Uninitialized = 'uninitialized', Initializing = 'initializing', Ready = 'ready', Error = 'error', @@ -15,12 +14,12 @@ export enum MessagesEventBusStatus { } export enum MessagesEventBusDispatchEvent { - Init = 'init', Ready = 'ready', Error = 'error', Background = 'background', Suspend = 'suspend', Resume = 'resume', + UpdatePoll = 'updatePoll', } export enum MessagesEventBusErrorCode { @@ -36,9 +35,6 @@ export type MessagesEventBusError = { } export type MessagesEventBusDispatch = - | { - event: MessagesEventBusDispatchEvent.Init - } | { event: MessagesEventBusDispatchEvent.Ready } @@ -55,59 +51,22 @@ export type MessagesEventBusDispatch = event: MessagesEventBusDispatchEvent.Error payload: MessagesEventBusError } + | { + event: MessagesEventBusDispatchEvent.UpdatePoll + } export type TrailHandler = ( events: ChatBskyConvoGetLog.OutputSchema['logs'], ) => void export type RequestPollIntervalHandler = (interval: number) => () => void +export type OnConnectHandler = (handler: () => void) => () => void +export type OnDisconnectHandler = ( + handler: (error?: MessagesEventBusError) => void, +) => () => void -export type MessagesEventBusState = - | { - status: MessagesEventBusStatus.Uninitialized - rev: undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Initializing - rev: undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Ready - rev: string - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Backgrounded - rev: string | undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Suspended - rev: string | undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Error - rev: string | undefined - error: MessagesEventBusError - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } +export type MessagesEventBusEvents = { + events: [ChatBskyConvoGetLog.OutputSchema['logs']] + connect: undefined + error: [MessagesEventBusError] | undefined +}