From 87cb4c105e786b73b8994313e8d46fa86aba96c4 Mon Sep 17 00:00:00 2001 From: Eric Bailey Date: Tue, 7 May 2024 17:54:34 -0500 Subject: [PATCH] =?UTF-8?q?[=F0=9F=90=B4]=20Global=20event=20mgmt=20(#3897?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add global event bus for messages logs * Add rev to state * Better handle error * Clean up polling, add backgrounding * Add trailConvo method * Extend polling until we're ready for this --- src/App.native.tsx | 41 +-- src/App.web.tsx | 42 ++- src/state/messages/events/agent.ts | 466 ++++++++++++++++++++++++++++ src/state/messages/events/index.tsx | 67 ++++ src/state/messages/events/types.ts | 111 +++++++ 5 files changed, 686 insertions(+), 41 deletions(-) create mode 100644 src/state/messages/events/agent.ts create mode 100644 src/state/messages/events/index.tsx create mode 100644 src/state/messages/events/types.ts diff --git a/src/App.native.tsx b/src/App.native.tsx index 9fa82e9c..08ef35bc 100644 --- a/src/App.native.tsx +++ b/src/App.native.tsx @@ -16,6 +16,7 @@ import {useQueryClient} from '@tanstack/react-query' import {Provider as StatsigProvider} from '#/lib/statsig/statsig' import {logger} from '#/logger' +import {MessagesEventBusProvider} from '#/state/messages/events' import {init as initPersistedState} from '#/state/persisted' import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs' import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts' @@ -95,25 +96,27 @@ function InnerApp() { // Resets the entire tree below when it changes: key={currentAccount?.did}> - - - {/* LabelDefsProvider MUST come before ModerationOptsProvider */} - - - - - - - - - - - - - - - - + + + + {/* LabelDefsProvider MUST come before ModerationOptsProvider */} + + + + + + + + + + + + + + + + + diff --git a/src/App.web.tsx b/src/App.web.tsx index 9c2b34a7..9dfbfbe5 100644 --- a/src/App.web.tsx +++ b/src/App.web.tsx @@ -9,6 +9,7 @@ import {useLingui} from '@lingui/react' import {Provider as StatsigProvider} from '#/lib/statsig/statsig' import {logger} from '#/logger' +import {MessagesEventBusProvider} from '#/state/messages/events' import {init as initPersistedState} from '#/state/persisted' import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs' import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts' @@ -83,22 +84,24 @@ function InnerApp() { // Resets the entire tree below when it changes: key={currentAccount?.did}> - - {/* LabelDefsProvider MUST come before ModerationOptsProvider */} - - - - - - - - - - - - - - + + + {/* LabelDefsProvider MUST come before ModerationOptsProvider */} + + + + + + + + + + + + + + + @@ -112,12 +115,7 @@ function App() { const [isReady, setReady] = useState(false) React.useEffect(() => { - initPersistedState().then(() => { - setReady(true) - - const preloadElement = document.getElementById('preload') - preloadElement?.remove() - }) + initPersistedState().then(() => setReady(true)) }, []) if (!isReady) { diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts new file mode 100644 index 00000000..c75f42ba --- /dev/null +++ b/src/state/messages/events/agent.ts @@ -0,0 +1,466 @@ +import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api' +import EventEmitter from 'eventemitter3' +import {nanoid} from 'nanoid/non-secure' + +import {logger} from '#/logger' +import { + MessagesEventBusDispatch, + MessagesEventBusDispatchEvent, + MessagesEventBusError, + MessagesEventBusErrorCode, + MessagesEventBusParams, + MessagesEventBusState, + MessagesEventBusStatus, +} from '#/state/messages/events/types' + +const LOGGER_CONTEXT = 'MessagesEventBus' + +const ACTIVE_POLL_INTERVAL = 60e3 +const BACKGROUND_POLL_INTERVAL = 60e3 + +export class MessagesEventBus { + private id: string + + private agent: BskyAgent + private __tempFromUserDid: string + private emitter = new EventEmitter() + + private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized + private pollInterval = ACTIVE_POLL_INTERVAL + private error: MessagesEventBusError | undefined + private latestRev: string | undefined = undefined + + snapshot: MessagesEventBusState | undefined + + constructor(params: MessagesEventBusParams) { + this.id = nanoid(3) + this.agent = params.agent + this.__tempFromUserDid = params.__tempFromUserDid + + this.subscribe = this.subscribe.bind(this) + this.getSnapshot = this.getSnapshot.bind(this) + this.init = this.init.bind(this) + this.suspend = this.suspend.bind(this) + this.resume = this.resume.bind(this) + this.setPollInterval = this.setPollInterval.bind(this) + this.trail = this.trail.bind(this) + this.trailConvo = this.trailConvo.bind(this) + } + + private commit() { + this.snapshot = undefined + this.subscribers.forEach(subscriber => subscriber()) + } + + private subscribers: (() => void)[] = [] + + subscribe(subscriber: () => void) { + if (this.subscribers.length === 0) this.init() + + this.subscribers.push(subscriber) + + return () => { + this.subscribers = this.subscribers.filter(s => s !== subscriber) + if (this.subscribers.length === 0) this.suspend() + } + } + + getSnapshot(): MessagesEventBusState { + if (!this.snapshot) this.snapshot = this.generateSnapshot() + // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo) + return this.snapshot + } + + private generateSnapshot(): MessagesEventBusState { + switch (this.status) { + case MessagesEventBusStatus.Initializing: { + return { + status: MessagesEventBusStatus.Initializing, + rev: undefined, + error: undefined, + setPollInterval: this.setPollInterval, + trail: this.trail, + trailConvo: this.trailConvo, + } + } + case MessagesEventBusStatus.Ready: { + return { + status: this.status, + rev: this.latestRev!, + error: undefined, + setPollInterval: this.setPollInterval, + trail: this.trail, + trailConvo: this.trailConvo, + } + } + case MessagesEventBusStatus.Suspended: { + return { + status: this.status, + rev: this.latestRev, + error: undefined, + setPollInterval: this.setPollInterval, + trail: this.trail, + trailConvo: this.trailConvo, + } + } + case MessagesEventBusStatus.Error: { + return { + status: MessagesEventBusStatus.Error, + rev: this.latestRev, + error: this.error || { + code: MessagesEventBusErrorCode.Unknown, + retry: () => { + this.init() + }, + }, + setPollInterval: this.setPollInterval, + trail: this.trail, + trailConvo: this.trailConvo, + } + } + default: { + return { + status: MessagesEventBusStatus.Uninitialized, + rev: undefined, + error: undefined, + setPollInterval: this.setPollInterval, + trail: this.trail, + trailConvo: this.trailConvo, + } + } + } + } + + dispatch(action: MessagesEventBusDispatch) { + const prevStatus = this.status + + switch (this.status) { + case MessagesEventBusStatus.Uninitialized: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Init: { + this.status = MessagesEventBusStatus.Initializing + this.setup() + break + } + } + break + } + case MessagesEventBusStatus.Initializing: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Ready: { + this.status = MessagesEventBusStatus.Ready + this.setPollInterval(ACTIVE_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.setPollInterval(BACKGROUND_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + break + } + } + break + } + case MessagesEventBusStatus.Ready: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.setPollInterval(BACKGROUND_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + this.stopPoll() + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + break + } + } + break + } + case MessagesEventBusStatus.Backgrounded: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: { + this.status = MessagesEventBusStatus.Ready + this.setPollInterval(ACTIVE_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Suspend: { + this.status = MessagesEventBusStatus.Suspended + this.stopPoll() + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + break + } + } + break + } + case MessagesEventBusStatus.Suspended: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: { + this.status = MessagesEventBusStatus.Ready + this.setPollInterval(ACTIVE_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Background: { + this.status = MessagesEventBusStatus.Backgrounded + this.setPollInterval(BACKGROUND_POLL_INTERVAL) + break + } + case MessagesEventBusDispatchEvent.Error: { + this.status = MessagesEventBusStatus.Error + this.error = action.payload + this.stopPoll() + break + } + } + break + } + case MessagesEventBusStatus.Error: { + switch (action.event) { + case MessagesEventBusDispatchEvent.Resume: + case MessagesEventBusDispatchEvent.Init: { + this.status = MessagesEventBusStatus.Initializing + this.error = undefined + this.latestRev = undefined + this.setup() + break + } + } + break + } + default: + break + } + + logger.debug( + `${LOGGER_CONTEXT}: dispatch '${action.event}'`, + { + id: this.id, + prev: prevStatus, + next: this.status, + }, + logger.DebugContext.convo, + ) + + this.commit() + } + + private async setup() { + logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo) + + try { + await this.initializeLatestRev() + this.dispatch({event: MessagesEventBusDispatchEvent.Ready}) + } catch (e: any) { + logger.error(e, { + context: `${LOGGER_CONTEXT}: setup failed`, + }) + + this.dispatch({ + event: MessagesEventBusDispatchEvent.Error, + payload: { + exception: e, + code: MessagesEventBusErrorCode.InitFailed, + retry: () => { + this.init() + }, + }, + }) + } + } + + init() { + logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Init}) + } + + background() { + logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Background}) + } + + suspend() { + logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) + } + + resume() { + logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + } + + setPollInterval(interval: number) { + this.pollInterval = interval + this.resetPoll() + } + + trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { + this.emitter.on('events', handler) + return () => { + this.emitter.off('events', handler) + } + } + + trailConvo( + convoId: string, + handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, + ) { + const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { + const convoEvents = events.filter(ev => { + if (typeof ev.convoId === 'string' && ev.convoId === convoId) { + return ev.convoId === convoId + } + return false + }) + + if (convoEvents.length > 0) { + handler(convoEvents) + } + } + + this.emitter.on('events', handle) + return () => { + this.emitter.off('events', handle) + } + } + + private async initializeLatestRev() { + logger.debug( + `${LOGGER_CONTEXT}: initialize latest rev`, + {}, + logger.DebugContext.convo, + ) + + const response = await this.agent.api.chat.bsky.convo.listConvos( + { + limit: 1, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + + const {convos} = response.data + + for (const convo of convos) { + if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { + this.latestRev = convo.rev + } + } + } + + /* + * Polling + */ + + private isPolling = false + private pollIntervalRef: NodeJS.Timeout | undefined + + private resetPoll() { + this.stopPoll() + this.startPoll() + } + + private startPoll() { + if (!this.isPolling) this.poll() + + this.pollIntervalRef = setInterval(() => { + if (this.isPolling) return + this.poll() + }, this.pollInterval) + } + + private stopPoll() { + if (this.pollIntervalRef) clearInterval(this.pollIntervalRef) + } + + private async poll() { + if (this.isPolling) return + + this.isPolling = true + + logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo) + + try { + const response = await this.agent.api.chat.bsky.convo.getLog( + { + cursor: this.latestRev, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + + const {logs: events} = response.data + + let needsEmit = false + let batch: ChatBskyConvoGetLog.OutputSchema['logs'] = [] + + for (const ev of events) { + /* + * If there's a rev, we should handle it. If there's not a rev, we don't + * know what it is. + */ + if (typeof ev.rev === 'string') { + /* + * We only care about new events + */ + if (ev.rev > (this.latestRev = this.latestRev || ev.rev)) { + /* + * Update rev regardless of if it's a ev type we care about or not + */ + this.latestRev = ev.rev + needsEmit = true + batch.push(ev) + } + } + } + + if (needsEmit) { + try { + this.emitter.emit('events', batch) + } catch (e: any) { + logger.error(e, { + context: `${LOGGER_CONTEXT}: process latest events`, + }) + } + } + } catch (e: any) { + logger.error(e, {context: `${LOGGER_CONTEXT}: poll events failed`}) + + this.dispatch({ + event: MessagesEventBusDispatchEvent.Error, + payload: { + exception: e, + code: MessagesEventBusErrorCode.PollFailed, + retry: () => { + this.init() + }, + }, + }) + } finally { + this.isPolling = false + } + } +} diff --git a/src/state/messages/events/index.tsx b/src/state/messages/events/index.tsx new file mode 100644 index 00000000..f37d0abe --- /dev/null +++ b/src/state/messages/events/index.tsx @@ -0,0 +1,67 @@ +import React from 'react' +import {AppState} from 'react-native' +import {BskyAgent} from '@atproto-labs/api' + +import {isWeb} from '#/platform/detection' +import {MessagesEventBus} from '#/state/messages/events/agent' +import {MessagesEventBusState} from '#/state/messages/events/types' +import {useAgent} from '#/state/session' +import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' +import {IS_DEV} from '#/env' + +const MessagesEventBusContext = + React.createContext(null) + +export function useMessagesEventBus() { + const ctx = React.useContext(MessagesEventBusContext) + if (!ctx) { + throw new Error('useChat must be used within a ChatProvider') + } + return ctx +} + +export function MessagesEventBusProvider({ + children, +}: { + children: React.ReactNode +}) { + const {serviceUrl} = useDmServiceUrlStorage() + const {getAgent} = useAgent() + const [bus] = React.useState( + () => + new MessagesEventBus({ + agent: new BskyAgent({ + service: serviceUrl, + }), + __tempFromUserDid: getAgent().session?.did!, + }), + ) + const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot) + + if (isWeb && IS_DEV) { + // @ts-ignore + window.messagesEventBus = service + } + + React.useEffect(() => { + const handleAppStateChange = (nextAppState: string) => { + if (nextAppState === 'active') { + bus.resume() + } else { + bus.background() + } + } + + const sub = AppState.addEventListener('change', handleAppStateChange) + + return () => { + sub.remove() + } + }, [bus]) + + return ( + + {children} + + ) +} diff --git a/src/state/messages/events/types.ts b/src/state/messages/events/types.ts new file mode 100644 index 00000000..52083b2c --- /dev/null +++ b/src/state/messages/events/types.ts @@ -0,0 +1,111 @@ +import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api' + +export type MessagesEventBusParams = { + agent: BskyAgent + __tempFromUserDid: string +} + +export enum MessagesEventBusStatus { + Uninitialized = 'uninitialized', + Initializing = 'initializing', + Ready = 'ready', + Error = 'error', + Backgrounded = 'backgrounded', + Suspended = 'suspended', +} + +export enum MessagesEventBusDispatchEvent { + Init = 'init', + Ready = 'ready', + Error = 'error', + Background = 'background', + Suspend = 'suspend', + Resume = 'resume', +} + +export enum MessagesEventBusErrorCode { + Unknown = 'unknown', + InitFailed = 'initFailed', + PollFailed = 'pollFailed', +} + +export type MessagesEventBusError = { + code: MessagesEventBusErrorCode + exception?: Error + retry: () => void +} + +export type MessagesEventBusDispatch = + | { + event: MessagesEventBusDispatchEvent.Init + } + | { + event: MessagesEventBusDispatchEvent.Ready + } + | { + event: MessagesEventBusDispatchEvent.Background + } + | { + event: MessagesEventBusDispatchEvent.Suspend + } + | { + event: MessagesEventBusDispatchEvent.Resume + } + | { + event: MessagesEventBusDispatchEvent.Error + payload: MessagesEventBusError + } + +export type TrailHandler = ( + events: ChatBskyConvoGetLog.OutputSchema['logs'], +) => void + +export type MessagesEventBusState = + | { + status: MessagesEventBusStatus.Uninitialized + rev: undefined + error: undefined + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + } + | { + status: MessagesEventBusStatus.Initializing + rev: undefined + error: undefined + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + } + | { + status: MessagesEventBusStatus.Ready + rev: string + error: undefined + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + } + | { + status: MessagesEventBusStatus.Backgrounded + rev: string | undefined + error: undefined + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + } + | { + status: MessagesEventBusStatus.Suspended + rev: string | undefined + error: undefined + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + } + | { + status: MessagesEventBusStatus.Error + rev: string | undefined + error: MessagesEventBusError + setPollInterval: (interval: number) => void + trail: (handler: TrailHandler) => () => void + trailConvo: (convoId: string, handler: TrailHandler) => () => void + }