Compare commits
6 Commits
main
...
update-mes
Author | SHA1 | Date |
---|---|---|
|
a327fe802e | |
|
86baa80ab8 | |
|
0a77c5296b | |
|
b7871b80ab | |
|
8939173a1e | |
|
8848829dfa |
|
@ -40,14 +40,16 @@ var (
|
|||
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
||||
errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets"}
|
||||
errHTTPBadRequestJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"}
|
||||
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
|
||||
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "not found", ""}
|
||||
errHTTPNotFoundMessageID = &errHTTP{40402, http.StatusNotFound, "not found: unable to find message with this ID", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
|
||||
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication"}
|
||||
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication"}
|
||||
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "limit reached: daily bandwidth limit reached, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
||||
errHTTPTooManyRequestsUpdatingTooQuickly = &errHTTP{42906, http.StatusTooManyRequests, "limit reached: too many consecutive message updates", "https://ntfy.sh/docs/publish/#updating-messages"} // FIXME LINK
|
||||
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
|
||||
errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""}
|
||||
)
|
||||
|
|
|
@ -23,6 +23,7 @@ const (
|
|||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mid TEXT NOT NULL,
|
||||
time INT NOT NULL,
|
||||
updated INT NOT NULL,
|
||||
topic TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
|
@ -43,41 +44,47 @@ const (
|
|||
COMMIT;
|
||||
`
|
||||
insertMessageQuery = `
|
||||
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO messages (mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`
|
||||
updateMessageQuery = `UPDATE messages SET updated = ?, message = ?, title = ?, priority = ?, tags = ?, click = ? WHERE topic = ? AND mid = ?`
|
||||
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
|
||||
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
|
||||
selectMessagesSinceTimeQuery = `
|
||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE topic = ? AND time >= ? AND published = 1
|
||||
ORDER BY time, id
|
||||
`
|
||||
selectMessagesSinceTimeIncludeScheduledQuery = `
|
||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE topic = ? AND time >= ?
|
||||
ORDER BY time, id
|
||||
`
|
||||
selectMessagesSinceIDQuery = `
|
||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE topic = ? AND id > ? AND published = 1
|
||||
WHERE topic = ? AND id >= ? AND published = 1
|
||||
ORDER BY time, id
|
||||
`
|
||||
selectMessagesSinceIDIncludeScheduledQuery = `
|
||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE topic = ? AND (id > ? OR published = 0)
|
||||
WHERE topic = ? AND (id >= ? OR published = 0)
|
||||
ORDER BY time, id
|
||||
`
|
||||
selectMessagesDueQuery = `
|
||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE time <= ? AND published = 0
|
||||
ORDER BY time, id
|
||||
`
|
||||
selectMessageByIDQuery = `
|
||||
SELECT mid, time, updated, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||
FROM messages
|
||||
WHERE topic = ? AND mid = ?
|
||||
`
|
||||
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
|
||||
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
|
||||
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
|
||||
|
@ -88,7 +95,7 @@ const (
|
|||
|
||||
// Schema management queries
|
||||
const (
|
||||
currentSchemaVersion = 5
|
||||
currentSchemaVersion = 6
|
||||
createSchemaVersionTableQuery = `
|
||||
CREATE TABLE IF NOT EXISTS schemaVersion (
|
||||
id INT PRIMARY KEY,
|
||||
|
@ -166,6 +173,11 @@ const (
|
|||
ALTER TABLE messages_new RENAME TO messages;
|
||||
COMMIT;
|
||||
`
|
||||
|
||||
// 5 -> 6
|
||||
migrate5To6AlterMessagesTableQuery = `
|
||||
ALTER TABLE messages ADD COLUMN updated INT NOT NULL DEFAULT (0);
|
||||
`
|
||||
)
|
||||
|
||||
type messageCache struct {
|
||||
|
@ -232,6 +244,7 @@ func (c *messageCache) AddMessage(m *message) error {
|
|||
insertMessageQuery,
|
||||
m.ID,
|
||||
m.Time,
|
||||
m.Updated,
|
||||
m.Topic,
|
||||
m.Message,
|
||||
m.Title,
|
||||
|
@ -250,6 +263,28 @@ func (c *messageCache) AddMessage(m *message) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *messageCache) UpdateMessage(m *message) error {
|
||||
if m.Event != messageEvent {
|
||||
return errUnexpectedMessageType
|
||||
}
|
||||
if c.nop {
|
||||
return nil
|
||||
}
|
||||
tags := strings.Join(m.Tags, ",")
|
||||
_, err := c.db.Exec(
|
||||
updateMessageQuery,
|
||||
m.Updated,
|
||||
m.Message,
|
||||
m.Title,
|
||||
m.Priority,
|
||||
tags,
|
||||
m.Click,
|
||||
m.Topic,
|
||||
m.ID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
||||
if since.IsNone() {
|
||||
return make([]*message, 0), nil
|
||||
|
@ -296,7 +331,15 @@ func (c *messageCache) messagesSinceID(topic string, since sinceMarker, schedule
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return readMessages(rows)
|
||||
messages, err := readMessages(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(messages) == 0 {
|
||||
return messages, nil
|
||||
} else if since.IsTime() && messages[0].Updated > since.Time().Unix() {
|
||||
return messages, nil
|
||||
}
|
||||
return messages[1:], nil // Do not include row with ID itself
|
||||
}
|
||||
|
||||
func (c *messageCache) MessagesDue() ([]*message, error) {
|
||||
|
@ -393,16 +436,31 @@ func (c *messageCache) AttachmentsExpired() ([]string, error) {
|
|||
return ids, nil
|
||||
}
|
||||
|
||||
func (c *messageCache) Message(topic, id string) (*message, error) {
|
||||
rows, err := c.db.Query(selectMessageByIDQuery, topic, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
messages, err := readMessages(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(messages) == 0 {
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
return messages[0], nil
|
||||
}
|
||||
|
||||
func readMessages(rows *sql.Rows) ([]*message, error) {
|
||||
defer rows.Close()
|
||||
messages := make([]*message, 0)
|
||||
for rows.Next() {
|
||||
var timestamp, attachmentSize, attachmentExpires int64
|
||||
var timestamp, updated, attachmentSize, attachmentExpires int64
|
||||
var priority int
|
||||
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
|
||||
err := rows.Scan(
|
||||
&id,
|
||||
×tamp,
|
||||
&updated,
|
||||
&topic,
|
||||
&msg,
|
||||
&title,
|
||||
|
@ -438,6 +496,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
|
|||
messages = append(messages, &message{
|
||||
ID: id,
|
||||
Time: timestamp,
|
||||
Updated: updated,
|
||||
Event: messageEvent,
|
||||
Topic: topic,
|
||||
Message: msg,
|
||||
|
@ -490,6 +549,8 @@ func setupCacheDB(db *sql.DB) error {
|
|||
return migrateFrom3(db)
|
||||
} else if schemaVersion == 4 {
|
||||
return migrateFrom4(db)
|
||||
} else if schemaVersion == 5 {
|
||||
return migrateFrom5(db)
|
||||
}
|
||||
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
|
||||
}
|
||||
|
@ -562,5 +623,16 @@ func migrateFrom4(db *sql.DB) error {
|
|||
if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
|
||||
return err
|
||||
}
|
||||
return migrateFrom5(db)
|
||||
}
|
||||
|
||||
func migrateFrom5(db *sql.DB) error {
|
||||
log.Print("Migrating cache database schema: from 5 to 6")
|
||||
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil // Update this when a new version is added
|
||||
}
|
||||
|
|
|
@ -55,9 +55,10 @@ type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
|
|||
|
||||
var (
|
||||
// If changed, don't forget to update Android App and auth_sqlite.go
|
||||
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
|
||||
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
|
||||
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
|
||||
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
|
||||
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
|
||||
updateTopicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[A-Za-z0-9]{12}$`) // ID length must match messageIDLength & util.randomStringCharset
|
||||
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
|
||||
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
|
||||
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
|
||||
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
|
||||
|
@ -279,7 +280,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visit
|
|||
return s.handleOptions(w, r)
|
||||
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
|
||||
return s.limitRequests(s.transformBodyJSON(s.authWrite(s.handlePublish)))(w, r, v)
|
||||
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
|
||||
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && (topicPathRegex.MatchString(r.URL.Path) || updateTopicPathRegex.MatchString(r.URL.Path)) {
|
||||
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
|
||||
} else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
|
||||
return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
|
||||
|
@ -390,7 +391,26 @@ func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor)
|
|||
}
|
||||
|
||||
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
||||
t, err := s.topicFromPath(r.URL.Path)
|
||||
t, messageID, err := s.topicAndMessageIDFromPath(r.URL.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m *message
|
||||
update := messageID != ""
|
||||
if update {
|
||||
m, err = s.messageCache.Message(t.ID, messageID)
|
||||
if err != nil {
|
||||
return errHTTPNotFoundMessageID
|
||||
}
|
||||
newUpdated := time.Now().Unix()
|
||||
if newUpdated <= m.Updated {
|
||||
return errHTTPTooManyRequestsUpdatingTooQuickly
|
||||
}
|
||||
m.Updated = newUpdated
|
||||
} else {
|
||||
m = newDefaultMessage(t.ID, "")
|
||||
}
|
||||
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -398,11 +418,6 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m := newDefaultMessage(t.ID, "")
|
||||
cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -430,8 +445,14 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
|
|||
}()
|
||||
}
|
||||
if cache {
|
||||
if err := s.messageCache.AddMessage(m); err != nil {
|
||||
return err
|
||||
if update {
|
||||
if err := s.messageCache.UpdateMessage(m); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := s.messageCache.AddMessage(m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
@ -447,9 +468,19 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
|
|||
|
||||
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err error) {
|
||||
cache = readBoolParam(r, true, "x-cache", "cache")
|
||||
if !cache && m.Updated != 0 {
|
||||
return false, false, "", false, errors.New("message updates must be cached")
|
||||
}
|
||||
// TODO more restrictions
|
||||
firebase = readBoolParam(r, true, "x-firebase", "firebase")
|
||||
m.Title = readParam(r, "x-title", "title", "t")
|
||||
m.Click = readParam(r, "x-click", "click")
|
||||
title := readParam(r, "x-title", "title", "t")
|
||||
if title != "" {
|
||||
m.Title = title
|
||||
}
|
||||
click := readParam(r, "x-click", "click")
|
||||
if click != "" {
|
||||
m.Click = click
|
||||
}
|
||||
filename := readParam(r, "x-filename", "filename", "file", "f")
|
||||
attach := readParam(r, "x-attach", "attach", "a")
|
||||
if attach != "" || filename != "" {
|
||||
|
@ -489,9 +520,11 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
|
|||
if messageStr != "" {
|
||||
m.Message = messageStr
|
||||
}
|
||||
m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
|
||||
priority, err := util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
|
||||
if err != nil {
|
||||
return false, false, "", false, errHTTPBadRequestPriorityInvalid
|
||||
} else if priority > 0 {
|
||||
m.Priority = priority
|
||||
}
|
||||
tagsStr := readParam(r, "x-tags", "tags", "tag", "ta")
|
||||
if tagsStr != "" {
|
||||
|
@ -870,6 +903,13 @@ func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
|
|||
return sinceNoMessages, nil
|
||||
}
|
||||
|
||||
// ID/timestamp
|
||||
parts := strings.Split(since, "/")
|
||||
if len(parts) == 2 && validMessageID(parts[0]) && validUnixTimestamp(parts[1]) {
|
||||
t, _ := toUnixTimestamp(parts[1])
|
||||
return newSince(parts[0], t), nil
|
||||
}
|
||||
|
||||
// ID, timestamp, duration
|
||||
if validMessageID(since) {
|
||||
return newSinceID(since), nil
|
||||
|
@ -888,16 +928,20 @@ func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) topicFromPath(path string) (*topic, error) {
|
||||
func (s *Server) topicAndMessageIDFromPath(path string) (*topic, string, error) {
|
||||
parts := strings.Split(path, "/")
|
||||
if len(parts) < 2 {
|
||||
return nil, errHTTPBadRequestTopicInvalid
|
||||
if len(parts) != 2 && len(parts) != 3 {
|
||||
return nil, "", errHTTPBadRequestTopicInvalid
|
||||
}
|
||||
topics, err := s.topicsFromIDs(parts[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
return topics[0], nil
|
||||
messageID := ""
|
||||
if len(parts) == 3 && len(parts[2]) == messageIDLength {
|
||||
messageID = parts[2]
|
||||
}
|
||||
return topics[0], messageID, nil
|
||||
}
|
||||
|
||||
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
|
||||
|
|
|
@ -72,6 +72,7 @@ func toFirebaseMessage(m *message, auther auth.Auther) (*messaging.Message, erro
|
|||
data = map[string]string{
|
||||
"id": m.ID,
|
||||
"time": fmt.Sprintf("%d", m.Time),
|
||||
"updated": fmt.Sprintf("%d", m.Updated),
|
||||
"event": m.Event,
|
||||
"topic": m.Topic,
|
||||
"priority": fmt.Sprintf("%d", m.Priority),
|
||||
|
|
|
@ -77,6 +77,7 @@ func TestToFirebaseMessage_Message_Normal_Allowed(t *testing.T) {
|
|||
require.Equal(t, map[string]string{
|
||||
"id": m.ID,
|
||||
"time": fmt.Sprintf("%d", m.Time),
|
||||
"updated": "0",
|
||||
"event": "message",
|
||||
"topic": "mytopic",
|
||||
"priority": "4",
|
||||
|
|
|
@ -390,6 +390,69 @@ func TestServer_PublishAndPollSince(t *testing.T) {
|
|||
require.Equal(t, 40008, toHTTPError(t, response.Body.String()).Code)
|
||||
}
|
||||
|
||||
func TestServer_PublishUpdateAndPollSince(t *testing.T) {
|
||||
s := newTestServer(t, newTestConfig(t))
|
||||
|
||||
// Initial PUT
|
||||
response := request(t, s, "PUT", "/mytopic?t=atitle&tags=tag1,tag2&prio=high&click=https://google.com&attach=https://heckel.io", "test 1", nil)
|
||||
message1 := toMessage(t, response.Body.String())
|
||||
require.Equal(t, int64(0), message1.Updated)
|
||||
require.Equal(t, "test 1", message1.Message)
|
||||
require.Equal(t, "atitle", message1.Title)
|
||||
require.Equal(t, 4, message1.Priority)
|
||||
require.Equal(t, []string{"tag1", "tag2"}, message1.Tags)
|
||||
require.Equal(t, "https://google.com", message1.Click)
|
||||
require.Equal(t, "https://heckel.io", message1.Attachment.URL)
|
||||
|
||||
// Update
|
||||
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?prio=low", "test 2", nil)
|
||||
message2 := toMessage(t, response.Body.String())
|
||||
require.Equal(t, message1.ID, message2.ID)
|
||||
require.True(t, message2.Updated > message1.Updated)
|
||||
require.Equal(t, "test 2", message2.Message) // Updated
|
||||
require.Equal(t, "atitle", message2.Title)
|
||||
require.Equal(t, 2, message2.Priority) // Updated
|
||||
require.Equal(t, []string{"tag1", "tag2"}, message2.Tags)
|
||||
require.Equal(t, "https://google.com", message2.Click)
|
||||
require.Equal(t, "https://heckel.io", message2.Attachment.URL)
|
||||
|
||||
time.Sleep(1100 * time.Millisecond)
|
||||
|
||||
// Another update
|
||||
response = request(t, s, "PUT", "/mytopic/"+message1.ID+"?title=new+title", "test 3", nil)
|
||||
message3 := toMessage(t, response.Body.String())
|
||||
require.True(t, message3.Updated > message2.Updated)
|
||||
require.Equal(t, "test 3", message3.Message) // Updated
|
||||
require.Equal(t, "new title", message3.Title) // Updated
|
||||
|
||||
// Get all messages: Should be only one that was updated
|
||||
since := "all"
|
||||
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
|
||||
messages := toMessages(t, response.Body.String())
|
||||
require.Equal(t, 1, len(messages))
|
||||
require.Equal(t, message1.ID, messages[0].ID)
|
||||
require.Equal(t, "test 3", messages[0].Message)
|
||||
|
||||
// Get all messages since "message ID": Should be zero, since we know this message
|
||||
since = message1.ID
|
||||
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
|
||||
messages = toMessages(t, response.Body.String())
|
||||
require.Equal(t, 0, len(messages))
|
||||
|
||||
// Get all messages since "message ID" but with an older timestamp: Should be the latest updated message
|
||||
since = fmt.Sprintf("%s/%d", message1.ID, message2.Updated) // We're missing an update
|
||||
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
|
||||
messages = toMessages(t, response.Body.String())
|
||||
require.Equal(t, 1, len(messages))
|
||||
require.Equal(t, "test 3", messages[0].Message)
|
||||
|
||||
// Get all messages since "message ID" with the current timestamp: No messages expected
|
||||
since = fmt.Sprintf("%s/%d", message3.ID, message3.Updated) // We are up-to-date
|
||||
response = request(t, s, "GET", "/mytopic/json?since="+since+"&poll=1", "", nil)
|
||||
messages = toMessages(t, response.Body.String())
|
||||
require.Equal(t, 0, len(messages))
|
||||
}
|
||||
|
||||
func TestServer_PublishViaGET(t *testing.T) {
|
||||
s := newTestServer(t, newTestConfig(t))
|
||||
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"heckel.io/ntfy/util"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -20,9 +22,11 @@ const (
|
|||
|
||||
// message represents a message published to a topic
|
||||
type message struct {
|
||||
ID string `json:"id"` // Random message ID
|
||||
Time int64 `json:"time"` // Unix time in seconds
|
||||
Event string `json:"event"` // One of the above
|
||||
ID string `json:"id"` // Random message ID
|
||||
Time int64 `json:"time"` // Unix time in seconds
|
||||
Updated int64 `json:"updated,omitempty"` // Set if updated, unix time in seconds
|
||||
Deleted int64 `json:"deleted,omitempty"` // Set if deleted, unix time in seconds
|
||||
Event string `json:"event"` // One of the above
|
||||
Topic string `json:"topic"`
|
||||
Priority int `json:"priority,omitempty"`
|
||||
Tags []string `json:"tags,omitempty"`
|
||||
|
@ -30,7 +34,7 @@ type message struct {
|
|||
Attachment *attachment `json:"attachment,omitempty"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Encoding string `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes
|
||||
Encoding string `json:"encoding,omitempty"` // Empty for raw UTF-8, or "base64" for encoded bytes
|
||||
}
|
||||
|
||||
type attachment struct {
|
||||
|
@ -90,11 +94,31 @@ func validMessageID(s string) bool {
|
|||
return util.ValidRandomString(s, messageIDLength)
|
||||
}
|
||||
|
||||
func validUnixTimestamp(s string) bool {
|
||||
_, err := toUnixTimestamp(s)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func toUnixTimestamp(s string) (int64, error) {
|
||||
u, err := strconv.ParseInt(s, 10, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if u < 1000000000 || u > 3000000000 { // I know. It's practical. So relax ...
|
||||
return 0, errors.New("invalid unix date")
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
type sinceMarker struct {
|
||||
time time.Time
|
||||
id string
|
||||
}
|
||||
|
||||
func newSince(id string, timestamp int64) sinceMarker {
|
||||
return sinceMarker{time.Unix(timestamp, 0), id}
|
||||
}
|
||||
|
||||
func newSinceTime(timestamp int64) sinceMarker {
|
||||
return sinceMarker{time.Unix(timestamp, 0), ""}
|
||||
}
|
||||
|
@ -115,6 +139,10 @@ func (t sinceMarker) IsID() bool {
|
|||
return t.id != ""
|
||||
}
|
||||
|
||||
func (t sinceMarker) IsTime() bool {
|
||||
return t.time.Unix() > 0
|
||||
}
|
||||
|
||||
func (t sinceMarker) Time() time.Time {
|
||||
return t.time
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
randomStringCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" // Update updateTopicPathRegex if changed
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
@ -65,13 +65,17 @@ class SubscriptionManager {
|
|||
|
||||
/** Adds notification, or returns false if it already exists */
|
||||
async addNotification(subscriptionId, notification) {
|
||||
const exists = await db.notifications.get(notification.id);
|
||||
if (exists) {
|
||||
return false;
|
||||
const existingNotification = await db.notifications.get(notification.id);
|
||||
if (existingNotification) {
|
||||
const upToDate = (existingNotification?.updated ?? 0) >= (notification.updated ?? 0);
|
||||
if (upToDate) {
|
||||
console.error(`[SubscriptionManager] up to date`, existingNotification, notification);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
try {
|
||||
notification.new = 1; // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation
|
||||
await db.notifications.add({ ...notification, subscriptionId }); // FIXME consider put() for double tab
|
||||
await db.notifications.put({ ...notification, subscriptionId });
|
||||
await db.subscriptions.update(subscriptionId, {
|
||||
last: notification.id
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue