diff --git a/config/config.go b/config/config.go
index 2dbed003..9e1640a8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -11,6 +11,10 @@ const (
DefaultCacheDuration = 12 * time.Hour
DefaultKeepaliveInterval = 30 * time.Second
DefaultManagerInterval = time.Minute
+ DefaultAtSenderInterval = 10 * time.Second
+ DefaultMinDelay = 10 * time.Second
+ DefaultMaxDelay = 3 * 24 * time.Hour
+ DefaultMessageLimit = 512
)
// Defines all the limits
@@ -35,6 +39,10 @@ type Config struct {
CacheDuration time.Duration
KeepaliveInterval time.Duration
ManagerInterval time.Duration
+ AtSenderInterval time.Duration
+ MessageLimit int
+ MinDelay time.Duration
+ MaxDelay time.Duration
GlobalTopicLimit int
VisitorRequestLimitBurst int
VisitorRequestLimitReplenish time.Duration
@@ -54,6 +62,10 @@ func New(listenHTTP string) *Config {
CacheDuration: DefaultCacheDuration,
KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval,
+ MessageLimit: DefaultMessageLimit,
+ MinDelay: DefaultMinDelay,
+ MaxDelay: DefaultMaxDelay,
+ AtSenderInterval: DefaultAtSenderInterval,
GlobalTopicLimit: DefaultGlobalTopicLimit,
VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst,
VisitorRequestLimitReplenish: DefaultVisitorRequestLimitReplenish,
diff --git a/docs/config.md b/docs/config.md
index 1f8a54d2..57625622 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -32,7 +32,7 @@ You can also entirely disable the cache by setting `cache-duration` to `0`. When
passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward
the message to the subscribers.
-Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling), as well as the
+Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling-for-messages), as well as the
[`since=` parameter](subscribe/api.md#fetching-cached-messages).
## Behind a proxy (TLS, etc.)
diff --git a/docs/publish.md b/docs/publish.md
index 5ffaaf78..0e4a04d1 100644
--- a/docs/publish.md
+++ b/docs/publish.md
@@ -332,6 +332,85 @@ them with a comma, e.g. `tag1,tag2,tag3`.
Detail view of notifications with tags
+## Scheduled delivery
+You can delay the delivery of messages and let ntfy send them at a later date. This can be used to send yourself
+reminders or even to execute commands at a later date (if your subscriber acts on messages).
+
+Usage is pretty straight forward. You can set the delivery time using the `X-Delay` header (or any of its aliases: `Delay`,
+`X-At`, `At`, `X-In` or `In`), either by specifying a Unix timestamp (e.g. `1639194738`), a duration (e.g. `30m`,
+`3h`, `2 days`), or a natural language time string (e.g. `10am`, `8:30pm`, `tomorrow, 3pm`, `Tuesday, 7am`,
+[and more](https://github.com/olebedev/when)).
+
+As of today, the minimum delay you can set is **10 seconds** and the maximum delay is **3 days**. This can currently
+not be configured otherwise ([let me know](https://github.com/binwiederhier/ntfy/issues) if you'd like to change
+these limits).
+
+For the purposes of [message caching](config.md#message-cache), scheduled messages are kept in the cache until 12 hours
+after they were delivered (or whatever the server-side cache duration is set to). For instance, if a message is scheduled
+to be delivered in 3 days, it'll remain in the cache for 3 days and 12 hours. Also note that naturally,
+[turning off server-side caching](#message-caching) is not possible in combination with this feature.
+
+=== "Command line (curl)"
+ ```
+ curl -H "At: tomorrow, 10am" -d "Good morning" ntfy.sh/hello
+ curl -H "In: 30min" -d "It's 30 minutes later now" ntfy.sh/reminder
+ curl -H "Delay: 1639194738" -d "Unix timestamps are awesome" ntfy.sh/itsaunixsystem
+ ```
+
+=== "HTTP"
+ ``` http
+ POST /hello HTTP/1.1
+ Host: ntfy.sh
+ At: tomorrow, 10am
+
+ Good morning
+ ```
+
+=== "JavaScript"
+ ``` javascript
+ fetch('https://ntfy.sh/hello', {
+ method: 'POST',
+ body: 'Good morning',
+ headers: { 'At': 'tomorrow, 10am' }
+ })
+ ```
+
+=== "Go"
+ ``` go
+ req, _ := http.NewRequest("POST", "https://ntfy.sh/hello", strings.NewReader("Good morning"))
+ req.Header.Set("At", "tomorrow, 10am")
+ http.DefaultClient.Do(req)
+ ```
+
+=== "PHP"
+ ``` php-inline
+ file_get_contents('https://ntfy.sh/backups', false, stream_context_create([
+ 'http' => [
+ 'method' => 'POST',
+ 'header' =>
+ "Content-Type: text/plain\r\n" .
+ "At: tomorrow, 10am",
+ 'content' => 'Good morning'
+ ]
+ ]));
+ ```
+
+Here are a few examples (assuming today's date is **12/10/2021, 9am, Eastern Time Zone**):
+
+
+
+
+ Delay/At/In header | Message will be delivered at | Explanation |
+ 30m | 12/10/2021, 9:30am | 30 minutes from now |
+ 2 hours | 12/10/2021, 11:30am | 2 hours from now |
+ 1 day | 12/11/2021, 9am | 24 hours from now |
+ 10am | 12/10/2021, 10am | Today at 10am (same day, because it's only 9am) |
+ 8am | 12/11/2021, 8am | Tomorrow at 8am (because it's 9am already) |
+ 1639152000 | 12/10/2021, 11am (EST) | Today at 11am (EST) |
+
+ |
+
+
## Advanced features
### Message caching
@@ -347,7 +426,7 @@ client-side network disruptions, but arguably this feature also may raise privac
To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`.
This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages
are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and
-[`poll=1`](subscribe/api.md#polling) won't return the message anymore.
+[`poll=1`](subscribe/api.md#polling-for-messages) won't return the message anymore.
=== "Command line (curl)"
```
@@ -393,7 +472,7 @@ are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fe
]));
```
-### Firebase
+### Disable Firebase
!!! info
If `Firebase: no` is used and [instant delivery](subscribe/phone.md#instant-delivery) isn't enabled in the Android
app (Google Play variant only), **message delivery will be significantly delayed (up to 15 minutes)**. To overcome
diff --git a/docs/subscribe/api.md b/docs/subscribe/api.md
index b4449cf2..4a0bb858 100644
--- a/docs/subscribe/api.md
+++ b/docs/subscribe/api.md
@@ -239,7 +239,7 @@ or `all` (all cached messages).
curl -s "ntfy.sh/mytopic/json?since=10m"
```
-### Polling
+### Polling for messages
You can also just poll for messages if you don't like the long-standing connection using the `poll=1`
query parameter. The connection will end after all available messages have been read. This parameter can be
combined with `since=` (defaults to `since=all`).
@@ -248,6 +248,16 @@ combined with `since=` (defaults to `since=all`).
curl -s "ntfy.sh/mytopic/json?poll=1"
```
+### Fetching scheduled messages
+Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically
+returned when subscribing via the API, which makes sense, because after all, the messages have technically not been
+delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`)
+parameter (makes most sense with the `poll=1` parameter):
+
+```
+curl -s "ntfy.sh/mytopic/json?poll=1&sched=1"
+```
+
### Subscribing to multiple topics
It's possible to subscribe to multiple topics in one HTTP call by providing a
comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain:
diff --git a/go.mod b/go.mod
index 9987eebc..2d4db264 100644
--- a/go.mod
+++ b/go.mod
@@ -2,6 +2,8 @@ module heckel.io/ntfy
go 1.17
+replace github.com/olebedev/when => github.com/binwiederhier/when v0.0.1-binwiederhier2
+
require (
cloud.google.com/go/firestore v1.6.1 // indirect
cloud.google.com/go/storage v1.18.2 // indirect
@@ -9,36 +11,39 @@ require (
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/mattn/go-sqlite3 v1.14.9
+ github.com/olebedev/when v0.0.0-20190311101825-c3b538a97254
github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
- google.golang.org/api v0.61.0
+ google.golang.org/api v0.62.0
gopkg.in/yaml.v2 v2.4.0 // indirect
)
require (
cloud.google.com/go v0.99.0 // indirect
+ github.com/AlekSi/pointer v1.0.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect
- github.com/davecgh/go-spew v1.1.0 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
github.com/envoyproxy/go-control-plane v0.10.1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
- golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect
+ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
- google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f // indirect
+ google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.42.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
diff --git a/go.sum b/go.sum
index 48252afe..e04bb27f 100644
--- a/go.sum
+++ b/go.sum
@@ -25,6 +25,7 @@ cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aD
cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI=
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
+cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM=
cloud.google.com/go v0.99.0 h1:y/cM2iqGgGi5D5DQZl6D9STN/3dR/Vx5Mp8s752oJTY=
cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
@@ -51,12 +52,16 @@ cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMju
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
+github.com/AlekSi/pointer v1.0.0 h1:KWCWzsvFxNLcmM5XmiqHsGTTsuwZMsLFwWF9Y+//bNE=
+github.com/AlekSi/pointer v1.0.0/go.mod h1:1kjywbfcPFCmncIxtk6fIEub6LKrfMz3gc5QKVOSOA8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/binwiederhier/when v0.0.1-binwiederhier2 h1:BjQC7OQI4MK0vXeltn2BEuf0Tdh/M6YNh1JrepnVr2I=
+github.com/binwiederhier/when v0.0.1-binwiederhier2/go.mod h1:DPucAeQGDPUzYUt+NaWw6qsF5SFapWWToxEiVDh2aV0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -84,8 +89,9 @@ github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWH
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
-github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -199,6 +205,8 @@ github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc8
github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA=
github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -213,6 +221,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -390,8 +399,9 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7qVW4r4ctbWpURyuOD0E=
+golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -495,8 +505,9 @@ google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqiv
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
-google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8=
google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I=
+google.golang.org/api v0.62.0 h1:PhGymJMXfGBzc4lBRmrx9+1w4w2wEzURHNGF/sD/xGc=
+google.golang.org/api v0.62.0/go.mod h1:dKmwPCydfsad4qCH08MSdgWjfHOyfpd4VtDGgRFdavw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -566,9 +577,11 @@ google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20211016002631-37fc39342514/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211028162531-8db9c33dc351/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
-google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f h1:QH7+Ym+7e2XV1dZIHapkXoeqHyNaCzn6MNp3JBaYYUc=
-google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa h1:I0YcKz0I7OAhddo7ya8kMnvprhcWM045PmkBdMO9zN0=
+google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -594,6 +607,7 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
+google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
diff --git a/server/cache.go b/server/cache.go
index b3557910..64d517d0 100644
--- a/server/cache.go
+++ b/server/cache.go
@@ -14,8 +14,10 @@ var (
// i.e. message structs with the Event messageEvent.
type cache interface {
AddMessage(m *message) error
- Messages(topic string, since sinceTime) ([]*message, error)
+ Messages(topic string, since sinceTime, scheduled bool) ([]*message, error)
+ MessagesDue() ([]*message, error)
MessageCount(topic string) (int, error)
Topics() (map[string]*topic, error)
Prune(olderThan time.Time) error
+ MarkPublished(m *message) error
}
diff --git a/server/cache_mem.go b/server/cache_mem.go
index 9272ebd2..31c7bb97 100644
--- a/server/cache_mem.go
+++ b/server/cache_mem.go
@@ -1,14 +1,16 @@
package server
import (
+ "sort"
"sync"
"time"
)
type memCache struct {
- messages map[string][]*message
- nop bool
- mu sync.Mutex
+ messages map[string][]*message
+ scheduled map[string]*message // Message ID -> message
+ nop bool
+ mu sync.Mutex
}
var _ cache = (*memCache)(nil)
@@ -16,8 +18,9 @@ var _ cache = (*memCache)(nil)
// newMemCache creates an in-memory cache
func newMemCache() *memCache {
return &memCache{
- messages: make(map[string][]*message),
- nop: false,
+ messages: make(map[string][]*message),
+ scheduled: make(map[string]*message),
+ nop: false,
}
}
@@ -25,77 +28,109 @@ func newMemCache() *memCache {
// it is always empty and can be used if caching is entirely disabled
func newNopCache() *memCache {
return &memCache{
- messages: make(map[string][]*message),
- nop: true,
+ messages: make(map[string][]*message),
+ scheduled: make(map[string]*message),
+ nop: true,
}
}
-func (s *memCache) AddMessage(m *message) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.nop {
+func (c *memCache) AddMessage(m *message) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.nop {
return nil
}
if m.Event != messageEvent {
return errUnexpectedMessageType
}
- if _, ok := s.messages[m.Topic]; !ok {
- s.messages[m.Topic] = make([]*message, 0)
+ if _, ok := c.messages[m.Topic]; !ok {
+ c.messages[m.Topic] = make([]*message, 0)
}
- s.messages[m.Topic] = append(s.messages[m.Topic], m)
+ delayed := m.Time > time.Now().Unix()
+ if delayed {
+ c.scheduled[m.ID] = m
+ }
+ c.messages[m.Topic] = append(c.messages[m.Topic], m)
return nil
}
-func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.messages[topic]; !ok || since.IsNone() {
+func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.messages[topic]; !ok || since.IsNone() {
return make([]*message, 0), nil
}
- messages := make([]*message, 0) // copy!
- for _, m := range s.messages[topic] {
- msgTime := time.Unix(m.Time, 0)
- if msgTime == since.Time() || msgTime.After(since.Time()) {
+ messages := make([]*message, 0)
+ for _, m := range c.messages[topic] {
+ _, messageScheduled := c.scheduled[m.ID]
+ include := m.Time >= since.Time().Unix() && (!messageScheduled || scheduled)
+ if include {
messages = append(messages, m)
}
}
+ sort.Slice(messages, func(i, j int) bool {
+ return messages[i].Time < messages[j].Time
+ })
return messages, nil
}
-func (s *memCache) MessageCount(topic string) (int, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.messages[topic]; !ok {
- return 0, nil
+func (c *memCache) MessagesDue() ([]*message, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ messages := make([]*message, 0)
+ for _, m := range c.scheduled {
+ due := time.Now().Unix() >= m.Time
+ if due {
+ messages = append(messages, m)
+ }
}
- return len(s.messages[topic]), nil
+ sort.Slice(messages, func(i, j int) bool {
+ return messages[i].Time < messages[j].Time
+ })
+ return messages, nil
}
-func (s *memCache) Topics() (map[string]*topic, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+func (c *memCache) MarkPublished(m *message) error {
+ c.mu.Lock()
+ delete(c.scheduled, m.ID)
+ c.mu.Unlock()
+ return nil
+}
+
+func (c *memCache) MessageCount(topic string) (int, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.messages[topic]; !ok {
+ return 0, nil
+ }
+ return len(c.messages[topic]), nil
+}
+
+func (c *memCache) Topics() (map[string]*topic, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
topics := make(map[string]*topic)
- for topic := range s.messages {
+ for topic := range c.messages {
topics[topic] = newTopic(topic)
}
return topics, nil
}
-func (s *memCache) Prune(olderThan time.Time) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- for topic := range s.messages {
- s.pruneTopic(topic, olderThan)
+func (c *memCache) Prune(olderThan time.Time) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ for topic := range c.messages {
+ c.pruneTopic(topic, olderThan)
}
return nil
}
-func (s *memCache) pruneTopic(topic string, olderThan time.Time) {
+func (c *memCache) pruneTopic(topic string, olderThan time.Time) {
messages := make([]*message, 0)
- for _, m := range s.messages[topic] {
+ for _, m := range c.messages[topic] {
if m.Time >= olderThan.Unix() {
messages = append(messages, m)
}
}
- s.messages[topic] = messages
+ c.messages[topic] = messages
}
diff --git a/server/cache_mem_test.go b/server/cache_mem_test.go
index a1c854d1..831703a0 100644
--- a/server/cache_mem_test.go
+++ b/server/cache_mem_test.go
@@ -9,6 +9,10 @@ func TestMemCache_Messages(t *testing.T) {
testCacheMessages(t, newMemCache())
}
+func TestMemCache_MessagesScheduled(t *testing.T) {
+ testCacheMessagesScheduled(t, newMemCache())
+}
+
func TestMemCache_Topics(t *testing.T) {
testCacheTopics(t, newMemCache())
}
@@ -25,7 +29,7 @@ func TestMemCache_NopCache(t *testing.T) {
c := newNopCache()
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
- messages, err := c.Messages("mytopic", sinceAllMessages)
+ messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err)
assert.Empty(t, messages)
diff --git a/server/cache_sqlite.go b/server/cache_sqlite.go
index 3c3564de..82d09073 100644
--- a/server/cache_sqlite.go
+++ b/server/cache_sqlite.go
@@ -21,19 +21,32 @@ const (
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
- tags VARCHAR(256) NOT NULL
+ tags VARCHAR(256) NOT NULL,
+ published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
- insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`
- pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`
+ insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
+ pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectMessagesSinceTimeQuery = `
- SELECT id, time, message, title, priority, tags
+ SELECT id, time, topic, message, title, priority, tags
+ FROM messages
+ WHERE topic = ? AND time >= ? AND published = 1
+ ORDER BY time ASC
+ `
+ selectMessagesSinceTimeIncludeScheduledQuery = `
+ SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
+ selectMessagesDueQuery = `
+ SELECT id, time, topic, message, title, priority, tags
+ FROM messages
+ WHERE time <= ? AND published = 0
+ `
+ updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
@@ -41,7 +54,7 @@ const (
// Schema management queries
const (
- currentSchemaVersion = 1
+ currentSchemaVersion = 2
createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
@@ -49,6 +62,7 @@ const (
);
`
insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
+ updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
// 0 -> 1
@@ -59,6 +73,11 @@ const (
ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
COMMIT;
`
+
+ // 1 -> 2
+ migrate1To2AlterMessagesTableQuery = `
+ ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
+ `
)
type sqliteCache struct {
@@ -84,46 +103,39 @@ func (c *sqliteCache) AddMessage(m *message) error {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
- _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","))
+ published := m.Time <= time.Now().Unix()
+ _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
return err
}
-func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
+func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
if since.IsNone() {
return make([]*message, 0), nil
}
- rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
+ var rows *sql.Rows
+ var err error
+ if scheduled {
+ rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
+ } else {
+ rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
+ }
if err != nil {
return nil, err
}
- defer rows.Close()
- messages := make([]*message, 0)
- for rows.Next() {
- var timestamp int64
- var priority int
- var id, msg, title, tagsStr string
- if err := rows.Scan(&id, ×tamp, &msg, &title, &priority, &tagsStr); err != nil {
- return nil, err
- }
- var tags []string
- if tagsStr != "" {
- tags = strings.Split(tagsStr, ",")
- }
- messages = append(messages, &message{
- ID: id,
- Time: timestamp,
- Event: messageEvent,
- Topic: topic,
- Message: msg,
- Title: title,
- Priority: priority,
- Tags: tags,
- })
- }
- if err := rows.Err(); err != nil {
+ return readMessages(rows)
+}
+
+func (c *sqliteCache) MessagesDue() ([]*message, error) {
+ rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
+ if err != nil {
return nil, err
}
- return messages, nil
+ return readMessages(rows)
+}
+
+func (c *sqliteCache) MarkPublished(m *message) error {
+ _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
+ return err
}
func (c *sqliteCache) MessageCount(topic string) (int, error) {
@@ -169,13 +181,44 @@ func (c *sqliteCache) Prune(olderThan time.Time) error {
return err
}
+func readMessages(rows *sql.Rows) ([]*message, error) {
+ defer rows.Close()
+ messages := make([]*message, 0)
+ for rows.Next() {
+ var timestamp int64
+ var priority int
+ var id, topic, msg, title, tagsStr string
+ if err := rows.Scan(&id, ×tamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
+ return nil, err
+ }
+ var tags []string
+ if tagsStr != "" {
+ tags = strings.Split(tagsStr, ",")
+ }
+ messages = append(messages, &message{
+ ID: id,
+ Time: timestamp,
+ Event: messageEvent,
+ Topic: topic,
+ Message: msg,
+ Title: title,
+ Priority: priority,
+ Tags: tags,
+ })
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return messages, nil
+}
+
func setupDB(db *sql.DB) error {
// If 'messages' table does not exist, this must be a new database
rowsMC, err := db.Query(selectMessagesCountQuery)
if err != nil {
return setupNewDB(db)
}
- defer rowsMC.Close()
+ rowsMC.Close()
// If 'messages' table exists, check 'schemaVersion' table
schemaVersion := 0
@@ -188,13 +231,16 @@ func setupDB(db *sql.DB) error {
if err := rowsSV.Scan(&schemaVersion); err != nil {
return err
}
+ rowsSV.Close()
}
// Do migrations
if schemaVersion == currentSchemaVersion {
return nil
} else if schemaVersion == 0 {
- return migrateFrom0To1(db)
+ return migrateFrom0(db)
+ } else if schemaVersion == 1 {
+ return migrateFrom1(db)
}
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
}
@@ -212,7 +258,7 @@ func setupNewDB(db *sql.DB) error {
return nil
}
-func migrateFrom0To1(db *sql.DB) error {
+func migrateFrom0(db *sql.DB) error {
log.Print("Migrating cache database schema: from 0 to 1")
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
return err
@@ -223,5 +269,16 @@ func migrateFrom0To1(db *sql.DB) error {
if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
return err
}
- return nil
+ return migrateFrom1(db)
+}
+
+func migrateFrom1(db *sql.DB) error {
+ log.Print("Migrating cache database schema: from 1 to 2")
+ if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
+ return err
+ }
+ if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
+ return err
+ }
+ return nil // Update this when a new version is added
}
diff --git a/server/cache_sqlite_test.go b/server/cache_sqlite_test.go
index 0f6c4302..384da256 100644
--- a/server/cache_sqlite_test.go
+++ b/server/cache_sqlite_test.go
@@ -3,16 +3,20 @@ package server
import (
"database/sql"
"fmt"
- "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"path/filepath"
"testing"
"time"
)
-func TestSqliteCache_AddMessage(t *testing.T) {
+func TestSqliteCache_Messages(t *testing.T) {
testCacheMessages(t, newSqliteTestCache(t))
}
+func TestSqliteCache_MessagesScheduled(t *testing.T) {
+ testCacheMessagesScheduled(t, newSqliteTestCache(t))
+}
+
func TestSqliteCache_Topics(t *testing.T) {
testCacheTopics(t, newSqliteTestCache(t))
}
@@ -25,10 +29,10 @@ func TestSqliteCache_Prune(t *testing.T) {
testCachePrune(t, newSqliteTestCache(t))
}
-func TestSqliteCache_Migration_0to1(t *testing.T) {
+func TestSqliteCache_Migration_From0(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
- assert.Nil(t, err)
+ require.Nil(t, err)
// Create "version 0" schema
_, err = db.Exec(`
@@ -42,32 +46,91 @@ func TestSqliteCache_Migration_0to1(t *testing.T) {
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`)
- assert.Nil(t, err)
+ require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
- assert.Nil(t, err)
+ require.Nil(t, err)
}
+ require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
- messages, err := c.Messages("mytopic", sinceAllMessages)
- assert.Nil(t, err)
- assert.Equal(t, 10, len(messages))
- assert.Equal(t, "some message 5", messages[5].Message)
- assert.Equal(t, "", messages[5].Title)
- assert.Nil(t, messages[5].Tags)
- assert.Equal(t, 0, messages[5].Priority)
+ checkSchemaVersion(t, c.db)
- rows, err := c.db.Query(`SELECT version FROM schemaVersion`)
- assert.Nil(t, err)
- assert.True(t, rows.Next())
+ messages, err := c.Messages("mytopic", sinceAllMessages, false)
+ require.Nil(t, err)
+ require.Equal(t, 10, len(messages))
+ require.Equal(t, "some message 5", messages[5].Message)
+ require.Equal(t, "", messages[5].Title)
+ require.Nil(t, messages[5].Tags)
+ require.Equal(t, 0, messages[5].Priority)
+}
+
+func TestSqliteCache_Migration_From1(t *testing.T) {
+ filename := newSqliteTestCacheFile(t)
+ db, err := sql.Open("sqlite3", filename)
+ require.Nil(t, err)
+
+ // Create "version 1" schema
+ _, err = db.Exec(`
+ CREATE TABLE IF NOT EXISTS messages (
+ id VARCHAR(20) PRIMARY KEY,
+ time INT NOT NULL,
+ topic VARCHAR(64) NOT NULL,
+ message VARCHAR(512) NOT NULL,
+ title VARCHAR(256) NOT NULL,
+ priority INT NOT NULL,
+ tags VARCHAR(256) NOT NULL
+ );
+ CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
+ CREATE TABLE IF NOT EXISTS schemaVersion (
+ id INT PRIMARY KEY,
+ version INT NOT NULL
+ );
+ INSERT INTO schemaVersion (id, version) VALUES (1, 1);
+ `)
+ require.Nil(t, err)
+
+ // Insert a bunch of messages
+ for i := 0; i < 10; i++ {
+ _, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
+ fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
+ require.Nil(t, err)
+ }
+ require.Nil(t, db.Close())
+
+ // Create cache to trigger migration
+ c := newSqliteTestCacheFromFile(t, filename)
+ checkSchemaVersion(t, c.db)
+
+ // Add delayed message
+ delayedMessage := newDefaultMessage("mytopic", "some delayed message")
+ delayedMessage.Time = time.Now().Add(time.Minute).Unix()
+ require.Nil(t, c.AddMessage(delayedMessage))
+
+ // 10, not 11!
+ messages, err := c.Messages("mytopic", sinceAllMessages, false)
+ require.Nil(t, err)
+ require.Equal(t, 10, len(messages))
+
+ // 11!
+ messages, err = c.Messages("mytopic", sinceAllMessages, true)
+ require.Nil(t, err)
+ require.Equal(t, 11, len(messages))
+}
+
+func checkSchemaVersion(t *testing.T, db *sql.DB) {
+ rows, err := db.Query(`SELECT version FROM schemaVersion`)
+ require.Nil(t, err)
+ require.True(t, rows.Next())
var schemaVersion int
- assert.Nil(t, rows.Scan(&schemaVersion))
- assert.Equal(t, 1, schemaVersion)
+ require.Nil(t, rows.Scan(&schemaVersion))
+ require.Equal(t, currentSchemaVersion, schemaVersion)
+ require.Nil(t, rows.Close())
}
func newSqliteTestCache(t *testing.T) *sqliteCache {
diff --git a/server/cache_test.go b/server/cache_test.go
index ab65b062..1eae0919 100644
--- a/server/cache_test.go
+++ b/server/cache_test.go
@@ -27,7 +27,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 2, count)
// mytopic: since all
- messages, _ := c.Messages("mytopic", sinceAllMessages)
+ messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, 2, len(messages))
assert.Equal(t, "my message", messages[0].Message)
assert.Equal(t, "mytopic", messages[0].Topic)
@@ -38,11 +38,11 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, "my other message", messages[1].Message)
// mytopic: since none
- messages, _ = c.Messages("mytopic", sinceNoMessages)
+ messages, _ = c.Messages("mytopic", sinceNoMessages, false)
assert.Empty(t, messages)
// mytopic: since 2
- messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)))
+ messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false)
assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message)
@@ -52,7 +52,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 1, count)
// example: since all
- messages, _ = c.Messages("example", sinceAllMessages)
+ messages, _ = c.Messages("example", sinceAllMessages, false)
assert.Equal(t, "my example message", messages[0].Message)
// non-existing: count
@@ -61,7 +61,7 @@ func testCacheMessages(t *testing.T, c cache) {
assert.Equal(t, 0, count)
// non-existing: since all
- messages, _ = c.Messages("doesnotexist", sinceAllMessages)
+ messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
assert.Empty(t, messages)
}
@@ -103,7 +103,7 @@ func testCachePrune(t *testing.T, c cache) {
assert.Nil(t, err)
assert.Equal(t, 0, count)
- messages, err := c.Messages("mytopic", sinceAllMessages)
+ messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err)
assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message)
@@ -116,8 +116,34 @@ func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
m.Title = "some title"
assert.Nil(t, c.AddMessage(m))
- messages, _ := c.Messages("mytopic", sinceAllMessages)
+ messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
assert.Equal(t, 5, messages[0].Priority)
assert.Equal(t, "some title", messages[0].Title)
}
+
+func testCacheMessagesScheduled(t *testing.T, c cache) {
+ m1 := newDefaultMessage("mytopic", "message 1")
+ m2 := newDefaultMessage("mytopic", "message 2")
+ m2.Time = time.Now().Add(time.Hour).Unix()
+ m3 := newDefaultMessage("mytopic", "message 3")
+ m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
+ m4 := newDefaultMessage("mytopic2", "message 4")
+ m4.Time = time.Now().Add(time.Minute).Unix()
+ assert.Nil(t, c.AddMessage(m1))
+ assert.Nil(t, c.AddMessage(m2))
+ assert.Nil(t, c.AddMessage(m3))
+
+ messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
+ assert.Equal(t, 1, len(messages))
+ assert.Equal(t, "message 1", messages[0].Message)
+
+ messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
+ assert.Equal(t, 3, len(messages))
+ assert.Equal(t, "message 1", messages[0].Message)
+ assert.Equal(t, "message 3", messages[1].Message) // Order!
+ assert.Equal(t, "message 2", messages[2].Message)
+
+ messages, _ = c.MessagesDue()
+ assert.Empty(t, messages)
+}
diff --git a/server/server.go b/server/server.go
index 7ee039c4..6fadd9d9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -71,10 +71,6 @@ var (
sinceNoMessages = sinceTime(time.Unix(1, 0))
)
-const (
- messageLimit = 512
-)
-
var (
topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
@@ -180,7 +176,16 @@ func (s *Server) Run() error {
ticker := time.NewTicker(s.config.ManagerInterval)
for {
<-ticker.C
- s.updateStatsAndExpire()
+ s.updateStatsAndPrune()
+ }
+ }()
+ go func() {
+ ticker := time.NewTicker(s.config.AtSenderInterval)
+ for {
+ <-ticker.C
+ if err := s.sendDelayedMessages(); err != nil {
+ log.Printf("error sending scheduled messages: %s", err.Error())
+ }
}
}()
listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP)
@@ -270,7 +275,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
if err != nil {
return err
}
- reader := io.LimitReader(r.Body, messageLimit)
+ reader := io.LimitReader(r.Body, int64(s.config.MessageLimit))
b, err := io.ReadAll(reader)
if err != nil {
return err
@@ -279,14 +284,17 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
if m.Message == "" {
return errHTTPBadRequest
}
- title, priority, tags, cache, firebase := parseHeaders(r.Header)
- m.Title = title
- m.Priority = priority
- m.Tags = tags
- if err := t.Publish(m); err != nil {
+ cache, firebase, err := s.parseHeaders(r.Header, m)
+ if err != nil {
return err
}
- if s.firebase != nil && firebase {
+ delayed := m.Time > time.Now().Unix()
+ if !delayed {
+ if err := t.Publish(m); err != nil {
+ return err
+ }
+ }
+ if s.firebase != nil && firebase && !delayed {
go func() {
if err := s.firebase(m); err != nil {
log.Printf("Unable to publish to Firebase: %v", err.Error())
@@ -308,35 +316,50 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
return nil
}
-func parseHeaders(header http.Header) (title string, priority int, tags []string, cache bool, firebase bool) {
- title = readHeader(header, "x-title", "title", "ti", "t")
+func (s *Server) parseHeaders(header http.Header, m *message) (cache bool, firebase bool, err error) {
+ cache = readHeader(header, "x-cache", "cache") != "no"
+ firebase = readHeader(header, "x-firebase", "firebase") != "no"
+ m.Title = readHeader(header, "x-title", "title", "ti", "t")
priorityStr := readHeader(header, "x-priority", "priority", "prio", "p")
if priorityStr != "" {
switch strings.ToLower(priorityStr) {
case "1", "min":
- priority = 1
+ m.Priority = 1
case "2", "low":
- priority = 2
+ m.Priority = 2
case "3", "default":
- priority = 3
+ m.Priority = 3
case "4", "high":
- priority = 4
+ m.Priority = 4
case "5", "max", "urgent":
- priority = 5
+ m.Priority = 5
default:
- priority = 0
+ return false, false, errHTTPBadRequest
}
}
tagsStr := readHeader(header, "x-tags", "tag", "tags", "ta")
if tagsStr != "" {
- tags = make([]string, 0)
+ m.Tags = make([]string, 0)
for _, s := range strings.Split(tagsStr, ",") {
- tags = append(tags, strings.TrimSpace(s))
+ m.Tags = append(m.Tags, strings.TrimSpace(s))
}
}
- cache = readHeader(header, "x-cache", "cache") != "no"
- firebase = readHeader(header, "x-firebase", "firebase") != "no"
- return title, priority, tags, cache, firebase
+ delayStr := readHeader(header, "x-delay", "delay", "x-at", "at", "x-in", "in")
+ if delayStr != "" {
+ if !cache {
+ return false, false, errHTTPBadRequest
+ }
+ delay, err := util.ParseFutureTime(delayStr, time.Now())
+ if err != nil {
+ return false, false, errHTTPBadRequest
+ } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
+ return false, false, errHTTPBadRequest
+ } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
+ return false, false, errHTTPBadRequest
+ }
+ m.Time = delay.Unix()
+ }
+ return cache, firebase, nil
}
func readHeader(header http.Header, names ...string) string {
@@ -401,6 +424,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
}
var wlock sync.Mutex
poll := r.URL.Query().Has("poll")
+ scheduled := r.URL.Query().Has("scheduled") || r.URL.Query().Has("sched")
sub := func(msg *message) error {
wlock.Lock()
defer wlock.Unlock()
@@ -419,7 +443,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
if poll {
- return s.sendOldMessages(topics, since, sub)
+ return s.sendOldMessages(topics, since, scheduled, sub)
}
subscriberIDs := make([]int, 0)
for _, t := range topics {
@@ -433,7 +457,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
return err
}
- if err := s.sendOldMessages(topics, since, sub); err != nil {
+ if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil {
return err
}
for {
@@ -449,12 +473,12 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
}
}
-func (s *Server) sendOldMessages(topics []*topic, since sinceTime, sub subscriber) error {
+func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
if since.IsNone() {
return nil
}
for _, t := range topics {
- messages, err := s.cache.Messages(t.ID, since)
+ messages, err := s.cache.Messages(t.ID, since, scheduled)
if err != nil {
return err
}
@@ -521,7 +545,7 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
return topics, nil
}
-func (s *Server) updateStatsAndExpire() {
+func (s *Server) updateStatsAndPrune() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -532,13 +556,13 @@ func (s *Server) updateStatsAndExpire() {
}
}
- // Prune cache
+ // Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration)
if err := s.cache.Prune(olderThan); err != nil {
log.Printf("error pruning cache: %s", err.Error())
}
- // Prune old messages, remove subscriptions without subscribers
+ // Prune old topics, remove subscriptions without subscribers
var subscribers, messages int
for _, t := range s.topics {
subs := t.Subscribers()
@@ -560,6 +584,32 @@ func (s *Server) updateStatsAndExpire() {
s.messages, len(s.topics), subscribers, messages, len(s.visitors))
}
+func (s *Server) sendDelayedMessages() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ messages, err := s.cache.MessagesDue()
+ if err != nil {
+ return err
+ }
+ for _, m := range messages {
+ t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
+ if ok {
+ if err := t.Publish(m); err != nil {
+ log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
+ }
+ if s.firebase != nil {
+ if err := s.firebase(m); err != nil {
+ log.Printf("unable to publish to Firebase: %v", err.Error())
+ }
+ }
+ }
+ if err := s.cache.MarkPublished(m); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (s *Server) withRateLimit(w http.ResponseWriter, r *http.Request, handler func(w http.ResponseWriter, r *http.Request, v *visitor) error) error {
v := s.visitor(r)
if err := v.RequestAllowed(); err != nil {
diff --git a/server/server_test.go b/server/server_test.go
index 1513cb90..cbe24a5f 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -4,7 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
- "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"heckel.io/ntfy/config"
"net/http"
"net/http/httptest"
@@ -19,33 +19,33 @@ func TestServer_PublishAndPoll(t *testing.T) {
response1 := request(t, s, "PUT", "/mytopic", "my first message", nil)
msg1 := toMessage(t, response1.Body.String())
- assert.NotEmpty(t, msg1.ID)
- assert.Equal(t, "my first message", msg1.Message)
+ require.NotEmpty(t, msg1.ID)
+ require.Equal(t, "my first message", msg1.Message)
response2 := request(t, s, "PUT", "/mytopic", "my second\n\nmessage", nil)
msg2 := toMessage(t, response2.Body.String())
- assert.NotEqual(t, msg1.ID, msg2.ID)
- assert.NotEmpty(t, msg2.ID)
- assert.Equal(t, "my second\n\nmessage", msg2.Message)
+ require.NotEqual(t, msg1.ID, msg2.ID)
+ require.NotEmpty(t, msg2.ID)
+ require.Equal(t, "my second\n\nmessage", msg2.Message)
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String())
- assert.Equal(t, 2, len(messages))
- assert.Equal(t, "my first message", messages[0].Message)
- assert.Equal(t, "my second\n\nmessage", messages[1].Message)
+ require.Equal(t, 2, len(messages))
+ require.Equal(t, "my first message", messages[0].Message)
+ require.Equal(t, "my second\n\nmessage", messages[1].Message)
response = request(t, s, "GET", "/mytopic/sse?poll=1", "", nil)
lines := strings.Split(strings.TrimSpace(response.Body.String()), "\n")
- assert.Equal(t, 3, len(lines))
- assert.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message)
- assert.Equal(t, "", lines[1])
- assert.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message)
+ require.Equal(t, 3, len(lines))
+ require.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message)
+ require.Equal(t, "", lines[1])
+ require.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message)
response = request(t, s, "GET", "/mytopic/raw?poll=1", "", nil)
lines = strings.Split(strings.TrimSpace(response.Body.String()), "\n")
- assert.Equal(t, 2, len(lines))
- assert.Equal(t, "my first message", lines[0])
- assert.Equal(t, "my second message", lines[1]) // \n -> " "
+ require.Equal(t, 2, len(lines))
+ require.Equal(t, "my first message", lines[0])
+ require.Equal(t, "my second message", lines[1]) // \n -> " "
}
func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
@@ -69,21 +69,21 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
<-doneChan
messages := toMessages(t, rr.Body.String())
- assert.Equal(t, 2, len(messages))
+ require.Equal(t, 2, len(messages))
- assert.Equal(t, openEvent, messages[0].Event)
- assert.Equal(t, "mytopic", messages[0].Topic)
- assert.Equal(t, "", messages[0].Message)
- assert.Equal(t, "", messages[0].Title)
- assert.Equal(t, 0, messages[0].Priority)
- assert.Nil(t, messages[0].Tags)
+ require.Equal(t, openEvent, messages[0].Event)
+ require.Equal(t, "mytopic", messages[0].Topic)
+ require.Equal(t, "", messages[0].Message)
+ require.Equal(t, "", messages[0].Title)
+ require.Equal(t, 0, messages[0].Priority)
+ require.Nil(t, messages[0].Tags)
- assert.Equal(t, keepaliveEvent, messages[1].Event)
- assert.Equal(t, "mytopic", messages[1].Topic)
- assert.Equal(t, "", messages[1].Message)
- assert.Equal(t, "", messages[1].Title)
- assert.Equal(t, 0, messages[1].Priority)
- assert.Nil(t, messages[1].Tags)
+ require.Equal(t, keepaliveEvent, messages[1].Event)
+ require.Equal(t, "mytopic", messages[1].Topic)
+ require.Equal(t, "", messages[1].Message)
+ require.Equal(t, "", messages[1].Title)
+ require.Equal(t, 0, messages[1].Priority)
+ require.Nil(t, messages[1].Tags)
}
func TestServer_PublishAndSubscribe(t *testing.T) {
@@ -93,63 +93,79 @@ func TestServer_PublishAndSubscribe(t *testing.T) {
subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR)
publishFirstRR := request(t, s, "PUT", "/mytopic", "my first message", nil)
- assert.Equal(t, 200, publishFirstRR.Code)
+ require.Equal(t, 200, publishFirstRR.Code)
publishSecondRR := request(t, s, "PUT", "/mytopic", "my other message", map[string]string{
"Title": " This is a title ",
"X-Tags": "tag1,tag 2, tag3",
"p": "1",
})
- assert.Equal(t, 200, publishSecondRR.Code)
+ require.Equal(t, 200, publishSecondRR.Code)
subscribeCancel()
messages := toMessages(t, subscribeRR.Body.String())
- assert.Equal(t, 3, len(messages))
- assert.Equal(t, openEvent, messages[0].Event)
+ require.Equal(t, 3, len(messages))
+ require.Equal(t, openEvent, messages[0].Event)
- assert.Equal(t, messageEvent, messages[1].Event)
- assert.Equal(t, "mytopic", messages[1].Topic)
- assert.Equal(t, "my first message", messages[1].Message)
- assert.Equal(t, "", messages[1].Title)
- assert.Equal(t, 0, messages[1].Priority)
- assert.Nil(t, messages[1].Tags)
+ require.Equal(t, messageEvent, messages[1].Event)
+ require.Equal(t, "mytopic", messages[1].Topic)
+ require.Equal(t, "my first message", messages[1].Message)
+ require.Equal(t, "", messages[1].Title)
+ require.Equal(t, 0, messages[1].Priority)
+ require.Nil(t, messages[1].Tags)
- assert.Equal(t, messageEvent, messages[2].Event)
- assert.Equal(t, "mytopic", messages[2].Topic)
- assert.Equal(t, "my other message", messages[2].Message)
- assert.Equal(t, "This is a title", messages[2].Title)
- assert.Equal(t, 1, messages[2].Priority)
- assert.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags)
+ require.Equal(t, messageEvent, messages[2].Event)
+ require.Equal(t, "mytopic", messages[2].Topic)
+ require.Equal(t, "my other message", messages[2].Message)
+ require.Equal(t, "This is a title", messages[2].Title)
+ require.Equal(t, 1, messages[2].Priority)
+ require.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags)
}
func TestServer_StaticSites(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
rr := request(t, s, "GET", "/", "", nil)
- assert.Equal(t, 200, rr.Code)
- assert.Contains(t, rr.Body.String(), "