From 9e9caee639ac92450f462dd05af0b5b389bbd835 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Fri, 27 Jan 2023 09:59:16 -0500 Subject: [PATCH] (Hopefully) remove statsQueue races --- server/server.go | 2 +- server/visitor.go | 17 ++++++++--------- user/manager.go | 20 ++++++++++---------- user/manager_test.go | 8 ++++---- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/server/server.go b/server/server.go index 158c4d40..267fc15e 100644 --- a/server/server.go +++ b/server/server.go @@ -599,7 +599,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes } v.IncrementMessages() if s.userManager != nil && v.user != nil { - s.userManager.EnqueueStats(v.user) // FIXME this makes no sense for tier-less users + s.userManager.EnqueueStats(v.user.ID, v.Stats()) // FIXME this makes no sense for tier-less users } s.mu.Lock() s.messages++ diff --git a/server/visitor.go b/server/visitor.go index b8f89533..0a0793f9 100644 --- a/server/visitor.go +++ b/server/visitor.go @@ -232,17 +232,20 @@ func (v *visitor) IncrementMessages() { v.mu.Lock() defer v.mu.Unlock() v.messages++ - if v.user != nil { - v.user.Stats.Messages = v.messages - } } func (v *visitor) IncrementEmails() { v.mu.Lock() defer v.mu.Unlock() v.emails++ - if v.user != nil { - v.user.Stats.Emails = v.emails +} + +func (v *visitor) Stats() *user.Stats { + v.mu.Lock() + defer v.mu.Unlock() + return &user.Stats{ + Messages: v.messages, + Emails: v.emails, } } @@ -254,10 +257,6 @@ func (v *visitor) ResetStats() { if v.messagesLimiter != nil { v.messagesLimiter.Reset() } - if v.user != nil { - v.user.Stats.Messages = 0 - v.user.Stats.Emails = 0 - } } // SetUser sets the visitors user to the given value diff --git a/user/manager.go b/user/manager.go index 7ecba73b..600bcedc 100644 --- a/user/manager.go +++ b/user/manager.go @@ -292,8 +292,8 @@ const ( // in a SQLite database. type Manager struct { db *sql.DB - defaultAccess Permission // Default permission if no ACL matches - statsQueue map[string]*User // Username -> User, for "unimportant" user updates + defaultAccess Permission // Default permission if no ACL matches + statsQueue map[string]*Stats // "Queue" to asynchronously write user stats to the database (UserID -> Stats) mu sync.Mutex } @@ -319,7 +319,7 @@ func newManager(filename, startupQueries string, defaultAccess Permission, stats manager := &Manager{ db: db, defaultAccess: defaultAccess, - statsQueue: make(map[string]*User), + statsQueue: make(map[string]*Stats), } go manager.userStatsQueueWriter(statsWriterInterval) return manager, nil @@ -464,16 +464,16 @@ func (a *Manager) ResetStats() error { if _, err := a.db.Exec(updateUserStatsResetAllQuery); err != nil { return err } - a.statsQueue = make(map[string]*User) + a.statsQueue = make(map[string]*Stats) return nil } // EnqueueStats adds the user to a queue which writes out user stats (messages, emails, ..) in // batches at a regular interval -func (a *Manager) EnqueueStats(user *User) { +func (a *Manager) EnqueueStats(userID string, stats *Stats) { a.mu.Lock() defer a.mu.Unlock() - a.statsQueue[user.ID] = user + a.statsQueue[userID] = stats } func (a *Manager) userStatsQueueWriter(interval time.Duration) { @@ -493,7 +493,7 @@ func (a *Manager) writeUserStatsQueue() error { return nil } statsQueue := a.statsQueue - a.statsQueue = make(map[string]*User) + a.statsQueue = make(map[string]*Stats) a.mu.Unlock() tx, err := a.db.Begin() if err != nil { @@ -501,9 +501,9 @@ func (a *Manager) writeUserStatsQueue() error { } defer tx.Rollback() log.Debug("User Manager: Writing user stats queue for %d user(s)", len(statsQueue)) - for userID, u := range statsQueue { - log.Trace("User Manager: Updating stats for user %s: messages=%d, emails=%d", userID, u.Stats.Messages, u.Stats.Emails) - if _, err := tx.Exec(updateUserStatsQuery, u.Stats.Messages, u.Stats.Emails, userID); err != nil { + for userID, update := range statsQueue { + log.Trace("User Manager: Updating stats for user %s: messages=%d, emails=%d", userID, update.Messages, update.Emails) + if _, err := tx.Exec(updateUserStatsQuery, update.Messages, update.Emails, userID); err != nil { return err } } diff --git a/user/manager_test.go b/user/manager_test.go index 2bc0492f..335cdf87 100644 --- a/user/manager_test.go +++ b/user/manager_test.go @@ -554,10 +554,10 @@ func TestManager_EnqueueStats(t *testing.T) { require.Nil(t, err) require.Equal(t, int64(0), u.Stats.Messages) require.Equal(t, int64(0), u.Stats.Emails) - - u.Stats.Messages = 11 - u.Stats.Emails = 2 - a.EnqueueStats(u) + a.EnqueueStats(u.ID, &Stats{ + Messages: 11, + Emails: 2, + }) // Still no change, because it's queued asynchronously u, err = a.User("ben")