From 7cc8c81bd886b9be7ef2ece6b8ac5bd8482c9db3 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Sun, 5 Feb 2023 23:34:27 -0500 Subject: [PATCH] Continued logging work --- client/client_test.go | 7 +++ cmd/access.go | 2 +- cmd/access_test.go | 2 - cmd/app.go | 11 +++- cmd/app_test.go | 2 +- cmd/publish.go | 2 +- cmd/serve.go | 4 +- cmd/subscribe.go | 2 +- cmd/token.go | 2 +- cmd/user.go | 2 +- log/event.go | 45 +++++++++++++---- log/log.go | 64 ++++++++++++++++++++--- log/log_test.go | 95 +++++++++++++++++++++-------------- log/types.go | 15 ++---- server/file_cache.go | 5 +- server/log.go | 14 ++++-- server/message_cache.go | 26 +++++----- server/server.go | 39 +++++++------- server/server.yml | 11 ++-- server/server_account.go | 7 ++- server/server_account_test.go | 4 +- server/server_firebase.go | 2 +- server/server_payments.go | 30 +++++------ server/server_test.go | 6 ++- server/smtp_sender.go | 3 +- server/types.go | 3 +- server/visitor.go | 21 ++------ user/manager.go | 32 +++++++----- 28 files changed, 287 insertions(+), 171 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 4ce00670..a71ea5cb 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -4,11 +4,18 @@ import ( "fmt" "github.com/stretchr/testify/require" "heckel.io/ntfy/client" + "heckel.io/ntfy/log" "heckel.io/ntfy/test" + "os" "testing" "time" ) +func TestMain(m *testing.M) { + log.SetLevel(log.ErrorLevel) + os.Exit(m.Run()) +} + func TestClient_Publish_Subscribe(t *testing.T) { s, port := test.StartServer(t) defer test.StopServer(t, s, port) diff --git a/cmd/access.go b/cmd/access.go index 3b83000a..40c84f2b 100644 --- a/cmd/access.go +++ b/cmd/access.go @@ -19,7 +19,7 @@ const ( ) var flagsAccess = append( - flagsUser, + append([]cli.Flag{}, flagsUser...), &cli.BoolFlag{Name: "reset", Aliases: []string{"r"}, Usage: "reset access for user (and topic)"}, ) diff --git a/cmd/access_test.go b/cmd/access_test.go index a9d1c534..6e3c5ba3 100644 --- a/cmd/access_test.go +++ b/cmd/access_test.go @@ -26,8 +26,6 @@ func TestCLI_Access_Grant_And_Publish(t *testing.T) { stdin.WriteString("philpass\nphilpass\nbenpass\nbenpass") require.Nil(t, runUserCommand(app, conf, "add", "--role=admin", "phil")) require.Nil(t, runUserCommand(app, conf, "add", "ben")) - - app, stdin, _, _ = newTestApp() require.Nil(t, runAccessCommand(app, conf, "ben", "announcements", "rw")) require.Nil(t, runAccessCommand(app, conf, "ben", "sometopic", "read")) require.Nil(t, runAccessCommand(app, conf, "everyone", "announcements", "read")) diff --git a/cmd/app.go b/cmd/app.go index 805c5dc7..9c2e6882 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -23,7 +23,8 @@ var flagsDefault = []cli.Flag{ &cli.BoolFlag{Name: "no-log-dates", Aliases: []string{"no_log_dates"}, EnvVars: []string{"NTFY_NO_LOG_DATES"}, Usage: "disable the date/time prefix"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "log-level", Aliases: []string{"log_level"}, Value: log.InfoLevel.String(), EnvVars: []string{"NTFY_LOG_LEVEL"}, Usage: "set log level"}), altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "log-level-overrides", Aliases: []string{"log_level_overrides"}, EnvVars: []string{"NTFY_LOG_LEVEL_OVERRIDES"}, Usage: "set log level overrides"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "log-format", Aliases: []string{"log_format"}, Value: log.TextFormat.String(), EnvVars: []string{"NTFY_LOG_FORMAT"}, Usage: "set log level"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "log-format", Aliases: []string{"log_format"}, Value: log.TextFormat.String(), EnvVars: []string{"NTFY_LOG_FORMAT"}, Usage: "set log format"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "log-file", Aliases: []string{"log_file"}, EnvVars: []string{"NTFY_LOG_FILE"}, Usage: "set log file, default is STDOUT"}), } var ( @@ -61,6 +62,14 @@ func initLogFunc(c *cli.Context) error { if err := applyLogLevelOverrides(c.StringSlice("log-level-overrides")); err != nil { return err } + logFile := c.String("log-file") + if logFile != "" { + w, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return err + } + log.SetOutput(w) + } return nil } diff --git a/cmd/app_test.go b/cmd/app_test.go index 0e8f4cde..ec27a67d 100644 --- a/cmd/app_test.go +++ b/cmd/app_test.go @@ -14,7 +14,7 @@ import ( // This only contains helpers so far func TestMain(m *testing.M) { - log.SetLevel(log.WarnLevel) + log.SetLevel(log.ErrorLevel) os.Exit(m.Run()) } diff --git a/cmd/publish.go b/cmd/publish.go index 7a561973..83d79113 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -20,7 +20,7 @@ func init() { } var flagsPublish = append( - flagsDefault, + append([]cli.Flag{}, flagsDefault...), &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG"}, Usage: "client config file"}, &cli.StringFlag{Name: "title", Aliases: []string{"t"}, EnvVars: []string{"NTFY_TITLE"}, Usage: "message title"}, &cli.StringFlag{Name: "message", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MESSAGE"}, Usage: "message body"}, diff --git a/cmd/serve.go b/cmd/serve.go index 3aaafeb8..f54b07e7 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -34,7 +34,7 @@ const ( ) var flagsServe = append( - flagsDefault, + append([]cli.Flag{}, flagsDefault...), &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: defaultServerConfigFile, DefaultText: defaultServerConfigFile, Usage: "config file"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "base-url", Aliases: []string{"base_url", "B"}, EnvVars: []string{"NTFY_BASE_URL"}, Usage: "externally visible base URL for this host (e.g. https://ntfy.sh)"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"listen_http", "l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: server.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}), @@ -378,7 +378,7 @@ func reloadLogLevel(inputSource altsrc.InputSourceContext) error { if err != nil { return fmt.Errorf("cannot load log level overrides (1): %s", err.Error()) } - log.ResetLevelOverride() + log.ResetLevelOverrides() if err := applyLogLevelOverrides(overrides); err != nil { return fmt.Errorf("cannot load log level overrides (2): %s", err.Error()) } diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 7bfb6123..bbc6fb33 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -26,7 +26,7 @@ const ( ) var flagsSubscribe = append( - flagsDefault, + append([]cli.Flag{}, flagsDefault...), &cli.StringFlag{Name: "config", Aliases: []string{"c"}, Usage: "client config file"}, &cli.StringFlag{Name: "since", Aliases: []string{"s"}, Usage: "return events since `SINCE` (Unix timestamp, or all)"}, &cli.StringFlag{Name: "user", Aliases: []string{"u"}, EnvVars: []string{"NTFY_USER"}, Usage: "username[:password] used to auth against the server"}, diff --git a/cmd/token.go b/cmd/token.go index ed347083..dce8b672 100644 --- a/cmd/token.go +++ b/cmd/token.go @@ -16,7 +16,7 @@ func init() { commands = append(commands, cmdToken) } -var flagsToken = flagsUser +var flagsToken = append([]cli.Flag{}, flagsUser...) var cmdToken = &cli.Command{ Name: "token", diff --git a/cmd/user.go b/cmd/user.go index 7e9ed50a..9faa4be8 100644 --- a/cmd/user.go +++ b/cmd/user.go @@ -24,7 +24,7 @@ func init() { } var flagsUser = append( - flagsDefault, + append([]cli.Flag{}, flagsDefault...), &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: defaultServerConfigFile, DefaultText: defaultServerConfigFile, Usage: "config file"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file", "H"}, EnvVars: []string{"NTFY_AUTH_FILE"}, Usage: "auth database file used for access control"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-default-access", Aliases: []string{"auth_default_access", "p"}, EnvVars: []string{"NTFY_AUTH_DEFAULT_ACCESS"}, Value: "read-write", Usage: "default permissions if no matching entries in the auth database are found"}), diff --git a/log/event.go b/log/event.go index f55d3099..ceebe466 100644 --- a/log/event.go +++ b/log/event.go @@ -15,74 +15,95 @@ const ( errorField = "error" ) +// Event represents a single log event type Event struct { - Time int64 `json:"time"` - Level Level `json:"level"` - Message string `json:"message"` - fields map[string]any + Timestamp int64 `json:"time"` + Level Level `json:"level"` + Message string `json:"message"` + fields Context } +// newEvent creates a new log event func newEvent() *Event { return &Event{ - Time: time.Now().UnixMilli(), - fields: make(map[string]any), + Timestamp: time.Now().UnixMilli(), + fields: make(Context), } } +// Fatal logs the event as FATAL, and exits the program with exit code 1 func (e *Event) Fatal(message string, v ...any) { - e.Log(FatalLevel, message, v...) + e.Field("exit_code", 1).Log(FatalLevel, message, v...) + fmt.Fprintf(os.Stderr, fmt.Sprintf(message+"\n", v...)) // Always output error to stderr os.Exit(1) } +// Error logs the event with log level error func (e *Event) Error(message string, v ...any) { e.Log(ErrorLevel, message, v...) } +// Warn logs the event with log level warn func (e *Event) Warn(message string, v ...any) { e.Log(WarnLevel, message, v...) } +// Info logs the event with log level info func (e *Event) Info(message string, v ...any) { e.Log(InfoLevel, message, v...) } +// Debug logs the event with log level debug func (e *Event) Debug(message string, v ...any) { e.Log(DebugLevel, message, v...) } +// Trace logs the event with log level trace func (e *Event) Trace(message string, v ...any) { e.Log(TraceLevel, message, v...) } +// Tag adds a "tag" field to the log event func (e *Event) Tag(tag string) *Event { e.fields[tagField] = tag return e } -func (e *Event) Err(err error) *Event { - e.fields[errorField] = err +// Time sets the time field +func (e *Event) Time(time time.Time) *Event { + e.Timestamp = time.UnixMilli() return e } +// Err adds an "error" field to the log event +func (e *Event) Err(err error) *Event { + e.fields[errorField] = err.Error() + return e +} + +// Field adds a custom field and value to the log event func (e *Event) Field(key string, value any) *Event { e.fields[key] = value return e } -func (e *Event) Fields(fields map[string]any) *Event { +// Fields adds a map of fields to the log event +func (e *Event) Fields(fields Context) *Event { for k, v := range fields { e.fields[k] = v } return e } -func (e *Event) Context(contexts ...Contexter) *Event { +// With adds the fields of the given Contexter structs to the log event by calling their With method +func (e *Event) With(contexts ...Contexter) *Event { for _, c := range contexts { e.Fields(c.Context()) } return e } +// Log logs a message with the given log level func (e *Event) Log(l Level, message string, v ...any) { e.Message = fmt.Sprintf(message, v...) e.Level = l @@ -110,6 +131,7 @@ func (e *Event) IsDebug() bool { return e.Loggable(DebugLevel) } +// JSON returns the event as a JSON representation func (e *Event) JSON() string { b, _ := json.Marshal(e) s := string(b) @@ -120,6 +142,7 @@ func (e *Event) JSON() string { return s } +// String returns the event as a string func (e *Event) String() string { if len(e.fields) == 0 { return fmt.Sprintf("%s %s", e.Level.String(), e.Message) diff --git a/log/log.go b/log/log.go index 1a0b90e8..67191c5e 100644 --- a/log/log.go +++ b/log/log.go @@ -1,17 +1,30 @@ package log import ( + "io" "log" + "os" "sync" + "time" +) + +const ( + DefaultLevel = InfoLevel + DefaultFormat = TextFormat ) var ( - level = InfoLevel - format = TextFormat + level = DefaultLevel + format = DefaultFormat overrides = make(map[string]*levelOverride) mu = &sync.Mutex{} ) +var ( + DefaultOutput = os.Stderr + output io.Writer = DefaultOutput +) + // Fatal prints the given message, and exits the program func Fatal(message string, v ...any) { newEvent().Fatal(message, v...) @@ -42,22 +55,31 @@ func Trace(message string, v ...any) { newEvent().Trace(message, v...) } -func Context(contexts ...Contexter) *Event { - return newEvent().Context(contexts...) +// With creates a new log event and adds the fields of the given Contexter structs +func With(contexts ...Contexter) *Event { + return newEvent().With(contexts...) } +// Field creates a new log event and adds a custom field and value to it func Field(key string, value any) *Event { return newEvent().Field(key, value) } -func Fields(fields map[string]any) *Event { +// Fields creates a new log event and adds a map of fields to it +func Fields(fields Context) *Event { return newEvent().Fields(fields) } +// Tag creates a new log event and adds a "tag" field to it func Tag(tag string) *Event { return newEvent().Tag(tag) } +// Time creates a new log event and sets the time field +func Time(time time.Time) *Event { + return newEvent().Time(time) +} + // CurrentLevel returns the current log level func CurrentLevel() Level { mu.Lock() @@ -79,8 +101,8 @@ func SetLevelOverride(field string, value any, level Level) { overrides[field] = &levelOverride{value: value, level: level} } -// ResetLevelOverride removes all log level overrides -func ResetLevelOverride() { +// ResetLevelOverrides removes all log level overrides +func ResetLevelOverrides() { mu.Lock() defer mu.Unlock() overrides = make(map[string]*levelOverride) @@ -103,6 +125,34 @@ func SetFormat(newFormat Format) { } } +// SetOutput sets the log output writer +func SetOutput(w io.Writer) { + mu.Lock() + defer mu.Unlock() + log.SetOutput(w) + output = w +} + +// File returns the log file, if any, or an empty string otherwise +func File() string { + mu.Lock() + defer mu.Unlock() + if f, ok := output.(*os.File); ok { + return f.Name() + } + return "" +} + +// IsFile returns true if the output is a non-default file +func IsFile() bool { + mu.Lock() + defer mu.Unlock() + if _, ok := output.(*os.File); ok && output != DefaultOutput { + return true + } + return false +} + // DisableDates disables the date/time prefix func DisableDates() { log.SetFlags(0) diff --git a/log/log_test.go b/log/log_test.go index bae46c7f..4d912f09 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -1,57 +1,74 @@ package log_test import ( + "bytes" + "github.com/stretchr/testify/require" "heckel.io/ntfy/log" "net/http" + "os" "testing" + "time" ) -const tagPay = "PAY" +func TestMain(m *testing.M) { + exitCode := m.Run() + resetState() + log.SetLevel(log.ErrorLevel) // For other modules! + os.Exit(exitCode) +} -type visitor struct { +func TestLog_TagContextFieldFields(t *testing.T) { + t.Cleanup(resetState) + v := &fakeVisitor{ + UserID: "u_abc", + IP: "1.2.3.4", + } + var out bytes.Buffer + log.SetOutput(&out) + log.SetFormat(log.JSONFormat) + log.SetLevelOverride("tag", "stripe", log.DebugLevel) + + log. + Tag("mytag"). + Field("field2", 123). + Field("field1", "value1"). + Time(time.Unix(123, 0)). + Info("hi there %s", "phil") + log. + Tag("not-stripe"). + Debug("this message will not appear") + log. + With(v). + Fields(log.Context{ + "stripe_customer_id": "acct_123", + "stripe_subscription_id": "sub_123", + }). + Tag("stripe"). + Err(http.ErrHandlerTimeout). + Time(time.Unix(456, 0)). + Debug("Subscription status %s", "active") + + expected := `{"time":123000,"level":"INFO","message":"hi there phil","field1":"value1","field2":123,"tag":"mytag"} +{"time":456000,"level":"DEBUG","message":"Subscription status active","error":"http: Handler timeout","stripe_customer_id":"acct_123","stripe_subscription_id":"sub_123","tag":"stripe","user_id":"u_abc","visitor_ip":"1.2.3.4"} +` + require.Equal(t, expected, out.String()) +} + +type fakeVisitor struct { UserID string IP string } -func (v *visitor) Context() map[string]any { +func (v *fakeVisitor) Context() log.Context { return map[string]any{ - "user_id": v.UserID, - "ip": v.IP, + "user_id": v.UserID, + "visitor_ip": v.IP, } } -func TestEvent_Info(t *testing.T) { - /* - log-level: INFO, user_id:u_abc=DEBUG - log-level-overrides: - - user_id=u_abc: DEBUG - log-filter = - - */ - v := &visitor{ - UserID: "u_abc", - IP: "1.2.3.4", - } - stripeCtx := log.NewCtx(map[string]any{ - "tag": "pay", - }) - log.SetLevel(log.InfoLevel) - //log.SetFormat(log.JSONFormat) - //log.SetLevelOverride("user_id", "u_abc", log.DebugLevel) - log.SetLevelOverride("tag", "pay", log.DebugLevel) - mlog := log.Field("tag", "manager") - mlog.Field("one", 1).Info("this is one") - mlog.Err(http.ErrHandlerTimeout).Field("two", 2).Info("this is two") - log.Info("somebody did something") - log. - Context(stripeCtx, v). - Fields(map[string]any{ - "tier": "ti_abc", - "user_id": "u_abc", - }). - Debug("Somebody paid something for $%d", 10) - log. - Field("tag", "account"). - Field("user_id", "u_abc"). - Debug("User logged in") +func resetState() { + log.SetLevel(log.DefaultLevel) + log.SetFormat(log.DefaultFormat) + log.SetOutput(log.DefaultOutput) + log.ResetLevelOverrides() } diff --git a/log/types.go b/log/types.go index c5581650..dd119e4e 100644 --- a/log/types.go +++ b/log/types.go @@ -91,19 +91,14 @@ func ToFormat(s string) Format { } } +// Contexter allows structs to export a key-value pairs in the form of a Context type Contexter interface { - Context() map[string]any + // Context returns the object context as key-value pairs + Context() Context } -type fieldsCtx map[string]any - -func (f fieldsCtx) Context() map[string]any { - return f -} - -func NewCtx(fields map[string]any) Contexter { - return fieldsCtx(fields) -} +// Context represents an object's state in the form of key-value pairs +type Context map[string]any type levelOverride struct { value any diff --git a/server/file_cache.go b/server/file_cache.go index 2659bea9..35cb0f4b 100644 --- a/server/file_cache.go +++ b/server/file_cache.go @@ -44,6 +44,7 @@ func (c *fileCache) Write(id string, in io.Reader, limiters ...util.Limiter) (in if !fileIDRegex.MatchString(id) { return 0, errInvalidFileID } + log.Tag(tagFileCache).Field("message_id", id).Debug("Writing attachment") file := filepath.Join(c.dir, id) if _, err := os.Stat(file); err == nil { return 0, errFileExists @@ -75,10 +76,10 @@ func (c *fileCache) Remove(ids ...string) error { if !fileIDRegex.MatchString(id) { return errInvalidFileID } - log.Debug("File Cache: Deleting attachment %s", id) + log.Tag(tagFileCache).Field("message_id", id).Debug("Deleting attachment") file := filepath.Join(c.dir, id) if err := os.Remove(file); err != nil { - log.Debug("File Cache: Error deleting attachment %s: %s", id, err.Error()) + log.Tag(tagFileCache).Field("message_id", id).Err(err).Debug("Error deleting attachment") } } size, err := dirSize(c.dir) diff --git a/server/log.go b/server/log.go index 510c68e0..85ab8b51 100644 --- a/server/log.go +++ b/server/log.go @@ -11,30 +11,36 @@ import ( "unicode/utf8" ) +// logr creates a new log event with HTTP request fields func logr(r *http.Request) *log.Event { return log.Fields(httpFields(r)) } +// logr creates a new log event with visitor fields func logv(v *visitor) *log.Event { - return log.Context(v) + return log.With(v) } +// logr creates a new log event with HTTP request and visitor fields func logvr(v *visitor, r *http.Request) *log.Event { return logv(v).Fields(httpFields(r)) } +// logvrm creates a new log event with HTTP request, visitor fields and message fields func logvrm(v *visitor, r *http.Request, m *message) *log.Event { - return logvr(v, r).Context(m) + return logvr(v, r).With(m) } +// logvrm creates a new log event with visitor fields and message fields func logvm(v *visitor, m *message) *log.Event { - return logv(v).Context(m) + return logv(v).With(m) } +// logem creates a new log event with email fields func logem(state *smtp.ConnectionState) *log.Event { return log. Tag(tagSMTP). - Fields(map[string]any{ + Fields(log.Context{ "smtp_hostname": state.Hostname, "smtp_remote_addr": state.RemoteAddr.String(), }) diff --git a/server/message_cache.go b/server/message_cache.go index eda0c808..21cf96c3 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -369,10 +369,10 @@ func (c *messageCache) addMessages(ms []*message) error { } } if err := tx.Commit(); err != nil { - log.Error("Message Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start)) + log.Tag(tagMessageCache).Err(err).Error("Writing %d message(s) failed (took %v)", len(ms), time.Since(start)) return err } - log.Debug("Message Cache: Wrote %d message(s) in %v", len(ms), time.Since(start)) + log.Tag(tagMessageCache).Debug("Wrote %d message(s) in %v", len(ms), time.Since(start)) return nil } @@ -609,7 +609,7 @@ func (c *messageCache) processMessageBatches() { } for messages := range c.queue.Dequeue() { if err := c.addMessages(messages); err != nil { - log.Error("Message Cache: %s", err.Error()) + log.Tag(tagMessageCache).Err(err).Error("Cannot write message batch") } } } @@ -766,7 +766,7 @@ func setupNewCacheDB(db *sql.DB) error { } func migrateFrom0(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 0 to 1") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 0 to 1") if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil { return err } @@ -780,7 +780,7 @@ func migrateFrom0(db *sql.DB, _ time.Duration) error { } func migrateFrom1(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 1 to 2") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 1 to 2") if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil { return err } @@ -791,7 +791,7 @@ func migrateFrom1(db *sql.DB, _ time.Duration) error { } func migrateFrom2(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 2 to 3") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 2 to 3") if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil { return err } @@ -802,7 +802,7 @@ func migrateFrom2(db *sql.DB, _ time.Duration) error { } func migrateFrom3(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 3 to 4") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 3 to 4") if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil { return err } @@ -813,7 +813,7 @@ func migrateFrom3(db *sql.DB, _ time.Duration) error { } func migrateFrom4(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 4 to 5") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 4 to 5") if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil { return err } @@ -824,7 +824,7 @@ func migrateFrom4(db *sql.DB, _ time.Duration) error { } func migrateFrom5(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 5 to 6") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 5 to 6") if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil { return err } @@ -835,7 +835,7 @@ func migrateFrom5(db *sql.DB, _ time.Duration) error { } func migrateFrom6(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 6 to 7") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 6 to 7") if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil { return err } @@ -846,7 +846,7 @@ func migrateFrom6(db *sql.DB, _ time.Duration) error { } func migrateFrom7(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 7 to 8") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 7 to 8") if _, err := db.Exec(migrate7To8AlterMessagesTableQuery); err != nil { return err } @@ -857,7 +857,7 @@ func migrateFrom7(db *sql.DB, _ time.Duration) error { } func migrateFrom8(db *sql.DB, _ time.Duration) error { - log.Info("Migrating cache database schema: from 8 to 9") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 8 to 9") if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil { return err } @@ -868,7 +868,7 @@ func migrateFrom8(db *sql.DB, _ time.Duration) error { } func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error { - log.Info("Migrating cache database schema: from 9 to 10") + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 9 to 10") tx, err := db.Begin() if err != nil { return err diff --git a/server/server.go b/server/server.go index 27fe11eb..33aab7fc 100644 --- a/server/server.go +++ b/server/server.go @@ -40,7 +40,6 @@ import ( - HIGH CLI "ntfy tier [add|list|delete]" - HIGH CLI "ntfy user" should show tier - HIGH Self-review -- HIGH Stripe webhook failures cannot be diagnosed because of missing logs - MEDIUM: Test for expiring messages after reservation removal - MEDIUM: Test new token endpoints & never-expiring token - LOW: UI: Flickering upgrade banner when logging in @@ -140,16 +139,18 @@ const ( // Log tags const ( - tagPublish = "publish" - tagFirebase = "firebase" - tagEmail = "email" // Send email - tagSMTP = "smtp" // Receive email - tagPay = "pay" - tagAccount = "account" - tagManager = "manager" - tagResetter = "resetter" - tagWebsocket = "websocket" - tagMatrix = "matrix" + tagPublish = "publish" + tagFirebase = "firebase" + tagEmail = "email" // Send email + tagSMTP = "smtp" // Receive email + tagFileCache = "file_cache" + tagMessageCache = "message_cache" + tagStripe = "stripe" + tagAccount = "account" + tagManager = "manager" + tagResetter = "resetter" + tagWebsocket = "websocket" + tagMatrix = "matrix" ) // New instantiates a new Server. It creates the cache and adds a Firebase @@ -234,6 +235,10 @@ func (s *Server) Run() error { listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen) } log.Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String()) + if log.IsFile() { + fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s, log file is %s\n", listenStr, s.config.Version, log.File()) + fmt.Fprintln(os.Stderr, "No more output is expected.") + } mux := http.NewServeMux() mux.HandleFunc("/", s.handle) errChan := make(chan error) @@ -346,19 +351,19 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) { isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest if isNormalError { logvr(v, r). - Fields(map[string]any{ - "error": err, + Fields(log.Context{ "error_code": httpErr.Code, "http_status": httpErr.HTTPCode, }). + Err(err). Debug("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error()) } else { logvr(v, r). - Fields(map[string]any{ - "error": err, + Fields(log.Context{ "error_code": httpErr.Code, "http_status": httpErr.HTTPCode, }). + Err(err). Info("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error()) } w.Header().Set("Content-Type", "application/json") @@ -614,7 +619,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes delayed := m.Time > time.Now().Unix() logvrm(v, r, m). Tag(tagPublish). - Fields(map[string]any{ + Fields(log.Context{ "message_delayed": delayed, "message_firebase": firebase, "message_unifiedpush": unifiedpush, @@ -1496,7 +1501,7 @@ func (s *Server) sendDelayedMessages() error { if s.userManager != nil && m.User != "" { u, err = s.userManager.User(m.User) if err != nil { - log.Context(m).Err(err).Warn("Error sending delayed message") + log.With(m).Err(err).Warn("Error sending delayed message") continue } } diff --git a/server/server.yml b/server/server.yml index b6071107..e783cee2 100644 --- a/server/server.yml +++ b/server/server.yml @@ -233,10 +233,15 @@ # stripe-secret-key: # stripe-webhook-key: -# Log level, can be TRACE, DEBUG, INFO, WARN or ERROR +# Log level, can be "trace", "debug", "info", "warn" or "error" # This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy". # -# Be aware that DEBUG (and particularly TRACE) can be VERY CHATTY. Only turn them on for +# FIXME +# +# Be aware that "debug" (and particularly "trace"") can be VERY CHATTY. Only turn them on for # debugging purposes, or your disk will fill up quickly. # -# log-level: INFO +# log-level: info +# log-level-overrides: +# log-format: text +# log-file: diff --git a/server/server_account.go b/server/server_account.go index 8e6d2b2a..924a6096 100644 --- a/server/server_account.go +++ b/server/server_account.go @@ -2,7 +2,6 @@ package server import ( "encoding/json" - "heckel.io/ntfy/log" "heckel.io/ntfy/user" "heckel.io/ntfy/util" "net/http" @@ -155,7 +154,7 @@ func (s *Server) handleAccountDelete(w http.ResponseWriter, r *http.Request, v * return errHTTPBadRequestIncorrectPasswordConfirmation } if u.Billing.StripeSubscriptionID != "" { - logvr(v, r).Tag(tagPay).Info("Canceling billing subscription for user %s", u.Name) + logvr(v, r).Tag(tagStripe).Info("Canceling billing subscription for user %s", u.Name) if _, err := s.stripe.CancelSubscription(u.Billing.StripeSubscriptionID); err != nil { return err } @@ -488,7 +487,7 @@ func (s *Server) maybeRemoveMessagesAndExcessReservations(r *http.Request, v *vi func (s *Server) publishSyncEventAsync(v *visitor) { go func() { if err := s.publishSyncEvent(v); err != nil { - log.Trace("%s Error publishing to user's sync topic: %s", v.String(), err.Error()) + logv(v).Err(err).Trace("Error publishing to user's sync topic") } }() } @@ -499,7 +498,7 @@ func (s *Server) publishSyncEvent(v *visitor) error { if u == nil || u.SyncTopic == "" { return nil } - log.Trace("Publishing sync event to user %s's sync topic %s", u.Name, u.SyncTopic) + logv(v).Field("sync_topic", u.SyncTopic).Trace("Publishing sync event to user's sync topic") syncTopic, err := s.topicFromID(u.SyncTopic) if err != nil { return err diff --git a/server/server_account_test.go b/server/server_account_test.go index fb1cb406..2febd64e 100644 --- a/server/server_account_test.go +++ b/server/server_account_test.go @@ -683,7 +683,7 @@ func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) { func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { conf := newTestConfigWithAuthFile(t) conf.AuthDefault = user.PermissionReadWrite - conf.AuthStatsQueueWriterInterval = 100 * time.Millisecond + conf.AuthStatsQueueWriterInterval = 200 * time.Millisecond s := newTestServer(t, conf) defer s.closeDatabases() @@ -706,7 +706,7 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { require.Equal(t, 200, rr.Code) // Wait for stats queue writer - time.Sleep(200 * time.Millisecond) + time.Sleep(300 * time.Millisecond) // Verify that message stats were persisted u, err := s.userManager.User("phil") diff --git a/server/server_firebase.go b/server/server_firebase.go index a315bed2..0aa61283 100644 --- a/server/server_firebase.go +++ b/server/server_firebase.go @@ -46,7 +46,7 @@ func (c *firebaseClient) Send(v *visitor, m *message) error { if err != nil { return err } - if log.IsTrace() { + if log.Tag(tagFirebase).IsTrace() { logvm(v, m). Tag(tagFirebase). Field("firebase_message", util.MaybeMarshalJSON(fbm)). diff --git a/server/server_payments.go b/server/server_payments.go index 647da8cc..dcb370f5 100644 --- a/server/server_payments.go +++ b/server/server_payments.go @@ -121,8 +121,8 @@ func (s *Server) handleAccountBillingSubscriptionCreate(w http.ResponseWriter, r return errNotAPaidTier } logvr(v, r). - Tag(tagPay). - Fields(map[string]any{ + Tag(tagStripe). + Fields(log.Context{ "tier": tier, "stripe_price_id": tier.StripePriceID, }). @@ -196,8 +196,8 @@ func (s *Server) handleAccountBillingSubscriptionCreateSuccess(w http.ResponseWr } v.SetUser(u) logvr(v, r). - Tag(tagPay). - Fields(map[string]any{ + Tag(tagStripe). + Fields(log.Context{ "tier_id": tier.ID, "tier_name": tier.Name, "stripe_price_id": tier.StripePriceID, @@ -241,8 +241,8 @@ func (s *Server) handleAccountBillingSubscriptionUpdate(w http.ResponseWriter, r return err } logvr(v, r). - Tag(tagPay). - Fields(map[string]any{ + Tag(tagStripe). + Fields(log.Context{ "new_tier_id": tier.ID, "new_tier_name": tier.Name, "new_tier_stripe_price_id": tier.StripePriceID, @@ -275,7 +275,7 @@ func (s *Server) handleAccountBillingSubscriptionUpdate(w http.ResponseWriter, r // handleAccountBillingSubscriptionDelete facilitates downgrading a paid user to a tier-less user, // and cancelling the Stripe subscription entirely func (s *Server) handleAccountBillingSubscriptionDelete(w http.ResponseWriter, r *http.Request, v *visitor) error { - logvr(v, r).Tag(tagPay).Info("Deleting Stripe subscription") + logvr(v, r).Tag(tagStripe).Info("Deleting Stripe subscription") u := v.User() if u.Billing.StripeSubscriptionID != "" { params := &stripe.SubscriptionParams{ @@ -292,7 +292,7 @@ func (s *Server) handleAccountBillingSubscriptionDelete(w http.ResponseWriter, r // handleAccountBillingPortalSessionCreate creates a session to the customer billing portal, and returns the // redirect URL. The billing portal allows customers to change their payment methods, and cancel the subscription. func (s *Server) handleAccountBillingPortalSessionCreate(w http.ResponseWriter, r *http.Request, v *visitor) error { - logvr(v, r).Tag(tagPay).Info("Creating Stripe billing portal session") + logvr(v, r).Tag(tagStripe).Info("Creating Stripe billing portal session") u := v.User() if u.Billing.StripeCustomerID == "" { return errHTTPBadRequestNotAPaidUser @@ -338,7 +338,7 @@ func (s *Server) handleAccountBillingWebhook(_ http.ResponseWriter, r *http.Requ return s.handleAccountBillingWebhookSubscriptionDeleted(r, v, event) default: logvr(v, r). - Tag(tagPay). + Tag(tagStripe). Field("stripe_webhook_type", event.Type). Warn("Unhandled Stripe webhook event %s received", event.Type) return nil @@ -354,8 +354,8 @@ func (s *Server) handleAccountBillingWebhookSubscriptionUpdated(r *http.Request, } subscriptionID, priceID := ev.ID, ev.Items.Data[0].Price.ID logvr(v, r). - Tag(tagPay). - Fields(map[string]any{ + Tag(tagStripe). + Fields(log.Context{ "stripe_webhook_type": event.Type, "stripe_customer_id": ev.Customer, "stripe_subscription_id": ev.ID, @@ -400,7 +400,7 @@ func (s *Server) handleAccountBillingWebhookSubscriptionDeleted(r *http.Request, } v.SetUser(u) logvr(v, r). - Tag(tagPay). + Tag(tagStripe). Field("stripe_webhook_type", event.Type). Info("Subscription deleted, downgrading to unpaid tier") if err := s.updateSubscriptionAndTier(r, v, u, nil, ev.Customer, "", "", 0, 0); err != nil { @@ -419,14 +419,14 @@ func (s *Server) updateSubscriptionAndTier(r *http.Request, v *visitor, u *user. return err } if tier == nil && u.Tier != nil { - logvr(v, r).Tag(tagPay).Info("Resetting tier for user %s", u.Name) + logvr(v, r).Tag(tagStripe).Info("Resetting tier for user %s", u.Name) if err := s.userManager.ResetTier(u.Name); err != nil { return err } } else if tier != nil && u.TierID() != tier.ID { logvr(v, r). - Tag(tagPay). - Fields(map[string]any{ + Tag(tagStripe). + Fields(log.Context{ "new_tier_id": tier.ID, "new_tier_name": tier.Name, "new_tier_stripe_price_id": tier.StripePriceID, diff --git a/server/server_test.go b/server/server_test.go index 915b00f3..4119e483 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -13,6 +13,7 @@ import ( "net/http" "net/http/httptest" "net/netip" + "os" "path/filepath" "strings" "sync" @@ -26,8 +27,9 @@ import ( "heckel.io/ntfy/util" ) -func init() { - // log.SetLevel(log.DebugLevel) +func TestMain(m *testing.M) { + log.SetLevel(log.ErrorLevel) + os.Exit(m.Run()) } func TestServer_PublishAndPoll(t *testing.T) { diff --git a/server/smtp_sender.go b/server/smtp_sender.go index 8971f12d..d8114ad0 100644 --- a/server/smtp_sender.go +++ b/server/smtp_sender.go @@ -4,6 +4,7 @@ import ( _ "embed" // required by go:embed "encoding/json" "fmt" + "heckel.io/ntfy/log" "heckel.io/ntfy/util" "mime" "net" @@ -38,7 +39,7 @@ func (s *smtpSender) Send(v *visitor, m *message, to string) error { auth := smtp.PlainAuth("", s.config.SMTPSenderUser, s.config.SMTPSenderPass, host) logvm(v, m). Tag(tagEmail). - Fields(map[string]any{ + Fields(log.Context{ "email_via": s.config.SMTPSenderAddr, "email_user": s.config.SMTPSenderUser, "email_to": to, diff --git a/server/types.go b/server/types.go index 2c818ca6..0b034206 100644 --- a/server/types.go +++ b/server/types.go @@ -1,6 +1,7 @@ package server import ( + "heckel.io/ntfy/log" "heckel.io/ntfy/user" "net/http" "net/netip" @@ -42,7 +43,7 @@ type message struct { User string `json:"-"` // Username of the uploader, used to associated attachments } -func (m *message) Context() map[string]any { +func (m *message) Context() log.Context { fields := map[string]any{ "message_id": m.ID, "message_time": m.Time, diff --git a/server/visitor.go b/server/visitor.go index 6b96a785..1a945c4a 100644 --- a/server/visitor.go +++ b/server/visitor.go @@ -135,25 +135,14 @@ func newVisitor(conf *Config, messageCache *messageCache, userManager *user.Mana return v } -func (v *visitor) String() string { +func (v *visitor) Context() log.Context { v.mu.Lock() defer v.mu.Unlock() - return v.stringNoLock() + return v.contextNoLock() } -func (v *visitor) stringNoLock() string { - if v.user != nil && v.user.Billing.StripeCustomerID != "" { - return fmt.Sprintf("%s/%s/%s", v.ip.String(), v.user.ID, v.user.Billing.StripeCustomerID) - } else if v.user != nil { - return fmt.Sprintf("%s/%s", v.ip.String(), v.user.ID) - } - return v.ip.String() -} - -func (v *visitor) Context() map[string]any { - v.mu.Lock() - defer v.mu.Unlock() - fields := map[string]any{ +func (v *visitor) contextNoLock() log.Context { + fields := log.Context{ "visitor_ip": v.ip.String(), } if v.user != nil { @@ -320,7 +309,7 @@ func (v *visitor) MaybeUserID() string { } func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool) { - log.Context(v).Debug("%s Resetting limiters for visitor", v.stringNoLock()) + log.Fields(v.contextNoLock()).Debug("Resetting limiters for visitor") limits := v.limitsNoLock() v.requestLimiter = rate.NewLimiter(limits.RequestLimitReplenish, limits.RequestLimitBurst) v.messagesLimiter = util.NewFixedLimiterWithValue(limits.MessageLimit, messages) diff --git a/user/manager.go b/user/manager.go index 51108661..a83974e6 100644 --- a/user/manager.go +++ b/user/manager.go @@ -28,6 +28,7 @@ const ( tokenPrefix = "tk_" tokenLength = 32 tokenMaxCount = 20 // Only keep this many tokens in the table per user + tagManager = "user_manager" ) // Default constants that may be overridden by configs @@ -343,15 +344,15 @@ func (a *Manager) Authenticate(username, password string) (*User, error) { } user, err := a.User(username) if err != nil { - log.Trace("authentication of user %s failed (1): %s", username, err.Error()) + log.Tag(tagManager).Field("user_name", username).Err(err).Trace("Authentication of user failed (1)") bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks")) return nil, ErrUnauthenticated } else if user.Deleted { - log.Trace("authentication of user %s failed (2): user marked deleted", username) + log.Tag(tagManager).Field("user_name", username).Trace("Authentication of user failed (2): user marked deleted") bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks")) return nil, ErrUnauthenticated } else if err := bcrypt.CompareHashAndPassword([]byte(user.Hash), []byte(password)); err != nil { - log.Trace("authentication of user %s failed (3): %s", username, err.Error()) + log.Tag(tagManager).Field("user_name", username).Err(err).Trace("Authentication of user failed (3)") return nil, ErrUnauthenticated } return user, nil @@ -566,10 +567,10 @@ func (a *Manager) asyncQueueWriter(interval time.Duration) { ticker := time.NewTicker(interval) for range ticker.C { if err := a.writeUserStatsQueue(); err != nil { - log.Warn("User Manager: Writing user stats queue failed: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Writing user stats queue failed") } if err := a.writeTokenUpdateQueue(); err != nil { - log.Warn("User Manager: Writing token update queue failed: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Writing token update queue failed") } } } @@ -578,7 +579,7 @@ func (a *Manager) writeUserStatsQueue() error { a.mu.Lock() if len(a.statsQueue) == 0 { a.mu.Unlock() - log.Trace("User Manager: No user stats updates to commit") + log.Tag(tagManager).Trace("No user stats updates to commit") return nil } statsQueue := a.statsQueue @@ -589,9 +590,16 @@ func (a *Manager) writeUserStatsQueue() error { return err } defer tx.Rollback() - log.Debug("User Manager: Writing user stats queue for %d user(s)", len(statsQueue)) + log.Tag(tagManager).Debug("Writing user stats queue for %d user(s)", len(statsQueue)) for userID, update := range statsQueue { - log.Trace("User Manager: Updating stats for user %s: messages=%d, emails=%d", userID, update.Messages, update.Emails) + log. + Tag(tagManager). + Fields(log.Context{ + "user_id": userID, + "messages_count": update.Messages, + "emails_count": update.Emails, + }). + Trace("Updating stats for user %s", userID) if _, err := tx.Exec(updateUserStatsQuery, update.Messages, update.Emails, userID); err != nil { return err } @@ -603,7 +611,7 @@ func (a *Manager) writeTokenUpdateQueue() error { a.mu.Lock() if len(a.tokenQueue) == 0 { a.mu.Unlock() - log.Trace("User Manager: No token updates to commit") + log.Tag(tagManager).Trace("No token updates to commit") return nil } tokenQueue := a.tokenQueue @@ -614,9 +622,9 @@ func (a *Manager) writeTokenUpdateQueue() error { return err } defer tx.Rollback() - log.Debug("User Manager: Writing token update queue for %d token(s)", len(tokenQueue)) + log.Tag(tagManager).Debug("Writing token update queue for %d token(s)", len(tokenQueue)) for tokenID, update := range tokenQueue { - log.Trace("User Manager: Updating token %s with last access time %v", tokenID, update.LastAccess.Unix()) + log.Tag(tagManager).Trace("Updating token %s with last access time %v", tokenID, update.LastAccess.Unix()) if _, err := tx.Exec(updateTokenLastAccessQuery, update.LastAccess.Unix(), update.LastOrigin.String(), tokenID); err != nil { return err } @@ -1257,7 +1265,7 @@ func setupNewDB(db *sql.DB) error { } func migrateFrom1(db *sql.DB) error { - log.Info("Migrating user database schema: from 1 to 2") + log.Tag(tagManager).Info("Migrating user database schema: from 1 to 2") tx, err := db.Begin() if err != nil { return err