WIP: CLI, relates to #46

pull/60/head
Philipp Heckel 2021-12-16 20:33:01 -05:00
parent 4346f55b29
commit 1e8421e8ce
10 changed files with 571 additions and 102 deletions

135
client/client.go 100644
View File

@ -0,0 +1,135 @@
package client
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
)
const (
DefaultBaseURL = "https://ntfy.sh"
)
const (
MessageEvent = "message"
KeepaliveEvent = "keepalive"
OpenEvent = "open"
)
type Client struct {
BaseURL string
Messages chan *Message
subscriptions map[string]*subscription
mu sync.Mutex
}
type Message struct {
ID string
Event string
Time int64
Topic string
Message string
Title string
Priority int
Tags []string
BaseURL string
TopicURL string
Raw string
}
type subscription struct {
cancel context.CancelFunc
}
var DefaultClient = New()
func New() *Client {
return &Client{
Messages: make(chan *Message),
subscriptions: make(map[string]*subscription),
}
}
func (c *Client) Publish(topicURL, message string, options ...PublishOption) error {
req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
for _, option := range options {
if err := option(req); err != nil {
return err
}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response %d from server", resp.StatusCode)
}
return err
}
func (c *Client) Subscribe(topicURL string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.subscriptions[topicURL]; ok {
return
}
ctx, cancel := context.WithCancel(context.Background())
c.subscriptions[topicURL] = &subscription{cancel}
go handleConnectionLoop(ctx, c.Messages, topicURL)
}
func (c *Client) Unsubscribe(topicURL string) {
c.mu.Lock()
defer c.mu.Unlock()
sub, ok := c.subscriptions[topicURL]
if !ok {
return
}
sub.cancel()
return
}
func handleConnectionLoop(ctx context.Context, msgChan chan *Message, topicURL string) {
for {
if err := handleConnection(ctx, msgChan, topicURL); err != nil {
log.Printf("connection to %s failed: %s", topicURL, err.Error())
}
select {
case <-ctx.Done():
log.Printf("connection to %s exited", topicURL)
return
case <-time.After(5 * time.Second):
}
}
}
func handleConnection(ctx context.Context, msgChan chan *Message, topicURL string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
var m *Message
line := scanner.Text()
if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil {
return err
}
m.BaseURL = strings.TrimSuffix(topicURL, "/"+m.Topic) // FIXME hack!
m.TopicURL = topicURL
m.Raw = line
msgChan <- m
}
return nil
}

71
client/options.go 100644
View File

@ -0,0 +1,71 @@
package client
import (
"net/http"
)
type PublishOption func(r *http.Request) error
func WithTitle(title string) PublishOption {
return func(r *http.Request) error {
if title != "" {
r.Header.Set("X-Title", title)
}
return nil
}
}
func WithPriority(priority string) PublishOption {
return func(r *http.Request) error {
if priority != "" {
r.Header.Set("X-Priority", priority)
}
return nil
}
}
func WithTags(tags string) PublishOption {
return func(r *http.Request) error {
if tags != "" {
r.Header.Set("X-Tags", tags)
}
return nil
}
}
func WithDelay(delay string) PublishOption {
return func(r *http.Request) error {
if delay != "" {
r.Header.Set("X-Delay", delay)
}
return nil
}
}
func WithNoCache() PublishOption {
return WithHeader("X-Cache", "no")
}
func WithNoFirebase() PublishOption {
return WithHeader("X-Firebase", "no")
}
func WithHeader(header, value string) PublishOption {
return func(r *http.Request) error {
r.Header.Set(header, value)
return nil
}
}
type SubscribeOption func(r *http.Request) error
func WithSince(since string) PublishOption {
return func(r *http.Request) error {
if since != "" {
q := r.URL.Query()
q.Add("since", since)
r.URL.RawQuery = q.Encode()
}
return nil
}
}

View File

