Compare commits
No commits in common. "b00ce6d7999d1ce46731738dc698020fc9f1c92e" and "61f751b59a28f00fab7464ec5132711524301ceb" have entirely different histories.
b00ce6d799
...
61f751b59a
|
@ -1,22 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
||||||
)
|
|
||||||
|
|
||||||
var lastEventTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Name: "plcmirror_last_op_timestamp",
|
|
||||||
Help: "Timestamp of the last operation received from upstream.",
|
|
||||||
})
|
|
||||||
|
|
||||||
var requestCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
||||||
Name: "plcmirror_inbound_requests_total",
|
|
||||||
Help: "Counter of received requests.",
|
|
||||||
}, []string{"status"})
|
|
||||||
|
|
||||||
var requestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
|
||||||
Name: "plcmirror_response_latency_millisecond",
|
|
||||||
Help: "Latency of responses.",
|
|
||||||
Buckets: prometheus.ExponentialBucketsRange(0.1, 30000, 20),
|
|
||||||
}, []string{"status"})
|
|
|
@ -36,9 +36,8 @@ type Mirror struct {
|
||||||
upstream *url.URL
|
upstream *url.URL
|
||||||
limiter *rate.Limiter
|
limiter *rate.Limiter
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
lastCompletionTimestamp time.Time
|
lastSuccessTimestamp time.Time
|
||||||
lastRecordTimestamp time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) {
|
func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) {
|
||||||
|
@ -78,7 +77,7 @@ func (m *Mirror) run(ctx context.Context) {
|
||||||
} else {
|
} else {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
m.lastCompletionTimestamp = now
|
m.lastSuccessTimestamp = now
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
@ -86,39 +85,16 @@ func (m *Mirror) run(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mirror) LastCompletion() time.Time {
|
func (m *Mirror) LastSuccess() time.Time {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
return m.lastCompletionTimestamp
|
return m.lastSuccessTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) {
|
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) {
|
||||||
m.mu.RLock()
|
|
||||||
t := m.lastRecordTimestamp
|
|
||||||
m.mu.RUnlock()
|
|
||||||
if !t.IsZero() {
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := ""
|
ts := ""
|
||||||
err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error
|
err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error
|
||||||
if err != nil {
|
return ts, err
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
dbTimestamp, err := time.Parse(time.RFC3339, ts)
|
|
||||||
if err != nil {
|
|
||||||
return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
if m.lastRecordTimestamp.IsZero() {
|
|
||||||
m.lastRecordTimestamp = dbTimestamp
|
|
||||||
}
|
|
||||||
if m.lastRecordTimestamp.After(dbTimestamp) {
|
|
||||||
return m.lastRecordTimestamp, nil
|
|
||||||
}
|
|
||||||
return dbTimestamp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mirror) runOnce(ctx context.Context) error {
|
func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
|
@ -162,8 +138,6 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
decoder := json.NewDecoder(resp.Body)
|
decoder := json.NewDecoder(resp.Body)
|
||||||
oldCursor := cursor
|
oldCursor := cursor
|
||||||
|
|
||||||
var lastTimestamp time.Time
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var entry plc.OperationLogEntry
|
var entry plc.OperationLogEntry
|
||||||
err := decoder.Decode(&entry)
|
err := decoder.Decode(&entry)
|
||||||
|
@ -175,16 +149,7 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
cursor = entry.CreatedAt
|
cursor = entry.CreatedAt
|
||||||
row := *FromOperationLogEntry(entry)
|
newEntries = append(newEntries, *FromOperationLogEntry(entry))
|
||||||
newEntries = append(newEntries, row)
|
|
||||||
|
|
||||||
t, err := time.Parse(time.RFC3339, row.PLCTimestamp)
|
|
||||||
if err == nil {
|
|
||||||
lastEventTimestamp.Set(float64(t.Unix()))
|
|
||||||
lastTimestamp = t
|
|
||||||
} else {
|
|
||||||
log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(newEntries) == 0 || cursor == oldCursor {
|
if len(newEntries) == 0 || cursor == oldCursor {
|
||||||
|
@ -201,12 +166,6 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
return fmt.Errorf("inserting log entry into database: %w", err)
|
return fmt.Errorf("inserting log entry into database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !lastTimestamp.IsZero() {
|
|
||||||
m.mu.Lock()
|
|
||||||
m.lastRecordTimestamp = lastTimestamp
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
|
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -51,7 +51,11 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return respond.InternalServerError(err.Error())
|
return respond.InternalServerError(err.Error())
|
||||||
}
|
}
|
||||||
delay := time.Since(ts)
|
t, err := time.Parse(time.RFC3339, ts)
|
||||||
|
if err != nil {
|
||||||
|
return respond.InternalServerError(err.Error())
|
||||||
|
}
|
||||||
|
delay := time.Since(t)
|
||||||
if delay > s.MaxDelay {
|
if delay > s.MaxDelay {
|
||||||
return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay))
|
return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay))
|
||||||
}
|
}
|
||||||
|
@ -60,45 +64,24 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
|
func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
|
||||||
start := time.Now()
|
delay := time.Since(s.mirror.LastSuccess())
|
||||||
updateMetrics := func(c int) {
|
|
||||||
requestCount.WithLabelValues(fmt.Sprint(c)).Inc()
|
|
||||||
requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the mirror is up to date.
|
|
||||||
ts, err := s.mirror.LastRecordTimestamp(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return respond.InternalServerError(err.Error())
|
|
||||||
}
|
|
||||||
delay := time.Since(ts)
|
|
||||||
if delay > s.MaxDelay {
|
if delay > s.MaxDelay {
|
||||||
// Check LastCompletion and if it's recent enough - that means
|
return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
|
||||||
// that we're actually caught up and there simply aren't any recent
|
|
||||||
// PLC operations.
|
|
||||||
completionDelay := time.Since(s.mirror.LastCompletion())
|
|
||||||
if completionDelay > s.MaxDelay {
|
|
||||||
updateMetrics(http.StatusServiceUnavailable)
|
|
||||||
return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/"))
|
requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/"))
|
||||||
var entry PLCLogEntry
|
var entry PLCLogEntry
|
||||||
err = s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error
|
err := s.db.Model(&entry).Where("did = ? AND (NOT nullified)", requestedDid).Order("plc_timestamp desc").Limit(1).Take(&entry).Error
|
||||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
updateMetrics(http.StatusNotFound)
|
|
||||||
return respond.NotFound("unknown DID")
|
return respond.NotFound("unknown DID")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err)
|
log.Error().Err(err).Str("did", requestedDid).Msgf("Failed to get the last log entry for %q: %s", requestedDid, err)
|
||||||
updateMetrics(http.StatusInternalServerError)
|
|
||||||
return respond.InternalServerError("failed to get the last log entry")
|
return respond.InternalServerError("failed to get the last log entry")
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := entry.Operation.Value.(plc.Tombstone); ok {
|
if _, ok := entry.Operation.Value.(plc.Tombstone); ok {
|
||||||
updateMetrics(http.StatusNotFound)
|
|
||||||
return respond.NotFound("DID deleted")
|
return respond.NotFound("DID deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +139,7 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
updateMetrics(http.StatusOK)
|
|
||||||
return respond.JSON(r)
|
return respond.JSON(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue