diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 133f952..03ee903 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -38,6 +38,7 @@ type Mirror struct { mu sync.RWMutex lastSuccessTimestamp time.Time + lastRecordTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -91,10 +92,33 @@ func (m *Mirror) LastSuccess() time.Time { return m.lastSuccessTimestamp } -func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) { +func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { + m.mu.RLock() + t := m.lastRecordTimestamp + m.mu.RUnlock() + if !t.IsZero() { + return t, nil + } + ts := "" err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error - return ts, err + if err != nil { + return time.Time{}, err + } + dbTimestamp, err := time.Parse(time.RFC3339, ts) + if err != nil { + return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err) + } + + m.mu.Lock() + defer m.mu.Unlock() + if t.IsZero() { + t = dbTimestamp + } + if t.After(dbTimestamp) { + return t, nil + } + return dbTimestamp, nil } func (m *Mirror) runOnce(ctx context.Context) error { @@ -138,6 +162,8 @@ func (m *Mirror) runOnce(ctx context.Context) error { decoder := json.NewDecoder(resp.Body) oldCursor := cursor + var lastTimestamp time.Time + for { var entry plc.OperationLogEntry err := decoder.Decode(&entry) @@ -155,6 +181,7 @@ func (m *Mirror) runOnce(ctx context.Context) error { t, err := time.Parse(time.RFC3339, row.PLCTimestamp) if err == nil { lastEventTimestamp.Set(float64(t.Unix())) + lastTimestamp = t } else { log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err) } @@ -174,6 +201,12 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } + if !lastTimestamp.IsZero() { + m.mu.Lock() + m.lastRecordTimestamp = lastTimestamp + m.mu.Unlock() + } + log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } return nil diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index bff15ff..bc49847 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -51,11 +51,7 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { if err != nil { return respond.InternalServerError(err.Error()) } - t, err := time.Parse(time.RFC3339, ts) - if err != nil { - return respond.InternalServerError(err.Error()) - } - delay := time.Since(t) + delay := time.Since(ts) if delay > s.MaxDelay { return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay)) }