Combine tests and all that
parent
6dc4e441e4
commit
c21737d546
564
server/cache.go
564
server/cache.go
|
@ -1,8 +1,13 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
||||||
|
"heckel.io/ntfy/util"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -10,16 +15,551 @@ var (
|
||||||
errUnexpectedMessageType = errors.New("unexpected message type")
|
errUnexpectedMessageType = errors.New("unexpected message type")
|
||||||
)
|
)
|
||||||
|
|
||||||
// cache implements a cache for messages of type "message" events,
|
// Messages cache
|
||||||
// i.e. message structs with the Event messageEvent.
|
const (
|
||||||
type cache interface {
|
createMessagesTableQuery = `
|
||||||
AddMessage(m *message) error
|
BEGIN;
|
||||||
Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error)
|
CREATE TABLE IF NOT EXISTS messages (
|
||||||
MessagesDue() ([]*message, error)
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
MessageCount(topic string) (int, error)
|
mid TEXT NOT NULL,
|
||||||
Topics() (map[string]*topic, error)
|
time INT NOT NULL,
|
||||||
Prune(olderThan time.Time) error
|
topic TEXT NOT NULL,
|
||||||
MarkPublished(m *message) error
|
message TEXT NOT NULL,
|
||||||
AttachmentsSize(owner string) (int64, error)
|
title TEXT NOT NULL,
|
||||||
AttachmentsExpired() ([]string, error)
|
priority INT NOT NULL,
|
||||||
|
tags TEXT NOT NULL,
|
||||||
|
click TEXT NOT NULL,
|
||||||
|
attachment_name TEXT NOT NULL,
|
||||||
|
attachment_type TEXT NOT NULL,
|
||||||
|
attachment_size INT NOT NULL,
|
||||||
|
attachment_expires INT NOT NULL,
|
||||||
|
attachment_url TEXT NOT NULL,
|
||||||
|
attachment_owner TEXT NOT NULL,
|
||||||
|
encoding TEXT NOT NULL,
|
||||||
|
published INT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
|
||||||
|
COMMIT;
|
||||||
|
`
|
||||||
|
insertMessageQuery = `
|
||||||
|
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
`
|
||||||
|
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
|
||||||
|
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
|
||||||
|
selectMessagesSinceTimeQuery = `
|
||||||
|
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||||
|
FROM messages
|
||||||
|
WHERE topic = ? AND time >= ? AND published = 1
|
||||||
|
ORDER BY time, id
|
||||||
|
`
|
||||||
|
selectMessagesSinceTimeIncludeScheduledQuery = `
|
||||||
|
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||||
|
FROM messages
|
||||||
|
WHERE topic = ? AND time >= ?
|
||||||
|
ORDER BY time, id
|
||||||
|
`
|
||||||
|
selectMessagesSinceIDQuery = `
|
||||||
|
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||||
|
FROM messages
|
||||||
|
WHERE topic = ? AND id > ? AND published = 1
|
||||||
|
ORDER BY time, id
|
||||||
|
`
|
||||||
|
selectMessagesSinceIDIncludeScheduledQuery = `
|
||||||
|
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||||
|
FROM messages
|
||||||
|
WHERE topic = ? AND (id > ? OR published = 0)
|
||||||
|
ORDER BY time, id
|
||||||
|
`
|
||||||
|
selectMessagesDueQuery = `
|
||||||
|
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
||||||
|
FROM messages
|
||||||
|
WHERE time <= ? AND published = 0
|
||||||
|
ORDER BY time, id
|
||||||
|
`
|
||||||
|
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
|
||||||
|
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
|
||||||
|
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
|
||||||
|
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
|
||||||
|
selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
|
||||||
|
selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
|
||||||
|
)
|
||||||
|
|
||||||
|
// Schema management queries
|
||||||
|
const (
|
||||||
|
currentSchemaVersion = 5
|
||||||
|
createSchemaVersionTableQuery = `
|
||||||
|
CREATE TABLE IF NOT EXISTS schemaVersion (
|
||||||
|
id INT PRIMARY KEY,
|
||||||
|
version INT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
|
||||||
|
updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
|
||||||
|
selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
|
||||||
|
|
||||||
|
// 0 -> 1
|
||||||
|
migrate0To1AlterMessagesTableQuery = `
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
|
||||||
|
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
|
||||||
|
ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
|
||||||
|
COMMIT;
|
||||||
|
`
|
||||||
|
|
||||||
|
// 1 -> 2
|
||||||
|
migrate1To2AlterMessagesTableQuery = `
|
||||||
|
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
|
||||||
|
`
|
||||||
|
|
||||||
|
// 2 -> 3
|
||||||
|
migrate2To3AlterMessagesTableQuery = `
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL DEFAULT('0');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
|
||||||
|
ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
|
||||||
|
COMMIT;
|
||||||
|
`
|
||||||
|
// 3 -> 4
|
||||||
|
migrate3To4AlterMessagesTableQuery = `
|
||||||
|
ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT('');
|
||||||
|
`
|
||||||
|
|
||||||
|
// 4 -> 5
|
||||||
|
migrate4To5AlterMessagesTableQuery = `
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE IF NOT EXISTS messages_new (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
mid TEXT NOT NULL,
|
||||||
|
time INT NOT NULL,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
message TEXT NOT NULL,
|
||||||
|
title TEXT NOT NULL,
|
||||||
|
priority INT NOT NULL,
|
||||||
|
tags TEXT NOT NULL,
|
||||||
|
click TEXT NOT NULL,
|
||||||
|
attachment_name TEXT NOT NULL,
|
||||||
|
attachment_type TEXT NOT NULL,
|
||||||
|
attachment_size INT NOT NULL,
|
||||||
|
attachment_expires INT NOT NULL,
|
||||||
|
attachment_url TEXT NOT NULL,
|
||||||
|
attachment_owner TEXT NOT NULL,
|
||||||
|
encoding TEXT NOT NULL,
|
||||||
|
published INT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic);
|
||||||
|
INSERT
|
||||||
|
INTO messages_new (
|
||||||
|
mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
|
||||||
|
attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
||||||
|
SELECT
|
||||||
|
id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
|
||||||
|
attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published
|
||||||
|
FROM messages;
|
||||||
|
DROP TABLE messages;
|
||||||
|
ALTER TABLE messages_new RENAME TO messages;
|
||||||
|
COMMIT;
|
||||||
|
`
|
||||||
|
)
|
||||||
|
|
||||||
|
type sqliteCache struct {
|
||||||
|
db *sql.DB
|
||||||
|
nop bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSqliteCache(filename string, nop bool) (*sqliteCache, error) {
|
||||||
|
db, err := sql.Open("sqlite3", filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := setupCacheDB(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &sqliteCache{
|
||||||
|
db: db,
|
||||||
|
nop: nop,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newMemCache creates an in-memory cache
|
||||||
|
func newMemCache() (*sqliteCache, error) {
|
||||||
|
return newSqliteCache(createMemoryFilename(), false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newNopCache creates an in-memory cache that discards all messages;
|
||||||
|
// it is always empty and can be used if caching is entirely disabled
|
||||||
|
func newNopCache() (*sqliteCache, error) {
|
||||||
|
return newSqliteCache(createMemoryFilename(), true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// createMemoryFilename creates a unique memory filename to use for the SQLite backend.
|
||||||
|
// From mattn/go-sqlite3: "Each connection to ":memory:" opens a brand new in-memory
|
||||||
|
// sql database, so if the stdlib's sql engine happens to open another connection and
|
||||||
|
// you've only specified ":memory:", that connection will see a brand new database.
|
||||||
|
// A workaround is to use "file::memory:?cache=shared" (or "file:foobar?mode=memory&cache=shared").
|
||||||
|
// Every connection to this string will point to the same in-memory database."
|
||||||
|
func createMemoryFilename() string {
|
||||||
|
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) AddMessage(m *message) error {
|
||||||
|
if m.Event != messageEvent {
|
||||||
|
return errUnexpectedMessageType
|
||||||
|
}
|
||||||
|
if c.nop {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
published := m.Time <= time.Now().Unix()
|
||||||
|
tags := strings.Join(m.Tags, ",")
|
||||||
|
var attachmentName, attachmentType, attachmentURL, attachmentOwner string
|
||||||
|
var attachmentSize, attachmentExpires int64
|
||||||
|
if m.Attachment != nil {
|
||||||
|
attachmentName = m.Attachment.Name
|
||||||
|
attachmentType = m.Attachment.Type
|
||||||
|
attachmentSize = m.Attachment.Size
|
||||||
|
attachmentExpires = m.Attachment.Expires
|
||||||
|
attachmentURL = m.Attachment.URL
|
||||||
|
attachmentOwner = m.Attachment.Owner
|
||||||
|
}
|
||||||
|
_, err := c.db.Exec(
|
||||||
|
insertMessageQuery,
|
||||||
|
m.ID,
|
||||||
|
m.Time,
|
||||||
|
m.Topic,
|
||||||
|
m.Message,
|
||||||
|
m.Title,
|
||||||
|
m.Priority,
|
||||||
|
tags,
|
||||||
|
m.Click,
|
||||||
|
attachmentName,
|
||||||
|
attachmentType,
|
||||||
|
attachmentSize,
|
||||||
|
attachmentExpires,
|
||||||
|
attachmentURL,
|
||||||
|
attachmentOwner,
|
||||||
|
m.Encoding,
|
||||||
|
published,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
||||||
|
if since.IsNone() {
|
||||||
|
return make([]*message, 0), nil
|
||||||
|
} else if since.IsID() {
|
||||||
|
return c.messagesSinceID(topic, since, scheduled)
|
||||||
|
}
|
||||||
|
return c.messagesSinceTime(topic, since, scheduled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
||||||
|
var rows *sql.Rows
|
||||||
|
var err error
|
||||||
|
if scheduled {
|
||||||
|
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
|
||||||
|
} else {
|
||||||
|
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return readMessages(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
||||||
|
idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer idrows.Close()
|
||||||
|
if !idrows.Next() {
|
||||||
|
return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
|
||||||
|
}
|
||||||
|
var rowID int64
|
||||||
|
if err := idrows.Scan(&rowID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
idrows.Close()
|
||||||
|
var rows *sql.Rows
|
||||||
|
if scheduled {
|
||||||
|
rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, rowID)
|
||||||
|
} else {
|
||||||
|
rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, rowID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return readMessages(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) MessagesDue() ([]*message, error) {
|
||||||
|
rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return readMessages(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) MarkPublished(m *message) error {
|
||||||
|
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) MessageCount(topic string) (int, error) {
|
||||||
|
rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var count int
|
||||||
|
if !rows.Next() {
|
||||||
|
return 0, errors.New("no rows found")
|
||||||
|
}
|
||||||
|
if err := rows.Scan(&count); err != nil {
|
||||||
|
return 0, err
|
||||||
|
} else if err := rows.Err(); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) Topics() (map[string]*topic, error) {
|
||||||
|
rows, err := c.db.Query(selectTopicsQuery)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
topics := make(map[string]*topic)
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
if err := rows.Scan(&id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
topics[id] = newTopic(id)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return topics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) Prune(olderThan time.Time) error {
|
||||||
|
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
|
||||||
|
rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var size int64
|
||||||
|
if !rows.Next() {
|
||||||
|
return 0, errors.New("no rows found")
|
||||||
|
}
|
||||||
|
if err := rows.Scan(&size); err != nil {
|
||||||
|
return 0, err
|
||||||
|
} else if err := rows.Err(); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sqliteCache) AttachmentsExpired() ([]string, error) {
|
||||||
|
rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
ids := make([]string, 0)
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
if err := rows.Scan(&id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readMessages(rows *sql.Rows) ([]*message, error) {
|
||||||
|
defer rows.Close()
|
||||||
|
messages := make([]*message, 0)
|
||||||
|
for rows.Next() {
|
||||||
|
var timestamp, attachmentSize, attachmentExpires int64
|
||||||
|
var priority int
|
||||||
|
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
|
||||||
|
err := rows.Scan(
|
||||||
|
&id,
|
||||||
|
×tamp,
|
||||||
|
&topic,
|
||||||
|
&msg,
|
||||||
|
&title,
|
||||||
|
&priority,
|
||||||
|
&tagsStr,
|
||||||
|
&click,
|
||||||
|
&attachmentName,
|
||||||
|
&attachmentType,
|
||||||
|
&attachmentSize,
|
||||||
|
&attachmentExpires,
|
||||||
|
&attachmentURL,
|
||||||
|
&attachmentOwner,
|
||||||
|
&encoding,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var tags []string
|
||||||
|
if tagsStr != "" {
|
||||||
|
tags = strings.Split(tagsStr, ",")
|
||||||
|
}
|
||||||
|
var att *attachment
|
||||||
|
if attachmentName != "" && attachmentURL != "" {
|
||||||
|
att = &attachment{
|
||||||
|
Name: attachmentName,
|
||||||
|
Type: attachmentType,
|
||||||
|
Size: attachmentSize,
|
||||||
|
Expires: attachmentExpires,
|
||||||
|
URL: attachmentURL,
|
||||||
|
Owner: attachmentOwner,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messages = append(messages, &message{
|
||||||
|
ID: id,
|
||||||
|
Time: timestamp,
|
||||||
|
Event: messageEvent,
|
||||||
|
Topic: topic,
|
||||||
|
Message: msg,
|
||||||
|
Title: title,
|
||||||
|
Priority: priority,
|
||||||
|
Tags: tags,
|
||||||
|
Click: click,
|
||||||
|
Attachment: att,
|
||||||
|
Encoding: encoding,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupCacheDB(db *sql.DB) error {
|
||||||
|
// If 'messages' table does not exist, this must be a new database
|
||||||
|
rowsMC, err := db.Query(selectMessagesCountQuery)
|
||||||
|
if err != nil {
|
||||||
|
return setupNewCacheDB(db)
|
||||||
|
}
|
||||||
|
rowsMC.Close()
|
||||||
|
|
||||||
|
// If 'messages' table exists, check 'schemaVersion' table
|
||||||
|
schemaVersion := 0
|
||||||
|
rowsSV, err := db.Query(selectSchemaVersionQuery)
|
||||||
|
if err == nil {
|
||||||
|
defer rowsSV.Close()
|
||||||
|
if !rowsSV.Next() {
|
||||||
|
return errors.New("cannot determine schema version: cache file may be corrupt")
|
||||||
|
}
|
||||||
|
if err := rowsSV.Scan(&schemaVersion); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rowsSV.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do migrations
|
||||||
|
if schemaVersion == currentSchemaVersion {
|
||||||
|
return nil
|
||||||
|
} else if schemaVersion == 0 {
|
||||||
|
return migrateFrom0(db)
|
||||||
|
} else if schemaVersion == 1 {
|
||||||
|
return migrateFrom1(db)
|
||||||
|
} else if schemaVersion == 2 {
|
||||||
|
return migrateFrom2(db)
|
||||||
|
} else if schemaVersion == 3 {
|
||||||
|
return migrateFrom3(db)
|
||||||
|
} else if schemaVersion == 4 {
|
||||||
|
return migrateFrom4(db)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupNewCacheDB(db *sql.DB) error {
|
||||||
|
if _, err := db.Exec(createMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateFrom0(db *sql.DB) error {
|
||||||
|
log.Print("Migrating cache database schema: from 0 to 1")
|
||||||
|
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return migrateFrom1(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateFrom1(db *sql.DB) error {
|
||||||
|
log.Print("Migrating cache database schema: from 1 to 2")
|
||||||
|
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return migrateFrom2(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateFrom2(db *sql.DB) error {
|
||||||
|
log.Print("Migrating cache database schema: from 2 to 3")
|
||||||
|
if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return migrateFrom3(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateFrom3(db *sql.DB) error {
|
||||||
|
log.Print("Migrating cache database schema: from 3 to 4")
|
||||||
|
if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return migrateFrom4(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateFrom4(db *sql.DB) error {
|
||||||
|
log.Print("Migrating cache database schema: from 4 to 5")
|
||||||
|
if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil // Update this when a new version is added
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestMemCache_Messages(t *testing.T) {
|
|
||||||
testCacheMessages(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_MessagesScheduled(t *testing.T) {
|
|
||||||
testCacheMessagesScheduled(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_Topics(t *testing.T) {
|
|
||||||
testCacheTopics(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
|
|
||||||
testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_MessagesSinceID(t *testing.T) {
|
|
||||||
testCacheMessagesSinceID(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_Prune(t *testing.T) {
|
|
||||||
testCachePrune(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_Attachments(t *testing.T) {
|
|
||||||
testCacheAttachments(t, newMemTestCache(t))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemCache_NopCache(t *testing.T) {
|
|
||||||
c, _ := newNopCache()
|
|
||||||
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
|
|
||||||
|
|
||||||
messages, err := c.Messages("mytopic", sinceAllMessages, false)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Empty(t, messages)
|
|
||||||
|
|
||||||
topics, err := c.Topics()
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Empty(t, topics)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMemTestCache(t *testing.T) cache {
|
|
||||||
c, err := newMemCache()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return c
|
|
||||||
}
|
|
|
@ -1,563 +0,0 @@
|
||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
|
||||||
"heckel.io/ntfy/util"
|
|
||||||
"log"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Messages cache
|
|
||||||
const (
|
|
||||||
createMessagesTableQuery = `
|
|
||||||
BEGIN;
|
|
||||||
CREATE TABLE IF NOT EXISTS messages (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
mid TEXT NOT NULL,
|
|
||||||
time INT NOT NULL,
|
|
||||||
topic TEXT NOT NULL,
|
|
||||||
message TEXT NOT NULL,
|
|
||||||
title TEXT NOT NULL,
|
|
||||||
priority INT NOT NULL,
|
|
||||||
tags TEXT NOT NULL,
|
|
||||||
click TEXT NOT NULL,
|
|
||||||
attachment_name TEXT NOT NULL,
|
|
||||||
attachment_type TEXT NOT NULL,
|
|
||||||
attachment_size INT NOT NULL,
|
|
||||||
attachment_expires INT NOT NULL,
|
|
||||||
attachment_url TEXT NOT NULL,
|
|
||||||
attachment_owner TEXT NOT NULL,
|
|
||||||
encoding TEXT NOT NULL,
|
|
||||||
published INT NOT NULL
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
|
|
||||||
COMMIT;
|
|
||||||
`
|
|
||||||
insertMessageQuery = `
|
|
||||||
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
||||||
`
|
|
||||||
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
|
|
||||||
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
|
|
||||||
selectMessagesSinceTimeQuery = `
|
|
||||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
|
||||||
FROM messages
|
|
||||||
WHERE topic = ? AND time >= ? AND published = 1
|
|
||||||
ORDER BY time, id
|
|
||||||
`
|
|
||||||
selectMessagesSinceTimeIncludeScheduledQuery = `
|
|
||||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
|
||||||
FROM messages
|
|
||||||
WHERE topic = ? AND time >= ?
|
|
||||||
ORDER BY time, id
|
|
||||||
`
|
|
||||||
selectMessagesSinceIDQuery = `
|
|
||||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
|
||||||
FROM messages
|
|
||||||
WHERE topic = ? AND id > ? AND published = 1
|
|
||||||
ORDER BY time, id
|
|
||||||
`
|
|
||||||
selectMessagesSinceIDIncludeScheduledQuery = `
|
|
||||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
|
||||||
FROM messages
|
|
||||||
WHERE topic = ? AND (id > ? OR published = 0)
|
|
||||||
ORDER BY time, id
|
|
||||||
`
|
|
||||||
selectMessagesDueQuery = `
|
|
||||||
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
|
|
||||||
FROM messages
|
|
||||||
WHERE time <= ? AND published = 0
|
|
||||||
ORDER BY time, id
|
|
||||||
`
|
|
||||||
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
|
|
||||||
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
|
|
||||||
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
|
|
||||||
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
|
|
||||||
selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
|
|
||||||
selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
|
|
||||||
)
|
|
||||||
|
|
||||||
// Schema management queries
|
|
||||||
const (
|
|
||||||
currentSchemaVersion = 5
|
|
||||||
createSchemaVersionTableQuery = `
|
|
||||||
CREATE TABLE IF NOT EXISTS schemaVersion (
|
|
||||||
id INT PRIMARY KEY,
|
|
||||||
version INT NOT NULL
|
|
||||||
);
|
|
||||||
`
|
|
||||||
insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
|
|
||||||
updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
|
|
||||||
selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
|
|
||||||
|
|
||||||
// 0 -> 1
|
|
||||||
migrate0To1AlterMessagesTableQuery = `
|
|
||||||
BEGIN;
|
|
||||||
ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
|
|
||||||
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
|
|
||||||
ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
|
|
||||||
COMMIT;
|
|
||||||
`
|
|
||||||
|
|
||||||
// 1 -> 2
|
|
||||||
migrate1To2AlterMessagesTableQuery = `
|
|
||||||
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
|
|
||||||
`
|
|
||||||
|
|
||||||
// 2 -> 3
|
|
||||||
migrate2To3AlterMessagesTableQuery = `
|
|
||||||
BEGIN;
|
|
||||||
ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL DEFAULT('0');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
|
|
||||||
ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
|
|
||||||
COMMIT;
|
|
||||||
`
|
|
||||||
// 3 -> 4
|
|
||||||
migrate3To4AlterMessagesTableQuery = `
|
|
||||||
ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT('');
|
|
||||||
`
|
|
||||||
|
|
||||||
// 4 -> 5
|
|
||||||
migrate4To5AlterMessagesTableQuery = `
|
|
||||||
BEGIN;
|
|
||||||
CREATE TABLE IF NOT EXISTS messages_new (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
mid TEXT NOT NULL,
|
|
||||||
time INT NOT NULL,
|
|
||||||
topic TEXT NOT NULL,
|
|
||||||
message TEXT NOT NULL,
|
|
||||||
title TEXT NOT NULL,
|
|
||||||
priority INT NOT NULL,
|
|
||||||
tags TEXT NOT NULL,
|
|
||||||
click TEXT NOT NULL,
|
|
||||||
attachment_name TEXT NOT NULL,
|
|
||||||
attachment_type TEXT NOT NULL,
|
|
||||||
attachment_size INT NOT NULL,
|
|
||||||
attachment_expires INT NOT NULL,
|
|
||||||
attachment_url TEXT NOT NULL,
|
|
||||||
attachment_owner TEXT NOT NULL,
|
|
||||||
encoding TEXT NOT NULL,
|
|
||||||
published INT NOT NULL
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic);
|
|
||||||
INSERT
|
|
||||||
INTO messages_new (
|
|
||||||
mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
|
|
||||||
attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
|
|
||||||
SELECT
|
|
||||||
id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
|
|
||||||
attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published
|
|
||||||
FROM messages;
|
|
||||||
DROP TABLE messages;
|
|
||||||
ALTER TABLE messages_new RENAME TO messages;
|
|
||||||
COMMIT;
|
|
||||||
`
|
|
||||||
)
|
|
||||||
|
|
||||||
type sqliteCache struct {
|
|
||||||
db *sql.DB
|
|
||||||
nop bool
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ cache = (*sqliteCache)(nil)
|
|
||||||
|
|
||||||
func newSqliteCache(filename string, nop bool) (*sqliteCache, error) {
|
|
||||||
db, err := sql.Open("sqlite3", filename)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := setupCacheDB(db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &sqliteCache{
|
|
||||||
db: db,
|
|
||||||
nop: nop,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// newMemCache creates an in-memory cache
|
|
||||||
func newMemCache() (*sqliteCache, error) {
|
|
||||||
return newSqliteCache(createMemoryFilename(), false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newNopCache creates an in-memory cache that discards all messages;
|
|
||||||
// it is always empty and can be used if caching is entirely disabled
|
|
||||||
func newNopCache() (*sqliteCache, error) {
|
|
||||||
return newSqliteCache(createMemoryFilename(), true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// createMemoryFilename creates a unique memory filename to use for the SQLite backend.
|
|
||||||
// From mattn/go-sqlite3: "Each connection to ":memory:" opens a brand new in-memory
|
|
||||||
// sql database, so if the stdlib's sql engine happens to open another connection and
|
|
||||||
// you've only specified ":memory:", that connection will see a brand new database.
|
|
||||||
// A workaround is to use "file::memory:?cache=shared" (or "file:foobar?mode=memory&cache=shared").
|
|
||||||
// Every connection to this string will point to the same in-memory database."
|
|
||||||
func createMemoryFilename() string {
|
|
||||||
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) AddMessage(m *message) error {
|
|
||||||
if m.Event != messageEvent {
|
|
||||||
return errUnexpectedMessageType
|
|
||||||
}
|
|
||||||
if c.nop {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
published := m.Time <= time.Now().Unix()
|
|
||||||
tags := strings.Join(m.Tags, ",")
|
|
||||||
var attachmentName, attachmentType, attachmentURL, attachmentOwner string
|
|
||||||
var attachmentSize, attachmentExpires int64
|
|
||||||
if m.Attachment != nil {
|
|
||||||
attachmentName = m.Attachment.Name
|
|
||||||
attachmentType = m.Attachment.Type
|
|
||||||
attachmentSize = m.Attachment.Size
|
|
||||||
attachmentExpires = m.Attachment.Expires
|
|
||||||
attachmentURL = m.Attachment.URL
|
|
||||||
attachmentOwner = m.Attachment.Owner
|
|
||||||
}
|
|
||||||
_, err := c.db.Exec(
|
|
||||||
insertMessageQuery,
|
|
||||||
m.ID,
|
|
||||||
m.Time,
|
|
||||||
m.Topic,
|
|
||||||
m.Message,
|
|
||||||
m.Title,
|
|
||||||
m.Priority,
|
|
||||||
tags,
|
|
||||||
m.Click,
|
|
||||||
attachmentName,
|
|
||||||
attachmentType,
|
|
||||||
attachmentSize,
|
|
||||||
attachmentExpires,
|
|
||||||
attachmentURL,
|
|
||||||
attachmentOwner,
|
|
||||||
m.Encoding,
|
|
||||||
published,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
|
||||||
if since.IsNone() {
|
|
||||||
return make([]*message, 0), nil
|
|
||||||
} else if since.IsID() {
|
|
||||||
return c.messagesSinceID(topic, since, scheduled)
|
|
||||||
}
|
|
||||||
return c.messagesSinceTime(topic, since, scheduled)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
|
||||||
var rows *sql.Rows
|
|
||||||
var err error
|
|
||||||
if scheduled {
|
|
||||||
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
|
|
||||||
} else {
|
|
||||||
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return readMessages(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
|
||||||
idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer idrows.Close()
|
|
||||||
if !idrows.Next() {
|
|
||||||
return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
|
|
||||||
}
|
|
||||||
var rowID int64
|
|
||||||
if err := idrows.Scan(&rowID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
idrows.Close()
|
|
||||||
var rows *sql.Rows
|
|
||||||
if scheduled {
|
|
||||||
rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, rowID)
|
|
||||||
} else {
|
|
||||||
rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, rowID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return readMessages(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) MessagesDue() ([]*message, error) {
|
|
||||||
rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return readMessages(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) MarkPublished(m *message) error {
|
|
||||||
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) MessageCount(topic string) (int, error) {
|
|
||||||
rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var count int
|
|
||||||
if !rows.Next() {
|
|
||||||
return 0, errors.New("no rows found")
|
|
||||||
}
|
|
||||||
if err := rows.Scan(&count); err != nil {
|
|
||||||
return 0, err
|
|
||||||
} else if err := rows.Err(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return count, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) Topics() (map[string]*topic, error) {
|
|
||||||
rows, err := c.db.Query(selectTopicsQuery)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
topics := make(map[string]*topic)
|
|
||||||
for rows.Next() {
|
|
||||||
var id string
|
|
||||||
if err := rows.Scan(&id); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
topics[id] = newTopic(id)
|
|
||||||
}
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return topics, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) Prune(olderThan time.Time) error {
|
|
||||||
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
|
|
||||||
rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var size int64
|
|
||||||
if !rows.Next() {
|
|
||||||
return 0, errors.New("no rows found")
|
|
||||||
}
|
|
||||||
if err := rows.Scan(&size); err != nil {
|
|
||||||
return 0, err
|
|
||||||
} else if err := rows.Err(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return size, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sqliteCache) AttachmentsExpired() ([]string, error) {
|
|
||||||
rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
ids := make([]string, 0)
|
|
||||||
for rows.Next() {
|
|
||||||
var id string
|
|
||||||
if err := rows.Scan(&id); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ids = append(ids, id)
|
|
||||||
}
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return ids, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func readMessages(rows *sql.Rows) ([]*message, error) {
|
|
||||||
defer rows.Close()
|
|
||||||
messages := make([]*message, 0)
|
|
||||||
for rows.Next() {
|
|
||||||
var timestamp, attachmentSize, attachmentExpires int64
|
|
||||||
var priority int
|
|
||||||
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
|
|
||||||
err := rows.Scan(
|
|
||||||
&id,
|
|
||||||
×tamp,
|
|
||||||
&topic,
|
|
||||||
&msg,
|
|
||||||
&title,
|
|
||||||
&priority,
|
|
||||||
&tagsStr,
|
|
||||||
&click,
|
|
||||||
&attachmentName,
|
|
||||||
&attachmentType,
|
|
||||||
&attachmentSize,
|
|
||||||
&attachmentExpires,
|
|
||||||
&attachmentURL,
|
|
||||||
&attachmentOwner,
|
|
||||||
&encoding,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var tags []string
|
|
||||||
if tagsStr != "" {
|
|
||||||
tags = strings.Split(tagsStr, ",")
|
|
||||||
}
|
|
||||||
var att *attachment
|
|
||||||
if attachmentName != "" && attachmentURL != "" {
|
|
||||||
att = &attachment{
|
|
||||||
Name: attachmentName,
|
|
||||||
Type: attachmentType,
|
|
||||||
Size: attachmentSize,
|
|
||||||
Expires: attachmentExpires,
|
|
||||||
URL: attachmentURL,
|
|
||||||
Owner: attachmentOwner,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
messages = append(messages, &message{
|
|
||||||
ID: id,
|
|
||||||
Time: timestamp,
|
|
||||||
Event: messageEvent,
|
|
||||||
Topic: topic,
|
|
||||||
Message: msg,
|
|
||||||
Title: title,
|
|
||||||
Priority: priority,
|
|
||||||
Tags: tags,
|
|
||||||
Click: click,
|
|
||||||
Attachment: att,
|
|
||||||
Encoding: encoding,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return messages, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupCacheDB(db *sql.DB) error {
|
|
||||||
// If 'messages' table does not exist, this must be a new database
|
|
||||||
rowsMC, err := db.Query(selectMessagesCountQuery)
|
|
||||||
if err != nil {
|
|
||||||
return setupNewCacheDB(db)
|
|
||||||
}
|
|
||||||
rowsMC.Close()
|
|
||||||
|
|
||||||
// If 'messages' table exists, check 'schemaVersion' table
|
|
||||||
schemaVersion := 0
|
|
||||||
rowsSV, err := db.Query(selectSchemaVersionQuery)
|
|
||||||
if err == nil {
|
|
||||||
defer rowsSV.Close()
|
|
||||||
if !rowsSV.Next() {
|
|
||||||
return errors.New("cannot determine schema version: cache file may be corrupt")
|
|
||||||
}
|
|
||||||
if err := rowsSV.Scan(&schemaVersion); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rowsSV.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do migrations
|
|
||||||
if schemaVersion == currentSchemaVersion {
|
|
||||||
return nil
|
|
||||||
} else if schemaVersion == 0 {
|
|
||||||
return migrateFrom0(db)
|
|
||||||
} else if schemaVersion == 1 {
|
|
||||||
return migrateFrom1(db)
|
|
||||||
} else if schemaVersion == 2 {
|
|
||||||
return migrateFrom2(db)
|
|
||||||
} else if schemaVersion == 3 {
|
|
||||||
return migrateFrom3(db)
|
|
||||||
} else if schemaVersion == 4 {
|
|
||||||
return migrateFrom4(db)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupNewCacheDB(db *sql.DB) error {
|
|
||||||
if _, err := db.Exec(createMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func migrateFrom0(db *sql.DB) error {
|
|
||||||
log.Print("Migrating cache database schema: from 0 to 1")
|
|
||||||
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return migrateFrom1(db)
|
|
||||||
}
|
|
||||||
|
|
||||||
func migrateFrom1(db *sql.DB) error {
|
|
||||||
log.Print("Migrating cache database schema: from 1 to 2")
|
|
||||||
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return migrateFrom2(db)
|
|
||||||
}
|
|
||||||
|
|
||||||
func migrateFrom2(db *sql.DB) error {
|
|
||||||
log.Print("Migrating cache database schema: from 2 to 3")
|
|
||||||
if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return migrateFrom3(db)
|
|
||||||
}
|
|
||||||
|
|
||||||
func migrateFrom3(db *sql.DB) error {
|
|
||||||
log.Print("Migrating cache database schema: from 3 to 4")
|
|
||||||
if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return migrateFrom4(db)
|
|
||||||
}
|
|
||||||
|
|
||||||
func migrateFrom4(db *sql.DB) error {
|
|
||||||
log.Print("Migrating cache database schema: from 4 to 5")
|
|
||||||
if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil // Update this when a new version is added
|
|
||||||
}
|
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -13,30 +14,58 @@ func TestSqliteCache_Messages(t *testing.T) {
|
||||||
testCacheMessages(t, newSqliteTestCache(t))
|
testCacheMessages(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_Messages(t *testing.T) {
|
||||||
|
testCacheMessages(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_MessagesScheduled(t *testing.T) {
|
func TestSqliteCache_MessagesScheduled(t *testing.T) {
|
||||||
testCacheMessagesScheduled(t, newSqliteTestCache(t))
|
testCacheMessagesScheduled(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_MessagesScheduled(t *testing.T) {
|
||||||
|
testCacheMessagesScheduled(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_Topics(t *testing.T) {
|
func TestSqliteCache_Topics(t *testing.T) {
|
||||||
testCacheTopics(t, newSqliteTestCache(t))
|
testCacheTopics(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_Topics(t *testing.T) {
|
||||||
|
testCacheTopics(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
|
func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
|
||||||
testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
|
testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
|
||||||
|
testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_MessagesSinceID(t *testing.T) {
|
func TestSqliteCache_MessagesSinceID(t *testing.T) {
|
||||||
testCacheMessagesSinceID(t, newSqliteTestCache(t))
|
testCacheMessagesSinceID(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_MessagesSinceID(t *testing.T) {
|
||||||
|
testCacheMessagesSinceID(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_Prune(t *testing.T) {
|
func TestSqliteCache_Prune(t *testing.T) {
|
||||||
testCachePrune(t, newSqliteTestCache(t))
|
testCachePrune(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_Prune(t *testing.T) {
|
||||||
|
testCachePrune(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_Attachments(t *testing.T) {
|
func TestSqliteCache_Attachments(t *testing.T) {
|
||||||
testCacheAttachments(t, newSqliteTestCache(t))
|
testCacheAttachments(t, newSqliteTestCache(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_Attachments(t *testing.T) {
|
||||||
|
testCacheAttachments(t, newMemTestCache(t))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSqliteCache_Migration_From0(t *testing.T) {
|
func TestSqliteCache_Migration_From0(t *testing.T) {
|
||||||
filename := newSqliteTestCacheFile(t)
|
filename := newSqliteTestCacheFile(t)
|
||||||
db, err := sql.Open("sqlite3", filename)
|
db, err := sql.Open("sqlite3", filename)
|
||||||
|
@ -141,6 +170,19 @@ func checkSchemaVersion(t *testing.T, db *sql.DB) {
|
||||||
require.Nil(t, rows.Close())
|
require.Nil(t, rows.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemCache_NopCache(t *testing.T) {
|
||||||
|
c, _ := newNopCache()
|
||||||
|
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
|
||||||
|
|
||||||
|
messages, err := c.Messages("mytopic", sinceAllMessages, false)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Empty(t, messages)
|
||||||
|
|
||||||
|
topics, err := c.Topics()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Empty(t, topics)
|
||||||
|
}
|
||||||
|
|
||||||
func newSqliteTestCache(t *testing.T) *sqliteCache {
|
func newSqliteTestCache(t *testing.T) *sqliteCache {
|
||||||
c, err := newSqliteCache(newSqliteTestCacheFile(t), false)
|
c, err := newSqliteCache(newSqliteTestCacheFile(t), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -160,3 +202,11 @@ func newSqliteTestCacheFromFile(t *testing.T, filename string) *sqliteCache {
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMemTestCache(t *testing.T) *sqliteCache {
|
||||||
|
c, err := newMemCache()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testCacheMessages(t *testing.T, c cache) {
|
func testCacheMessages(t *testing.T, c *sqliteCache) {
|
||||||
m1 := newDefaultMessage("mytopic", "my message")
|
m1 := newDefaultMessage("mytopic", "my message")
|
||||||
m1.Time = 1
|
m1.Time = 1
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ func testCacheMessages(t *testing.T, c cache) {
|
||||||
require.Empty(t, messages)
|
require.Empty(t, messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCacheTopics(t *testing.T, c cache) {
|
func testCacheTopics(t *testing.T, c *sqliteCache) {
|
||||||
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
|
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
|
||||||
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
|
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
|
||||||
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
|
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
|
||||||
|
@ -87,7 +87,7 @@ func testCacheTopics(t *testing.T, c cache) {
|
||||||
require.Equal(t, "topic2", topics["topic2"].ID)
|
require.Equal(t, "topic2", topics["topic2"].ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCachePrune(t *testing.T, c cache) {
|
func testCachePrune(t *testing.T, c *sqliteCache) {
|
||||||
m1 := newDefaultMessage("mytopic", "my message")
|
m1 := newDefaultMessage("mytopic", "my message")
|
||||||
m1.Time = 1
|
m1.Time = 1
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ func testCachePrune(t *testing.T, c cache) {
|
||||||
require.Equal(t, "my other message", messages[0].Message)
|
require.Equal(t, "my other message", messages[0].Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
|
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *sqliteCache) {
|
||||||
m := newDefaultMessage("mytopic", "some message")
|
m := newDefaultMessage("mytopic", "some message")
|
||||||
m.Tags = []string{"tag1", "tag2"}
|
m.Tags = []string{"tag1", "tag2"}
|
||||||
m.Priority = 5
|
m.Priority = 5
|
||||||
|
@ -129,7 +129,7 @@ func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
|
||||||
require.Equal(t, "some title", messages[0].Title)
|
require.Equal(t, "some title", messages[0].Title)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCacheMessagesScheduled(t *testing.T, c cache) {
|
func testCacheMessagesScheduled(t *testing.T, c *sqliteCache) {
|
||||||
m1 := newDefaultMessage("mytopic", "message 1")
|
m1 := newDefaultMessage("mytopic", "message 1")
|
||||||
m2 := newDefaultMessage("mytopic", "message 2")
|
m2 := newDefaultMessage("mytopic", "message 2")
|
||||||
m2.Time = time.Now().Add(time.Hour).Unix()
|
m2.Time = time.Now().Add(time.Hour).Unix()
|
||||||
|
@ -155,7 +155,7 @@ func testCacheMessagesScheduled(t *testing.T, c cache) {
|
||||||
require.Empty(t, messages)
|
require.Empty(t, messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCacheMessagesSinceID(t *testing.T, c cache) {
|
func testCacheMessagesSinceID(t *testing.T, c *sqliteCache) {
|
||||||
m1 := newDefaultMessage("mytopic", "message 1")
|
m1 := newDefaultMessage("mytopic", "message 1")
|
||||||
m1.Time = 100
|
m1.Time = 100
|
||||||
m2 := newDefaultMessage("mytopic", "message 2")
|
m2 := newDefaultMessage("mytopic", "message 2")
|
||||||
|
@ -220,7 +220,7 @@ func testCacheMessagesSinceID(t *testing.T, c cache) {
|
||||||
// TODO Add more delayed messages
|
// TODO Add more delayed messages
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCacheAttachments(t *testing.T, c cache) {
|
func testCacheAttachments(t *testing.T, c *sqliteCache) {
|
||||||
expires1 := time.Now().Add(-4 * time.Hour).Unix()
|
expires1 := time.Now().Add(-4 * time.Hour).Unix()
|
||||||
m := newDefaultMessage("mytopic", "flower for you")
|
m := newDefaultMessage("mytopic", "flower for you")
|
||||||
m.ID = "m1"
|
m.ID = "m1"
|
||||||
|
|
|
@ -45,7 +45,7 @@ type Server struct {
|
||||||
mailer mailer
|
mailer mailer
|
||||||
messages int64
|
messages int64
|
||||||
auth auth.Auther
|
auth auth.Auther
|
||||||
cache cache
|
cache *sqliteCache
|
||||||
fileCache *fileCache
|
fileCache *fileCache
|
||||||
closeChan chan bool
|
closeChan chan bool
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
@ -160,7 +160,7 @@ func New(conf *Config) (*Server, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createCache(conf *Config) (cache, error) {
|
func createCache(conf *Config) (*sqliteCache, error) {
|
||||||
if conf.CacheDuration == 0 {
|
if conf.CacheDuration == 0 {
|
||||||
return newNopCache()
|
return newNopCache()
|
||||||
} else if conf.CacheFile != "" {
|
} else if conf.CacheFile != "" {
|
||||||
|
|
Loading…
Reference in New Issue