Add AtRev column to only overwrite records with a newer version

main
Max Ignatenko 2024-02-17 14:29:45 +00:00
parent 1d3c6edf0a
commit 1038ca3bea
3 changed files with 50 additions and 7 deletions

@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"regexp"
 	"strings"
 	"time"
@@ -177,6 +178,12 @@ func (c *Consumer) updateCursor(ctx context.Context, seq int64) error {
 }
 
+var postgresFixRegexp = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`)
+
+func escapeNullCharForPostgres(b []byte) []byte {
+	return postgresFixRegexp.ReplaceAll(b, []byte(`$1<0x00>`))
+}
+
 func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader, first bool) error {
 	log := zerolog.Ctx(ctx)
@@ -253,7 +260,11 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
 			Repo: models.ID(repoInfo.ID),
 			Collection: parts[0],
 			Rkey: parts[1],
-			Content: v,
+			// XXX: proper replacement of \u0000 would require full parsing of JSON
+			// and recursive iteration over all string values, but this
+			// should work well enough for now.
+			Content: escapeNullCharForPostgres(v),
+			AtRev: payload.Rev,
 		})
 	}
 	if len(recs) == 0 && expectRecords {
@@ -261,7 +272,15 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
 	}
 	if len(recs) > 0 || expectRecords {
 		err = c.db.Model(&repo.Record{}).
-			Clauses(clause.OnConflict{DoUpdates: clause.AssignmentColumns([]string{"content"}),
+			Clauses(clause.OnConflict{
+				Where: clause.Where{Exprs: []clause.Expression{clause.Or(
+					clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: nil},
+					clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: ""},
+					clause.Lt{
+						Column: clause.Column{Name: "at_rev", Table: "records"},
+						Value: clause.Column{Name: "at_rev", Table: "excluded"}},
+				)}},
+				DoUpdates: clause.AssignmentColumns([]string{"content", "at_rev"}),
 				Columns: []clause.Column{{Name: "repo"}, {Name: "collection"}, {Name: "rkey"}}}).
 			Create(recs).Error
 		if err != nil {
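For context, here is a minimal, self-contained sketch of the escaping helper added above, run on a hypothetical JSON value (the sample input and package scaffolding are illustrative, not taken from the repo). The regexp requires a non-backslash character before the escape, so an escaped backslash followed by a literal "u0000" is left untouched; note that ReplaceAll substitutes the entire match, so that preceding character is consumed together with the escape.

package main

import (
	"fmt"
	"regexp"
)

// Copied from the diff above.
var postgresFixRegexp = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`)

func escapeNullCharForPostgres(b []byte) []byte {
	return postgresFixRegexp.ReplaceAll(b, []byte(`$1<0x00>`))
}

func main() {
	// Hypothetical record content: one real \u0000 escape and one escaped backslash
	// followed by the literal text "u0000".
	in := []byte(`{"text":"abc\u0000def","note":"an escaped \\u0000 is left alone"}`)
	fmt.Println(string(escapeNullCharForPostgres(in)))
	// Prints: {"text":"ab<0x00>def","note":"an escaped \\u0000 is left alone"}
	// (the "c" matched by [^\\] is dropped along with the \u0000 escape)
}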

@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"regexp"
 	"strings"
 	"time"
@@ -131,6 +132,12 @@ func (p *WorkerPool) worker(ctx context.Context, signal chan struct{}) {
 	}
 }
 
+var postgresFixRegexp = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`)
+
+func escapeNullCharForPostgres(b []byte) []byte {
+	return postgresFixRegexp.ReplaceAll(b, []byte(`$1<0x00>`))
+}
+
 func (p *WorkerPool) doWork(ctx context.Context, work WorkItem) error {
 	log := zerolog.Ctx(ctx)
 	defer close(work.signal)
@@ -201,18 +208,31 @@ retry:
 			log.Warn().Msgf("Unexpected key format: %q", k)
 			continue
 		}
-		v = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`).ReplaceAll(v, []byte(`$1<0x00>`))
 		recs = append(recs, repo.Record{
 			Repo: models.ID(work.Repo.ID),
 			Collection: parts[0],
 			Rkey: parts[1],
-			Content: v,
+			// XXX: proper replacement of \u0000 would require full parsing of JSON
+			// and recursive iteration over all string values, but this
+			// should work well enough for now.
+			Content: escapeNullCharForPostgres(v),
+			AtRev: newRev,
 		})
 	}
 	recordsFetched.Add(float64(len(recs)))
 	if len(recs) > 0 {
 		for _, batch := range splitInBatshes(recs, 500) {
 			result := p.db.Model(&repo.Record{}).
-				Clauses(clause.OnConflict{DoUpdates: clause.AssignmentColumns([]string{"content"}),
+				Clauses(clause.OnConflict{
+					Where: clause.Where{Exprs: []clause.Expression{clause.Or(
+						clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: nil},
+						clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: ""},
+						clause.Lt{
+							Column: clause.Column{Name: "at_rev", Table: "records"},
+							Value: clause.Column{Name: "at_rev", Table: "excluded"}},
+					)}},
+					DoUpdates: clause.AssignmentColumns([]string{"content", "at_rev"}),
 					Columns: []clause.Column{{Name: "repo"}, {Name: "collection"}, {Name: "rkey"}}}).
 				Create(batch)
 			if err := result.Error; err != nil {
@@ -229,6 +249,9 @@ retry:
 		return fmt.Errorf("updating repo rev: %w", err)
 	}
 
+	// TODO: check for records that are missing in the repo download
+	// and mark them as deleted.
+
 	return nil
 }
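As a reading aid, the sketch below isolates the conditional upsert used in both files: on a (repo, collection, rkey) conflict, content and at_rev are only overwritten when the stored at_rev is NULL, empty, or sorts before the incoming revision. The Record type, package name, and the SQL in the comment are simplified stand-ins and approximations, not taken from the repo.

// Package upsertsketch is a stand-in, not one of the project's packages.
package upsertsketch

import (
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// Record is a trimmed stand-in for repo.Record with only the fields the clause touches.
type Record struct {
	ID         uint   `gorm:"primarykey"`
	Repo       uint64 `gorm:"index:idx_repo_record_key,unique,priority:1;not null"`
	Collection string `gorm:"index:idx_repo_record_key,unique,priority:2;not null"`
	Rkey       string `gorm:"index:idx_repo_record_key,unique,priority:3"`
	AtRev      string
	Content    []byte `gorm:"type:JSONB"`
}

// upsertIfNewer inserts recs; on conflict it should produce roughly this PostgreSQL:
//
//	INSERT INTO records (...) VALUES (...)
//	ON CONFLICT (repo, collection, rkey) DO UPDATE
//	SET content = excluded.content, at_rev = excluded.at_rev
//	WHERE records.at_rev IS NULL
//	   OR records.at_rev = ''
//	   OR records.at_rev < excluded.at_rev
func upsertIfNewer(db *gorm.DB, recs []Record) error {
	return db.Model(&Record{}).
		Clauses(clause.OnConflict{
			Columns: []clause.Column{{Name: "repo"}, {Name: "collection"}, {Name: "rkey"}},
			Where: clause.Where{Exprs: []clause.Expression{clause.Or(
				clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: nil},
				clause.Eq{Column: clause.Column{Name: "at_rev", Table: "records"}, Value: ""},
				clause.Lt{
					Column: clause.Column{Name: "at_rev", Table: "records"},
					Value: clause.Column{Name: "at_rev", Table: "excluded"},
				},
			)}},
			DoUpdates: clause.AssignmentColumns([]string{"content", "at_rev"}),
		}).
		Create(recs).Error
}

The final branch compares the revision strings lexicographically, which is how the clause decides what counts as "newer".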

@@ -34,9 +34,10 @@ type Record struct {
 	ID models.ID `gorm:"primarykey"`
 	CreatedAt time.Time
 	UpdatedAt time.Time
-	Repo models.ID `gorm:"index:idx_repo_record_key,unique,priority:1;not null"`
+	Repo models.ID `gorm:"index:idx_repo_record_key,unique,priority:1;not null;index:idx_repo_rev"`
 	Collection string `gorm:"index:idx_repo_record_key,unique,priority:2;not null"`
 	Rkey string `gorm:"index:idx_repo_record_key,unique,priority:3"`
+	AtRev string `gorm:"index:idx_repo_rev"`
 	Content json.RawMessage `gorm:"type:JSONB"`
 	Deleted bool
 }
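Because Repo and AtRev share the idx_repo_rev tag, GORM's migrator should build one composite index over (repo, at_rev). The sketch below illustrates that with a trimmed stand-in model; the DSN, package scaffolding, and the DDL in the comment are placeholders and approximations, not taken from the repo.

package main

import (
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// Trimmed stand-in for the model above, keeping only the fields relevant to the new index.
type Record struct {
	ID         uint   `gorm:"primarykey"`
	Repo       uint64 `gorm:"index:idx_repo_record_key,unique,priority:1;not null;index:idx_repo_rev"`
	Collection string `gorm:"index:idx_repo_record_key,unique,priority:2;not null"`
	Rkey       string `gorm:"index:idx_repo_record_key,unique,priority:3"`
	AtRev      string `gorm:"index:idx_repo_rev"`
}

func main() {
	// Placeholder DSN; adjust for your environment.
	db, err := gorm.Open(postgres.Open("postgres://localhost/indexer?sslmode=disable"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	// Repo and AtRev use the same index name, so AutoMigrate should create a single
	// composite index, roughly: CREATE INDEX idx_repo_rev ON records (repo, at_rev)
	if err := db.AutoMigrate(&Record{}); err != nil {
		panic(err)
	}
}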