Compare commits

..

No commits in common. "b00ce6d7999d1ce46731738dc698020fc9f1c92e" and "61f751b59a28f00fab7464ec5132711524301ceb" have entirely different histories.

3 changed files with 17 additions and 97 deletions

View File

@ -1,22 +0,0 @@
package main
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
Name: "plcmirror_last_op_timestamp",
Help: "Timestamp of the last operation received from upstream.",
})
var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "plcmirror_inbound_requests_total",
Help: "Counter of received requests.",
}, []string{"status"})
var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "plcmirror_response_latency_millisecond",
Help: "Latency of responses.",
Buckets: prometheus.ExponentialBucketsRange(0.1, 30000, 20),
}, []string{"status"})

View File

@ -37,8 +37,7 @@ type Mirror struct {
limiter *rate.Limiter limiter *rate.Limiter
mu sync.RWMutex mu sync.RWMutex
lastCompletionTimestamp time.Time lastSuccessTimestamp time.Time
lastRecordTimestamp time.Time
} }
func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) { func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) {
@ -78,7 +77,7 @@ func (m *Mirror) run(ctx context.Context) {
} else { } else {
now := time.Now() now := time.Now()
m.mu.Lock() m.mu.Lock()
m.lastCompletionTimestamp = now m.lastSuccessTimestamp = now
m.mu.Unlock() m.mu.Unlock()
} }
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
@ -86,39 +85,16 @@ func (m *Mirror) run(ctx context.Context) {
} }
} }
func (m *Mirror) LastCompletion() time.Time { func (m *Mirror) LastSuccess() time.Time {
m.mu.RLock() m.mu.RLock()
defer m.mu.RUnlock() defer m.mu.RUnlock()
return m.lastCompletionTimestamp return m.lastSuccessTimestamp
} }
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) { func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) {
m.mu.RLock()
t := m.lastRecordTimestamp
m.mu.RUnlock()
if !t.IsZero() {
return t, nil
}
ts := "" ts := ""
err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error
if err != nil { return ts, err
return time.Time{}, err
}
dbTimestamp, err := time.Parse(time.RFC3339, ts)
if err != nil {
return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err)
}
m.mu.Lock()
defer m.mu.Unlock()
if m.lastRecordTimestamp.IsZero() {
m.lastRecordTimestamp = dbTimestamp
}
if m.lastRecordTimestamp.After(dbTimestamp) {
return m.lastRecordTimestamp, nil
}
return dbTimestamp, nil
} }
func (m *Mirror) runOnce(ctx context.Context) error { func (m *Mirror) runOnce(ctx context.Context) error {
@ -162,8 +138,6 @@ func (m *Mirror) runOnce(ctx context.Context) error {
decoder := json.NewDecoder(resp.Body) decoder := json.NewDecoder(resp.Body)
oldCursor := cursor oldCursor := cursor
var lastTimestamp time.Time
for { for {
var entry plc.OperationLogEntry var entry plc.OperationLogEntry
err := decoder.Decode(&entry) err := decoder.Decode(&entry)
@ -175,16 +149,7 @@ func (m *Mirror) runOnce(ctx context.Context) error {
} }
cursor = entry.CreatedAt cursor = entry.CreatedAt
row := *FromOperationLogEntry(entry) newEntries = append(newEntries, *FromOperationLogEntry(entry))
newEntries = append(newEntries, row)
t, err := time.Parse(time.RFC3339, row.PLCTimestamp)
if err == nil {
lastEventTimestamp.Set(float64(t.Unix()))
lastTimestamp = t
} else {
log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err)
}
} }
if len(newEntries) == 0 || cursor == oldCursor { if len(newEntries) == 0 || cursor == oldCursor {
@ -201,12 +166,6 @@ func (m *Mirror) runOnce(ctx context.Context) error {
return fmt.Errorf("inserting log entry into database: %w", err) return fmt.Errorf("inserting log entry into database: %w", err)
} }
if !lastTimestamp.IsZero() {
m.mu.Lock()
m.lastRecordTimestamp = lastTimestamp
m.mu.Unlock()
}
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor) log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
} }
return nil return nil

View File

@ -51,7 +51,11 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
if err != nil { if err != nil {
return respond.InternalServerError(err.Error()) return respond.InternalServerError(err.Error())
} }
delay := time.Since(ts) t, err := time.Parse(time.RFC3339, ts)
if err != nil {
return respond.InternalServerError(err.Error())
}
delay := time.Since(t)
if delay > s.MaxDelay { if delay > s.MaxDelay {
return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay)) return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay))
} }
@ -60,45 +64,24 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
} }
func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse { func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
start := time.Now() delay := time.Since(s.mirror.LastSuccess())
updateMetrics := func(c int) {
requestCount.WithLabelValues(fmt.Sprint(c)).Inc()
requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond))
}
// Check if the mirror is up to date.
ts, err := s.mirror.LastRecordTimestamp(ctx)
if err != nil {
return respond.InternalServerError(err.Error())
}
delay := time.Since(ts)
if delay > s.MaxDelay { if delay > s.MaxDelay {
// Check LastCompletion and if it's recent enough - that means
// that we're actually caught up and there simply aren't any recent
// PLC operations.
completionDelay := time.Since(s.mirror.LastCompletion())
if completionDelay > s.MaxDelay {
updateMetrics(http.StatusServiceUnavailable)
return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay)) return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
} }
}
log := zerolog.Ctx(ctx) log := zerolog.Ctx(ctx)
requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/")) requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/"))
var entry PLCLogEntry var entry PLCLogEntry
err = s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
updateMetrics(http.StatusNotFound)
return respond.NotFound("unknown DID") return respond.NotFound("unknown DID")
} }
if err != nil { if err != nil {
log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err) log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err)
updateMetrics(http.StatusInternalServerError)
return respond.InternalServerError("failed to get the last log entry") return respond.InternalServerError("failed to get the last log entry")
} }
if _, ok := entry.Operation.Value.(plc.Tombstone); ok { if _, ok := entry.Operation.Value.(plc.Tombstone); ok {
updateMetrics(http.StatusNotFound)
return respond.NotFound("DID deleted") return respond.NotFound("DID deleted")
} }
@ -156,7 +139,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo
} }
} }
} }
updateMetrics(http.StatusOK)
return respond.JSON(r) return respond.JSON(r)
} }