[Clipclops] 2 Clipped 2 Clopped (#3796)
* Add new pkg * copy queries over to new file * useConvoQuery * useListConvos * Use useListConvos * extract useConvoQuery * useGetConvoForMembers * Delete unused * exract useListConvos * Replace imports * Messages/List/index.tsx * extract getconvoformembers * MessageItem * delete chatLog and rename query.ts * Update import * Clipclop service (#3794) * Add Chat service * Better handle deletions * Rollback unneeded changes * Better insertion order * Use clipclops * don't show FAB if error * clean up imports * Update Convo service * Remove temp files --------- Co-authored-by: Samuel Newman <mozzius@protonmail.com>
This commit is contained in:
parent
d61b366b26
commit
538ca8dff1
30 changed files with 752 additions and 1130 deletions
38
src/state/messages/__tests__/client.test.ts
Normal file
38
src/state/messages/__tests__/client.test.ts
Normal file
|
@ -0,0 +1,38 @@
|
|||
import {describe, it} from '@jest/globals'
|
||||
|
||||
describe(`#/state/dms/client`, () => {
|
||||
describe(`ChatsService`, () => {
|
||||
describe(`unread count`, () => {
|
||||
it.todo(`marks a chat as read, decrements total unread count`)
|
||||
})
|
||||
|
||||
describe(`log processing`, () => {
|
||||
/*
|
||||
* We receive a new chat log AND messages for it in the same batch. We
|
||||
* need to first initialize the chat, then process the received logs.
|
||||
*/
|
||||
describe(`handles new chats and subsequent messages received in same log batch`, () => {
|
||||
it.todo(`receives new chat and messages`)
|
||||
it.todo(
|
||||
`receives new chat, new messages come in while still initializing new chat`,
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe(`reset state`, () => {
|
||||
it.todo(`after period of inactivity, rehydrates entirely fresh state`)
|
||||
})
|
||||
})
|
||||
|
||||
describe(`ChatService`, () => {
|
||||
describe(`history fetching`, () => {
|
||||
it.todo(`fetches initial chat history`)
|
||||
it.todo(`fetches additional chat history`)
|
||||
it.todo(`handles history fetch failure`)
|
||||
})
|
||||
|
||||
describe(`optimistic updates`, () => {
|
||||
it.todo(`adds sending messages`)
|
||||
})
|
||||
})
|
||||
})
|
442
src/state/messages/convo.ts
Normal file
442
src/state/messages/convo.ts
Normal file
|
@ -0,0 +1,442 @@
|
|||
import {
|
||||
BskyAgent,
|
||||
ChatBskyConvoDefs,
|
||||
ChatBskyConvoSendMessage,
|
||||
} from '@atproto-labs/api'
|
||||
import {EventEmitter} from 'eventemitter3'
|
||||
import {nanoid} from 'nanoid/non-secure'
|
||||
|
||||
export type ConvoParams = {
|
||||
convoId: string
|
||||
agent: BskyAgent
|
||||
__tempFromUserDid: string
|
||||
}
|
||||
|
||||
export enum ConvoStatus {
|
||||
Uninitialized = 'uninitialized',
|
||||
Initializing = 'initializing',
|
||||
Ready = 'ready',
|
||||
Error = 'error',
|
||||
Destroyed = 'destroyed',
|
||||
}
|
||||
|
||||
export type ConvoItem =
|
||||
| {
|
||||
type: 'message'
|
||||
key: string
|
||||
message: ChatBskyConvoDefs.MessageView
|
||||
nextMessage:
|
||||
| ChatBskyConvoDefs.MessageView
|
||||
| ChatBskyConvoDefs.DeletedMessageView
|
||||
| null
|
||||
}
|
||||
| {
|
||||
type: 'deleted-message'
|
||||
key: string
|
||||
message: ChatBskyConvoDefs.DeletedMessageView
|
||||
nextMessage:
|
||||
| ChatBskyConvoDefs.MessageView
|
||||
| ChatBskyConvoDefs.DeletedMessageView
|
||||
| null
|
||||
}
|
||||
| {
|
||||
type: 'pending-message'
|
||||
key: string
|
||||
message: ChatBskyConvoSendMessage.InputSchema['message']
|
||||
}
|
||||
|
||||
export type ConvoState =
|
||||
| {
|
||||
status: ConvoStatus.Uninitialized
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Initializing
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Ready
|
||||
items: ConvoItem[]
|
||||
convo: ChatBskyConvoDefs.ConvoView
|
||||
isFetchingHistory: boolean
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Error
|
||||
error: any
|
||||
}
|
||||
| {
|
||||
status: ConvoStatus.Destroyed
|
||||
}
|
||||
|
||||
export class Convo {
|
||||
private convoId: string
|
||||
private agent: BskyAgent
|
||||
private __tempFromUserDid: string
|
||||
|
||||
private status: ConvoStatus = ConvoStatus.Uninitialized
|
||||
private error: any
|
||||
private convo: ChatBskyConvoDefs.ConvoView | undefined
|
||||
private historyCursor: string | undefined | null = undefined
|
||||
private isFetchingHistory = false
|
||||
private eventsCursor: string | undefined = undefined
|
||||
|
||||
private pastMessages: Map<
|
||||
string,
|
||||
ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
|
||||
> = new Map()
|
||||
private newMessages: Map<
|
||||
string,
|
||||
ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
|
||||
> = new Map()
|
||||
private pendingMessages: Map<
|
||||
string,
|
||||
{id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
|
||||
> = new Map()
|
||||
|
||||
private pendingEventIngestion: Promise<void> | undefined
|
||||
|
||||
constructor(params: ConvoParams) {
|
||||
this.convoId = params.convoId
|
||||
this.agent = params.agent
|
||||
this.__tempFromUserDid = params.__tempFromUserDid
|
||||
}
|
||||
|
||||
async initialize() {
|
||||
if (this.status !== 'uninitialized') return
|
||||
this.status = ConvoStatus.Initializing
|
||||
|
||||
try {
|
||||
const response = await this.agent.api.chat.bsky.convo.getConvo(
|
||||
{
|
||||
convoId: this.convoId,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
const {convo} = response.data
|
||||
|
||||
this.convo = convo
|
||||
this.status = ConvoStatus.Ready
|
||||
|
||||
this.commit()
|
||||
|
||||
await this.fetchMessageHistory()
|
||||
|
||||
this.pollEvents()
|
||||
} catch (e) {
|
||||
this.status = ConvoStatus.Error
|
||||
this.error = e
|
||||
}
|
||||
}
|
||||
|
||||
private async pollEvents() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
if (this.pendingEventIngestion) return
|
||||
setTimeout(async () => {
|
||||
this.pendingEventIngestion = this.ingestLatestEvents()
|
||||
await this.pendingEventIngestion
|
||||
this.pendingEventIngestion = undefined
|
||||
this.pollEvents()
|
||||
}, 5e3)
|
||||
}
|
||||
|
||||
async fetchMessageHistory() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
// reached end
|
||||
if (this.historyCursor === null) return
|
||||
if (this.isFetchingHistory) return
|
||||
|
||||
this.isFetchingHistory = true
|
||||
this.commit()
|
||||
|
||||
/*
|
||||
* Delay if paginating while scrolled.
|
||||
*
|
||||
* TODO why does the FlatList jump without this delay?
|
||||
*
|
||||
* Tbh it feels a little more natural with a slight delay.
|
||||
*/
|
||||
if (this.pastMessages.size > 0) {
|
||||
await new Promise(y => setTimeout(y, 500))
|
||||
}
|
||||
|
||||
const response = await this.agent.api.chat.bsky.convo.getMessages(
|
||||
{
|
||||
cursor: this.historyCursor,
|
||||
convoId: this.convoId,
|
||||
limit: 20,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
const {cursor, messages} = response.data
|
||||
|
||||
this.historyCursor = cursor || null
|
||||
|
||||
for (const message of messages) {
|
||||
if (
|
||||
ChatBskyConvoDefs.isMessageView(message) ||
|
||||
ChatBskyConvoDefs.isDeletedMessageView(message)
|
||||
) {
|
||||
this.pastMessages.set(message.id, message)
|
||||
|
||||
// set to latest rev
|
||||
if (
|
||||
message.rev > (this.eventsCursor = this.eventsCursor || message.rev)
|
||||
) {
|
||||
this.eventsCursor = message.rev
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.isFetchingHistory = false
|
||||
this.commit()
|
||||
}
|
||||
|
||||
async ingestLatestEvents() {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
|
||||
const response = await this.agent.api.chat.bsky.convo.getLog(
|
||||
{
|
||||
cursor: this.eventsCursor,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
const {logs} = response.data
|
||||
|
||||
for (const log of logs) {
|
||||
/*
|
||||
* If there's a rev, we should handle it. If there's not a rev, we don't
|
||||
* know what it is.
|
||||
*/
|
||||
if (typeof log.rev === 'string') {
|
||||
/*
|
||||
* We only care about new events
|
||||
*/
|
||||
if (log.rev > (this.eventsCursor = this.eventsCursor || log.rev)) {
|
||||
/*
|
||||
* Update rev regardless of if it's a log type we care about or not
|
||||
*/
|
||||
this.eventsCursor = log.rev
|
||||
|
||||
/*
|
||||
* This is VERY important. We don't want to insert any messages from
|
||||
* your other chats.
|
||||
*
|
||||
* TODO there may be a better way to handle this
|
||||
*/
|
||||
if (log.convoId !== this.convoId) continue
|
||||
|
||||
if (
|
||||
ChatBskyConvoDefs.isLogCreateMessage(log) &&
|
||||
ChatBskyConvoDefs.isMessageView(log.message)
|
||||
) {
|
||||
if (this.newMessages.has(log.message.id)) {
|
||||
// Trust the log as the source of truth on ordering
|
||||
// TODO test this
|
||||
this.newMessages.delete(log.message.id)
|
||||
}
|
||||
this.newMessages.set(log.message.id, log.message)
|
||||
} else if (
|
||||
ChatBskyConvoDefs.isLogDeleteMessage(log) &&
|
||||
ChatBskyConvoDefs.isDeletedMessageView(log.message)
|
||||
) {
|
||||
/*
|
||||
* Update if we have this in state. If we don't, don't worry about it.
|
||||
*/
|
||||
if (this.pastMessages.has(log.message.id)) {
|
||||
/*
|
||||
* For now, we remove deleted messages from the thread, if we receive one.
|
||||
*
|
||||
* To support them, it'd look something like this:
|
||||
* this.pastMessages.set(log.message.id, log.message)
|
||||
*/
|
||||
this.pastMessages.delete(log.message.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.commit()
|
||||
}
|
||||
|
||||
async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
|
||||
if (this.status === ConvoStatus.Destroyed) return
|
||||
// Ignore empty messages for now since they have no other purpose atm
|
||||
if (!message.text) return
|
||||
|
||||
const tempId = nanoid()
|
||||
|
||||
this.pendingMessages.set(tempId, {
|
||||
id: tempId,
|
||||
message,
|
||||
})
|
||||
this.commit()
|
||||
|
||||
await new Promise(y => setTimeout(y, 500))
|
||||
const response = await this.agent.api.chat.bsky.convo.sendMessage(
|
||||
{
|
||||
convoId: this.convoId,
|
||||
message,
|
||||
},
|
||||
{
|
||||
encoding: 'application/json',
|
||||
headers: {
|
||||
Authorization: this.__tempFromUserDid,
|
||||
},
|
||||
},
|
||||
)
|
||||
const res = response.data
|
||||
|
||||
/*
|
||||
* Insert into `newMessages` as soon as we have a real ID. That way, when
|
||||
* we get an event log back, we can replace in situ.
|
||||
*/
|
||||
this.newMessages.set(res.id, {
|
||||
...res,
|
||||
$type: 'chat.bsky.convo.defs#messageView',
|
||||
sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid),
|
||||
})
|
||||
this.pendingMessages.delete(tempId)
|
||||
|
||||
this.commit()
|
||||
}
|
||||
|
||||
/*
|
||||
* Items in reverse order, since FlatList inverts
|
||||
*/
|
||||
get items(): ConvoItem[] {
|
||||
const items: ConvoItem[] = []
|
||||
|
||||
// `newMessages` is in insertion order, unshift to reverse
|
||||
this.newMessages.forEach(m => {
|
||||
if (ChatBskyConvoDefs.isMessageView(m)) {
|
||||
items.unshift({
|
||||
type: 'message',
|
||||
key: m.id,
|
||||
message: m,
|
||||
nextMessage: null,
|
||||
})
|
||||
} else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
|
||||
items.unshift({
|
||||
type: 'deleted-message',
|
||||
key: m.id,
|
||||
message: m,
|
||||
nextMessage: null,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
// `newMessages` is in insertion order, unshift to reverse
|
||||
this.pendingMessages.forEach(m => {
|
||||
items.unshift({
|
||||
type: 'pending-message',
|
||||
key: m.id,
|
||||
message: m.message,
|
||||
})
|
||||
})
|
||||
|
||||
this.pastMessages.forEach(m => {
|
||||
if (ChatBskyConvoDefs.isMessageView(m)) {
|
||||
items.push({
|
||||
type: 'message',
|
||||
key: m.id,
|
||||
message: m,
|
||||
nextMessage: null,
|
||||
})
|
||||
} else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
|
||||
items.push({
|
||||
type: 'deleted-message',
|
||||
key: m.id,
|
||||
message: m,
|
||||
nextMessage: null,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
return items.map((item, i) => {
|
||||
let nextMessage = null
|
||||
|
||||
if (
|
||||
ChatBskyConvoDefs.isMessageView(item.message) ||
|
||||
ChatBskyConvoDefs.isDeletedMessageView(item.message)
|
||||
) {
|
||||
const next = items[i - 1]
|
||||
if (
|
||||
next &&
|
||||
(ChatBskyConvoDefs.isMessageView(next.message) ||
|
||||
ChatBskyConvoDefs.isDeletedMessageView(next.message))
|
||||
) {
|
||||
nextMessage = next.message
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
...item,
|
||||
nextMessage,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.status = ConvoStatus.Destroyed
|
||||
this.commit()
|
||||
}
|
||||
|
||||
get state(): ConvoState {
|
||||
switch (this.status) {
|
||||
case ConvoStatus.Initializing: {
|
||||
return {
|
||||
status: ConvoStatus.Initializing,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Ready: {
|
||||
return {
|
||||
status: ConvoStatus.Ready,
|
||||
items: this.items,
|
||||
convo: this.convo!,
|
||||
isFetchingHistory: this.isFetchingHistory,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Error: {
|
||||
return {
|
||||
status: ConvoStatus.Error,
|
||||
error: this.error,
|
||||
}
|
||||
}
|
||||
case ConvoStatus.Destroyed: {
|
||||
return {
|
||||
status: ConvoStatus.Destroyed,
|
||||
}
|
||||
}
|
||||
default: {
|
||||
return {
|
||||
status: ConvoStatus.Uninitialized,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private _emitter = new EventEmitter()
|
||||
|
||||
private commit() {
|
||||
this._emitter.emit('update')
|
||||
}
|
||||
|
||||
on(event: 'update', cb: () => void) {
|
||||
this._emitter.on(event, cb)
|
||||
}
|
||||
|
||||
off(event: 'update', cb: () => void) {
|
||||
this._emitter.off(event, cb)
|
||||
}
|
||||
}
|
57
src/state/messages/index.tsx
Normal file
57
src/state/messages/index.tsx
Normal file
|
@ -0,0 +1,57 @@
|
|||
import React from 'react'
|
||||
import {BskyAgent} from '@atproto-labs/api'
|
||||
|
||||
import {Convo, ConvoParams} from '#/state/messages/convo'
|
||||
import {useAgent} from '#/state/session'
|
||||
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
|
||||
|
||||
const ChatContext = React.createContext<{
|
||||
service: Convo
|
||||
state: Convo['state']
|
||||
}>({
|
||||
// @ts-ignore
|
||||
service: null,
|
||||
// @ts-ignore
|
||||
state: null,
|
||||
})
|
||||
|
||||
export function useChat() {
|
||||
return React.useContext(ChatContext)
|
||||
}
|
||||
|
||||
export function ChatProvider({
|
||||
children,
|
||||
convoId,
|
||||
}: Pick<ConvoParams, 'convoId'> & {children: React.ReactNode}) {
|
||||
const {serviceUrl} = useDmServiceUrlStorage()
|
||||
const {getAgent} = useAgent()
|
||||
const [service] = React.useState(
|
||||
() =>
|
||||
new Convo({
|
||||
convoId,
|
||||
agent: new BskyAgent({
|
||||
service: serviceUrl,
|
||||
}),
|
||||
__tempFromUserDid: getAgent().session?.did!,
|
||||
}),
|
||||
)
|
||||
const [state, setState] = React.useState(service.state)
|
||||
|
||||
React.useEffect(() => {
|
||||
service.initialize()
|
||||
}, [service])
|
||||
|
||||
React.useEffect(() => {
|
||||
const update = () => setState(service.state)
|
||||
service.on('update', update)
|
||||
return () => {
|
||||
service.destroy()
|
||||
}
|
||||
}, [service])
|
||||
|
||||
return (
|
||||
<ChatContext.Provider value={{state, service}}>
|
||||
{children}
|
||||
</ChatContext.Provider>
|
||||
)
|
||||
}
|
25
src/state/queries/messages/conversation.ts
Normal file
25
src/state/queries/messages/conversation.ts
Normal file
|
@ -0,0 +1,25 @@
|
|||
import {BskyAgent} from '@atproto-labs/api'
|
||||
import {useQuery} from '@tanstack/react-query'
|
||||
|
||||
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
|
||||
import {useHeaders} from './temp-headers'
|
||||
|
||||
const RQKEY_ROOT = 'convo'
|
||||
export const RQKEY = (convoId: string) => [RQKEY_ROOT, convoId]
|
||||
|
||||
export function useConvoQuery(convoId: string) {
|
||||
const headers = useHeaders()
|
||||
const {serviceUrl} = useDmServiceUrlStorage()
|
||||
|
||||
return useQuery({
|
||||
queryKey: RQKEY(convoId),
|
||||
queryFn: async () => {
|
||||
const agent = new BskyAgent({service: serviceUrl})
|
||||
const {data} = await agent.api.chat.bsky.convo.getConvo(
|
||||
{convoId},
|
||||
{headers},
|
||||
)
|
||||
return data.convo
|
||||
},
|
||||
})
|
||||
}
|
35
src/state/queries/messages/get-convo-for-members.ts
Normal file
35
src/state/queries/messages/get-convo-for-members.ts
Normal file
|
@ -0,0 +1,35 @@
|
|||
import {BskyAgent, ChatBskyConvoGetConvoForMembers} from '@atproto-labs/api'
|
||||
import {useMutation, useQueryClient} from '@tanstack/react-query'
|
||||
|
||||
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
|
||||
import {RQKEY as CONVO_KEY} from './conversation'
|
||||
import {useHeaders} from './temp-headers'
|
||||
|
||||
export function useGetConvoForMembers({
|
||||
onSuccess,
|
||||
onError,
|
||||
}: {
|
||||
onSuccess?: (data: ChatBskyConvoGetConvoForMembers.OutputSchema) => void
|
||||
onError?: (error: Error) => void
|
||||
}) {
|
||||
const queryClient = useQueryClient()
|
||||
const headers = useHeaders()
|
||||
const {serviceUrl} = useDmServiceUrlStorage()
|
||||
|
||||
return useMutation({
|
||||
mutationFn: async (members: string[]) => {
|
||||
const agent = new BskyAgent({service: serviceUrl})
|
||||
const {data} = await agent.api.chat.bsky.convo.getConvoForMembers(
|
||||
{members: members},
|
||||
{headers},
|
||||
)
|
||||
|
||||
return data
|
||||
},
|
||||
onSuccess: data => {
|
||||
queryClient.setQueryData(CONVO_KEY(data.convo.id), data.convo)
|
||||
onSuccess?.(data)
|
||||
},
|
||||
onError,
|
||||
})
|
||||
}
|
28
src/state/queries/messages/list-converations.ts
Normal file
28
src/state/queries/messages/list-converations.ts
Normal file
|
@ -0,0 +1,28 @@
|
|||
import {BskyAgent} from '@atproto-labs/api'
|
||||
import {useInfiniteQuery} from '@tanstack/react-query'
|
||||
|
||||
import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
|
||||
import {useHeaders} from './temp-headers'
|
||||
|
||||
export const RQKEY = ['convo-list']
|
||||
type RQPageParam = string | undefined
|
||||
|
||||
export function useListConvos() {
|
||||
const headers = useHeaders()
|
||||
const {serviceUrl} = useDmServiceUrlStorage()
|
||||
|
||||
return useInfiniteQuery({
|
||||
queryKey: RQKEY,
|
||||
queryFn: async ({pageParam}) => {
|
||||
const agent = new BskyAgent({service: serviceUrl})
|
||||
const {data} = await agent.api.chat.bsky.convo.listConvos(
|
||||
{cursor: pageParam},
|
||||
{headers},
|
||||
)
|
||||
|
||||
return data
|
||||
},
|
||||
initialPageParam: undefined as RQPageParam,
|
||||
getNextPageParam: lastPage => lastPage.cursor,
|
||||
})
|
||||
}
|
11
src/state/queries/messages/temp-headers.ts
Normal file
11
src/state/queries/messages/temp-headers.ts
Normal file
|
@ -0,0 +1,11 @@
|
|||
import {useSession} from '#/state/session'
|
||||
|
||||
// toy auth
|
||||
export const useHeaders = () => {
|
||||
const {currentAccount} = useSession()
|
||||
return {
|
||||
get Authorization() {
|
||||
return currentAccount!.did
|
||||
},
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue