Fix race condition making it possible for batches to be >batchSize

pull/502/head
Philipp Heckel 2022-11-16 11:16:07 -05:00
parent e147a41f92
commit db9ca80b69
2 changed files with 9 additions and 6 deletions

View File

@ -48,10 +48,13 @@ func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueu
func (q *BatchingQueue[T]) Enqueue(element T) { func (q *BatchingQueue[T]) Enqueue(element T) {
q.mu.Lock() q.mu.Lock()
q.in = append(q.in, element) q.in = append(q.in, element)
limitReached := len(q.in) == q.batchSize var elements []T
if len(q.in) == q.batchSize {
elements = q.dequeueAll()
}
q.mu.Unlock() q.mu.Unlock()
if limitReached { if len(elements) > 0 {
q.out <- q.dequeueAll() q.out <- elements
} }
} }
@ -61,8 +64,6 @@ func (q *BatchingQueue[T]) Dequeue() <-chan []T {
} }
func (q *BatchingQueue[T]) dequeueAll() []T { func (q *BatchingQueue[T]) dequeueAll() []T {
q.mu.Lock()
defer q.mu.Unlock()
elements := make([]T, len(q.in)) elements := make([]T, len(q.in))
copy(elements, q.in) copy(elements, q.in)
q.in = q.in[:0] q.in = q.in[:0]
@ -75,7 +76,9 @@ func (q *BatchingQueue[T]) timeoutTicker() {
} }
ticker := time.NewTicker(q.timeout) ticker := time.NewTicker(q.timeout)
for range ticker.C { for range ticker.C {
q.mu.Lock()
elements := q.dequeueAll() elements := q.dequeueAll()
q.mu.Unlock()
if len(elements) > 0 { if len(elements) > 0 {
q.out <- elements q.out <- elements
} }

View File

@ -24,7 +24,7 @@ func TestBatchingQueue_InfTimeout(t *testing.T) {
for i := 0; i < 101; i++ { for i := 0; i < 101; i++ {
go q.Enqueue(i) go q.Enqueue(i)
} }
time.Sleep(500 * time.Millisecond) time.Sleep(time.Second)
mu.Lock() mu.Lock()
require.Equal(t, 100, total) // One is missing, stuck in the last batch! require.Equal(t, 100, total) // One is missing, stuck in the last batch!
require.Equal(t, 4, len(batches)) require.Equal(t, 4, len(batches))