Add retries to all handlers (#3935)

zio/stable
Eric Bailey 2024-05-09 16:31:36 -05:00 committed by GitHub
parent becc708c61
commit 55fdbc7399
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 90 additions and 73 deletions

View File

@ -7,6 +7,7 @@ import {
} from '@atproto-labs/api' } from '@atproto-labs/api'
import {nanoid} from 'nanoid/non-secure' import {nanoid} from 'nanoid/non-secure'
import {networkRetry} from '#/lib/async/retry'
import {logger} from '#/logger' import {logger} from '#/logger'
import {isNative} from '#/platform/detection' import {isNative} from '#/platform/detection'
import { import {
@ -459,16 +460,18 @@ export class Convo {
recipients: AppBskyActorDefs.ProfileViewBasic[] recipients: AppBskyActorDefs.ProfileViewBasic[]
}>(async (resolve, reject) => { }>(async (resolve, reject) => {
try { try {
const response = await this.agent.api.chat.bsky.convo.getConvo( const response = await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.getConvo(
convoId: this.convoId, {
}, convoId: this.convoId,
{
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
const convo = response.data.convo const convo = response.data.convo
@ -544,18 +547,21 @@ export class Convo {
// throw new Error('UNCOMMENT TO TEST RETRY') // throw new Error('UNCOMMENT TO TEST RETRY')
} }
const response = await this.agent.api.chat.bsky.convo.getMessages( const nextCursor = this.oldestRev // for TS
{ const response = await networkRetry(2, () => {
cursor: this.oldestRev, return this.agent.api.chat.bsky.convo.getMessages(
convoId: this.convoId, {
limit: isNative ? 25 : 50, cursor: nextCursor,
}, convoId: this.convoId,
{ limit: isNative ? 40 : 60,
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
const {cursor, messages} = response.data const {cursor, messages} = response.data
this.oldestRev = cursor ?? null this.oldestRev = cursor ?? null
@ -736,18 +742,20 @@ export class Convo {
// throw new Error('UNCOMMENT TO TEST RETRY') // throw new Error('UNCOMMENT TO TEST RETRY')
const {id, message} = pendingMessage const {id, message} = pendingMessage
const response = await this.agent.api.chat.bsky.convo.sendMessage( const response = await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.sendMessage(
convoId: this.convoId, {
message, convoId: this.convoId,
}, message,
{
encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
const res = response.data const res = response.data
/* /*
@ -786,20 +794,22 @@ export class Convo {
try { try {
const messageArray = Array.from(this.pendingMessages.values()) const messageArray = Array.from(this.pendingMessages.values())
const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( const {data} = await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.sendMessageBatch(
items: messageArray.map(({message}) => ({ {
convoId: this.convoId, items: messageArray.map(({message}) => ({
message, convoId: this.convoId,
})), message,
}, })),
{
encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
const {items} = data const {items} = data
/* /*
@ -838,18 +848,20 @@ export class Convo {
this.commit() this.commit()
try { try {
await this.agent.api.chat.bsky.convo.deleteMessageForSelf( await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.deleteMessageForSelf(
convoId: this.convoId, {
messageId, convoId: this.convoId,
}, messageId,
{
encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) encoding: 'application/json',
headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
} catch (e) { } catch (e) {
this.deletedMessages.delete(messageId) this.deletedMessages.delete(messageId)
this.commit() this.commit()

View File

@ -2,6 +2,7 @@ import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api'
import EventEmitter from 'eventemitter3' import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure' import {nanoid} from 'nanoid/non-secure'
import {networkRetry} from '#/lib/async/retry'
import {logger} from '#/logger' import {logger} from '#/logger'
import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const' import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
import { import {
@ -265,16 +266,18 @@ export class MessagesEventBus {
logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
try { try {
const response = await this.agent.api.chat.bsky.convo.listConvos( const response = await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.listConvos(
limit: 1, {
}, limit: 1,
{
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
// throw new Error('UNCOMMENT TO TEST INIT FAILURE') // throw new Error('UNCOMMENT TO TEST INIT FAILURE')
const {convos} = response.data const {convos} = response.data
@ -358,16 +361,18 @@ export class MessagesEventBus {
// ) // )
try { try {
const response = await this.agent.api.chat.bsky.convo.getLog( const response = await networkRetry(2, () => {
{ return this.agent.api.chat.bsky.convo.getLog(
cursor: this.latestRev, {
}, cursor: this.latestRev,
{
headers: {
Authorization: this.__tempFromUserDid,
}, },
}, {
) headers: {
Authorization: this.__tempFromUserDid,
},
},
)
})
// throw new Error('UNCOMMENT TO TEST POLL FAILURE') // throw new Error('UNCOMMENT TO TEST POLL FAILURE')