Merge branch 'main' of github.com:uabluerail/indexer
commit
78a17bf238
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/uabluerail/indexer/models"
|
||||
"github.com/uabluerail/indexer/pds"
|
||||
"github.com/uabluerail/indexer/repo"
|
||||
"github.com/uabluerail/indexer/util/resolver"
|
||||
)
|
||||
|
||||
type BadRecord struct {
|
||||
|
@ -245,11 +246,33 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
|||
if err != nil {
|
||||
return fmt.Errorf("repo.EnsureExists(%q): %w", payload.Repo, err)
|
||||
}
|
||||
if repoInfo.PDS != models.ID(c.remote.ID) {
|
||||
if repoInfo.PDS != c.remote.ID {
|
||||
u, err := resolver.GetPDSEndpoint(ctx, payload.Repo)
|
||||
if err == nil {
|
||||
cur, err := pds.EnsureExists(ctx, c.db, u.String())
|
||||
if err == nil {
|
||||
if repoInfo.PDS != cur.ID {
|
||||
// Repo was migrated, lets update our record.
|
||||
err := c.db.Model(repoInfo).Where(&repo.Repo{ID: repoInfo.ID}).Updates(&repo.Repo{PDS: cur.ID}).Error
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("Repo %q was migrated to %q, but updating the repo has failed: %s", payload.Repo, cur.Host, err)
|
||||
}
|
||||
}
|
||||
repoInfo.PDS = cur.ID
|
||||
} else {
|
||||
log.Error().Err(err).Msgf("Failed to get PDS record for %q: %s", u, err)
|
||||
}
|
||||
} else {
|
||||
log.Error().Err(err).Msgf("Failed to get PDS endpoint for repo %q: %s", payload.Repo, err)
|
||||
}
|
||||
|
||||
if repoInfo.PDS != c.remote.ID {
|
||||
// We checked a recent version of DID doc and this is still not a correct PDS.
|
||||
log.Error().Str("did", payload.Repo).Str("rev", payload.Rev).
|
||||
Msgf("Commit from an incorrect PDS, skipping")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if created {
|
||||
reposDiscovered.WithLabelValues(c.remote.Host).Inc()
|
||||
}
|
||||
|
@ -443,8 +466,25 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
|||
default:
|
||||
log.Error().Msgf("Unknown #info message %q: %+v", payload.Name, payload)
|
||||
}
|
||||
case "#identity":
|
||||
payload := &comatproto.SyncSubscribeRepos_Identity{}
|
||||
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||
}
|
||||
|
||||
exportEventTimestamp(ctx, c.remote.Host, payload.Time)
|
||||
log.Trace().Str("did", payload.Did).Str("type", typ).Int64("seq", payload.Seq).
|
||||
Msgf("#identity message: %s seq=%d time=%q", payload.Did, payload.Seq, payload.Time)
|
||||
|
||||
resolver.Resolver.FlushCacheFor(payload.Did)
|
||||
|
||||
// TODO: fetch DID doc and update PDS field?
|
||||
default:
|
||||
log.Warn().Msgf("Unknown message type received: %s", typ)
|
||||
b, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("Failed to read message payload: %s", err)
|
||||
}
|
||||
log.Warn().Msgf("Unknown message type received: %s payload=%q", typ, string(b))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -79,6 +79,9 @@ func runMain(ctx context.Context) error {
|
|||
}
|
||||
// TODO: check for changes and start/stop consumers as needed
|
||||
for _, remote := range remotes {
|
||||
if remote.Disabled {
|
||||
continue
|
||||
}
|
||||
c, err := NewConsumer(ctx, &remote, db)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create a consumer for %q: %w", remote.Host, err)
|
||||
|
|
|
@ -57,13 +57,22 @@ func (l *Lister) run(ctx context.Context) {
|
|||
|
||||
remote := pds.PDS{}
|
||||
if err := db.Model(&remote).
|
||||
Where("last_list is null or last_list < ?", time.Now().Add(-l.listRefreshInterval)).
|
||||
Where("disabled=false and (last_list is null or last_list < ?)", time.Now().Add(-l.listRefreshInterval)).
|
||||
Take(&remote).Error; err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
log.Error().Err(err).Msgf("Failed to query DB for a PDS to list repos from: %s", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if !pds.IsWhitelisted(remote.Host) {
|
||||
log.Info().Msgf("PDS %q is not whitelisted, disabling it", remote.Host)
|
||||
if err := db.Model(&remote).Where(&pds.PDS{ID: remote.ID}).Updates(&pds.PDS{Disabled: true}).Error; err != nil {
|
||||
log.Error().Err(err).Msgf("Failed to disable PDS %q: %s", remote.Host, err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
client := xrpcauth.NewAnonymousClient(ctx)
|
||||
client.Host = remote.Host
|
||||
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPostgresFix(t *testing.T) {
|
||||
type testCase struct{ input, want string }
|
||||
|
||||
cases := []testCase{
|
||||
{`"a"`, `"a"`},
|
||||
{`"\u0000"`, `"<0x00>"`},
|
||||
{`"description":"\u0000"`, `"description":"<0x00>"`},
|
||||
{`"\\u0000"`, `"\\u0000"`},
|
||||
{`"\\\u0000"`, `"\\<0x00>"`},
|
||||
{`\n\n\u0000\u0000 \u0000\u0000\u0000\u0000 \u0000\u0000\u0000\u0000\u0000`,
|
||||
`\n\n<0x00><0x00> <0x00><0x00><0x00><0x00> <0x00><0x00><0x00><0x00><0x00>`},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
got := escapeNullCharForPostgres([]byte(tc.input))
|
||||
if string(got) != tc.want {
|
||||
t.Errorf("escapeNullCharForPostgres(%s) = %s, want %s", tc.input, string(got), tc.want)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
@ -101,7 +102,11 @@ func (s *Scheduler) fillQueue(ctx context.Context) error {
|
|||
if err := s.db.Find(&remotes).Error; err != nil {
|
||||
return fmt.Errorf("failed to get the list of PDSs: %w", err)
|
||||
}
|
||||
perPDSLimit := 0
|
||||
|
||||
remotes = slices.DeleteFunc(remotes, func(pds pds.PDS) bool {
|
||||
return pds.Disabled
|
||||
})
|
||||
perPDSLimit := maxQueueLen
|
||||
if len(remotes) > 0 {
|
||||
perPDSLimit = maxQueueLen * 2 / len(remotes)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -133,37 +132,32 @@ func (p *WorkerPool) worker(ctx context.Context, signal chan struct{}) {
|
|||
}
|
||||
}
|
||||
|
||||
var postgresFixRegexp = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`)
|
||||
var postgresFixRegexp = regexp.MustCompile(`([^\\](\\\\)*)(\\u0000)+`)
|
||||
|
||||
func escapeNullCharForPostgres(b []byte) []byte {
|
||||
return postgresFixRegexp.ReplaceAll(b, []byte(`$1<0x00>`))
|
||||
return postgresFixRegexp.ReplaceAllFunc(b, func(b []byte) []byte {
|
||||
return bytes.ReplaceAll(b, []byte(`\u0000`), []byte(`<0x00>`))
|
||||
})
|
||||
}
|
||||
|
||||
func (p *WorkerPool) doWork(ctx context.Context, work WorkItem) error {
|
||||
log := zerolog.Ctx(ctx)
|
||||
defer close(work.signal)
|
||||
|
||||
doc, err := resolver.GetDocument(ctx, work.Repo.DID)
|
||||
u, err := resolver.GetPDSEndpoint(ctx, work.Repo.DID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolving did %q: %w", work.Repo.DID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
pdsHost := ""
|
||||
for _, srv := range doc.Service {
|
||||
if srv.Type != "AtprotoPersonalDataServer" {
|
||||
continue
|
||||
}
|
||||
pdsHost = srv.ServiceEndpoint
|
||||
}
|
||||
if pdsHost == "" {
|
||||
return fmt.Errorf("did not find any PDS in DID Document")
|
||||
}
|
||||
u, err := url.Parse(pdsHost)
|
||||
remote, err := pds.EnsureExists(ctx, p.db, u.String())
|
||||
if err != nil {
|
||||
return fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
|
||||
return fmt.Errorf("failed to get PDS records for %q: %w", u, err)
|
||||
}
|
||||
if u.Host == "" {
|
||||
return fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
|
||||
if work.Repo.PDS != remote.ID {
|
||||
if err := p.db.Model(&work.Repo).Where(&repo.Repo{ID: work.Repo.ID}).Updates(&repo.Repo{PDS: remote.ID}).Error; err != nil {
|
||||
return fmt.Errorf("failed to update repo's PDS to %q: %w", u, err)
|
||||
}
|
||||
work.Repo.PDS = remote.ID
|
||||
}
|
||||
|
||||
client := xrpcauth.NewAnonymousClient(ctx)
|
||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module github.com/uabluerail/indexer
|
|||
go 1.21.0
|
||||
|
||||
require (
|
||||
github.com/bluesky-social/indigo v0.0.0-20240213052310-89516fdbfe38
|
||||
github.com/bluesky-social/indigo v0.0.0-20240222031037-d6ed4eb62c91
|
||||
github.com/ipfs/go-cid v0.4.1
|
||||
github.com/ipld/go-car v0.6.2
|
||||
github.com/ipld/go-ipld-prime v0.21.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -7,6 +7,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
|||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bluesky-social/indigo v0.0.0-20240213052310-89516fdbfe38 h1:KKXEauaiIkqW9HfChxp/t2tvHSQm8Pbbv/mottywm6g=
|
||||
github.com/bluesky-social/indigo v0.0.0-20240213052310-89516fdbfe38/go.mod h1:N3Fv7QoBtarvhoHtGLP2U+my6ZbAetbPMwAFTtRAUrI=
|
||||
github.com/bluesky-social/indigo v0.0.0-20240222031037-d6ed4eb62c91 h1:KuyyafTzRXxTkhFPKKMVyWjbaGK9Q7tO5AjKGaMvDCM=
|
||||
github.com/bluesky-social/indigo v0.0.0-20240222031037-d6ed4eb62c91/go.mod h1:zheM9Nt+x0CPWv7cQ/16tOFuxDRxr0fDjLTXRNfpKgQ=
|
||||
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
|
||||
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
||||
|
|
19
pds/pds.go
19
pds/pds.go
|
@ -3,6 +3,7 @@ package pds
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
@ -12,6 +13,11 @@ import (
|
|||
|
||||
const Unknown models.ID = 0
|
||||
|
||||
var whitelist []string = []string{
|
||||
"https://bsky.social",
|
||||
"https://*.bsky.network",
|
||||
}
|
||||
|
||||
type PDS struct {
|
||||
ID models.ID `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
|
@ -21,6 +27,7 @@ type PDS struct {
|
|||
FirstCursorSinceReset int64
|
||||
LastList time.Time
|
||||
CrawlLimit int
|
||||
Disabled bool
|
||||
}
|
||||
|
||||
func AutoMigrate(db *gorm.DB) error {
|
||||
|
@ -28,9 +35,21 @@ func AutoMigrate(db *gorm.DB) error {
|
|||
}
|
||||
|
||||
func EnsureExists(ctx context.Context, db *gorm.DB, host string) (*PDS, error) {
|
||||
if !IsWhitelisted(host) {
|
||||
return nil, fmt.Errorf("host %q is not whitelisted", host)
|
||||
}
|
||||
remote := PDS{Host: host}
|
||||
if err := db.Model(&remote).Where(&PDS{Host: host}).FirstOrCreate(&remote).Error; err != nil {
|
||||
return nil, fmt.Errorf("failed to get PDS record from DB for %q: %w", remote.Host, err)
|
||||
}
|
||||
return &remote, nil
|
||||
}
|
||||
|
||||
func IsWhitelisted(host string) bool {
|
||||
for _, p := range whitelist {
|
||||
if match, _ := filepath.Match(p, host); match {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
20
repo/repo.go
20
repo/repo.go
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
@ -66,28 +65,11 @@ func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, bool, er
|
|||
// if we do - compare PDS IDs
|
||||
// if they don't match - also reset FirstRevSinceReset
|
||||
|
||||
doc, err := resolver.GetDocument(ctx, did)
|
||||
u, err := resolver.GetPDSEndpoint(ctx, did)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("fetching DID Document: %w", err)
|
||||
}
|
||||
|
||||
pdsHost := ""
|
||||
for _, srv := range doc.Service {
|
||||
if srv.Type != "AtprotoPersonalDataServer" {
|
||||
continue
|
||||
}
|
||||
pdsHost = srv.ServiceEndpoint
|
||||
}
|
||||
if pdsHost == "" {
|
||||
return nil, false, fmt.Errorf("did not find any PDS in DID Document")
|
||||
}
|
||||
u, err := url.Parse(pdsHost)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
|
||||
}
|
||||
if u.Host == "" {
|
||||
return nil, false, fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
|
||||
}
|
||||
remote, err := pds.EnsureExists(ctx, db, u.String())
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("failed to get PDS record from DB for %q: %w", remote.Host, err)
|
||||
|
|
|
@ -3,6 +3,8 @@ package resolver
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
"github.com/bluesky-social/indigo/api"
|
||||
|
@ -56,3 +58,29 @@ func (r *fallbackResolver) FlushCacheFor(did string) {
|
|||
res.FlushCacheFor(did)
|
||||
}
|
||||
}
|
||||
|
||||
func GetPDSEndpoint(ctx context.Context, did string) (*url.URL, error) {
|
||||
doc, err := GetDocument(ctx, did)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("resolving did %q: %w", did, err)
|
||||
}
|
||||
|
||||
pdsHost := ""
|
||||
for _, srv := range doc.Service {
|
||||
if srv.Type != "AtprotoPersonalDataServer" {
|
||||
continue
|
||||
}
|
||||
pdsHost = srv.ServiceEndpoint
|
||||
}
|
||||
if pdsHost == "" {
|
||||
return nil, fmt.Errorf("did not find any PDS in DID Document")
|
||||
}
|
||||
u, err := url.Parse(pdsHost)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
|
||||
}
|
||||
if u.Host == "" {
|
||||
return nil, fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue