Update FirstCursorSinceReset even if we received zero new records

main
Max Ignatenko 2024-03-17 21:22:34 +00:00
parent 328a676e2a
commit e8c816a3a3
1 changed files with 23 additions and 14 deletions

View File

@ -209,6 +209,11 @@ retry:
newRev, err := repo.GetRev(ctx, bytes.NewReader(b)) newRev, err := repo.GetRev(ctx, bytes.NewReader(b))
if sinceRev != "" && errors.Is(err, repo.ErrZeroBlocks) { if sinceRev != "" && errors.Is(err, repo.ErrZeroBlocks) {
// No new records since the rev we requested above. // No new records since the rev we requested above.
if work.Repo.FirstCursorSinceReset < knownCursorBeforeFetch {
if err := p.bumpFirstCursorSinceReset(work.Repo.ID, knownCursorBeforeFetch); err != nil {
return fmt.Errorf("updating first_cursor_since_reset: %w", err)
}
}
return nil return nil
} else if err != nil { } else if err != nil {
l := 25 l := 25
@ -277,20 +282,7 @@ retry:
} }
if work.Repo.FirstCursorSinceReset < knownCursorBeforeFetch { if work.Repo.FirstCursorSinceReset < knownCursorBeforeFetch {
err := p.db.Transaction(func(tx *gorm.DB) error { if err := p.bumpFirstCursorSinceReset(work.Repo.ID, knownCursorBeforeFetch); err != nil {
var currentCursor int64
err := tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: work.Repo.ID}).
Select("first_cursor_since_reset").First(&currentCursor).Error
if err != nil {
return fmt.Errorf("failed to get current cursor value: %w", err)
}
if currentCursor < knownCursorBeforeFetch {
return tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: work.Repo.ID}).
Updates(&repo.Repo{FirstCursorSinceReset: knownCursorBeforeFetch}).Error
}
return nil
})
if err != nil {
return fmt.Errorf("updating first_cursor_since_reset: %w", err) return fmt.Errorf("updating first_cursor_since_reset: %w", err)
} }
} }
@ -300,6 +292,23 @@ retry:
return nil return nil
} }
// bumpFirstCursorSinceReset increases repo's FirstCursorSinceReset iff it is currently lower than the supplied value.
func (p *WorkerPool) bumpFirstCursorSinceReset(repoId models.ID, cursorValue int64) error {
return p.db.Transaction(func(tx *gorm.DB) error {
var currentCursor int64
err := tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: repoId}).
Select("first_cursor_since_reset").First(&currentCursor).Error
if err != nil {
return fmt.Errorf("failed to get current cursor value: %w", err)
}
if currentCursor < cursorValue {
return tx.Model(&repo.Repo{}).Where(&repo.Repo{ID: repoId}).
Updates(&repo.Repo{FirstCursorSinceReset: cursorValue}).Error
}
return nil
})
}
func splitInBatshes[T any](s []T, batchSize int) [][]T { func splitInBatshes[T any](s []T, batchSize int) [][]T {
var r [][]T var r [][]T
for i := 0; i < len(s); i += batchSize { for i := 0; i < len(s); i += batchSize {