fix: keep the timestamp of last record in memory instead of querying the DB every time
parent
80128ebcff
commit
04f2f80a06
|
@ -38,6 +38,7 @@ type Mirror struct {
|
|||
|
||||
mu sync.RWMutex
|
||||
lastSuccessTimestamp time.Time
|
||||
lastRecordTimestamp time.Time
|
||||
}
|
||||
|
||||
func NewMirror(ctx context.Context, upstream string, db *gorm.DB) (*Mirror, error) {
|
||||
|
@ -91,10 +92,33 @@ func (m *Mirror) LastSuccess() time.Time {
|
|||
return m.lastSuccessTimestamp
|
||||
}
|
||||
|
||||
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (string, error) {
|
||||
func (m *Mirror) LastRecordTimestamp(ctx context.Context) (time.Time, error) {
|
||||
m.mu.RLock()
|
||||
t := m.lastRecordTimestamp
|
||||
m.mu.RUnlock()
|
||||
if !t.IsZero() {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
ts := ""
|
||||
err := m.db.WithContext(ctx).Model(&PLCLogEntry{}).Select("plc_timestamp").Order("plc_timestamp desc").Limit(1).Take(&ts).Error
|
||||
return ts, err
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
dbTimestamp, err := time.Parse(time.RFC3339, ts)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("parsing timestamp %q: %w", ts, err)
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if t.IsZero() {
|
||||
t = dbTimestamp
|
||||
}
|
||||
if t.After(dbTimestamp) {
|
||||
return t, nil
|
||||
}
|
||||
return dbTimestamp, nil
|
||||
}
|
||||
|
||||
func (m *Mirror) runOnce(ctx context.Context) error {
|
||||
|
@ -138,6 +162,8 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
|||
decoder := json.NewDecoder(resp.Body)
|
||||
oldCursor := cursor
|
||||
|
||||
var lastTimestamp time.Time
|
||||
|
||||
for {
|
||||
var entry plc.OperationLogEntry
|
||||
err := decoder.Decode(&entry)
|
||||
|
@ -155,6 +181,7 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
|||
t, err := time.Parse(time.RFC3339, row.PLCTimestamp)
|
||||
if err == nil {
|
||||
lastEventTimestamp.Set(float64(t.Unix()))
|
||||
lastTimestamp = t
|
||||
} else {
|
||||
log.Warn().Msgf("Failed to parse %q: %s", row.PLCTimestamp, err)
|
||||
}
|
||||
|
@ -174,6 +201,12 @@ func (m *Mirror) runOnce(ctx context.Context) error {
|
|||
return fmt.Errorf("inserting log entry into database: %w", err)
|
||||
}
|
||||
|
||||
if !lastTimestamp.IsZero() {
|
||||
m.mu.Lock()
|
||||
m.lastRecordTimestamp = lastTimestamp
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
log.Info().Msgf("Got %d log entries. New cursor: %q", len(newEntries), cursor)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -51,11 +51,7 @@ func (s *Server) Ready(w http.ResponseWriter, req *http.Request) {
|
|||
if err != nil {
|
||||
return respond.InternalServerError(err.Error())
|
||||
}
|
||||
t, err := time.Parse(time.RFC3339, ts)
|
||||
if err != nil {
|
||||
return respond.InternalServerError(err.Error())
|
||||
}
|
||||
delay := time.Since(t)
|
||||
delay := time.Since(ts)
|
||||
if delay > s.MaxDelay {
|
||||
return respond.ServiceUnavailable(fmt.Sprintf("still %s behind", delay))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue