add mqtt listenner
This commit is contained in:
parent
2624897efe
commit
db518b5c19
4 changed files with 133 additions and 0 deletions
15
cmd/serve.go
15
cmd/serve.go
|
@ -74,6 +74,11 @@ var flagsServe = append(
|
||||||
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-email-limit-burst", Aliases: []string{"visitor_email_limit_burst"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_BURST"}, Value: server.DefaultVisitorEmailLimitBurst, Usage: "initial limit of e-mails per visitor"}),
|
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-email-limit-burst", Aliases: []string{"visitor_email_limit_burst"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_BURST"}, Value: server.DefaultVisitorEmailLimitBurst, Usage: "initial limit of e-mails per visitor"}),
|
||||||
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-email-limit-replenish", Aliases: []string{"visitor_email_limit_replenish"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_REPLENISH"}, Value: server.DefaultVisitorEmailLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}),
|
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-email-limit-replenish", Aliases: []string{"visitor_email_limit_replenish"}, EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_REPLENISH"}, Value: server.DefaultVisitorEmailLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}),
|
||||||
altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"behind_proxy", "P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}),
|
altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"behind_proxy", "P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}),
|
||||||
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-server", Aliases: []string{"mqtt_server"}, EnvVars: []string{"NTFY_MQTT_SERVER"}, Value: "", Usage: "if set, ntfy try to connect to this mqtt server"}),
|
||||||
|
altsrc.NewIntFlag(&cli.IntFlag{Name: "mqtt-port", Aliases: []string{"mqtt_port"}, EnvVars: []string{"NTFY_MQTT_PORT"}, Value: 1883, Usage: "port of mqtt server"}),
|
||||||
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-username", Aliases: []string{"mqtt_username"}, EnvVars: []string{"NTFY_MQTT_USERNAME"}, Value: "", Usage: "if set ntfy connect to mqtt server in authentificated mode"}),
|
||||||
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-password", Aliases: []string{"mqtt_password"}, EnvVars: []string{"NTFY_MQTT_PASSWORD"}, Value: "", Usage: "must be set set if mqtt-username is set"}),
|
||||||
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "mqtt-topic", Aliases: []string{"mqtt_topic"}, EnvVars: []string{"NTFY_MQTT_TOPIC"}, Value: "ntfy", Usage: "default mqtt topic to listen, message need to be send in json format on mqtt-topic/message, by defaut it's ntfy/message"}),
|
||||||
)
|
)
|
||||||
|
|
||||||
var cmdServe = &cli.Command{
|
var cmdServe = &cli.Command{
|
||||||
|
@ -141,6 +146,11 @@ func execServe(c *cli.Context) error {
|
||||||
visitorEmailLimitBurst := c.Int("visitor-email-limit-burst")
|
visitorEmailLimitBurst := c.Int("visitor-email-limit-burst")
|
||||||
visitorEmailLimitReplenish := c.Duration("visitor-email-limit-replenish")
|
visitorEmailLimitReplenish := c.Duration("visitor-email-limit-replenish")
|
||||||
behindProxy := c.Bool("behind-proxy")
|
behindProxy := c.Bool("behind-proxy")
|
||||||
|
mqttServer := c.String("mqtt-server")
|
||||||
|
mqttPort := c.Int("mqtt-port")
|
||||||
|
mqttUsername := c.String("mqtt-usernameig")
|
||||||
|
mqttPassword := c.String("mqtt-password")
|
||||||
|
mqttTopic := c.String("mqtt-topic")
|
||||||
|
|
||||||
// Check values
|
// Check values
|
||||||
if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) {
|
if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) {
|
||||||
|
@ -269,6 +279,11 @@ func execServe(c *cli.Context) error {
|
||||||
conf.BehindProxy = behindProxy
|
conf.BehindProxy = behindProxy
|
||||||
conf.EnableWeb = enableWeb
|
conf.EnableWeb = enableWeb
|
||||||
conf.Version = c.App.Version
|
conf.Version = c.App.Version
|
||||||
|
conf.MqttServer = mqttServer
|
||||||
|
conf.MqttPort = mqttPort
|
||||||
|
conf.MqttUsername = mqttUsername
|
||||||
|
conf.MqttPassword = mqttPassword
|
||||||
|
conf.MqttTopic = mqttTopic
|
||||||
|
|
||||||
// Set up hot-reloading of config
|
// Set up hot-reloading of config
|
||||||
go sigHandlerConfigReload(config)
|
go sigHandlerConfigReload(config)
|
||||||
|
|
93
server/mqtt.go
Normal file
93
server/mqtt.go
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"bytes"
|
||||||
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
"heckel.io/ntfy/log"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mqttBackend struct {
|
||||||
|
config *Config
|
||||||
|
handler func(http.ResponseWriter, *http.Request)
|
||||||
|
client mqtt.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
type mqttMessage struct {
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Priority string `json:"priority"`
|
||||||
|
Tags string `json:"tags"`
|
||||||
|
Actions string `json:"actions"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMqttBackend(conf *Config, handler func(http.ResponseWriter, *http.Request)) *mqttBackend {
|
||||||
|
return &mqttBackend{
|
||||||
|
config: conf,
|
||||||
|
handler: handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mqttBackend) Connect() () {
|
||||||
|
opts := mqtt.NewClientOptions()
|
||||||
|
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", b.config.MqttServer, b.config.MqttPort))
|
||||||
|
opts.SetClientID("ntfy")
|
||||||
|
opts.SetUsername(b.config.MqttUsername)
|
||||||
|
opts.SetPassword(b.config.MqttPassword)
|
||||||
|
log.Info("[mqtt] Connect to mqtt server %s:%d", b.config.MqttServer, b.config.MqttPort)
|
||||||
|
client := mqtt.NewClient(opts)
|
||||||
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||||
|
log.Debug("[mqtt] Can not connect")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.client = client
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *mqttBackend) Subscribe() () {
|
||||||
|
b.client.Subscribe(b.config.MqttTopic+"/message", 0, func(client mqtt.Client, msg mqtt.Message) {
|
||||||
|
log.Debug("[mqtt] Received message [%s] %s ", msg.Topic(), string(msg.Payload()))
|
||||||
|
var m mqttMessage
|
||||||
|
if err := json.NewDecoder(bytes.NewReader(msg.Payload())).Decode(&m); err != nil {
|
||||||
|
log.Info("[mqtt] not a valide json")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !topicRegex.MatchString(m.Topic) {
|
||||||
|
log.Info("[mqtt] not a valide topic")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.Message == "" {
|
||||||
|
m.Message = emptyMessageBody
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("%s/%s", b.config.BaseURL, m.Topic)
|
||||||
|
req, err := http.NewRequest("POST", url, strings.NewReader(m.Message))
|
||||||
|
req.RequestURI = "/" + m.Topic
|
||||||
|
req.RemoteAddr = "127.0.0.1"
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.Title != "" {
|
||||||
|
req.Header.Set("Title", m.Title)
|
||||||
|
}
|
||||||
|
if m.Priority != "" {
|
||||||
|
req.Header.Set("Priority",m.Priority)
|
||||||
|
}
|
||||||
|
if m.Tags != "" {
|
||||||
|
req.Header.Set("Tags",m.Tags)
|
||||||
|
}
|
||||||
|
if m.Actions != "" {
|
||||||
|
req.Header.Set("Actions",m.Actions)
|
||||||
|
}
|
||||||
|
rr := httptest.NewRecorder()
|
||||||
|
b.handler(rr, req)
|
||||||
|
if rr.Code != http.StatusOK {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
log.Info("[mqtt] Subscribed to topic %s/message", b.config.MqttTopic)
|
||||||
|
}
|
|
@ -41,6 +41,7 @@ type Server struct {
|
||||||
unixListener net.Listener
|
unixListener net.Listener
|
||||||
smtpServer *smtp.Server
|
smtpServer *smtp.Server
|
||||||
smtpServerBackend *smtpBackend
|
smtpServerBackend *smtpBackend
|
||||||
|
mqttServerBackend *mqttBackend
|
||||||
smtpSender mailer
|
smtpSender mailer
|
||||||
topics map[string]*topic
|
topics map[string]*topic
|
||||||
visitors map[netip.Addr]*visitor
|
visitors map[netip.Addr]*visitor
|
||||||
|
@ -228,6 +229,11 @@ func (s *Server) Run() error {
|
||||||
errChan <- s.runSMTPServer()
|
errChan <- s.runSMTPServer()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
if s.config.MqttServer != "" {
|
||||||
|
go func() {
|
||||||
|
s.runMqttServer()
|
||||||
|
}()
|
||||||
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
go s.runManager()
|
go s.runManager()
|
||||||
go s.runDelayedSender()
|
go s.runDelayedSender()
|
||||||
|
@ -1221,6 +1227,13 @@ func (s *Server) runSMTPServer() error {
|
||||||
return s.smtpServer.ListenAndServe()
|
return s.smtpServer.ListenAndServe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) runMqttServer() {
|
||||||
|
s.mqttServerBackend = newMqttBackend(s.config, s.handle)
|
||||||
|
s.mqttServerBackend.Connect()
|
||||||
|
s.mqttServerBackend.Subscribe()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) runManager() {
|
func (s *Server) runManager() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -203,6 +203,18 @@
|
||||||
# visitor-attachment-total-size-limit: "100M"
|
# visitor-attachment-total-size-limit: "100M"
|
||||||
# visitor-attachment-daily-bandwidth-limit: "500M"
|
# visitor-attachment-daily-bandwidth-limit: "500M"
|
||||||
|
|
||||||
|
# MQTT configuration, if mqtt-server is empty no connexion to MQTT server :
|
||||||
|
# - mqtt-server if it's empty no connexion to Mqtt server
|
||||||
|
# - mqtt-port mqtt server port, by defaut it's 1883
|
||||||
|
# - mqtt-username if it's empty no authentification
|
||||||
|
# - mqtt-password can not be empty if username is not empty
|
||||||
|
# - mqtt-topic defaut topic to listen, message need to be send in json format on mqtt-topic/message, by defaut it's ntfy/message
|
||||||
|
# mqtt-server: ""
|
||||||
|
# mqtt-port: 1883
|
||||||
|
# mqtt-username: ""
|
||||||
|
# mqtt-password: ""
|
||||||
|
# mqtt-topic: "ntfy"
|
||||||
|
|
||||||
# Log level, can be TRACE, DEBUG, INFO, WARN or ERROR
|
# Log level, can be TRACE, DEBUG, INFO, WARN or ERROR
|
||||||
# This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy".
|
# This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy".
|
||||||
#
|
#
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue