Fix m.Expires and prune stale topics based on lastVisitorExpires

pull/609/head
Karmanyaah Malhotra 2023-02-14 14:00:43 -06:00
parent 28b654ae27
commit fb2fa4c478
1 changed files with 6 additions and 3 deletions

View File

@ -622,7 +622,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
} }
m.Sender = v.IP() m.Sender = v.IP()
m.User = v.MaybeUserID() m.User = v.MaybeUserID()
m.Expires = time.Now().Add(v.Limits().MessageExpiryDuration).Unix() m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil { if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
return nil, err return nil, err
} }
@ -666,6 +666,8 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
if err := s.messageCache.AddMessage(m); err != nil { if err := s.messageCache.AddMessage(m); err != nil {
return nil, err return nil, err
} }
} else {
m.Expires = m.Time
} }
u := v.User() u := v.User()
if s.userManager != nil && u != nil && u.Tier != nil { if s.userManager != nil && u != nil && u.Tier != nil {
@ -1404,9 +1406,10 @@ func (s *Server) execManager() {
defer s.mu.Unlock() defer s.mu.Unlock()
for _, t := range s.topics { for _, t := range s.topics {
subs := t.SubscribersCount() subs := t.SubscribersCount()
log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs) expiryTime := time.Until(t.lastVisitorExpires)
log.Tag(tagManager).Trace("- topic %s: %d subscribers, expires in %s", t.ID, subs, expiryTime)
msgs, exists := messageCounts[t.ID] msgs, exists := messageCounts[t.ID]
if subs == 0 && (!exists || msgs == 0) { if t.Stale() && (!exists || msgs == 0) {
log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID) log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
emptyTopics++ emptyTopics++
delete(s.topics, t.ID) delete(s.topics, t.ID)