[🐴] Simplify message passing, cleanup (#3952)

* Simplify message passing

* Setup/teardown events
zio/stable
Eric Bailey 2024-05-10 12:44:55 -05:00 committed by GitHub
parent ab21aafc28
commit f84a2def29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 86 additions and 88 deletions

View File

@ -107,12 +107,6 @@ export class Convo {
} else { } else {
DEBUG_ACTIVE_CHAT = this.convoId DEBUG_ACTIVE_CHAT = this.convoId
} }
this.events.trailConvo(this.convoId, events => {
this.ingestFirehose(events)
})
this.events.onConnect(this.onFirehoseConnect)
this.events.onError(this.onFirehoseError)
} }
private commit() { private commit() {
@ -211,6 +205,7 @@ export class Convo {
case ConvoDispatchEvent.Init: { case ConvoDispatchEvent.Init: {
this.status = ConvoStatus.Initializing this.status = ConvoStatus.Initializing
this.setup() this.setup()
this.setupFirehose()
this.requestPollInterval(ACTIVE_POLL_INTERVAL) this.requestPollInterval(ACTIVE_POLL_INTERVAL)
break break
} }
@ -232,12 +227,14 @@ export class Convo {
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
@ -258,12 +255,14 @@ export class Convo {
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
@ -286,12 +285,14 @@ export class Convo {
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.cleanupFirehoseConnection?.()
this.withdrawRequestedPollInterval() this.withdrawRequestedPollInterval()
break break
} }
@ -601,6 +602,33 @@ export class Convo {
} }
} }
private cleanupFirehoseConnection: (() => void) | undefined
private setupFirehose() {
// remove old listeners, if exist
this.cleanupFirehoseConnection?.()
// reconnect
this.cleanupFirehoseConnection = this.events.on(
event => {
switch (event.type) {
case 'connect': {
this.onFirehoseConnect()
break
}
case 'error': {
this.onFirehoseError(event.error)
break
}
case 'logs': {
this.ingestFirehose(event.logs)
break
}
}
},
{convoId: this.convoId},
)
}
onFirehoseConnect() { onFirehoseConnect() {
this.footerItems.delete(ConvoItemError.FirehoseFailed) this.footerItems.delete(ConvoItemError.FirehoseFailed)
this.commit() this.commit()
@ -709,6 +737,8 @@ export class Convo {
id: tempId, id: tempId,
message, message,
}) })
// remove on each send, it might go through now without user having to click
this.footerItems.delete(ConvoItemError.PendingFailed)
this.commit() this.commit()
if (!this.isProcessingPendingMessages) { if (!this.isProcessingPendingMessages) {

View File

@ -8,9 +8,8 @@ import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
import { import {
MessagesEventBusDispatch, MessagesEventBusDispatch,
MessagesEventBusDispatchEvent, MessagesEventBusDispatchEvent,
MessagesEventBusError,
MessagesEventBusErrorCode, MessagesEventBusErrorCode,
MessagesEventBusEvents, MessagesEventBusEvent,
MessagesEventBusParams, MessagesEventBusParams,
MessagesEventBusStatus, MessagesEventBusStatus,
} from '#/state/messages/events/types' } from '#/state/messages/events/types'
@ -22,10 +21,9 @@ export class MessagesEventBus {
private agent: BskyAgent private agent: BskyAgent
private __tempFromUserDid: string private __tempFromUserDid: string
private emitter = new EventEmitter<MessagesEventBusEvents>() private emitter = new EventEmitter<{event: [MessagesEventBusEvent]}>()
private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
private error: MessagesEventBusError | undefined
private latestRev: string | undefined = undefined private latestRev: string | undefined = undefined
private pollInterval = DEFAULT_POLL_INTERVAL private pollInterval = DEFAULT_POLL_INTERVAL
private requestedPollIntervals: Map<string, number> = new Map() private requestedPollIntervals: Map<string, number> = new Map()
@ -52,65 +50,43 @@ export class MessagesEventBus {
} }
} }
trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
this.emitter.on('events', handler)
return () => {
this.emitter.off('events', handler)
}
}
trailConvo(
convoId: string,
handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
) {
const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
const convoEvents = events.filter(ev => {
if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
return ev.convoId === convoId
}
return false
})
if (convoEvents.length > 0) {
handler(convoEvents)
}
}
this.emitter.on('events', handle)
return () => {
this.emitter.off('events', handle)
}
}
getLatestRev() { getLatestRev() {
return this.latestRev return this.latestRev
} }
onConnect(handler: () => void) { on(
this.emitter.on('connect', handler) handler: (event: MessagesEventBusEvent) => void,
options: {
convoId?: string
},
) {
const handle = (event: MessagesEventBusEvent) => {
if (event.type === 'logs' && options.convoId) {
const filteredLogs = event.logs.filter(log => {
if (
typeof log.convoId === 'string' &&
log.convoId === options.convoId
) {
return log.convoId === options.convoId
}
return false
})
if ( if (filteredLogs.length > 0) {
this.status === MessagesEventBusStatus.Ready || handler({
this.status === MessagesEventBusStatus.Backgrounded || ...event,
this.status === MessagesEventBusStatus.Suspended logs: filteredLogs,
) { })
handler() }
} else {
handler(event)
}
} }
this.emitter.on('event', handle)
return () => { return () => {
this.emitter.off('connect', handler) this.emitter.off('event', handle)
}
}
onError(handler: (payload?: MessagesEventBusError) => void) {
this.emitter.on('error', handler)
if (this.status === MessagesEventBusStatus.Error) {
handler(this.error)
}
return () => {
this.emitter.off('error', handler)
} }
} }
@ -138,13 +114,13 @@ export class MessagesEventBus {
case MessagesEventBusDispatchEvent.Ready: { case MessagesEventBusDispatchEvent.Ready: {
this.status = MessagesEventBusStatus.Ready this.status = MessagesEventBusStatus.Ready
this.resetPoll() this.resetPoll()
this.emitter.emit('connect') this.emitter.emit('event', {type: 'connect'})
break break
} }
case MessagesEventBusDispatchEvent.Background: { case MessagesEventBusDispatchEvent.Background: {
this.status = MessagesEventBusStatus.Backgrounded this.status = MessagesEventBusStatus.Backgrounded
this.resetPoll() this.resetPoll()
this.emitter.emit('connect') this.emitter.emit('event', {type: 'connect'})
break break
} }
case MessagesEventBusDispatchEvent.Suspend: { case MessagesEventBusDispatchEvent.Suspend: {
@ -153,8 +129,7 @@ export class MessagesEventBus {
} }
case MessagesEventBusDispatchEvent.Error: { case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error this.status = MessagesEventBusStatus.Error
this.error = action.payload this.emitter.emit('event', {type: 'error', error: action.payload})
this.emitter.emit('error', action.payload)
break break
} }
} }
@ -174,9 +149,8 @@ export class MessagesEventBus {
} }
case MessagesEventBusDispatchEvent.Error: { case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll() this.stopPoll()
this.emitter.emit('error', action.payload) this.emitter.emit('event', {type: 'error', error: action.payload})
break break
} }
case MessagesEventBusDispatchEvent.UpdatePoll: { case MessagesEventBusDispatchEvent.UpdatePoll: {
@ -200,9 +174,8 @@ export class MessagesEventBus {
} }
case MessagesEventBusDispatchEvent.Error: { case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll() this.stopPoll()
this.emitter.emit('error', action.payload) this.emitter.emit('event', {type: 'error', error: action.payload})
break break
} }
case MessagesEventBusDispatchEvent.UpdatePoll: { case MessagesEventBusDispatchEvent.UpdatePoll: {
@ -226,9 +199,8 @@ export class MessagesEventBus {
} }
case MessagesEventBusDispatchEvent.Error: { case MessagesEventBusDispatchEvent.Error: {
this.status = MessagesEventBusStatus.Error this.status = MessagesEventBusStatus.Error
this.error = action.payload
this.stopPoll() this.stopPoll()
this.emitter.emit('error', action.payload) this.emitter.emit('event', {type: 'error', error: action.payload})
break break
} }
} }
@ -239,7 +211,6 @@ export class MessagesEventBus {
case MessagesEventBusDispatchEvent.Resume: { case MessagesEventBusDispatchEvent.Resume: {
// basically reset // basically reset
this.status = MessagesEventBusStatus.Initializing this.status = MessagesEventBusStatus.Initializing
this.error = undefined
this.latestRev = undefined this.latestRev = undefined
this.init() this.init()
break break
@ -403,7 +374,7 @@ export class MessagesEventBus {
if (needsEmit) { if (needsEmit) {
try { try {
this.emitter.emit('events', batch) this.emitter.emit('event', {type: 'logs', logs: batch})
} catch (e: any) { } catch (e: any) {
logger.error(e, { logger.error(e, {
context: `${LOGGER_CONTEXT}: process latest events`, context: `${LOGGER_CONTEXT}: process latest events`,

View File

@ -55,18 +55,15 @@ export type MessagesEventBusDispatch =
event: MessagesEventBusDispatchEvent.UpdatePoll event: MessagesEventBusDispatchEvent.UpdatePoll
} }
export type TrailHandler = ( export type MessagesEventBusEvent =
events: ChatBskyConvoGetLog.OutputSchema['logs'], | {
) => void type: 'connect'
}
export type RequestPollIntervalHandler = (interval: number) => () => void | {
export type OnConnectHandler = (handler: () => void) => () => void type: 'error'
export type OnDisconnectHandler = ( error: MessagesEventBusError
handler: (error?: MessagesEventBusError) => void, }
) => () => void | {
type: 'logs'
export type MessagesEventBusEvents = { logs: ChatBskyConvoGetLog.OutputSchema['logs']
events: [ChatBskyConvoGetLog.OutputSchema['logs']] }
connect: undefined
error: [MessagesEventBusError] | undefined
}