diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go new file mode 100644 index 0000000..68e511e --- /dev/null +++ b/cmd/plc-mirror/metrics.go @@ -0,0 +1,22 @@ +package main + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "plcmirror_last_op_timestamp", + Help: "Timestamp of the last operation received from upstream.", +}) + +var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "plcmirror_inbound_requests_total", + Help: "Counter of received requests.", +}, []string{"status"}) + +var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "plcmirror_response_latency_millisecond", + Help: "Latency of responses.", + Buckets: prometheus.ExponentialBucketsRange(0.1, 30000, 20), +}, []string{"status"}) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 88f7d91..cf7525c 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -36,8 +36,9 @@ type Mirror struct { upstream *url.URL limiter *rate.Limiter - mu sync.RWMutex - lastSuccessTimestamp time.Time + mu sync.RWMutex + lastCompletionTimestamp time.Time + lastRecordTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -77,7 +78,7 @@ func (m *Mirror) run(ctx context.Context) { } else { now := time.Now() m.mu.Lock() - m.lastSuccessTimestamp = now + m.lastCompletionTimestamp = now m.mu.Unlock() } time.Sleep(10 * time.Second) @@ -85,16 +86,39 @@ func (m *Mirror) run(ctx context.Context) { } } -func (m *Mirror) LastSuccess() time.Time { +func (m *Mirror) LastCompletion() time.Time { m.mu.RLock() defer m.mu.RUnlock() - return m.lastSuccessTimestamp + return m.lastCompletionTimestamp } -func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) { +func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { + m.mu.RLock() + t := m.lastRecordTimestamp + m.mu.RUnlock() + if !t.IsZero() { + return t, nil + } + ts := "" err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error - return ts, err + if err != nil { + return time.Time{}, err + } + dbTimestamp, err := time.Parse(time.RFC3339, ts) + if err != nil { + return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err) + } + + m.mu.Lock() + defer m.mu.Unlock() + if m.lastRecordTimestamp.IsZero() { + m.lastRecordTimestamp = dbTimestamp + } + if m.lastRecordTimestamp.After(dbTimestamp) { + return m.lastRecordTimestamp, nil + } + return dbTimestamp, nil } func (m *Mirror) runOnce(ctx context.Context) error { @@ -138,6 +162,8 @@ func (m *Mirror) runOnce(ctx context.Context) error { decoder := json.NewDecoder(resp.Body) oldCursor := cursor + var lastTimestamp time.Time + for { var entry plc.OperationLogEntry err := decoder.Decode(&entry) @@ -149,7 +175,16 @@ func (m *Mirror) runOnce(ctx context.Context) error { } cursor = entry.CreatedAt - newEntries = append(newEntries, *FromOperationLogEntry(entry)) + row := *FromOperationLogEntry(entry) + newEntries = append(newEntries, row) + + t, err := time.Parse(time.RFC3339, row.PLCTimestamp) + if err == nil { + lastEventTimestamp.Set(float64(t.Unix())) + lastTimestamp = t + } else { + log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err) + } } if len(newEntries) == 0 || cursor == oldCursor { @@ -166,6 +201,12 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } + if !lastTimestamp.IsZero() { + m.mu.Lock() + m.lastRecordTimestamp = lastTimestamp + m.mu.Unlock() + } + log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } return nil diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index 637dfeb..9d42036 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -51,11 +51,7 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { if err != nil { return respond.InternalServerError(err.Error()) } - t, err := time.Parse(time.RFC3339, ts) - if err != nil { - return respond.InternalServerError(err.Error()) - } - delay := time.Since(t) + delay := time.Since(ts) if delay > s.MaxDelay { return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay)) } @@ -64,24 +60,45 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { } func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse { - delay := time.Since(s.mirror.LastSuccess()) + start := time.Now() + updateMetrics := func(c int) { + requestCount.WithLabelValues(fmt.Sprint(c)).Inc() + requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) + } + + // Check if the mirror is up to date. + ts, err := s.mirror.LastRecordTimestamp(ctx) + if err != nil { + return respond.InternalServerError(err.Error()) + } + delay := time.Since(ts) if delay > s.MaxDelay { - return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + // Check LastCompletion and if it's recent enough - that means + // that we're actually caught up and there simply aren't any recent + // PLC operations. + completionDelay := time.Since(s.mirror.LastCompletion()) + if completionDelay > s.MaxDelay { + updateMetrics(http.StatusServiceUnavailable) + return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + } } log := zerolog.Ctx(ctx) requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/")) var entry PLCLogEntry - err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error + err = s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error if errors.Is(err, gorm.ErrRecordNotFound) { + updateMetrics(http.StatusNotFound) return respond.NotFound("unknown DID") } if err != nil { log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err) + updateMetrics(http.StatusInternalServerError) return respond.InternalServerError("failed to get the last log entry") } if _, ok := entry.Operation.Value.(plc.Tombstone); ok { + updateMetrics(http.StatusNotFound) return respond.NotFound("DID deleted") } @@ -139,7 +156,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo } } } - + updateMetrics(http.StatusOK) return respond.JSON(r) }