Compare commits

...

6 Commits

Author SHA1 Message Date
Philipp Heckel a327fe802e Make web app work for updated notifications 2022-03-25 11:01:07 -04:00
Philipp Heckel 86baa80ab8 Merge branch 'main' into update-messages 2022-03-24 19:30:27 -04:00
Philipp Heckel 0a77c5296b Add since=$ID/$timestamp parsing logic 2022-03-24 17:05:07 -04:00
Philipp Heckel b7871b80ab Merge branch 'main' into update-messages 2022-03-24 13:27:04 -04:00
Philipp Heckel 8939173a1e Continued work 2022-03-23 21:51:38 -04:00
Philipp Heckel 8848829dfa WIP: Update messages 2022-03-23 16:39:22 -04:00
9 changed files with 258 additions and 43 deletions

View File

@ -40,14 +40,16 @@ var (
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets"}
errHTTPBadRequestJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "not found", ""}
errHTTPNotFoundMessageID = &errHTTP{40402, http.StatusNotFound, "not found: unable to find message with this ID", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication"}
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication"}
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "limit reached: daily bandwidth limit reached, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsUpdatingTooQuickly = &errHTTP{42906, http.StatusTooManyRequests, "limit reached: too many consecutive message updates", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""}
)

View File

@ -23,6 +23,7 @@ const (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mid TEXT NOT NULL,
time INT NOT NULL,
updated INT NOT NULL,
topic TEXT NOT NULL,
message TEXT NOT NULL,
title TEXT NOT NULL,
@ -43,41 +44,47 @@ const (
COMMIT;
`
insertMessageQuery = `
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO messages (mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
updateMessageQuery = `UPDATE messages SET updated = ?, message = ?, title = ?, priority = ?, tags = ?, click = ? WHERE topic = ? AND mid = ?`
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
selectMessagesSinceTimeQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time, id
`
selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time, id
`
selectMessagesSinceIDQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND id > ? AND published = 1
WHERE topic = ? AND id >= ? AND published = 1
ORDER BY time, id
`
selectMessagesSinceIDIncludeScheduledQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND (id > ? OR published = 0)
WHERE topic = ? AND (id >= ? OR published = 0)
ORDER BY time, id
`
selectMessagesDueQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE time <= ? AND published = 0
ORDER BY time, id
`
selectMessageByIDQuery = `
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND mid = ?
`
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
@ -88,7 +95,7 @@ const (
// Schema management queries
const (
currentSchemaVersion = 5
currentSchemaVersion = 6
createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
@ -166,6 +173,11 @@ const (
ALTER TABLE messages_new RENAME TO messages;
COMMIT;
`
// 5 -> 6
migrate5To6AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN updated INT NOT NULL DEFAULT (0);
`
)
type messageCache struct {
@ -232,6 +244,7 @@ func (c *messageCache) AddMessage(m *message) error {
insertMessageQuery,
m.ID,
m.Time,
m.Updated,
m.Topic,
m.Message,
m.Title,
@ -250,6 +263,28 @@ func (c *messageCache) AddMessage(m *message) error {
return err
}
func (c *messageCache) UpdateMessage(m *message) error {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
if c.nop {
return nil
}
tags := strings.Join(m.Tags, ",")
_, err := c.db.Exec(
updateMessageQuery,
m.Updated,
m.Message,
m.Title,
m.Priority,
tags,
m.Click,
m.Topic,
m.ID,
)
return err
}
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
if since.IsNone() {
return make([]*message, 0), nil
@ -296,7 +331,15 @@ func (c *messageCache) messagesSinceID(topic string, since sinceMarker, schedule
if err != nil {
return nil, err
}
return readMessages(rows)
messages, err := readMessages(rows)
if err != nil {
return nil, err
} else if len(messages) == 0 {
return messages, nil
} else if since.IsTime() && messages[0].Updated > since.Time().Unix() {
return messages, nil
}
return messages[1:], nil // Do not include row with ID itself
}
func (c *messageCache) MessagesDue() ([]*message, error) {
@ -393,16 +436,31 @@ func (c *messageCache) AttachmentsExpired() ([]string, error) {
return ids, nil
}
func (c *messageCache) Message(topic, id string) (*message, error) {
rows, err := c.db.Query(selectMessageByIDQuery, topic, id)
if err != nil {
return nil, err
}
messages, err := readMessages(rows)
if err != nil {
return nil, err
} else if len(messages) == 0 {
return nil, errors.New("not found")
}
return messages[0], nil
}
func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close()
messages := make([]*message, 0)
for rows.Next() {
var timestamp, attachmentSize, attachmentExpires int64
var timestamp, updated, attachmentSize, attachmentExpires int64
var priority int
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
err := rows.Scan(
&id,
&timestamp,
&updated,
&topic,
&msg,
&title,
@ -438,6 +496,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
messages = append(messages, &message{
ID: id,
Time: timestamp,
Updated: updated,
Event: messageEvent,
Topic: topic,
Message: msg,
@ -490,6 +549,8 @@ func setupCacheDB(db *sql.DB) error {
return migrateFrom3(db)
} else if schemaVersion == 4 {
return migrateFrom4(db)
} else if schemaVersion == 5 {
return migrateFrom5(db)
}
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
}
@ -562,5 +623,16 @@ func migrateFrom4(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
return err
}
return migrateFrom5(db)
}
func migrateFrom5(db *sql.DB) error {
log.Print("Migrating cache database schema: from 5 to 6")
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
return err
}
return nil // Update this when a new version is added
}

View File

@ -55,9 +55,10 @@ type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
var (
// If changed, don't forget to update Android App and auth_sqlite.go
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
updateTopicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[A-Za-z0-9]{12}$`) // ID length must match messageIDLength & util.randomStringCharset
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
@ -279,7 +280,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visit
return s.handleOptions(w, r)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
return s.limitRequests(s.transformBodyJSON(s.authWrite(s.handlePublish)))(w, r, v)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && (topicPathRegex.MatchString(r.URL.Path) || updateTopicPathRegex.MatchString(r.URL.Path)) {
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
} else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
@ -390,7 +391,26 @@ func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor)
}
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := s.topicFromPath(r.URL.Path)
t, messageID, err := s.topicAndMessageIDFromPath(r.URL.Path)
if err != nil {
return err
}
var m *message
update := messageID != ""
if update {
m, err = s.messageCache.Message(t.ID, messageID)
if err != nil {
return errHTTPNotFoundMessageID
}
newUpdated := time.Now().Unix()
if newUpdated <= m.Updated {
return errHTTPTooManyRequestsUpdatingTooQuickly
}
m.Updated = newUpdated
} else {
m = newDefaultMessage(t.ID, "")
}
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
if err != nil {
return err
}
@ -398,11 +418,6 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
if err != nil {
return err
}
m := newDefaultMessage(t.ID, "")
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
if err != nil {
return err
}
if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
return err
}
@ -430,8 +445,14 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}()
}
if cache {
if err := s.messageCache.AddMessage(m); err != nil {
return err
if update {
if err := s.messageCache.UpdateMessage(m); err != nil {
return err
}
} else {
if err := s.messageCache.AddMessage(m); err != nil {
return err
}
}
}
w.Header().Set("Content-Type", "application/json")
@ -447,9 +468,19 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err error) {
cache = readBoolParam(r, true, "x-cache", "cache")
if !cache && m.Updated != 0 {
return false, false, "", false, errors.New("message updates must be cached")
}
// TODO more restrictions
firebase = readBoolParam(r, true, "x-firebase", "firebase")
m.Title = readParam(r, "x-title", "title", "t")
m.Click = readParam(r, "x-click", "click")
title := readParam(r, "x-title", "title", "t")
if title != "" {
m.Title = title
}
click := readParam(r, "x-click", "click")
if click != "" {
m.Click = click
}
filename := readParam(r, "x-filename", "filename", "file", "f")
attach := readParam(r, "x-attach", "attach", "a")
if attach != "" || filename != "" {
@ -489,9 +520,11 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
if messageStr != "" {
m.Message = messageStr
}
m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
priority, err := util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
if err != nil {
return false, false, "", false, errHTTPBadRequestPriorityInvalid
} else if priority > 0 {
m.Priority = priority
}
tagsStr := readParam(r, "x-tags", "tags", "tag", "ta")
if tagsStr != "" {
@ -870,6 +903,13 @@ func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
return sinceNoMessages, nil
}
// ID/timestamp
parts := strings.Split(since, "/")
if len(parts) == 2 && validMessageID(parts[0]) && validUnixTimestamp(parts[1]) {
t, _ := toUnixTimestamp(parts[1])
return newSince(parts[0], t), nil
}
// ID, timestamp, duration
if validMessageID(since) {
return newSinceID(since), nil
@ -888,16 +928,20 @@ func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request) error {
return nil
}
func (s *Server) topicFromPath(path string) (*topic, error) {
func (s *Server) topicAndMessageIDFromPath(path string) (*topic, string, error) {
parts := strings.Split(path, "/")
if len(parts) < 2 {
return nil, errHTTPBadRequestTopicInvalid
if len(parts) != 2 && len(parts) != 3 {
return nil, "", errHTTPBadRequestTopicInvalid
}
topics, err := s.topicsFromIDs(parts[1])
if err != nil {
return nil, err
return nil, "", err
}
return topics[0], nil
messageID := ""
if len(parts) == 3 && len(parts[2]) == messageIDLength {
messageID = parts[2]
}
return topics[0], messageID, nil
}
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {

View File

@ -72,6 +72,7 @@ func toFirebaseMessage(m *message, auther auth.Auther) (*messaging.Message, erro
data = map[string]string{
"id": m.ID,
"time": fmt.Sprintf("%d", m.Time),
"updated": fmt.Sprintf("%d", m.Updated),
"event": m.Event,
"topic": m.Topic,
"priority": fmt.Sprintf("%d", m.Priority),

View File

@ -77,6 +77,7 @@ func TestToFirebaseMessage_Message_Normal_Allowed(t *testing.T) {
require.Equal(t, map[string]string{
"id": m.ID,
"time": fmt.Sprintf("%d", m.Time),
"updated": "0",
"event": "message",
"topic": "mytopic",
"priority": "4",

View File

@ -390,6 +390,69 @@ func TestServer_PublishAndPollSince(t *testing.T) {
require.Equal(t, 40008, toHTTPError(t, response.Body.String()).Code)
}
func TestServer_PublishUpdateAndPollSince(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
// Initial PUT
response := request(t, s, "PUT", "/mytopic?t=atitle&tags=tag1,tag2&prio=high&click=https://google.com&attach=https://heckel.io", "test 1", nil)
message1 := toMessage(t, response.Body.String())
require.Equal(t, int64(0), message1.Updated)
require.Equal(t, "test 1", message1.Message)
require.Equal(t, "atitle", message1.Title)
require.Equal(t, 4, message1.Priority)
require.Equal(t, []string{"tag1", "tag2"}, message1.Tags)
require.Equal(t, "https://google.com", message1.Click)
require.Equal(t, "https://heckel.io", message1.Attachment.URL)
// Update
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?prio=low", "test 2", nil)
message2 := toMessage(t, response.Body.String())
require.Equal(t, message1.ID, message2.ID)
require.True(t, message2.Updated > message1.Updated)
require.Equal(t, "test 2", message2.Message) // Updated
require.Equal(t, "atitle", message2.Title)
require.Equal(t, 2, message2.Priority) // Updated
require.Equal(t, []string{"tag1", "tag2"}, message2.Tags)
require.Equal(t, "https://google.com", message2.Click)
require.Equal(t, "https://heckel.io", message2.Attachment.URL)
time.Sleep(1100 * time.Millisecond)
// Another update
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?title=new+title", "test 3", nil)
message3 := toMessage(t, response.Body.String())
require.True(t, message3.Updated > message2.Updated)
require.Equal(t, "test 3", message3.Message) // Updated
require.Equal(t, "new title", message3.Title) // Updated
// Get all messages: Should be only one that was updated
since := "all"
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, message1.ID, messages[0].ID)
require.Equal(t, "test 3", messages[0].Message)
// Get all messages since "message ID": Should be zero, since we know this message
since = message1.ID
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages))
// Get all messages since "message ID" but with an older timestamp: Should be the latest updated message
since = fmt.Sprintf("%s/%d", message1.ID, message2.Updated) // We're missing an update
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, "test 3", messages[0].Message)
// Get all messages since "message ID" with the current timestamp: No messages expected
since = fmt.Sprintf("%s/%d", message3.ID, message3.Updated) // We are up-to-date
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages))
}
func TestServer_PublishViaGET(t *testing.T) {
s := newTestServer(t, newTestConfig(t))

View File

@ -1,8 +1,10 @@
package server
import (
"errors"
"heckel.io/ntfy/util"
"net/http"
"strconv"
"time"
)
@ -20,9 +22,11 @@ const (
// message represents a message published to a topic
type message struct {
ID string `json:"id"` // Random message ID
Time int64 `json:"time"` // Unix time in seconds
Event string `json:"event"` // One of the above
ID string `json:"id"` // Random message ID
Time int64 `json:"time"` // Unix time in seconds
Updated int64 `json:"updated,omitempty"` // Set if updated, unix time in seconds
Deleted int64 `json:"deleted,omitempty"` // Set if deleted, unix time in seconds
Event string `json:"event"` // One of the above
Topic string `json:"topic"`
Priority int `json:"priority,omitempty"`
Tags []string `json:"tags,omitempty"`
@ -30,7 +34,7 @@ type message struct {
Attachment *attachment `json:"attachment,omitempty"`
Title string `json:"title,omitempty"`
Message string `json:"message,omitempty"`
Encoding string `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes
Encoding string `json:"encoding,omitempty"` // Empty for raw UTF-8, or "base64" for encoded bytes
}
type attachment struct {
@ -90,11 +94,31 @@ func validMessageID(s string) bool {
return util.ValidRandomString(s, messageIDLength)
}
func validUnixTimestamp(s string) bool {
_, err := toUnixTimestamp(s)
return err == nil
}
func toUnixTimestamp(s string) (int64, error) {
u, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, err
}
if u < 1000000000 || u > 3000000000 { // I know. It's practical. So relax ...
return 0, errors.New("invalid unix date")
}
return u, nil
}
type sinceMarker struct {
time time.Time
id string
}
func newSince(id string, timestamp int64) sinceMarker {
return sinceMarker{time.Unix(timestamp, 0), id}
}
func newSinceTime(timestamp int64) sinceMarker {
return sinceMarker{time.Unix(timestamp, 0), ""}
}
@ -115,6 +139,10 @@ func (t sinceMarker) IsID() bool {
return t.id != ""
}
func (t sinceMarker) IsTime() bool {
return t.time.Unix() > 0
}
func (t sinceMarker) Time() time.Time {
return t.time
}

View File

@ -17,7 +17,7 @@ import (
)
const (
randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" // Update updateTopicPathRegex if changed
)
var (

View File

@ -65,13 +65,17 @@ class SubscriptionManager {
/** Adds notification, or returns false if it already exists */
async addNotification(subscriptionId, notification) {
const exists = await db.notifications.get(notification.id);
if (exists) {
return false;
const existingNotification = await db.notifications.get(notification.id);
if (existingNotification) {
const upToDate = (existingNotification?.updated ?? 0) >= (notification.updated ?? 0);
if (upToDate) {
console.error(`[SubscriptionManager] up to date`, existingNotification, notification);
return false;
}
}
try {
notification.new = 1; // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation
await db.notifications.add({ ...notification, subscriptionId }); // FIXME consider put() for double tab
await db.notifications.put({ ...notification, subscriptionId });
await db.subscriptions.update(subscriptionId, {
last: notification.id
});