[🐴] State transitions (#3880)

* Handle init/resume/suspend/background and polling

* Add debug and temp guard

* Make state transitions sync

* Make init sync also

* Checkpoint: confusing but working state machine

* Reducer-esque

* Remove poll events

* Guard fetchConvo

(cherry picked from commit 8385579d31500bb4bfb60afeecdc1eb3ddd7e747)

* Clean up polling, make sync

(cherry picked from commit 7f75cd04c3bf81c94662785748698640a84bef51)

* Update history handling

(cherry picked from commit b82b552ba4040adf7ead2377541132a386964ff8)

* Check for screen focus in app state listener

* Get rid of ad-hoc status checks
zio/stable
Eric Bailey 2024-05-07 17:54:52 -05:00 committed by GitHub
parent 87cb4c105e
commit f78126e01a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 500 additions and 218 deletions

View File

@ -18,10 +18,10 @@ export function MessageListError({
const {_} = useLingui()
const message = React.useMemo(() => {
return {
[ConvoItemError.HistoryFailed]: _(msg`Failed to load past messages.`),
[ConvoItemError.ResumeFailed]: _(
[ConvoItemError.Network]: _(
msg`There was an issue connecting to the chat.`,
),
[ConvoItemError.HistoryFailed]: _(msg`Failed to load past messages.`),
[ConvoItemError.PollFailed]: _(
msg`This chat was disconnected due to a network error.`,
),

View File

@ -18,6 +18,7 @@ import {PreviewableUserAvatar} from 'view/com/util/UserAvatar'
import {CenteredView} from 'view/com/util/Views'
import {MessagesList} from '#/screens/Messages/Conversation/MessagesList'
import {atoms as a, useBreakpoints, useTheme} from '#/alf'
import {Button, ButtonText} from '#/components/Button'
import {ConvoMenu} from '#/components/dms/ConvoMenu'
import {ListMaybePlaceholder} from '#/components/Lists'
import {Text} from '#/components/Typography'
@ -51,8 +52,21 @@ function Inner() {
}
if (chat.status === ConvoStatus.Error) {
// TODO error
return null
// TODO
return (
<View>
<CenteredView style={{flex: 1}} sideBorders>
<Text>Something went wrong</Text>
<Button
label="Retry"
onPress={() => {
chat.error.retry()
}}>
<ButtonText>Retry</ButtonText>
</Button>
</CenteredView>
</View>
)
}
/*

View File

@ -2,6 +2,7 @@ import {AppBskyActorDefs} from '@atproto/api'
import {
BskyAgent,
ChatBskyConvoDefs,
ChatBskyConvoGetLog,
ChatBskyConvoSendMessage,
} from '@atproto-labs/api'
import {nanoid} from 'nanoid/non-secure'
@ -18,7 +19,6 @@ export type ConvoParams = {
export enum ConvoStatus {
Uninitialized = 'uninitialized',
Initializing = 'initializing',
Resuming = 'resuming',
Ready = 'ready',
Error = 'error',
Backgrounded = 'backgrounded',
@ -27,14 +27,50 @@ export enum ConvoStatus {
export enum ConvoItemError {
HistoryFailed = 'historyFailed',
ResumeFailed = 'resumeFailed',
PollFailed = 'pollFailed',
Network = 'network',
}
export enum ConvoError {
export enum ConvoErrorCode {
InitFailed = 'initFailed',
}
export type ConvoError = {
code: ConvoErrorCode
exception?: Error
retry: () => void
}
export enum ConvoDispatchEvent {
Init = 'init',
Ready = 'ready',
Resume = 'resume',
Background = 'background',
Suspend = 'suspend',
Error = 'error',
}
export type ConvoDispatch =
| {
event: ConvoDispatchEvent.Init
}
| {
event: ConvoDispatchEvent.Ready
}
| {
event: ConvoDispatchEvent.Resume
}
| {
event: ConvoDispatchEvent.Background
}
| {
event: ConvoDispatchEvent.Suspend
}
| {
event: ConvoDispatchEvent.Error
payload: ConvoError
}
export type ConvoItem =
| {
type: 'message' | 'pending-message'
@ -133,20 +169,6 @@ export type ConvoState =
) => Promise<void>
fetchMessageHistory: () => Promise<void>
}
| {
status: ConvoStatus.Resuming
items: ConvoItem[]
convo: ChatBskyConvoDefs.ConvoView
error: undefined
sender: AppBskyActorDefs.ProfileViewBasic
recipients: AppBskyActorDefs.ProfileViewBasic[]
isFetchingHistory: boolean
deleteMessage: (messageId: string) => Promise<void>
sendMessage: (
message: ChatBskyConvoSendMessage.InputSchema['message'],
) => Promise<void>
fetchMessageHistory: () => Promise<void>
}
| {
status: ConvoStatus.Error
items: []
@ -160,9 +182,12 @@ export type ConvoState =
fetchMessageHistory: undefined
}
const ACTIVE_POLL_INTERVAL = 2e3
const ACTIVE_POLL_INTERVAL = 1e3
const BACKGROUND_POLL_INTERVAL = 10e3
// TODO temporary
let DEBUG_ACTIVE_CHAT: string | undefined
export function isConvoItemMessage(
item: ConvoItem,
): item is ConvoItem & {type: 'message'} {
@ -175,14 +200,16 @@ export function isConvoItemMessage(
}
export class Convo {
private id: string
private agent: BskyAgent
private __tempFromUserDid: string
private pollInterval = ACTIVE_POLL_INTERVAL
private status: ConvoStatus = ConvoStatus.Uninitialized
private pollInterval = ACTIVE_POLL_INTERVAL
private error:
| {
code: ConvoError
code: ConvoErrorCode
exception?: Error
retry: () => void
}
@ -190,7 +217,6 @@ export class Convo {
private historyCursor: string | undefined | null = undefined
private isFetchingHistory = false
private eventsCursor: string | undefined = undefined
private pollingFailure = false
private pastMessages: Map<
string,
@ -208,8 +234,9 @@ export class Convo {
private footerItems: Map<string, ConvoItem> = new Map()
private headerItems: Map<string, ConvoItem> = new Map()
private pendingEventIngestion: Promise<void> | undefined
private isProcessingPendingMessages = false
private pendingPoll: Promise<void> | undefined
private nextPoll: NodeJS.Timeout | undefined
convoId: string
convo: ChatBskyConvoDefs.ConvoView | undefined
@ -218,6 +245,7 @@ export class Convo {
snapshot: ConvoState | undefined
constructor(params: ConvoParams) {
this.id = nanoid(3)
this.convoId = params.convoId
this.agent = params.agent
this.__tempFromUserDid = params.__tempFromUserDid
@ -227,6 +255,14 @@ export class Convo {
this.sendMessage = this.sendMessage.bind(this)
this.deleteMessage = this.deleteMessage.bind(this)
this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
if (DEBUG_ACTIVE_CHAT) {
logger.error(`Convo: another chat was already active`, {
convoId: this.convoId,
})
} else {
DEBUG_ACTIVE_CHAT = this.convoId
}
}
private commit() {
@ -271,7 +307,6 @@ export class Convo {
}
case ConvoStatus.Suspended:
case ConvoStatus.Backgrounded:
case ConvoStatus.Resuming:
case ConvoStatus.Ready: {
return {
status: this.status,
@ -317,122 +352,309 @@ export class Convo {
}
}
async init() {
logger.debug('Convo: init', {}, logger.DebugContext.convo)
dispatch(action: ConvoDispatch) {
const prevStatus = this.status
if (
this.status === ConvoStatus.Uninitialized ||
this.status === ConvoStatus.Error
) {
try {
this.status = ConvoStatus.Initializing
this.commit()
await this.refreshConvo()
this.status = ConvoStatus.Ready
this.commit()
await this.fetchMessageHistory()
this.pollEvents()
} catch (e: any) {
logger.error('Convo: failed to init')
this.error = {
exception: e,
code: ConvoError.InitFailed,
retry: () => {
this.error = undefined
this.init()
},
switch (this.status) {
case ConvoStatus.Uninitialized: {
switch (action.event) {
case ConvoDispatchEvent.Init: {
this.status = ConvoStatus.Initializing
this.setup()
break
}
}
this.status = ConvoStatus.Error
this.commit()
break
}
} else {
logger.warn(`Convo: cannot init from ${this.status}`)
case ConvoStatus.Initializing: {
switch (action.event) {
case ConvoDispatchEvent.Ready: {
this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL
this.fetchMessageHistory().then(() => {
this.restartPoll()
})
break
}
case ConvoDispatchEvent.Background: {
this.status = ConvoStatus.Backgrounded
this.pollInterval = BACKGROUND_POLL_INTERVAL
this.fetchMessageHistory().then(() => {
this.restartPoll()
})
break
}
case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended
break
}
case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error
this.error = action.payload
break
}
}
break
}
case ConvoStatus.Ready: {
switch (action.event) {
case ConvoDispatchEvent.Resume: {
this.refreshConvo()
this.restartPoll()
break
}
case ConvoDispatchEvent.Background: {
this.status = ConvoStatus.Backgrounded
this.pollInterval = BACKGROUND_POLL_INTERVAL
this.restartPoll()
break
}
case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended
this.cancelNextPoll()
break
}
case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error
this.error = action.payload
this.cancelNextPoll()
break
}
}
break
}
case ConvoStatus.Backgrounded: {
switch (action.event) {
case ConvoDispatchEvent.Resume: {
this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
// TODO truncate history if needed
this.restartPoll()
break
}
case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended
this.cancelNextPoll()
break
}
case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error
this.error = action.payload
this.cancelNextPoll()
break
}
}
break
}
case ConvoStatus.Suspended: {
switch (action.event) {
case ConvoDispatchEvent.Init: {
this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
// TODO truncate history if needed
this.restartPoll()
break
}
case ConvoDispatchEvent.Resume: {
this.status = ConvoStatus.Ready
this.pollInterval = ACTIVE_POLL_INTERVAL
this.refreshConvo()
this.restartPoll()
break
}
case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error
this.error = action.payload
break
}
}
break
}
case ConvoStatus.Error: {
switch (action.event) {
case ConvoDispatchEvent.Init: {
this.reset()
break
}
case ConvoDispatchEvent.Resume: {
this.reset()
break
}
case ConvoDispatchEvent.Suspend: {
this.status = ConvoStatus.Suspended
break
}
case ConvoDispatchEvent.Error: {
this.status = ConvoStatus.Error
this.error = action.payload
break
}
}
break
}
default:
break
}
logger.debug(
`Convo: dispatch '${action.event}'`,
{
id: this.id,
prev: prevStatus,
next: this.status,
},
logger.DebugContext.convo,
)
this.commit()
}
async resume() {
logger.debug('Convo: resume', {}, logger.DebugContext.convo)
private reset() {
this.convo = undefined
this.sender = undefined
this.recipients = undefined
this.snapshot = undefined
if (
this.status === ConvoStatus.Suspended ||
this.status === ConvoStatus.Backgrounded
) {
const fromStatus = this.status
this.status = ConvoStatus.Uninitialized
this.error = undefined
this.historyCursor = undefined
this.eventsCursor = undefined
try {
this.status = ConvoStatus.Resuming
this.commit()
this.pastMessages = new Map()
this.newMessages = new Map()
this.pendingMessages = new Map()
this.deletedMessages = new Set()
this.footerItems = new Map()
this.headerItems = new Map()
await this.refreshConvo()
this.status = ConvoStatus.Ready
this.commit()
this.dispatch({event: ConvoDispatchEvent.Init})
}
// throw new Error('UNCOMMENT TO TEST RESUME FAILURE')
private async setup() {
try {
const {convo, sender, recipients} = await this.fetchConvo()
this.pollInterval = ACTIVE_POLL_INTERVAL
this.pollEvents()
} catch (e) {
logger.error('Convo: failed to resume')
this.convo = convo
this.sender = sender
this.recipients = recipients
this.footerItems.set(ConvoItemError.ResumeFailed, {
type: 'error-recoverable',
key: ConvoItemError.ResumeFailed,
code: ConvoItemError.ResumeFailed,
/*
* Some validation prior to `Ready` status
*/
if (!this.convo) {
throw new Error('Convo: could not find convo')
}
if (!this.sender) {
throw new Error('Convo: could not find sender in convo')
}
if (!this.recipients) {
throw new Error('Convo: could not find recipients in convo')
}
// await new Promise(y => setTimeout(y, 2000))
// throw new Error('UNCOMMENT TO TEST INIT FAILURE')
this.dispatch({event: ConvoDispatchEvent.Ready})
} catch (e: any) {
logger.error('Convo: setup() failed')
this.dispatch({
event: ConvoDispatchEvent.Error,
payload: {
exception: e,
code: ConvoErrorCode.InitFailed,
retry: () => {
this.footerItems.delete(ConvoItemError.ResumeFailed)
this.resume()
this.reset()
},
})
this.status = fromStatus
this.commit()
}
} else {
logger.warn(`Convo: cannot resume from ${this.status}`)
},
})
}
}
async background() {
logger.debug('Convo: backgrounded', {}, logger.DebugContext.convo)
this.status = ConvoStatus.Backgrounded
this.pollInterval = BACKGROUND_POLL_INTERVAL
this.commit()
init() {
this.dispatch({event: ConvoDispatchEvent.Init})
}
async suspend() {
logger.debug('Convo: suspended', {}, logger.DebugContext.convo)
this.status = ConvoStatus.Suspended
this.commit()
resume() {
this.dispatch({event: ConvoDispatchEvent.Resume})
}
background() {
this.dispatch({event: ConvoDispatchEvent.Background})
}
suspend() {
this.dispatch({event: ConvoDispatchEvent.Suspend})
DEBUG_ACTIVE_CHAT = undefined
}
private pendingFetchConvo:
| Promise<{
convo: ChatBskyConvoDefs.ConvoView
sender: AppBskyActorDefs.ProfileViewBasic | undefined
recipients: AppBskyActorDefs.ProfileViewBasic[]
}>
| undefined
async fetchConvo() {
if (this.pendingFetchConvo) return this.pendingFetchConvo
this.pendingFetchConvo = new Promise<{
convo: ChatBskyConvoDefs.ConvoView
sender: AppBskyActorDefs.ProfileViewBasic | undefined
recipients: AppBskyActorDefs.ProfileViewBasic[]
}>(async (resolve, reject) => {
try {
const response = await this.agent.api.chat.bsky.convo.getConvo(
{
convoId: this.convoId,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const convo = response.data.convo
resolve({
convo,
sender: convo.members.find(m => m.did === this.__tempFromUserDid),
recipients: convo.members.filter(
m => m.did !== this.__tempFromUserDid,
),
})
} catch (e) {
reject(e)
} finally {
this.pendingFetchConvo = undefined
}
})
return this.pendingFetchConvo
}
async refreshConvo() {
const response = await this.agent.api.chat.bsky.convo.getConvo(
{
convoId: this.convoId,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
this.convo = response.data.convo
this.sender = this.convo.members.find(m => m.did === this.__tempFromUserDid)
this.recipients = this.convo.members.filter(
m => m.did !== this.__tempFromUserDid,
)
try {
const {convo, sender, recipients} = await this.fetchConvo()
// throw new Error('UNCOMMENT TO TEST REFRESH FAILURE')
this.convo = convo || this.convo
this.sender = sender || this.sender
this.recipients = recipients || this.recipients
} catch (e: any) {
logger.error(`Convo: failed to refresh convo`)
/*
* Prevent invalid states
*/
if (!this.sender) {
throw new Error('Convo: could not find sender in convo')
}
if (!this.recipients) {
throw new Error('Convo: could not find recipients in convo')
this.footerItems.set(ConvoItemError.Network, {
type: 'error-recoverable',
key: ConvoItemError.Network,
code: ConvoItemError.Network,
retry: () => {
this.footerItems.delete(ConvoItemError.Network)
this.resume()
},
})
this.commit()
}
}
@ -517,116 +739,142 @@ export class Convo {
}
}
private async pollEvents() {
if (
this.status === ConvoStatus.Ready ||
this.status === ConvoStatus.Backgrounded
) {
if (this.pendingEventIngestion) return
/*
* Represents a failed state, which is retryable.
*/
if (this.pollingFailure) return
setTimeout(async () => {
this.pendingEventIngestion = this.ingestLatestEvents()
await this.pendingEventIngestion
this.pendingEventIngestion = undefined
this.pollEvents()
}, this.pollInterval)
}
private restartPoll() {
this.cancelNextPoll()
this.pollLatestEvents()
}
async ingestLatestEvents() {
private cancelNextPoll() {
if (this.nextPoll) clearTimeout(this.nextPoll)
}
private pollLatestEvents() {
/*
* Uncomment to view poll events
*/
logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo)
try {
// throw new Error('UNCOMMENT TO TEST POLL FAILURE')
const response = await this.agent.api.chat.bsky.convo.getLog(
{
cursor: this.eventsCursor,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const {logs} = response.data
let needsCommit = false
for (const log of logs) {
/*
* If there's a rev, we should handle it. If there's not a rev, we don't
* know what it is.
*/
if (typeof log.rev === 'string') {
/*
* We only care about new events
*/
if (log.rev > (this.eventsCursor = this.eventsCursor || log.rev)) {
/*
* Update rev regardless of if it's a log type we care about or not
*/
this.eventsCursor = log.rev
/*
* This is VERY important. We don't want to insert any messages from
* your other chats.
*/
if (log.convoId !== this.convoId) continue
if (
ChatBskyConvoDefs.isLogCreateMessage(log) &&
ChatBskyConvoDefs.isMessageView(log.message)
) {
if (this.newMessages.has(log.message.id)) {
// Trust the log as the source of truth on ordering
this.newMessages.delete(log.message.id)
}
this.newMessages.set(log.message.id, log.message)
needsCommit = true
} else if (
ChatBskyConvoDefs.isLogDeleteMessage(log) &&
ChatBskyConvoDefs.isDeletedMessageView(log.message)
) {
/*
* Update if we have this in state. If we don't, don't worry about it.
*/
if (this.pastMessages.has(log.message.id)) {
/*
* For now, we remove deleted messages from the thread, if we receive one.
*
* To support them, it'd look something like this:
* this.pastMessages.set(log.message.id, log.message)
*/
this.pastMessages.delete(log.message.id)
this.newMessages.delete(log.message.id)
this.deletedMessages.delete(log.message.id)
needsCommit = true
}
}
}
}
}
if (needsCommit) {
this.commit()
}
this.fetchLatestEvents().then(({events}) => {
this.applyLatestEvents(events)
})
this.nextPoll = setTimeout(() => {
this.pollLatestEvents()
}, this.pollInterval)
} catch (e: any) {
logger.error('Convo: failed to poll events')
this.pollingFailure = true
logger.error('Convo: poll events failed')
this.cancelNextPoll()
this.footerItems.set(ConvoItemError.PollFailed, {
type: 'error-recoverable',
key: ConvoItemError.PollFailed,
code: ConvoItemError.PollFailed,
retry: () => {
this.footerItems.delete(ConvoItemError.PollFailed)
this.pollingFailure = false
this.commit()
this.pollEvents()
this.pollLatestEvents()
},
})
this.commit()
}
}
private pendingFetchLatestEvents:
| Promise<{
events: ChatBskyConvoGetLog.OutputSchema['logs']
}>
| undefined
async fetchLatestEvents() {
if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents
this.pendingFetchLatestEvents = new Promise<{
events: ChatBskyConvoGetLog.OutputSchema['logs']
}>(async (resolve, reject) => {
try {
// throw new Error('UNCOMMENT TO TEST POLL FAILURE')
const response = await this.agent.api.chat.bsky.convo.getLog(
{
cursor: this.eventsCursor,
},
{
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
const {logs} = response.data
resolve({events: logs})
} catch (e) {
reject(e)
} finally {
this.pendingFetchLatestEvents = undefined
}
})
return this.pendingFetchLatestEvents
}
private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
let needsCommit = false
for (const ev of events) {
/*
* If there's a rev, we should handle it. If there's not a rev, we don't
* know what it is.
*/
if (typeof ev.rev === 'string') {
/*
* We only care about new events
*/
if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) {
/*
* Update rev regardless of if it's a ev type we care about or not
*/
this.eventsCursor = ev.rev
/*
* This is VERY important. We don't want to insert any messages from
* your other chats.
*/
if (ev.convoId !== this.convoId) continue
if (
ChatBskyConvoDefs.isLogCreateMessage(ev) &&
ChatBskyConvoDefs.isMessageView(ev.message)
) {
if (this.newMessages.has(ev.message.id)) {
// Trust the ev as the source of truth on ordering
this.newMessages.delete(ev.message.id)
}
this.newMessages.set(ev.message.id, ev.message)
needsCommit = true
} else if (
ChatBskyConvoDefs.isLogDeleteMessage(ev) &&
ChatBskyConvoDefs.isDeletedMessageView(ev.message)
) {
/*
* Update if we have this in state. If we don't, don't worry about it.
*/
if (this.pastMessages.has(ev.message.id)) {
/*
* For now, we remove deleted messages from the thread, if we receive one.
*
* To support them, it'd look something like this:
* this.pastMessages.set(ev.message.id, ev.message)
*/
this.pastMessages.delete(ev.message.id)
this.newMessages.delete(ev.message.id)
this.deletedMessages.delete(ev.message.id)
needsCommit = true
}
}
}
}
}
if (needsCommit) {
this.commit()
}
}

View File

@ -1,6 +1,7 @@
import React, {useContext, useState, useSyncExternalStore} from 'react'
import {AppState} from 'react-native'
import {BskyAgent} from '@atproto-labs/api'
import {useFocusEffect} from '@react-navigation/native'
import {useFocusEffect, useIsFocused} from '@react-navigation/native'
import {Convo, ConvoParams, ConvoState} from '#/state/messages/convo'
import {useAgent} from '#/state/session'
@ -20,6 +21,7 @@ export function ChatProvider({
children,
convoId,
}: Pick<ConvoParams, 'convoId'> & {children: React.ReactNode}) {
const isScreenFocused = useIsFocused()
const {serviceUrl} = useDmServiceUrlStorage()
const {getAgent} = useAgent()
const [convo] = useState(
@ -44,5 +46,23 @@ export function ChatProvider({
}, [convo]),
)
React.useEffect(() => {
const handleAppStateChange = (nextAppState: string) => {
if (isScreenFocused) {
if (nextAppState === 'active') {
convo.resume()
} else {
convo.background()
}
}
}
const sub = AppState.addEventListener('change', handleAppStateChange)
return () => {
sub.remove()
}
}, [convo, isScreenFocused])
return <ChatContext.Provider value={service}>{children}</ChatContext.Provider>
}