@ -2,112 +2,42 @@
package cmd
import (
"errors"
"fmt"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
"heckel.io/ntfy/config"
"heckel.io/ntfy/server"
"heckel.io/ntfy/client"
"heckel.io/ntfy/util"
"log"
"os"
"time"
"strings"
)
// New creates a new CLI application
func New() *cli.App {
flags := []cli.Flag{
&cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"},
altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-https", Aliases: []string{"L"}, EnvVars: []string{"NTFY_LISTEN_HTTPS"}, Usage: "ip:port used to as HTTPS listen address"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: config.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "interval of keepalive messages"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "manager-interval", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MANAGER_INTERVAL"}, Value: config.DefaultManagerInterval, Usage: "interval of for message pruning and stats printing"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: config.DefaultGlobalTopicLimit, Usage: "total number of topics allowed"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-subscription-limit", Aliases: []string{"V"}, EnvVars: []string{"NTFY_VISITOR_SUBSCRIPTION_LIMIT"}, Value: config.DefaultVisitorSubscriptionLimit, Usage: "number of subscriptions per visitor"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-request-limit-burst", Aliases: []string{"B"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_BURST"}, Value: config.DefaultVisitorRequestLimitBurst, Usage: "initial limit of requests per visitor"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-request-limit-replenish", Aliases: []string{"R"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_REPLENISH"}, Value: config.DefaultVisitorRequestLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}),
altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}),
}
return &cli.App{
Name: "ntfy",
Usage: "Simple pub-sub notification service",
UsageText: "ntfy [OPTION..]",
HideHelp: true,
HideVersion: true,
EnableBashCompletion: true,
UseShortOptionHandling: true,
Reader: os.Stdin,
Writer: os.Stdout,
ErrWriter: os.Stderr,
Action: execRun,
Before: initConfigFileInputSource("config", flags),
Flags: flags,
Action: execMainApp,
Before: initConfigFileInputSource("config", flagsServe), // DEPRECATED, see deprecation notice
Flags: flagsServe, // DEPRECATED, see deprecation notice
Commands: []*cli.Command{
cmdServe,
cmdPublish,
cmdSubscribe,
},
}
}
func execRun(c *cli.Context) error {
// Read all the options
listenHTTP := c.String("listen-http")
listenHTTPS := c.String("listen-https")
keyFile := c.String("key-file")
certFile := c.String("cert-file")
firebaseKeyFile := c.String("firebase-key-file")
cacheFile := c.String("cache-file")
cacheDuration := c.Duration("cache-duration")
keepaliveInterval := c.Duration("keepalive-interval")
managerInterval := c.Duration("manager-interval")
globalTopicLimit := c.Int("global-topic-limit")
visitorSubscriptionLimit := c.Int("visitor-subscription-limit")
visitorRequestLimitBurst := c.Int("visitor-request-limit-burst")
visitorRequestLimitReplenish := c.Duration("visitor-request-limit-replenish")
behindProxy := c.Bool("behind-proxy")
// Check values
if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) {
return errors.New("if set, FCM key file must exist")
} else if keepaliveInterval < 5*time.Second {
return errors.New("keepalive interval cannot be lower than five seconds")
} else if managerInterval < 5*time.Second {
return errors.New("manager interval cannot be lower than five seconds")
} else if cacheDuration > 0 && cacheDuration < managerInterval {
return errors.New("cache duration cannot be lower than manager interval")
} else if keyFile != "" && !util.FileExists(keyFile) {
return errors.New("if set, key file must exist")
} else if certFile != "" && !util.FileExists(certFile) {
return errors.New("if set, certificate file must exist")
} else if listenHTTPS != "" && (keyFile == "" || certFile == "") {
return errors.New("if listen-https is set, both key-file and cert-file must be set")
}
// Run server
conf := config.New(listenHTTP)
conf.ListenHTTPS = listenHTTPS
conf.KeyFile = keyFile
conf.CertFile = certFile
conf.FirebaseKeyFile = firebaseKeyFile
conf.CacheFile = cacheFile
conf.CacheDuration = cacheDuration
conf.KeepaliveInterval = keepaliveInterval
conf.ManagerInterval = managerInterval
conf.GlobalTopicLimit = globalTopicLimit
conf.VisitorSubscriptionLimit = visitorSubscriptionLimit
conf.VisitorRequestLimitBurst = visitorRequestLimitBurst
conf.VisitorRequestLimitReplenish = visitorRequestLimitReplenish
conf.BehindProxy = behindProxy
s, err := server.New(conf)
if err != nil {
log.Fatalln(err)
}
if err := s.Run(); err != nil {
log.Fatalln(err)
}
log.Printf("Exiting.")
return nil
func execMainApp(c *cli.Context) error {
log.Printf("\x1b[1;33mDeprecation notice: Please run the server using 'ntfy serve'; see 'ntfy -h' for help.\x1b[0m")
log.Printf("\x1b[1;33mThis way of running the server will be removed Feb 2022.\x1b[0m")
return execServe(c)
}
// initConfigFileInputSource is like altsrc.InitInputSourceWithContext and altsrc.NewYamlSourceFromFlagFunc, but checks
@ -127,3 +57,16 @@ func initConfigFileInputSource(configFlag string, flags []cli.Flag) cli.BeforeFu
return altsrc.ApplyInputSourceValues(context, inputSource, flags)
}
}
func expandTopicURL(s string) string {
if strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://") {
return s
} else if strings.Contains(s, "/") {
return fmt.Sprintf("https://%s", s)
}
return fmt.Sprintf("%s/%s", client.DefaultBaseURL, s)
}
func collapseTopicURL(s string) string {
return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://")
}

70
cmd/publish.go 100644
View File

@ -0,0 +1,70 @@
package cmd
import (
"errors"
"github.com/urfave/cli/v2"
"heckel.io/ntfy/client"
"strings"
)
var cmdPublish = &cli.Command{
Name: "publish",
Aliases: []string{"pub", "send"},
Usage: "Send message via a ntfy server",
UsageText: "ntfy send [OPTIONS..] TOPIC MESSAGE",
Action: execPublish,
Flags: []cli.Flag{
&cli.StringFlag{Name: "title", Aliases: []string{"t"}, Usage: "message title"},
&cli.StringFlag{Name: "priority", Aliases: []string{"p"}, Usage: "priority of the message (1=min, 2=low, 3=default, 4=high, 5=max)"},
&cli.StringFlag{Name: "tags", Aliases: []string{"ta"}, Usage: "comma separated list of tags and emojis"},
&cli.StringFlag{Name: "delay", Aliases: []string{"at", "in"}, Usage: "delay/schedule message"},
&cli.BoolFlag{Name: "no-cache", Aliases: []string{"C"}, Usage: "do not cache message server-side"},
&cli.BoolFlag{Name: "no-firebase", Aliases: []string{"F"}, Usage: "do not forward message to Firebase"},
},
Description: `Publish a message to a ntfy server.
Examples:
ntfy publish mytopic This is my message # Send simple message
ntfy send myserver.com/mytopic "This is my message" # Send message to different default host
ntfy pub -p high backups "Backups failed" # Send high priority message
ntfy pub --tags=warning,skull backups "Backups failed" # Add tags/emojis to message
ntfy pub --delay=10s delayed_topic Laterzz # Delay message by 10s
ntfy pub --at=8:30am delayed_topic Laterzz # Send message at 8:30am
Please also check out the docs on publishing messages. Especially for the --tags and --delay options,
it has incredibly useful information: https://ntfy.sh/docs/publish/.`,
}
func execPublish(c *cli.Context) error {
if c.NArg() < 2 {
return errors.New("topic/message missing")
}
title := c.String("title")
priority := c.String("priority")
tags := c.String("tags")
delay := c.String("delay")
noCache := c.Bool("no-cache")
noFirebase := c.Bool("no-firebase")
topicURL := expandTopicURL(c.Args().Get(0))
message := strings.Join(c.Args().Slice()[1:], " ")
var options []client.PublishOption
if title != "" {
options = append(options, client.WithTitle(title))
}
if priority != "" {
options = append(options, client.WithPriority(priority))
}
if tags != "" {
options = append(options, client.WithTags(tags))
}
if delay != "" {
options = append(options, client.WithDelay(delay))
}
if noCache {
options = append(options, client.WithNoCache())
}
if noFirebase {
options = append(options, client.WithNoFirebase())
}
return client.DefaultClient.Publish(topicURL, message, options...)
}

108
cmd/serve.go 100644
View File

@ -0,0 +1,108 @@
// Package cmd provides the ntfy CLI application
package cmd
import (
"errors"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
"heckel.io/ntfy/config"
"heckel.io/ntfy/server"
"heckel.io/ntfy/util"
"log"
"time"
)
var flagsServe = []cli.Flag{
&cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"},
altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-https", Aliases: []string{"L"}, EnvVars: []string{"NTFY_LISTEN_HTTPS"}, Usage: "ip:port used to as HTTPS listen address"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: config.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "interval of keepalive messages"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "manager-interval", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MANAGER_INTERVAL"}, Value: config.DefaultManagerInterval, Usage: "interval of for message pruning and stats printing"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: config.DefaultGlobalTopicLimit, Usage: "total number of topics allowed"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-subscription-limit", Aliases: []string{"V"}, EnvVars: []string{"NTFY_VISITOR_SUBSCRIPTION_LIMIT"}, Value: config.DefaultVisitorSubscriptionLimit, Usage: "number of subscriptions per visitor"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-request-limit-burst", Aliases: []string{"B"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_BURST"}, Value: config.DefaultVisitorRequestLimitBurst, Usage: "initial limit of requests per visitor"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-request-limit-replenish", Aliases: []string{"R"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_REPLENISH"}, Value: config.DefaultVisitorRequestLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}),
altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}),
}
var cmdServe = &cli.Command{
Name: "serve",
Usage: "Run the ntfy server",
UsageText: "ntfy serve [OPTIONS..]",
Action: execServe,
Flags: flagsServe,
Before: initConfigFileInputSource("config", flagsServe),
Description: `Run the ntfy server and listen for incoming requests
The command will load the configuration from /etc/ntfy/config.yml. Config options can
be overridden using the command line options.
Examples:
ntfy serve # Starts server in the foreground (on port 80)
ntfy serve --listen-http :8080 # Starts server with alternate port`,
}
func execServe(c *cli.Context) error {
// Read all the options
listenHTTP := c.String("listen-http")
listenHTTPS := c.String("listen-https")
keyFile := c.String("key-file")
certFile := c.String("cert-file")
firebaseKeyFile := c.String("firebase-key-file")
cacheFile := c.String("cache-file")
cacheDuration := c.Duration("cache-duration")
keepaliveInterval := c.Duration("keepalive-interval")
managerInterval := c.Duration("manager-interval")
globalTopicLimit := c.Int("global-topic-limit")
visitorSubscriptionLimit := c.Int("visitor-subscription-limit")
visitorRequestLimitBurst := c.Int("visitor-request-limit-burst")
visitorRequestLimitReplenish := c.Duration("visitor-request-limit-replenish")
behindProxy := c.Bool("behind-proxy")
// Check values
if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) {
return errors.New("if set, FCM key file must exist")
} else if keepaliveInterval < 5*time.Second {
return errors.New("keepalive interval cannot be lower than five seconds")
} else if managerInterval < 5*time.Second {
return errors.New("manager interval cannot be lower than five seconds")
} else if cacheDuration > 0 && cacheDuration < managerInterval {
return errors.New("cache duration cannot be lower than manager interval")
} else if keyFile != "" && !util.FileExists(keyFile) {
return errors.New("if set, key file must exist")
} else if certFile != "" && !util.FileExists(certFile) {
return errors.New("if set, certificate file must exist")
} else if listenHTTPS != "" && (keyFile == "" || certFile == "") {
return errors.New("if listen-https is set, both key-file and cert-file must be set")
}
// Run server
conf := config.New(listenHTTP)
conf.ListenHTTPS = listenHTTPS
conf.KeyFile = keyFile
conf.CertFile = certFile
conf.FirebaseKeyFile = firebaseKeyFile
conf.CacheFile = cacheFile
conf.CacheDuration = cacheDuration
conf.KeepaliveInterval = keepaliveInterval
conf.ManagerInterval = managerInterval
conf.GlobalTopicLimit = globalTopicLimit
conf.VisitorSubscriptionLimit = visitorSubscriptionLimit
conf.VisitorRequestLimitBurst = visitorRequestLimitBurst
conf.VisitorRequestLimitReplenish = visitorRequestLimitReplenish
conf.BehindProxy = behindProxy
s, err := server.New(conf)
if err != nil {
log.Fatalln(err)
}
if err := s.Run(); err != nil {
log.Fatalln(err)
}
log.Printf("Exiting.")
return nil
}

