[Clipclops] External store, suspend/resume (#3829)
* Initial working external store * Clean up WIP, explore suspend/resume * Clean up state, bindings, snapshots, add some logs * Reduce snapshots, add better logic check * Bump interval a smidge * Remove unused type
This commit is contained in:
parent
c13685a0cf
commit
c9cf608f78
6 changed files with 344 additions and 172 deletions
|
@ -12,6 +12,10 @@ describe(`#/state/messages/convo`, () => {
|
|||
})
|
||||
})
|
||||
|
||||
describe(`read states`, () => {
|
||||
it.todo(`should mark messages as read as they come in`)
|
||||
})
|
||||
|
||||
describe(`history fetching`, () => {
|
||||
it.todo(`fetches initial chat history`)
|
||||
it.todo(`fetches additional chat history`)
|
||||
|
|
|
@ -4,9 +4,9 @@ import {
|
|||
ChatBskyConvoDefs,
|
||||
ChatBskyConvoSendMessage,
|
||||
} from '@atproto-labs/api'
|
||||
import {EventEmitter} from 'eventemitter3'
|
||||
import {nanoid} from 'nanoid/non-secure'
|
||||
|
||||
import {logger} from '#/logger'
|
||||
import {isNative} from '#/platform/detection'
|
||||
|
||||
export type ConvoParams = {
|
||||
|
@ -18,9 +18,11 @@ export type ConvoParams = {
|
|||
export enum ConvoStatus {
|
||||
Uninitialized = 'uninitialized',
|
||||
Initializing = 'initializing',
|
||||
Resuming = 'resuming',
|
||||
Ready = 'ready',
|
||||
Error = 'error',
|
||||
Destroyed = 'destroyed',
|
||||
Backgrounded = 'backgrounded',
|
||||
Suspended = 'suspended',
|
||||
}
|
||||
|
||||
export type ConvoItem =
|
||||
|
@ -51,23 +53,85 @@ export type ConvoItem =
|
|||
export type ConvoState =
|
||||
| {
|
||||
status: ConvoStatus.Uninitialized
|
||||
items: []
|
||||
convo: undefined
|
||||
error: undefined
|
||||
isFetchingHistory: false
|
||||
deleteMessage: undefined
|
||||
sendMessage: undefined
|
||||
fetchMessageHistory: undefined
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Initializing
|
||||
items: []
|
||||
convo: undefined
|
||||
error: undefined
|
||||
isFetchingHistory: boolean
|
||||
deleteMessage: undefined
|
||||
sendMessage: undefined
|
||||
fetchMessageHistory: undefined
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Ready
|
||||
items: ConvoItem[]
|
||||
convo: ChatBskyConvoDefs.ConvoView
|
||||
error: undefined
|
||||
isFetchingHistory: boolean
|
||||
deleteMessage: (messageId: string) => void
|
||||
sendMessage: (
|
||||
message: ChatBskyConvoSendMessage.InputSchema['message'],
|
||||
) => void
|
||||
fetchMessageHistory: () => void
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Suspended
|
||||
items: ConvoItem[]
|
||||
convo: ChatBskyConvoDefs.ConvoView
|
||||
error: undefined
|
||||
isFetchingHistory: boolean
|
||||
deleteMessage: (messageId: string) => void
|
||||
sendMessage: (
|
||||
message: ChatBskyConvoSendMessage.InputSchema['message'],
|
||||
) => void
|
||||
fetchMessageHistory: () => void
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Backgrounded
|
||||
items: ConvoItem[]
|
||||
convo: ChatBskyConvoDefs.ConvoView
|
||||
error: undefined
|
||||
isFetchingHistory: boolean
|
||||
deleteMessage: (messageId: string) => void
|
||||
sendMessage: (
|
||||
message: ChatBskyConvoSendMessage.InputSchema['message'],
|
||||
) => void
|
||||
fetchMessageHistory: () => void
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Resuming
|
||||
items: ConvoItem[]
|
||||
convo: ChatBskyConvoDefs.ConvoView
|
||||
error: undefined
|
||||
isFetchingHistory: boolean
|
||||
deleteMessage: (messageId: string) => void
|
||||
sendMessage: (
|
||||
message: ChatBskyConvoSendMessage.InputSchema['message'],
|
||||
) => void
|
||||
fetchMessageHistory: () => void
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Error
|
||||
items: []
|
||||
convo: undefined
|
||||
error: any
|
||||
isFetchingHistory: false
|
||||
deleteMessage: undefined
|
||||
sendMessage: undefined
|
||||
fetchMessageHistory: undefined
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Destroyed
|
||||
}
|
||||
|
||||
const ACTIVE_POLL_INTERVAL = 2e3
|
||||
const BACKGROUND_POLL_INTERVAL = 10e3
|
||||
|
||||
export function isConvoItemMessage(
|
||||
item: ConvoItem,
|
||||
|
@ -84,16 +148,13 @@ export class Convo {
|
|||
private agent: BskyAgent
|
||||
private __tempFromUserDid: string
|
||||
|
||||
private pollInterval = ACTIVE_POLL_INTERVAL
|
||||
private status: ConvoStatus = ConvoStatus.Uninitialized
|
||||
private error: any
|
||||
private historyCursor: string | undefined | null = undefined
|
||||
private isFetchingHistory = false
|
||||
private eventsCursor: string | undefined = undefined
|
||||
|
||||
convoId: string
|
||||
convo: ChatBskyConvoDefs.ConvoView | undefined
|
||||
sender: AppBskyActorDefs.ProfileViewBasic | undefined
|
||||
|
||||
private pastMessages: Map<
|
||||
string,
|
||||
ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
|
||||
|
@ -112,72 +173,205 @@ export class Convo {
|
|||
private pendingEventIngestion: Promise<void> | undefined
|
||||
private isProcessingPendingMessages = false
|
||||
|
||||
convoId: string
|
||||
convo: ChatBskyConvoDefs.ConvoView | undefined
|
||||
sender: AppBskyActorDefs.ProfileViewBasic | undefined
|
||||
snapshot: ConvoState | undefined
|
||||
|
||||
constructor(params: ConvoParams) {
|
||||
this.convoId = params.convoId
|
||||
this.agent = params.agent
|
||||
this.__tempFromUserDid = params.__tempFromUserDid
|
||||
|
||||
this.subscribe = this.subscribe.bind(this)
|
||||
this.getSnapshot = this.getSnapshot.bind(this)
|
||||
this.sendMessage = this.sendMessage.bind(this)
|
||||
this.deleteMessage = this.deleteMessage.bind(this)
|
||||
this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
|
||||
}
|
||||
|
||||
async initialize() {
|
||||
if (this.status !== 'uninitialized') return
|
||||
this.status = ConvoStatus.Initializing
|
||||
private commit() {
|
||||
this.snapshot = undefined
|
||||
this.subscribers.forEach(subscriber => subscriber())
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await this.agent.api.chat.bsky.convo.getConvo(
|
||||
{
|
||||
convoId: this.convoId,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
const {convo} = response.data
|
||||
private subscribers: (() => void)[] = []
|
||||
|
||||
this.convo = convo
|
||||
this.sender = this.convo.members.find(
|
||||
m => m.did === this.__tempFromUserDid,
|
||||
)
|
||||
this.status = ConvoStatus.Ready
|
||||
subscribe(subscriber: () => void) {
|
||||
if (this.subscribers.length === 0) this.init()
|
||||
|
||||
this.commit()
|
||||
this.subscribers.push(subscriber)
|
||||
|
||||
await this.fetchMessageHistory()
|
||||
|
||||
this.pollEvents()
|
||||
} catch (e) {
|
||||
this.status = ConvoStatus.Error
|
||||
this.error = e
|
||||
return () => {
|
||||
this.subscribers = this.subscribers.filter(s => s !== subscriber)
|
||||
if (this.subscribers.length === 0) this.suspend()
|
||||
}
|
||||
}
|
||||
|
||||
private async pollEvents() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
if (this.pendingEventIngestion) return
|
||||
setTimeout(async () => {
|
||||
this.pendingEventIngestion = this.ingestLatestEvents()
|
||||
await this.pendingEventIngestion
|
||||
this.pendingEventIngestion = undefined
|
||||
this.pollEvents()
|
||||
}, 5e3)
|
||||
getSnapshot(): ConvoState {
|
||||
if (!this.snapshot) this.snapshot = this.generateSnapshot()
|
||||
logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo)
|
||||
return this.snapshot
|
||||
}
|
||||
|
||||
private generateSnapshot(): ConvoState {
|
||||
switch (this.status) {
|
||||
case ConvoStatus.Initializing: {
|
||||
return {
|
||||
status: ConvoStatus.Initializing,
|
||||
items: [],
|
||||
convo: undefined,
|
||||
error: undefined,
|
||||
isFetchingHistory: this.isFetchingHistory,
|
||||
deleteMessage: undefined,
|
||||
sendMessage: undefined,
|
||||
fetchMessageHistory: undefined,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Suspended:
|
||||
case ConvoStatus.Backgrounded:
|
||||
case ConvoStatus.Resuming:
|
||||
case ConvoStatus.Ready: {
|
||||
return {
|
||||
status: this.status,
|
||||
items: this.getItems(),
|
||||
convo: this.convo!,
|
||||
error: undefined,
|
||||
isFetchingHistory: this.isFetchingHistory,
|
||||
deleteMessage: this.deleteMessage,
|
||||
sendMessage: this.sendMessage,
|
||||
fetchMessageHistory: this.fetchMessageHistory,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Error: {
|
||||
return {
|
||||
status: ConvoStatus.Error,
|
||||
items: [],
|
||||
convo: undefined,
|
||||
error: this.error,
|
||||
isFetchingHistory: false,
|
||||
deleteMessage: undefined,
|
||||
sendMessage: undefined,
|
||||
fetchMessageHistory: undefined,
|
||||
}
|
||||
}
|
||||
default: {
|
||||
return {
|
||||
status: ConvoStatus.Uninitialized,
|
||||
items: [],
|
||||
convo: undefined,
|
||||
error: undefined,
|
||||
isFetchingHistory: false,
|
||||
deleteMessage: undefined,
|
||||
sendMessage: undefined,
|
||||
fetchMessageHistory: undefined,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async init() {
|
||||
logger.debug('Convo: init', {}, logger.DebugContext.convo)
|
||||
|
||||
if (this.status === ConvoStatus.Uninitialized) {
|
||||
try {
|
||||
this.status = ConvoStatus.Initializing
|
||||
this.commit()
|
||||
|
||||
await this.refreshConvo()
|
||||
this.status = ConvoStatus.Ready
|
||||
this.commit()
|
||||
|
||||
await this.fetchMessageHistory()
|
||||
|
||||
this.pollEvents()
|
||||
} catch (e) {
|
||||
this.error = e
|
||||
this.status = ConvoStatus.Error
|
||||
this.commit()
|
||||
}
|
||||
} else {
|
||||
logger.warn(`Convo: cannot init from ${this.status}`)
|
||||
}
|
||||
}
|
||||
|
||||
async resume() {
|
||||
logger.debug('Convo: resume', {}, logger.DebugContext.convo)
|
||||
|
||||
if (
|
||||
this.status === ConvoStatus.Suspended ||
|
||||
this.status === ConvoStatus.Backgrounded
|
||||
) {
|
||||
try {
|
||||
this.status = ConvoStatus.Resuming
|
||||
this.commit()
|
||||
|
||||
await this.refreshConvo()
|
||||
this.status = ConvoStatus.Ready
|
||||
this.commit()
|
||||
|
||||
await this.fetchMessageHistory()
|
||||
|
||||
this.pollInterval = ACTIVE_POLL_INTERVAL
|
||||
this.pollEvents()
|
||||
} catch (e) {
|
||||
// TODO handle errors in one place
|
||||
this.error = e
|
||||
this.status = ConvoStatus.Error
|
||||
this.commit()
|
||||
}
|
||||
} else {
|
||||
logger.warn(`Convo: cannot resume from ${this.status}`)
|
||||
}
|
||||
}
|
||||
|
||||
async background() {
|
||||
logger.debug('Convo: backgrounded', {}, logger.DebugContext.convo)
|
||||
this.status = ConvoStatus.Backgrounded
|
||||
this.pollInterval = BACKGROUND_POLL_INTERVAL
|
||||
this.commit()
|
||||
}
|
||||
|
||||
async suspend() {
|
||||
logger.debug('Convo: suspended', {}, logger.DebugContext.convo)
|
||||
this.status = ConvoStatus.Suspended
|
||||
this.commit()
|
||||
}
|
||||
|
||||
async refreshConvo() {
|
||||
const response = await this.agent.api.chat.bsky.convo.getConvo(
|
||||
{
|
||||
convoId: this.convoId,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
this.convo = response.data.convo
|
||||
this.sender = this.convo.members.find(m => m.did === this.__tempFromUserDid)
|
||||
}
|
||||
|
||||
async fetchMessageHistory() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
// reached end
|
||||
logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
|
||||
|
||||
/*
|
||||
* If historyCursor is null, we've fetched all history.
|
||||
*/
|
||||
if (this.historyCursor === null) return
|
||||
|
||||
/*
|
||||
* Don't fetch again if a fetch is already in progress
|
||||
*/
|
||||
if (this.isFetchingHistory) return
|
||||
|
||||
this.isFetchingHistory = true
|
||||
this.commit()
|
||||
|
||||
/*
|
||||
* Delay if paginating while scrolled.
|
||||
*
|
||||
* TODO why does the FlatList jump without this delay?
|
||||
*
|
||||
* Tbh it feels a little more natural with a slight delay.
|
||||
* Delay if paginating while scrolled to prevent momentum scrolling from
|
||||
* jerking the list around, plus makes it feel a little more human.
|
||||
*/
|
||||
if (this.pastMessages.size > 0) {
|
||||
await new Promise(y => setTimeout(y, 500))
|
||||
|
@ -219,9 +413,23 @@ export class Convo {
|
|||
this.commit()
|
||||
}
|
||||
|
||||
async ingestLatestEvents() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
private async pollEvents() {
|
||||
if (
|
||||
this.status === ConvoStatus.Ready ||
|
||||
this.status === ConvoStatus.Backgrounded
|
||||
) {
|
||||
if (this.pendingEventIngestion) return
|
||||
|
||||
setTimeout(async () => {
|
||||
this.pendingEventIngestion = this.ingestLatestEvents()
|
||||
await this.pendingEventIngestion
|
||||
this.pendingEventIngestion = undefined
|
||||
this.pollEvents()
|
||||
}, this.pollInterval)
|
||||
}
|
||||
}
|
||||
|
||||
async ingestLatestEvents() {
|
||||
const response = await this.agent.api.chat.bsky.convo.getLog(
|
||||
{
|
||||
cursor: this.eventsCursor,
|
||||
|
@ -234,6 +442,8 @@ export class Convo {
|
|||
)
|
||||
const {logs} = response.data
|
||||
|
||||
let needsCommit = false
|
||||
|
||||
for (const log of logs) {
|
||||
/*
|
||||
* If there's a rev, we should handle it. If there's not a rev, we don't
|
||||
|
@ -264,6 +474,7 @@ export class Convo {
|
|||
this.newMessages.delete(log.message.id)
|
||||
}
|
||||
this.newMessages.set(log.message.id, log.message)
|
||||
needsCommit = true
|
||||
} else if (
|
||||
ChatBskyConvoDefs.isLogDeleteMessage(log) &&
|
||||
ChatBskyConvoDefs.isDeletedMessageView(log.message)
|
||||
|
@ -281,16 +492,44 @@ export class Convo {
|
|||
this.pastMessages.delete(log.message.id)
|
||||
this.newMessages.delete(log.message.id)
|
||||
this.deletedMessages.delete(log.message.id)
|
||||
needsCommit = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (needsCommit) {
|
||||
this.commit()
|
||||
}
|
||||
}
|
||||
|
||||
async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
|
||||
// Ignore empty messages for now since they have no other purpose atm
|
||||
if (!message.text.trim()) return
|
||||
|
||||
logger.debug('Convo: send message', {}, logger.DebugContext.convo)
|
||||
|
||||
const tempId = nanoid()
|
||||
|
||||
this.pendingMessages.set(tempId, {
|
||||
id: tempId,
|
||||
message,
|
||||
})
|
||||
this.commit()
|
||||
|
||||
if (!this.isProcessingPendingMessages) {
|
||||
this.processPendingMessages()
|
||||
}
|
||||
}
|
||||
|
||||
async processPendingMessages() {
|
||||
logger.debug(
|
||||
`Convo: processing messages (${this.pendingMessages.size} remaining)`,
|
||||
{},
|
||||
logger.DebugContext.convo,
|
||||
)
|
||||
|
||||
const pendingMessage = Array.from(this.pendingMessages.values()).shift()
|
||||
|
||||
/*
|
||||
|
@ -346,6 +585,12 @@ export class Convo {
|
|||
}
|
||||
|
||||
async batchRetryPendingMessages() {
|
||||
logger.debug(
|
||||
`Convo: retrying ${this.pendingMessages.size} pending messages`,
|
||||
{},
|
||||
logger.DebugContext.convo,
|
||||
)
|
||||
|
||||
this.footerItems.delete('pending-retry')
|
||||
this.commit()
|
||||
|
||||
|
@ -396,25 +641,9 @@ export class Convo {
|
|||
}
|
||||
}
|
||||
|
||||
async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
// Ignore empty messages for now since they have no other purpose atm
|
||||
if (!message.text.trim()) return
|
||||
|
||||
const tempId = nanoid()
|
||||
|
||||
this.pendingMessages.set(tempId, {
|
||||
id: tempId,
|
||||
message,
|
||||
})
|
||||
this.commit()
|
||||
|
||||
if (!this.isProcessingPendingMessages) {
|
||||
this.processPendingMessages()
|
||||
}
|
||||
}
|
||||
|
||||
async deleteMessage(messageId: string) {
|
||||
logger.debug('Convo: delete message', {}, logger.DebugContext.convo)
|
||||
|
||||
this.deletedMessages.add(messageId)
|
||||
this.commit()
|
||||
|
||||
|
@ -441,7 +670,7 @@ export class Convo {
|
|||
/*
|
||||
* Items in reverse order, since FlatList inverts
|
||||
*/
|
||||
get items(): ConvoItem[] {
|
||||
getItems(): ConvoItem[] {
|
||||
const items: ConvoItem[] = []
|
||||
|
||||
// `newMessages` is in insertion order, unshift to reverse
|
||||
|
@ -539,57 +768,4 @@ export class Convo {
|
|||
return item
|
||||
})
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.status = ConvoStatus.Destroyed
|
||||
this.commit()
|
||||
}
|
||||
|
||||
get state(): ConvoState {
|
||||
switch (this.status) {
|
||||
case ConvoStatus.Initializing: {
|
||||
return {
|
||||
status: ConvoStatus.Initializing,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Ready: {
|
||||
return {
|
||||
status: ConvoStatus.Ready,
|
||||
items: this.items,
|
||||
convo: this.convo!,
|
||||
isFetchingHistory: this.isFetchingHistory,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Error: {
|
||||
return {
|
||||
status: ConvoStatus.Error,
|
||||
error: this.error,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Destroyed: {
|
||||
return {
|
||||
status: ConvoStatus.Destroyed,
|
||||
}
|
||||
}
|
||||
default: {
|
||||
return {
|
||||
status: ConvoStatus.Uninitialized,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private _emitter = new EventEmitter()
|
||||
|
||||
private commit() {
|
||||
this._emitter.emit('update')
|
||||
}
|
||||
|
||||
on(event: 'update', cb: () => void) {
|
||||
this._emitter.on(event, cb)
|
||||
}
|
||||
|
||||
off(event: 'update', cb: () => void) {
|
||||
this._emitter.off(event, cb)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
import React, {useContext, useEffect, useMemo, useState} from 'react'
|
||||
import React, {useContext, useState, useSyncExternalStore} from 'react'
|
||||
import {BskyAgent} from '@atproto-labs/api'
|
||||
import {useFocusEffect} from '@react-navigation/native'
|
||||
|
||||
import {Convo, ConvoParams} from '#/state/messages/convo'
|
||||
import {Convo, ConvoParams, ConvoState} from '#/state/messages/convo'
|
||||
import {useAgent} from '#/state/session'
|
||||
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
|
||||
|
||||
const ChatContext = React.createContext<{
|
||||
service: Convo
|
||||
state: Convo['state']
|
||||
} | null>(null)
|
||||
const ChatContext = React.createContext<ConvoState | null>(null)
|
||||
|
||||
export function useChat() {
|
||||
const ctx = useContext(ChatContext)
|
||||
|
@ -24,7 +22,7 @@ export function ChatProvider({
|
|||
}: Pick<ConvoParams, 'convoId'> & {children: React.ReactNode}) {
|
||||
const {serviceUrl} = useDmServiceUrlStorage()
|
||||
const {getAgent} = useAgent()
|
||||
const [service] = useState(
|
||||
const [convo] = useState(
|
||||
() =>
|
||||
new Convo({
|
||||
convoId,
|
||||
|
@ -34,21 +32,17 @@ export function ChatProvider({
|
|||
__tempFromUserDid: getAgent().session?.did!,
|
||||
}),
|
||||
)
|
||||
const [state, setState] = useState(service.state)
|
||||
const service = useSyncExternalStore(convo.subscribe, convo.getSnapshot)
|
||||
|
||||
useEffect(() => {
|
||||
service.initialize()
|
||||
}, [service])
|
||||
useFocusEffect(
|
||||
React.useCallback(() => {
|
||||
convo.resume()
|
||||
|
||||
useEffect(() => {
|
||||
const update = () => setState(service.state)
|
||||
service.on('update', update)
|
||||
return () => {
|
||||
service.destroy()
|
||||
}
|
||||
}, [service])
|
||||
return () => {
|
||||
convo.background()
|
||||
}
|
||||
}, [convo]),
|
||||
)
|
||||
|
||||
const value = useMemo(() => ({service, state}), [service, state])
|
||||
|
||||
return <ChatContext.Provider value={value}>{children}</ChatContext.Provider>
|
||||
return <ChatContext.Provider value={service}>{children}</ChatContext.Provider>
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue