Add few more metrics
parent
334af033b8
commit
1d25842b78
|
@ -187,6 +187,8 @@ func escapeNullCharForPostgres(b []byte) []byte {
|
||||||
func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader, first bool) error {
|
func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader, first bool) error {
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
|
eventCounter.WithLabelValues(c.remote.Host, typ).Inc()
|
||||||
|
|
||||||
switch typ {
|
switch typ {
|
||||||
case "#commit":
|
case "#commit":
|
||||||
payload := &comatproto.SyncSubscribeRepos_Commit{}
|
payload := &comatproto.SyncSubscribeRepos_Commit{}
|
||||||
|
@ -194,6 +196,8 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exportEventTimestamp(ctx, c.remote.Host, payload.Time)
|
||||||
|
|
||||||
if c.remote.FirstCursorSinceReset == 0 {
|
if c.remote.FirstCursorSinceReset == 0 {
|
||||||
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
||||||
return fmt.Errorf("handling cursor reset: %w", err)
|
return fmt.Errorf("handling cursor reset: %w", err)
|
||||||
|
@ -205,7 +209,7 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
repoInfo, err := repo.EnsureExists(ctx, c.db, payload.Repo)
|
repoInfo, created, err := repo.EnsureExists(ctx, c.db, payload.Repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("repo.EnsureExists(%q): %w", payload.Repo, err)
|
return fmt.Errorf("repo.EnsureExists(%q): %w", payload.Repo, err)
|
||||||
}
|
}
|
||||||
|
@ -214,6 +218,9 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
Msgf("Commit from an incorrect PDS, skipping")
|
Msgf("Commit from an incorrect PDS, skipping")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if created {
|
||||||
|
reposDiscovered.WithLabelValues(c.remote.Host).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: verify signature
|
// TODO: verify signature
|
||||||
|
|
||||||
|
@ -320,6 +327,8 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exportEventTimestamp(ctx, c.remote.Host, payload.Time)
|
||||||
|
|
||||||
if c.remote.FirstCursorSinceReset == 0 {
|
if c.remote.FirstCursorSinceReset == 0 {
|
||||||
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
||||||
return fmt.Errorf("handling cursor reset: %w", err)
|
return fmt.Errorf("handling cursor reset: %w", err)
|
||||||
|
@ -340,6 +349,8 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exportEventTimestamp(ctx, c.remote.Host, payload.Time)
|
||||||
|
|
||||||
if c.remote.FirstCursorSinceReset == 0 {
|
if c.remote.FirstCursorSinceReset == 0 {
|
||||||
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
||||||
return fmt.Errorf("handling cursor reset: %w", err)
|
return fmt.Errorf("handling cursor reset: %w", err)
|
||||||
|
@ -362,6 +373,8 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
return fmt.Errorf("failed to unmarshal commit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exportEventTimestamp(ctx, c.remote.Host, payload.Time)
|
||||||
|
|
||||||
if c.remote.FirstCursorSinceReset == 0 {
|
if c.remote.FirstCursorSinceReset == 0 {
|
||||||
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
if err := c.resetCursor(ctx, payload.Seq); err != nil {
|
||||||
return fmt.Errorf("handling cursor reset: %w", err)
|
return fmt.Errorf("handling cursor reset: %w", err)
|
||||||
|
@ -448,3 +461,11 @@ func parseError(node datamodel.Node) (xrpc.XRPCError, error) {
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func exportEventTimestamp(ctx context.Context, remote string, timestamp string) {
|
||||||
|
if t, err := time.Parse(time.RFC3339, timestamp); err != nil {
|
||||||
|
zerolog.Ctx(ctx).Error().Err(err).Str("pds", remote).Msgf("Failed to parse %q as a timestamp: %s", timestamp, err)
|
||||||
|
} else {
|
||||||
|
lastEventTimestamp.WithLabelValues(remote).Set(float64(t.Unix()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
)
|
||||||
|
|
||||||
|
var lastEventTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Name: "repo_commit_received_timestamp",
|
||||||
|
Help: "Timestamp of the last event received from firehose.",
|
||||||
|
}, []string{"remote"})
|
||||||
|
|
||||||
|
var eventCounter = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "repo_commits_received_counter",
|
||||||
|
Help: "Counter of events received from each remote.",
|
||||||
|
}, []string{"remote", "type"})
|
||||||
|
|
||||||
|
var reposDiscovered = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "repo_discovered_counter",
|
||||||
|
Help: "Counter of newly discovered repos",
|
||||||
|
}, []string{"remote"})
|
|
@ -91,10 +91,13 @@ func (l *Lister) run(ctx context.Context) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Info().Msgf("Received %d DIDs from %q", len(dids), remote.Host)
|
log.Info().Msgf("Received %d DIDs from %q", len(dids), remote.Host)
|
||||||
|
reposListed.WithLabelValues(remote.Host).Add(float64(len(dids)))
|
||||||
|
|
||||||
for _, did := range dids {
|
for _, did := range dids {
|
||||||
if _, err := repo.EnsureExists(ctx, l.db, did); err != nil {
|
if _, created, err := repo.EnsureExists(ctx, l.db, did); err != nil {
|
||||||
log.Error().Err(err).Msgf("Failed to ensure that we have a record for the repo %q: %s", did, err)
|
log.Error().Err(err).Msgf("Failed to ensure that we have a record for the repo %q: %s", did, err)
|
||||||
|
} else if created {
|
||||||
|
reposDiscovered.WithLabelValues(remote.Host).Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
)
|
||||||
|
|
||||||
|
var reposDiscovered = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "repo_discovered_counter",
|
||||||
|
Help: "Counter of newly discovered repos",
|
||||||
|
}, []string{"remote"})
|
||||||
|
|
||||||
|
var reposListed = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "repo_listed_counter",
|
||||||
|
Help: "Counter of repos received by listing PDSs.",
|
||||||
|
}, []string{"remote"})
|
25
repo/repo.go
25
repo/repo.go
|
@ -46,14 +46,14 @@ func AutoMigrate(db *gorm.DB) error {
|
||||||
return db.AutoMigrate(&Repo{}, &Record{})
|
return db.AutoMigrate(&Repo{}, &Record{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, error) {
|
func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, bool, error) {
|
||||||
r := Repo{}
|
r := Repo{}
|
||||||
if err := db.Model(&r).Where(&Repo{DID: did}).Take(&r).Error; err == nil {
|
if err := db.Model(&r).Where(&Repo{DID: did}).Take(&r).Error; err == nil {
|
||||||
// Already have a row, just return it.
|
// Already have a row, just return it.
|
||||||
return &r, nil
|
return &r, false, nil
|
||||||
} else {
|
} else {
|
||||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
return nil, fmt.Errorf("querying DB: %w", err)
|
return nil, false, fmt.Errorf("querying DB: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, error) {
|
||||||
|
|
||||||
doc, err := resolver.GetDocument(ctx, did)
|
doc, err := resolver.GetDocument(ctx, did)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fetching DID Document: %w", err)
|
return nil, false, fmt.Errorf("fetching DID Document: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pdsHost := ""
|
pdsHost := ""
|
||||||
|
@ -79,31 +79,34 @@ func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, error) {
|
||||||
pdsHost = srv.ServiceEndpoint
|
pdsHost = srv.ServiceEndpoint
|
||||||
}
|
}
|
||||||
if pdsHost == "" {
|
if pdsHost == "" {
|
||||||
return nil, fmt.Errorf("did not find any PDS in DID Document")
|
return nil, false, fmt.Errorf("did not find any PDS in DID Document")
|
||||||
}
|
}
|
||||||
u, err := url.Parse(pdsHost)
|
u, err := url.Parse(pdsHost)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
|
return nil, false, fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
|
||||||
}
|
}
|
||||||
if u.Host == "" {
|
if u.Host == "" {
|
||||||
return nil, fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
|
return nil, false, fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
|
||||||
}
|
}
|
||||||
remote := pds.PDS{Host: u.String()}
|
remote := pds.PDS{Host: u.String()}
|
||||||
if err := db.Model(&remote).Where(&pds.PDS{Host: remote.Host}).FirstOrCreate(&remote).Error; err != nil {
|
if err := db.Model(&remote).Where(&pds.PDS{Host: remote.Host}).FirstOrCreate(&remote).Error; err != nil {
|
||||||
return nil, fmt.Errorf("failed to get PDS record from DB for %q: %w", remote.Host, err)
|
return nil, false, fmt.Errorf("failed to get PDS record from DB for %q: %w", remote.Host, err)
|
||||||
}
|
}
|
||||||
r = Repo{DID: did, PDS: models.ID(remote.ID)}
|
r = Repo{DID: did, PDS: models.ID(remote.ID)}
|
||||||
|
created := false
|
||||||
err = db.Transaction(func(tx *gorm.DB) error {
|
err = db.Transaction(func(tx *gorm.DB) error {
|
||||||
if err := tx.Model(&r).Where(&Repo{DID: r.DID}).FirstOrCreate(&r).Error; err != nil {
|
result := tx.Model(&r).Where(&Repo{DID: r.DID}).FirstOrCreate(&r)
|
||||||
|
if err := result.Error; err != nil {
|
||||||
return fmt.Errorf("looking for repo: %w", err)
|
return fmt.Errorf("looking for repo: %w", err)
|
||||||
}
|
}
|
||||||
if r.PDS != models.ID(remote.ID) {
|
if r.PDS != models.ID(remote.ID) {
|
||||||
return tx.Model(&r).Select("FirstRevSinceReset").Updates(&Repo{FirstRevSinceReset: ""}).Error
|
return tx.Model(&r).Select("FirstRevSinceReset").Updates(&Repo{FirstRevSinceReset: ""}).Error
|
||||||
}
|
}
|
||||||
|
created = result.RowsAffected > 0
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("upserting repo record: %w", err)
|
return nil, false, fmt.Errorf("upserting repo record: %w", err)
|
||||||
}
|
}
|
||||||
return &r, nil
|
return &r, created, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue