From a38aca47bda185024eac59ee9215a7e9f637ac83 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 27 Oct 2021 14:56:17 -0400 Subject: [PATCH] Subscribe endpoint consolidation; same behavior for all endpoints; keepalive --- cmd/app.go | 3 + config/config.go | 23 ++++---- server/index.html | 50 +++++++++++------ server/message.go | 43 +++++++++++++++ server/server.go | 113 ++++++++++++++++++-------------------- server/static/css/app.css | 6 +- server/static/js/app.js | 2 +- server/topic.go | 7 +-- 8 files changed, 154 insertions(+), 93 deletions(-) create mode 100644 server/message.go diff --git a/cmd/app.go b/cmd/app.go index 4d8c1fa1..59ee5155 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -16,6 +16,7 @@ func New() *cli.App { flags := []cli.Flag{ &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as listen address"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "default interval of keepalive messages"}), } return &cli.App{ Name: "ntfy", @@ -37,9 +38,11 @@ func New() *cli.App { func execRun(c *cli.Context) error { // Read all the options listenHTTP := c.String("listen-http") + keepaliveInterval := c.Duration("keepalive-interval") // Run main bot, can be killed by signal conf := config.New(listenHTTP) + conf.KeepaliveInterval = keepaliveInterval s := server.New(conf) if err := s.Run(); err != nil { log.Fatalln(err) diff --git a/config/config.go b/config/config.go index ba91f132..c7f39a40 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,9 @@ import ( // Defines default config settings const ( - DefaultListenHTTP = ":80" - defaultManagerInterval = time.Minute + DefaultListenHTTP = ":80" + DefaultKeepaliveInterval = 30 * time.Second + defaultManagerInterval = time.Minute ) // Defines the max number of requests, here: @@ -21,18 +22,20 @@ var ( // Config is the main config struct for the application. Use New to instantiate a default config struct. type Config struct { - ListenHTTP string - Limit rate.Limit - LimitBurst int - ManagerInterval time.Duration + ListenHTTP string + Limit rate.Limit + LimitBurst int + KeepaliveInterval time.Duration + ManagerInterval time.Duration } // New instantiates a default new config func New(listenHTTP string) *Config { return &Config{ - ListenHTTP: listenHTTP, - Limit: defaultLimit, - LimitBurst: defaultLimitBurst, - ManagerInterval: defaultManagerInterval, + ListenHTTP: listenHTTP, + Limit: defaultLimit, + LimitBurst: defaultLimitBurst, + KeepaliveInterval: DefaultKeepaliveInterval, + ManagerInterval: defaultManagerInterval, } } diff --git a/server/index.html b/server/index.html index e42114e0..0c9f5392 100644 --- a/server/index.html +++ b/server/index.html @@ -65,20 +65,7 @@

Subscribe via your app, or via the CLI

-

- Here are some examples using curl: -

- - # one message per line (\n are replaced with a space)
- curl -s ntfy.sh/mytopic/raw

- - # one JSON message per line
- curl -s ntfy.sh/mytopic/json

- - # server-sent events (SSE) stream, use with EventSource
- curl -s ntfy.sh/mytopic/sse -
-

+

Using EventSource, you can consume notifications like this (see full example):

@@ -88,15 +75,46 @@   // Do something with e.data
}; +

+ Or you can use curl or any other HTTP library. Here's an example for the /json endpoint, + which prints one JSON message per line (keepalive and open messages have an "event" field): +

+ + $ curl -s ntfy.sh/mytopic/json
+ {"time":1635359841,"event":"open"}
+ {"time":1635359844,"message":"This is a notification"}
+ {"time":1635359851,"event":"keepalive"} +
+

+ Using the /sse endpoint (SSE, server-sent events stream): +

+ + $ curl -s ntfy.sh/mytopic/sse
+ event: open
+ data: {"time":1635359796,"event":"open"}

+ + data: {"time":1635359803,"message":"This is a notification"}

+ + event: keepalive
+ data: {"time":1635359806,"event":"keepalive"} +
+

+ Using the /raw endpoint (empty lines are keepalive messages): +

+ + $ curl -s ntfy.sh/mytopic/raw
+
+ This is a notification +

Publishing messages

-

+

Publishing messages can be done via PUT or POST using. Here's an example using curl:

curl -d "long process is done" ntfy.sh/mytopic -

+

Here's an example in JS with fetch() (see full example):

diff --git a/server/message.go b/server/message.go new file mode 100644 index 00000000..514ebac1 --- /dev/null +++ b/server/message.go @@ -0,0 +1,43 @@ +package server + +import "time" + +// List of possible events +const ( + openEvent = "open" + keepaliveEvent = "keepalive" +) + +// message represents a message published to a topic +type message struct { + Time int64 `json:"time"` // Unix time in seconds + Event string `json:"event,omitempty"` // One of the above + Message string `json:"message,omitempty"` +} + +// messageEncoder is a function that knows how to encode a message +type messageEncoder func(msg *message) (string, error) + +// newMessage creates a new message with the current timestamp +func newMessage(event string, msg string) *message { + return &message{ + Time: time.Now().Unix(), + Event: event, + Message: msg, + } +} + +// newOpenMessage is a convenience method to create an open message +func newOpenMessage() *message { + return newMessage(openEvent, "") +} + +// newKeepaliveMessage is a convenience method to create a keepalive message +func newKeepaliveMessage() *message { + return newMessage(keepaliveEvent, "") +} + +// newDefaultMessage is a convenience method to create a notification message +func newDefaultMessage(msg string) *message { + return newMessage("", msg) +} diff --git a/server/server.go b/server/server.go index acb1664a..b639cf08 100644 --- a/server/server.go +++ b/server/server.go @@ -48,11 +48,11 @@ const ( ) var ( - topicRegex = regexp.MustCompile(`^/[^/]+$`) - jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) - sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) - rawRegex = regexp.MustCompile(`^/[^/]+/raw$`) - staticRegex = regexp.MustCompile(`^/static/.+`) + topicRegex = regexp.MustCompile(`^/[^/]+$`) + jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) + sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) + rawRegex = regexp.MustCompile(`^/[^/]+/raw$`) + staticRegex = regexp.MustCompile(`^/static/.+`) //go:embed "index.html" indexSource string @@ -159,11 +159,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error if err != nil { return err } - msg := &message{ - Time: time.Now().UnixMilli(), - Message: string(b), - } - if err := t.Publish(msg); err != nil { + if err := t.Publish(newDefaultMessage(string(b))); err != nil { return err } w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests @@ -171,75 +167,74 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error } func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error { - t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack - subscriberID := t.Subscribe(func(msg *message) error { - if err := json.NewEncoder(w).Encode(&msg); err != nil { - return err + encoder := func(msg *message) (string, error) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(&msg); err != nil { + return "", err } - if fl, ok := w.(http.Flusher); ok { - fl.Flush() - } - return nil - }) - defer s.unsubscribe(t, subscriberID) - w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests - select { - case <-t.ctx.Done(): - case <-r.Context().Done(): + return buf.String(), nil } - return nil + return s.handleSubscribe(w, r, "json", "application/stream+json", encoder) } func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error { - t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack - subscriberID := t.Subscribe(func(msg *message) error { + encoder := func(msg *message) (string, error) { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(&msg); err != nil { - return err + return "", err } - m := fmt.Sprintf("data: %s\n", buf.String()) - if _, err := io.WriteString(w, m); err != nil { - return err + if msg.Event != "" { + return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this! } - if fl, ok := w.(http.Flusher); ok { - fl.Flush() - } - return nil - }) - defer s.unsubscribe(t, subscriberID) - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests - if _, err := io.WriteString(w, "event: open\n\n"); err != nil { - return err + return fmt.Sprintf("data: %s\n", buf.String()), nil } - if fl, ok := w.(http.Flusher); ok { - fl.Flush() - } - select { - case <-t.ctx.Done(): - case <-r.Context().Done(): - } - return nil + return s.handleSubscribe(w, r, "sse", "text/event-stream", encoder) } func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request) error { - t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/raw")) // Hack - subscriberID := t.Subscribe(func(msg *message) error { - m := strings.ReplaceAll(msg.Message, "\n", " ") + "\n" - if _, err := io.WriteString(w, m); err != nil { + encoder := func(msg *message) (string, error) { + if msg.Event == "" { // only handle default events + return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil + } + return "\n", nil // "keepalive" and "open" events just send an empty line + } + return s.handleSubscribe(w, r, "raw", "text/plain", encoder) +} + +func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, format string, contentType string, encoder messageEncoder) error { + t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack + sub := func(msg *message) error { + m, err := encoder(msg) + if err != nil { + return err + } + if _, err := w.Write([]byte(m)); err != nil { return err } if fl, ok := w.(http.Flusher); ok { fl.Flush() } return nil - }) - defer s.unsubscribe(t, subscriberID) - select { - case <-t.ctx.Done(): - case <-r.Context().Done(): } - return nil + subscriberID := t.Subscribe(sub) + defer s.unsubscribe(t, subscriberID) + w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests + w.Header().Set("Content-Type", contentType) + if err := sub(newOpenMessage()); err != nil { // Send out open message + return err + } + for { + select { + case <-t.ctx.Done(): + return nil + case <-r.Context().Done(): + return nil + case <-time.After(s.config.KeepaliveInterval): + if err := sub(newKeepaliveMessage()); err != nil { // Send keepalive message + return err + } + } + } } func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error { diff --git a/server/static/css/app.css b/server/static/css/app.css index ba6c8c21..fe409447 100644 --- a/server/static/css/app.css +++ b/server/static/css/app.css @@ -40,6 +40,10 @@ p { line-height: 140%; } +p.smallMarginBottom { + margin-bottom: 10px; +} + tt { background: #eee; padding: 2px 7px; @@ -53,7 +57,7 @@ code { padding: 20px; border-radius: 3px; margin-top: 10px; - margin-bottom: 10px; + margin-bottom: 20px; } /* Lato font (OFL), https://fonts.google.com/specimen/Lato#about, diff --git a/server/static/js/app.js b/server/static/js/app.js index 33baa990..b1b78a56 100644 --- a/server/static/js/app.js +++ b/server/static/js/app.js @@ -60,7 +60,7 @@ const subscribeInternal = (topic, delaySec) => { eventSource.onmessage = (e) => { const event = JSON.parse(e.data); notifySound.play(); - new Notification(topic, { + new Notification(`${location.host}/${topic}`, { body: event.message, icon: '/static/img/favicon.png' }); diff --git a/server/topic.go b/server/topic.go index e83c2966..ab9f26b8 100644 --- a/server/topic.go +++ b/server/topic.go @@ -21,15 +21,10 @@ type topic struct { mu sync.Mutex } -// message represents a message published to a topic -type message struct { - Time int64 `json:"time"` - Message string `json:"message"` -} - // subscriber is a function that is called for every new message on a topic type subscriber func(msg *message) error +// newTopic creates a new topic func newTopic(id string) *topic { ctx, cancel := context.WithCancel(context.Background()) return &topic{