diff --git a/server/message_cache.go b/server/message_cache.go index 48f43ef0..eb65dff4 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -194,9 +194,10 @@ const ( ) type messageCache struct { - db *sql.DB - queue chan *message - nop bool + db *sql.DB + queue chan *message + wakeupSignal chan bool + nop bool } // newSqliteCache creates a SQLite file-backed cache @@ -214,8 +215,9 @@ func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout } if batchSize > 0 { // here, batchSize determines the maximum number of unprocessed messages the server will - // buffer before (briefly) blocking. - cache.queue = make(chan *message, batchSize) + // buffer before a wakeupSignal is issued. Our channel actually has double that capacity. + cache.queue = make(chan *message, batchSize*2) + cache.wakeupSignal = make(chan bool) } // here, batchSize indicates the maximum number of messages which will be inserted into // the database per transaction. @@ -252,6 +254,13 @@ func (c *messageCache) AddMessage(m *message) error { } if c.queue != nil { c.queue <- m + // queue is over 50% full, so wake up the processor, if necessary + if len(c.queue) > cap(c.queue)/2 { + select { + case c.wakeupSignal <- true: + default: + } + } return nil } if c.nop { @@ -468,17 +477,14 @@ func (c *messageCache) processMessageBatches(batchSize int, batchTimeout time.Du } // initialise the array once to avoid needing to recreate it each iteration var messagebuffer []*message = make([]*message, batchSize) - var bufferRemainingCapacityOnPreviousIteration int for { messages := messagebuffer[:0] // To increase the efficiency of database insertions, optionally // delay processing the incoming message stream for a short period, // unless the previous batch insertion held the maximum allowable - // number of messages. - if batchTimeout > 0 && bufferRemainingCapacityOnPreviousIteration > 0 { - time.Sleep(batchTimeout) - } + // number of messages. This can be interrupted if something is found + // on the wakeupSignal channel. // Perform a blocking read; until at least one message // is pending, there is nothing to do. @@ -498,11 +504,17 @@ func (c *messageCache) processMessageBatches(batchSize int, batchTimeout time.Du break retrieve_messages_from_channel } } - bufferRemainingCapacityOnPreviousIteration = batchSize - len(messages) if err := c.addMessages(messages); err != nil { log.Error("processMessageBatches: %s", err.Error()) } + + if batchTimeout > 0 { + select { + case <-time.After(batchTimeout): + case <-c.wakeupSignal: + } + } } } diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 571b6eb3..4bd3f423 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -76,18 +76,14 @@ func TestBufferedCacheFlushBehaviour(t *testing.T) { require.Nil(t, err) require.Equal(t, 2, counts["mytopic"]) - // Add an extra message. Because the buffered queue is at capacity, this should block - // this goroutine until the cooldown period has expired, and at least one of the pending - // messages has been read from the channel. + // Add an extra message. require.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my example message"))) - require.Greater(t, time.Since(t1), cooldown/3) - - // Because the channel was full, there should not be a cooldown, and our new message should - // be processed without delay - time.Sleep(cooldown / 3) + // Because we have more than `queueSize` elements in the channel, the processing + // goroutine should be immediately woken, processing everything very quickly. + time.Sleep(cooldown / 6) counts, err = c.MessageCounts() require.Nil(t, err) - require.Equal(t, 3+queueSize, counts["mytopic"]) + require.GreaterOrEqual(t, 2+queueSize, counts["mytopic"]) } func testCacheMessages(t *testing.T, c *messageCache) {