[🐴] Error recovery (#4036)
* Handle block state when sending messages * Handle different pending failures * Use existing profile data to handle blocks * Better cleanup, leave room for more * Attempt recover upon next send * Reset pending failure * Capture unexpected error * Gracefully handle network errors and recovery * Re-align error components and types * Include history fetching in recoverable states
This commit is contained in:
parent
dff6bd7c65
commit
4bceabc21c
7 changed files with 216 additions and 113 deletions
|
@ -5,6 +5,8 @@ import {
|
|||
ChatBskyConvoGetLog,
|
||||
ChatBskyConvoSendMessage,
|
||||
} from '@atproto/api'
|
||||
import {XRPCError} from '@atproto/xrpc'
|
||||
import EventEmitter from 'eventemitter3'
|
||||
import {nanoid} from 'nanoid/non-secure'
|
||||
|
||||
import {networkRetry} from '#/lib/async/retry'
|
||||
|
@ -14,11 +16,14 @@ import {
|
|||
ACTIVE_POLL_INTERVAL,
|
||||
BACKGROUND_POLL_INTERVAL,
|
||||
INACTIVE_TIMEOUT,
|
||||
NETWORK_FAILURE_STATUSES,
|
||||
} from '#/state/messages/convo/const'
|
||||
import {
|
||||
ConvoDispatch,
|
||||
ConvoDispatchEvent,
|
||||
ConvoError,
|
||||
ConvoErrorCode,
|
||||
ConvoEvent,
|
||||
ConvoItem,
|
||||
ConvoItemError,
|
||||
ConvoParams,
|
||||
|
@ -51,13 +56,7 @@ export class Convo {
|
|||
private senderUserDid: string
|
||||
|
||||
private status: ConvoStatus = ConvoStatus.Uninitialized
|
||||
private error:
|
||||
| {
|
||||
code: ConvoErrorCode
|
||||
exception?: Error
|
||||
retry: () => void
|
||||
}
|
||||
| undefined
|
||||
private error: ConvoError | undefined
|
||||
private oldestRev: string | undefined | null = undefined
|
||||
private isFetchingHistory = false
|
||||
private latestRev: string | undefined = undefined
|
||||
|
@ -75,13 +74,13 @@ export class Convo {
|
|||
{id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
|
||||
> = new Map()
|
||||
private deletedMessages: Set<string> = new Set()
|
||||
private footerItems: Map<string, ConvoItem> = new Map()
|
||||
private headerItems: Map<string, ConvoItem> = new Map()
|
||||
|
||||
private isProcessingPendingMessages = false
|
||||
|
||||
private lastActiveTimestamp: number | undefined
|
||||
|
||||
private emitter = new EventEmitter<{event: [ConvoEvent]}>()
|
||||
|
||||
convoId: string
|
||||
convo: ChatBskyConvoDefs.ConvoView | undefined
|
||||
sender: AppBskyActorDefs.ProfileViewBasic | undefined
|
||||
|
@ -174,7 +173,7 @@ export class Convo {
|
|||
status: ConvoStatus.Error,
|
||||
items: [],
|
||||
convo: undefined,
|
||||
error: this.error,
|
||||
error: this.error!,
|
||||
sender: undefined,
|
||||
recipients: undefined,
|
||||
isFetchingHistory: false,
|
||||
|
@ -282,6 +281,7 @@ export class Convo {
|
|||
if (this.convo) {
|
||||
this.status = ConvoStatus.Ready
|
||||
this.refreshConvo()
|
||||
this.maybeRecoverFromNetworkError()
|
||||
} else {
|
||||
this.status = ConvoStatus.Initializing
|
||||
this.setup()
|
||||
|
@ -379,12 +379,30 @@ export class Convo {
|
|||
this.newMessages = new Map()
|
||||
this.pendingMessages = new Map()
|
||||
this.deletedMessages = new Set()
|
||||
this.footerItems = new Map()
|
||||
this.headerItems = new Map()
|
||||
|
||||
this.pendingMessageFailure = null
|
||||
this.fetchMessageHistoryError = undefined
|
||||
this.firehoseError = undefined
|
||||
|
||||
this.dispatch({event: ConvoDispatchEvent.Init})
|
||||
}
|
||||
|
||||
maybeRecoverFromNetworkError() {
|
||||
if (this.firehoseError) {
|
||||
this.firehoseError.retry()
|
||||
this.firehoseError = undefined
|
||||
this.commit()
|
||||
} else {
|
||||
this.batchRetryPendingMessages()
|
||||
}
|
||||
|
||||
if (this.fetchMessageHistoryError) {
|
||||
this.fetchMessageHistoryError.retry()
|
||||
this.fetchMessageHistoryError = undefined
|
||||
this.commit()
|
||||
}
|
||||
}
|
||||
|
||||
private async setup() {
|
||||
try {
|
||||
const {convo, sender, recipients} = await this.fetchConvo()
|
||||
|
@ -520,6 +538,11 @@ export class Convo {
|
|||
}
|
||||
}
|
||||
|
||||
private fetchMessageHistoryError:
|
||||
| {
|
||||
retry: () => void
|
||||
}
|
||||
| undefined
|
||||
async fetchMessageHistory() {
|
||||
logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
|
||||
|
||||
|
@ -537,7 +560,7 @@ export class Convo {
|
|||
* If we've rendered a retry state for history fetching, exit. Upon retry,
|
||||
* this will be removed and we'll try again.
|
||||
*/
|
||||
if (this.headerItems.has(ConvoItemError.HistoryFailed)) return
|
||||
if (this.fetchMessageHistoryError) return
|
||||
|
||||
try {
|
||||
this.isFetchingHistory = true
|
||||
|
@ -586,15 +609,11 @@ export class Convo {
|
|||
} catch (e: any) {
|
||||
logger.error('Convo: failed to fetch message history')
|
||||
|
||||
this.headerItems.set(ConvoItemError.HistoryFailed, {
|
||||
type: 'error-recoverable',
|
||||
key: ConvoItemError.HistoryFailed,
|
||||
code: ConvoItemError.HistoryFailed,
|
||||
this.fetchMessageHistoryError = {
|
||||
retry: () => {
|
||||
this.headerItems.delete(ConvoItemError.HistoryFailed)
|
||||
this.fetchMessageHistory()
|
||||
},
|
||||
})
|
||||
}
|
||||
} finally {
|
||||
this.isFetchingHistory = false
|
||||
this.commit()
|
||||
|
@ -628,22 +647,16 @@ export class Convo {
|
|||
)
|
||||
}
|
||||
|
||||
private firehoseError: MessagesEventBusError | undefined
|
||||
|
||||
onFirehoseConnect() {
|
||||
this.footerItems.delete(ConvoItemError.FirehoseFailed)
|
||||
this.firehoseError = undefined
|
||||
this.batchRetryPendingMessages()
|
||||
this.commit()
|
||||
}
|
||||
|
||||
onFirehoseError(error?: MessagesEventBusError) {
|
||||
this.footerItems.set(ConvoItemError.FirehoseFailed, {
|
||||
type: 'error-recoverable',
|
||||
key: ConvoItemError.FirehoseFailed,
|
||||
code: ConvoItemError.FirehoseFailed,
|
||||
retry: () => {
|
||||
this.footerItems.delete(ConvoItemError.FirehoseFailed)
|
||||
this.commit()
|
||||
error?.retry()
|
||||
},
|
||||
})
|
||||
this.firehoseError = error
|
||||
this.commit()
|
||||
}
|
||||
|
||||
|
@ -724,7 +737,7 @@ export class Convo {
|
|||
}
|
||||
}
|
||||
|
||||
private pendingFailed = false
|
||||
private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null
|
||||
|
||||
async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
|
||||
// Ignore empty messages for now since they have no other purpose atm
|
||||
|
@ -734,13 +747,14 @@ export class Convo {
|
|||
|
||||
const tempId = nanoid()
|
||||
|
||||
this.pendingMessageFailure = null
|
||||
this.pendingMessages.set(tempId, {
|
||||
id: tempId,
|
||||
message,
|
||||
})
|
||||
this.commit()
|
||||
|
||||
if (!this.isProcessingPendingMessages && !this.pendingFailed) {
|
||||
if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) {
|
||||
this.processPendingMessages()
|
||||
}
|
||||
}
|
||||
|
@ -765,7 +779,6 @@ export class Convo {
|
|||
try {
|
||||
this.isProcessingPendingMessages = true
|
||||
|
||||
// throw new Error('UNCOMMENT TO TEST RETRY')
|
||||
const {id, message} = pendingMessage
|
||||
|
||||
const response = await networkRetry(2, () => {
|
||||
|
@ -794,23 +807,65 @@ export class Convo {
|
|||
this.commit()
|
||||
} catch (e: any) {
|
||||
logger.error(e, {context: `Convo: failed to send message`})
|
||||
this.pendingFailed = true
|
||||
this.commit()
|
||||
this.handleSendMessageFailure(e)
|
||||
} finally {
|
||||
this.isProcessingPendingMessages = false
|
||||
}
|
||||
}
|
||||
|
||||
private handleSendMessageFailure(e: any) {
|
||||
if (e instanceof XRPCError) {
|
||||
if (NETWORK_FAILURE_STATUSES.includes(e.status)) {
|
||||
this.pendingMessageFailure = 'recoverable'
|
||||
} else {
|
||||
switch (e.message) {
|
||||
case 'block between recipient and sender':
|
||||
this.pendingMessageFailure = 'unrecoverable'
|
||||
this.emitter.emit('event', {
|
||||
type: 'invalidate-block-state',
|
||||
accountDids: [
|
||||
this.sender!.did,
|
||||
...this.recipients!.map(r => r.did),
|
||||
],
|
||||
})
|
||||
break
|
||||
default:
|
||||
logger.warn(
|
||||
`Convo handleSendMessageFailure could not handle error`,
|
||||
{
|
||||
status: e.status,
|
||||
message: e.message,
|
||||
},
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.error(e, {
|
||||
context: `Convo handleSendMessageFailure received unknown error`,
|
||||
})
|
||||
}
|
||||
|
||||
this.commit()
|
||||
}
|
||||
|
||||
async batchRetryPendingMessages() {
|
||||
if (this.pendingMessageFailure === null) return
|
||||
|
||||
const messageArray = Array.from(this.pendingMessages.values())
|
||||
if (messageArray.length === 0) return
|
||||
|
||||
this.pendingMessageFailure = null
|
||||
this.commit()
|
||||
|
||||
logger.debug(
|
||||
`Convo: retrying ${this.pendingMessages.size} pending messages`,
|
||||
`Convo: batch retrying ${this.pendingMessages.size} pending messages`,
|
||||
{},
|
||||
logger.DebugContext.convo,
|
||||
)
|
||||
|
||||
try {
|
||||
// throw new Error('UNCOMMENT TO TEST RETRY')
|
||||
const messageArray = Array.from(this.pendingMessages.values())
|
||||
const {data} = await networkRetry(2, () => {
|
||||
return this.agent.api.chat.bsky.convo.sendMessageBatch(
|
||||
{
|
||||
|
@ -848,8 +903,7 @@ export class Convo {
|
|||
)
|
||||
} catch (e: any) {
|
||||
logger.error(e, {context: `Convo: failed to batch retry messages`})
|
||||
this.pendingFailed = true
|
||||
this.commit()
|
||||
this.handleSendMessageFailure(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -877,6 +931,14 @@ export class Convo {
|
|||
}
|
||||
}
|
||||
|
||||
on(handler: (event: ConvoEvent) => void) {
|
||||
this.emitter.on('event', handler)
|
||||
|
||||
return () => {
|
||||
this.emitter.off('event', handler)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Items in reverse order, since FlatList inverts
|
||||
*/
|
||||
|
@ -901,9 +963,16 @@ export class Convo {
|
|||
}
|
||||
})
|
||||
|
||||
this.headerItems.forEach(item => {
|
||||
items.unshift(item)
|
||||
})
|
||||
if (this.fetchMessageHistoryError) {
|
||||
items.unshift({
|
||||
type: 'error',
|
||||
code: ConvoItemError.HistoryFailed,
|
||||
key: ConvoItemError.HistoryFailed,
|
||||
retry: () => {
|
||||
this.maybeRecoverFromNetworkError()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
this.newMessages.forEach(m => {
|
||||
if (ChatBskyConvoDefs.isMessageView(m)) {
|
||||
|
@ -940,19 +1009,26 @@ export class Convo {
|
|||
sender: this.sender!,
|
||||
},
|
||||
nextMessage: null,
|
||||
retry: this.pendingFailed
|
||||
? () => {
|
||||
this.pendingFailed = false
|
||||
this.commit()
|
||||
this.batchRetryPendingMessages()
|
||||
}
|
||||
: undefined,
|
||||
failed: this.pendingMessageFailure !== null,
|
||||
retry:
|
||||
this.pendingMessageFailure === 'recoverable'
|
||||
? () => {
|
||||
this.maybeRecoverFromNetworkError()
|
||||
}
|
||||
: undefined,
|
||||
})
|
||||
})
|
||||
|
||||
this.footerItems.forEach(item => {
|
||||
items.push(item)
|
||||
})
|
||||
if (this.firehoseError) {
|
||||
items.push({
|
||||
type: 'error',
|
||||
code: ConvoItemError.FirehoseFailed,
|
||||
key: ConvoItemError.FirehoseFailed,
|
||||
retry: () => {
|
||||
this.firehoseError?.retry()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return items
|
||||
.filter(item => {
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
export const ACTIVE_POLL_INTERVAL = 1e3
|
||||
export const BACKGROUND_POLL_INTERVAL = 5e3
|
||||
export const INACTIVE_TIMEOUT = 60e3 * 5
|
||||
|
||||
export const NETWORK_FAILURE_STATUSES = [
|
||||
1, 408, 425, 429, 500, 502, 503, 504, 522, 524,
|
||||
]
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import React, {useContext, useState, useSyncExternalStore} from 'react'
|
||||
import {AppState} from 'react-native'
|
||||
import {useFocusEffect, useIsFocused} from '@react-navigation/native'
|
||||
import {useQueryClient} from '@tanstack/react-query'
|
||||
|
||||
import {Convo} from '#/state/messages/convo/agent'
|
||||
import {
|
||||
|
@ -13,6 +14,8 @@ import {
|
|||
import {isConvoActive} from '#/state/messages/convo/util'
|
||||
import {useMessagesEventBus} from '#/state/messages/events'
|
||||
import {useMarkAsReadMutation} from '#/state/queries/messages/conversation'
|
||||
import {RQKEY as ListConvosQueryKey} from '#/state/queries/messages/list-converations'
|
||||
import {RQKEY as createProfileQueryKey} from '#/state/queries/profile'
|
||||
import {useAgent} from '#/state/session'
|
||||
|
||||
export * from '#/state/messages/convo/util'
|
||||
|
@ -52,6 +55,7 @@ export function ConvoProvider({
|
|||
children,
|
||||
convoId,
|
||||
}: Pick<ConvoParams, 'convoId'> & {children: React.ReactNode}) {
|
||||
const queryClient = useQueryClient()
|
||||
const isScreenFocused = useIsFocused()
|
||||
const {getAgent} = useAgent()
|
||||
const events = useMessagesEventBus()
|
||||
|
@ -78,6 +82,23 @@ export function ConvoProvider({
|
|||
}, [convo, convoId, markAsRead]),
|
||||
)
|
||||
|
||||
React.useEffect(() => {
|
||||
return convo.on(event => {
|
||||
switch (event.type) {
|
||||
case 'invalidate-block-state': {
|
||||
for (const did of event.accountDids) {
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: createProfileQueryKey(did),
|
||||
})
|
||||
}
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: ListConvosQueryKey,
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}, [convo, queryClient])
|
||||
|
||||
React.useEffect(() => {
|
||||
const handleAppStateChange = (nextAppState: string) => {
|
||||
if (isScreenFocused) {
|
||||
|
|
|
@ -23,10 +23,6 @@ export enum ConvoStatus {
|
|||
}
|
||||
|
||||
export enum ConvoItemError {
|
||||
/**
|
||||
* Generic error
|
||||
*/
|
||||
Network = 'network',
|
||||
/**
|
||||
* Error connecting to event firehose
|
||||
*/
|
||||
|
@ -95,6 +91,7 @@ export type ConvoItem =
|
|||
| ChatBskyConvoDefs.MessageView
|
||||
| ChatBskyConvoDefs.DeletedMessageView
|
||||
| null
|
||||
failed: boolean
|
||||
/**
|
||||
* Retry sending the message. If present, the message is in a failed state.
|
||||
*/
|
||||
|
@ -110,10 +107,13 @@ export type ConvoItem =
|
|||
| null
|
||||
}
|
||||
| {
|
||||
type: 'error-recoverable'
|
||||
type: 'error'
|
||||
key: string
|
||||
code: ConvoItemError
|
||||
retry: () => void
|
||||
/**
|
||||
* If present, error is recoverable.
|
||||
*/
|
||||
retry?: () => void
|
||||
}
|
||||
|
||||
type DeleteMessage = (messageId: string) => Promise<void>
|
||||
|
@ -186,7 +186,7 @@ export type ConvoStateError = {
|
|||
status: ConvoStatus.Error
|
||||
items: []
|
||||
convo: undefined
|
||||
error: any
|
||||
error: ConvoError
|
||||
sender: undefined
|
||||
recipients: undefined
|
||||
isFetchingHistory: false
|
||||
|
@ -201,3 +201,8 @@ export type ConvoState =
|
|||
| ConvoStateBackgrounded
|
||||
| ConvoStateSuspended
|
||||
| ConvoStateError
|
||||
|
||||
export type ConvoEvent = {
|
||||
type: 'invalidate-block-state'
|
||||
accountDids: string[]
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue