Avoid blocking incoming messages.

With the addition of a secondary signalling channel, the processing
goroutine is woken immediately whenever there are enough messages in
the queue to fill a "batch". Because the buffered channel has twice
that capacity, requests should never be delayed unless the server is
actually overloaded. (A simplified sketch of this pattern is included
below, ahead of the diff.)

This preserves the improvements of the previous commit, meaning that,
even when a batch size of 10 and a batch delay of 1s are used:
- there is no background load; if no messages are received, the server
  is dormant;
- if no message was received in the last second, a new message will be
  immediately processed;
- if a small number of messages are received after the first, the
  additional messages will be collected and processed in a single
  transaction after the configured delay;
- if enough new messages arrive to fill a batch (i.e. 10 in this
  example), they will be immediately processed, freeing up capacity for
  more messages;
- if up to double the configured number of messages arrive in a burst,
  there will be sufficient capacity to cache them immediately,
  regardless of how slowly the database server commits each transaction.
Nick Farrell 2022-12-17 17:26:33 +11:00
parent 09e8fb81b5
commit 9f2311b98e
2 changed files with 28 additions and 20 deletions
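
For illustration, here is a minimal, self-contained sketch of the pattern
this commit introduces. The names (item, batcher, newBatcher, wakeup) and
the sizes used in main are invented for the example; the actual
implementation is the messageCache change in the diff below.

package main

import (
	"fmt"
	"time"
)

// item and batcher are made-up names for this sketch; the real code uses
// *message and messageCache (see the diff below).
type item struct{ body string }

type batcher struct {
	queue  chan *item // buffered to 2 * batchSize, so bursts are absorbed
	wakeup chan bool  // unbuffered secondary signalling channel
}

func newBatcher(batchSize int) *batcher {
	return &batcher{
		queue:  make(chan *item, batchSize*2),
		wakeup: make(chan bool),
	}
}

// add enqueues an item and, once more than one full batch is pending,
// nudges the processor without ever blocking the caller.
func (b *batcher) add(it *item) {
	b.queue <- it
	if len(b.queue) > cap(b.queue)/2 {
		select {
		case b.wakeup <- true: // processor was idling in its batch delay
		default: // processor already awake; drop the signal
		}
	}
}

// process blocks for the first item, gathers up to batchSize more without
// blocking, flushes the batch, then waits for either the batch delay to
// elapse or a wakeup signal, whichever comes first.
func (b *batcher) process(batchSize int, batchTimeout time.Duration, flush func([]*item)) {
	for {
		batch := make([]*item, 0, batchSize)
		batch = append(batch, <-b.queue) // block until at least one item is pending
	drain:
		for len(batch) < batchSize {
			select {
			case it := <-b.queue:
				batch = append(batch, it)
			default:
				break drain
			}
		}
		flush(batch)
		if batchTimeout > 0 {
			select {
			case <-time.After(batchTimeout): // normal cooldown between batches
			case <-b.wakeup: // a full batch is already waiting; skip the cooldown
			}
		}
	}
}

func main() {
	b := newBatcher(10)
	go b.process(10, time.Second, func(batch []*item) {
		fmt.Printf("flushed %d item(s)\n", len(batch))
	})
	for i := 0; i < 25; i++ {
		b.add(&item{body: fmt.Sprintf("message %d", i)})
	}
	time.Sleep(3 * time.Second) // allow the remaining items to be flushed
}

The key point is the select in process: the batch delay and the wakeup
signal are raced against each other, so a full queue cuts the delay short
while an idle queue costs nothing.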

@@ -196,6 +196,7 @@ const (
 type messageCache struct {
 	db *sql.DB
 	queue chan *message
+	wakeupSignal chan bool
 	nop bool
 }
@@ -214,8 +215,9 @@ func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout
 	}
 	if batchSize > 0 {
 		// here, batchSize determines the maximum number of unprocessed messages the server will
-		// buffer before (briefly) blocking.
-		cache.queue = make(chan *message, batchSize)
+		// buffer before a wakeupSignal is issued. Our channel actually has double that capacity.
+		cache.queue = make(chan *message, batchSize*2)
+		cache.wakeupSignal = make(chan bool)
 	}
 	// here, batchSize indicates the maximum number of messages which will be inserted into
 	// the database per transaction.
@@ -252,6 +254,13 @@ func (c *messageCache) AddMessage(m *message) error {
 	}
 	if c.queue != nil {
 		c.queue <- m
+		// queue is over 50% full, so wake up the processor, if necessary
+		if len(c.queue) > cap(c.queue)/2 {
+			select {
+			case c.wakeupSignal <- true:
+			default:
+			}
+		}
 		return nil
 	}
 	if c.nop {
@@ -468,17 +477,14 @@ func (c *messageCache) processMessageBatches(batchSize int, batchTimeout time.Du
 	}
 	// initialise the array once to avoid needing to recreate it each iteration
 	var messagebuffer []*message = make([]*message, batchSize)
-	var bufferRemainingCapacityOnPreviousIteration int
 	for {
 		messages := messagebuffer[:0]
 		// To increase the efficiency of database insertions, optionally
 		// delay processing the incoming message stream for a short period,
 		// unless the previous batch insertion held the maximum allowable
-		// number of messages.
-		if batchTimeout > 0 && bufferRemainingCapacityOnPreviousIteration > 0 {
-			time.Sleep(batchTimeout)
-		}
+		// number of messages. This can be interrupted if something is found
+		// on the wakeupSignal channel.
 		// Perform a blocking read; until at least one message
 		// is pending, there is nothing to do.
@@ -498,11 +504,17 @@ func (c *messageCache) processMessageBatches(batchSize int, batchTimeout time.Du
 				break retrieve_messages_from_channel
 			}
 		}
-		bufferRemainingCapacityOnPreviousIteration = batchSize - len(messages)
 		if err := c.addMessages(messages); err != nil {
 			log.Error("processMessageBatches: %s", err.Error())
 		}
+		if batchTimeout > 0 {
+			select {
+			case <-time.After(batchTimeout):
+			case <-c.wakeupSignal:
+			}
+		}
 	}
 }

@@ -76,18 +76,14 @@ func TestBufferedCacheFlushBehaviour(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, 2, counts["mytopic"])
-	// Add an extra message. Because the buffered queue is at capacity, this should block
-	// this goroutine until the cooldown period has expired, and at least one of the pending
-	// messages has been read from the channel.
+	// Add an extra message.
 	require.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my example message")))
-	require.Greater(t, time.Since(t1), cooldown/3)
-	// Because the channel was full, there should not be a cooldown, and our new message should
-	// be processed without delay
-	time.Sleep(cooldown / 3)
+	// Because we have more than `queueSize` elements in the channel, the processing
+	// goroutine should be immediately woken, processing everything very quickly.
+	time.Sleep(cooldown / 6)
 	counts, err = c.MessageCounts()
 	require.Nil(t, err)
-	require.Equal(t, 3+queueSize, counts["mytopic"])
+	require.GreaterOrEqual(t, 2+queueSize, counts["mytopic"])
 }
 func testCacheMessages(t *testing.T, c *messageCache) {