Accumulate incoming messages in a buffered channel

Instead of using a deque, store incoming messages in a native
buffered channel, if buffering is enabled.

In addition, modify the batching algorithm so the enforced delay
between consecutive `addMessages` invocations is applied after
all pending messages are processed. This acts as a "cooldown", rather
than a "warmup". This avoids the need for more complex timing logic to
dispatch batches, removes latency in adding messages when received
infrequently, and natively blocking the goroutine until messages are
received.

Because the message processing loop always performs a blocking read
first, it is appropriate for low-throughput environments just as much as
high-throughput ones.

The default value of batchSize has been changed to 10, with a zero
cooldown. This means that when messages are arriving faster than they
can be inserted into sqlite, they will automatically become batched in
groups of up to 10.
This commit is contained in:
Nick Farrell 2022-12-12 18:31:44 +11:00
parent 6f170b1ad7
commit 09e8fb81b5
No known key found for this signature in database
GPG key ID: 740D3A86CF435835
7 changed files with 140 additions and 168 deletions

View file

@ -195,7 +195,7 @@ const (
type messageCache struct {
db *sql.DB
queue *util.BatchingQueue[*message]
queue chan *message
nop bool
}
@ -208,16 +208,18 @@ func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout
if err := setupCacheDB(db, startupQueries); err != nil {
return nil, err
}
var queue *util.BatchingQueue[*message]
if batchSize > 0 || batchTimeout > 0 {
queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
}
cache := &messageCache{
db: db,
queue: queue,
nop: nop,
db: db,
nop: nop,
}
go cache.processMessageBatches()
if batchSize > 0 {
// here, batchSize determines the maximum number of unprocessed messages the server will
// buffer before (briefly) blocking.
cache.queue = make(chan *message, batchSize)
}
// here, batchSize indicates the maximum number of messages which will be inserted into
// the database per transaction.
go cache.processMessageBatches(batchSize, batchTimeout)
return cache, nil
}
@ -242,11 +244,17 @@ func createMemoryFilename() string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
}
// AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asyncronously.
// The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
// AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asynchronously.
// The message is queued only if "batchSize" > 0 is passed to the constructor.
func (c *messageCache) AddMessage(m *message) error {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
if c.queue != nil {
c.queue.Enqueue(m)
c.queue <- m
return nil
}
if c.nop {
return nil
}
return c.addMessages([]*message{m})
@ -255,9 +263,6 @@ func (c *messageCache) AddMessage(m *message) error {
// addMessages synchronously stores a match of messages. If the database is locked, the transaction waits until
// SQLite's busy_timeout is exceeded before erroring out.
func (c *messageCache) addMessages(ms []*message) error {
if c.nop {
return nil
}
if len(ms) == 0 {
return nil
}
@ -273,9 +278,6 @@ func (c *messageCache) addMessages(ms []*message) error {
}
defer stmt.Close()
for _, m := range ms {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
published := m.Time <= time.Now().Unix()
tags := strings.Join(m.Tags, ",")
var attachmentName, attachmentType, attachmentURL string
@ -460,13 +462,46 @@ func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
return size, nil
}
func (c *messageCache) processMessageBatches() {
func (c *messageCache) processMessageBatches(batchSize int, batchTimeout time.Duration) {
if c.queue == nil {
return
}
for messages := range c.queue.Dequeue() {
// initialise the array once to avoid needing to recreate it each iteration
var messagebuffer []*message = make([]*message, batchSize)
var bufferRemainingCapacityOnPreviousIteration int
for {
messages := messagebuffer[:0]
// To increase the efficiency of database insertions, optionally
// delay processing the incoming message stream for a short period,
// unless the previous batch insertion held the maximum allowable
// number of messages.
if batchTimeout > 0 && bufferRemainingCapacityOnPreviousIteration > 0 {
time.Sleep(batchTimeout)
}
// Perform a blocking read; until at least one message
// is pending, there is nothing to do.
messages = append(messages, <-c.queue)
retrieve_messages_from_channel:
for {
select {
case message := <-c.queue:
messages = append(messages, message)
if batchSize == len(messages) {
// no more room in the messagebuffer.
break retrieve_messages_from_channel
}
default:
// no more incoming messages.
break retrieve_messages_from_channel
}
}
bufferRemainingCapacityOnPreviousIteration = batchSize - len(messages)
if err := c.addMessages(messages); err != nil {
log.Error("Cache: %s", err.Error())
log.Error("processMessageBatches: %s", err.Error())
}
}
}