Drop the hack for detecting cursor reset from non-compliant servers

main
Max Ignatenko 2024-04-10 23:18:16 +01:00
parent 4b40c5919b
commit c17730c11f
1 changed files with 0 additions and 37 deletions

View File

@ -217,23 +217,6 @@ func (c *Consumer) runOnce(ctx context.Context) error {
} }
} }
func (c *Consumer) checkForCursorReset(ctx context.Context, seq int64) error {
// hack to detect cursor resets upon connection for implementations
// that don't emit an explicit #info when connecting with an outdated cursor.
zerolog.Ctx(ctx).Info().
Int64("cursor", c.remote.Cursor).
Int64("remote_cursor", seq).
Msgf("Checking for possible cursor reset")
if seq == c.remote.Cursor+1 {
// No reset.
return nil
}
return c.resetCursor(ctx, seq)
}
func (c *Consumer) resetCursor(ctx context.Context, seq int64) error { func (c *Consumer) resetCursor(ctx context.Context, seq int64) error {
zerolog.Ctx(ctx).Warn().Str("pds", c.remote.Host).Msgf("Cursor reset: %d -> %d", c.remote.Cursor, seq) zerolog.Ctx(ctx).Warn().Str("pds", c.remote.Host).Msgf("Cursor reset: %d -> %d", c.remote.Cursor, seq)
err := c.db.Model(&c.remote). err := c.db.Model(&c.remote).
@ -282,11 +265,6 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
return fmt.Errorf("handling cursor reset: %w", err) return fmt.Errorf("handling cursor reset: %w", err)
} }
} }
if first {
if err := c.checkForCursorReset(ctx, payload.Seq); err != nil {
return err
}
}
repoInfo, created, err := repo.EnsureExists(ctx, c.db, payload.Repo) repoInfo, created, err := repo.EnsureExists(ctx, c.db, payload.Repo)
if err != nil { if err != nil {
@ -487,11 +465,6 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
return fmt.Errorf("handling cursor reset: %w", err) return fmt.Errorf("handling cursor reset: %w", err)
} }
} }
if first {
if err := c.checkForCursorReset(ctx, payload.Seq); err != nil {
return err
}
}
// No-op, we don't store handles. // No-op, we don't store handles.
if err := c.updateCursor(ctx, payload.Seq); err != nil { if err := c.updateCursor(ctx, payload.Seq); err != nil {
return err return err
@ -509,11 +482,6 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
return fmt.Errorf("handling cursor reset: %w", err) return fmt.Errorf("handling cursor reset: %w", err)
} }
} }
if first {
if err := c.checkForCursorReset(ctx, payload.Seq); err != nil {
return err
}
}
log.Debug().Interface("payload", payload).Str("did", payload.Did).Msgf("MIGRATION") log.Debug().Interface("payload", payload).Str("did", payload.Did).Msgf("MIGRATION")
// TODO // TODO
@ -533,11 +501,6 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
return fmt.Errorf("handling cursor reset: %w", err) return fmt.Errorf("handling cursor reset: %w", err)
} }
} }
if first {
if err := c.checkForCursorReset(ctx, payload.Seq); err != nil {
return err
}
}
// TODO // TODO
if err := c.updateCursor(ctx, payload.Seq); err != nil { if err := c.updateCursor(ctx, payload.Seq); err != nil {
return err return err