Fix potential infinite loop for inactive repos after a cursor reset

main
Max Ignatenko 2024-03-17 19:36:12 +00:00
parent 4c41389e9b
commit 328a676e2a
2 changed files with 24 additions and 2 deletions

View File

@ -165,6 +165,8 @@ func (p *WorkerPool) doWork(ctx context.Context, work WorkItem) error {
client.Client = util.RobustHTTPClient() client.Client = util.RobustHTTPClient()
client.Client.Timeout = 30 * time.Minute client.Client.Timeout = 30 * time.Minute
knownCursorBeforeFetch := remote.FirstCursorSinceReset
retry: retry:
if p.limiter != nil { if p.limiter != nil {
if err := p.limiter.Wait(ctx, u.String()); err != nil { if err := p.limiter.Wait(ctx, u.String()); err != nil {
@ -274,6 +276,24 @@ retry:
return fmt.Errorf("updating repo rev: %w", err) return fmt.Errorf("updating repo rev: %w", err)
} }
if work.Repo.FirstCursorSinceReset < knownCursorBeforeFetch {
err := p.db.Transaction(func(tx *gorm.DB) error {
var currentCursor int64
err := tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: work.Repo.ID}).
Select("first_cursor_since_reset").First(&currentCursor).Error
if err != nil {
return fmt.Errorf("failed to get current cursor value: %w", err)
}
if currentCursor < knownCursorBeforeFetch {
return tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: work.Repo.ID}).
Updates(&repo.Repo{FirstCursorSinceReset: knownCursorBeforeFetch}).Error
}
return nil
})
if err != nil {
return fmt.Errorf("updating first_cursor_since_reset: %w", err)
}
}
// TODO: check for records that are missing in the repo download // TODO: check for records that are missing in the repo download
// and mark them as deleted. // and mark them as deleted.

View File

@ -54,9 +54,13 @@ repo-specific `rev` which is the same with a full repo fetch.
### Indexing a repo ### Indexing a repo
* Resolve the current PDS hosting the repo and store its `FirstCursorSinceReset` in a variable
* If the PDS is different from the one we have on record (i.e., the repo migrated) - update accordingly
* Fetch the repo * Fetch the repo
* Upsert all fetched records * Upsert all fetched records
* Set `LastIndexedRev` to `rev` of the fetched repo * Set `LastIndexedRev` to `rev` of the fetched repo
* In a transaction check if `Repo`.`FirstCursorSinceReset` >= the value stored in the first step, and set it to that value if it isn't.
* Assumption here is that a PDS returns strongly consistent responses for a single repo, and fetching the repo will include all records corresponding to a cursor value generated before that.
### Connecting to firehose ### Connecting to firehose
@ -112,5 +116,3 @@ Currently we're simply resetting `FirstRevSinceReset`.
* `LastIndexedRev` is not set * `LastIndexedRev` is not set
* `LastIndexedRev` < `FirstCursorSinceReset` * `LastIndexedRev` < `FirstCursorSinceReset`
* `Repo`.`FirstCursorSinceReset` < `PDS`.`FirstCursorSinceReset` * `Repo`.`FirstCursorSinceReset` < `PDS`.`FirstCursorSinceReset`
* TODO: avoid reindexing the repo forever if there are no new firehose
events for it.