Detect CAR files with zero blocks and handle them accordingly
parent
638bdcf515
commit
4c41389e9b
|
@ -173,7 +173,8 @@ retry:
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add a configuration knob for switching between full and partial fetch.
|
// TODO: add a configuration knob for switching between full and partial fetch.
|
||||||
b, err := comatproto.SyncGetRepo(ctx, client, work.Repo.DID, work.Repo.LastIndexedRev)
|
sinceRev := work.Repo.LastIndexedRev
|
||||||
|
b, err := comatproto.SyncGetRepo(ctx, client, work.Repo.DID, sinceRev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err, ok := errors.As[*xrpc.Error](err); ok {
|
if err, ok := errors.As[*xrpc.Error](err); ok {
|
||||||
if err.IsThrottled() && err.Ratelimit != nil {
|
if err.IsThrottled() && err.Ratelimit != nil {
|
||||||
|
@ -204,7 +205,10 @@ retry:
|
||||||
}
|
}
|
||||||
|
|
||||||
newRev, err := repo.GetRev(ctx, bytes.NewReader(b))
|
newRev, err := repo.GetRev(ctx, bytes.NewReader(b))
|
||||||
if err != nil {
|
if sinceRev != "" && errors.Is(err, repo.ErrZeroBlocks) {
|
||||||
|
// No new records since the rev we requested above.
|
||||||
|
return nil
|
||||||
|
} else if err != nil {
|
||||||
l := 25
|
l := 25
|
||||||
if len(b) < l {
|
if len(b) < l {
|
||||||
l = len(b)
|
l = len(b)
|
||||||
|
|
|
@ -250,6 +250,8 @@ func parseMap(node datamodel.Node) (map[string]datamodel.Node, error) {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrZeroBlocks = fmt.Errorf("zero blocks found")
|
||||||
|
|
||||||
func GetRev(ctx context.Context, b io.Reader) (string, error) {
|
func GetRev(ctx context.Context, b io.Reader) (string, error) {
|
||||||
r, err := car.NewCarReader(b)
|
r, err := car.NewCarReader(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -278,6 +280,10 @@ func GetRev(ctx context.Context, b io.Reader) (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(blocks) == 0 {
|
||||||
|
return "", ErrZeroBlocks
|
||||||
|
}
|
||||||
|
|
||||||
builder := basicnode.Prototype.Any.NewBuilder()
|
builder := basicnode.Prototype.Any.NewBuilder()
|
||||||
if err := (&dagcbor.DecodeOptions{AllowLinks: true}).Decode(builder, bytes.NewReader(blocks[r.Header.Roots[0]])); err != nil {
|
if err := (&dagcbor.DecodeOptions{AllowLinks: true}).Decode(builder, bytes.NewReader(blocks[r.Header.Roots[0]])); err != nil {
|
||||||
return "", fmt.Errorf("unmarshaling %q: %w", r.Header.Roots[0].String(), err)
|
return "", fmt.Errorf("unmarshaling %q: %w", r.Header.Roots[0].String(), err)
|
||||||
|
|
Loading…
Reference in New Issue