Add support for discovering new PDSs from relays
parent
c919050833
commit
1abe505ef9
|
@ -35,6 +35,7 @@ type Config struct {
|
||||||
LogLevel int64 `default:"1"`
|
LogLevel int64 `default:"1"`
|
||||||
MetricsPort string `split_words:"true"`
|
MetricsPort string `split_words:"true"`
|
||||||
DBUrl string `envconfig:"POSTGRES_URL"`
|
DBUrl string `envconfig:"POSTGRES_URL"`
|
||||||
|
Relays string
|
||||||
}
|
}
|
||||||
|
|
||||||
var config Config
|
var config Config
|
||||||
|
@ -54,6 +55,16 @@ func runMain(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
log.Debug().Msgf("DB connection established")
|
log.Debug().Msgf("DB connection established")
|
||||||
|
|
||||||
|
if config.Relays != "" {
|
||||||
|
for _, host := range strings.Split(config.Relays, ",") {
|
||||||
|
c, err := NewRelayConsumer(ctx, host, db)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("Failed to create relay consumer for %q: %s", host, err)
|
||||||
|
}
|
||||||
|
c.Start(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
consumersCh := make(chan struct{})
|
consumersCh := make(chan struct{})
|
||||||
go runConsumers(ctx, db, consumersCh)
|
go runConsumers(ctx, db, consumersCh)
|
||||||
|
|
||||||
|
@ -160,6 +171,7 @@ func main() {
|
||||||
flag.StringVar(&config.LogFile, "log", "", "Path to the log file. If empty, will log to stderr")
|
flag.StringVar(&config.LogFile, "log", "", "Path to the log file. If empty, will log to stderr")
|
||||||
flag.StringVar(&config.LogFormat, "log-format", "text", "Logging format. 'text' or 'json'")
|
flag.StringVar(&config.LogFormat, "log-format", "text", "Logging format. 'text' or 'json'")
|
||||||
flag.Int64Var(&config.LogLevel, "log-level", 1, "Log level. -1 - trace, 0 - debug, 1 - info, 5 - panic")
|
flag.Int64Var(&config.LogLevel, "log-level", 1, "Log level. -1 - trace, 0 - debug, 1 - info, 5 - panic")
|
||||||
|
flag.StringVar(&config.Relays, "relays", "", "List of relays to connect to (for discovering new PDSs)")
|
||||||
|
|
||||||
if err := envconfig.Process("consumer", &config); err != nil {
|
if err := envconfig.Process("consumer", &config); err != nil {
|
||||||
log.Fatalf("envconfig.Process: %s", err)
|
log.Fatalf("envconfig.Process: %s", err)
|
||||||
|
|
|
@ -0,0 +1,179 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
comatproto "github.com/bluesky-social/indigo/api/atproto"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/ipld/go-ipld-prime/codec/dagcbor"
|
||||||
|
"github.com/ipld/go-ipld-prime/node/basicnode"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
"github.com/uabluerail/indexer/pds"
|
||||||
|
"github.com/uabluerail/indexer/util/resolver"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RelayConsumer struct {
|
||||||
|
url string
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRelayConsumer(ctx context.Context, host string, db *gorm.DB) (*RelayConsumer, error) {
|
||||||
|
addr, err := url.Parse(host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing URL %q: %s", host, err)
|
||||||
|
}
|
||||||
|
addr.Scheme = "wss"
|
||||||
|
addr.Path = path.Join(addr.Path, "xrpc/com.atproto.sync.subscribeRepos")
|
||||||
|
return &RelayConsumer{db: db, url: addr.String()}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RelayConsumer) Start(ctx context.Context) {
|
||||||
|
go c.run(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RelayConsumer) run(ctx context.Context) {
|
||||||
|
log := zerolog.Ctx(ctx).With().Str("relay", c.url).Logger()
|
||||||
|
ctx = log.WithContext(ctx)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Info().Msgf("Relay consumer stopped")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
if err := c.runOnce(ctx); err != nil {
|
||||||
|
log.Error().Err(err).Msgf("Consumer of relay %q failed (will be restarted): %s", c.url, err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RelayConsumer) runOnce(ctx context.Context) error {
|
||||||
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
|
conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.url, http.Header{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("establishing websocker connection: %w", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
_, b, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("websocket.ReadMessage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(b)
|
||||||
|
proto := basicnode.Prototype.Any
|
||||||
|
headerNode := proto.NewBuilder()
|
||||||
|
if err := (&dagcbor.DecodeOptions{DontParseBeyondEnd: true}).Decode(headerNode, r); err != nil {
|
||||||
|
return fmt.Errorf("unmarshaling message header: %w", err)
|
||||||
|
}
|
||||||
|
header, err := parseHeader(headerNode.Build())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parsing message header: %w", err)
|
||||||
|
}
|
||||||
|
switch header.Op {
|
||||||
|
case 1:
|
||||||
|
if err := c.processMessage(ctx, header.Type, r); err != nil {
|
||||||
|
log.Info().Err(err).Msgf("Relay consumer failed to process a message: %s", err)
|
||||||
|
}
|
||||||
|
case -1:
|
||||||
|
bodyNode := proto.NewBuilder()
|
||||||
|
if err := (&dagcbor.DecodeOptions{DontParseBeyondEnd: true, AllowLinks: true}).Decode(bodyNode, r); err != nil {
|
||||||
|
return fmt.Errorf("unmarshaling message body: %w", err)
|
||||||
|
}
|
||||||
|
body, err := parseError(bodyNode.Build())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parsing error payload: %w", err)
|
||||||
|
}
|
||||||
|
return &body
|
||||||
|
default:
|
||||||
|
log.Warn().Msgf("Unknown 'op' value received: %d", header.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RelayConsumer) processMessage(ctx context.Context, typ string, r io.Reader) error {
|
||||||
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
|
did := ""
|
||||||
|
|
||||||
|
switch typ {
|
||||||
|
case "#commit":
|
||||||
|
payload := &comatproto.SyncSubscribeRepos_Commit{}
|
||||||
|
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
did = payload.Repo
|
||||||
|
|
||||||
|
case "#handle":
|
||||||
|
payload := &comatproto.SyncSubscribeRepos_Handle{}
|
||||||
|
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
did = payload.Did
|
||||||
|
|
||||||
|
case "#migrate":
|
||||||
|
payload := &comatproto.SyncSubscribeRepos_Migrate{}
|
||||||
|
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
did = payload.Did
|
||||||
|
|
||||||
|
case "#tombstone":
|
||||||
|
payload := &comatproto.SyncSubscribeRepos_Tombstone{}
|
||||||
|
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
did = payload.Did
|
||||||
|
|
||||||
|
case "#info":
|
||||||
|
// Ignore
|
||||||
|
case "#identity":
|
||||||
|
payload := &comatproto.SyncSubscribeRepos_Identity{}
|
||||||
|
if err := payload.UnmarshalCBOR(r); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
did = payload.Did
|
||||||
|
|
||||||
|
default:
|
||||||
|
b, err := io.ReadAll(r)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("Failed to read message payload: %s", err)
|
||||||
|
}
|
||||||
|
log.Warn().Msgf("Unknown message type received: %s payload=%q", typ, string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
if did == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := resolver.GetPDSEndpoint(ctx, did)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = pds.EnsureExists(ctx, c.db, u.String())
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
|
@ -73,6 +73,7 @@ services:
|
||||||
environment:
|
environment:
|
||||||
CONSUMER_METRICS_PORT: '8080'
|
CONSUMER_METRICS_PORT: '8080'
|
||||||
CONSUMER_POSTGRES_URL: "postgres://postgres:${POSTGRES_PASSWORD}@db/bluesky?sslmode=disable"
|
CONSUMER_POSTGRES_URL: "postgres://postgres:${POSTGRES_PASSWORD}@db/bluesky?sslmode=disable"
|
||||||
|
# CONSUMER_RELAYS: "https://bsky.network" # Effectively doubles inbound network traffic. Set this in docker-compose.override.yml if needed.
|
||||||
ATP_PLC_ADDR: "${PLC_ADDRESS:-https://plc.directory}"
|
ATP_PLC_ADDR: "${PLC_ADDRESS:-https://plc.directory}"
|
||||||
ports:
|
ports:
|
||||||
- "0.0.0.0:11002:8080"
|
- "0.0.0.0:11002:8080"
|
||||||
|
|
Loading…
Reference in New Issue