From d0fd66f83f68ad5c048b6e57173a7edb2f7d0098 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Sat, 26 Oct 2024 21:31:44 +0100 Subject: [PATCH 1/8] feature: a few exported metrics --- cmd/plc-mirror/metrics.go | 21 +++++++++++++++++++++ cmd/plc-mirror/mirror.go | 10 +++++++++- cmd/plc-mirror/serve.go | 12 +++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 cmd/plc-mirror/metrics.go diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go new file mode 100644 index 0000000..a5cbdad --- /dev/null +++ b/cmd/plc-mirror/metrics.go @@ -0,0 +1,21 @@ +package main + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "plcmirror_last_op_timestamp", + Help: "Timestamp of the last operation received from upstream.", +}) + +var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "plcmirror_inbound_requests_total", + Help: "Counter of received requests.", +}, []string{"status"}) + +var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "plcmirror_response_latency_millisecond", + Help: "Latency of responses.", +}, []string{"status"}) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index b4bd6a0..133f952 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -149,7 +149,15 @@ func (m *Mirror) runOnce(ctx context.Context) error { } cursor = entry.CreatedAt - newEntries = append(newEntries, *FromOperationLogEntry(entry)) + row := *FromOperationLogEntry(entry) + newEntries = append(newEntries, row) + + t, err := time.Parse(time.RFC3339, row.PLCTimestamp) + if err == nil { + lastEventTimestamp.Set(float64(t.Unix())) + } else { + log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err) + } } if len(newEntries) == 0 || cursor == oldCursor { diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index 3f18a0d..ef739a1 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -64,8 +64,15 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { } func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse { + start := time.Now() + updateMetrics := func(c int) { + requestCount.WithLabelValues(fmt.Sprint(c)).Inc() + requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start).Milliseconds())) + } + delay := time.Since(s.mirror.LastSuccess()) if delay > s.MaxDelay { + updateMetrics(http.StatusServiceUnavailable) return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) } log := zerolog.Ctx(ctx) @@ -74,14 +81,17 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo var entry PLCLogEntry err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error if errors.Is(err, gorm.ErrRecordNotFound) { + updateMetrics(http.StatusNotFound) return respond.NotFound("unknown DID") } if err != nil { log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err) + updateMetrics(http.StatusInternalServerError) return respond.InternalServerError("failed to get the last log entry") } if _, ok := entry.Operation.Value.(plc.Tombstone); ok { + updateMetrics(http.StatusNotFound) return respond.NotFound("DID deleted") } @@ -139,7 +149,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo } } } - + updateMetrics(http.StatusOK) return respond.JSON(r) } From f379f6b4b4ce02f1eb453c22330a99ec14cb1c11 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Sun, 27 Oct 2024 12:21:59 +0000 Subject: [PATCH 2/8] fix: tweak histogram buckets --- cmd/plc-mirror/metrics.go | 5 +++-- cmd/plc-mirror/serve.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go index a5cbdad..2a5cc4a 100644 --- a/cmd/plc-mirror/metrics.go +++ b/cmd/plc-mirror/metrics.go @@ -16,6 +16,7 @@ var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{ }, []string{"status"}) var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "plcmirror_response_latency_millisecond", - Help: "Latency of responses.", + Name: "plcmirror_response_latency_millisecond", + Help: "Latency of responses.", + Buckets: prometheus.ExponentialBucketsRange(1, 30000, 20), }, []string{"status"}) diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index ef739a1..bff15ff 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -67,7 +67,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo start := time.Now() updateMetrics := func(c int) { requestCount.WithLabelValues(fmt.Sprint(c)).Inc() - requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start).Milliseconds())) + requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) } delay := time.Since(s.mirror.LastSuccess()) From 80128ebcff96ae0a624db7cd909b910dc75366c0 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Sun, 27 Oct 2024 12:23:59 +0000 Subject: [PATCH 3/8] fix: change lower bound 1 -> 0.1 --- cmd/plc-mirror/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go index 2a5cc4a..68e511e 100644 --- a/cmd/plc-mirror/metrics.go +++ b/cmd/plc-mirror/metrics.go @@ -18,5 +18,5 @@ var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{ var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "plcmirror_response_latency_millisecond", Help: "Latency of responses.", - Buckets: prometheus.ExponentialBucketsRange(1, 30000, 20), + Buckets: prometheus.ExponentialBucketsRange(0.1, 30000, 20), }, []string{"status"}) From 04f2f80a06cdf25d0099a042173abed6606b5729 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Wed, 13 Nov 2024 18:56:22 +0000 Subject: [PATCH 4/8] fix: keep the timestamp of last record in memory instead of querying the DB every time --- cmd/plc-mirror/mirror.go | 37 +++++++++++++++++++++++++++++++++++-- cmd/plc-mirror/serve.go | 6 +----- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 133f952..03ee903 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -38,6 +38,7 @@ type Mirror struct { mu sync.RWMutex lastSuccessTimestamp time.Time + lastRecordTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -91,10 +92,33 @@ func (m *Mirror) LastSuccess() time.Time { return m.lastSuccessTimestamp } -func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) { +func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { + m.mu.RLock() + t := m.lastRecordTimestamp + m.mu.RUnlock() + if !t.IsZero() { + return t, nil + } + ts := "" err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error - return ts, err + if err != nil { + return time.Time{}, err + } + dbTimestamp, err := time.Parse(time.RFC3339, ts) + if err != nil { + return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err) + } + + m.mu.Lock() + defer m.mu.Unlock() + if t.IsZero() { + t = dbTimestamp + } + if t.After(dbTimestamp) { + return t, nil + } + return dbTimestamp, nil } func (m *Mirror) runOnce(ctx context.Context) error { @@ -138,6 +162,8 @@ func (m *Mirror) runOnce(ctx context.Context) error { decoder := json.NewDecoder(resp.Body) oldCursor := cursor + var lastTimestamp time.Time + for { var entry plc.OperationLogEntry err := decoder.Decode(&entry) @@ -155,6 +181,7 @@ func (m *Mirror) runOnce(ctx context.Context) error { t, err := time.Parse(time.RFC3339, row.PLCTimestamp) if err == nil { lastEventTimestamp.Set(float64(t.Unix())) + lastTimestamp = t } else { log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err) } @@ -174,6 +201,12 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } + if !lastTimestamp.IsZero() { + m.mu.Lock() + m.lastRecordTimestamp = lastTimestamp + m.mu.Unlock() + } + log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } return nil diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index bff15ff..bc49847 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -51,11 +51,7 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { if err != nil { return respond.InternalServerError(err.Error()) } - t, err := time.Parse(time.RFC3339, ts) - if err != nil { - return respond.InternalServerError(err.Error()) - } - delay := time.Since(t) + delay := time.Since(ts) if delay > s.MaxDelay { return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay)) } From 2c6a7201efef6d5600901e8bbe9030778171f2c1 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Thu, 14 Nov 2024 12:17:01 +0000 Subject: [PATCH 5/8] fix: actually store the queried timestamp in the struct --- cmd/plc-mirror/mirror.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 03ee903..489b2a6 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -112,11 +112,11 @@ func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { m.mu.Lock() defer m.mu.Unlock() - if t.IsZero() { - t = dbTimestamp + if m.lastRecordTimestamp.IsZero() { + m.lastRecordTimestamp = dbTimestamp } - if t.After(dbTimestamp) { - return t, nil + if m.lastRecordTimestamp.After(dbTimestamp) { + return m.lastRecordTimestamp, nil } return dbTimestamp, nil } From b26866fe0cc90ca282372e99569276eca47e2f30 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Thu, 14 Nov 2024 12:34:13 +0000 Subject: [PATCH 6/8] fix: update lastSuccessTimestamp after each request too Updating only in run() can be delayed arbitrarily long under load. --- cmd/plc-mirror/mirror.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 489b2a6..808de96 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -201,11 +201,12 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } + m.mu.Lock() + m.lastSuccessTimestamp = time.Now() if !lastTimestamp.IsZero() { - m.mu.Lock() m.lastRecordTimestamp = lastTimestamp - m.mu.Unlock() } + m.mu.Unlock() log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } From 04ab6167c734202bd6a8b51447cf869440ef86ae Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Thu, 14 Nov 2024 12:36:39 +0000 Subject: [PATCH 7/8] fix: use LastRecordTimestamp in serve() instead of LastSuccess --- cmd/plc-mirror/serve.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index bc49847..520de27 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -66,7 +66,11 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) } - delay := time.Since(s.mirror.LastSuccess()) + ts, err := s.mirror.LastRecordTimestamp(ctx) + if err != nil { + return respond.InternalServerError(err.Error()) + } + delay := time.Since(ts) if delay > s.MaxDelay { updateMetrics(http.StatusServiceUnavailable) return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) @@ -75,7 +79,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/")) var entry PLCLogEntry - err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error + err = s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error if errors.Is(err, gorm.ErrRecordNotFound) { updateMetrics(http.StatusNotFound) return respond.NotFound("unknown DID") From bad22e6422877b4e5b3c5aa51ba809fb0aa6a808 Mon Sep 17 00:00:00 2001 From: Max Ignatenko Date: Thu, 14 Nov 2024 12:44:42 +0000 Subject: [PATCH 8/8] fix: s/LastSuccess/LastCompletion/ and use it as orignally intended --- cmd/plc-mirror/mirror.go | 17 ++++++++--------- cmd/plc-mirror/serve.go | 11 +++++++++-- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index 808de96..1666fde 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -36,9 +36,9 @@ type Mirror struct { upstream *url.URL limiter *rate.Limiter - mu sync.RWMutex - lastSuccessTimestamp time.Time - lastRecordTimestamp time.Time + mu sync.RWMutex + lastCompletionTimestamp time.Time + lastRecordTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -78,7 +78,7 @@ func (m *Mirror) run(ctx context.Context) { } else { now := time.Now() m.mu.Lock() - m.lastSuccessTimestamp = now + m.lastCompletionTimestamp = now m.mu.Unlock() } time.Sleep(10 * time.Second) @@ -86,10 +86,10 @@ func (m *Mirror) run(ctx context.Context) { } } -func (m *Mirror) LastSuccess() time.Time { +func (m *Mirror) LastCompletion() time.Time { m.mu.RLock() defer m.mu.RUnlock() - return m.lastSuccessTimestamp + return m.lastCompletionTimestamp } func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { @@ -201,12 +201,11 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } - m.mu.Lock() - m.lastSuccessTimestamp = time.Now() if !lastTimestamp.IsZero() { + m.mu.Lock() m.lastRecordTimestamp = lastTimestamp + m.mu.Unlock() } - m.mu.Unlock() log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index 520de27..245881d 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -66,14 +66,21 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) } + // Check if the mirror is up to date. ts, err := s.mirror.LastRecordTimestamp(ctx) if err != nil { return respond.InternalServerError(err.Error()) } delay := time.Since(ts) if delay > s.MaxDelay { - updateMetrics(http.StatusServiceUnavailable) - return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + // Check LastCompletion and if it's recent enough - that means + // that we're actually caught up and there simply aren't any recent + // PLC operations. + completionDelay := time.Since(s.mirror.LastCompletion()) + if completionDelay > s.MaxDelay { + updateMetrics(http.StatusServiceUnavailable) + return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) + } } log := zerolog.Ctx(ctx)