WIP
This commit is contained in:
parent
09cb1482b4
commit
466c9874a8
6 changed files with 114 additions and 76 deletions
|
@ -3,14 +3,19 @@ package client
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/require"
|
||||
"heckel.io/ntfy/crypto"
|
||||
"heckel.io/ntfy/log"
|
||||
"heckel.io/ntfy/util"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -25,7 +30,8 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
maxResponseBytes = 4096
|
||||
maxResponseBytes = 4096
|
||||
encryptedMessageBytesLimit = 100 * 1024 * 1024 // 100 MB
|
||||
)
|
||||
|
||||
// Client is the ntfy client that can be used to publish and subscribe to ntfy topics
|
||||
|
@ -95,7 +101,7 @@ func (c *Client) Publish(topic, message string, options ...PublishOption) (*Mess
|
|||
// To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
|
||||
// WithNoFirebase, and the generic WithHeader.
|
||||
func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
topicURL := util.ExpandTopicURL(topic, c.config.DefaultHost)
|
||||
req, _ := http.NewRequest("POST", topicURL, body)
|
||||
for _, option := range options {
|
||||
if err := option(req); err != nil {
|
||||
|
@ -122,6 +128,59 @@ func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishO
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (c *Client) PublishEncryptedReader(topic string, body io.Reader, password string, options ...PublishOption) (*Message, error) {
|
||||
topicURL := util.ExpandTopicURL(topic, c.config.DefaultHost)
|
||||
key := crypto.DeriveKey(password, topicURL)
|
||||
peaked, err := util.PeekLimit(io.NopCloser(body), encryptedMessageBytesLimit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ciphertext, err := crypto.Encrypt(peaked.PeekedBytes, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var b bytes.Buffer
|
||||
|
||||
body = strings.NewReader(ciphertext)
|
||||
w := multipart.NewWriter(&b)
|
||||
for _, part := range parts {
|
||||
mw, _ := w.CreateFormField(part.key)
|
||||
_, err := io.Copy(mw, strings.NewReader(part.value))
|
||||
require.Nil(t, err)
|
||||
}
|
||||
require.Nil(t, w.Close())
|
||||
rr := httptest.NewRecorder()
|
||||
req, err := http.NewRequest(method, url, &b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req, _ := http.NewRequest("POST", topicURL, body)
|
||||
req.Header.Set("X-Encoding", "jwe")
|
||||
for _, option := range options {
|
||||
if err := option(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
log.Debug("%s Publishing message with headers %s", util.ShortTopicURL(topicURL), req.Header)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, errors.New(strings.TrimSpace(string(b)))
|
||||
}
|
||||
m, err := toMessage(string(b), topicURL, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
|
||||
// messages and does not subscribe to messages that arrive after this call.
|
||||
//
|
||||
|
@ -136,7 +195,7 @@ func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, err
|
|||
messages := make([]*Message, 0)
|
||||
msgChan := make(chan *Message)
|
||||
errChan := make(chan error)
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
topicURL := util.ExpandTopicURL(topic, c.config.DefaultHost)
|
||||
log.Debug("%s Polling from topic", util.ShortTopicURL(topicURL))
|
||||
options = append(options, WithPoll())
|
||||
go func() {
|
||||
|
@ -172,7 +231,7 @@ func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
subscriptionID := util.RandomString(10)
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
topicURL := util.ExpandTopicURL(topic, c.config.DefaultHost)
|
||||
log.Debug("%s Subscribing to topic", util.ShortTopicURL(topicURL))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.subscriptions[subscriptionID] = &subscription{
|
||||
|
@ -206,7 +265,7 @@ func (c *Client) Unsubscribe(subscriptionID string) {
|
|||
func (c *Client) UnsubscribeAll(topic string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
topicURL := c.expandTopicURL(topic)
|
||||
topicURL := util.ExpandTopicURL(topic, c.config.DefaultHost)
|
||||
for _, sub := range c.subscriptions {
|
||||
if sub.topicURL == topicURL {
|
||||
delete(c.subscriptions, sub.ID)
|
||||
|
@ -215,15 +274,6 @@ func (c *Client) UnsubscribeAll(topic string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Client) expandTopicURL(topic string) string {
|
||||
if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
|
||||
return topic
|
||||
} else if strings.Contains(topic, "/") {
|
||||
return fmt.Sprintf("https://%s", topic)
|
||||
}
|
||||
return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
|
||||
}
|
||||
|
||||
func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
|
||||
for {
|
||||
// TODO The retry logic is crude and may lose messages. It should record the last message like the
|
||||
|
|
|
@ -92,8 +92,9 @@ func WithNoFirebase() PublishOption {
|
|||
return WithHeader("X-Firebase", "no")
|
||||
}
|
||||
|
||||
// WithEncrypted sets the encoding header to "jwe"
|
||||
func WithEncrypted() PublishOption {
|
||||
return WithHeader("X-Encryption", "jwe")
|
||||
return WithHeader("X-Encoding", "jwe")
|
||||
}
|
||||
|
||||
// WithSince limits the number of messages returned from the server. The parameter since can be a Unix
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue