diff --git a/.gitignore b/.gitignore index 1062418c..a88775f3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +dist/ .idea/ *.iml diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 00000000..a71e1042 --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,62 @@ +before: + hooks: + - go mod download +builds: + - binary: ntfy + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + goarch: + - amd64 +nfpms: + - + package_name: ntfy + file_name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Arch }}" + homepage: https://heckel.io/ntfy + maintainer: Philipp C. Heckel + description: Simple pub-sub notification service + license: Apache 2.0 + formats: + - deb + - rpm + bindir: /usr/bin + contents: + - src: config/config.yml + dst: /etc/ntfy/config.yml + type: config + - src: config/ntfy.service + dst: /lib/systemd/system/ntfy.service + scripts: + postremove: "scripts/postrm.sh" +archives: + - + wrap_in_directory: true + files: + - LICENSE + - README.md + - config/config.yml + - config/ntfy.service + replacements: + 386: i386 + amd64: x86_64 +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ .Tag }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' +dockers: + - dockerfile: Dockerfile + ids: + - ntfy + image_templates: + - "binwiederhier/ntfy:latest" + - "binwiederhier/ntfy:{{ .Tag }}" + - "binwiederhier/ntfy:v{{ .Major }}.{{ .Minor }}" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..8e789a7b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine +MAINTAINER Philipp C. Heckel + +COPY ntfy /usr/bin +ENTRYPOINT ["ntfy"] diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..109e6db6 --- /dev/null +++ b/Makefile @@ -0,0 +1,124 @@ +GO=$(shell which go) +VERSION := $(shell git describe --tag) + +.PHONY: + +help: + @echo "Typical commands:" + @echo " make check - Run all tests, vetting/formatting checks and linters" + @echo " make fmt build-snapshot install - Build latest and install to local system" + @echo + @echo "Test/check:" + @echo " make test - Run tests" + @echo " make race - Run tests with -race flag" + @echo " make coverage - Run tests and show coverage" + @echo " make coverage-html - Run tests and show coverage (as HTML)" + @echo " make coverage-upload - Upload coverage results to codecov.io" + @echo + @echo "Lint/format:" + @echo " make fmt - Run 'go fmt'" + @echo " make fmt-check - Run 'go fmt', but don't change anything" + @echo " make vet - Run 'go vet'" + @echo " make lint - Run 'golint'" + @echo " make staticcheck - Run 'staticcheck'" + @echo + @echo "Build:" + @echo " make build - Build" + @echo " make build-snapshot - Build snapshot" + @echo " make build-simple - Build (using go build, without goreleaser)" + @echo " make clean - Clean build folder" + @echo + @echo "Releasing (requires goreleaser):" + @echo " make release - Create a release" + @echo " make release-snapshot - Create a test release" + @echo + @echo "Install locally (requires sudo):" + @echo " make install - Copy binary from dist/ to /usr/bin" + @echo " make install-deb - Install .deb from dist/" + @echo " make install-lint - Install golint" + + +# Test/check targets + +check: test fmt-check vet lint staticcheck + +test: .PHONY + $(GO) test ./... + +race: .PHONY + $(GO) test -race ./... + +coverage: + mkdir -p build/coverage + $(GO) test -race -coverprofile=build/coverage/coverage.txt -covermode=atomic ./... + $(GO) tool cover -func build/coverage/coverage.txt + +coverage-html: + mkdir -p build/coverage + $(GO) test -race -coverprofile=build/coverage/coverage.txt -covermode=atomic ./... + $(GO) tool cover -html build/coverage/coverage.txt + +coverage-upload: + cd build/coverage && (curl -s https://codecov.io/bash | bash) + +# Lint/formatting targets + +fmt: + $(GO) fmt ./... + +fmt-check: + test -z $(shell gofmt -l .) + +vet: + $(GO) vet ./... + +lint: + which golint || $(GO) get -u golang.org/x/lint/golint + $(GO) list ./... | grep -v /vendor/ | xargs -L1 golint -set_exit_status + +staticcheck: .PHONY + rm -rf build/staticcheck + which staticcheck || go get honnef.co/go/tools/cmd/staticcheck + mkdir -p build/staticcheck + ln -s "$(GO)" build/staticcheck/go + PATH="$(PWD)/build/staticcheck:$(PATH)" staticcheck ./... + rm -rf build/staticcheck + +# Building targets + +build: .PHONY + goreleaser build --rm-dist + +build-snapshot: + goreleaser build --snapshot --rm-dist + +build-simple: clean + mkdir -p dist/ntfy_linux_amd64 + $(GO) build \ + -o dist/ntfy_linux_amd64/ntfy \ + -ldflags \ + "-s -w -X main.version=$(VERSION) -X main.commit=$(shell git rev-parse --short HEAD) -X main.date=$(shell date +%s)" + +clean: .PHONY + rm -rf dist build + + +# Releasing targets + +release: + goreleaser release --rm-dist + +release-snapshot: + goreleaser release --snapshot --skip-publish --rm-dist + + +# Installing targets + +install: + sudo rm -f /usr/bin/ntfy + sudo cp -a dist/ntfy_linux_amd64/ntfy /usr/bin/ntfy + +install-deb: + sudo systemctl stop ntfy || true + sudo apt-get purge ntfy || true + sudo dpkg -i dist/*.deb diff --git a/README.md b/README.md index af79562e..45205a13 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,63 @@ # ntfy -ntfy is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications -via scripts. I run a free version of it on *[ntfy.sh](https://ntfy.sh)*. No signups or cost. +ntfy (pronounce: *notify*) is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications +via scripts. I run a free version of it on *[ntfy.sh](https://ntfy.sh)*. **No signups or cost.** ## Usage ### Subscribe to a topic +You can subscribe to a topic either in a web UI, or in your own app by subscribing to an +[SSE](https://en.wikipedia.org/wiki/Server-sent_events)/[EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource), +or a JSON or raw feed. -You can subscribe to a topic either in a web UI, or in your own app by subscribing to an SSE/EventSource -or JSON feed. - -Here's how to do it via curl see the SSE stream in `curl`: +Here's how to see the raw/json/sse stream in `curl`. This will subscribe to the topic and wait for events. ``` -curl -s localhost:9997/mytopic/sse +# Subscribe to "mytopic" and output one message per line (\n are replaced with a space) +curl -s ntfy.sh/mytopic/raw + +# Subscribe to "mytopic" and output one JSON message per line +curl -s ntfy.sh/mytopic/json + +# Subscribe to "mytopic" and output an SSE stream (supported via JS/EventSource) +curl -s ntfy.sh/mytopic/sse ``` -You can easily script it to execute any command when a message arrives: +You can easily script it to execute any command when a message arrives. This sends desktop notifications (just like +the web UI, but without it): ``` -while read json; do - msg="$(echo "$json" | jq -r .message)" +while read msg; do notify-send "$msg" -done < <(stdbuf -i0 -o0 curl -s localhost:9997/mytopic/json) +done < <(stdbuf -i0 -o0 curl -s ntfy.sh/mytopic/raw) ``` ### Publish messages - Publishing messages can be done via PUT or POST using. Here's an example using `curl`: ``` curl -d "long process is done" ntfy.sh/mytopic ``` +Messages published to a non-existing topic or a topic without subscribers will not be delivered later. There is (currently) +no buffering of any kind. If you're not listening, the message won't be delivered. + +## FAQ + +### Isn't this like ...? +Probably. I didn't do a whole lot of research before making this. + +### Can I use this in my app? +Yes. As long as you don't abuse it, it'll be available and free of charge. + +### What are the uptime guarantees? +Best effort. + +### Why is the web UI so ugly? +I don't particularly like JS or dealing with CSS. I'll make it pretty after it's functional. + ## TODO -- /raw endpoint -- netcat usage - rate limiting / abuse protection - release/packaging +- add HTTPS ## Contributing I welcome any and all contributions. Just create a PR or an issue. diff --git a/cmd/app.go b/cmd/app.go new file mode 100644 index 00000000..4d8c1fa1 --- /dev/null +++ b/cmd/app.go @@ -0,0 +1,73 @@ +// Package cmd provides the ntfy CLI application +package cmd + +import ( + "fmt" + "github.com/urfave/cli/v2" + "github.com/urfave/cli/v2/altsrc" + "heckel.io/ntfy/config" + "heckel.io/ntfy/server" + "log" + "os" +) + +// New creates a new CLI application +func New() *cli.App { + flags := []cli.Flag{ + &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"}, + altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as listen address"}), + } + return &cli.App{ + Name: "ntfy", + Usage: "Simple pub-sub notification service", + UsageText: "ntfy [OPTION..]", + HideHelp: true, + HideVersion: true, + EnableBashCompletion: true, + UseShortOptionHandling: true, + Reader: os.Stdin, + Writer: os.Stdout, + ErrWriter: os.Stderr, + Action: execRun, + Before: initConfigFileInputSource("config", flags), + Flags: flags, + } +} + +func execRun(c *cli.Context) error { + // Read all the options + listenHTTP := c.String("listen-http") + + // Run main bot, can be killed by signal + conf := config.New(listenHTTP) + s := server.New(conf) + if err := s.Run(); err != nil { + log.Fatalln(err) + } + + log.Printf("Exiting.") + return nil +} + +// initConfigFileInputSource is like altsrc.InitInputSourceWithContext and altsrc.NewYamlSourceFromFlagFunc, but checks +// if the config flag is exists and only loads it if it does. If the flag is set and the file exists, it fails. +func initConfigFileInputSource(configFlag string, flags []cli.Flag) cli.BeforeFunc { + return func(context *cli.Context) error { + configFile := context.String(configFlag) + if context.IsSet(configFlag) && !fileExists(configFile) { + return fmt.Errorf("config file %s does not exist", configFile) + } else if !context.IsSet(configFlag) && !fileExists(configFile) { + return nil + } + inputSource, err := altsrc.NewYamlSourceFromFile(configFile) + if err != nil { + return err + } + return altsrc.ApplyInputSourceValues(context, inputSource, flags) + } +} + +func fileExists(filename string) bool { + stat, _ := os.Stat(filename) + return stat != nil +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 00000000..85297910 --- /dev/null +++ b/config/config.go @@ -0,0 +1,18 @@ +// Package config provides the main configuration +package config + +const ( + DefaultListenHTTP = ":80" +) + +// Config is the main config struct for the application. Use New to instantiate a default config struct. +type Config struct { + ListenHTTP string +} + +// New instantiates a default new config +func New(listenHTTP string) *Config { + return &Config{ + ListenHTTP: listenHTTP, + } +} diff --git a/config/config.yml b/config/config.yml new file mode 100644 index 00000000..553ff00f --- /dev/null +++ b/config/config.yml @@ -0,0 +1,9 @@ +# ntfy config file + +# Listen address for the HTTP web server +# +# Format: : +# Default: :80 +# Required: No +# +# listen-http: ":80" diff --git a/config/ntfy.service b/config/ntfy.service new file mode 100644 index 00000000..4a70cd02 --- /dev/null +++ b/config/ntfy.service @@ -0,0 +1,10 @@ +[Unit] +Description=ntfy server +After=network.target + +[Service] +ExecStart=/usr/bin/ntfy +Restart=on-failure + +[Install] +WantedBy=multi-user.target diff --git a/go.mod b/go.mod index ee3af69f..7c9ad4cd 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,10 @@ -module heckel.io/notifyme +module heckel.io/ntfy go 1.16 -require github.com/gorilla/websocket v1.4.2 // indirect +require ( + github.com/BurntSushi/toml v0.4.1 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect + github.com/urfave/cli/v2 v2.3.0 + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum index 85efffd9..2cd4b5df 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,23 @@ -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= +github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go index 43af89f2..cecae09d 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,32 @@ package main import ( - "heckel.io/notifyme/server" - "log" + "fmt" + "github.com/urfave/cli/v2" + "heckel.io/ntfy/cmd" + "os" + "runtime" +) + +var ( + version = "dev" + commit = "unknown" + date = "unknown" ) func main() { - s := server.New() - if err := s.Run(); err != nil { - log.Fatalln(err) + cli.AppHelpTemplate += fmt.Sprintf(` +Try 'ntfy COMMAND --help' for more information. + +ntfy %s (%s), runtime %s, built at %s +Copyright (C) 2021 Philipp C. Heckel, distributed under the Apache License 2.0 +`, version, commit[:7], runtime.Version(), date) + + app := cmd.New() + app.Version = version + + if err := app.Run(os.Args); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) } } diff --git a/scripts/postrm.sh b/scripts/postrm.sh new file mode 100644 index 00000000..4588bc27 --- /dev/null +++ b/scripts/postrm.sh @@ -0,0 +1,6 @@ +#!/bin/sh +set -eu +systemctl stop ntfy >/dev/null 2>&1 || true +if [ "$1" = "purge" ]; then + rm -rf /etc/ntfy +fi diff --git a/server/index.html b/server/index.html index 50d99944..1e3c2318 100644 --- a/server/index.html +++ b/server/index.html @@ -3,37 +3,45 @@ ntfy.sh -

ntfy.sh

+
+

ntfy.sh

-

- ntfy.sh is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications - via scripts, without signup or cost. It's entirely free and open source. You can find the source code on GitHub. -

+

+ ntfy (pronounce: notify) is a simple HTTP-based pub-sub notification service. It allows you to send desktop and (soon) phone notifications + via scripts, without signup or cost. It's entirely free and open source. You can find the source code on GitHub. +

-

- You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource - or JSON feed. Once subscribed, you can publish messages via PUT or POST. -

+

+ You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource + or JSON feed. Once subscribed, you can publish messages via PUT or POST. +

-

+

-
- - -
+
+

+ + +

+
-

Topics:

-
    -
+

Subscribed topics:

+
    + +
    diff --git a/server/server.go b/server/server.go index f620114e..77b2e5ac 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "heckel.io/ntfy/config" "io" "log" "net/http" @@ -16,6 +17,7 @@ import ( ) type Server struct { + config *config.Config topics map[string]*topic mu sync.Mutex } @@ -33,13 +35,17 @@ var ( topicRegex = regexp.MustCompile(`^/[^/]+$`) jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) + rawRegex = regexp.MustCompile(`^/[^/]+/raw$`) //go:embed "index.html" indexSource string + + errTopicNotFound = errors.New("topic not found") ) -func New() *Server { +func New(conf *config.Config) *Server { return &Server{ + config: conf, topics: make(map[string]*topic), } } @@ -50,23 +56,22 @@ func (s *Server) Run() error { } func (s *Server) listenAndServe() error { - log.Printf("Listening on :9997") + log.Printf("Listening on %s", s.config.ListenHTTP) http.HandleFunc("/", s.handle) - return http.ListenAndServe(":9997", nil) + return http.ListenAndServe(s.config.ListenHTTP, nil) } func (s *Server) runMonitor() { for { - time.Sleep(5 * time.Second) + time.Sleep(30 * time.Second) s.mu.Lock() - log.Printf("topics: %d", len(s.topics)) + var subscribers, messages int for _, t := range s.topics { - t.mu.Lock() - log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s", - t.id, len(t.subscribers), t.messages, t.last.String()) - t.mu.Unlock() + subs, msgs := t.Stats() + subscribers += subs + messages += msgs } - // TODO kill dead topics + log.Printf("Stats: %d topic(s), %d subscriber(s), %d message(s) sent", len(s.topics), subscribers, messages) s.mu.Unlock() } } @@ -74,7 +79,7 @@ func (s *Server) runMonitor() { func (s *Server) handle(w http.ResponseWriter, r *http.Request) { if err := s.handleInternal(w, r); err != nil { w.WriteHeader(http.StatusInternalServerError) - _, _ = io.WriteString(w, err.Error()) + _, _ = io.WriteString(w, err.Error()+"\n") } } @@ -85,6 +90,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { return s.handleSubscribeJSON(w, r) } else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) { return s.handleSubscribeSSE(w, r) + } else if r.Method == http.MethodGet && rawRegex.MatchString(r.URL.Path) { + return s.handleSubscribeRaw(w, r) } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) { return s.handlePublishHTTP(w, r) } @@ -125,7 +132,7 @@ func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) err } return nil }) - defer t.Unsubscribe(subscriberID) + defer s.unsubscribe(t, subscriberID) select { case <-t.ctx.Done(): case <-r.Context().Done(): @@ -149,7 +156,7 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro } return nil }) - defer t.Unsubscribe(subscriberID) + defer s.unsubscribe(t, subscriberID) w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) if _, err := io.WriteString(w, "event: open\n\n"); err != nil { @@ -165,6 +172,26 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro return nil } +func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request) error { + t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/raw")) // Hack + subscriberID := t.Subscribe(func(msg *message) error { + m := strings.ReplaceAll(msg.Message, "\n", " ") + "\n" + if _, err := io.WriteString(w, m); err != nil { + return err + } + if fl, ok := w.(http.Flusher); ok { + fl.Flush() + } + return nil + }) + defer s.unsubscribe(t, subscriberID) + select { + case <-t.ctx.Done(): + case <-r.Context().Done(): + } + return nil +} + func (s *Server) createTopic(id string) *topic { s.mu.Lock() defer s.mu.Unlock() @@ -179,7 +206,15 @@ func (s *Server) topic(topicID string) (*topic, error) { defer s.mu.Unlock() c, ok := s.topics[topicID] if !ok { - return nil, errors.New("topic does not exist") + return nil, errTopicNotFound } return c, nil } + +func (s *Server) unsubscribe(t *topic, subscriberID int) { + s.mu.Lock() + defer s.mu.Unlock() + if subscribers := t.Unsubscribe(subscriberID); subscribers == 0 { + delete(s.topics, t.id) + } +} diff --git a/server/topic.go b/server/topic.go index 283b6da4..7c8f38bf 100644 --- a/server/topic.go +++ b/server/topic.go @@ -41,10 +41,11 @@ func (t *topic) Subscribe(s subscriber) int { return subscriberID } -func (t *topic) Unsubscribe(id int) { +func (t *topic) Unsubscribe(id int) int { t.mu.Lock() defer t.mu.Unlock() delete(t.subscribers, id) + return len(t.subscribers) } func (t *topic) Publish(m *message) error { @@ -57,12 +58,18 @@ func (t *topic) Publish(m *message) error { t.messages++ for _, s := range t.subscribers { if err := s(m); err != nil { - log.Printf("error publishing message to subscriber x") + log.Printf("error publishing message to subscriber") } } return nil } +func (t *topic) Stats() (subscribers int, messages int) { + t.mu.Lock() + defer t.mu.Unlock() + return len(t.subscribers), t.messages +} + func (t *topic) Close() { t.cancel() }