Disconnect from firehoses in parallel when shutting down

main
Max Ignatenko 2024-04-06 21:59:21 +01:00
parent 1358bc3f08
commit ecf2fc57d8
1 changed files with 9 additions and 2 deletions

View File

@ -14,6 +14,7 @@ import (
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -153,11 +154,17 @@ func runConsumers(ctx context.Context, db *gorm.DB, doneCh chan struct{}) {
} }
case <-ctx.Done(): case <-ctx.Done():
var wg sync.WaitGroup
for host, handle := range running { for host, handle := range running {
handle.cancel() wg.Add(1)
_ = handle.consumer.Wait(ctx) go func(handle consumerHandle) {
handle.cancel()
_ = handle.consumer.Wait(ctx)
wg.Done()
}(handle)
delete(running, host) delete(running, host)
} }
wg.Wait()
case v := <-ticker.C: case v := <-ticker.C:
// Non-blocking send. // Non-blocking send.