parent
43c9a92748
commit
d453db89a7
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
type cache interface {
|
type cache interface {
|
||||||
AddMessage(m *message) error
|
AddMessage(m *message) error
|
||||||
Messages(topic string, since time.Time) ([]*message, error)
|
Messages(topic string, since sinceTime) ([]*message, error)
|
||||||
MessageCount(topic string) (int, error)
|
MessageCount(topic string) (int, error)
|
||||||
Topics() (map[string]*topic, error)
|
Topics() (map[string]*topic, error)
|
||||||
Prune(keep time.Duration) error
|
Prune(keep time.Duration) error
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (s *memCache) AddMessage(m *message) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memCache) Messages(topic string, since time.Time) ([]*message, error) {
|
func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
if _, ok := s.messages[topic]; !ok {
|
if _, ok := s.messages[topic]; !ok {
|
||||||
|
@ -38,7 +38,7 @@ func (s *memCache) Messages(topic string, since time.Time) ([]*message, error) {
|
||||||
messages := make([]*message, 0) // copy!
|
messages := make([]*message, 0) // copy!
|
||||||
for _, m := range s.messages[topic] {
|
for _, m := range s.messages[topic] {
|
||||||
msgTime := time.Unix(m.Time, 0)
|
msgTime := time.Unix(m.Time, 0)
|
||||||
if msgTime == since || msgTime.After(since) {
|
if msgTime == since.Time() || msgTime.After(since.Time()) {
|
||||||
messages = append(messages, m)
|
messages = append(messages, m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,8 @@ func (c *sqliteCache) AddMessage(m *message) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sqliteCache) Messages(topic string, since time.Time) ([]*message, error) {
|
func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
|
||||||
rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Unix())
|
rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,25 @@ type indexPage struct {
|
||||||
CacheDuration string
|
CacheDuration string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sinceTime time.Time
|
||||||
|
|
||||||
|
func (t sinceTime) IsAll() bool {
|
||||||
|
return t == sinceAllMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t sinceTime) IsNone() bool {
|
||||||
|
return t == sinceNoMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t sinceTime) Time() time.Time {
|
||||||
|
return time.Time(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
sinceAllMessages = sinceTime(time.Unix(0, 0))
|
||||||
|
sinceNoMessages = sinceTime(time.Unix(1, 0))
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
messageLimit = 512
|
messageLimit = 512
|
||||||
)
|
)
|
||||||
|
@ -318,8 +337,8 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) sendOldMessages(t *topic, since time.Time, sub subscriber) error {
|
func (s *Server) sendOldMessages(t *topic, since sinceTime, sub subscriber) error {
|
||||||
if since.IsZero() {
|
if since.IsNone() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
messages, err := s.cache.Messages(t.id, since)
|
messages, err := s.cache.Messages(t.id, since)
|
||||||
|
@ -334,17 +353,27 @@ func (s *Server) sendOldMessages(t *topic, since time.Time, sub subscriber) erro
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseSince(r *http.Request) (time.Time, error) {
|
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
|
||||||
|
//
|
||||||
|
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
|
||||||
|
// "all" for all messages.
|
||||||
|
func parseSince(r *http.Request) (sinceTime, error) {
|
||||||
if !r.URL.Query().Has("since") {
|
if !r.URL.Query().Has("since") {
|
||||||
return time.Time{}, nil
|
if r.URL.Query().Has("poll") {
|
||||||
|
return sinceAllMessages, nil
|
||||||
}
|
}
|
||||||
if since, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil {
|
return sinceNoMessages, nil
|
||||||
return time.Unix(since, 0), nil
|
}
|
||||||
|
if r.URL.Query().Get("since") == "all" {
|
||||||
|
return sinceAllMessages, nil
|
||||||
|
}
|
||||||
|
if s, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil {
|
||||||
|
return sinceTime(time.Unix(s, 0)), nil
|
||||||
}
|
}
|
||||||
if d, err := time.ParseDuration(r.URL.Query().Get("since")); err == nil {
|
if d, err := time.ParseDuration(r.URL.Query().Get("since")); err == nil {
|
||||||
return time.Now().Add(-1 * d), nil
|
return sinceTime(time.Now().Add(-1 * d)), nil
|
||||||
}
|
}
|
||||||
return time.Time{}, errHTTPBadRequest
|
return sinceNoMessages, errHTTPBadRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
|
func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
|
||||||
|
|
|
@ -118,7 +118,7 @@ const test = (topic) => {
|
||||||
};
|
};
|
||||||
|
|
||||||
const fetchCachedMessages = async (topic) => {
|
const fetchCachedMessages = async (topic) => {
|
||||||
const topicJsonUrl = `/${topic}/json?poll=1&since=12h`; // Poll!
|
const topicJsonUrl = `/${topic}/json?poll=1`; // Poll!
|
||||||
for await (let line of makeTextFileLineIterator(topicJsonUrl)) {
|
for await (let line of makeTextFileLineIterator(topicJsonUrl)) {
|
||||||
const message = JSON.parse(line);
|
const message = JSON.parse(line);
|
||||||
topics[topic]['messages'].push(message);
|
topics[topic]['messages'].push(message);
|
||||||
|
|
Loading…
Reference in New Issue