From c9cf608f789943e81bfa32b8da5f6ca4f75d5a66 Mon Sep 17 00:00:00 2001 From: Eric Bailey Date: Thu, 2 May 2024 20:57:51 -0500 Subject: [PATCH] [Clipclops] External store, suspend/resume (#3829) * Initial working external store * Clean up WIP, explore suspend/resume * Clean up state, bindings, snapshots, add some logs * Reduce snapshots, add better logic check * Bump interval a smidge * Remove unused type --- src/logger/debugContext.ts | 1 + .../Messages/Conversation/MessagesList.tsx | 21 +- src/screens/Messages/Conversation/index.tsx | 26 +- src/state/messages/__tests__/convo.test.ts | 4 + src/state/messages/convo.ts | 428 ++++++++++++------ src/state/messages/index.tsx | 36 +- 6 files changed, 344 insertions(+), 172 deletions(-) diff --git a/src/logger/debugContext.ts b/src/logger/debugContext.ts index 0e04752e..99712078 100644 --- a/src/logger/debugContext.ts +++ b/src/logger/debugContext.ts @@ -9,4 +9,5 @@ export const DebugContext = { // e.g. composer: 'composer' session: 'session', notifications: 'notifications', + convo: 'convo', } as const diff --git a/src/screens/Messages/Conversation/MessagesList.tsx b/src/screens/Messages/Conversation/MessagesList.tsx index 435c4032..3990a1de 100644 --- a/src/screens/Messages/Conversation/MessagesList.tsx +++ b/src/screens/Messages/Conversation/MessagesList.tsx @@ -90,7 +90,9 @@ export function MessagesList() { }, []) const onEndReached = useCallback(() => { - chat.service.fetchMessageHistory() + if (chat.status === ConvoStatus.Ready) { + chat.fetchMessageHistory() + } }, [chat]) const onInputFocus = useCallback(() => { @@ -103,11 +105,13 @@ export function MessagesList() { const onSendMessage = useCallback( (text: string) => { - chat.service.sendMessage({ - text, - }) + if (chat.status === ConvoStatus.Ready) { + chat.sendMessage({ + text, + }) + } }, - [chat.service], + [chat], ) const onScroll = React.useCallback( @@ -136,9 +140,7 @@ export function MessagesList() { contentContainerStyle={a.flex_1}> } diff --git a/src/screens/Messages/Conversation/index.tsx b/src/screens/Messages/Conversation/index.tsx index 155c0b2f..0c68d619 100644 --- a/src/screens/Messages/Conversation/index.tsx +++ b/src/screens/Messages/Conversation/index.tsx @@ -1,7 +1,6 @@ import React, {useCallback} from 'react' import {TouchableOpacity, View} from 'react-native' import {AppBskyActorDefs} from '@atproto/api' -import {ChatBskyConvoDefs} from '@atproto-labs/api' import {FontAwesomeIcon} from '@fortawesome/react-native-fontawesome' import {msg} from '@lingui/macro' import {useLingui} from '@lingui/react' @@ -47,16 +46,16 @@ function Inner() { const myDid = currentAccount?.did const otherProfile = React.useMemo(() => { - if (chat.state.status !== ConvoStatus.Ready) return - return chat.state.convo.members.find(m => m.did !== myDid) - }, [chat.state, myDid]) + if (chat.status !== ConvoStatus.Ready) return + return chat.convo.members.find(m => m.did !== myDid) + }, [chat, myDid]) // TODO whenever we have error messages, we should use them in here -hailey - if (chat.state.status !== ConvoStatus.Ready || !otherProfile) { + if (chat.status !== ConvoStatus.Ready || !otherProfile) { return ( ) } @@ -78,7 +77,7 @@ let Header = ({ const {_} = useLingui() const {gtTablet} = useBreakpoints() const navigation = useNavigation() - const {service} = useChat() + const chat = useChat() const onPressBack = useCallback(() => { if (isWeb) { @@ -88,12 +87,9 @@ let Header = ({ } }, [navigation]) - const onUpdateConvo = useCallback( - (convo: ChatBskyConvoDefs.ConvoView) => { - service.convo = convo - }, - [service], - ) + const onUpdateConvo = useCallback(() => { + // TODO eric update muted state + }, []) return ( {profile.displayName} - {service.convo ? ( + {chat.status === ConvoStatus.Ready ? ( { }) }) + describe(`read states`, () => { + it.todo(`should mark messages as read as they come in`) + }) + describe(`history fetching`, () => { it.todo(`fetches initial chat history`) it.todo(`fetches additional chat history`) diff --git a/src/state/messages/convo.ts b/src/state/messages/convo.ts index c3d5ffc2..f687008e 100644 --- a/src/state/messages/convo.ts +++ b/src/state/messages/convo.ts @@ -4,9 +4,9 @@ import { ChatBskyConvoDefs, ChatBskyConvoSendMessage, } from '@atproto-labs/api' -import {EventEmitter} from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' +import {logger} from '#/logger' import {isNative} from '#/platform/detection' export type ConvoParams = { @@ -18,9 +18,11 @@ export type ConvoParams = { export enum ConvoStatus { Uninitialized = 'uninitialized', Initializing = 'initializing', + Resuming = 'resuming', Ready = 'ready', Error = 'error', - Destroyed = 'destroyed', + Backgrounded = 'backgrounded', + Suspended = 'suspended', } export type ConvoItem = @@ -51,23 +53,85 @@ export type ConvoItem = export type ConvoState = | { status: ConvoStatus.Uninitialized + items: [] + convo: undefined + error: undefined + isFetchingHistory: false + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined } | { status: ConvoStatus.Initializing + items: [] + convo: undefined + error: undefined + isFetchingHistory: boolean + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined } | { status: ConvoStatus.Ready items: ConvoItem[] convo: ChatBskyConvoDefs.ConvoView + error: undefined isFetchingHistory: boolean + deleteMessage: (messageId: string) => void + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => void + fetchMessageHistory: () => void + } + | { + status: ConvoStatus.Suspended + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + isFetchingHistory: boolean + deleteMessage: (messageId: string) => void + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => void + fetchMessageHistory: () => void + } + | { + status: ConvoStatus.Backgrounded + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + isFetchingHistory: boolean + deleteMessage: (messageId: string) => void + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => void + fetchMessageHistory: () => void + } + | { + status: ConvoStatus.Resuming + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + isFetchingHistory: boolean + deleteMessage: (messageId: string) => void + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => void + fetchMessageHistory: () => void } | { status: ConvoStatus.Error + items: [] + convo: undefined error: any + isFetchingHistory: false + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined } - | { - status: ConvoStatus.Destroyed - } + +const ACTIVE_POLL_INTERVAL = 2e3 +const BACKGROUND_POLL_INTERVAL = 10e3 export function isConvoItemMessage( item: ConvoItem, @@ -84,16 +148,13 @@ export class Convo { private agent: BskyAgent private __tempFromUserDid: string + private pollInterval = ACTIVE_POLL_INTERVAL private status: ConvoStatus = ConvoStatus.Uninitialized private error: any private historyCursor: string | undefined | null = undefined private isFetchingHistory = false private eventsCursor: string | undefined = undefined - convoId: string - convo: ChatBskyConvoDefs.ConvoView | undefined - sender: AppBskyActorDefs.ProfileViewBasic | undefined - private pastMessages: Map< string, ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView @@ -112,72 +173,205 @@ export class Convo { private pendingEventIngestion: Promise | undefined private isProcessingPendingMessages = false + convoId: string + convo: ChatBskyConvoDefs.ConvoView | undefined + sender: AppBskyActorDefs.ProfileViewBasic | undefined + snapshot: ConvoState | undefined + constructor(params: ConvoParams) { this.convoId = params.convoId this.agent = params.agent this.__tempFromUserDid = params.__tempFromUserDid + + this.subscribe = this.subscribe.bind(this) + this.getSnapshot = this.getSnapshot.bind(this) + this.sendMessage = this.sendMessage.bind(this) + this.deleteMessage = this.deleteMessage.bind(this) + this.fetchMessageHistory = this.fetchMessageHistory.bind(this) } - async initialize() { - if (this.status !== 'uninitialized') return - this.status = ConvoStatus.Initializing + private commit() { + this.snapshot = undefined + this.subscribers.forEach(subscriber => subscriber()) + } - try { - const response = await this.agent.api.chat.bsky.convo.getConvo( - { - convoId: this.convoId, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {convo} = response.data + private subscribers: (() => void)[] = [] - this.convo = convo - this.sender = this.convo.members.find( - m => m.did === this.__tempFromUserDid, - ) - this.status = ConvoStatus.Ready + subscribe(subscriber: () => void) { + if (this.subscribers.length === 0) this.init() - this.commit() + this.subscribers.push(subscriber) - await this.fetchMessageHistory() - - this.pollEvents() - } catch (e) { - this.status = ConvoStatus.Error - this.error = e + return () => { + this.subscribers = this.subscribers.filter(s => s !== subscriber) + if (this.subscribers.length === 0) this.suspend() } } - private async pollEvents() { - if (this.status === ConvoStatus.Destroyed) return - if (this.pendingEventIngestion) return - setTimeout(async () => { - this.pendingEventIngestion = this.ingestLatestEvents() - await this.pendingEventIngestion - this.pendingEventIngestion = undefined - this.pollEvents() - }, 5e3) + getSnapshot(): ConvoState { + if (!this.snapshot) this.snapshot = this.generateSnapshot() + logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo) + return this.snapshot + } + + private generateSnapshot(): ConvoState { + switch (this.status) { + case ConvoStatus.Initializing: { + return { + status: ConvoStatus.Initializing, + items: [], + convo: undefined, + error: undefined, + isFetchingHistory: this.isFetchingHistory, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + case ConvoStatus.Suspended: + case ConvoStatus.Backgrounded: + case ConvoStatus.Resuming: + case ConvoStatus.Ready: { + return { + status: this.status, + items: this.getItems(), + convo: this.convo!, + error: undefined, + isFetchingHistory: this.isFetchingHistory, + deleteMessage: this.deleteMessage, + sendMessage: this.sendMessage, + fetchMessageHistory: this.fetchMessageHistory, + } + } + case ConvoStatus.Error: { + return { + status: ConvoStatus.Error, + items: [], + convo: undefined, + error: this.error, + isFetchingHistory: false, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + default: { + return { + status: ConvoStatus.Uninitialized, + items: [], + convo: undefined, + error: undefined, + isFetchingHistory: false, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + } + } + + async init() { + logger.debug('Convo: init', {}, logger.DebugContext.convo) + + if (this.status === ConvoStatus.Uninitialized) { + try { + this.status = ConvoStatus.Initializing + this.commit() + + await this.refreshConvo() + this.status = ConvoStatus.Ready + this.commit() + + await this.fetchMessageHistory() + + this.pollEvents() + } catch (e) { + this.error = e + this.status = ConvoStatus.Error + this.commit() + } + } else { + logger.warn(`Convo: cannot init from ${this.status}`) + } + } + + async resume() { + logger.debug('Convo: resume', {}, logger.DebugContext.convo) + + if ( + this.status === ConvoStatus.Suspended || + this.status === ConvoStatus.Backgrounded + ) { + try { + this.status = ConvoStatus.Resuming + this.commit() + + await this.refreshConvo() + this.status = ConvoStatus.Ready + this.commit() + + await this.fetchMessageHistory() + + this.pollInterval = ACTIVE_POLL_INTERVAL + this.pollEvents() + } catch (e) { + // TODO handle errors in one place + this.error = e + this.status = ConvoStatus.Error + this.commit() + } + } else { + logger.warn(`Convo: cannot resume from ${this.status}`) + } + } + + async background() { + logger.debug('Convo: backgrounded', {}, logger.DebugContext.convo) + this.status = ConvoStatus.Backgrounded + this.pollInterval = BACKGROUND_POLL_INTERVAL + this.commit() + } + + async suspend() { + logger.debug('Convo: suspended', {}, logger.DebugContext.convo) + this.status = ConvoStatus.Suspended + this.commit() + } + + async refreshConvo() { + const response = await this.agent.api.chat.bsky.convo.getConvo( + { + convoId: this.convoId, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + this.convo = response.data.convo + this.sender = this.convo.members.find(m => m.did === this.__tempFromUserDid) } async fetchMessageHistory() { - if (this.status === ConvoStatus.Destroyed) return - // reached end + logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) + + /* + * If historyCursor is null, we've fetched all history. + */ if (this.historyCursor === null) return + + /* + * Don't fetch again if a fetch is already in progress + */ if (this.isFetchingHistory) return this.isFetchingHistory = true this.commit() /* - * Delay if paginating while scrolled. - * - * TODO why does the FlatList jump without this delay? - * - * Tbh it feels a little more natural with a slight delay. + * Delay if paginating while scrolled to prevent momentum scrolling from + * jerking the list around, plus makes it feel a little more human. */ if (this.pastMessages.size > 0) { await new Promise(y => setTimeout(y, 500)) @@ -219,9 +413,23 @@ export class Convo { this.commit() } - async ingestLatestEvents() { - if (this.status === ConvoStatus.Destroyed) return + private async pollEvents() { + if ( + this.status === ConvoStatus.Ready || + this.status === ConvoStatus.Backgrounded + ) { + if (this.pendingEventIngestion) return + setTimeout(async () => { + this.pendingEventIngestion = this.ingestLatestEvents() + await this.pendingEventIngestion + this.pendingEventIngestion = undefined + this.pollEvents() + }, this.pollInterval) + } + } + + async ingestLatestEvents() { const response = await this.agent.api.chat.bsky.convo.getLog( { cursor: this.eventsCursor, @@ -234,6 +442,8 @@ export class Convo { ) const {logs} = response.data + let needsCommit = false + for (const log of logs) { /* * If there's a rev, we should handle it. If there's not a rev, we don't @@ -264,6 +474,7 @@ export class Convo { this.newMessages.delete(log.message.id) } this.newMessages.set(log.message.id, log.message) + needsCommit = true } else if ( ChatBskyConvoDefs.isLogDeleteMessage(log) && ChatBskyConvoDefs.isDeletedMessageView(log.message) @@ -281,16 +492,44 @@ export class Convo { this.pastMessages.delete(log.message.id) this.newMessages.delete(log.message.id) this.deletedMessages.delete(log.message.id) + needsCommit = true } } } } } + if (needsCommit) { + this.commit() + } + } + + async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { + // Ignore empty messages for now since they have no other purpose atm + if (!message.text.trim()) return + + logger.debug('Convo: send message', {}, logger.DebugContext.convo) + + const tempId = nanoid() + + this.pendingMessages.set(tempId, { + id: tempId, + message, + }) this.commit() + + if (!this.isProcessingPendingMessages) { + this.processPendingMessages() + } } async processPendingMessages() { + logger.debug( + `Convo: processing messages (${this.pendingMessages.size} remaining)`, + {}, + logger.DebugContext.convo, + ) + const pendingMessage = Array.from(this.pendingMessages.values()).shift() /* @@ -346,6 +585,12 @@ export class Convo { } async batchRetryPendingMessages() { + logger.debug( + `Convo: retrying ${this.pendingMessages.size} pending messages`, + {}, + logger.DebugContext.convo, + ) + this.footerItems.delete('pending-retry') this.commit() @@ -396,25 +641,9 @@ export class Convo { } } - async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { - if (this.status === ConvoStatus.Destroyed) return - // Ignore empty messages for now since they have no other purpose atm - if (!message.text.trim()) return - - const tempId = nanoid() - - this.pendingMessages.set(tempId, { - id: tempId, - message, - }) - this.commit() - - if (!this.isProcessingPendingMessages) { - this.processPendingMessages() - } - } - async deleteMessage(messageId: string) { + logger.debug('Convo: delete message', {}, logger.DebugContext.convo) + this.deletedMessages.add(messageId) this.commit() @@ -441,7 +670,7 @@ export class Convo { /* * Items in reverse order, since FlatList inverts */ - get items(): ConvoItem[] { + getItems(): ConvoItem[] { const items: ConvoItem[] = [] // `newMessages` is in insertion order, unshift to reverse @@ -539,57 +768,4 @@ export class Convo { return item }) } - - destroy() { - this.status = ConvoStatus.Destroyed - this.commit() - } - - get state(): ConvoState { - switch (this.status) { - case ConvoStatus.Initializing: { - return { - status: ConvoStatus.Initializing, - } - } - case ConvoStatus.Ready: { - return { - status: ConvoStatus.Ready, - items: this.items, - convo: this.convo!, - isFetchingHistory: this.isFetchingHistory, - } - } - case ConvoStatus.Error: { - return { - status: ConvoStatus.Error, - error: this.error, - } - } - case ConvoStatus.Destroyed: { - return { - status: ConvoStatus.Destroyed, - } - } - default: { - return { - status: ConvoStatus.Uninitialized, - } - } - } - } - - private _emitter = new EventEmitter() - - private commit() { - this._emitter.emit('update') - } - - on(event: 'update', cb: () => void) { - this._emitter.on(event, cb) - } - - off(event: 'update', cb: () => void) { - this._emitter.off(event, cb) - } } diff --git a/src/state/messages/index.tsx b/src/state/messages/index.tsx index cdc5a4db..22c4242e 100644 --- a/src/state/messages/index.tsx +++ b/src/state/messages/index.tsx @@ -1,14 +1,12 @@ -import React, {useContext, useEffect, useMemo, useState} from 'react' +import React, {useContext, useState, useSyncExternalStore} from 'react' import {BskyAgent} from '@atproto-labs/api' +import {useFocusEffect} from '@react-navigation/native' -import {Convo, ConvoParams} from '#/state/messages/convo' +import {Convo, ConvoParams, ConvoState} from '#/state/messages/convo' import {useAgent} from '#/state/session' import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' -const ChatContext = React.createContext<{ - service: Convo - state: Convo['state'] -} | null>(null) +const ChatContext = React.createContext(null) export function useChat() { const ctx = useContext(ChatContext) @@ -24,7 +22,7 @@ export function ChatProvider({ }: Pick & {children: React.ReactNode}) { const {serviceUrl} = useDmServiceUrlStorage() const {getAgent} = useAgent() - const [service] = useState( + const [convo] = useState( () => new Convo({ convoId, @@ -34,21 +32,17 @@ export function ChatProvider({ __tempFromUserDid: getAgent().session?.did!, }), ) - const [state, setState] = useState(service.state) + const service = useSyncExternalStore(convo.subscribe, convo.getSnapshot) - useEffect(() => { - service.initialize() - }, [service]) + useFocusEffect( + React.useCallback(() => { + convo.resume() - useEffect(() => { - const update = () => setState(service.state) - service.on('update', update) - return () => { - service.destroy() - } - }, [service]) + return () => { + convo.background() + } + }, [convo]), + ) - const value = useMemo(() => ({service, state}), [service, state]) - - return {children} + return {children} }