diff --git a/cmd/serve.go b/cmd/serve.go index ecc4d4a1..00d36aa8 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -74,6 +74,11 @@ var flagsServe = append( altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-email-limit-burst", Aliases: []string{"visitor_email_limit_burst"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_BURST"}, Value: server.DefaultVisitorEmailLimitBurst, Usage: "initial limit of e-mails per visitor"}), altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-email-limit-replenish", Aliases: []string{"visitor_email_limit_replenish"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_REPLENISH"}, Value: server.DefaultVisitorEmailLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}), altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"behind_proxy", "P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-server", Aliases: []string{"mqtt_server"}, EnvVars: []string{"NTFY_MQTT_SERVER"}, Value: "", Usage: "if set, ntfy try to connect to this mqtt server"}), + altsrc.NewIntFlag(&cli.IntFlag{Name: "mqtt-port", Aliases: []string{"mqtt_port"}, EnvVars: []string{"NTFY_MQTT_PORT"}, Value: 1883, Usage: "port of mqtt server"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-username", Aliases: []string{"mqtt_username"}, EnvVars: []string{"NTFY_MQTT_USERNAME"}, Value: "", Usage: "if set ntfy connect to mqtt server in authentificated mode"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-password", Aliases: []string{"mqtt_password"}, EnvVars: []string{"NTFY_MQTT_PASSWORD"}, Value: "", Usage: "must be set set if mqtt-username is set"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-topic", Aliases: []string{"mqtt_topic"}, EnvVars: []string{"NTFY_MQTT_TOPIC"}, Value: "ntfy", Usage: "default mqtt topic to listen, message need to be send in json format on mqtt-topic/message, by defaut it's ntfy/message"}), ) var cmdServe = &cli.Command{ @@ -141,6 +146,11 @@ func execServe(c *cli.Context) error { visitorEmailLimitBurst := c.Int("visitor-email-limit-burst") visitorEmailLimitReplenish := c.Duration("visitor-email-limit-replenish") behindProxy := c.Bool("behind-proxy") + mqttServer := c.String("mqtt-server") + mqttPort := c.Int("mqtt-port") + mqttUsername := c.String("mqtt-usernameig") + mqttPassword := c.String("mqtt-password") + mqttTopic := c.String("mqtt-topic") // Check values if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) { @@ -269,6 +279,11 @@ func execServe(c *cli.Context) error { conf.BehindProxy = behindProxy conf.EnableWeb = enableWeb conf.Version = c.App.Version + conf.MqttServer = mqttServer + conf.MqttPort = mqttPort + conf.MqttUsername = mqttUsername + conf.MqttPassword = mqttPassword + conf.MqttTopic = mqttTopic // Set up hot-reloading of config go sigHandlerConfigReload(config) diff --git a/server/mqtt.go b/server/mqtt.go new file mode 100644 index 00000000..6cdd3916 --- /dev/null +++ b/server/mqtt.go @@ -0,0 +1,93 @@ +package server + +import ( + "fmt" + "bytes" + mqtt "github.com/eclipse/paho.mqtt.golang" + "heckel.io/ntfy/log" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" +) + +type mqttBackend struct { + config *Config + handler func(http.ResponseWriter, *http.Request) + client mqtt.Client +} + +type mqttMessage struct { + Topic string `json:"topic"` + Title string `json:"title"` + Message string `json:"message"` + Priority string `json:"priority"` + Tags string `json:"tags"` + Actions string `json:"actions"` +} + +func newMqttBackend(conf *Config, handler func(http.ResponseWriter, *http.Request)) *mqttBackend { + return &mqttBackend{ + config: conf, + handler: handler, + } +} + +func (b *mqttBackend) Connect() () { + opts := mqtt.NewClientOptions() + opts.AddBroker(fmt.Sprintf("tcp://%s:%d", b.config.MqttServer, b.config.MqttPort)) + opts.SetClientID("ntfy") + opts.SetUsername(b.config.MqttUsername) + opts.SetPassword(b.config.MqttPassword) + log.Info("[mqtt] Connect to mqtt server %s:%d", b.config.MqttServer, b.config.MqttPort) + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Debug("[mqtt] Can not connect") + return + } + b.client = client + return +} + +func (b *mqttBackend) Subscribe() () { + b.client.Subscribe(b.config.MqttTopic+"/message", 0, func(client mqtt.Client, msg mqtt.Message) { + log.Debug("[mqtt] Received message [%s] %s ", msg.Topic(), string(msg.Payload())) + var m mqttMessage + if err := json.NewDecoder(bytes.NewReader(msg.Payload())).Decode(&m); err != nil { + log.Info("[mqtt] not a valide json") + return + } + if !topicRegex.MatchString(m.Topic) { + log.Info("[mqtt] not a valide topic") + return + } + if m.Message == "" { + m.Message = emptyMessageBody + } + url := fmt.Sprintf("%s/%s", b.config.BaseURL, m.Topic) + req, err := http.NewRequest("POST", url, strings.NewReader(m.Message)) + req.RequestURI = "/" + m.Topic + req.RemoteAddr = "127.0.0.1" + if err != nil { + return + } + if m.Title != "" { + req.Header.Set("Title", m.Title) + } + if m.Priority != "" { + req.Header.Set("Priority",m.Priority) + } + if m.Tags != "" { + req.Header.Set("Tags",m.Tags) + } + if m.Actions != "" { + req.Header.Set("Actions",m.Actions) + } + rr := httptest.NewRecorder() + b.handler(rr, req) + if rr.Code != http.StatusOK { + return + } + }) + log.Info("[mqtt] Subscribed to topic %s/message", b.config.MqttTopic) +} \ No newline at end of file diff --git a/server/server.go b/server/server.go index 14ab0070..a85f5bc2 100644 --- a/server/server.go +++ b/server/server.go @@ -41,6 +41,7 @@ type Server struct { unixListener net.Listener smtpServer *smtp.Server smtpServerBackend *smtpBackend + mqttServerBackend *mqttBackend smtpSender mailer topics map[string]*topic visitors map[netip.Addr]*visitor @@ -228,6 +229,11 @@ func (s *Server) Run() error { errChan <- s.runSMTPServer() }() } + if s.config.MqttServer != "" { + go func() { + s.runMqttServer() + }() + } s.mu.Unlock() go s.runManager() go s.runDelayedSender() @@ -1221,6 +1227,13 @@ func (s *Server) runSMTPServer() error { return s.smtpServer.ListenAndServe() } +func (s *Server) runMqttServer() { + s.mqttServerBackend = newMqttBackend(s.config, s.handle) + s.mqttServerBackend.Connect() + s.mqttServerBackend.Subscribe() + return +} + func (s *Server) runManager() { for { select { diff --git a/server/server.yml b/server/server.yml index 1b268995..c15963f4 100644 --- a/server/server.yml +++ b/server/server.yml @@ -203,6 +203,18 @@ # visitor-attachment-total-size-limit: "100M" # visitor-attachment-daily-bandwidth-limit: "500M" +# MQTT configuration, if mqtt-server is empty no connexion to MQTT server : +# - mqtt-server if it's empty no connexion to Mqtt server +# - mqtt-port mqtt server port, by defaut it's 1883 +# - mqtt-username if it's empty no authentification +# - mqtt-password can not be empty if username is not empty +# - mqtt-topic defaut topic to listen, message need to be send in json format on mqtt-topic/message, by defaut it's ntfy/message +# mqtt-server: "" +# mqtt-port: 1883 +# mqtt-username: "" +# mqtt-password: "" +# mqtt-topic: "ntfy" + # Log level, can be TRACE, DEBUG, INFO, WARN or ERROR # This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy". #