fix: s/LastSuccess/LastCompletion/ and use it as orignally intended
parent
04ab6167c7
commit
bad22e6422
|
@ -37,7 +37,7 @@ type Mirror struct {
|
||||||
limiter *rate.Limiter
|
limiter *rate.Limiter
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
lastSuccessTimestamp time.Time
|
lastCompletionTimestamp time.Time
|
||||||
lastRecordTimestamp time.Time
|
lastRecordTimestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (m *Mirror) run(ctx context.Context) {
|
||||||
} else {
|
} else {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
m.lastSuccessTimestamp = now
|
m.lastCompletionTimestamp = now
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
@ -86,10 +86,10 @@ func (m *Mirror) run(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mirror) LastSuccess() time.Time {
|
func (m *Mirror) LastCompletion() time.Time {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
return m.lastSuccessTimestamp
|
return m.lastCompletionTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) {
|
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) {
|
||||||
|
@ -201,12 +201,11 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
return fmt.Errorf("inserting log entry into database: %w", err)
|
return fmt.Errorf("inserting log entry into database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.mu.Lock()
|
|
||||||
m.lastSuccessTimestamp = time.Now()
|
|
||||||
if !lastTimestamp.IsZero() {
|
if !lastTimestamp.IsZero() {
|
||||||
|
m.mu.Lock()
|
||||||
m.lastRecordTimestamp = lastTimestamp
|
m.lastRecordTimestamp = lastTimestamp
|
||||||
}
|
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
|
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,15 +66,22 @@ func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpRespo
|
||||||
requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond))
|
requestLatency.WithLabelValues(fmt.Sprint(c)).Observe(float64(time.Now().Sub(start)) / float64(time.Millisecond))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the mirror is up to date.
|
||||||
ts, err := s.mirror.LastRecordTimestamp(ctx)
|
ts, err := s.mirror.LastRecordTimestamp(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return respond.InternalServerError(err.Error())
|
return respond.InternalServerError(err.Error())
|
||||||
}
|
}
|
||||||
delay := time.Since(ts)
|
delay := time.Since(ts)
|
||||||
if delay > s.MaxDelay {
|
if delay > s.MaxDelay {
|
||||||
|
// Check LastCompletion and if it's recent enough - that means
|
||||||
|
// that we're actually caught up and there simply aren't any recent
|
||||||
|
// PLC operations.
|
||||||
|
completionDelay := time.Since(s.mirror.LastCompletion())
|
||||||
|
if completionDelay > s.MaxDelay {
|
||||||
updateMetrics(http.StatusServiceUnavailable)
|
updateMetrics(http.StatusServiceUnavailable)
|
||||||
return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
|
return respond.ServiceUnavailable(fmt.Sprintf("mirror is %s behind", delay))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/"))
|
requestedDid := strings.ToLower(strings.TrimPrefix(req.URL.Path, "/"))
|
||||||
|
|
Loading…
Reference in New Issue