Compare commits

...
Sign in to create a new pull request.

6 commits

Author SHA1 Message Date
Philipp Heckel
a327fe802e Make web app work for updated notifications 2022-03-25 11:01:07 -04:00
Philipp Heckel
86baa80ab8 Merge branch 'main' into update-messages 2022-03-24 19:30:27 -04:00
Philipp Heckel
0a77c5296b Add since=$ID/$timestamp parsing logic 2022-03-24 17:05:07 -04:00
Philipp Heckel
b7871b80ab Merge branch 'main' into update-messages 2022-03-24 13:27:04 -04:00
Philipp Heckel
8939173a1e Continued work 2022-03-23 21:51:38 -04:00
Philipp Heckel
8848829dfa WIP: Update messages 2022-03-23 16:39:22 -04:00
9 changed files with 258 additions and 43 deletions

View file

@ -40,14 +40,16 @@ var (
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery"} errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets"} errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets"}
errHTTPBadRequestJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"} errHTTPBadRequestJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""} errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "not found", ""}
errHTTPNotFoundMessageID = &errHTTP{40402, http.StatusNotFound, "not found: unable to find message with this ID", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication"} errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication"}
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication"} errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication"}
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"} errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"} errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"} errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"} errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"} errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "limit reached: daily bandwidth limit reached, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsUpdatingTooQuickly = &errHTTP{42906, http.StatusTooManyRequests, "limit reached: too many consecutive message updates", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""} errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""} errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""}
) )

View file

@ -23,6 +23,7 @@ const (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
mid TEXT NOT NULL, mid TEXT NOT NULL,
time INT NOT NULL, time INT NOT NULL,
updated INT NOT NULL,
topic TEXT NOT NULL, topic TEXT NOT NULL,
message TEXT NOT NULL, message TEXT NOT NULL,
title TEXT NOT NULL, title TEXT NOT NULL,
@ -43,41 +44,47 @@ const (
COMMIT; COMMIT;
` `
insertMessageQuery = ` insertMessageQuery = `
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) INSERT INTO messages (mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
` `
updateMessageQuery = `UPDATE messages SET updated = ?, message = ?, title = ?, priority = ?, tags = ?, click = ? WHERE topic = ? AND mid = ?`
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1` pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?` selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
selectMessagesSinceTimeQuery = ` selectMessagesSinceTimeQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages FROM messages
WHERE topic = ? AND time >= ? AND published = 1 WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceTimeIncludeScheduledQuery = ` selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages FROM messages
WHERE topic = ? AND time >= ? WHERE topic = ? AND time >= ?
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceIDQuery = ` selectMessagesSinceIDQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages FROM messages
WHERE topic = ? AND id > ? AND published = 1 WHERE topic = ? AND id >= ? AND published = 1
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceIDIncludeScheduledQuery = ` selectMessagesSinceIDIncludeScheduledQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages FROM messages
WHERE topic = ? AND (id > ? OR published = 0) WHERE topic = ? AND (id >= ? OR published = 0)
ORDER BY time, id ORDER BY time, id
` `
selectMessagesDueQuery = ` selectMessagesDueQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages FROM messages
WHERE time <= ? AND published = 0 WHERE time <= ? AND published = 0
ORDER BY time, id ORDER BY time, id
` `
selectMessageByIDQuery = `
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ? AND mid = ?
`
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
@ -88,7 +95,7 @@ const (
// Schema management queries // Schema management queries
const ( const (
currentSchemaVersion = 5 currentSchemaVersion = 6
createSchemaVersionTableQuery = ` createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion ( CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY, id INT PRIMARY KEY,
@ -166,6 +173,11 @@ const (
ALTER TABLE messages_new RENAME TO messages; ALTER TABLE messages_new RENAME TO messages;
COMMIT; COMMIT;
` `
// 5 -> 6
migrate5To6AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN updated INT NOT NULL DEFAULT (0);
`
) )
type messageCache struct { type messageCache struct {
@ -232,6 +244,7 @@ func (c *messageCache) AddMessage(m *message) error {
insertMessageQuery, insertMessageQuery,
m.ID, m.ID,
m.Time, m.Time,
m.Updated,
m.Topic, m.Topic,
m.Message, m.Message,
m.Title, m.Title,
@ -250,6 +263,28 @@ func (c *messageCache) AddMessage(m *message) error {
return err return err
} }
func (c *messageCache) UpdateMessage(m *message) error {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
if c.nop {
return nil
}
tags := strings.Join(m.Tags, ",")
_, err := c.db.Exec(
updateMessageQuery,
m.Updated,
m.Message,
m.Title,
m.Priority,
tags,
m.Click,
m.Topic,
m.ID,
)
return err
}
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
if since.IsNone() { if since.IsNone() {
return make([]*message, 0), nil return make([]*message, 0), nil
@ -296,7 +331,15 @@ func (c *messageCache) messagesSinceID(topic string, since sinceMarker, schedule
if err != nil { if err != nil {
return nil, err return nil, err
} }
return readMessages(rows) messages, err := readMessages(rows)
if err != nil {
return nil, err
} else if len(messages) == 0 {
return messages, nil
} else if since.IsTime() && messages[0].Updated > since.Time().Unix() {
return messages, nil
}
return messages[1:], nil // Do not include row with ID itself
} }
func (c *messageCache) MessagesDue() ([]*message, error) { func (c *messageCache) MessagesDue() ([]*message, error) {
@ -393,16 +436,31 @@ func (c *messageCache) AttachmentsExpired() ([]string, error) {
return ids, nil return ids, nil
} }
func (c *messageCache) Message(topic, id string) (*message, error) {
rows, err := c.db.Query(selectMessageByIDQuery, topic, id)
if err != nil {
return nil, err
}
messages, err := readMessages(rows)
if err != nil {
return nil, err
} else if len(messages) == 0 {
return nil, errors.New("not found")
}
return messages[0], nil
}
func readMessages(rows *sql.Rows) ([]*message, error) { func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close() defer rows.Close()
messages := make([]*message, 0) messages := make([]*message, 0)
for rows.Next() { for rows.Next() {
var timestamp, attachmentSize, attachmentExpires int64 var timestamp, updated, attachmentSize, attachmentExpires int64
var priority int var priority int
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
err := rows.Scan( err := rows.Scan(
&id, &id,
&timestamp, &timestamp,
&updated,
&topic, &topic,
&msg, &msg,
&title, &title,
@ -438,6 +496,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
messages = append(messages, &message{ messages = append(messages, &message{
ID: id, ID: id,
Time: timestamp, Time: timestamp,
Updated: updated,
Event: messageEvent, Event: messageEvent,
Topic: topic, Topic: topic,
Message: msg, Message: msg,
@ -490,6 +549,8 @@ func setupCacheDB(db *sql.DB) error {
return migrateFrom3(db) return migrateFrom3(db)
} else if schemaVersion == 4 { } else if schemaVersion == 4 {
return migrateFrom4(db) return migrateFrom4(db)
} else if schemaVersion == 5 {
return migrateFrom5(db)
} }
return fmt.Errorf("unexpected schema version found: %d", schemaVersion) return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
} }
@ -562,5 +623,16 @@ func migrateFrom4(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 5); err != nil { if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
return err return err
} }
return migrateFrom5(db)
}
func migrateFrom5(db *sql.DB) error {
log.Print("Migrating cache database schema: from 5 to 6")
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
return err
}
return nil // Update this when a new version is added return nil // Update this when a new version is added
} }

View file

@ -55,9 +55,10 @@ type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
var ( var (
// If changed, don't forget to update Android App and auth_sqlite.go // If changed, don't forget to update Android App and auth_sqlite.go
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /! topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app! topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic updateTopicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[A-Za-z0-9]{12}$`) // ID length must match messageIDLength & util.randomStringCharset
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`) jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`) ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`) rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
@ -279,7 +280,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visit
return s.handleOptions(w, r) return s.handleOptions(w, r)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" { } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
return s.limitRequests(s.transformBodyJSON(s.authWrite(s.handlePublish)))(w, r, v) return s.limitRequests(s.transformBodyJSON(s.authWrite(s.handlePublish)))(w, r, v)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) { } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && (topicPathRegex.MatchString(r.URL.Path) || updateTopicPathRegex.MatchString(r.URL.Path)) {
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v) return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
} else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) { } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v) return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
@ -390,7 +391,26 @@ func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor)
} }
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error { func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := s.topicFromPath(r.URL.Path) t, messageID, err := s.topicAndMessageIDFromPath(r.URL.Path)
if err != nil {
return err
}
var m *message
update := messageID != ""
if update {
m, err = s.messageCache.Message(t.ID, messageID)
if err != nil {
return errHTTPNotFoundMessageID
}
newUpdated := time.Now().Unix()
if newUpdated <= m.Updated {
return errHTTPTooManyRequestsUpdatingTooQuickly
}
m.Updated = newUpdated
} else {
m = newDefaultMessage(t.ID, "")
}
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
if err != nil { if err != nil {
return err return err
} }
@ -398,11 +418,6 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
if err != nil { if err != nil {
return err return err
} }
m := newDefaultMessage(t.ID, "")
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
if err != nil {
return err
}
if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil { if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
return err return err
} }
@ -430,8 +445,14 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}() }()
} }
if cache { if cache {
if err := s.messageCache.AddMessage(m); err != nil { if update {
return err if err := s.messageCache.UpdateMessage(m); err != nil {
return err
}
} else {
if err := s.messageCache.AddMessage(m); err != nil {
return err
}
} }
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -447,9 +468,19 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err error) { func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err error) {
cache = readBoolParam(r, true, "x-cache", "cache") cache = readBoolParam(r, true, "x-cache", "cache")
if !cache && m.Updated != 0 {
return false, false, "", false, errors.New("message updates must be cached")
}
// TODO more restrictions
firebase = readBoolParam(r, true, "x-firebase", "firebase") firebase = readBoolParam(r, true, "x-firebase", "firebase")
m.Title = readParam(r, "x-title", "title", "t") title := readParam(r, "x-title", "title", "t")
m.Click = readParam(r, "x-click", "click") if title != "" {
m.Title = title
}
click := readParam(r, "x-click", "click")
if click != "" {
m.Click = click
}
filename := readParam(r, "x-filename", "filename", "file", "f") filename := readParam(r, "x-filename", "filename", "file", "f")
attach := readParam(r, "x-attach", "attach", "a") attach := readParam(r, "x-attach", "attach", "a")
if attach != "" || filename != "" { if attach != "" || filename != "" {
@ -489,9 +520,11 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
if messageStr != "" { if messageStr != "" {
m.Message = messageStr m.Message = messageStr
} }
m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p")) priority, err := util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
if err != nil { if err != nil {
return false, false, "", false, errHTTPBadRequestPriorityInvalid return false, false, "", false, errHTTPBadRequestPriorityInvalid
} else if priority > 0 {
m.Priority = priority
} }
tagsStr := readParam(r, "x-tags", "tags", "tag", "ta") tagsStr := readParam(r, "x-tags", "tags", "tag", "ta")
if tagsStr != "" { if tagsStr != "" {
@ -870,6 +903,13 @@ func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
return sinceNoMessages, nil return sinceNoMessages, nil
} }
// ID/timestamp
parts := strings.Split(since, "/")
if len(parts) == 2 && validMessageID(parts[0]) && validUnixTimestamp(parts[1]) {
t, _ := toUnixTimestamp(parts[1])
return newSince(parts[0], t), nil
}
// ID, timestamp, duration // ID, timestamp, duration
if validMessageID(since) { if validMessageID(since) {
return newSinceID(since), nil return newSinceID(since), nil
@ -888,16 +928,20 @@ func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request) error {
return nil return nil
} }
func (s *Server) topicFromPath(path string) (*topic, error) { func (s *Server) topicAndMessageIDFromPath(path string) (*topic, string, error) {
parts := strings.Split(path, "/") parts := strings.Split(path, "/")
if len(parts) < 2 { if len(parts) != 2 && len(parts) != 3 {
return nil, errHTTPBadRequestTopicInvalid return nil, "", errHTTPBadRequestTopicInvalid
} }
topics, err := s.topicsFromIDs(parts[1]) topics, err := s.topicsFromIDs(parts[1])
if err != nil { if err != nil {
return nil, err return nil, "", err
} }
return topics[0], nil messageID := ""
if len(parts) == 3 && len(parts[2]) == messageIDLength {
messageID = parts[2]
}
return topics[0], messageID, nil
} }
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) { func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {

View file

@ -72,6 +72,7 @@ func toFirebaseMessage(m *message, auther auth.Auther) (*messaging.Message, erro
data = map[string]string{ data = map[string]string{
"id": m.ID, "id": m.ID,
"time": fmt.Sprintf("%d", m.Time), "time": fmt.Sprintf("%d", m.Time),
"updated": fmt.Sprintf("%d", m.Updated),
"event": m.Event, "event": m.Event,
"topic": m.Topic, "topic": m.Topic,
"priority": fmt.Sprintf("%d", m.Priority), "priority": fmt.Sprintf("%d", m.Priority),

View file

@ -77,6 +77,7 @@ func TestToFirebaseMessage_Message_Normal_Allowed(t *testing.T) {
require.Equal(t, map[string]string{ require.Equal(t, map[string]string{
"id": m.ID, "id": m.ID,
"time": fmt.Sprintf("%d", m.Time), "time": fmt.Sprintf("%d", m.Time),
"updated": "0",
"event": "message", "event": "message",
"topic": "mytopic", "topic": "mytopic",
"priority": "4", "priority": "4",

View file

@ -390,6 +390,69 @@ func TestServer_PublishAndPollSince(t *testing.T) {
require.Equal(t, 40008, toHTTPError(t, response.Body.String()).Code) require.Equal(t, 40008, toHTTPError(t, response.Body.String()).Code)
} }
func TestServer_PublishUpdateAndPollSince(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
// Initial PUT
response := request(t, s, "PUT", "/mytopic?t=atitle&tags=tag1,tag2&prio=high&click=https://google.com&attach=https://heckel.io", "test 1", nil)
message1 := toMessage(t, response.Body.String())
require.Equal(t, int64(0), message1.Updated)
require.Equal(t, "test 1", message1.Message)
require.Equal(t, "atitle", message1.Title)
require.Equal(t, 4, message1.Priority)
require.Equal(t, []string{"tag1", "tag2"}, message1.Tags)
require.Equal(t, "https://google.com", message1.Click)
require.Equal(t, "https://heckel.io", message1.Attachment.URL)
// Update
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?prio=low", "test 2", nil)
message2 := toMessage(t, response.Body.String())
require.Equal(t, message1.ID, message2.ID)
require.True(t, message2.Updated > message1.Updated)
require.Equal(t, "test 2", message2.Message) // Updated
require.Equal(t, "atitle", message2.Title)
require.Equal(t, 2, message2.Priority) // Updated
require.Equal(t, []string{"tag1", "tag2"}, message2.Tags)
require.Equal(t, "https://google.com", message2.Click)
require.Equal(t, "https://heckel.io", message2.Attachment.URL)
time.Sleep(1100 * time.Millisecond)
// Another update
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?title=new+title", "test 3", nil)
message3 := toMessage(t, response.Body.String())
require.True(t, message3.Updated > message2.Updated)
require.Equal(t, "test 3", message3.Message) // Updated
require.Equal(t, "new title", message3.Title) // Updated
// Get all messages: Should be only one that was updated
since := "all"
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, message1.ID, messages[0].ID)
require.Equal(t, "test 3", messages[0].Message)
// Get all messages since "message ID": Should be zero, since we know this message
since = message1.ID
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages))
// Get all messages since "message ID" but with an older timestamp: Should be the latest updated message
since = fmt.Sprintf("%s/%d", message1.ID, message2.Updated) // We're missing an update
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, "test 3", messages[0].Message)
// Get all messages since "message ID" with the current timestamp: No messages expected
since = fmt.Sprintf("%s/%d", message3.ID, message3.Updated) // We are up-to-date
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages))
}
func TestServer_PublishViaGET(t *testing.T) { func TestServer_PublishViaGET(t *testing.T) {
s := newTestServer(t, newTestConfig(t)) s := newTestServer(t, newTestConfig(t))

View file

@ -1,8 +1,10 @@
package server package server
import ( import (
"errors"
"heckel.io/ntfy/util" "heckel.io/ntfy/util"
"net/http" "net/http"
"strconv"
"time" "time"
) )
@ -20,9 +22,11 @@ const (
// message represents a message published to a topic // message represents a message published to a topic
type message struct { type message struct {
ID string `json:"id"` // Random message ID ID string `json:"id"` // Random message ID
Time int64 `json:"time"` // Unix time in seconds Time int64 `json:"time"` // Unix time in seconds
Event string `json:"event"` // One of the above Updated int64 `json:"updated,omitempty"` // Set if updated, unix time in seconds
Deleted int64 `json:"deleted,omitempty"` // Set if deleted, unix time in seconds
Event string `json:"event"` // One of the above
Topic string `json:"topic"` Topic string `json:"topic"`
Priority int `json:"priority,omitempty"` Priority int `json:"priority,omitempty"`
Tags []string `json:"tags,omitempty"` Tags []string `json:"tags,omitempty"`
@ -30,7 +34,7 @@ type message struct {
Attachment *attachment `json:"attachment,omitempty"` Attachment *attachment `json:"attachment,omitempty"`
Title string `json:"title,omitempty"` Title string `json:"title,omitempty"`
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
Encoding string `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes Encoding string `json:"encoding,omitempty"` // Empty for raw UTF-8, or "base64" for encoded bytes
} }
type attachment struct { type attachment struct {
@ -90,11 +94,31 @@ func validMessageID(s string) bool {
return util.ValidRandomString(s, messageIDLength) return util.ValidRandomString(s, messageIDLength)
} }
func validUnixTimestamp(s string) bool {
_, err := toUnixTimestamp(s)
return err == nil
}
func toUnixTimestamp(s string) (int64, error) {
u, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, err
}
if u < 1000000000 || u > 3000000000 { // I know. It's practical. So relax ...
return 0, errors.New("invalid unix date")
}
return u, nil
}
type sinceMarker struct { type sinceMarker struct {
time time.Time time time.Time
id string id string
} }
func newSince(id string, timestamp int64) sinceMarker {
return sinceMarker{time.Unix(timestamp, 0), id}
}
func newSinceTime(timestamp int64) sinceMarker { func newSinceTime(timestamp int64) sinceMarker {
return sinceMarker{time.Unix(timestamp, 0), ""} return sinceMarker{time.Unix(timestamp, 0), ""}
} }
@ -115,6 +139,10 @@ func (t sinceMarker) IsID() bool {
return t.id != "" return t.id != ""
} }
func (t sinceMarker) IsTime() bool {
return t.time.Unix() > 0
}
func (t sinceMarker) Time() time.Time { func (t sinceMarker) Time() time.Time {
return t.time return t.time
} }

View file

@ -17,7 +17,7 @@ import (
) )
const ( const (
randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" // Update updateTopicPathRegex if changed
) )
var ( var (

View file

@ -65,13 +65,17 @@ class SubscriptionManager {
/** Adds notification, or returns false if it already exists */ /** Adds notification, or returns false if it already exists */
async addNotification(subscriptionId, notification) { async addNotification(subscriptionId, notification) {
const exists = await db.notifications.get(notification.id); const existingNotification = await db.notifications.get(notification.id);
if (exists) { if (existingNotification) {
return false; const upToDate = (existingNotification?.updated ?? 0) >= (notification.updated ?? 0);
if (upToDate) {
console.error(`[SubscriptionManager] up to date`, existingNotification, notification);
return false;
}
} }
try { try {
notification.new = 1; // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation notification.new = 1; // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation
await db.notifications.add({ ...notification, subscriptionId }); // FIXME consider put() for double tab await db.notifications.put({ ...notification, subscriptionId });
await db.subscriptions.update(subscriptionId, { await db.subscriptions.update(subscriptionId, {
last: notification.id last: notification.id
}); });