Add a target for waiting until PLC mirror catches up
parent
c6fe0c2350
commit
c5f3a55ac8
8
Makefile
8
Makefile
|
@ -28,7 +28,13 @@ status:
|
||||||
@docker compose stats
|
@docker compose stats
|
||||||
|
|
||||||
logs:
|
logs:
|
||||||
@docker compose logs -f -n 50 lister consumer record-indexer
|
@docker compose logs -f -n 50
|
||||||
|
|
||||||
|
start-plc: .env
|
||||||
|
@docker compose up -d --build postgres plc
|
||||||
|
|
||||||
|
wait-for-plc:
|
||||||
|
@. ./.env && while ! curl -sf http://$${METRICS_ADDR:-localhost}:11004/ready; do sleep 10; done
|
||||||
|
|
||||||
# ---------------------------- Docker ----------------------------
|
# ---------------------------- Docker ----------------------------
|
||||||
|
|
||||||
|
|
|
@ -72,6 +72,7 @@ func runMain(ctx context.Context) error {
|
||||||
return fmt.Errorf("failed to create server: %w", err)
|
return fmt.Errorf("failed to create server: %w", err)
|
||||||
}
|
}
|
||||||
http.Handle("/", server)
|
http.Handle("/", server)
|
||||||
|
http.HandleFunc("/ready", server.Ready)
|
||||||
|
|
||||||
log.Info().Msgf("Starting HTTP listener on %q...", config.MetricsPort)
|
log.Info().Msgf("Starting HTTP listener on %q...", config.MetricsPort)
|
||||||
http.Handle("/metrics", promhttp.Handler())
|
http.Handle("/metrics", promhttp.Handler())
|
||||||
|
|
|
@ -89,6 +89,12 @@ func (m *Mirror) LastSuccess() time.Time {
|
||||||
return m.lastSuccessTimestamp
|
return m.lastSuccessTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) {
|
||||||
|
ts := ""
|
||||||
|
err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error
|
||||||
|
return ts, err
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Mirror) runOnce(ctx context.Context) error {
|
func (m *Mirror) runOnce(ctx context.Context) error {
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,24 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
s.handler(w, req)
|
s.handler(w, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
|
||||||
|
convreq.Wrap(func(ctx context.Context) convreq.HttpResponse {
|
||||||
|
ts, err := s.mirror.LastRecordTimestamp(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return respond.InternalServerError(err.Error())
|
||||||
|
}
|
||||||
|
t, err := time.Parse(time.RFC3339, ts)
|
||||||
|
if err != nil {
|
||||||
|
return respond.InternalServerError(err.Error())
|
||||||
|
}
|
||||||
|
delay := time.Since(t)
|
||||||
|
if delay > s.MaxDelay {
|
||||||
|
return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay))
|
||||||
|
}
|
||||||
|
return respond.String("OK")
|
||||||
|
})(w, req)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
|
func (s *Server) serve(ctx context.Context, req *http.Request) convreq.HttpResponse {
|
||||||
delay := time.Since(s.mirror.LastSuccess())
|
delay := time.Since(s.mirror.LastSuccess())
|
||||||
if delay > s.MaxDelay {
|
if delay > s.MaxDelay {
|
||||||
|
|
Loading…
Reference in New Issue