Check repos against PDS cursor resets, instead of waiting for the first new event for them on the firehose

main
Max Ignatenko 2024-03-13 10:39:41 +00:00
parent 57aa4731e5
commit 87d510e67a
2 changed files with 19 additions and 2 deletions

View File

@ -119,7 +119,16 @@ func (s *Scheduler) fillQueue(ctx context.Context) error {
err := s.db.Raw(`SELECT * FROM "repos" WHERE pds = ? AND (last_indexed_rev is null OR last_indexed_rev = '') AND failed_attempts < ? err := s.db.Raw(`SELECT * FROM "repos" WHERE pds = ? AND (last_indexed_rev is null OR last_indexed_rev = '') AND failed_attempts < ?
UNION UNION
SELECT * FROM "repos" WHERE pds = ? AND (first_rev_since_reset is not null AND first_rev_since_reset <> '' AND last_indexed_rev < first_rev_since_reset) AND failed_attempts < ? LIMIT ?`, SELECT "repos".* FROM "repos" left join "pds" on repos.pds = pds.id WHERE pds = ?
AND
(
(first_rev_since_reset is not null AND first_rev_since_reset <> ''
AND last_indexed_rev < first_rev_since_reset)
OR
("repos".first_cursor_since_reset is not null AND "repos".first_cursor_since_reset <> 0
AND "repos".first_cursor_since_reset < "pds".first_cursor_since_reset)
)
AND failed_attempts < ? LIMIT ?`,
remote.ID, maxAttempts, remote.ID, maxAttempts, perPDSLimit). remote.ID, maxAttempts, remote.ID, maxAttempts, perPDSLimit).
Scan(&repos).Error Scan(&repos).Error

View File

@ -23,7 +23,15 @@ queries:
interval: 30 interval: 30
databases: [db1] databases: [db1]
metrics: [repos_fully_indexed] metrics: [repos_fully_indexed]
sql: select count(*) as repos_fully_indexed from repos where failed_attempts < 3 and last_indexed_rev <> '' and (last_indexed_rev >= first_rev_since_reset or first_rev_since_reset is null or first_rev_since_reset = ''); sql: >
select count(*) as repos_fully_indexed
from repos left join pds on repos.pds = pds.id
where failed_attempts < 3
and last_indexed_rev <> ''
and (last_indexed_rev >= first_rev_since_reset
or first_rev_since_reset is null or first_rev_since_reset = '')
and (repos.first_cursor_since_reset >= pds.first_cursor_since_reset
or repos.first_cursor_since_reset is null or repos.first_cursor_since_reset = 0);
query2: query2:
interval: 30 interval: 30
databases: [db1] databases: [db1]