132
cmd/subscribe.go 100644
View File

@ -0,0 +1,132 @@
package cmd
import (
"errors"
"fmt"
"github.com/urfave/cli/v2"
"heckel.io/ntfy/client"
"heckel.io/ntfy/util"
"log"
"os"
"os/exec"
"strings"
)
var cmdSubscribe = &cli.Command{
Name: "subscribe",
Aliases: []string{"sub"},
Usage: "Subscribe to one or more topics on a ntfy server",
UsageText: "ntfy subscribe [OPTIONS..] TOPIC",
Action: execSubscribe,
Flags: []cli.Flag{
&cli.StringFlag{Name: "exec", Aliases: []string{"e"}, Usage: "execute command for each message event"},
&cli.StringFlag{Name: "since", Aliases: []string{"s"}, Usage: "return events since (Unix timestamp, or all)"},
},
Description: `(THIS COMMAND IS INCUBATING. IT MAY CHANGE WITHOUT NOTICE.)
Subscribe to one or more topics on a ntfy server, and either print
or execute commands for every arriving message.
By default, the subscribe command just prints the JSON representation of a message.
When --exec is passed, each incoming message will execute a command. The message fields
are passed to the command as environment variables:
Variable Aliases Description
--------------- --------------- -----------------------------------
$NTFY_MESSAGE $message, $m Message body
$NTFY_TITLE $title, $t Message title
$NTFY_PRIORITY $priority, $p Message priority (1=min, 5=max)
$NTFY_TAGS $tags, $ta Message tags (comma separated list)
$NTFY_ID $id Unique message ID
$NTFY_TIME $time Unix timestamp of the message delivery
$NTFY_TOPIC $topic Topic name
$NTFY_EVENT $event, $ev Event identifier (always "message")
Examples:
ntfy subscribe mytopic # Prints JSON for incoming messages to stdout
ntfy sub home.lan/backups alerts # Subscribe to two different topics
ntfy sub --exec='notify-send "$m"' mytopic # Execute command for incoming messages'
`,
}
func execSubscribe(c *cli.Context) error {
if c.NArg() < 1 {
return errors.New("topic missing")
}
log.Printf("\x1b[1;33mThis command is incubating. The interface may change without notice.\x1b[0m")
cl := client.DefaultClient
command := c.String("exec")
for _, topic := range c.Args().Slice() {
cl.Subscribe(expandTopicURL(topic))
}
for m := range cl.Messages {
_ = dispatchMessage(c, command, m)
}
return nil
}
func dispatchMessage(c *cli.Context, command string, m *client.Message) error {
if command != "" {
return execCommand(c, command, m)
}
fmt.Println(m.Raw)
return nil
}
func execCommand(c *cli.Context, command string, m *client.Message) error {
if m.Event == client.OpenEvent {
log.Printf("[%s] Connection opened, subscribed to topic", collapseTopicURL(m.TopicURL))
} else if m.Event == client.MessageEvent {
go func() {
if err := runCommandInternal(c, command, m); err != nil {
log.Printf("[%s] Command failed: %s", collapseTopicURL(m.TopicURL), err.Error())
}
}()
}
return nil
}
func runCommandInternal(c *cli.Context, command string, m *client.Message) error {
scriptFile, err := createTmpScript(command)
if err != nil {
return err
}
defer os.Remove(scriptFile)
log.Printf("[%s] Executing: %s (for message: %s)", collapseTopicURL(m.TopicURL), command, m.Raw)
cmd := exec.Command("sh", "-c", scriptFile)
cmd.Stdin = c.App.Reader
cmd.Stdout = c.App.Writer
cmd.Stderr = c.App.ErrWriter
cmd.Env = envVars(m)
return cmd.Run()
}
func createTmpScript(command string) (string, error) {
scriptFile := fmt.Sprintf("%s/ntfy-subscribe-%s.sh.tmp", os.TempDir(), util.RandomString(10))
script := fmt.Sprintf("#!/bin/sh\n%s", command)
if err := os.WriteFile(scriptFile, []byte(script), 0700); err != nil {
return "", err
}
return scriptFile, nil
}
func envVars(m *client.Message) []string {
env := os.Environ()
env = append(env, envVar(m.ID, "NTFY_ID", "id")...)
env = append(env, envVar(m.Event, "NTFY_EVENT", "event", "ev")...)
env = append(env, envVar(m.Topic, "NTFY_TOPIC", "topic")...)
env = append(env, envVar(fmt.Sprintf("%d", m.Time), "NTFY_TIME", "time")...)
env = append(env, envVar(m.Message, "NTFY_MESSAGE", "message", "m")...)
env = append(env, envVar(m.Title, "NTFY_TITLE", "title", "t")...)
env = append(env, envVar(fmt.Sprintf("%d", m.Priority), "NTFY_PRIORITY", "priority", "prio", "p")...)
env = append(env, envVar(strings.Join(m.Tags, ","), "NTFY_TAGS", "tags", "ta")...)
return env
}
func envVar(value string, vars ...string) []string {
env := make([]string, 0)
for _, v := range vars {
env = append(env, fmt.Sprintf("%s=%s", v, value))
}
return env
}

View File

@ -5,7 +5,7 @@ After=network.target
[Service]
User=ntfy
Group=ntfy
ExecStart=/usr/bin/ntfy
ExecStart=/usr/bin/ntfy serve
Restart=on-failure
AmbientCapabilities=CAP_NET_BIND_SERVICE
LimitNOFILE=10000

View File

@ -19,7 +19,7 @@ func main() {
Try 'ntfy COMMAND --help' for more information.
ntfy %s (%s), runtime %s, built at %s
Copyright (C) 2021 Philipp C. Heckel, distributed under the Apache License 2.0
Copyright (C) 2021 Philipp C. Heckel, licensed under Apache License 2.0 & GPLv2
`, version, commit[:7], runtime.Version(), date)
app := cmd.New()

View File

@ -328,22 +328,9 @@ func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase
if messageStr != "" {
m.Message = messageStr
}
priorityStr := readParam(r, "x-priority", "priority", "prio", "p")
if priorityStr != "" {
switch strings.ToLower(priorityStr) {
case "1", "min":
m.Priority = 1
case "2", "low":
m.Priority = 2
case "3", "default":
m.Priority = 3
case "4", "high":
m.Priority = 4
case "5", "max", "urgent":
m.Priority = 5
default:
return false, false, errHTTPBadRequest
}
m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
if err != nil {
return false, false, errHTTPBadRequest
}
tagsStr := readParam(r, "x-tags", "tag", "tags", "ta")
if tagsStr != "" {

View File

@ -1,9 +1,11 @@
package util
import (
"errors"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"
)
@ -15,6 +17,8 @@ const (
var (
random = rand.New(rand.NewSource(time.Now().UnixNano()))
randomMutex = sync.Mutex{}
errInvalidPriority = errors.New("unknown priority")
)
// FileExists checks if a file exists, and returns true if it does
@ -75,3 +79,22 @@ func DurationToHuman(d time.Duration) (str string) {
}
return
}
func ParsePriority(priority string) (int, error) {
switch strings.ToLower(priority) {
case "":
return 0, nil
case "1", "min":
return 1, nil
case "2", "low":
return 2, nil
case "3", "default":
return 3, nil
case "4", "high":
return 4, nil
case "5", "max", "urgent":
return 5, nil
default:
return 0, errInvalidPriority
}
}