[🐴] Global event mgmt (#3897)

* Add global event bus for messages logs

* Add rev to state

* Better handle error

* Clean up polling, add backgrounding

* Add trailConvo method

* Extend polling until we're ready for this
zio/stable
Eric Bailey 2024-05-07 17:54:34 -05:00 committed by GitHub
parent 0625a914bd
commit 87cb4c105e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 686 additions and 41 deletions

View File

@ -16,6 +16,7 @@ import {useQueryClient} from '@tanstack/react-query'
import {Provider as StatsigProvider} from '#/lib/statsig/statsig'
import {logger} from '#/logger'
import {MessagesEventBusProvider} from '#/state/messages/events'
import {init as initPersistedState} from '#/state/persisted'
import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs'
import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts'
@ -95,25 +96,27 @@ function InnerApp() {
// Resets the entire tree below when it changes:
key={currentAccount?.did}>
<QueryProvider currentDid={currentAccount?.did}>
<PushNotificationsListener>
<StatsigProvider>
{/* LabelDefsProvider MUST come before ModerationOptsProvider */}
<LabelDefsProvider>
<ModerationOptsProvider>
<LoggedOutViewProvider>
<SelectedFeedProvider>
<UnreadNotifsProvider>
<GestureHandlerRootView style={s.h100pct}>
<TestCtrls />
<Shell />
</GestureHandlerRootView>
</UnreadNotifsProvider>
</SelectedFeedProvider>
</LoggedOutViewProvider>
</ModerationOptsProvider>
</LabelDefsProvider>
</StatsigProvider>
</PushNotificationsListener>
<MessagesEventBusProvider>
<PushNotificationsListener>
<StatsigProvider>
{/* LabelDefsProvider MUST come before ModerationOptsProvider */}
<LabelDefsProvider>
<ModerationOptsProvider>
<LoggedOutViewProvider>
<SelectedFeedProvider>
<UnreadNotifsProvider>
<GestureHandlerRootView style={s.h100pct}>
<TestCtrls />
<Shell />
</GestureHandlerRootView>
</UnreadNotifsProvider>
</SelectedFeedProvider>
</LoggedOutViewProvider>
</ModerationOptsProvider>
</LabelDefsProvider>
</StatsigProvider>
</PushNotificationsListener>
</MessagesEventBusProvider>
</QueryProvider>
</React.Fragment>
</RootSiblingParent>

View File

@ -9,6 +9,7 @@ import {useLingui} from '@lingui/react'
import {Provider as StatsigProvider} from '#/lib/statsig/statsig'
import {logger} from '#/logger'
import {MessagesEventBusProvider} from '#/state/messages/events'
import {init as initPersistedState} from '#/state/persisted'
import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs'
import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts'
@ -83,22 +84,24 @@ function InnerApp() {
// Resets the entire tree below when it changes:
key={currentAccount?.did}>
<QueryProvider currentDid={currentAccount?.did}>
<StatsigProvider>
{/* LabelDefsProvider MUST come before ModerationOptsProvider */}
<LabelDefsProvider>
<ModerationOptsProvider>
<LoggedOutViewProvider>
<SelectedFeedProvider>
<UnreadNotifsProvider>
<SafeAreaProvider>
<Shell />
</SafeAreaProvider>
</UnreadNotifsProvider>
</SelectedFeedProvider>
</LoggedOutViewProvider>
</ModerationOptsProvider>
</LabelDefsProvider>
</StatsigProvider>
<MessagesEventBusProvider>
<StatsigProvider>
{/* LabelDefsProvider MUST come before ModerationOptsProvider */}
<LabelDefsProvider>
<ModerationOptsProvider>
<LoggedOutViewProvider>
<SelectedFeedProvider>
<UnreadNotifsProvider>
<SafeAreaProvider>
<Shell />
</SafeAreaProvider>
</UnreadNotifsProvider>
</SelectedFeedProvider>
</LoggedOutViewProvider>
</ModerationOptsProvider>
</LabelDefsProvider>
</StatsigProvider>
</MessagesEventBusProvider>
</QueryProvider>
</React.Fragment>
<ToastContainer />
@ -112,12 +115,7 @@ function App() {
const [isReady, setReady] = useState(false)
React.useEffect(() => {
initPersistedState().then(() => {
setReady(true)
const preloadElement = document.getElementById('preload')
preloadElement?.remove()
})
initPersistedState().then(() => setReady(true))
}, [])
if (!isReady) {

View File

@ -0,0 +1,466 @@
import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api'
import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure'
import {logger} from '#/logger'
import {
MessagesEventBusDispatch,
MessagesEventBusDispatchEvent,
MessagesEventBusError,
MessagesEventBusErrorCode,
MessagesEventBusParams,
MessagesEventBusState,
MessagesEventBusStatus,
} from '#/state/messages/events/types'
const LOGGER_CONTEXT = 'MessagesEventBus'
const ACTIVE_POLL_INTERVAL = 60e3
const BACKGROUND_POLL_INTERVAL = 60e3
export class MessagesEventBus {
private id: string
private agent: BskyAgent
private __tempFromUserDid: string
private emitter = new EventEmitter()
private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized
private pollInterval = ACTIVE_POLL_INTERVAL
private error: MessagesEventBusError | undefined
private latestRev: string | undefined = undefined
snapshot: MessagesEventBusState | undefined
constructor(params: MessagesEventBusParams) {
this.id = nanoid(3)
this.agent = params.agent
this.__tempFromUserDid = params.__tempFromUserDid
this.subscribe = this.subscribe.bind(this)
this.getSnapshot = this.getSnapshot.bind(this)
this.init = this.init.bind(this)
this.suspend = this.suspend.bind(this)
this.resume = this.resume.bind(this)
this.setPollInterval = this.setPollInterval.bind(this)
this.trail = this.trail.bind(this)
this.trailConvo = this.trailConvo.bind(this)
}
private commit() {
this.snapshot = undefined
this.subscribers.forEach(subscriber => subscriber())
}
private subscribers: (() => void)[] = []
subscribe(subscriber: () => void) {
if (this.subscribers.length === 0) this.init()
this.subscribers.push(subscriber)
return () => {
this.subscribers = this.subscribers.filter(s => s !== subscriber)
if (this.subscribers.length === 0) this.suspend()
}
}
getSnapshot(): MessagesEventBusState {
if (!this.snapshot) this.snapshot = this.generateSnapshot()
// logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo)
return this.snapshot
}
private generateSnapshot(): MessagesEventBusState {
switch (this.status) {
case MessagesEventBusStatus.Initializing: {
return {
status: MessagesEventBusStatus.Initializing,
rev: undefined,
error: undefined,
setPollInterval: this.setPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Ready: {
return {
status: this.status,
rev: this.latestRev!,
error: undefined,
setPollInterval: this.setPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Suspended: {
return {
status: this.status,
rev: this.latestRev,
error: undefined,
setPollInterval: this.setPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Error: {
return {
status: MessagesEventBusStatus.Error,
rev: this.latestRev,
error: this.error || {
code: MessagesEventBusErrorCode.Unknown,
retry: () => {
this.init()
},
},
setPollInterval: this.setPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
default: {
return {
status: MessagesEventBusStatus.Uninitialized,
rev: undefined,
error: undefined,
setPollInterval: this.setPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
}
}
dispatch(action: MessagesEventBusDispatch) {
const prevStatus = this.status
switch (this.status) {
case MessagesEventBusStatus.Uninitialized: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Init: {
this.status = MessagesEventBusStatus.Initializing
this.setup()
break
}
}
break
}
case MessagesEventBusStatus.Initializing: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Ready: {
this.status = MessagesEventBusStatus.Ready
this.setPollInterval(ACTIVE_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.setPollInterval(BACKGROUND_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
break
}
}
break
}
case MessagesEventBusStatus.Ready: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.setPollInterval(BACKGROUND_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Backgrounded: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.setPollInterval(ACTIVE_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Suspended: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.setPollInterval(ACTIVE_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.setPollInterval(BACKGROUND_POLL_INTERVAL)
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Error: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume:
case MessagesEventBusDispatchEvent.Init: {
this.status = MessagesEventBusStatus.Initializing
this.error = undefined
this.latestRev = undefined
this.setup()
break
}
}
break
}
default:
break
}
logger.debug(
`${LOGGER_CONTEXT}: dispatch '${action.event}'`,
{
id: this.id,
prev: prevStatus,
next: this.status,
},
logger.DebugContext.convo,
)
this.commit()
}
private async setup() {
logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo)
try {
await this.initializeLatestRev()
this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
} catch (e: any) {
logger.error(e, {
context: `${LOGGER_CONTEXT}: setup failed`,
})
this.dispatch({
event: MessagesEventBusDispatchEvent.Error,
payload: {
exception: e,
code: MessagesEventBusErrorCode.InitFailed,
retry: () => {
this.init()
},
},
})
}
}
init() {
logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Init})
}
background() {
logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Background})
}
suspend() {
logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
}
resume() {
logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
}
setPollInterval(interval: number) {
this.pollInterval = interval
this.resetPoll()
}
trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
this.emitter.on('events', handler)
return () => {
this.emitter.off('events', handler)
}
}
trailConvo(
convoId: string,
handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
) {
const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
const convoEvents = events.filter(ev => {
if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
return ev.convoId === convoId
}
return false
})
if (convoEvents.length > 0) {
handler(convoEvents)
}
}
this.emitter.on('events', handle)
return () => {
this.emitter.off('events', handle)
}
}
private async initializeLatestRev() {
logger.debug(
`${LOGGER_CONTEXT}: initialize latest rev`,
{},
logger.DebugContext.convo,
)
const response = await this.agent.api.chat.bsky.convo.listConvos(
{
limit: 1,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const {convos} = response.data
for (const convo of convos) {
if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
this.latestRev = convo.rev
}
}
}
/*
* Polling
*/
private isPolling = false
private pollIntervalRef: NodeJS.Timeout | undefined
private resetPoll() {
this.stopPoll()
this.startPoll()
}
private startPoll() {
if (!this.isPolling) this.poll()
this.pollIntervalRef = setInterval(() => {
if (this.isPolling) return
this.poll()
}, this.pollInterval)
}
private stopPoll() {
if (this.pollIntervalRef) clearInterval(this.pollIntervalRef)
}
private async poll() {
if (this.isPolling) return
this.isPolling = true
logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo)
try {
const response = await this.agent.api.chat.bsky.convo.getLog(
{
cursor: this.latestRev,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const {logs: events} = response.data
let needsEmit = false
let batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []
for (const ev of events) {
/*
* If there's a rev, we should handle it. If there's not a rev, we don't
* know what it is.
*/
if (typeof ev.rev === 'string') {
/*
* We only care about new events
*/
if (ev.rev > (this.latestRev = this.latestRev || ev.rev)) {
/*
* Update rev regardless of if it's a ev type we care about or not
*/
this.latestRev = ev.rev
needsEmit = true
batch.push(ev)
}
}
}
if (needsEmit) {
try {
this.emitter.emit('events', batch)
} catch (e: any) {
logger.error(e, {
context: `${LOGGER_CONTEXT}: process latest events`,
})
}
}
} catch (e: any) {
logger.error(e, {context: `${LOGGER_CONTEXT}: poll events failed`})
this.dispatch({
event: MessagesEventBusDispatchEvent.Error,
payload: {
exception: e,
code: MessagesEventBusErrorCode.PollFailed,
retry: () => {
this.init()
},
},
})
} finally {
this.isPolling = false
}
}
}

View File

@ -0,0 +1,67 @@
import React from 'react'
import {AppState} from 'react-native'
import {BskyAgent} from '@atproto-labs/api'
import {isWeb} from '#/platform/detection'
import {MessagesEventBus} from '#/state/messages/events/agent'
import {MessagesEventBusState} from '#/state/messages/events/types'
import {useAgent} from '#/state/session'
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
import {IS_DEV} from '#/env'
const MessagesEventBusContext =
React.createContext<MessagesEventBusState | null>(null)
export function useMessagesEventBus() {
const ctx = React.useContext(MessagesEventBusContext)
if (!ctx) {
throw new Error('useChat must be used within a ChatProvider')
}
return ctx
}
export function MessagesEventBusProvider({
children,
}: {
children: React.ReactNode
}) {
const {serviceUrl} = useDmServiceUrlStorage()
const {getAgent} = useAgent()
const [bus] = React.useState(
() =>
new MessagesEventBus({
agent: new BskyAgent({
service: serviceUrl,
}),
__tempFromUserDid: getAgent().session?.did!,
}),
)
const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot)
if (isWeb && IS_DEV) {
// @ts-ignore
window.messagesEventBus = service
}
React.useEffect(() => {
const handleAppStateChange = (nextAppState: string) => {
if (nextAppState === 'active') {
bus.resume()
} else {
bus.background()
}
}
const sub = AppState.addEventListener('change', handleAppStateChange)
return () => {
sub.remove()
}
}, [bus])
return (
<MessagesEventBusContext.Provider value={service}>
{children}
</MessagesEventBusContext.Provider>
)
}

View File

@ -0,0 +1,111 @@
import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api'
export type MessagesEventBusParams = {
agent: BskyAgent
__tempFromUserDid: string
}
export enum MessagesEventBusStatus {
Uninitialized = 'uninitialized',
Initializing = 'initializing',
Ready = 'ready',
Error = 'error',
Backgrounded = 'backgrounded',
Suspended = 'suspended',
}
export enum MessagesEventBusDispatchEvent {
Init = 'init',
Ready = 'ready',
Error = 'error',
Background = 'background',
Suspend = 'suspend',
Resume = 'resume',
}
export enum MessagesEventBusErrorCode {
Unknown = 'unknown',
InitFailed = 'initFailed',
PollFailed = 'pollFailed',
}
export type MessagesEventBusError = {
code: MessagesEventBusErrorCode
exception?: Error
retry: () => void
}
export type MessagesEventBusDispatch =
| {
event: MessagesEventBusDispatchEvent.Init
}
| {
event: MessagesEventBusDispatchEvent.Ready
}
| {
event: MessagesEventBusDispatchEvent.Background
}
| {
event: MessagesEventBusDispatchEvent.Suspend
}
| {
event: MessagesEventBusDispatchEvent.Resume
}
| {
event: MessagesEventBusDispatchEvent.Error
payload: MessagesEventBusError
}
export type TrailHandler = (
events: ChatBskyConvoGetLog.OutputSchema['logs'],
) => void
export type MessagesEventBusState =
| {
status: MessagesEventBusStatus.Uninitialized
rev: undefined
error: undefined
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Initializing
rev: undefined
error: undefined
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Ready
rev: string
error: undefined
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Backgrounded
rev: string | undefined
error: undefined
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Suspended
rev: string | undefined
error: undefined
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Error
rev: string | undefined
error: MessagesEventBusError
setPollInterval: (interval: number) => void
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}