diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 808de96..1666fde 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -36,9 +36,9 @@ type Mirror struct { upstream *url.URL limiter *rate.Limiter - mu sync.RWMutex - lastSuccessTimestamp time.Time - lastRecordTimestamp time.Time + mu sync.RWMutex + lastCompletionTimestamp time.Time + lastRecordTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -78,7 +78,7 @@ func (m *Mirror) run(ctx context.Context) { } else { now := time.Now() m.mu.Lock() - m.lastSuccessTimestamp = now + m.lastCompletionTimestamp = now m.mu.Unlock() } time.Sleep(10 * time.Second) @@ -86,10 +86,10 @@ func (m *Mirror) run(ctx context.Context) { } } -func (m *Mirror) LastSuccess() time.Time { +func (m *Mirror) LastCompletion() time.Time { m.mu.RLock() defer m.mu.RUnlock() - return m.lastSuccessTimestamp + return m.lastCompletionTimestamp } func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { @@ -201,12 +201,11 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } - m.mu.Lock() - m.lastSuccessTimestamp = time.Now() if !lastTimestamp.IsZero() { + m.mu.Lock() m.lastRecordTimestamp = lastTimestamp + m.mu.Unlock() } - m.mu.Unlock() log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index 520de27..245881d 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -66,14 +66,21 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) } + // Check if the mirror is up to date. ts, err := s.mirror.LastRecordTimestamp(ctx) if err != nil { return respond.InternalServerError(err.Error()) } delay := time.Since(ts) if delay > s.MaxDelay { - updateMetrics(http.StatusServiceUnavailable) - return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + // Check LastCompletion and if it's recent enough - that means + // that we're actually caught up and there simply aren't any recent + // PLC operations. + completionDelay := time.Since(s.mirror.LastCompletion()) + if completionDelay > s.MaxDelay { + updateMetrics(http.StatusServiceUnavailable) + return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + } } log := zerolog.Ctx(ctx)