Implement push subscription expiry

This commit is contained in:
nimbleghost 2023-06-02 14:45:05 +02:00
parent 47ad024ec7
commit 0f0074cbab
16 changed files with 272 additions and 102 deletions

View file

@ -23,6 +23,12 @@ const (
DefaultStripePriceCacheDuration = 3 * time.Hour // Time to keep Stripe prices cached in memory before a refresh is needed
)
// Defines default web push settings
const (
DefaultWebPushExpiryWarningDuration = 7 * 24 * time.Hour
DefaultWebPushExpiryDuration = DefaultWebPushExpiryWarningDuration + 24*time.Hour
)
// Defines all global and per-visitor limits
// - message size limit: the max number of bytes for a message
// - total topic limit: max number of topics overall
@ -152,6 +158,8 @@ type Config struct {
WebPushPublicKey string
WebPushSubscriptionsFile string
WebPushEmailAddress string
WebPushExpiryDuration time.Duration
WebPushExpiryWarningDuration time.Duration
}
// NewConfig instantiates a default new server config
@ -238,5 +246,7 @@ func NewConfig() *Config {
WebPushPublicKey: "",
WebPushSubscriptionsFile: "",
WebPushEmailAddress: "",
WebPushExpiryDuration: DefaultWebPushExpiryDuration,
WebPushExpiryWarningDuration: DefaultWebPushExpiryWarningDuration,
}
}

View file

@ -29,6 +29,7 @@ const (
tagResetter = "resetter"
tagWebsocket = "websocket"
tagMatrix = "matrix"
tagWebPush = "web_push"
)
var (

View file

@ -47,6 +47,8 @@
# web-push-private-key:
# web-push-subscriptions-file:
# web-push-email-address:
# web-push-expiry-warning-duration: 168h
# web-push-expiry-duration: 192h
# If "cache-file" is set, messages are cached in a local SQLite database instead of only in-memory.
# This allows for service restarts without losing messages in support of the since= parameter.

View file

@ -15,6 +15,9 @@ func (s *Server) execManager() {
s.pruneTokens()
s.pruneAttachments()
s.pruneMessages()
if s.config.WebPushEnabled {
s.expireOrNotifyOldSubscriptions()
}
// Message count per topic
var messagesCached int

View file

@ -43,6 +43,7 @@ func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
subscriptions, err := s.webPush.SubscriptionsForTopic(m.Topic)
if err != nil {
logvm(v, m).Err(err).Warn("Unable to publish web push messages")
return
}
@ -50,42 +51,61 @@ func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
go func(i int, sub webPushSubscription) {
ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint, "username": sub.UserID, "topic": m.Topic, "message_id": m.ID}
payload := &webPushPayload{
SubscriptionID: fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic),
Message: *m,
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
logvm(v, m).Err(err).Fields(ctx).Debug("Unable to publish web push message")
return
}
resp, err := webpush.SendNotification(jsonPayload, &sub.BrowserSubscription, &webpush.Options{
Subscriber: s.config.WebPushEmailAddress,
VAPIDPublicKey: s.config.WebPushPublicKey,
VAPIDPrivateKey: s.config.WebPushPrivateKey,
// Deliverability on iOS isn't great with lower urgency values,
// and thus we can't really map lower ntfy priorities to lower urgency values
Urgency: webpush.UrgencyHigh,
})
if err != nil {
logvm(v, m).Err(err).Fields(ctx).Debug("Unable to publish web push message")
if err := s.webPush.RemoveByEndpoint(sub.BrowserSubscription.Endpoint); err != nil {
logvm(v, m).Err(err).Fields(ctx).Warn("Unable to expire subscription")
}
return
}
// May want to handle at least 429 differently, but for now treat all errors the same
if !(200 <= resp.StatusCode && resp.StatusCode <= 299) {
logvm(v, m).Fields(ctx).Field("response", resp).Debug("Unable to publish web push message")
if err := s.webPush.RemoveByEndpoint(sub.BrowserSubscription.Endpoint); err != nil {
logvm(v, m).Err(err).Fields(ctx).Warn("Unable to expire subscription")
}
return
}
s.sendWebPushNotification(newWebPushPayload(fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic), *m), &sub, &ctx)
}(i, xi)
}
}
func (s *Server) expireOrNotifyOldSubscriptions() {
subscriptions, err := s.webPush.ExpireAndGetExpiringSubscriptions(s.config.WebPushExpiryWarningDuration, s.config.WebPushExpiryDuration)
if err != nil {
log.Tag(tagWebPush).Err(err).Warn("Unable to publish expiry imminent warning")
return
}
for i, xi := range subscriptions {
go func(i int, sub webPushSubscription) {
ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint}
s.sendWebPushNotification(newWebPushSubscriptionExpiringPayload(), &sub, &ctx)
}(i, xi)
}
log.Tag(tagWebPush).Debug("Expired old subscriptions and published %d expiry imminent warnings", len(subscriptions))
}
func (s *Server) sendWebPushNotification(payload any, sub *webPushSubscription, ctx *log.Context) {
jsonPayload, err := json.Marshal(payload)
if err != nil {
log.Tag(tagWebPush).Err(err).Fields(*ctx).Debug("Unable to publish web push message")
return
}
resp, err := webpush.SendNotification(jsonPayload, &sub.BrowserSubscription, &webpush.Options{
Subscriber: s.config.WebPushEmailAddress,
VAPIDPublicKey: s.config.WebPushPublicKey,
VAPIDPrivateKey: s.config.WebPushPrivateKey,
// Deliverability on iOS isn't great with lower urgency values,
// and thus we can't really map lower ntfy priorities to lower urgency values
Urgency: webpush.UrgencyHigh,
})
if err != nil {
log.Tag(tagWebPush).Err(err).Fields(*ctx).Debug("Unable to publish web push message")
if err := s.webPush.RemoveByEndpoint(sub.BrowserSubscription.Endpoint); err != nil {
log.Tag(tagWebPush).Err(err).Fields(*ctx).Warn("Unable to expire subscription")
}
return
}
// May want to handle at least 429 differently, but for now treat all errors the same
if !(200 <= resp.StatusCode && resp.StatusCode <= 299) {
log.Tag(tagWebPush).Fields(*ctx).Field("response", resp).Debug("Unable to publish web push message")
if err := s.webPush.RemoveByEndpoint(sub.BrowserSubscription.Endpoint); err != nil {
log.Tag(tagWebPush).Err(err).Fields(*ctx).Warn("Unable to expire subscription")
}
return
}
}

View file

@ -157,8 +157,42 @@ func TestServer_WebPush_PublishExpire(t *testing.T) {
requireSubscriptionCount(t, s, "test-topic-abc", 0)
}
func TestServer_WebPush_Expiry(t *testing.T) {
s := newTestServer(t, newTestConfigWithWebPush(t))
var received atomic.Bool
upstreamServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
require.Nil(t, err)
w.WriteHeader(200)
w.Write([]byte(``))
received.Store(true)
}))
defer upstreamServer.Close()
addSubscription(t, s, "test-topic", upstreamServer.URL+"/push-receive")
requireSubscriptionCount(t, s, "test-topic", 1)
_, err := s.webPush.db.Exec("UPDATE subscriptions SET updated_at = datetime('now', '-7 days')")
require.Nil(t, err)
s.expireOrNotifyOldSubscriptions()
requireSubscriptionCount(t, s, "test-topic", 1)
waitFor(t, func() bool {
return received.Load()
})
_, err = s.webPush.db.Exec("UPDATE subscriptions SET updated_at = datetime('now', '-8 days')")
require.Nil(t, err)
s.expireOrNotifyOldSubscriptions()
requireSubscriptionCount(t, s, "test-topic", 0)
}
func payloadForTopics(t *testing.T, topics []string) string {
topicsJson, err := json.Marshal(topics)
topicsJSON, err := json.Marshal(topics)
require.Nil(t, err)
return fmt.Sprintf(`{
@ -170,7 +204,7 @@ func payloadForTopics(t *testing.T, topics []string) string {
"auth": "auth-key"
}
}
}`, topicsJson)
}`, topicsJSON)
}
func addSubscription(t *testing.T, s *Server, topic string, url string) {

View file

@ -468,10 +468,29 @@ type apiStripeSubscriptionDeletedEvent struct {
}
type webPushPayload struct {
Event string `json:"event"`
SubscriptionID string `json:"subscription_id"`
Message message `json:"message"`
}
func newWebPushPayload(subscriptionID string, message message) webPushPayload {
return webPushPayload{
Event: "message",
SubscriptionID: subscriptionID,
Message: message,
}
}
type webPushControlMessagePayload struct {
Event string `json:"event"`
}
func newWebPushSubscriptionExpiringPayload() webPushControlMessagePayload {
return webPushControlMessagePayload{
Event: "subscription_expiring",
}
}
type webPushSubscription struct {
BrowserSubscription webpush.Subscription
UserID string

View file

@ -3,6 +3,7 @@ package server
import (
"database/sql"
"fmt"
"time"
"github.com/SherClockHolmes/webpush-go"
_ "github.com/mattn/go-sqlite3" // SQLite driver
@ -18,7 +19,8 @@ const (
endpoint TEXT NOT NULL,
key_auth TEXT NOT NULL,
key_p256dh TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
warning_sent BOOLEAN DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_topic ON subscriptions (topic);
CREATE INDEX IF NOT EXISTS idx_endpoint ON subscriptions (endpoint);
@ -32,8 +34,12 @@ const (
deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscriptions WHERE endpoint = ?`
deleteWebPushSubscriptionByUserIDQuery = `DELETE FROM subscriptions WHERE user_id = ?`
deleteWebPushSubscriptionByTopicAndEndpointQuery = `DELETE FROM subscriptions WHERE topic = ? AND endpoint = ?`
deleteWebPushSubscriptionsByAgeQuery = `DELETE FROM subscriptions WHERE warning_sent = 1 AND updated_at <= datetime('now', ?)`
selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, user_id FROM subscriptions WHERE topic = ?`
selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, user_id FROM subscriptions WHERE topic = ?`
selectWebPushSubscriptionsExpiringSoonQuery = `SELECT DISTINCT endpoint, key_auth, key_p256dh FROM subscriptions WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
updateWarningSentQuery = `UPDATE subscriptions SET warning_sent = true WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM subscriptions`
)
@ -72,7 +78,6 @@ func setupNewSubscriptionsDB(db *sql.DB) error {
}
func (c *webPushStore) UpdateSubscriptions(topics []string, userID string, subscription webpush.Subscription) error {
fmt.Printf("AAA")
tx, err := c.db.Begin()
if err != nil {
return err
@ -121,6 +126,49 @@ func (c *webPushStore) SubscriptionsForTopic(topic string) (subscriptions []webP
return data, nil
}
func (c *webPushStore) ExpireAndGetExpiringSubscriptions(warningDuration time.Duration, expiryDuration time.Duration) (subscriptions []webPushSubscription, err error) {
tx, err := c.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
_, err = tx.Exec(deleteWebPushSubscriptionsByAgeQuery, fmt.Sprintf("-%.2f seconds", expiryDuration.Seconds()))
if err != nil {
return nil, err
}
rows, err := tx.Query(selectWebPushSubscriptionsExpiringSoonQuery, fmt.Sprintf("-%.2f seconds", warningDuration.Seconds()))
if err != nil {
return nil, err
}
defer rows.Close()
var data []webPushSubscription
for rows.Next() {
i := webPushSubscription{}
err = rows.Scan(&i.BrowserSubscription.Endpoint, &i.BrowserSubscription.Keys.Auth, &i.BrowserSubscription.Keys.P256dh)
fmt.Printf("%v+", i)
if err != nil {
return nil, err
}
data = append(data, i)
}
// also set warning as sent
_, err = tx.Exec(updateWarningSentQuery, fmt.Sprintf("-%.2f seconds", warningDuration.Seconds()))
if err != nil {
return nil, err
}
err = tx.Commit()
if err != nil {
return nil, err
}
return data, nil
}
func (c *webPushStore) RemoveByEndpoint(endpoint string) error {
_, err := c.db.Exec(
deleteWebPushSubscriptionByEndpointQuery,
@ -136,6 +184,7 @@ func (c *webPushStore) RemoveByUserID(userID string) error {
)
return err
}
func (c *webPushStore) Close() error {
return c.db.Close()
}