diff --git a/cmd/app.go b/cmd/app.go index 4d8c1fa1..59ee5155 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -16,6 +16,7 @@ func New() *cli.App { flags := []cli.Flag{ &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as listen address"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "default interval of keepalive messages"}), } return &cli.App{ Name: "ntfy", @@ -37,9 +38,11 @@ func New() *cli.App { func execRun(c *cli.Context) error { // Read all the options listenHTTP := c.String("listen-http") + keepaliveInterval := c.Duration("keepalive-interval") // Run main bot, can be killed by signal conf := config.New(listenHTTP) + conf.KeepaliveInterval = keepaliveInterval s := server.New(conf) if err := s.Run(); err != nil { log.Fatalln(err) diff --git a/config/config.go b/config/config.go index ba91f132..c7f39a40 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,9 @@ import ( // Defines default config settings const ( - DefaultListenHTTP = ":80" - defaultManagerInterval = time.Minute + DefaultListenHTTP = ":80" + DefaultKeepaliveInterval = 30 * time.Second + defaultManagerInterval = time.Minute ) // Defines the max number of requests, here: @@ -21,18 +22,20 @@ var ( // Config is the main config struct for the application. Use New to instantiate a default config struct. type Config struct { - ListenHTTP string - Limit rate.Limit - LimitBurst int - ManagerInterval time.Duration + ListenHTTP string + Limit rate.Limit + LimitBurst int + KeepaliveInterval time.Duration + ManagerInterval time.Duration } // New instantiates a default new config func New(listenHTTP string) *Config { return &Config{ - ListenHTTP: listenHTTP, - Limit: defaultLimit, - LimitBurst: defaultLimitBurst, - ManagerInterval: defaultManagerInterval, + ListenHTTP: listenHTTP, + Limit: defaultLimit, + LimitBurst: defaultLimitBurst, + KeepaliveInterval: DefaultKeepaliveInterval, + ManagerInterval: defaultManagerInterval, } } diff --git a/server/index.html b/server/index.html index e42114e0..0c9f5392 100644 --- a/server/index.html +++ b/server/index.html @@ -65,20 +65,7 @@
- Here are some examples using curl: -
-
- # one message per line (\n are replaced with a space)
- curl -s ntfy.sh/mytopic/raw
-
- # one JSON message per line
- curl -s ntfy.sh/mytopic/json
-
- # server-sent events (SSE) stream, use with EventSource
- curl -s ntfy.sh/mytopic/sse
-
- +
Using EventSource, you can consume notifications like this (see full example):
@@ -88,15 +75,46 @@ // Do something with e.data+ Or you can use curl or any other HTTP library. Here's an example for the /json endpoint, + which prints one JSON message per line (keepalive and open messages have an "event" field): +
+
+ $ curl -s ntfy.sh/mytopic/json
+ {"time":1635359841,"event":"open"}
+ {"time":1635359844,"message":"This is a notification"}
+ {"time":1635359851,"event":"keepalive"}
+
+ + Using the /sse endpoint (SSE, server-sent events stream): +
+
+ $ curl -s ntfy.sh/mytopic/sse
+ event: open
+ data: {"time":1635359796,"event":"open"}
+
+ data: {"time":1635359803,"message":"This is a notification"}
+
+ event: keepalive
+ data: {"time":1635359806,"event":"keepalive"}
+
+ + Using the /raw endpoint (empty lines are keepalive messages): +
+
+ $ curl -s ntfy.sh/mytopic/raw
+
+ This is a notification
+
+
Publishing messages can be done via PUT or POST using. Here's an example using curl:
curl -d "long process is done" ntfy.sh/mytopic
- +
Here's an example in JS with fetch() (see full example):
diff --git a/server/message.go b/server/message.go
new file mode 100644
index 00000000..514ebac1
--- /dev/null
+++ b/server/message.go
@@ -0,0 +1,43 @@
+package server
+
+import "time"
+
+// List of possible events
+const (
+ openEvent = "open"
+ keepaliveEvent = "keepalive"
+)
+
+// message represents a message published to a topic
+type message struct {
+ Time int64 `json:"time"` // Unix time in seconds
+ Event string `json:"event,omitempty"` // One of the above
+ Message string `json:"message,omitempty"`
+}
+
+// messageEncoder is a function that knows how to encode a message
+type messageEncoder func(msg *message) (string, error)
+
+// newMessage creates a new message with the current timestamp
+func newMessage(event string, msg string) *message {
+ return &message{
+ Time: time.Now().Unix(),
+ Event: event,
+ Message: msg,
+ }
+}
+
+// newOpenMessage is a convenience method to create an open message
+func newOpenMessage() *message {
+ return newMessage(openEvent, "")
+}
+
+// newKeepaliveMessage is a convenience method to create a keepalive message
+func newKeepaliveMessage() *message {
+ return newMessage(keepaliveEvent, "")
+}
+
+// newDefaultMessage is a convenience method to create a notification message
+func newDefaultMessage(msg string) *message {
+ return newMessage("", msg)
+}
diff --git a/server/server.go b/server/server.go
index acb1664a..b639cf08 100644
--- a/server/server.go
+++ b/server/server.go
@@ -48,11 +48,11 @@ const (
)
var (
- topicRegex = regexp.MustCompile(`^/[^/]+$`)
- jsonRegex = regexp.MustCompile(`^/[^/]+/json$`)
- sseRegex = regexp.MustCompile(`^/[^/]+/sse$`)
- rawRegex = regexp.MustCompile(`^/[^/]+/raw$`)
- staticRegex = regexp.MustCompile(`^/static/.+`)
+ topicRegex = regexp.MustCompile(`^/[^/]+$`)
+ jsonRegex = regexp.MustCompile(`^/[^/]+/json$`)
+ sseRegex = regexp.MustCompile(`^/[^/]+/sse$`)
+ rawRegex = regexp.MustCompile(`^/[^/]+/raw$`)
+ staticRegex = regexp.MustCompile(`^/static/.+`)
//go:embed "index.html"
indexSource string
@@ -159,11 +159,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
if err != nil {
return err
}
- msg := &message{
- Time: time.Now().UnixMilli(),
- Message: string(b),
- }
- if err := t.Publish(msg); err != nil {
+ if err := t.Publish(newDefaultMessage(string(b))); err != nil {
return err
}
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
@@ -171,75 +167,74 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
}
func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error {
- t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack
- subscriberID := t.Subscribe(func(msg *message) error {
- if err := json.NewEncoder(w).Encode(&msg); err != nil {
- return err
+ encoder := func(msg *message) (string, error) {
+ var buf bytes.Buffer
+ if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
+ return "", err
}
- if fl, ok := w.(http.Flusher); ok {
- fl.Flush()
- }
- return nil
- })
- defer s.unsubscribe(t, subscriberID)
- w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
- select {
- case <-t.ctx.Done():
- case <-r.Context().Done():
+ return buf.String(), nil
}
- return nil
+ return s.handleSubscribe(w, r, "json", "application/stream+json", encoder)
}
func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error {
- t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack
- subscriberID := t.Subscribe(func(msg *message) error {
+ encoder := func(msg *message) (string, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
- return err
+ return "", err
}
- m := fmt.Sprintf("data: %s\n", buf.String())
- if _, err := io.WriteString(w, m); err != nil {
- return err
+ if msg.Event != "" {
+ return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
}
- if fl, ok := w.(http.Flusher); ok {
- fl.Flush()
- }
- return nil
- })
- defer s.unsubscribe(t, subscriberID)
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
- if _, err := io.WriteString(w, "event: open\n\n"); err != nil {
- return err
+ return fmt.Sprintf("data: %s\n", buf.String()), nil
}
- if fl, ok := w.(http.Flusher); ok {
- fl.Flush()
- }
- select {
- case <-t.ctx.Done():
- case <-r.Context().Done():
- }
- return nil
+ return s.handleSubscribe(w, r, "sse", "text/event-stream", encoder)
}
func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request) error {
- t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/raw")) // Hack
- subscriberID := t.Subscribe(func(msg *message) error {
- m := strings.ReplaceAll(msg.Message, "\n", " ") + "\n"
- if _, err := io.WriteString(w, m); err != nil {
+ encoder := func(msg *message) (string, error) {
+ if msg.Event == "" { // only handle default events
+ return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
+ }
+ return "\n", nil // "keepalive" and "open" events just send an empty line
+ }
+ return s.handleSubscribe(w, r, "raw", "text/plain", encoder)
+}
+
+func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, format string, contentType string, encoder messageEncoder) error {
+ t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
+ sub := func(msg *message) error {
+ m, err := encoder(msg)
+ if err != nil {
+ return err
+ }
+ if _, err := w.Write([]byte(m)); err != nil {
return err
}
if fl, ok := w.(http.Flusher); ok {
fl.Flush()
}
return nil
- })
- defer s.unsubscribe(t, subscriberID)
- select {
- case <-t.ctx.Done():
- case <-r.Context().Done():
}
- return nil
+ subscriberID := t.Subscribe(sub)
+ defer s.unsubscribe(t, subscriberID)
+ w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
+ w.Header().Set("Content-Type", contentType)
+ if err := sub(newOpenMessage()); err != nil { // Send out open message
+ return err
+ }
+ for {
+ select {
+ case <-t.ctx.Done():
+ return nil
+ case <-r.Context().Done():
+ return nil
+ case <-time.After(s.config.KeepaliveInterval):
+ if err := sub(newKeepaliveMessage()); err != nil { // Send keepalive message
+ return err
+ }
+ }
+ }
}
func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
diff --git a/server/static/css/app.css b/server/static/css/app.css
index ba6c8c21..fe409447 100644
--- a/server/static/css/app.css
+++ b/server/static/css/app.css
@@ -40,6 +40,10 @@ p {
line-height: 140%;
}
+p.smallMarginBottom {
+ margin-bottom: 10px;
+}
+
tt {
background: #eee;
padding: 2px 7px;
@@ -53,7 +57,7 @@ code {
padding: 20px;
border-radius: 3px;
margin-top: 10px;
- margin-bottom: 10px;
+ margin-bottom: 20px;
}
/* Lato font (OFL), https://fonts.google.com/specimen/Lato#about,
diff --git a/server/static/js/app.js b/server/static/js/app.js
index 33baa990..b1b78a56 100644
--- a/server/static/js/app.js
+++ b/server/static/js/app.js
@@ -60,7 +60,7 @@ const subscribeInternal = (topic, delaySec) => {
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
notifySound.play();
- new Notification(topic, {
+ new Notification(`${location.host}/${topic}`, {
body: event.message,
icon: '/static/img/favicon.png'
});
diff --git a/server/topic.go b/server/topic.go
index e83c2966..ab9f26b8 100644
--- a/server/topic.go
+++ b/server/topic.go
@@ -21,15 +21,10 @@ type topic struct {
mu sync.Mutex
}
-// message represents a message published to a topic
-type message struct {
- Time int64 `json:"time"`
- Message string `json:"message"`
-}
-
// subscriber is a function that is called for every new message on a topic
type subscriber func(msg *message) error
+// newTopic creates a new topic
func newTopic(id string) *topic {
ctx, cancel := context.WithCancel(context.Background())
return &topic{