Rate limiting, docs
This commit is contained in:
		
							parent
							
								
									e1c9fef6dc
								
							
						
					
					
						commit
						23cf77e0b7
					
				
					 7 changed files with 180 additions and 57 deletions
				
			
		
							
								
								
									
										15
									
								
								README.md
									
										
									
									
									
								
							
							
						
						
									
										15
									
								
								README.md
									
										
									
									
									
								
							|  | @ -6,11 +6,14 @@ via scripts. I run a free version of it on *[ntfy.sh](https://ntfy.sh)*. **No si | ||||||
| ## Usage | ## Usage | ||||||
| 
 | 
 | ||||||
| ### Subscribe to a topic | ### Subscribe to a topic | ||||||
| You can subscribe to a topic either in a web UI, or in your own app by subscribing to an  | Topics are created on the fly by subscribing to them. You can create and subscribe to a topic either in a web UI, or in  | ||||||
| [SSE](https://en.wikipedia.org/wiki/Server-sent_events)/[EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource), | your own app by subscribing to an [SSE](https://en.wikipedia.org/wiki/Server-sent_events)/[EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource), | ||||||
| or a JSON or raw feed.   | or a JSON or raw feed.   | ||||||
| 
 | 
 | ||||||
| Here's how to see the raw/json/sse stream in `curl`. This will subscribe to the topic and wait for events. | Because there is no sign-up, **the topic is essentially a password**, so pick something that's not easily guessable.   | ||||||
|  | 
 | ||||||
|  | Here's how you can create a topic `mytopic`, subscribe to it topic and wait for events. This is using `curl`, but you | ||||||
|  | can use any library that can do HTTP GETs: | ||||||
| 
 | 
 | ||||||
| ``` | ``` | ||||||
| # Subscribe to "mytopic" and output one message per line (\n are replaced with a space) | # Subscribe to "mytopic" and output one message per line (\n are replaced with a space) | ||||||
|  | @ -54,9 +57,11 @@ Best effort. | ||||||
| ### Why is the web UI so ugly? | ### Why is the web UI so ugly? | ||||||
| I don't particularly like JS or dealing with CSS. I'll make it pretty after it's functional. | I don't particularly like JS or dealing with CSS. I'll make it pretty after it's functional. | ||||||
| 
 | 
 | ||||||
|  | ## Will you know what topics exist, can you spy on me? | ||||||
|  | If you don't trust me or your messages are sensitive, run your ntfy on your own server. That said, the logs do not  | ||||||
|  | contain any topic names | ||||||
|  | 
 | ||||||
| ## TODO | ## TODO | ||||||
| - rate limiting / abuse protection |  | ||||||
| - release/packaging |  | ||||||
| - add HTTPS | - add HTTPS | ||||||
| 
 | 
 | ||||||
| ## Contributing | ## Contributing | ||||||
|  |  | ||||||
|  | @ -1,18 +1,38 @@ | ||||||
| // Package config provides the main configuration | // Package config provides the main configuration | ||||||
| package config | package config | ||||||
| 
 | 
 | ||||||
|  | import ( | ||||||
|  | 	"golang.org/x/time/rate" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // Defines default config settings | ||||||
| const ( | const ( | ||||||
| 	DefaultListenHTTP      = ":80" | 	DefaultListenHTTP      = ":80" | ||||||
|  | 	defaultManagerInterval = time.Minute | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // Defines the max number of requests, here: | ||||||
|  | // 50 requests bucket, replenished at a rate of 1 per second | ||||||
|  | var ( | ||||||
|  | 	defaultLimit      = rate.Every(time.Second) | ||||||
|  | 	defaultLimitBurst = 50 | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Config is the main config struct for the application. Use New to instantiate a default config struct. | // Config is the main config struct for the application. Use New to instantiate a default config struct. | ||||||
| type Config struct { | type Config struct { | ||||||
| 	ListenHTTP      string | 	ListenHTTP      string | ||||||
|  | 	Limit           rate.Limit | ||||||
|  | 	LimitBurst      int | ||||||
|  | 	ManagerInterval time.Duration | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // New instantiates a default new config | // New instantiates a default new config | ||||||
| func New(listenHTTP string) *Config { | func New(listenHTTP string) *Config { | ||||||
| 	return &Config{ | 	return &Config{ | ||||||
| 		ListenHTTP:      listenHTTP, | 		ListenHTTP:      listenHTTP, | ||||||
|  | 		Limit:           defaultLimit, | ||||||
|  | 		LimitBurst:      defaultLimitBurst, | ||||||
|  | 		ManagerInterval: defaultManagerInterval, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
							
								
								
									
										1
									
								
								go.mod
									
										
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
										
									
									
									
								
							|  | @ -6,5 +6,6 @@ require ( | ||||||
| 	github.com/BurntSushi/toml v0.4.1 // indirect | 	github.com/BurntSushi/toml v0.4.1 // indirect | ||||||
| 	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect | 	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect | ||||||
| 	github.com/urfave/cli/v2 v2.3.0 | 	github.com/urfave/cli/v2 v2.3.0 | ||||||
|  | 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac | ||||||
| 	gopkg.in/yaml.v2 v2.4.0 // indirect | 	gopkg.in/yaml.v2 v2.4.0 // indirect | ||||||
| ) | ) | ||||||
|  |  | ||||||
							
								
								
									
										9
									
								
								go.sum
									
										
									
									
									
								
							
							
						
						
									
										9
									
								
								go.sum
									
										
									
									
									
								
							|  | @ -1,23 +1,20 @@ | ||||||
| github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= |  | ||||||
| github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | ||||||
| github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= | github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= | ||||||
| github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= | github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= | ||||||
| github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= |  | ||||||
| github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= | github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= | ||||||
| github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= | github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= | ||||||
| github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= | github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= | ||||||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= |  | ||||||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||||||
| github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= |  | ||||||
| github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= | github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= | ||||||
| github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= | github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= | ||||||
| github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= | github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= | ||||||
| github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= |  | ||||||
| github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= | github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= | ||||||
| github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= | github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= | ||||||
| github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= | github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= | ||||||
|  | golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= | ||||||
|  | golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= | ||||||
|  | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= | ||||||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||||||
| gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= |  | ||||||
| gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= | gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= | ||||||
| gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= | gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= | ||||||
| gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= | gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= | ||||||
|  |  | ||||||
|  | @ -3,37 +3,64 @@ | ||||||
| <head> | <head> | ||||||
|     <title>ntfy.sh</title> |     <title>ntfy.sh</title> | ||||||
|     <style> |     <style> | ||||||
|         body { font-size: 1.3em; line-height: 140%; } |         body { font-size: 1.2em; line-height: 130%; } | ||||||
|         #error { color: darkred; font-style: italic; } |         #error { color: darkred; font-style: italic; } | ||||||
|         #main { max-width: 800px; margin: 0 auto; } |         #main { max-width: 900px; margin: 0 auto 50px auto; } | ||||||
|     </style> |     </style> | ||||||
| </head> | </head> | ||||||
| <body> | <body> | ||||||
| <div id="main"> | <div id="main"> | ||||||
|     <h1>ntfy.sh</h1> |     <h1>ntfy.sh - simple HTTP-based pub-sub</h1> | ||||||
| 
 |  | ||||||
|     <p> |     <p> | ||||||
|         <b>ntfy</b> (pronounce: <i>notify</i>) is a simple HTTP-based pub-sub notification service. It allows you to send desktop and (soon) phone notifications |         <b>ntfy</b> (pronounce: <i>notify</i>) is a simple <b>HTTP-based pub-sub notification service and tool</b>. | ||||||
|         via scripts, without signup or cost. It's entirely free and open source. You can find the source code <a href="https://github.com/binwiederhier/ntfy">on GitHub</a>. |         It allows you to send <b>desktop notifications via scripts</b>, entirely <b>without signup or cost</b>. | ||||||
|  |         It's entirely free and open source. You can find the source code <a href="https://github.com/binwiederhier/ntfy">on GitHub</a>. | ||||||
|     </p> |     </p> | ||||||
| 
 |  | ||||||
|     <p> |  | ||||||
|         You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource |  | ||||||
|         or JSON feed. Once subscribed, you can publish messages via PUT or POST. |  | ||||||
|     </p> |  | ||||||
| 
 |  | ||||||
|     <p id="error"></p> |     <p id="error"></p> | ||||||
| 
 | 
 | ||||||
|  |     <h2>Subscribe to a topic</h2> | ||||||
|  |     <p> | ||||||
|  |         Topics are created on the fly by subscribing to them. You can create and subscribe to a topic either in this web UI, or in | ||||||
|  |         your own app by subscribing to an <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource">EventSource</a>, | ||||||
|  |         a JSON feed, or raw feed. | ||||||
|  |     </p> | ||||||
|  |     <p> | ||||||
|  |         Because there is no sign-up, <b>the topic is essentially a password</b>, so pick something that's not easily guessable. | ||||||
|  |     </p> | ||||||
|  | 
 | ||||||
|  |     <h3>Subscribe via web</h3> | ||||||
|  |     <p> | ||||||
|  |         If you subscribe to a topic via this web UI in the field below, messages published to any subscribed topic | ||||||
|  |         will show up as <b>desktop notification</b>.  | ||||||
|  |     </p> | ||||||
|     <form id="subscribeForm"> |     <form id="subscribeForm"> | ||||||
|         <p> |         <p> | ||||||
|             <input type="text" id="topicField" size="64" placeholder="Topic ID (letters, numbers, _ and -)"  pattern="[-_A-Za-z]{1,64}" autofocus /> |             <label for="topicField">Topic ID:</label> | ||||||
|  |             <input type="text" id="topicField" size="64" placeholder="Letters, numbers, _ and -"  pattern="[-_A-Za-z]{1,64}" autofocus /> | ||||||
|             <input type="submit" id="subscribeButton" value="Subscribe topic" /> |             <input type="submit" id="subscribeButton" value="Subscribe topic" /> | ||||||
|         </p> |         </p> | ||||||
|     </form> |     </form> | ||||||
| 
 |     <p id="topicsHeader">Subscribed topics:</p> | ||||||
|     <p id="topicsHeader"><b>Subscribed topics:</b></p> |  | ||||||
|     <ul id="topicsList"></ul> |     <ul id="topicsList"></ul> | ||||||
| 
 | 
 | ||||||
|  |     <h3>Subscribe via your app, or via the CLI</h3> | ||||||
|  |     <tt> | ||||||
|  |         curl -s ntfy.sh/mytopic/raw # one message per line (\n are replaced with a space)<br/> | ||||||
|  |         curl -s ntfy.sh/mytopic/json # one JSON message per line<br/> | ||||||
|  |         curl -s ntfy.sh/mytopic/sse # server-sent events (SSE) stream | ||||||
|  |     </tt> | ||||||
|  | 
 | ||||||
|  |     <h3>Publishing messages</h3> | ||||||
|  |     <p> | ||||||
|  |         Publishing messages can be done via PUT or POST using. Here's an example using <tt>curl</tt>: | ||||||
|  |     </p> | ||||||
|  |     <tt> | ||||||
|  |         curl -d "long process is done" ntfy.sh/mytopic | ||||||
|  |     </tt> | ||||||
|  |     <p> | ||||||
|  |         Messages published to a non-existing topic or a topic without subscribers will not be delivered later. | ||||||
|  |         There is (currently) no buffering of any kind. If you're not listening, the message won't be delivered. | ||||||
|  |     </p> | ||||||
| </div> | </div> | ||||||
| 
 | 
 | ||||||
| <script type="text/javascript"> | <script type="text/javascript"> | ||||||
|  |  | ||||||
|  | @ -4,11 +4,12 @@ import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	_ "embed" // required for go:embed | 	_ "embed" // required for go:embed | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" |  | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"golang.org/x/time/rate" | ||||||
| 	"heckel.io/ntfy/config" | 	"heckel.io/ntfy/config" | ||||||
| 	"io" | 	"io" | ||||||
| 	"log" | 	"log" | ||||||
|  | 	"net" | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"regexp" | 	"regexp" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | @ -16,19 +17,33 @@ import ( | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | // Server is the main server | ||||||
| type Server struct { | type Server struct { | ||||||
| 	config   *config.Config | 	config   *config.Config | ||||||
| 	topics   map[string]*topic | 	topics   map[string]*topic | ||||||
|  | 	visitors map[string]*visitor | ||||||
| 	mu       sync.Mutex | 	mu       sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type message struct { | // visitor represents an API user, and its associated rate.Limiter used for rate limiting | ||||||
| 	Time    int64  `json:"time"` | type visitor struct { | ||||||
| 	Message string `json:"message"` | 	limiter *rate.Limiter | ||||||
|  | 	seen    time.Time | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // errHTTP is a generic HTTP error for any non-200 HTTP error | ||||||
|  | type errHTTP struct { | ||||||
|  | 	Code   int | ||||||
|  | 	Status string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e errHTTP) Error() string { | ||||||
|  | 	return fmt.Sprintf("http: %s", e.Status) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	messageLimit        = 1024 | 	messageLimit        = 1024 | ||||||
|  | 	visitorExpungeAfter = 30 * time.Minute | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( | var ( | ||||||
|  | @ -40,18 +55,26 @@ var ( | ||||||
| 	//go:embed "index.html" | 	//go:embed "index.html" | ||||||
| 	indexSource string | 	indexSource string | ||||||
| 
 | 
 | ||||||
| 	errTopicNotFound = errors.New("topic not found") | 	errHTTPNotFound        = &errHTTP{http.StatusNotFound, http.StatusText(http.StatusNotFound)} | ||||||
|  | 	errHTTPTooManyRequests = &errHTTP{http.StatusTooManyRequests, http.StatusText(http.StatusTooManyRequests)} | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func New(conf *config.Config) *Server { | func New(conf *config.Config) *Server { | ||||||
| 	return &Server{ | 	return &Server{ | ||||||
| 		config:   conf, | 		config:   conf, | ||||||
| 		topics:   make(map[string]*topic), | 		topics:   make(map[string]*topic), | ||||||
|  | 		visitors: make(map[string]*visitor), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) Run() error { | func (s *Server) Run() error { | ||||||
| 	go s.runMonitor() | 	go func() { | ||||||
|  | 		ticker := time.NewTicker(s.config.ManagerInterval) | ||||||
|  | 		for { | ||||||
|  | 			<-ticker.C | ||||||
|  | 			s.updateStatsAndExpire() | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
| 	return s.listenAndServe() | 	return s.listenAndServe() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -61,29 +84,43 @@ func (s *Server) listenAndServe() error { | ||||||
| 	return http.ListenAndServe(s.config.ListenHTTP, nil) | 	return http.ListenAndServe(s.config.ListenHTTP, nil) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) runMonitor() { | func (s *Server) updateStatsAndExpire() { | ||||||
| 	for { |  | ||||||
| 		time.Sleep(30 * time.Second) |  | ||||||
| 	s.mu.Lock() | 	s.mu.Lock() | ||||||
|  | 	defer s.mu.Unlock() | ||||||
|  | 
 | ||||||
|  | 	// Expire visitors from rate visitors map | ||||||
|  | 	for ip, v := range s.visitors { | ||||||
|  | 		if time.Since(v.seen) > visitorExpungeAfter { | ||||||
|  | 			delete(s.visitors, ip) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Print stats | ||||||
| 	var subscribers, messages int | 	var subscribers, messages int | ||||||
| 	for _, t := range s.topics { | 	for _, t := range s.topics { | ||||||
| 		subs, msgs := t.Stats() | 		subs, msgs := t.Stats() | ||||||
| 		subscribers += subs | 		subscribers += subs | ||||||
| 		messages += msgs | 		messages += msgs | ||||||
| 	} | 	} | ||||||
| 		log.Printf("Stats: %d topic(s), %d subscriber(s), %d message(s) sent", len(s.topics), subscribers, messages) | 	log.Printf("Stats: %d topic(s), %d subscriber(s), %d message(s) sent, %d visitor(s)", | ||||||
| 		s.mu.Unlock() | 		len(s.topics), subscribers, messages, len(s.visitors)) | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) handle(w http.ResponseWriter, r *http.Request) { | func (s *Server) handle(w http.ResponseWriter, r *http.Request) { | ||||||
| 	if err := s.handleInternal(w, r); err != nil { | 	if err := s.handleInternal(w, r); err != nil { | ||||||
| 		w.WriteHeader(http.StatusInternalServerError) | 		if e, ok := err.(*errHTTP); ok { | ||||||
| 		_, _ = io.WriteString(w, err.Error()+"\n") | 			s.fail(w, r, e.Code, e) | ||||||
|  | 		} else { | ||||||
|  | 			s.fail(w, r, http.StatusInternalServerError, err) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { | func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { | ||||||
|  | 	v := s.visitor(r.RemoteAddr) | ||||||
|  | 	if !v.limiter.Allow() { | ||||||
|  | 		return errHTTPTooManyRequests | ||||||
|  | 	} | ||||||
| 	if r.Method == http.MethodGet && r.URL.Path == "/" { | 	if r.Method == http.MethodGet && r.URL.Path == "/" { | ||||||
| 		return s.handleHome(w, r) | 		return s.handleHome(w, r) | ||||||
| 	} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) { | 	} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) { | ||||||
|  | @ -95,8 +132,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { | ||||||
| 	} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) { | 	} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) { | ||||||
| 		return s.handlePublishHTTP(w, r) | 		return s.handlePublishHTTP(w, r) | ||||||
| 	} | 	} | ||||||
| 	http.NotFound(w, r) | 	return errHTTPNotFound | ||||||
| 	return nil |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error { | func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error { | ||||||
|  | @ -206,7 +242,7 @@ func (s *Server) topic(topicID string) (*topic, error) { | ||||||
| 	defer s.mu.Unlock() | 	defer s.mu.Unlock() | ||||||
| 	c, ok := s.topics[topicID] | 	c, ok := s.topics[topicID] | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return nil, errTopicNotFound | 		return nil, errHTTPNotFound | ||||||
| 	} | 	} | ||||||
| 	return c, nil | 	return c, nil | ||||||
| } | } | ||||||
|  | @ -218,3 +254,31 @@ func (s *Server) unsubscribe(t *topic, subscriberID int) { | ||||||
| 		delete(s.topics, t.id) | 		delete(s.topics, t.id) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // visitor creates or retrieves a rate.Limiter for the given visitor. | ||||||
|  | // This function was taken from https://www.alexedwards.net/blog/how-to-rate-limit-http-requests (MIT). | ||||||
|  | func (s *Server) visitor(remoteAddr string) *visitor { | ||||||
|  | 	s.mu.Lock() | ||||||
|  | 	defer s.mu.Unlock() | ||||||
|  | 	ip, _, err := net.SplitHostPort(remoteAddr) | ||||||
|  | 	if err != nil { | ||||||
|  | 		ip = remoteAddr // This should not happen in real life; only in tests. | ||||||
|  | 	} | ||||||
|  | 	v, exists := s.visitors[ip] | ||||||
|  | 	if !exists { | ||||||
|  | 		v = &visitor{ | ||||||
|  | 			rate.NewLimiter(s.config.Limit, s.config.LimitBurst), | ||||||
|  | 			time.Now(), | ||||||
|  | 		} | ||||||
|  | 		s.visitors[ip] = v | ||||||
|  | 		return v | ||||||
|  | 	} | ||||||
|  | 	v.seen = time.Now() | ||||||
|  | 	return v | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *Server) fail(w http.ResponseWriter, r *http.Request, code int, err error) { | ||||||
|  | 	log.Printf("[%s] %s - %d - %s", r.RemoteAddr, r.Method, code, err.Error()) | ||||||
|  | 	w.WriteHeader(code) | ||||||
|  | 	io.WriteString(w, fmt.Sprintf("%s\n", http.StatusText(code))) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -9,6 +9,8 @@ import ( | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | // topic represents a channel to which subscribers can subscribe, and publishers | ||||||
|  | // can publish a message | ||||||
| type topic struct { | type topic struct { | ||||||
| 	id          string | 	id          string | ||||||
| 	subscribers map[int]subscriber | 	subscribers map[int]subscriber | ||||||
|  | @ -19,6 +21,13 @@ type topic struct { | ||||||
| 	mu          sync.Mutex | 	mu          sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // message represents a message published to a topic | ||||||
|  | type message struct { | ||||||
|  | 	Time    int64  `json:"time"` | ||||||
|  | 	Message string `json:"message"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // subscriber is a function that is called for every new message on a topic | ||||||
| type subscriber func(msg *message) error | type subscriber func(msg *message) error | ||||||
| 
 | 
 | ||||||
| func newTopic(id string) *topic { | func newTopic(id string) *topic { | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue