Cosmetic changes

pull/751/head
binwiederhier 2023-05-30 13:50:24 -04:00 committed by nimbleghost
parent f94bb1aa30
commit 7b23158e0a
3 changed files with 35 additions and 45 deletions

View File

@ -270,7 +270,7 @@ func newSqliteCache(filename, startupQueries string, cacheDuration time.Duration
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := setupDB(db, startupQueries, cacheDuration); err != nil { if err := setupMessagesDB(db, startupQueries, cacheDuration); err != nil {
return nil, err return nil, err
} }
var queue *util.BatchingQueue[*message] var queue *util.BatchingQueue[*message]
@ -749,7 +749,7 @@ func (c *messageCache) Close() error {
return c.db.Close() return c.db.Close()
} }
func setupDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error { func setupMessagesDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error {
// Run startup queries // Run startup queries
if startupQueries != "" { if startupQueries != "" {
if _, err := db.Exec(startupQueries); err != nil { if _, err := db.Exec(startupQueries); err != nil {

View File

@ -55,7 +55,7 @@ type Server struct {
messagesHistory []int64 // Last n values of the messages counter, used to determine rate messagesHistory []int64 // Last n values of the messages counter, used to determine rate
userManager *user.Manager // Might be nil! userManager *user.Manager // Might be nil!
messageCache *messageCache // Database that stores the messages messageCache *messageCache // Database that stores the messages
webPushSubscriptionStore *webPushSubscriptionStore // Database that stores web push subscriptions webPushSubscriptionStore *webPushStore // Database that stores web push subscriptions
fileCache *fileCache // File system based cache that stores attachments fileCache *fileCache // File system based cache that stores attachments
stripe stripeAPI // Stripe API, can be replaced with a mock stripe stripeAPI // Stripe API, can be replaced with a mock
priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!) priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
@ -227,12 +227,12 @@ func createMessageCache(conf *Config) (*messageCache, error) {
return newMemCache() return newMemCache()
} }
func createWebPushSubscriptionStore(conf *Config) (*webPushSubscriptionStore, error) { func createWebPushSubscriptionStore(conf *Config) (*webPushStore, error) {
if !conf.WebPushEnabled { if !conf.WebPushEnabled {
return nil, nil return nil, nil
} }
return newWebPushSubscriptionStore(conf.WebPushSubscriptionsFile) return newWebPushStore(conf.WebPushSubscriptionsFile)
} }
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
@ -979,18 +979,12 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) { func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
subscriptions, err := s.webPushSubscriptionStore.GetSubscriptionsForTopic(m.Topic) subscriptions, err := s.webPushSubscriptionStore.GetSubscriptionsForTopic(m.Topic)
if err != nil { if err != nil {
logvm(v, m).Err(err).Warn("Unable to publish web push messages") logvm(v, m).Err(err).Warn("Unable to publish web push messages")
return return
} }
totalCount := len(subscriptions) ctx := log.Context{"topic": m.Topic, "message_id": m.ID, "total_count": len(subscriptions)}
wg := &sync.WaitGroup{}
wg.Add(totalCount)
ctx := log.Context{"topic": m.Topic, "message_id": m.ID, "total_count": totalCount}
// Importing the emojis in the service worker would add unnecessary complexity, // Importing the emojis in the service worker would add unnecessary complexity,
// simply do it here for web push notifications instead // simply do it here for web push notifications instead
@ -1017,7 +1011,6 @@ func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
for i, xi := range subscriptions { for i, xi := range subscriptions {
go func(i int, sub webPushSubscription) { go func(i int, sub webPushSubscription) {
defer wg.Done()
ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint, "username": sub.Username, "topic": m.Topic, "message_id": m.ID} ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint, "username": sub.Username, "topic": m.Topic, "message_id": m.ID}
payload := &webPushPayload{ payload := &webPushPayload{

View File

@ -6,11 +6,10 @@ import (
_ "github.com/mattn/go-sqlite3" // SQLite driver _ "github.com/mattn/go-sqlite3" // SQLite driver
) )
// Messages cache
const ( const (
createWebPushSubscriptionsTableQuery = ` createWebPushSubscriptionsTableQuery = `
BEGIN; BEGIN;
CREATE TABLE IF NOT EXISTS web_push_subscriptions ( CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL, topic TEXT NOT NULL,
username TEXT, username TEXT,
@ -19,60 +18,58 @@ const (
key_p256dh TEXT NOT NULL, key_p256dh TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
); );
CREATE INDEX IF NOT EXISTS idx_topic ON web_push_subscriptions (topic); CREATE INDEX IF NOT EXISTS idx_topic ON subscriptions (topic);
CREATE INDEX IF NOT EXISTS idx_endpoint ON web_push_subscriptions (endpoint); CREATE INDEX IF NOT EXISTS idx_endpoint ON subscriptions (endpoint);
CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON web_push_subscriptions (topic, endpoint); CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON subscriptions (topic, endpoint);
COMMIT; COMMIT;
` `
insertWebPushSubscriptionQuery = ` insertWebPushSubscriptionQuery = `
INSERT OR REPLACE INTO web_push_subscriptions (topic, username, endpoint, key_auth, key_p256dh) INSERT OR REPLACE INTO subscriptions (topic, username, endpoint, key_auth, key_p256dh)
VALUES (?, ?, ?, ?, ?); VALUES (?, ?, ?, ?, ?)
` `
deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM web_push_subscriptions WHERE endpoint = ?` deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscriptions WHERE endpoint = ?`
deleteWebPushSubscriptionByUsernameQuery = `DELETE FROM web_push_subscriptions WHERE username = ?` deleteWebPushSubscriptionByUsernameQuery = `DELETE FROM subscriptions WHERE username = ?`
deleteWebPushSubscriptionByTopicAndEndpointQuery = `DELETE FROM web_push_subscriptions WHERE topic = ? AND endpoint = ?` deleteWebPushSubscriptionByTopicAndEndpointQuery = `DELETE FROM subscriptions WHERE topic = ? AND endpoint = ?`
selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, username FROM web_push_subscriptions WHERE topic = ?` selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, username FROM subscriptions WHERE topic = ?`
selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM web_push_subscriptions` selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM subscriptions`
) )
type webPushSubscriptionStore struct { type webPushStore struct {
db *sql.DB db *sql.DB
} }
func newWebPushSubscriptionStore(filename string) (*webPushSubscriptionStore, error) { func newWebPushStore(filename string) (*webPushStore, error) {
db, err := sql.Open("sqlite3", filename) db, err := sql.Open("sqlite3", filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := setupSubscriptionDb(db); err != nil { if err := setupSubscriptionsDB(db); err != nil {
return nil, err return nil, err
} }
webPushSubscriptionStore := &webPushSubscriptionStore{ return &webPushStore{
db: db, db: db,
} }, nil
return webPushSubscriptionStore, nil
} }
func setupSubscriptionDb(db *sql.DB) error { func setupSubscriptionsDB(db *sql.DB) error {
// If 'messages' table does not exist, this must be a new database // If 'subscriptions' table does not exist, this must be a new database
rowsMC, err := db.Query(selectWebPushSubscriptionsCountQuery) rowsMC, err := db.Query(selectWebPushSubscriptionsCountQuery)
if err != nil { if err != nil {
return setupNewSubscriptionDb(db) return setupNewSubscriptionsDB(db)
} }
rowsMC.Close() return rowsMC.Close()
return nil
} }
func setupNewSubscriptionDb(db *sql.DB) error { func setupNewSubscriptionsDB(db *sql.DB) error {
if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil { if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil {
return err return err
} }
return nil return nil
} }
func (c *webPushSubscriptionStore) AddSubscription(topic string, username string, subscription webPushSubscribePayload) error { func (c *webPushStore) AddSubscription(topic string, username string, subscription webPushSubscribePayload) error {
_, err := c.db.Exec( _, err := c.db.Exec(
insertWebPushSubscriptionQuery, insertWebPushSubscriptionQuery,
topic, topic,
@ -84,7 +81,7 @@ func (c *webPushSubscriptionStore) AddSubscription(topic string, username string
return err return err
} }
func (c *webPushSubscriptionStore) RemoveSubscription(topic string, endpoint string) error { func (c *webPushStore) RemoveSubscription(topic string, endpoint string) error {
_, err := c.db.Exec( _, err := c.db.Exec(
deleteWebPushSubscriptionByTopicAndEndpointQuery, deleteWebPushSubscriptionByTopicAndEndpointQuery,
topic, topic,
@ -93,14 +90,14 @@ func (c *webPushSubscriptionStore) RemoveSubscription(topic string, endpoint str
return err return err
} }
func (c *webPushSubscriptionStore) GetSubscriptionsForTopic(topic string) (subscriptions []webPushSubscription, err error) { func (c *webPushStore) GetSubscriptionsForTopic(topic string) (subscriptions []webPushSubscription, err error) {
rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic) rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
data := []webPushSubscription{} var data []webPushSubscription
for rows.Next() { for rows.Next() {
i := webPushSubscription{} i := webPushSubscription{}
err = rows.Scan(&i.BrowserSubscription.Endpoint, &i.BrowserSubscription.Keys.Auth, &i.BrowserSubscription.Keys.P256dh, &i.Username) err = rows.Scan(&i.BrowserSubscription.Endpoint, &i.BrowserSubscription.Keys.Auth, &i.BrowserSubscription.Keys.P256dh, &i.Username)
@ -112,7 +109,7 @@ func (c *webPushSubscriptionStore) GetSubscriptionsForTopic(topic string) (subsc
return data, nil return data, nil
} }
func (c *webPushSubscriptionStore) ExpireWebPushEndpoint(endpoint string) error { func (c *webPushStore) ExpireWebPushEndpoint(endpoint string) error {
_, err := c.db.Exec( _, err := c.db.Exec(
deleteWebPushSubscriptionByEndpointQuery, deleteWebPushSubscriptionByEndpointQuery,
endpoint, endpoint,
@ -120,13 +117,13 @@ func (c *webPushSubscriptionStore) ExpireWebPushEndpoint(endpoint string) error
return err return err
} }
func (c *webPushSubscriptionStore) ExpireWebPushForUser(username string) error { func (c *webPushStore) ExpireWebPushForUser(username string) error {
_, err := c.db.Exec( _, err := c.db.Exec(
deleteWebPushSubscriptionByUsernameQuery, deleteWebPushSubscriptionByUsernameQuery,
username, username,
) )
return err return err
} }
func (c *webPushSubscriptionStore) Close() error { func (c *webPushStore) Close() error {
return c.db.Close() return c.db.Close()
} }