From 10e0b23a518619309a2cd37e8858e49d97f69993 Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Thu, 30 Dec 2021 00:51:04 -0600 Subject: [PATCH] test up in subscribe mostly working --- client/config.go | 10 ++++-- cmd/app.go | 4 +-- cmd/distributor.go | 89 +++++++++------------------------------------- cmd/subscribe.go | 70 ++++++++++++++++++++++++++++++------ 4 files changed, 84 insertions(+), 89 deletions(-) diff --git a/client/config.go b/client/config.go index c44fac6c..c342ab36 100644 --- a/client/config.go +++ b/client/config.go @@ -1,8 +1,9 @@ package client import ( - "gopkg.in/yaml.v2" "os" + + "gopkg.in/yaml.v2" ) const ( @@ -18,13 +19,16 @@ type Config struct { Command string `yaml:"command"` If map[string]string `yaml:"if"` } `yaml:"subscribe"` + + EnableUnifiedPush bool `yaml:"enable_unifiedpush"` } // NewConfig creates a new Config struct for a Client func NewConfig() *Config { return &Config{ - DefaultHost: DefaultBaseURL, - Subscribe: nil, + DefaultHost: DefaultBaseURL, + Subscribe: nil, + EnableUnifiedPush: true, } } diff --git a/cmd/app.go b/cmd/app.go index b6440658..fad36a21 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -3,10 +3,11 @@ package cmd import ( "fmt" + "os" + "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" "heckel.io/ntfy/util" - "os" ) var ( @@ -32,7 +33,6 @@ func New() *cli.App { cmdServe, cmdPublish, cmdSubscribe, - cmdDistribute, }, } } diff --git a/cmd/distributor.go b/cmd/distributor.go index 1edd72fd..7bc5084e 100644 --- a/cmd/distributor.go +++ b/cmd/distributor.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" "log" + "strconv" "strings" - "github.com/urfave/cli/v2" "gorm.io/gorm" "gorm.io/gorm/clause" "heckel.io/ntfy/client" @@ -16,11 +16,11 @@ import ( "unifiedpush.org/go/np2p_dbus/utils" ) -type store struct { - storage.Storage +type Store struct { + *storage.Storage } -func (s store) GetAllPubTokens() string { +func (s Store) GetAllPubTokens() string { var conns []storage.Connection result := s.DB().Find(&conns) if result.Error != nil { @@ -47,71 +47,22 @@ func (kv KVStore) Set(db *gorm.DB) error { return db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&kv).Error } -func (s store) SetLastMessage(id string) error { - return KVStore{"device-id", id}.Set(s.DB()) +func (s Store) SetLastMessage(id int64) error { + return KVStore{"device-id", fmt.Sprintf("%d", id)}.Set(s.DB()) } -func (s store) GetLastMessage() string { +func (s Store) GetLastMessage() int64 { answer := KVStore{Key: "device-id"} if err := answer.Get(s.DB()); err != nil { //log or fatal?? - return "100" + return 100 } - return answer.Value -} - -var cmdDistribute = &cli.Command{ - Name: "distribute", - Aliases: []string{"dist"}, - Usage: "Start the UnifiedPush distributor", - UsageText: "ntfy distribute", - Action: execDistribute, - Flags: []cli.Flag{ - &cli.StringFlag{Name: "config", Aliases: []string{"c"}, Usage: "client config file"}, - &cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"}, - }, - Description: `TODO`, -} - -func execDistribute(c *cli.Context) error { - - // this channel will resubscribe to the server whenever an app is added or removed - resubscribe := make(chan struct{}) - - // Read config - conf, err := loadConfig(c) - if err != nil { - return err - } - - distrib := newDistributor(conf, resubscribe) - - cl := client.New(conf) - - go distrib.handleEndpointSettingsChanges() - go distrib.handleDistribution(cl) - - var sub string - // everytime resubscribe is triggered, this loop will unsubscribe from the old subscription - // and resubscribe to one with the new list of topics/applications - // On the first run, 'sub' is empty but cl.Unsubscribe doesn't care. - // the first message to resubscribe (trigerring the first loop run) is sent by handleEndpointSettingsChanges - for _ = range resubscribe { - cl.Unsubscribe(sub) - - fmt.Println("Subscribing...") - subscribeTopics := distrib.st.GetAllPubTokens() - if subscribeTopics == "" { - continue - } - sub = cl.Subscribe(subscribeTopics, client.WithSince(distrib.st.GetLastMessage())) - } - - return nil + time, _ := strconv.Atoi(answer.Value) + return int64(time) } // creates a new distributor object with an initialized storage and dbus -func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) { +func newDistributor(conf *client.Config) (d *distributor) { st, err := storage.InitStorage(utils.StoragePath("ntfy.db")) st.DB().AutoMigrate(KVStore{}) //todo move to proper function if err != nil { @@ -119,7 +70,7 @@ func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) { } dbus := distributor_tools.NewDBus("org.unifiedpush.Distributor.ntfy") - d = distributor{dbus, store{*st}, conf, resub} + d = &distributor{dbus, Store{st}, conf, make(chan struct{})} err = dbus.StartHandling(d) fmt.Println("DBUS HANDLING") if err != nil { @@ -131,7 +82,7 @@ func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) { type distributor struct { dbus *distributor_tools.DBus - st store + st Store conf *client.Config resub chan struct{} } @@ -154,17 +105,6 @@ func (d distributor) handleEndpointSettingsChanges() { d.resub <- struct{}{} } -// handleDistribution listens to the nfty client and forwards messages to the right dbus app based on the db. -func (d distributor) handleDistribution(cl *client.Client) { - for i := range cl.Messages { - conn := d.st.GetConnectionbyPublic(i.Topic) - if conn != nil { - _ = d.dbus.NewConnector(conn.AppID).Message(conn.AppToken, i.Message, "") - } - d.st.SetLastMessage(fmt.Sprintf("%d", i.Time)) - } -} - // Register handles an app's call to register for a new connection // this creates a new connection in the db and triggers a resubscribe with that id // then it returns the endpoint with that new token to dbus @@ -173,7 +113,9 @@ func (d distributor) Register(appName, token string) (string, string, error) { conn := d.st.NewConnection(appName, token, d.fillInURL("")) fmt.Println("registered", conn) if conn != nil { + fmt.Println("F1") d.resub <- struct{}{} + fmt.Println("F2") return d.fillInURL(conn.PublicToken), "", nil } //np2p doesn't have a situation for refuse @@ -188,6 +130,7 @@ func (d distributor) Unregister(token string) { if err != nil { //????? + return } _ = d.dbus.NewConnector(deletedConn.AppID).Unregistered(deletedConn.AppToken) diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 17498030..fb133c39 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -3,14 +3,15 @@ package cmd import ( "errors" "fmt" - "github.com/urfave/cli/v2" - "heckel.io/ntfy/client" - "heckel.io/ntfy/util" "log" "os" "os/exec" "os/user" "strings" + + "github.com/urfave/cli/v2" + "heckel.io/ntfy/client" + "heckel.io/ntfy/util" ) var cmdSubscribe = &cli.Command{ @@ -26,6 +27,7 @@ var cmdSubscribe = &cli.Command{ &cli.BoolFlag{Name: "poll", Aliases: []string{"p"}, Usage: "return events and exit, do not listen for new events"}, &cli.BoolFlag{Name: "scheduled", Aliases: []string{"sched", "S"}, Usage: "also return scheduled/delayed events"}, &cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"}, + &cli.BoolFlag{Name: "unifiedpush", Aliases: []string{"up"}, Usage: "enable or disable unifiedpush", DefaultText: "true"}, }, Description: `Subscribe to a topic from a ntfy server, and either print or execute a command for every arriving message. There are 3 modes in which the command can be run: @@ -83,10 +85,12 @@ func execSubscribe(c *cli.Context) error { poll := c.Bool("poll") scheduled := c.Bool("scheduled") fromConfig := c.Bool("from-config") + unifiedpush := c.Bool("unifiedpush") topic := c.Args().Get(0) command := c.Args().Get(1) if !fromConfig { conf.Subscribe = nil // wipe if --from-config not passed + conf.EnableUnifiedPush = unifiedpush } var options []client.SubscribeOption if since != "" { @@ -98,15 +102,47 @@ func execSubscribe(c *cli.Context) error { if scheduled { options = append(options, client.WithScheduled()) } - if topic == "" && len(conf.Subscribe) == 0 { + if topic == "" && len(conf.Subscribe) == 0 && !conf.EnableUnifiedPush { return errors.New("must specify topic, type 'ntfy subscribe --help' for help") } + var d *distributor + if conf.EnableUnifiedPush { + d = newDistributor(conf) + + go d.handleEndpointSettingsChanges() + } + // Execute poll or subscribe if poll { return doPoll(c, cl, conf, topic, command, options...) } - return doSubscribe(c, cl, conf, topic, command, options...) + return doSubscribe(c, cl, conf, d, topic, command, options...) +} + +func unifiedPushUpdatedSubscribe(commands map[string]string, cl *client.Client, d distributor) { + var sub string + // everytime resubscribe is triggered, this loop will unsubscribe from the old subscription + // and resubscribe to one with the new list of topics/applications + for { + fmt.Println("Subscribing...") + subscribeTopics := d.st.GetAllPubTokens() + if subscribeTopics != "" { + // TODO needs better deduplication mechanism (or maybe this is good enough?) + // currently if there's a message at time 100.1, the client disconnects at 100.5, there's a message at 100.9, the client won't get the message from 100.9 + // though I don't know if this impact is serious enough to justify adding a whole bunch of code with more maintainance, bugs, etc. + sub = cl.Subscribe(subscribeTopics, client.WithSinceUnixTime(d.st.GetLastMessage()+1)) + commands[sub] = "unifiedpush" + } + + if _, open := <-d.resub; !open { + return + } + + // both operations are no-ops when the key doesn't exist so can be run even if subscribeTopics == "" + cl.Unsubscribe(sub) + delete(commands, sub) + } } func doPoll(c *cli.Context, cl *client.Client, conf *client.Config, topic, command string, options ...client.SubscribeOption) error { @@ -129,14 +165,18 @@ func doPollSingle(c *cli.Context, cl *client.Client, topic, command string, opti return err } for _, m := range messages { - printMessageOrRunCommand(c, m, command) + printMessageOrRunCommand(c, m, nil, command) } return nil } -func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, topic, command string, options ...client.SubscribeOption) error { +func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, d *distributor, topic, command string, options ...client.SubscribeOption) error { commands := make(map[string]string) // Subscription ID -> command - for _, s := range conf.Subscribe { // May be nil + + if d != nil { + go unifiedPushUpdatedSubscribe(commands, cl, *d) + } + for _, s := range conf.Subscribe { // May be nil topicOptions := append(make([]client.SubscribeOption, 0), options...) for filter, value := range s.If { topicOptions = append(topicOptions, client.WithFilter(filter, value)) @@ -150,16 +190,24 @@ func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, topic, } for m := range cl.Messages { command, ok := commands[m.SubscriptionID] + fmt.Println(command, ok, m, m.SubscriptionID) if !ok { continue } - printMessageOrRunCommand(c, m, command) + printMessageOrRunCommand(c, m, d, command) } return nil } -func printMessageOrRunCommand(c *cli.Context, m *client.Message, command string) { - if command != "" { +func printMessageOrRunCommand(c *cli.Context, m *client.Message, d *distributor, command string) { + if command == "unifiedpush" && d != nil { + // this shouldn't ever be run if d is nil since there won't be a "unifiedpush" subscription + if conn := d.st.GetConnectionbyPublic(m.Topic); conn != nil { + fmt.Println("NEWMSG") + _ = d.dbus.NewConnector(conn.AppID).Message(conn.AppToken, m.Message, "") + d.st.SetLastMessage(m.Time) + } + } else if command != "" { runCommand(c, command, m) } else { fmt.Fprintln(c.App.Writer, m.Raw)