diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go
new file mode 100644
index 0000000..a5cbdad
--- /dev/null
+++ b/cmd/plc-mirror/metrics.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+// lastEventTimestamp exposes the Unix time (in seconds) of the most recent
+// operation log entry received from upstream.
+var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "plcmirror_last_op_timestamp",
+	Help: "Timestamp of the last operation received from upstream.",
+})
+
+// requestCount counts handled requests, labelled by HTTP status code.
+var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "plcmirror_inbound_requests_total",
+	Help: "Counter of received requests.",
+}, []string{"status"})
+
+// requestLatency records response latency in milliseconds, labelled by HTTP
+// status code.
+//
+// Buckets are given explicitly: the client library's default buckets span
+// 0.005 to 10 and are meant for values in seconds, so millisecond
+// observations would almost all fall into the top buckets.
+var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
+	Name:    "plcmirror_response_latency_millisecond",
+	Help:    "Latency of responses.",
+	Buckets: []float64{1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000},
+}, []string{"status"})
diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go
index b4bd6a0..133f952 100644
--- a/cmd/plc-mirror/mirror.go
+++ b/cmd/plc-mirror/mirror.go
@@ -149,7 +149,17 @@ func (m *Mirror) runOnce(ctx context.Context) error {
 			}
 
 			cursor = entry.CreatedAt
-			newEntries = append(newEntries, *FromOperationLogEntry(entry))
+			row := *FromOperationLogEntry(entry)
+			newEntries = append(newEntries, row)
+
+			// Publish the entry's timestamp through the lastEventTimestamp
+			// gauge; an unparsable value is logged and otherwise ignored.
+			t, err := time.Parse(time.RFC3339, row.PLCTimestamp)
+			if err == nil {
+				lastEventTimestamp.Set(float64(t.Unix()))
+			} else {
+				log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err)
+			}
 		}
 
 		if len(newEntries) == 0 || cursor == oldCursor {
diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go
index 3f18a0d..ef739a1 100644
--- a/cmd/plc-mirror/serve.go
+++ b/cmd/plc-mirror/serve.go
@@ -64,8 +64,21 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
 }
 
 func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
+	start := time.Now()
+	// updateMetrics records one handled request under the given HTTP status
+	// code: it bumps the request counter and observes the time elapsed since
+	// start, in milliseconds.
+	updateMetrics := func(c int) {
+		status := fmt.Sprint(c)
+		requestCount.WithLabelValues(status).Inc()
+		// Keep the fractional part; Duration.Milliseconds would truncate
+		// sub-millisecond latencies to zero.
+		requestLatency.WithLabelValues(status).Observe(float64(time.Since(start)) / float64(time.Millisecond))
+	}
+
 	delay := time.Since(s.mirror.LastSuccess())
 	if delay > s.MaxDelay {
+		updateMetrics(http.StatusServiceUnavailable)
 		return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
 	}
 	log := zerolog.Ctx(ctx)
@@ -74,14 +87,17 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo
 	var entry PLCLogEntry
 	err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error
 	if errors.Is(err, gorm.ErrRecordNotFound) {
+		updateMetrics(http.StatusNotFound)
 		return respond.NotFound("unknown DID")
 	}
 	if err != nil {
 		log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err)
+		updateMetrics(http.StatusInternalServerError)
 		return respond.InternalServerError("failed to get the last log entry")
 	}
 
 	if _, ok := entry.Operation.Value.(plc.Tombstone); ok {
+		updateMetrics(http.StatusNotFound)
 		return respond.NotFound("DID deleted")
 	}
 
@@ -139,7 +155,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo
 			}
 		}
 	}
-
+	updateMetrics(http.StatusOK)
 	return respond.JSON(r)
 }
 