plc-mirror/repo/repo.go

113 lines
3.6 KiB
Go
Raw Normal View History

2024-02-15 17:10:39 +01:00
package repo
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"time"
"gorm.io/gorm"
"github.com/uabluerail/indexer/models"
"github.com/uabluerail/indexer/pds"
"github.com/uabluerail/indexer/util/resolver"
)
type Repo struct {
2024-02-15 21:29:08 +01:00
ID models.ID `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
2024-02-15 19:39:29 +01:00
PDS models.ID `gorm:"index:rev_state_index,priority:2;index:was_indexed,priority:2"`
2024-02-15 17:10:39 +01:00
DID string `gorm:"uniqueIndex;column:did"`
2024-02-15 19:39:29 +01:00
LastIndexedRev string `gorm:"index:rev_state_index,expression:(last_indexed_rev < first_rev_since_reset),priority:1;index:was_indexed,expression:(last_indexed_rev is null OR last_indexed_rev = ''),priority:1"`
2024-02-15 17:10:39 +01:00
FirstRevSinceReset string
FirstCursorSinceReset int64
TombstonedAt time.Time
LastIndexAttempt time.Time
LastError string
FailedAttempts int `gorm:"default:0"`
2024-02-15 17:10:39 +01:00
}
type Record struct {
2024-02-15 21:29:08 +01:00
ID models.ID `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
Repo models.ID `gorm:"index:idx_repo_record_key,unique,priority:1;not null;index:idx_repo_rev"`
2024-02-15 17:10:39 +01:00
Collection string `gorm:"index:idx_repo_record_key,unique,priority:2;not null"`
Rkey string `gorm:"index:idx_repo_record_key,unique,priority:3"`
AtRev string `gorm:"index:idx_repo_rev"`
2024-02-15 17:10:39 +01:00
Content json.RawMessage `gorm:"type:JSONB"`
Deleted bool
}
// AutoMigrate runs GORM's auto-migration for the models declared in this
// package (Repo and Record) and returns whatever error GORM reports.
func AutoMigrate(db *gorm.DB) error {
	schemas := []interface{}{
		&Repo{},
		&Record{},
	}
	return db.AutoMigrate(schemas...)
}
2024-02-18 18:23:54 +01:00
func EnsureExists(ctx context.Context, db *gorm.DB, did string) (*Repo, bool, error) {
2024-02-15 17:10:39 +01:00
r := Repo{}
if err := db.Model(&r).Where(&Repo{DID: did}).Take(&r).Error; err == nil {
// Already have a row, just return it.
2024-02-18 18:23:54 +01:00
return &r, false, nil
2024-02-15 17:10:39 +01:00
} else {
if !errors.Is(err, gorm.ErrRecordNotFound) {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("querying DB: %w", err)
2024-02-15 17:10:39 +01:00
}
}
// No row yet, so we need to create one (keeping in mind that it can be created
// concurrently by someone else).
// 1) resolve did (i.e., query PLC)
// 2) get PDS address from didDoc and ensure we have a record for it
// 3) in a transaction, check if we have a record for the repo
// if we don't - just create a record
// if we do - compare PDS IDs
// if they don't match - also reset FirstRevSinceReset
doc, err := resolver.GetDocument(ctx, did)
if err != nil {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("fetching DID Document: %w", err)
2024-02-15 17:10:39 +01:00
}
pdsHost := ""
for _, srv := range doc.Service {
if srv.Type != "AtprotoPersonalDataServer" {
continue
}
pdsHost = srv.ServiceEndpoint
}
if pdsHost == "" {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("did not find any PDS in DID Document")
2024-02-15 17:10:39 +01:00
}
u, err := url.Parse(pdsHost)
if err != nil {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("PDS endpoint (%q) is an invalid URL: %w", pdsHost, err)
2024-02-15 17:10:39 +01:00
}
if u.Host == "" {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("PDS endpoint (%q) doesn't have a host part", pdsHost)
2024-02-15 17:10:39 +01:00
}
2024-02-21 10:35:25 +01:00
remote, err := pds.EnsureExists(ctx, db, u.String())
if err != nil {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("failed to get PDS record from DB for %q: %w", remote.Host, err)
2024-02-15 17:10:39 +01:00
}
r = Repo{DID: did, PDS: models.ID(remote.ID)}
2024-02-18 18:23:54 +01:00
created := false
2024-02-15 17:10:39 +01:00
err = db.Transaction(func(tx *gorm.DB) error {
2024-02-18 18:23:54 +01:00
result := tx.Model(&r).Where(&Repo{DID: r.DID}).FirstOrCreate(&r)
if err := result.Error; err != nil {
2024-02-15 17:10:39 +01:00
return fmt.Errorf("looking for repo: %w", err)
}
if r.PDS != models.ID(remote.ID) {
return tx.Model(&r).Select("FirstRevSinceReset").Updates(&Repo{FirstRevSinceReset: ""}).Error
}
2024-02-18 18:23:54 +01:00
created = result.RowsAffected > 0
2024-02-15 17:10:39 +01:00
return nil
})
if err != nil {
2024-02-18 18:23:54 +01:00
return nil, false, fmt.Errorf("upserting repo record: %w", err)
2024-02-15 17:10:39 +01:00
}
2024-02-18 18:23:54 +01:00
return &r, created, nil
2024-02-15 17:10:39 +01:00
}