Add quick&dirty quarantine logic for bad records
parent
4418d76bb2
commit
758c5fe5e6
|
@ -30,6 +30,15 @@ import (
|
|||
"github.com/uabluerail/indexer/repo"
|
||||
)
|
||||
|
||||
type BadRecord struct {
|
||||
ID models.ID `gorm:"primarykey"`
|
||||
CreatedAt time.Time
|
||||
PDS models.ID `gorm:"index"`
|
||||
Cursor int64
|
||||
Error string
|
||||
Content []byte
|
||||
}
|
||||
|
||||
type Consumer struct {
|
||||
db *gorm.DB
|
||||
remote pds.PDS
|
||||
|
@ -38,6 +47,10 @@ type Consumer struct {
|
|||
}
|
||||
|
||||
func NewConsumer(ctx context.Context, remote *pds.PDS, db *gorm.DB) (*Consumer, error) {
|
||||
if err := db.AutoMigrate(&BadRecord{}); err != nil {
|
||||
return nil, fmt.Errorf("db.AutoMigrate: %s", err)
|
||||
}
|
||||
|
||||
return &Consumer{
|
||||
db: db,
|
||||
remote: *remote,
|
||||
|
@ -118,8 +131,27 @@ func (c *Consumer) runOnce(ctx context.Context) error {
|
|||
switch header.Op {
|
||||
case 1:
|
||||
if err := c.processMessage(ctx, header.Type, r, first); err != nil {
|
||||
const maxBadRecords = 100
|
||||
var count int64
|
||||
if err2 := c.db.Model(&BadRecord{}).Where(&BadRecord{PDS: c.remote.ID}).Count(&count).Error; err2 != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if count >= maxBadRecords {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Error().Err(err).Str("pds", c.remote.Host).Msgf("Failed to process message at cursor %d: %s", c.remote.Cursor, err)
|
||||
err := c.db.Create(&BadRecord{
|
||||
PDS: c.remote.ID,
|
||||
Cursor: c.remote.Cursor,
|
||||
Error: err.Error(),
|
||||
Content: b,
|
||||
}).Error
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store bad message: %s", err)
|
||||
}
|
||||
}
|
||||
case -1:
|
||||
bodyNode := proto.NewBuilder()
|
||||
if err := (&dagcbor.DecodeOptions{DontParseBeyondEnd: true, AllowLinks: true}).Decode(bodyNode, r); err != nil {
|
||||
|
|
Loading…
Reference in New Issue