[🐴] Integrate event bus (#3915)

* Integrate event bus

* Fixes

* Move events mgmt into Convo class

* Clean up poll interval updates

* Remove unused

* Remove annoying log
zio/stable
Eric Bailey 2024-05-08 18:01:07 -05:00 committed by GitHub
parent ce2eddca8e
commit 3bac0182b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 120 additions and 126 deletions

View File

@ -9,6 +9,10 @@ import {nanoid} from 'nanoid/non-secure'
import {logger} from '#/logger' import {logger} from '#/logger'
import {isNative} from '#/platform/detection' import {isNative} from '#/platform/detection'
import {
ACTIVE_POLL_INTERVAL,
BACKGROUND_POLL_INTERVAL,
} from '#/state/messages/convo/const'
import { import {
ConvoDispatch, ConvoDispatch,
ConvoDispatchEvent, ConvoDispatchEvent,
@ -19,9 +23,8 @@ import {
ConvoState, ConvoState,
ConvoStatus, ConvoStatus,
} from '#/state/messages/convo/types' } from '#/state/messages/convo/types'
import {MessagesEventBus} from '#/state/messages/events/agent'
const ACTIVE_POLL_INTERVAL = 1e3 import {MessagesEventBusError} from '#/state/messages/events/types'
const BACKGROUND_POLL_INTERVAL = 10e3
// TODO temporary // TODO temporary
let DEBUG_ACTIVE_CHAT: string | undefined let DEBUG_ACTIVE_CHAT: string | undefined
@ -41,10 +44,10 @@ export class Convo {
private id: string private id: string
private agent: BskyAgent private agent: BskyAgent
private events: MessagesEventBus
private __tempFromUserDid: string private __tempFromUserDid: string
private status: ConvoStatus = ConvoStatus.Uninitialized private status: ConvoStatus = ConvoStatus.Uninitialized
private pollInterval = ACTIVE_POLL_INTERVAL
private error: private error:
| { | {
code: ConvoErrorCode code: ConvoErrorCode
@ -52,9 +55,9 @@ export class Convo {
retry: () => void retry: () => void
} }
| undefined | undefined
private historyCursor: string | undefined | null = undefined private oldestRev: string | undefined | null = undefined
private isFetchingHistory = false private isFetchingHistory = false
private eventsCursor: string | undefined = undefined private latestRev: string | undefined = undefined
private pastMessages: Map< private pastMessages: Map<
string, string,
@ -73,7 +76,6 @@ export class Convo {
private headerItems: Map<string, ConvoItem> = new Map() private headerItems: Map<string, ConvoItem> = new Map()
private isProcessingPendingMessages = false private isProcessingPendingMessages = false
private nextPoll: NodeJS.Timeout | undefined
convoId: string convoId: string
convo: ChatBskyConvoDefs.ConvoView | undefined convo: ChatBskyConvoDefs.ConvoView | undefined
@ -85,6 +87,7 @@ export class Convo {
this.id = nanoid(3) this.id = nanoid(3)
this.convoId = params.convoId this.convoId = params.convoId
this.agent = params.agent this.agent = params.agent
this.events = params.events
this.__tempFromUserDid = params.__tempFromUserDid this.__tempFromUserDid = params.__tempFromUserDid
this.subscribe = this.subscribe.bind(this) this.subscribe = this.subscribe.bind(this)
@ -92,6 +95,9 @@ export class Convo {
this.sendMessage = this.sendMessage.bind(this) this.sendMessage = this.sendMessage.bind(this)
this.deleteMessage = this.deleteMessage.bind(this) this.deleteMessage = this.deleteMessage.bind(this)
this.fetchMessageHistory = this.fetchMessageHistory.bind(this) this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
this.ingestFirehose = this.ingestFirehose.bind(this)
this.onFirehoseConnect = this.onFirehoseConnect.bind(this)
this.onFirehoseError = this.onFirehoseError.bind(this)
if (DEBUG_ACTIVE_CHAT) { if (DEBUG_ACTIVE_CHAT) {
logger.error(`Convo: another chat was already active`, { logger.error(`Convo: another chat was already active`, {
@ -100,6 +106,12 @@ export class Convo {
} else { } else {
DEBUG_ACTIVE_CHAT = this.convoId DEBUG_ACTIVE_CHAT = this.convoId
} }
this.events.trailConvo(this.convoId, events => {
this.ingestFirehose(events)
})
this.events.onConnect(this.onFirehoseConnect)
this.events.onError(this.onFirehoseError)
} }
private commit() { private commit() {
@ -198,6 +210,7 @@ export class Convo {
case ConvoDispatchEvent.Init: { case ConvoDispatchEvent.Init: {
this.status = ConvoStatus.Initializing this.status = ConvoStatus.Initializing
this.setup() this.setup()
this.requestPollInterval(ACTIVE_POLL_INTERVAL)
break break
} }
} }
@ -207,27 +220,24 @@ export class Convo {
switch (action.event) { switch (action.event) {
case ConvoDispatchEvent.Ready: { case ConvoDispatchEvent.Ready: {
this.status = ConvoStatus.Ready this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL this.fetchMessageHistory()
this.fetchMessageHistory().then(() => {
this.restartPoll()
})
break break
} }
case ConvoDispatchEvent.Background: { case ConvoDispatchEvent.Background: {
this.status = ConvoStatus.Backgrounded this.status = ConvoStatus.Backgrounded
this.pollInterval = BACKGROUND_POLL_INTERVAL this.fetchMessageHistory()
this.fetchMessageHistory().then(() => { this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
this.restartPoll()
})
break break
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.withdrawRequestedPollInterval()
break break
} }
} }
@ -237,24 +247,23 @@ export class Convo {
switch (action.event) { switch (action.event) {
case ConvoDispatchEvent.Resume: { case ConvoDispatchEvent.Resume: {
this.refreshConvo() this.refreshConvo()
this.restartPoll() this.requestPollInterval(ACTIVE_POLL_INTERVAL)
break break
} }
case ConvoDispatchEvent.Background: { case ConvoDispatchEvent.Background: {
this.status = ConvoStatus.Backgrounded this.status = ConvoStatus.Backgrounded
this.pollInterval = BACKGROUND_POLL_INTERVAL this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
this.restartPoll()
break break
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.cancelNextPoll() this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.cancelNextPoll() this.withdrawRequestedPollInterval()
break break
} }
} }
@ -262,23 +271,27 @@ export class Convo {
} }
case ConvoStatus.Backgrounded: { case ConvoStatus.Backgrounded: {
switch (action.event) { switch (action.event) {
case ConvoDispatchEvent.Resume: {
this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
// TODO truncate history if needed // TODO truncate history if needed
this.restartPoll() case ConvoDispatchEvent.Resume: {
if (this.convo) {
this.status = ConvoStatus.Ready
this.refreshConvo()
} else {
this.status = ConvoStatus.Initializing
this.setup()
}
this.requestPollInterval(ACTIVE_POLL_INTERVAL)
break break
} }
case ConvoDispatchEvent.Suspend: { case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended this.status = ConvoStatus.Suspended
this.cancelNextPoll() this.withdrawRequestedPollInterval()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error this.status = ConvoStatus.Error
this.error = action.payload this.error = action.payload
this.cancelNextPoll() this.withdrawRequestedPollInterval()
break break
} }
} }
@ -287,18 +300,11 @@ export class Convo {
case ConvoStatus.Suspended: { case ConvoStatus.Suspended: {
switch (action.event) { switch (action.event) {
case ConvoDispatchEvent.Init: { case ConvoDispatchEvent.Init: {
this.status = ConvoStatus.Ready this.reset()
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
// TODO truncate history if needed
this.restartPoll()
break break
} }
case ConvoDispatchEvent.Resume: { case ConvoDispatchEvent.Resume: {
this.status = ConvoStatus.Ready this.reset()
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
this.restartPoll()
break break
} }
case ConvoDispatchEvent.Error: { case ConvoDispatchEvent.Error: {
@ -356,8 +362,8 @@ export class Convo {
this.status = ConvoStatus.Uninitialized this.status = ConvoStatus.Uninitialized
this.error = undefined this.error = undefined
this.historyCursor = undefined this.oldestRev = undefined
this.eventsCursor = undefined this.latestRev = undefined
this.pastMessages = new Map() this.pastMessages = new Map()
this.newMessages = new Map() this.newMessages = new Map()
@ -426,6 +432,17 @@ export class Convo {
DEBUG_ACTIVE_CHAT = undefined DEBUG_ACTIVE_CHAT = undefined
} }
private requestedPollInterval: (() => void) | undefined
private requestPollInterval(interval: number) {
this.withdrawRequestedPollInterval()
this.requestedPollInterval = this.events.requestPollInterval(interval)
}
private withdrawRequestedPollInterval() {
if (this.requestedPollInterval) {
this.requestedPollInterval()
}
}
private pendingFetchConvo: private pendingFetchConvo:
| Promise<{ | Promise<{
convo: ChatBskyConvoDefs.ConvoView convo: ChatBskyConvoDefs.ConvoView
@ -499,9 +516,9 @@ export class Convo {
logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
/* /*
* If historyCursor is null, we've fetched all history. * If oldestRev is null, we've fetched all history.
*/ */
if (this.historyCursor === null) return if (this.oldestRev === null) return
/* /*
* Don't fetch again if a fetch is already in progress * Don't fetch again if a fetch is already in progress
@ -529,7 +546,7 @@ export class Convo {
const response = await this.agent.api.chat.bsky.convo.getMessages( const response = await this.agent.api.chat.bsky.convo.getMessages(
{ {
cursor: this.historyCursor, cursor: this.oldestRev,
convoId: this.convoId, convoId: this.convoId,
limit: isNative ? 25 : 50, limit: isNative ? 25 : 50,
}, },
@ -541,21 +558,22 @@ export class Convo {
) )
const {cursor, messages} = response.data const {cursor, messages} = response.data
this.historyCursor = cursor ?? null this.oldestRev = cursor ?? null
for (const message of messages) { for (const message of messages) {
if ( if (
ChatBskyConvoDefs.isMessageView(message) || ChatBskyConvoDefs.isMessageView(message) ||
ChatBskyConvoDefs.isDeletedMessageView(message) ChatBskyConvoDefs.isDeletedMessageView(message)
) { ) {
this.pastMessages.set(message.id, message) /*
* If this message is already in new messages, it was added by the
// set to latest rev * firehose ingestion, and we can safely overwrite it. This trusts
if ( * the server on ordering, and keeps it in sync.
message.rev > (this.eventsCursor = this.eventsCursor || message.rev) */
) { if (this.newMessages.has(message.id)) {
this.eventsCursor = message.rev this.newMessages.delete(message.id)
} }
this.pastMessages.set(message.id, message)
} }
} }
} catch (e: any) { } catch (e: any) {
@ -576,33 +594,12 @@ export class Convo {
} }
} }
private restartPoll() { onFirehoseConnect() {
this.cancelNextPoll() this.footerItems.delete(ConvoItemError.PollFailed)
this.pollLatestEvents() this.commit()
} }
private cancelNextPoll() { onFirehoseError(error?: MessagesEventBusError) {
if (this.nextPoll) clearTimeout(this.nextPoll)
}
private pollLatestEvents() {
/*
* Uncomment to view poll events
*/
logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo)
try {
this.fetchLatestEvents().then(({events}) => {
this.applyLatestEvents(events)
})
this.nextPoll = setTimeout(() => {
this.pollLatestEvents()
}, this.pollInterval)
} catch (e: any) {
logger.error('Convo: poll events failed')
this.cancelNextPoll()
this.footerItems.set(ConvoItemError.PollFailed, { this.footerItems.set(ConvoItemError.PollFailed, {
type: 'error-recoverable', type: 'error-recoverable',
key: ConvoItemError.PollFailed, key: ConvoItemError.PollFailed,
@ -610,50 +607,13 @@ export class Convo {
retry: () => { retry: () => {
this.footerItems.delete(ConvoItemError.PollFailed) this.footerItems.delete(ConvoItemError.PollFailed)
this.commit() this.commit()
this.pollLatestEvents() error?.retry()
}, },
}) })
this.commit() this.commit()
} }
}
private pendingFetchLatestEvents: ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
| Promise<{
events: ChatBskyConvoGetLog.OutputSchema['logs']
}>
| undefined
async fetchLatestEvents() {
if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents
this.pendingFetchLatestEvents = new Promise<{
events: ChatBskyConvoGetLog.OutputSchema['logs']
}>(async (resolve, reject) => {
try {
// throw new Error('UNCOMMENT TO TEST POLL FAILURE')
const response = await this.agent.api.chat.bsky.convo.getLog(
{
cursor: this.eventsCursor,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const {logs} = response.data
resolve({events: logs})
} catch (e) {
reject(e)
} finally {
this.pendingFetchLatestEvents = undefined
}
})
return this.pendingFetchLatestEvents
}
private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
let needsCommit = false let needsCommit = false
for (const ev of events) { for (const ev of events) {
@ -662,14 +622,25 @@ export class Convo {
* know what it is. * know what it is.
*/ */
if (typeof ev.rev === 'string') { if (typeof ev.rev === 'string') {
const isUninitialized = !this.latestRev
const isNewEvent = this.latestRev && ev.rev > this.latestRev
/*
* We received an event prior to fetching any history, so we can safely
* use this as the initial history cursor
*/
if (this.oldestRev === undefined && isUninitialized) {
this.oldestRev = ev.rev
}
/* /*
* We only care about new events * We only care about new events
*/ */
if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) { if (isNewEvent || isUninitialized) {
/* /*
* Update rev regardless of if it's a ev type we care about or not * Update rev regardless of if it's a ev type we care about or not
*/ */
this.eventsCursor = ev.rev this.latestRev = ev.rev
/* /*
* This is VERY important. We don't want to insert any messages from * This is VERY important. We don't want to insert any messages from
@ -681,8 +652,14 @@ export class Convo {
ChatBskyConvoDefs.isLogCreateMessage(ev) && ChatBskyConvoDefs.isLogCreateMessage(ev) &&
ChatBskyConvoDefs.isMessageView(ev.message) ChatBskyConvoDefs.isMessageView(ev.message)
) { ) {
/**
* If this message is already in new messages, it was added by our
* sending logic, and is based on client-ordering. When we receive
* the "commited" event from the log, we should replace this
* reference and re-insert in order to respect the order we receied
* from the log.
*/
if (this.newMessages.has(ev.message.id)) { if (this.newMessages.has(ev.message.id)) {
// Trust the ev as the source of truth on ordering
this.newMessages.delete(ev.message.id) this.newMessages.delete(ev.message.id)
} }
this.newMessages.set(ev.message.id, ev.message) this.newMessages.set(ev.message.id, ev.message)
@ -694,6 +671,7 @@ export class Convo {
/* /*
* Update if we have this in state. If we don't, don't worry about it. * Update if we have this in state. If we don't, don't worry about it.
*/ */
// TODO check for other storage spots
if (this.pastMessages.has(ev.message.id)) { if (this.pastMessages.has(ev.message.id)) {
/* /*
* For now, we remove deleted messages from the thread, if we receive one. * For now, we remove deleted messages from the thread, if we receive one.

View File

@ -0,0 +1,2 @@
export const ACTIVE_POLL_INTERVAL = 1e3
export const BACKGROUND_POLL_INTERVAL = 5e3

View File

@ -5,6 +5,7 @@ import {useFocusEffect, useIsFocused} from '@react-navigation/native'
import {Convo} from '#/state/messages/convo/agent' import {Convo} from '#/state/messages/convo/agent'
import {ConvoParams, ConvoState} from '#/state/messages/convo/types' import {ConvoParams, ConvoState} from '#/state/messages/convo/types'
import {useMessagesEventBus} from '#/state/messages/events'
import {useMarkAsReadMutation} from '#/state/queries/messages/conversation' import {useMarkAsReadMutation} from '#/state/queries/messages/conversation'
import {useAgent} from '#/state/session' import {useAgent} from '#/state/session'
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
@ -26,6 +27,7 @@ export function ConvoProvider({
const isScreenFocused = useIsFocused() const isScreenFocused = useIsFocused()
const {serviceUrl} = useDmServiceUrlStorage() const {serviceUrl} = useDmServiceUrlStorage()
const {getAgent} = useAgent() const {getAgent} = useAgent()
const events = useMessagesEventBus()
const [convo] = useState( const [convo] = useState(
() => () =>
new Convo({ new Convo({
@ -33,6 +35,7 @@ export function ConvoProvider({
agent: new BskyAgent({ agent: new BskyAgent({
service: serviceUrl, service: serviceUrl,
}), }),
events,
__tempFromUserDid: getAgent().session?.did!, __tempFromUserDid: getAgent().session?.did!,
}), }),
) )

View File

@ -5,9 +5,12 @@ import {
ChatBskyConvoSendMessage, ChatBskyConvoSendMessage,
} from '@atproto-labs/api' } from '@atproto-labs/api'
import {MessagesEventBus} from '#/state/messages/events/agent'
export type ConvoParams = { export type ConvoParams = {
convoId: string convoId: string
agent: BskyAgent agent: BskyAgent
events: MessagesEventBus
__tempFromUserDid: string __tempFromUserDid: string
} }

View File

@ -347,7 +347,15 @@ export class MessagesEventBus {
this.isPolling = true this.isPolling = true
logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo) // logger.debug(
// `${LOGGER_CONTEXT}: poll`,
// {
// requestedPollIntervals: Array.from(
// this.requestedPollIntervals.values(),
// ),
// },
// logger.DebugContext.convo,
// )
try { try {
const response = await this.agent.api.chat.bsky.convo.getLog( const response = await this.agent.api.chat.bsky.convo.getLog(