Add exponential backoff to firehose connection attempts and export connection status directly
parent
e986d370ec
commit
e6176a2217
|
@ -13,6 +13,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
|
@ -82,6 +83,13 @@ func (c *Consumer) run(ctx context.Context) {
|
||||||
log := zerolog.Ctx(ctx).With().Str("pds", c.remote.Host).Logger()
|
log := zerolog.Ctx(ctx).With().Str("pds", c.remote.Host).Logger()
|
||||||
ctx = log.WithContext(ctx)
|
ctx = log.WithContext(ctx)
|
||||||
|
|
||||||
|
backoffTimer := backoff.NewExponentialBackOff(
|
||||||
|
backoff.WithMaxElapsedTime(0),
|
||||||
|
backoff.WithInitialInterval(time.Second),
|
||||||
|
backoff.WithMaxInterval(5*time.Minute),
|
||||||
|
)
|
||||||
|
pdsOnline.WithLabelValues(c.remote.Host).Set(0)
|
||||||
|
|
||||||
defer close(c.running)
|
defer close(c.running)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -95,13 +103,20 @@ func (c *Consumer) run(ctx context.Context) {
|
||||||
eventCounter.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
eventCounter.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
||||||
reposDiscovered.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
reposDiscovered.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
||||||
postsByLanguageIndexed.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
postsByLanguageIndexed.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
||||||
|
pdsOnline.DeletePartialMatch(prometheus.Labels{"remote": c.remote.Host})
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
start := time.Now()
|
||||||
if err := c.runOnce(ctx); err != nil {
|
if err := c.runOnce(ctx); err != nil {
|
||||||
log.Error().Err(err).Msgf("Consumer of %q failed (will be restarted): %s", c.remote.Host, err)
|
log.Error().Err(err).Msgf("Consumer of %q failed (will be restarted): %s", c.remote.Host, err)
|
||||||
connectionFailures.WithLabelValues(c.remote.Host).Inc()
|
connectionFailures.WithLabelValues(c.remote.Host).Inc()
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
if time.Since(start) > backoffTimer.MaxInterval*3 {
|
||||||
|
// XXX: assume that c.runOnce did some useful work in this case,
|
||||||
|
// even though it might have been stuck on some absurdly long timeouts.
|
||||||
|
backoffTimer.Reset()
|
||||||
|
}
|
||||||
|
time.Sleep(backoffTimer.NextBackOff())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -132,6 +147,9 @@ func (c *Consumer) runOnce(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
|
pdsOnline.WithLabelValues(c.remote.Host).Set(1)
|
||||||
|
defer func() { pdsOnline.WithLabelValues(c.remote.Host).Set(0) }()
|
||||||
|
|
||||||
ch := make(chan bool)
|
ch := make(chan bool)
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
@ -29,3 +29,8 @@ var connectionFailures = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
Name: "consumer_connection_failures",
|
Name: "consumer_connection_failures",
|
||||||
Help: "Counter of firehose connection failures",
|
Help: "Counter of firehose connection failures",
|
||||||
}, []string{"remote"})
|
}, []string{"remote"})
|
||||||
|
|
||||||
|
var pdsOnline = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Name: "consumer_connection_up",
|
||||||
|
Help: "Status of a connection. 1 - up and running.",
|
||||||
|
}, []string{"remote"})
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -32,6 +32,7 @@ require (
|
||||||
require (
|
require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
|
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
|
||||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -10,6 +10,8 @@ github.com/bluesky-social/indigo v0.0.0-20240517204223-0daef8805413 h1:kRC2618UP
|
||||||
github.com/bluesky-social/indigo v0.0.0-20240517204223-0daef8805413/go.mod h1:dBIOGhsiK0rgEETnxiGiuEyrSx6DKxEotHIPbiKD6WU=
|
github.com/bluesky-social/indigo v0.0.0-20240517204223-0daef8805413/go.mod h1:dBIOGhsiK0rgEETnxiGiuEyrSx6DKxEotHIPbiKD6WU=
|
||||||
github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc=
|
github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc=
|
||||||
github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8=
|
github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||||
|
|
Loading…
Reference in New Issue