[🐴] Refactor event bus (#3919)

* Refactor to singleton class outside react

* Fix retry, remove debug logs
zio/stable
Eric Bailey 2024-05-08 17:41:10 -05:00 committed by GitHub
parent 0c6bf276dd
commit ce2eddca8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 250 additions and 356 deletions

View File

@ -3,316 +3,51 @@ import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure'
import {logger} from '#/logger'
import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
import {
MessagesEventBusDispatch,
MessagesEventBusDispatchEvent,
MessagesEventBusError,
MessagesEventBusErrorCode,
MessagesEventBusEvents,
MessagesEventBusParams,
MessagesEventBusState,
MessagesEventBusStatus,
} from '#/state/messages/events/types'
const LOGGER_CONTEXT = 'MessagesEventBus'
const DEFAULT_POLL_INTERVAL = 60e3
export class MessagesEventBus {
private id: string
private agent: BskyAgent
private __tempFromUserDid: string
private emitter = new EventEmitter()
private emitter = new EventEmitter<MessagesEventBusEvents>()
private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized
private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
private error: MessagesEventBusError | undefined
private latestRev: string | undefined = undefined
private pollInterval = DEFAULT_POLL_INTERVAL
private requestedPollIntervals: Map<string, number> = new Map()
snapshot: MessagesEventBusState | undefined
constructor(params: MessagesEventBusParams) {
this.id = nanoid(3)
this.agent = params.agent
this.__tempFromUserDid = params.__tempFromUserDid
this.subscribe = this.subscribe.bind(this)
this.getSnapshot = this.getSnapshot.bind(this)
this.init = this.init.bind(this)
this.suspend = this.suspend.bind(this)
this.resume = this.resume.bind(this)
this.requestPollInterval = this.requestPollInterval.bind(this)
this.trail = this.trail.bind(this)
this.trailConvo = this.trailConvo.bind(this)
}
private commit() {
this.snapshot = undefined
this.subscribers.forEach(subscriber => subscriber())
}
private subscribers: (() => void)[] = []
subscribe(subscriber: () => void) {
if (this.subscribers.length === 0) this.init()
this.subscribers.push(subscriber)
return () => {
this.subscribers = this.subscribers.filter(s => s !== subscriber)
if (this.subscribers.length === 0) this.suspend()
}
}
getSnapshot(): MessagesEventBusState {
if (!this.snapshot) this.snapshot = this.generateSnapshot()
// logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo)
return this.snapshot
}
private generateSnapshot(): MessagesEventBusState {
switch (this.status) {
case MessagesEventBusStatus.Initializing: {
return {
status: MessagesEventBusStatus.Initializing,
rev: undefined,
error: undefined,
requestPollInterval: this.requestPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Ready: {
return {
status: this.status,
rev: this.latestRev!,
error: undefined,
requestPollInterval: this.requestPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Suspended: {
return {
status: this.status,
rev: this.latestRev,
error: undefined,
requestPollInterval: this.requestPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
case MessagesEventBusStatus.Error: {
return {
status: MessagesEventBusStatus.Error,
rev: this.latestRev,
error: this.error || {
code: MessagesEventBusErrorCode.Unknown,
retry: () => {
this.init()
},
},
requestPollInterval: this.requestPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
default: {
return {
status: MessagesEventBusStatus.Uninitialized,
rev: undefined,
error: undefined,
requestPollInterval: this.requestPollInterval,
trail: this.trail,
trailConvo: this.trailConvo,
}
}
}
}
dispatch(action: MessagesEventBusDispatch) {
const prevStatus = this.status
switch (this.status) {
case MessagesEventBusStatus.Uninitialized: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Init: {
this.status = MessagesEventBusStatus.Initializing
this.setup()
break
}
}
break
}
case MessagesEventBusStatus.Initializing: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Ready: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
break
}
}
break
}
case MessagesEventBusStatus.Ready: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Backgrounded: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Suspended: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
break
}
}
break
}
case MessagesEventBusStatus.Error: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume:
case MessagesEventBusDispatchEvent.Init: {
this.status = MessagesEventBusStatus.Initializing
this.error = undefined
this.latestRev = undefined
this.setup()
break
}
}
break
}
default:
break
}
logger.debug(
`${LOGGER_CONTEXT}: dispatch '${action.event}'`,
{
id: this.id,
prev: prevStatus,
next: this.status,
},
logger.DebugContext.convo,
)
this.commit()
}
private async setup() {
logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo)
try {
await this.initializeLatestRev()
this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
} catch (e: any) {
logger.error(e, {
context: `${LOGGER_CONTEXT}: setup failed`,
})
this.dispatch({
event: MessagesEventBusDispatchEvent.Error,
payload: {
exception: e,
code: MessagesEventBusErrorCode.InitFailed,
retry: () => {
this.init()
},
},
})
}
}
init() {
logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Init})
}
background() {
logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Background})
}
suspend() {
logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
}
resume() {
logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
this.init()
}
requestPollInterval(interval: number) {
const id = nanoid()
this.requestedPollIntervals.set(id, interval)
this.resetPoll()
this.dispatch({
event: MessagesEventBusDispatchEvent.UpdatePoll,
})
return () => {
this.requestedPollIntervals.delete(id)
this.resetPoll()
this.dispatch({
event: MessagesEventBusDispatchEvent.UpdatePoll,
})
}
}
@ -346,30 +81,226 @@ export class MessagesEventBus {
}
}
private async initializeLatestRev() {
getLatestRev() {
return this.latestRev
}
onConnect(handler: () => void) {
this.emitter.on('connect', handler)
if (
this.status === MessagesEventBusStatus.Ready ||
this.status === MessagesEventBusStatus.Backgrounded ||
this.status === MessagesEventBusStatus.Suspended
) {
handler()
}
return () => {
this.emitter.off('connect', handler)
}
}
onError(handler: (payload?: MessagesEventBusError) => void) {
this.emitter.on('error', handler)
if (this.status === MessagesEventBusStatus.Error) {
handler(this.error)
}
return () => {
this.emitter.off('error', handler)
}
}
background() {
logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Background})
}
suspend() {
logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
}
resume() {
logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
}
private dispatch(action: MessagesEventBusDispatch) {
const prevStatus = this.status
switch (this.status) {
case MessagesEventBusStatus.Initializing: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Ready: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
this.emitter.emit('connect')
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
this.emitter.emit('connect')
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.emitter.emit('error', action.payload)
break
}
}
break
}
case MessagesEventBusStatus.Ready: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
this.emitter.emit('error', action.payload)
break
}
case MessagesEventBusDispatchEvent.UpdatePoll: {
this.resetPoll()
break
}
}
break
}
case MessagesEventBusStatus.Backgrounded: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Suspend: {
this.status = MessagesEventBusStatus.Suspended
this.stopPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
this.emitter.emit('error', action.payload)
break
}
case MessagesEventBusDispatchEvent.UpdatePoll: {
this.resetPoll()
break
}
}
break
}
case MessagesEventBusStatus.Suspended: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
this.status = MessagesEventBusStatus.Ready
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll()
break
}
case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll()
this.emitter.emit('error', action.payload)
break
}
}
break
}
case MessagesEventBusStatus.Error: {
switch (action.event) {
case MessagesEventBusDispatchEvent.Resume: {
// basically reset
this.status = MessagesEventBusStatus.Initializing
this.error = undefined
this.latestRev = undefined
this.init()
break
}
}
break
}
default:
break
}
logger.debug(
`${LOGGER_CONTEXT}: initialize latest rev`,
{},
`${LOGGER_CONTEXT}: dispatch '${action.event}'`,
{
id: this.id,
prev: prevStatus,
next: this.status,
},
logger.DebugContext.convo,
)
}
const response = await this.agent.api.chat.bsky.convo.listConvos(
{
limit: 1,
},
{
headers: {
Authorization: this.__tempFromUserDid,
private async init() {
logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
try {
const response = await this.agent.api.chat.bsky.convo.listConvos(
{
limit: 1,
},
},
)
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
// throw new Error('UNCOMMENT TO TEST INIT FAILURE')
const {convos} = response.data
const {convos} = response.data
for (const convo of convos) {
if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
this.latestRev = convo.rev
for (const convo of convos) {
if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
this.latestRev = convo.rev
}
}
this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
} catch (e: any) {
logger.error(e, {
context: `${LOGGER_CONTEXT}: init failed`,
})
this.dispatch({
event: MessagesEventBusDispatchEvent.Error,
payload: {
exception: e,
code: MessagesEventBusErrorCode.InitFailed,
retry: () => {
this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
},
},
})
}
}
@ -430,6 +361,8 @@ export class MessagesEventBus {
},
)
// throw new Error('UNCOMMENT TO TEST POLL FAILURE')
const {logs: events} = response.data
let needsEmit = false
@ -473,7 +406,7 @@ export class MessagesEventBus {
exception: e,
code: MessagesEventBusErrorCode.PollFailed,
retry: () => {
this.init()
this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
},
},
})

