Populate FirstRevSinceReset if empty

main
Max Ignatenko 2024-03-13 11:32:23 +00:00
parent 87d510e67a
commit e150f1da90
1 changed files with 35 additions and 8 deletions

View File

@ -77,7 +77,7 @@ func (l *Lister) run(ctx context.Context) {
client.Host = remote.Host client.Host = remote.Host
log.Info().Msgf("Listing repos from %q...", remote.Host) log.Info().Msgf("Listing repos from %q...", remote.Host)
dids, err := pagination.Reduce( repos, err := pagination.Reduce(
func(cursor string) (resp *comatproto.SyncListRepos_Output, nextCursor string, err error) { func(cursor string) (resp *comatproto.SyncListRepos_Output, nextCursor string, err error) {
resp, err = comatproto.SyncListRepos(ctx, client, cursor, 200) resp, err = comatproto.SyncListRepos(ctx, client, cursor, 200)
if err == nil && resp.Cursor != nil { if err == nil && resp.Cursor != nil {
@ -85,12 +85,12 @@ func (l *Lister) run(ctx context.Context) {
} }
return return
}, },
func(resp *comatproto.SyncListRepos_Output, acc []string) ([]string, error) { func(resp *comatproto.SyncListRepos_Output, acc []*comatproto.SyncListRepos_Repo) ([]*comatproto.SyncListRepos_Repo, error) {
for _, repo := range resp.Repos { for _, repo := range resp.Repos {
if repo == nil { if repo == nil {
continue continue
} }
acc = append(acc, repo.Did) acc = append(acc, repo)
} }
return acc, nil return acc, nil
}) })
@ -99,15 +99,42 @@ func (l *Lister) run(ctx context.Context) {
log.Error().Err(err).Msgf("Failed to list repos from %q: %s", remote.Host, err) log.Error().Err(err).Msgf("Failed to list repos from %q: %s", remote.Host, err)
break break
} }
log.Info().Msgf("Received %d DIDs from %q", len(dids), remote.Host) log.Info().Msgf("Received %d DIDs from %q", len(repos), remote.Host)
reposListed.WithLabelValues(remote.Host).Add(float64(len(dids))) reposListed.WithLabelValues(remote.Host).Add(float64(len(repos)))
for _, did := range dids { for _, repoInfo := range repos {
if _, created, err := repo.EnsureExists(ctx, l.db, did); err != nil { record, created, err := repo.EnsureExists(ctx, l.db, repoInfo.Did)
log.Error().Err(err).Msgf("Failed to ensure that we have a record for the repo %q: %s", did, err) if err != nil {
log.Error().Err(err).Msgf("Failed to ensure that we have a record for the repo %q: %s", repoInfo.Did, err)
} else if created { } else if created {
reposDiscovered.WithLabelValues(remote.Host).Inc() reposDiscovered.WithLabelValues(remote.Host).Inc()
} }
if record.FirstRevSinceReset == "" {
// Populate this field in case it's empty, so we don't have to wait for the first firehose event
// to trigger a resync.
err := l.db.Transaction(func(tx *gorm.DB) error {
var currentRecord repo.Repo
if err := tx.Model(&record).Where(&repo.Repo{ID: record.ID}).Take(&currentRecord).Error; err != nil {
return err
}
if currentRecord.FirstRevSinceReset != "" {
// Someone else already updated it, nothing to do.
return nil
}
var remote pds.PDS
if err := tx.Model(&remote).Where(&pds.PDS{ID: record.PDS}).Take(&remote).Error; err != nil {
return err
}
return tx.Model(&record).Where(&repo.Repo{ID: record.ID}).Updates(&repo.Repo{
FirstRevSinceReset: repoInfo.Rev,
FirstCursorSinceReset: remote.FirstCursorSinceReset,
}).Error
})
if err != nil {
log.Error().Err(err).Msgf("Failed to set the initial FirstRevSinceReset value for %q: %s", repoInfo.Did, err)
}
}
} }
if err := db.Model(&remote).Updates(&pds.PDS{LastList: time.Now()}).Error; err != nil { if err := db.Model(&remote).Updates(&pds.PDS{LastList: time.Now()}).Error; err != nil {