diff --git a/cmd/app.go b/cmd/app.go index 6ef49945..b6440658 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -32,6 +32,7 @@ func New() *cli.App { cmdServe, cmdPublish, cmdSubscribe, + cmdDistribute, }, } } diff --git a/cmd/distributor.go b/cmd/distributor.go new file mode 100644 index 00000000..1edd72fd --- /dev/null +++ b/cmd/distributor.go @@ -0,0 +1,201 @@ +package cmd + +import ( + "errors" + "fmt" + "log" + "strings" + + "github.com/urfave/cli/v2" + "gorm.io/gorm" + "gorm.io/gorm/clause" + "heckel.io/ntfy/client" + + distributor_tools "unifiedpush.org/go/np2p_dbus/distributor" + "unifiedpush.org/go/np2p_dbus/storage" + "unifiedpush.org/go/np2p_dbus/utils" +) + +type store struct { + storage.Storage +} + +func (s store) GetAllPubTokens() string { + var conns []storage.Connection + result := s.DB().Find(&conns) + if result.Error != nil { + //TODO + } + pubtokens := []string{} + for _, conn := range conns { + pubtokens = append(pubtokens, conn.PublicToken) + } + + return strings.Join(pubtokens, ",") +} + +type KVStore struct { + Key string `gorm:"primaryKey"` + Value string +} + +func (kv *KVStore) Get(db *gorm.DB) error { + return db.First(kv).Error +} + +func (kv KVStore) Set(db *gorm.DB) error { + return db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&kv).Error +} + +func (s store) SetLastMessage(id string) error { + return KVStore{"device-id", id}.Set(s.DB()) +} + +func (s store) GetLastMessage() string { + answer := KVStore{Key: "device-id"} + if err := answer.Get(s.DB()); err != nil { + //log or fatal?? + return "100" + } + return answer.Value +} + +var cmdDistribute = &cli.Command{ + Name: "distribute", + Aliases: []string{"dist"}, + Usage: "Start the UnifiedPush distributor", + UsageText: "ntfy distribute", + Action: execDistribute, + Flags: []cli.Flag{ + &cli.StringFlag{Name: "config", Aliases: []string{"c"}, Usage: "client config file"}, + &cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"}, + }, + Description: `TODO`, +} + +func execDistribute(c *cli.Context) error { + + // this channel will resubscribe to the server whenever an app is added or removed + resubscribe := make(chan struct{}) + + // Read config + conf, err := loadConfig(c) + if err != nil { + return err + } + + distrib := newDistributor(conf, resubscribe) + + cl := client.New(conf) + + go distrib.handleEndpointSettingsChanges() + go distrib.handleDistribution(cl) + + var sub string + // everytime resubscribe is triggered, this loop will unsubscribe from the old subscription + // and resubscribe to one with the new list of topics/applications + // On the first run, 'sub' is empty but cl.Unsubscribe doesn't care. + // the first message to resubscribe (trigerring the first loop run) is sent by handleEndpointSettingsChanges + for _ = range resubscribe { + cl.Unsubscribe(sub) + + fmt.Println("Subscribing...") + subscribeTopics := distrib.st.GetAllPubTokens() + if subscribeTopics == "" { + continue + } + sub = cl.Subscribe(subscribeTopics, client.WithSince(distrib.st.GetLastMessage())) + } + + return nil +} + +// creates a new distributor object with an initialized storage and dbus +func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) { + st, err := storage.InitStorage(utils.StoragePath("ntfy.db")) + st.DB().AutoMigrate(KVStore{}) //todo move to proper function + if err != nil { + log.Fatalln("failed to connect database") + } + + dbus := distributor_tools.NewDBus("org.unifiedpush.Distributor.ntfy") + d = distributor{dbus, store{*st}, conf, resub} + err = dbus.StartHandling(d) + fmt.Println("DBUS HANDLING") + if err != nil { + log.Fatalln("failed to connect to dbus") + } + + return +} + +type distributor struct { + dbus *distributor_tools.DBus + st store + conf *client.Config + resub chan struct{} +} + +// handleEndpointSettingsChanges runs on every start and +// checks if the new configuration server is different from previously. +// If so, it re-registers the apps which have the old info. +func (d distributor) handleEndpointSettingsChanges() { + endpointFormat := d.fillInURL("") + for _, i := range d.st.GetUnequalSettings(endpointFormat) { + utils.Log.Debugln("new endpoint format for", i.AppID, i.AppToken) + //newconnection updates the endpoint settings when one already exists + n := d.st.NewConnection(i.AppID, i.AppToken, endpointFormat) + if n == nil || n.Settings != endpointFormat { + utils.Log.Debugln("unable to save new endpoint format for", i.AppID, i.AppToken) + continue + } + d.dbus.NewConnector(n.AppID).NewEndpoint(n.AppToken, d.fillInURL(n.PublicToken)) + } + d.resub <- struct{}{} +} + +// handleDistribution listens to the nfty client and forwards messages to the right dbus app based on the db. +func (d distributor) handleDistribution(cl *client.Client) { + for i := range cl.Messages { + conn := d.st.GetConnectionbyPublic(i.Topic) + if conn != nil { + _ = d.dbus.NewConnector(conn.AppID).Message(conn.AppToken, i.Message, "") + } + d.st.SetLastMessage(fmt.Sprintf("%d", i.Time)) + } +} + +// Register handles an app's call to register for a new connection +// this creates a new connection in the db and triggers a resubscribe with that id +// then it returns the endpoint with that new token to dbus +func (d distributor) Register(appName, token string) (string, string, error) { + fmt.Println(appName, "registration request") + conn := d.st.NewConnection(appName, token, d.fillInURL("")) + fmt.Println("registered", conn) + if conn != nil { + d.resub <- struct{}{} + return d.fillInURL(conn.PublicToken), "", nil + } + //np2p doesn't have a situation for refuse + return "", "", errors.New("Unknown error with NoProvider2Push") +} + +// Unregister handles an app's unregister request +// It deletes the connection in the database and triggers a resubscribe +func (d distributor) Unregister(token string) { + deletedConn, err := d.st.DeleteConnection(token) + utils.Log.Debugln("deleted", deletedConn) + + if err != nil { + //????? + } + _ = d.dbus.NewConnector(deletedConn.AppID).Unregistered(deletedConn.AppToken) + + d.resub <- struct{}{} + +} + +// Fills in the default host to and app token to make UnifiedPush endpoints +func (d distributor) fillInURL(token string) string { + return fmt.Sprintf("%s/%s?up=1", d.conf.DefaultHost, token) +} diff --git a/go.mod b/go.mod index 918e9fc1..0cbf3111 100644 --- a/go.mod +++ b/go.mod @@ -28,10 +28,14 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/go-control-plane v0.10.1 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect + github.com/godbus/dbus/v5 v5.0.5 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -44,5 +48,9 @@ require ( google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.43.0 // indirect google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/ini.v1 v1.63.1 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + gorm.io/driver/sqlite v1.1.5 // indirect + gorm.io/gorm v1.21.15 // indirect + unifiedpush.org/go/np2p_dbus v0.2.2 // indirect ) diff --git a/go.sum b/go.sum index c7b18e18..0d70dde6 100644 --- a/go.sum +++ b/go.sum @@ -106,6 +106,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/godbus/dbus/v5 v5.0.5 h1:9Eg0XUhQxtkV8ykTMKtMMYY72g4NgxtRq4jgh4Ih5YM= +github.com/godbus/dbus/v5 v5.0.5/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -178,6 +180,8 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLe github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -189,6 +193,10 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI= +github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -199,6 +207,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= +github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/olebedev/when v0.0.0-20211212231525-59bd4edcf9d6 h1:oDSPaYiL2dbjcArLrFS8ANtwgJMyOLzvQCZon+XmFsk= @@ -626,12 +635,18 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/ini.v1 v1.63.1 h1:WlmD2fPTg4maPpRITalGs62TK7VMMtP5E9CHH7aFy6Y= +gopkg.in/ini.v1 v1.63.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.1.5 h1:JU8G59VyKu1x1RMQgjefQnkZjDe9wHc1kARDZPu5dZs= +gorm.io/driver/sqlite v1.1.5/go.mod h1:NpaYMcVKEh6vLJ47VP6T7Weieu4H1Drs3dGD/K6GrGc= +gorm.io/gorm v1.21.15 h1:gAyaDoPw0lCyrSFWhBlahbUA1U4P5RViC1uIqoB+1Rk= +gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= @@ -642,3 +657,5 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +unifiedpush.org/go/np2p_dbus v0.2.2 h1:QzsJonfxgJPVmQ4RXMmVhWWVo8mFQQ4m0IKAMbRTITI= +unifiedpush.org/go/np2p_dbus v0.2.2/go.mod h1:4ug2cMRBjAeOiQVmArby6gg9GH0ZYvb8WXZ+yhrgzlw=