View File

@ -0,0 +1 @@
export const DEFAULT_POLL_INTERVAL = 20e3

View File

@ -5,13 +5,13 @@ import {BskyAgent} from '@atproto-labs/api'
import {useGate} from '#/lib/statsig/statsig'
import {isWeb} from '#/platform/detection'
import {MessagesEventBus} from '#/state/messages/events/agent'
import {MessagesEventBusState} from '#/state/messages/events/types'
import {useAgent} from '#/state/session'
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
import {IS_DEV} from '#/env'
const MessagesEventBusContext =
React.createContext<MessagesEventBusState | null>(null)
const MessagesEventBusContext = React.createContext<MessagesEventBus | null>(
null,
)
export function useMessagesEventBus() {
const ctx = React.useContext(MessagesEventBusContext)
@ -37,12 +37,13 @@ export function Temp_MessagesEventBusProvider({
__tempFromUserDid: getAgent().session?.did!,
}),
)
const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot)
if (isWeb && IS_DEV) {
// @ts-ignore
window.messagesEventBus = service
}
React.useEffect(() => {
if (isWeb && IS_DEV) {
// @ts-ignore
window.bus = bus
}
}, [bus])
React.useEffect(() => {
const handleAppStateChange = (nextAppState: string) => {
@ -61,7 +62,7 @@ export function Temp_MessagesEventBusProvider({
}, [bus])
return (
<MessagesEventBusContext.Provider value={service}>
<MessagesEventBusContext.Provider value={bus}>
{children}
</MessagesEventBusContext.Provider>
)

View File

@ -6,7 +6,6 @@ export type MessagesEventBusParams = {
}
export enum MessagesEventBusStatus {
Uninitialized = 'uninitialized',
Initializing = 'initializing',
Ready = 'ready',
Error = 'error',
@ -15,12 +14,12 @@ export enum MessagesEventBusStatus {
}
export enum MessagesEventBusDispatchEvent {
Init = 'init',
Ready = 'ready',
Error = 'error',
Background = 'background',
Suspend = 'suspend',
Resume = 'resume',
UpdatePoll = 'updatePoll',
}
export enum MessagesEventBusErrorCode {
@ -36,9 +35,6 @@ export type MessagesEventBusError = {
}
export type MessagesEventBusDispatch =
| {
event: MessagesEventBusDispatchEvent.Init
}
| {
event: MessagesEventBusDispatchEvent.Ready
}
@ -55,59 +51,22 @@ export type MessagesEventBusDispatch =
event: MessagesEventBusDispatchEvent.Error
payload: MessagesEventBusError
}
| {
event: MessagesEventBusDispatchEvent.UpdatePoll
}
export type TrailHandler = (
events: ChatBskyConvoGetLog.OutputSchema['logs'],
) => void
export type RequestPollIntervalHandler = (interval: number) => () => void
export type OnConnectHandler = (handler: () => void) => () => void
export type OnDisconnectHandler = (
handler: (error?: MessagesEventBusError) => void,
) => () => void
export type MessagesEventBusState =
| {
status: MessagesEventBusStatus.Uninitialized
rev: undefined
error: undefined
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Initializing
rev: undefined
error: undefined
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Ready
rev: string
error: undefined
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Backgrounded
rev: string | undefined
error: undefined
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Suspended
rev: string | undefined
error: undefined
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
| {
status: MessagesEventBusStatus.Error
rev: string | undefined
error: MessagesEventBusError
requestPollInterval: RequestPollIntervalHandler
trail: (handler: TrailHandler) => () => void
trailConvo: (convoId: string, handler: TrailHandler) => () => void
}
export type MessagesEventBusEvents = {
events: [ChatBskyConvoGetLog.OutputSchema['logs']]
connect: undefined
error: [MessagesEventBusError] | undefined
}