Limit the number of retries when failing to index a repo
parent
37ccca4c2f
commit
4626b8b9ca
|
@ -91,6 +91,7 @@ func (s *Scheduler) run(ctx context.Context) {
|
||||||
|
|
||||||
func (s *Scheduler) fillQueue(ctx context.Context) error {
|
func (s *Scheduler) fillQueue(ctx context.Context) error {
|
||||||
const maxQueueLen = 10000
|
const maxQueueLen = 10000
|
||||||
|
const maxAttempts = 3
|
||||||
|
|
||||||
if len(s.queue)+len(s.inProgress) >= maxQueueLen {
|
if len(s.queue)+len(s.inProgress) >= maxQueueLen {
|
||||||
return nil
|
return nil
|
||||||
|
@ -106,10 +107,10 @@ func (s *Scheduler) fillQueue(ctx context.Context) error {
|
||||||
for _, remote := range remotes {
|
for _, remote := range remotes {
|
||||||
repos := []repo.Repo{}
|
repos := []repo.Repo{}
|
||||||
|
|
||||||
err := s.db.Raw(`SELECT * FROM "repos" WHERE pds = ? AND (last_indexed_rev is null OR last_indexed_rev = '')
|
err := s.db.Raw(`SELECT * FROM "repos" WHERE pds = ? AND (last_indexed_rev is null OR last_indexed_rev = '') AND failed_attempts < ?
|
||||||
UNION
|
UNION
|
||||||
SELECT * FROM "repos" WHERE pds = ? AND (first_rev_since_reset is not null AND first_rev_since_reset <> '' AND last_indexed_rev < first_rev_since_reset) LIMIT ?`,
|
SELECT * FROM "repos" WHERE pds = ? AND (first_rev_since_reset is not null AND first_rev_since_reset <> '' AND last_indexed_rev < first_rev_since_reset) AND failed_attempts < ? LIMIT ?`,
|
||||||
remote.ID, remote.ID, perPDSLimit).
|
remote.ID, maxAttempts, remote.ID, maxAttempts, perPDSLimit).
|
||||||
Scan(&repos).Error
|
Scan(&repos).Error
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -111,13 +111,16 @@ func (p *WorkerPool) worker(ctx context.Context, signal chan struct{}) {
|
||||||
if err := p.doWork(ctx, work); err != nil {
|
if err := p.doWork(ctx, work); err != nil {
|
||||||
log.Error().Err(err).Msgf("Work task %q failed: %s", work.Repo.DID, err)
|
log.Error().Err(err).Msgf("Work task %q failed: %s", work.Repo.DID, err)
|
||||||
updates.LastError = err.Error()
|
updates.LastError = err.Error()
|
||||||
|
updates.FailedAttempts = work.Repo.FailedAttempts + 1
|
||||||
reposIndexed.WithLabelValues("false").Inc()
|
reposIndexed.WithLabelValues("false").Inc()
|
||||||
} else {
|
} else {
|
||||||
|
updates.FailedAttempts = 0
|
||||||
reposIndexed.WithLabelValues("true").Inc()
|
reposIndexed.WithLabelValues("true").Inc()
|
||||||
}
|
}
|
||||||
updates.LastIndexAttempt = time.Now()
|
updates.LastIndexAttempt = time.Now()
|
||||||
err := p.db.Model(&repo.Repo{}).
|
err := p.db.Model(&repo.Repo{}).
|
||||||
Where(&repo.Repo{ID: work.Repo.ID}).
|
Where(&repo.Repo{ID: work.Repo.ID}).
|
||||||
|
Select("last_error", "last_index_attempt", "failed_attempts").
|
||||||
Updates(updates).Error
|
Updates(updates).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msgf("Failed to update repo info for %q: %s", work.Repo.DID, err)
|
log.Error().Err(err).Msgf("Failed to update repo info for %q: %s", work.Repo.DID, err)
|
||||||
|
|
|
@ -27,6 +27,7 @@ type Repo struct {
|
||||||
TombstonedAt time.Time
|
TombstonedAt time.Time
|
||||||
LastIndexAttempt time.Time
|
LastIndexAttempt time.Time
|
||||||
LastError string
|
LastError string
|
||||||
|
FailedAttempts int `gorm:"default:0"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Record struct {
|
type Record struct {
|
||||||
|
|
Loading…
Reference in New Issue