test up in subscribe
mostly working
This commit is contained in:
parent
85126a4403
commit
10e0b23a51
4 changed files with 84 additions and 89 deletions
|
@ -1,8 +1,9 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gopkg.in/yaml.v2"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -18,6 +19,8 @@ type Config struct {
|
||||||
Command string `yaml:"command"`
|
Command string `yaml:"command"`
|
||||||
If map[string]string `yaml:"if"`
|
If map[string]string `yaml:"if"`
|
||||||
} `yaml:"subscribe"`
|
} `yaml:"subscribe"`
|
||||||
|
|
||||||
|
EnableUnifiedPush bool `yaml:"enable_unifiedpush"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig creates a new Config struct for a Client
|
// NewConfig creates a new Config struct for a Client
|
||||||
|
@ -25,6 +28,7 @@ func NewConfig() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
DefaultHost: DefaultBaseURL,
|
DefaultHost: DefaultBaseURL,
|
||||||
Subscribe: nil,
|
Subscribe: nil,
|
||||||
|
EnableUnifiedPush: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,11 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
"github.com/urfave/cli/v2/altsrc"
|
"github.com/urfave/cli/v2/altsrc"
|
||||||
"heckel.io/ntfy/util"
|
"heckel.io/ntfy/util"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -32,7 +33,6 @@ func New() *cli.App {
|
||||||
cmdServe,
|
cmdServe,
|
||||||
cmdPublish,
|
cmdPublish,
|
||||||
cmdSubscribe,
|
cmdSubscribe,
|
||||||
cmdDistribute,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,9 +4,9 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/urfave/cli/v2"
|
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/clause"
|
"gorm.io/gorm/clause"
|
||||||
"heckel.io/ntfy/client"
|
"heckel.io/ntfy/client"
|
||||||
|
@ -16,11 +16,11 @@ import (
|
||||||
"unifiedpush.org/go/np2p_dbus/utils"
|
"unifiedpush.org/go/np2p_dbus/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type store struct {
|
type Store struct {
|
||||||
storage.Storage
|
*storage.Storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s store) GetAllPubTokens() string {
|
func (s Store) GetAllPubTokens() string {
|
||||||
var conns []storage.Connection
|
var conns []storage.Connection
|
||||||
result := s.DB().Find(&conns)
|
result := s.DB().Find(&conns)
|
||||||
if result.Error != nil {
|
if result.Error != nil {
|
||||||
|
@ -47,71 +47,22 @@ func (kv KVStore) Set(db *gorm.DB) error {
|
||||||
return db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&kv).Error
|
return db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&kv).Error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s store) SetLastMessage(id string) error {
|
func (s Store) SetLastMessage(id int64) error {
|
||||||
return KVStore{"device-id", id}.Set(s.DB())
|
return KVStore{"device-id", fmt.Sprintf("%d", id)}.Set(s.DB())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s store) GetLastMessage() string {
|
func (s Store) GetLastMessage() int64 {
|
||||||
answer := KVStore{Key: "device-id"}
|
answer := KVStore{Key: "device-id"}
|
||||||
if err := answer.Get(s.DB()); err != nil {
|
if err := answer.Get(s.DB()); err != nil {
|
||||||
//log or fatal??
|
//log or fatal??
|
||||||
return "100"
|
return 100
|
||||||
}
|
}
|
||||||
return answer.Value
|
time, _ := strconv.Atoi(answer.Value)
|
||||||
}
|
return int64(time)
|
||||||
|
|
||||||
var cmdDistribute = &cli.Command{
|
|
||||||
Name: "distribute",
|
|
||||||
Aliases: []string{"dist"},
|
|
||||||
Usage: "Start the UnifiedPush distributor",
|
|
||||||
UsageText: "ntfy distribute",
|
|
||||||
Action: execDistribute,
|
|
||||||
Flags: []cli.Flag{
|
|
||||||
&cli.StringFlag{Name: "config", Aliases: []string{"c"}, Usage: "client config file"},
|
|
||||||
&cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"},
|
|
||||||
},
|
|
||||||
Description: `TODO`,
|
|
||||||
}
|
|
||||||
|
|
||||||
func execDistribute(c *cli.Context) error {
|
|
||||||
|
|
||||||
// this channel will resubscribe to the server whenever an app is added or removed
|
|
||||||
resubscribe := make(chan struct{})
|
|
||||||
|
|
||||||
// Read config
|
|
||||||
conf, err := loadConfig(c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
distrib := newDistributor(conf, resubscribe)
|
|
||||||
|
|
||||||
cl := client.New(conf)
|
|
||||||
|
|
||||||
go distrib.handleEndpointSettingsChanges()
|
|
||||||
go distrib.handleDistribution(cl)
|
|
||||||
|
|
||||||
var sub string
|
|
||||||
// everytime resubscribe is triggered, this loop will unsubscribe from the old subscription
|
|
||||||
// and resubscribe to one with the new list of topics/applications
|
|
||||||
// On the first run, 'sub' is empty but cl.Unsubscribe doesn't care.
|
|
||||||
// the first message to resubscribe (trigerring the first loop run) is sent by handleEndpointSettingsChanges
|
|
||||||
for _ = range resubscribe {
|
|
||||||
cl.Unsubscribe(sub)
|
|
||||||
|
|
||||||
fmt.Println("Subscribing...")
|
|
||||||
subscribeTopics := distrib.st.GetAllPubTokens()
|
|
||||||
if subscribeTopics == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sub = cl.Subscribe(subscribeTopics, client.WithSince(distrib.st.GetLastMessage()))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// creates a new distributor object with an initialized storage and dbus
|
// creates a new distributor object with an initialized storage and dbus
|
||||||
func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) {
|
func newDistributor(conf *client.Config) (d *distributor) {
|
||||||
st, err := storage.InitStorage(utils.StoragePath("ntfy.db"))
|
st, err := storage.InitStorage(utils.StoragePath("ntfy.db"))
|
||||||
st.DB().AutoMigrate(KVStore{}) //todo move to proper function
|
st.DB().AutoMigrate(KVStore{}) //todo move to proper function
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -119,7 +70,7 @@ func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dbus := distributor_tools.NewDBus("org.unifiedpush.Distributor.ntfy")
|
dbus := distributor_tools.NewDBus("org.unifiedpush.Distributor.ntfy")
|
||||||
d = distributor{dbus, store{*st}, conf, resub}
|
d = &distributor{dbus, Store{st}, conf, make(chan struct{})}
|
||||||
err = dbus.StartHandling(d)
|
err = dbus.StartHandling(d)
|
||||||
fmt.Println("DBUS HANDLING")
|
fmt.Println("DBUS HANDLING")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -131,7 +82,7 @@ func newDistributor(conf *client.Config, resub chan struct{}) (d distributor) {
|
||||||
|
|
||||||
type distributor struct {
|
type distributor struct {
|
||||||
dbus *distributor_tools.DBus
|
dbus *distributor_tools.DBus
|
||||||
st store
|
st Store
|
||||||
conf *client.Config
|
conf *client.Config
|
||||||
resub chan struct{}
|
resub chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -154,17 +105,6 @@ func (d distributor) handleEndpointSettingsChanges() {
|
||||||
d.resub <- struct{}{}
|
d.resub <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleDistribution listens to the nfty client and forwards messages to the right dbus app based on the db.
|
|
||||||
func (d distributor) handleDistribution(cl *client.Client) {
|
|
||||||
for i := range cl.Messages {
|
|
||||||
conn := d.st.GetConnectionbyPublic(i.Topic)
|
|
||||||
if conn != nil {
|
|
||||||
_ = d.dbus.NewConnector(conn.AppID).Message(conn.AppToken, i.Message, "")
|
|
||||||
}
|
|
||||||
d.st.SetLastMessage(fmt.Sprintf("%d", i.Time))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register handles an app's call to register for a new connection
|
// Register handles an app's call to register for a new connection
|
||||||
// this creates a new connection in the db and triggers a resubscribe with that id
|
// this creates a new connection in the db and triggers a resubscribe with that id
|
||||||
// then it returns the endpoint with that new token to dbus
|
// then it returns the endpoint with that new token to dbus
|
||||||
|
@ -173,7 +113,9 @@ func (d distributor) Register(appName, token string) (string, string, error) {
|
||||||
conn := d.st.NewConnection(appName, token, d.fillInURL("<token>"))
|
conn := d.st.NewConnection(appName, token, d.fillInURL("<token>"))
|
||||||
fmt.Println("registered", conn)
|
fmt.Println("registered", conn)
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
|
fmt.Println("F1")
|
||||||
d.resub <- struct{}{}
|
d.resub <- struct{}{}
|
||||||
|
fmt.Println("F2")
|
||||||
return d.fillInURL(conn.PublicToken), "", nil
|
return d.fillInURL(conn.PublicToken), "", nil
|
||||||
}
|
}
|
||||||
//np2p doesn't have a situation for refuse
|
//np2p doesn't have a situation for refuse
|
||||||
|
@ -188,6 +130,7 @@ func (d distributor) Unregister(token string) {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//?????
|
//?????
|
||||||
|
return
|
||||||
}
|
}
|
||||||
_ = d.dbus.NewConnector(deletedConn.AppID).Unregistered(deletedConn.AppToken)
|
_ = d.dbus.NewConnector(deletedConn.AppID).Unregistered(deletedConn.AppToken)
|
||||||
|
|
||||||
|
|
|
@ -3,14 +3,15 @@ package cmd
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/urfave/cli/v2"
|
|
||||||
"heckel.io/ntfy/client"
|
|
||||||
"heckel.io/ntfy/util"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
"heckel.io/ntfy/client"
|
||||||
|
"heckel.io/ntfy/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
var cmdSubscribe = &cli.Command{
|
var cmdSubscribe = &cli.Command{
|
||||||
|
@ -26,6 +27,7 @@ var cmdSubscribe = &cli.Command{
|
||||||
&cli.BoolFlag{Name: "poll", Aliases: []string{"p"}, Usage: "return events and exit, do not listen for new events"},
|
&cli.BoolFlag{Name: "poll", Aliases: []string{"p"}, Usage: "return events and exit, do not listen for new events"},
|
||||||
&cli.BoolFlag{Name: "scheduled", Aliases: []string{"sched", "S"}, Usage: "also return scheduled/delayed events"},
|
&cli.BoolFlag{Name: "scheduled", Aliases: []string{"sched", "S"}, Usage: "also return scheduled/delayed events"},
|
||||||
&cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"},
|
&cli.BoolFlag{Name: "verbose", Aliases: []string{"v"}, Usage: "print verbose output"},
|
||||||
|
&cli.BoolFlag{Name: "unifiedpush", Aliases: []string{"up"}, Usage: "enable or disable unifiedpush", DefaultText: "true"},
|
||||||
},
|
},
|
||||||
Description: `Subscribe to a topic from a ntfy server, and either print or execute a command for
|
Description: `Subscribe to a topic from a ntfy server, and either print or execute a command for
|
||||||
every arriving message. There are 3 modes in which the command can be run:
|
every arriving message. There are 3 modes in which the command can be run:
|
||||||
|
@ -83,10 +85,12 @@ func execSubscribe(c *cli.Context) error {
|
||||||
poll := c.Bool("poll")
|
poll := c.Bool("poll")
|
||||||
scheduled := c.Bool("scheduled")
|
scheduled := c.Bool("scheduled")
|
||||||
fromConfig := c.Bool("from-config")
|
fromConfig := c.Bool("from-config")
|
||||||
|
unifiedpush := c.Bool("unifiedpush")
|
||||||
topic := c.Args().Get(0)
|
topic := c.Args().Get(0)
|
||||||
command := c.Args().Get(1)
|
command := c.Args().Get(1)
|
||||||
if !fromConfig {
|
if !fromConfig {
|
||||||
conf.Subscribe = nil // wipe if --from-config not passed
|
conf.Subscribe = nil // wipe if --from-config not passed
|
||||||
|
conf.EnableUnifiedPush = unifiedpush
|
||||||
}
|
}
|
||||||
var options []client.SubscribeOption
|
var options []client.SubscribeOption
|
||||||
if since != "" {
|
if since != "" {
|
||||||
|
@ -98,15 +102,47 @@ func execSubscribe(c *cli.Context) error {
|
||||||
if scheduled {
|
if scheduled {
|
||||||
options = append(options, client.WithScheduled())
|
options = append(options, client.WithScheduled())
|
||||||
}
|
}
|
||||||
if topic == "" && len(conf.Subscribe) == 0 {
|
if topic == "" && len(conf.Subscribe) == 0 && !conf.EnableUnifiedPush {
|
||||||
return errors.New("must specify topic, type 'ntfy subscribe --help' for help")
|
return errors.New("must specify topic, type 'ntfy subscribe --help' for help")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var d *distributor
|
||||||
|
if conf.EnableUnifiedPush {
|
||||||
|
d = newDistributor(conf)
|
||||||
|
|
||||||
|
go d.handleEndpointSettingsChanges()
|
||||||
|
}
|
||||||
|
|
||||||
// Execute poll or subscribe
|
// Execute poll or subscribe
|
||||||
if poll {
|
if poll {
|
||||||
return doPoll(c, cl, conf, topic, command, options...)
|
return doPoll(c, cl, conf, topic, command, options...)
|
||||||
}
|
}
|
||||||
return doSubscribe(c, cl, conf, topic, command, options...)
|
return doSubscribe(c, cl, conf, d, topic, command, options...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unifiedPushUpdatedSubscribe(commands map[string]string, cl *client.Client, d distributor) {
|
||||||
|
var sub string
|
||||||
|
// everytime resubscribe is triggered, this loop will unsubscribe from the old subscription
|
||||||
|
// and resubscribe to one with the new list of topics/applications
|
||||||
|
for {
|
||||||
|
fmt.Println("Subscribing...")
|
||||||
|
subscribeTopics := d.st.GetAllPubTokens()
|
||||||
|
if subscribeTopics != "" {
|
||||||
|
// TODO needs better deduplication mechanism (or maybe this is good enough?)
|
||||||
|
// currently if there's a message at time 100.1, the client disconnects at 100.5, there's a message at 100.9, the client won't get the message from 100.9
|
||||||
|
// though I don't know if this impact is serious enough to justify adding a whole bunch of code with more maintainance, bugs, etc.
|
||||||
|
sub = cl.Subscribe(subscribeTopics, client.WithSinceUnixTime(d.st.GetLastMessage()+1))
|
||||||
|
commands[sub] = "unifiedpush"
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, open := <-d.resub; !open {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// both operations are no-ops when the key doesn't exist so can be run even if subscribeTopics == ""
|
||||||
|
cl.Unsubscribe(sub)
|
||||||
|
delete(commands, sub)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func doPoll(c *cli.Context, cl *client.Client, conf *client.Config, topic, command string, options ...client.SubscribeOption) error {
|
func doPoll(c *cli.Context, cl *client.Client, conf *client.Config, topic, command string, options ...client.SubscribeOption) error {
|
||||||
|
@ -129,13 +165,17 @@ func doPollSingle(c *cli.Context, cl *client.Client, topic, command string, opti
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, m := range messages {
|
for _, m := range messages {
|
||||||
printMessageOrRunCommand(c, m, command)
|
printMessageOrRunCommand(c, m, nil, command)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, topic, command string, options ...client.SubscribeOption) error {
|
func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, d *distributor, topic, command string, options ...client.SubscribeOption) error {
|
||||||
commands := make(map[string]string) // Subscription ID -> command
|
commands := make(map[string]string) // Subscription ID -> command
|
||||||
|
|
||||||
|
if d != nil {
|
||||||
|
go unifiedPushUpdatedSubscribe(commands, cl, *d)
|
||||||
|
}
|
||||||
for _, s := range conf.Subscribe { // May be nil
|
for _, s := range conf.Subscribe { // May be nil
|
||||||
topicOptions := append(make([]client.SubscribeOption, 0), options...)
|
topicOptions := append(make([]client.SubscribeOption, 0), options...)
|
||||||
for filter, value := range s.If {
|
for filter, value := range s.If {
|
||||||
|
@ -150,16 +190,24 @@ func doSubscribe(c *cli.Context, cl *client.Client, conf *client.Config, topic,
|
||||||
}
|
}
|
||||||
for m := range cl.Messages {
|
for m := range cl.Messages {
|
||||||
command, ok := commands[m.SubscriptionID]
|
command, ok := commands[m.SubscriptionID]
|
||||||
|
fmt.Println(command, ok, m, m.SubscriptionID)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
printMessageOrRunCommand(c, m, command)
|
printMessageOrRunCommand(c, m, d, command)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func printMessageOrRunCommand(c *cli.Context, m *client.Message, command string) {
|
func printMessageOrRunCommand(c *cli.Context, m *client.Message, d *distributor, command string) {
|
||||||
if command != "" {
|
if command == "unifiedpush" && d != nil {
|
||||||
|
// this shouldn't ever be run if d is nil since there won't be a "unifiedpush" subscription
|
||||||
|
if conn := d.st.GetConnectionbyPublic(m.Topic); conn != nil {
|
||||||
|
fmt.Println("NEWMSG")
|
||||||
|
_ = d.dbus.NewConnector(conn.AppID).Message(conn.AppToken, m.Message, "")
|
||||||
|
d.st.SetLastMessage(m.Time)
|
||||||
|
}
|
||||||
|
} else if command != "" {
|
||||||
runCommand(c, command, m)
|
runCommand(c, command, m)
|
||||||
} else {
|
} else {
|
||||||
fmt.Fprintln(c.App.Writer, m.Raw)
|
fmt.Fprintln(c.App.Writer, m.Raw)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue