Add tests, and proper rate

pull/707/head
binwiederhier 2023-04-21 11:09:13 -04:00
parent 6be95f8285
commit 91d2603fe0
4 changed files with 97 additions and 18 deletions

View File

@ -1169,6 +1169,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
**Features:**
* [ntfy CLI](subscribe/cli.md) (`ntfy publish` and `ntfy subscribe` only) can now be installed via Homebrew (thanks to [@Moulick](https://github.com/Moulick))
* Added `v1/stats` endpoint to expose messages stats (no ticket)
**Bug fixes + maintenance:**

View File

@ -573,13 +573,15 @@ func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor)
// handleStats returns the publicly available server stats
func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
s.mu.RLock()
n := len(s.messagesHistory)
rate := float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
response := &apiStatsResponse{
Messages: s.messages,
MessagesRate: rate,
messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
if n > 1 {
rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
}
s.mu.RUnlock()
response := &apiStatsResponse{
Messages: messages,
MessagesRate: rate,
}
return s.writeJSON(w, response)
}
@ -1847,3 +1849,17 @@ func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
}
return nil
}
func (s *Server) updateAndWriteStats(messagesCount int64) {
s.mu.Lock()
s.messagesHistory = append(s.messagesHistory, messagesCount)
if len(s.messagesHistory) > messagesHistoryMax {
s.messagesHistory = s.messagesHistory[1:]
}
s.mu.Unlock()
go func() {
if err := s.messageCache.UpdateStats(messagesCount); err != nil {
log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
}
}()
}

View File

@ -76,6 +76,11 @@ func (s *Server) execManager() {
s.mu.RLock()
messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
s.mu.RUnlock()
// Update stats
s.updateAndWriteStats(messagesCount)
// Log stats
log.
Tag(tagManager).
Fields(log.Context{
@ -98,19 +103,6 @@ func (s *Server) execManager() {
mset(metricUsers, usersCount)
mset(metricSubscribers, subscribers)
mset(metricTopics, topicsCount)
// Write stats
s.mu.Lock()
s.messagesHistory = append(s.messagesHistory, messagesCount)
if len(s.messagesHistory) > messagesHistoryMax {
s.messagesHistory = s.messagesHistory[1:]
}
s.mu.Unlock()
go func() {
if err := s.messageCache.UpdateStats(messagesCount); err != nil {
log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
}
}()
}
func (s *Server) pruneVisitors() {

View File

@ -2399,6 +2399,76 @@ func TestServer_SubscriberRateLimiting_ProtectedTopics_WithDefaultReadWrite(t *t
require.Nil(t, s.topics["announcements"].rateVisitor)
}
func TestServer_MessageHistoryAndStatsEndpoint(t *testing.T) {
c := newTestConfig(t)
c.ManagerInterval = 2 * time.Second
s := newTestServer(t, c)
// Publish some messages, and get stats
for i := 0; i < 5; i++ {
response := request(t, s, "POST", "/mytopic", "some message", nil)
require.Equal(t, 200, response.Code)
}
require.Equal(t, int64(5), s.messages)
require.Equal(t, []int64{0}, s.messagesHistory)
response := request(t, s, "GET", "/v1/stats", "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"messages":5,"messages_rate":0}`+"\n", response.Body.String())
// Run manager and see message history update
s.execManager()
require.Equal(t, []int64{0, 5}, s.messagesHistory)
response = request(t, s, "GET", "/v1/stats", "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"messages":5,"messages_rate":2.5}`+"\n", response.Body.String()) // 5 messages in 2 seconds = 2.5 messages per second
// Publish some more messages
for i := 0; i < 10; i++ {
response := request(t, s, "POST", "/mytopic", "some message", nil)
require.Equal(t, 200, response.Code)
}
require.Equal(t, int64(15), s.messages)
require.Equal(t, []int64{0, 5}, s.messagesHistory)
response = request(t, s, "GET", "/v1/stats", "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"messages":15,"messages_rate":2.5}`+"\n", response.Body.String()) // Rate did not update yet
// Run manager and see message history update
s.execManager()
require.Equal(t, []int64{0, 5, 15}, s.messagesHistory)
response = request(t, s, "GET", "/v1/stats", "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"messages":15,"messages_rate":3.75}`+"\n", response.Body.String()) // 15 messages in 4 seconds = 3.75 messages per second
}
func TestServer_MessageHistoryMaxSize(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
for i := 0; i < 20; i++ {
s.messages = int64(i)
s.execManager()
}
require.Equal(t, []int64{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}, s.messagesHistory)
}
func TestServer_MessageCountPersistence(t *testing.T) {
c := newTestConfig(t)
s := newTestServer(t, c)
s.messages = 1234
s.execManager()
waitFor(t, func() bool {
messages, err := s.messageCache.Stats()
require.Nil(t, err)
return messages == 1234
})
s = newTestServer(t, c)
require.Equal(t, int64(1234), s.messages)
}
func newTestConfig(t *testing.T) *Config {
conf := NewConfig()
conf.BaseURL = "http://127.0.0.1:12345"