diff --git a/cmd/plc-mirror/metrics.go b/cmd/plc-mirror/metrics.go deleted file mode 100644 index 68e511e..0000000 --- a/cmd/plc-mirror/metrics.go +++ /dev/null @@ -1,22 +0,0 @@ -package main - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "plcmirror_last_op_timestamp", - Help: "Timestamp of the last operation received from upstream.", -}) - -var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "plcmirror_inbound_requests_total", - Help: "Counter of received requests.", -}, []string{"status"}) - -var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "plcmirror_response_latency_millisecond", - Help: "Latency of responses.", - Buckets: prometheus.ExponentialBucketsRange(0.1, 30000, 20), -}, []string{"status"}) diff --git a/cmd/plc-mirror/mirror.go b/cmd/plc-mirror/mirror.go index cf7525c..88f7d91 100644 --- a/cmd/plc-mirror/mirror.go +++ b/cmd/plc-mirror/mirror.go @@ -36,9 +36,8 @@ type Mirror struct { upstream *url.URL limiter *rate.Limiter - mu sync.RWMutex - lastCompletionTimestamp time.Time - lastRecordTimestamp time.Time + mu sync.RWMutex + lastSuccessTimestamp time.Time } func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { @@ -78,7 +77,7 @@ func (m *Mirror) run(ctx context.Context) { } else { now := time.Now() m.mu.Lock() - m.lastCompletionTimestamp = now + m.lastSuccessTimestamp = now m.mu.Unlock() } time.Sleep(10 * time.Second) @@ -86,39 +85,16 @@ func (m *Mirror) run(ctx context.Context) { } } -func (m *Mirror) LastCompletion() time.Time { +func (m *Mirror) LastSuccess() time.Time { m.mu.RLock() defer m.mu.RUnlock() - return m.lastCompletionTimestamp + return m.lastSuccessTimestamp } -func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { - m.mu.RLock() - t := m.lastRecordTimestamp - m.mu.RUnlock() - if !t.IsZero() { - return t, nil - } - +func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) { ts := "" err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error - if err != nil { - return time.Time{}, err - } - dbTimestamp, err := time.Parse(time.RFC3339, ts) - if err != nil { - return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err) - } - - m.mu.Lock() - defer m.mu.Unlock() - if m.lastRecordTimestamp.IsZero() { - m.lastRecordTimestamp = dbTimestamp - } - if m.lastRecordTimestamp.After(dbTimestamp) { - return m.lastRecordTimestamp, nil - } - return dbTimestamp, nil + return ts, err } func (m *Mirror) runOnce(ctx context.Context) error { @@ -162,8 +138,6 @@ func (m *Mirror) runOnce(ctx context.Context) error { decoder := json.NewDecoder(resp.Body) oldCursor := cursor - var lastTimestamp time.Time - for { var entry plc.OperationLogEntry err := decoder.Decode(&entry) @@ -175,16 +149,7 @@ func (m *Mirror) runOnce(ctx context.Context) error { } cursor = entry.CreatedAt - row := *FromOperationLogEntry(entry) - newEntries = append(newEntries, row) - - t, err := time.Parse(time.RFC3339, row.PLCTimestamp) - if err == nil { - lastEventTimestamp.Set(float64(t.Unix())) - lastTimestamp = t - } else { - log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err) - } + newEntries = append(newEntries, *FromOperationLogEntry(entry)) } if len(newEntries) == 0 || cursor == oldCursor { @@ -201,12 +166,6 @@ func (m *Mirror) runOnce(ctx context.Context) error { return fmt.Errorf("inserting log entry into database: %w", err) } - if !lastTimestamp.IsZero() { - m.mu.Lock() - m.lastRecordTimestamp = lastTimestamp - m.mu.Unlock() - } - log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) } return nil diff --git a/cmd/plc-mirror/serve.go b/cmd/plc-mirror/serve.go index 9d42036..637dfeb 100644 --- a/cmd/plc-mirror/serve.go +++ b/cmd/plc-mirror/serve.go @@ -51,7 +51,11 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { if err != nil { return respond.InternalServerError(err.Error()) } - delay := time.Since(ts) + t, err := time.Parse(time.RFC3339, ts) + if err != nil { + return respond.InternalServerError(err.Error()) + } + delay := time.Since(t) if delay > s.MaxDelay { return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay)) } @@ -60,45 +64,24 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) { } func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse { - start := time.Now() - updateMetrics := func(c int) { - requestCount.WithLabelValues(fmt.Sprint(c)).Inc() - requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond)) - } - - // Check if the mirror is up to date. - ts, err := s.mirror.LastRecordTimestamp(ctx) - if err != nil { - return respond.InternalServerError(err.Error()) - } - delay := time.Since(ts) + delay := time.Since(s.mirror.LastSuccess()) if delay > s.MaxDelay { - // Check LastCompletion and if it's recent enough - that means - // that we're actually caught up and there simply aren't any recent - // PLC operations. - completionDelay := time.Since(s.mirror.LastCompletion()) - if completionDelay > s.MaxDelay { - updateMetrics(http.StatusServiceUnavailable) - return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) - } + return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) } log := zerolog.Ctx(ctx) requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/")) var entry PLCLogEntry - err = s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error + err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error if errors.Is(err, gorm.ErrRecordNotFound) { - updateMetrics(http.StatusNotFound) return respond.NotFound("unknown DID") } if err != nil { log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err) - updateMetrics(http.StatusInternalServerError) return respond.InternalServerError("failed to get the last log entry") } if _, ok := entry.Operation.Value.(plc.Tombstone); ok { - updateMetrics(http.StatusNotFound) return respond.NotFound("DID deleted") } @@ -156,7 +139,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo } } } - updateMetrics(http.StatusOK) + return respond.JSON(r) }