diff --git a/src/screens/Messages/Conversation/MessageListError.tsx b/src/screens/Messages/Conversation/MessageListError.tsx index 523788d4..7c6fd02c 100644 --- a/src/screens/Messages/Conversation/MessageListError.tsx +++ b/src/screens/Messages/Conversation/MessageListError.tsx @@ -18,10 +18,10 @@ export function MessageListError({ const {_} = useLingui() const message = React.useMemo(() => { return { - [ConvoItemError.HistoryFailed]: _(msg`Failed to load past messages.`), - [ConvoItemError.ResumeFailed]: _( + [ConvoItemError.Network]: _( msg`There was an issue connecting to the chat.`, ), + [ConvoItemError.HistoryFailed]: _(msg`Failed to load past messages.`), [ConvoItemError.PollFailed]: _( msg`This chat was disconnected due to a network error.`, ), diff --git a/src/screens/Messages/Conversation/index.tsx b/src/screens/Messages/Conversation/index.tsx index 11044c21..2a4f14a5 100644 --- a/src/screens/Messages/Conversation/index.tsx +++ b/src/screens/Messages/Conversation/index.tsx @@ -18,6 +18,7 @@ import {PreviewableUserAvatar} from 'view/com/util/UserAvatar' import {CenteredView} from 'view/com/util/Views' import {MessagesList} from '#/screens/Messages/Conversation/MessagesList' import {atoms as a, useBreakpoints, useTheme} from '#/alf' +import {Button, ButtonText} from '#/components/Button' import {ConvoMenu} from '#/components/dms/ConvoMenu' import {ListMaybePlaceholder} from '#/components/Lists' import {Text} from '#/components/Typography' @@ -51,8 +52,21 @@ function Inner() { } if (chat.status === ConvoStatus.Error) { - // TODO error - return null + // TODO + return ( + + + Something went wrong + + + + ) } /* diff --git a/src/state/messages/convo.ts b/src/state/messages/convo.ts index 81ab94f4..de21ef39 100644 --- a/src/state/messages/convo.ts +++ b/src/state/messages/convo.ts @@ -2,6 +2,7 @@ import {AppBskyActorDefs} from '@atproto/api' import { BskyAgent, ChatBskyConvoDefs, + ChatBskyConvoGetLog, ChatBskyConvoSendMessage, } from '@atproto-labs/api' import {nanoid} from 'nanoid/non-secure' @@ -18,7 +19,6 @@ export type ConvoParams = { export enum ConvoStatus { Uninitialized = 'uninitialized', Initializing = 'initializing', - Resuming = 'resuming', Ready = 'ready', Error = 'error', Backgrounded = 'backgrounded', @@ -27,14 +27,50 @@ export enum ConvoStatus { export enum ConvoItemError { HistoryFailed = 'historyFailed', - ResumeFailed = 'resumeFailed', PollFailed = 'pollFailed', + Network = 'network', } -export enum ConvoError { +export enum ConvoErrorCode { InitFailed = 'initFailed', } +export type ConvoError = { + code: ConvoErrorCode + exception?: Error + retry: () => void +} + +export enum ConvoDispatchEvent { + Init = 'init', + Ready = 'ready', + Resume = 'resume', + Background = 'background', + Suspend = 'suspend', + Error = 'error', +} + +export type ConvoDispatch = + | { + event: ConvoDispatchEvent.Init + } + | { + event: ConvoDispatchEvent.Ready + } + | { + event: ConvoDispatchEvent.Resume + } + | { + event: ConvoDispatchEvent.Background + } + | { + event: ConvoDispatchEvent.Suspend + } + | { + event: ConvoDispatchEvent.Error + payload: ConvoError + } + export type ConvoItem = | { type: 'message' | 'pending-message' @@ -133,20 +169,6 @@ export type ConvoState = ) => Promise fetchMessageHistory: () => Promise } - | { - status: ConvoStatus.Resuming - items: ConvoItem[] - convo: ChatBskyConvoDefs.ConvoView - error: undefined - sender: AppBskyActorDefs.ProfileViewBasic - recipients: AppBskyActorDefs.ProfileViewBasic[] - isFetchingHistory: boolean - deleteMessage: (messageId: string) => Promise - sendMessage: ( - message: ChatBskyConvoSendMessage.InputSchema['message'], - ) => Promise - fetchMessageHistory: () => Promise - } | { status: ConvoStatus.Error items: [] @@ -160,9 +182,12 @@ export type ConvoState = fetchMessageHistory: undefined } -const ACTIVE_POLL_INTERVAL = 2e3 +const ACTIVE_POLL_INTERVAL = 1e3 const BACKGROUND_POLL_INTERVAL = 10e3 +// TODO temporary +let DEBUG_ACTIVE_CHAT: string | undefined + export function isConvoItemMessage( item: ConvoItem, ): item is ConvoItem & {type: 'message'} { @@ -175,14 +200,16 @@ export function isConvoItemMessage( } export class Convo { + private id: string + private agent: BskyAgent private __tempFromUserDid: string - private pollInterval = ACTIVE_POLL_INTERVAL private status: ConvoStatus = ConvoStatus.Uninitialized + private pollInterval = ACTIVE_POLL_INTERVAL private error: | { - code: ConvoError + code: ConvoErrorCode exception?: Error retry: () => void } @@ -190,7 +217,6 @@ export class Convo { private historyCursor: string | undefined | null = undefined private isFetchingHistory = false private eventsCursor: string | undefined = undefined - private pollingFailure = false private pastMessages: Map< string, @@ -208,8 +234,9 @@ export class Convo { private footerItems: Map = new Map() private headerItems: Map = new Map() - private pendingEventIngestion: Promise | undefined private isProcessingPendingMessages = false + private pendingPoll: Promise | undefined + private nextPoll: NodeJS.Timeout | undefined convoId: string convo: ChatBskyConvoDefs.ConvoView | undefined @@ -218,6 +245,7 @@ export class Convo { snapshot: ConvoState | undefined constructor(params: ConvoParams) { + this.id = nanoid(3) this.convoId = params.convoId this.agent = params.agent this.__tempFromUserDid = params.__tempFromUserDid @@ -227,6 +255,14 @@ export class Convo { this.sendMessage = this.sendMessage.bind(this) this.deleteMessage = this.deleteMessage.bind(this) this.fetchMessageHistory = this.fetchMessageHistory.bind(this) + + if (DEBUG_ACTIVE_CHAT) { + logger.error(`Convo: another chat was already active`, { + convoId: this.convoId, + }) + } else { + DEBUG_ACTIVE_CHAT = this.convoId + } } private commit() { @@ -271,7 +307,6 @@ export class Convo { } case ConvoStatus.Suspended: case ConvoStatus.Backgrounded: - case ConvoStatus.Resuming: case ConvoStatus.Ready: { return { status: this.status, @@ -317,122 +352,309 @@ export class Convo { } } - async init() { - logger.debug('Convo: init', {}, logger.DebugContext.convo) + dispatch(action: ConvoDispatch) { + const prevStatus = this.status - if ( - this.status === ConvoStatus.Uninitialized || - this.status === ConvoStatus.Error - ) { - try { - this.status = ConvoStatus.Initializing - this.commit() - - await this.refreshConvo() - this.status = ConvoStatus.Ready - this.commit() - - await this.fetchMessageHistory() - - this.pollEvents() - } catch (e: any) { - logger.error('Convo: failed to init') - this.error = { - exception: e, - code: ConvoError.InitFailed, - retry: () => { - this.error = undefined - this.init() - }, + switch (this.status) { + case ConvoStatus.Uninitialized: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.status = ConvoStatus.Initializing + this.setup() + break + } } - this.status = ConvoStatus.Error - this.commit() + break } - } else { - logger.warn(`Convo: cannot init from ${this.status}`) + case ConvoStatus.Initializing: { + switch (action.event) { + case ConvoDispatchEvent.Ready: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.fetchMessageHistory().then(() => { + this.restartPoll() + }) + break + } + case ConvoDispatchEvent.Background: { + this.status = ConvoStatus.Backgrounded + this.pollInterval = BACKGROUND_POLL_INTERVAL + this.fetchMessageHistory().then(() => { + this.restartPoll() + }) + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + case ConvoStatus.Ready: { + switch (action.event) { + case ConvoDispatchEvent.Resume: { + this.refreshConvo() + this.restartPoll() + break + } + case ConvoDispatchEvent.Background: { + this.status = ConvoStatus.Backgrounded + this.pollInterval = BACKGROUND_POLL_INTERVAL + this.restartPoll() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + this.cancelNextPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + this.cancelNextPoll() + break + } + } + break + } + case ConvoStatus.Backgrounded: { + switch (action.event) { + case ConvoDispatchEvent.Resume: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + // TODO truncate history if needed + this.restartPoll() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + this.cancelNextPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + this.cancelNextPoll() + break + } + } + break + } + case ConvoStatus.Suspended: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + // TODO truncate history if needed + this.restartPoll() + break + } + case ConvoDispatchEvent.Resume: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + this.restartPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + case ConvoStatus.Error: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.reset() + break + } + case ConvoDispatchEvent.Resume: { + this.reset() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + default: + break } + + logger.debug( + `Convo: dispatch '${action.event}'`, + { + id: this.id, + prev: prevStatus, + next: this.status, + }, + logger.DebugContext.convo, + ) + + this.commit() } - async resume() { - logger.debug('Convo: resume', {}, logger.DebugContext.convo) + private reset() { + this.convo = undefined + this.sender = undefined + this.recipients = undefined + this.snapshot = undefined - if ( - this.status === ConvoStatus.Suspended || - this.status === ConvoStatus.Backgrounded - ) { - const fromStatus = this.status + this.status = ConvoStatus.Uninitialized + this.error = undefined + this.historyCursor = undefined + this.eventsCursor = undefined - try { - this.status = ConvoStatus.Resuming - this.commit() + this.pastMessages = new Map() + this.newMessages = new Map() + this.pendingMessages = new Map() + this.deletedMessages = new Set() + this.footerItems = new Map() + this.headerItems = new Map() - await this.refreshConvo() - this.status = ConvoStatus.Ready - this.commit() + this.dispatch({event: ConvoDispatchEvent.Init}) + } - // throw new Error('UNCOMMENT TO TEST RESUME FAILURE') + private async setup() { + try { + const {convo, sender, recipients} = await this.fetchConvo() - this.pollInterval = ACTIVE_POLL_INTERVAL - this.pollEvents() - } catch (e) { - logger.error('Convo: failed to resume') + this.convo = convo + this.sender = sender + this.recipients = recipients - this.footerItems.set(ConvoItemError.ResumeFailed, { - type: 'error-recoverable', - key: ConvoItemError.ResumeFailed, - code: ConvoItemError.ResumeFailed, + /* + * Some validation prior to `Ready` status + */ + if (!this.convo) { + throw new Error('Convo: could not find convo') + } + if (!this.sender) { + throw new Error('Convo: could not find sender in convo') + } + if (!this.recipients) { + throw new Error('Convo: could not find recipients in convo') + } + + // await new Promise(y => setTimeout(y, 2000)) + // throw new Error('UNCOMMENT TO TEST INIT FAILURE') + this.dispatch({event: ConvoDispatchEvent.Ready}) + } catch (e: any) { + logger.error('Convo: setup() failed') + + this.dispatch({ + event: ConvoDispatchEvent.Error, + payload: { + exception: e, + code: ConvoErrorCode.InitFailed, retry: () => { - this.footerItems.delete(ConvoItemError.ResumeFailed) - this.resume() + this.reset() }, - }) - - this.status = fromStatus - this.commit() - } - } else { - logger.warn(`Convo: cannot resume from ${this.status}`) + }, + }) } } - async background() { - logger.debug('Convo: backgrounded', {}, logger.DebugContext.convo) - this.status = ConvoStatus.Backgrounded - this.pollInterval = BACKGROUND_POLL_INTERVAL - this.commit() + init() { + this.dispatch({event: ConvoDispatchEvent.Init}) } - async suspend() { - logger.debug('Convo: suspended', {}, logger.DebugContext.convo) - this.status = ConvoStatus.Suspended - this.commit() + resume() { + this.dispatch({event: ConvoDispatchEvent.Resume}) + } + + background() { + this.dispatch({event: ConvoDispatchEvent.Background}) + } + + suspend() { + this.dispatch({event: ConvoDispatchEvent.Suspend}) + DEBUG_ACTIVE_CHAT = undefined + } + + private pendingFetchConvo: + | Promise<{ + convo: ChatBskyConvoDefs.ConvoView + sender: AppBskyActorDefs.ProfileViewBasic | undefined + recipients: AppBskyActorDefs.ProfileViewBasic[] + }> + | undefined + async fetchConvo() { + if (this.pendingFetchConvo) return this.pendingFetchConvo + + this.pendingFetchConvo = new Promise<{ + convo: ChatBskyConvoDefs.ConvoView + sender: AppBskyActorDefs.ProfileViewBasic | undefined + recipients: AppBskyActorDefs.ProfileViewBasic[] + }>(async (resolve, reject) => { + try { + const response = await this.agent.api.chat.bsky.convo.getConvo( + { + convoId: this.convoId, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + + const convo = response.data.convo + + resolve({ + convo, + sender: convo.members.find(m => m.did === this.__tempFromUserDid), + recipients: convo.members.filter( + m => m.did !== this.__tempFromUserDid, + ), + }) + } catch (e) { + reject(e) + } finally { + this.pendingFetchConvo = undefined + } + }) + + return this.pendingFetchConvo } async refreshConvo() { - const response = await this.agent.api.chat.bsky.convo.getConvo( - { - convoId: this.convoId, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - this.convo = response.data.convo - this.sender = this.convo.members.find(m => m.did === this.__tempFromUserDid) - this.recipients = this.convo.members.filter( - m => m.did !== this.__tempFromUserDid, - ) + try { + const {convo, sender, recipients} = await this.fetchConvo() + // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE') + this.convo = convo || this.convo + this.sender = sender || this.sender + this.recipients = recipients || this.recipients + } catch (e: any) { + logger.error(`Convo: failed to refresh convo`) - /* - * Prevent invalid states - */ - if (!this.sender) { - throw new Error('Convo: could not find sender in convo') - } - if (!this.recipients) { - throw new Error('Convo: could not find recipients in convo') + this.footerItems.set(ConvoItemError.Network, { + type: 'error-recoverable', + key: ConvoItemError.Network, + code: ConvoItemError.Network, + retry: () => { + this.footerItems.delete(ConvoItemError.Network) + this.resume() + }, + }) + this.commit() } } @@ -517,116 +739,142 @@ export class Convo { } } - private async pollEvents() { - if ( - this.status === ConvoStatus.Ready || - this.status === ConvoStatus.Backgrounded - ) { - if (this.pendingEventIngestion) return - - /* - * Represents a failed state, which is retryable. - */ - if (this.pollingFailure) return - - setTimeout(async () => { - this.pendingEventIngestion = this.ingestLatestEvents() - await this.pendingEventIngestion - this.pendingEventIngestion = undefined - this.pollEvents() - }, this.pollInterval) - } + private restartPoll() { + this.cancelNextPoll() + this.pollLatestEvents() } - async ingestLatestEvents() { + private cancelNextPoll() { + if (this.nextPoll) clearTimeout(this.nextPoll) + } + + private pollLatestEvents() { + /* + * Uncomment to view poll events + */ + logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo) + try { - // throw new Error('UNCOMMENT TO TEST POLL FAILURE') - const response = await this.agent.api.chat.bsky.convo.getLog( - { - cursor: this.eventsCursor, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {logs} = response.data - - let needsCommit = false - - for (const log of logs) { - /* - * If there's a rev, we should handle it. If there's not a rev, we don't - * know what it is. - */ - if (typeof log.rev === 'string') { - /* - * We only care about new events - */ - if (log.rev > (this.eventsCursor = this.eventsCursor || log.rev)) { - /* - * Update rev regardless of if it's a log type we care about or not - */ - this.eventsCursor = log.rev - - /* - * This is VERY important. We don't want to insert any messages from - * your other chats. - */ - if (log.convoId !== this.convoId) continue - - if ( - ChatBskyConvoDefs.isLogCreateMessage(log) && - ChatBskyConvoDefs.isMessageView(log.message) - ) { - if (this.newMessages.has(log.message.id)) { - // Trust the log as the source of truth on ordering - this.newMessages.delete(log.message.id) - } - this.newMessages.set(log.message.id, log.message) - needsCommit = true - } else if ( - ChatBskyConvoDefs.isLogDeleteMessage(log) && - ChatBskyConvoDefs.isDeletedMessageView(log.message) - ) { - /* - * Update if we have this in state. If we don't, don't worry about it. - */ - if (this.pastMessages.has(log.message.id)) { - /* - * For now, we remove deleted messages from the thread, if we receive one. - * - * To support them, it'd look something like this: - * this.pastMessages.set(log.message.id, log.message) - */ - this.pastMessages.delete(log.message.id) - this.newMessages.delete(log.message.id) - this.deletedMessages.delete(log.message.id) - needsCommit = true - } - } - } - } - } - - if (needsCommit) { - this.commit() - } + this.fetchLatestEvents().then(({events}) => { + this.applyLatestEvents(events) + }) + this.nextPoll = setTimeout(() => { + this.pollLatestEvents() + }, this.pollInterval) } catch (e: any) { - logger.error('Convo: failed to poll events') - this.pollingFailure = true + logger.error('Convo: poll events failed') + + this.cancelNextPoll() + this.footerItems.set(ConvoItemError.PollFailed, { type: 'error-recoverable', key: ConvoItemError.PollFailed, code: ConvoItemError.PollFailed, retry: () => { this.footerItems.delete(ConvoItemError.PollFailed) - this.pollingFailure = false this.commit() - this.pollEvents() + this.pollLatestEvents() }, }) + + this.commit() + } + } + + private pendingFetchLatestEvents: + | Promise<{ + events: ChatBskyConvoGetLog.OutputSchema['logs'] + }> + | undefined + async fetchLatestEvents() { + if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents + + this.pendingFetchLatestEvents = new Promise<{ + events: ChatBskyConvoGetLog.OutputSchema['logs'] + }>(async (resolve, reject) => { + try { + // throw new Error('UNCOMMENT TO TEST POLL FAILURE') + const response = await this.agent.api.chat.bsky.convo.getLog( + { + cursor: this.eventsCursor, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {logs} = response.data + resolve({events: logs}) + } catch (e) { + reject(e) + } finally { + this.pendingFetchLatestEvents = undefined + } + }) + + return this.pendingFetchLatestEvents + } + + private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) { + let needsCommit = false + + for (const ev of events) { + /* + * If there's a rev, we should handle it. If there's not a rev, we don't + * know what it is. + */ + if (typeof ev.rev === 'string') { + /* + * We only care about new events + */ + if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) { + /* + * Update rev regardless of if it's a ev type we care about or not + */ + this.eventsCursor = ev.rev + + /* + * This is VERY important. We don't want to insert any messages from + * your other chats. + */ + if (ev.convoId !== this.convoId) continue + + if ( + ChatBskyConvoDefs.isLogCreateMessage(ev) && + ChatBskyConvoDefs.isMessageView(ev.message) + ) { + if (this.newMessages.has(ev.message.id)) { + // Trust the ev as the source of truth on ordering + this.newMessages.delete(ev.message.id) + } + this.newMessages.set(ev.message.id, ev.message) + needsCommit = true + } else if ( + ChatBskyConvoDefs.isLogDeleteMessage(ev) && + ChatBskyConvoDefs.isDeletedMessageView(ev.message) + ) { + /* + * Update if we have this in state. If we don't, don't worry about it. + */ + if (this.pastMessages.has(ev.message.id)) { + /* + * For now, we remove deleted messages from the thread, if we receive one. + * + * To support them, it'd look something like this: + * this.pastMessages.set(ev.message.id, ev.message) + */ + this.pastMessages.delete(ev.message.id) + this.newMessages.delete(ev.message.id) + this.deletedMessages.delete(ev.message.id) + needsCommit = true + } + } + } + } + } + + if (needsCommit) { this.commit() } } diff --git a/src/state/messages/index.tsx b/src/state/messages/index.tsx index 22c4242e..95ebf0af 100644 --- a/src/state/messages/index.tsx +++ b/src/state/messages/index.tsx @@ -1,6 +1,7 @@ import React, {useContext, useState, useSyncExternalStore} from 'react' +import {AppState} from 'react-native' import {BskyAgent} from '@atproto-labs/api' -import {useFocusEffect} from '@react-navigation/native' +import {useFocusEffect, useIsFocused} from '@react-navigation/native' import {Convo, ConvoParams, ConvoState} from '#/state/messages/convo' import {useAgent} from '#/state/session' @@ -20,6 +21,7 @@ export function ChatProvider({ children, convoId, }: Pick & {children: React.ReactNode}) { + const isScreenFocused = useIsFocused() const {serviceUrl} = useDmServiceUrlStorage() const {getAgent} = useAgent() const [convo] = useState( @@ -44,5 +46,23 @@ export function ChatProvider({ }, [convo]), ) + React.useEffect(() => { + const handleAppStateChange = (nextAppState: string) => { + if (isScreenFocused) { + if (nextAppState === 'active') { + convo.resume() + } else { + convo.background() + } + } + } + + const sub = AppState.addEventListener('change', handleAppStateChange) + + return () => { + sub.remove() + } + }, [convo, isScreenFocused]) + return {children} }