Logging improvements, etc.
parent
f7f343fe55
commit
8215b66db3
|
@ -86,6 +86,7 @@ func TestCLI_Publish_All_The_Things(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCLI_Publish_Wait_PID_And_Cmd(t *testing.T) {
|
func TestCLI_Publish_Wait_PID_And_Cmd(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
s, port := test.StartServer(t)
|
s, port := test.StartServer(t)
|
||||||
defer test.StopServer(t, s, port)
|
defer test.StopServer(t, s, port)
|
||||||
topic := fmt.Sprintf("http://127.0.0.1:%d/mytopic", port)
|
topic := fmt.Sprintf("http://127.0.0.1:%d/mytopic", port)
|
||||||
|
|
|
@ -22,6 +22,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCLI_Serve_Unix_Curl(t *testing.T) {
|
func TestCLI_Serve_Unix_Curl(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
sockFile := filepath.Join(t.TempDir(), "ntfy.sock")
|
sockFile := filepath.Join(t.TempDir(), "ntfy.sock")
|
||||||
configFile := newEmptyFile(t) // Avoid issues with existing server.yml file on system
|
configFile := newEmptyFile(t) // Avoid issues with existing server.yml file on system
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
@ -102,6 +102,13 @@ type Contexter interface {
|
||||||
// Context represents an object's state in the form of key-value pairs
|
// Context represents an object's state in the form of key-value pairs
|
||||||
type Context map[string]any
|
type Context map[string]any
|
||||||
|
|
||||||
|
// Merge merges other into this context
|
||||||
|
func (c Context) Merge(other Context) {
|
||||||
|
for k, v := range other {
|
||||||
|
c[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type levelOverride struct {
|
type levelOverride struct {
|
||||||
value string
|
value string
|
||||||
level Level
|
level Level
|
||||||
|
|
150
server/errors.go
150
server/errors.go
|
@ -13,6 +13,7 @@ type errHTTP struct {
|
||||||
HTTPCode int `json:"http"`
|
HTTPCode int `json:"http"`
|
||||||
Message string `json:"error"`
|
Message string `json:"error"`
|
||||||
Link string `json:"link,omitempty"`
|
Link string `json:"link,omitempty"`
|
||||||
|
context log.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e errHTTP) Error() string {
|
func (e errHTTP) Error() string {
|
||||||
|
@ -25,71 +26,106 @@ func (e errHTTP) JSON() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e errHTTP) Context() log.Context {
|
func (e errHTTP) Context() log.Context {
|
||||||
return log.Context{
|
context := log.Context{
|
||||||
"error": e.Message,
|
"error": e.Message,
|
||||||
"error_code": e.Code,
|
"error_code": e.Code,
|
||||||
"http_status": e.HTTPCode,
|
"http_status": e.HTTPCode,
|
||||||
}
|
}
|
||||||
|
for k, v := range e.context {
|
||||||
|
context[k] = v
|
||||||
|
}
|
||||||
|
return context
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapErrHTTP(err *errHTTP, message string, args ...any) *errHTTP {
|
func (e errHTTP) Wrap(message string, args ...any) *errHTTP {
|
||||||
return &errHTTP{
|
clone := e.clone()
|
||||||
Code: err.Code,
|
clone.Message = fmt.Sprintf("%s, %s", clone.Message, fmt.Sprintf(message, args...))
|
||||||
HTTPCode: err.HTTPCode,
|
return &clone
|
||||||
Message: fmt.Sprintf("%s, %s", err.Message, fmt.Sprintf(message, args...)),
|
}
|
||||||
Link: err.Link,
|
|
||||||
|
func (e errHTTP) With(contexters ...log.Contexter) *errHTTP {
|
||||||
|
c := e.clone()
|
||||||
|
if c.context == nil {
|
||||||
|
c.context = make(log.Context)
|
||||||
|
}
|
||||||
|
for _, contexter := range contexters {
|
||||||
|
c.context.Merge(contexter.Context())
|
||||||
|
}
|
||||||
|
return &c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errHTTP) Fields(context log.Context) *errHTTP {
|
||||||
|
c := e.clone()
|
||||||
|
if c.context == nil {
|
||||||
|
c.context = make(log.Context)
|
||||||
|
}
|
||||||
|
c.context.Merge(context)
|
||||||
|
return &c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errHTTP) clone() errHTTP {
|
||||||
|
context := make(log.Context)
|
||||||
|
for k, v := range e.context {
|
||||||
|
context[k] = v
|
||||||
|
}
|
||||||
|
return errHTTP{
|
||||||
|
Code: e.Code,
|
||||||
|
HTTPCode: e.HTTPCode,
|
||||||
|
Message: e.Message,
|
||||||
|
Link: e.Link,
|
||||||
|
context: context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errHTTPBadRequest = &errHTTP{40000, http.StatusBadRequest, "invalid request", ""}
|
errHTTPBadRequest = &errHTTP{40000, http.StatusBadRequest, "invalid request", "", nil}
|
||||||
errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications"}
|
errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications", nil}
|
||||||
errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", ""}
|
errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", "", nil}
|
||||||
errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", ""}
|
errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", "", nil}
|
||||||
errHTTPBadRequestDelayCannotParse = &errHTTP{40004, http.StatusBadRequest, "invalid delay parameter: unable to parse delay", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
errHTTPBadRequestDelayCannotParse = &errHTTP{40004, http.StatusBadRequest, "invalid delay parameter: unable to parse delay", "https://ntfy.sh/docs/publish/#scheduled-delivery", nil}
|
||||||
errHTTPBadRequestDelayTooSmall = &errHTTP{40005, http.StatusBadRequest, "invalid delay parameter: too small, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
errHTTPBadRequestDelayTooSmall = &errHTTP{40005, http.StatusBadRequest, "invalid delay parameter: too small, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery", nil}
|
||||||
errHTTPBadRequestDelayTooLarge = &errHTTP{40006, http.StatusBadRequest, "invalid delay parameter: too large, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
errHTTPBadRequestDelayTooLarge = &errHTTP{40006, http.StatusBadRequest, "invalid delay parameter: too large, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery", nil}
|
||||||
errHTTPBadRequestPriorityInvalid = &errHTTP{40007, http.StatusBadRequest, "invalid priority parameter", "https://ntfy.sh/docs/publish/#message-priority"}
|
errHTTPBadRequestPriorityInvalid = &errHTTP{40007, http.StatusBadRequest, "invalid priority parameter", "https://ntfy.sh/docs/publish/#message-priority", nil}
|
||||||
errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages"}
|
errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages", nil}
|
||||||
errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid request: topic invalid", ""}
|
errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid request: topic invalid", "", nil}
|
||||||
errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid request: topic name is not allowed", ""}
|
errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid request: topic name is not allowed", "", nil}
|
||||||
errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", ""}
|
errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", "", nil}
|
||||||
errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", "https://ntfy.sh/docs/publish/#attachments"}
|
errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", "https://ntfy.sh/docs/publish/#attachments", nil}
|
||||||
errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachments not allowed", "https://ntfy.sh/docs/config/#attachments"}
|
errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachments not allowed", "https://ntfy.sh/docs/config/#attachments", nil}
|
||||||
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", "https://ntfy.sh/docs/publish/#scheduled-delivery", nil}
|
||||||
errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets"}
|
errHTTPBadRequestWebSocketsUpgradeHeaderMissing = &errHTTP{40016, http.StatusBadRequest, "invalid request: client not using the websocket protocol", "https://ntfy.sh/docs/subscribe/api/#websockets", nil}
|
||||||
errHTTPBadRequestMessageJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"}
|
errHTTPBadRequestMessageJSONInvalid = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json", nil}
|
||||||
errHTTPBadRequestActionsInvalid = &errHTTP{40018, http.StatusBadRequest, "invalid request: actions invalid", "https://ntfy.sh/docs/publish/#action-buttons"}
|
errHTTPBadRequestActionsInvalid = &errHTTP{40018, http.StatusBadRequest, "invalid request: actions invalid", "https://ntfy.sh/docs/publish/#action-buttons", nil}
|
||||||
errHTTPBadRequestMatrixMessageInvalid = &errHTTP{40019, http.StatusBadRequest, "invalid request: Matrix JSON invalid", "https://ntfy.sh/docs/publish/#matrix-gateway"}
|
errHTTPBadRequestMatrixMessageInvalid = &errHTTP{40019, http.StatusBadRequest, "invalid request: Matrix JSON invalid", "https://ntfy.sh/docs/publish/#matrix-gateway", nil}
|
||||||
errHTTPBadRequestIconURLInvalid = &errHTTP{40021, http.StatusBadRequest, "invalid request: icon URL is invalid", "https://ntfy.sh/docs/publish/#icons"}
|
errHTTPBadRequestIconURLInvalid = &errHTTP{40021, http.StatusBadRequest, "invalid request: icon URL is invalid", "https://ntfy.sh/docs/publish/#icons", nil}
|
||||||
errHTTPBadRequestSignupNotEnabled = &errHTTP{40022, http.StatusBadRequest, "invalid request: signup not enabled", "https://ntfy.sh/docs/config"}
|
errHTTPBadRequestSignupNotEnabled = &errHTTP{40022, http.StatusBadRequest, "invalid request: signup not enabled", "https://ntfy.sh/docs/config", nil}
|
||||||
errHTTPBadRequestNoTokenProvided = &errHTTP{40023, http.StatusBadRequest, "invalid request: no token provided", ""}
|
errHTTPBadRequestNoTokenProvided = &errHTTP{40023, http.StatusBadRequest, "invalid request: no token provided", "", nil}
|
||||||
errHTTPBadRequestJSONInvalid = &errHTTP{40024, http.StatusBadRequest, "invalid request: request body must be valid JSON", ""}
|
errHTTPBadRequestJSONInvalid = &errHTTP{40024, http.StatusBadRequest, "invalid request: request body must be valid JSON", "", nil}
|
||||||
errHTTPBadRequestPermissionInvalid = &errHTTP{40025, http.StatusBadRequest, "invalid request: incorrect permission string", ""}
|
errHTTPBadRequestPermissionInvalid = &errHTTP{40025, http.StatusBadRequest, "invalid request: incorrect permission string", "", nil}
|
||||||
errHTTPBadRequestIncorrectPasswordConfirmation = &errHTTP{40026, http.StatusBadRequest, "invalid request: password confirmation is not correct", ""}
|
errHTTPBadRequestIncorrectPasswordConfirmation = &errHTTP{40026, http.StatusBadRequest, "invalid request: password confirmation is not correct", "", nil}
|
||||||
errHTTPBadRequestNotAPaidUser = &errHTTP{40027, http.StatusBadRequest, "invalid request: not a paid user", ""}
|
errHTTPBadRequestNotAPaidUser = &errHTTP{40027, http.StatusBadRequest, "invalid request: not a paid user", "", nil}
|
||||||
errHTTPBadRequestBillingRequestInvalid = &errHTTP{40028, http.StatusBadRequest, "invalid request: not a valid billing request", ""}
|
errHTTPBadRequestBillingRequestInvalid = &errHTTP{40028, http.StatusBadRequest, "invalid request: not a valid billing request", "", nil}
|
||||||
errHTTPBadRequestBillingSubscriptionExists = &errHTTP{40029, http.StatusBadRequest, "invalid request: billing subscription already exists", ""}
|
errHTTPBadRequestBillingSubscriptionExists = &errHTTP{40029, http.StatusBadRequest, "invalid request: billing subscription already exists", "", nil}
|
||||||
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
|
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
|
||||||
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication"}
|
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil}
|
||||||
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication"}
|
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil}
|
||||||
errHTTPConflictUserExists = &errHTTP{40901, http.StatusConflict, "conflict: user already exists", ""}
|
errHTTPConflictUserExists = &errHTTP{40901, http.StatusConflict, "conflict: user already exists", "", nil}
|
||||||
errHTTPConflictTopicReserved = &errHTTP{40902, http.StatusConflict, "conflict: access control entry for topic or topic pattern already exists", ""}
|
errHTTPConflictTopicReserved = &errHTTP{40902, http.StatusConflict, "conflict: access control entry for topic or topic pattern already exists", "", nil}
|
||||||
errHTTPConflictSubscriptionExists = &errHTTP{40903, http.StatusConflict, "conflict: topic subscription already exists", ""}
|
errHTTPConflictSubscriptionExists = &errHTTP{40903, http.StatusConflict, "conflict: topic subscription already exists", "", nil}
|
||||||
errHTTPEntityTooLargeAttachment = &errHTTP{41301, http.StatusRequestEntityTooLarge, "attachment too large, or bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPEntityTooLargeAttachment = &errHTTP{41301, http.StatusRequestEntityTooLarge, "attachment too large, or bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPEntityTooLargeMatrixRequest = &errHTTP{41302, http.StatusRequestEntityTooLarge, "Matrix request is larger than the max allowed length", ""}
|
errHTTPEntityTooLargeMatrixRequest = &errHTTP{41302, http.StatusRequestEntityTooLarge, "Matrix request is larger than the max allowed length", "", nil}
|
||||||
errHTTPEntityTooLargeJSONBody = &errHTTP{41303, http.StatusRequestEntityTooLarge, "JSON body too large", ""}
|
errHTTPEntityTooLargeJSONBody = &errHTTP{41303, http.StatusRequestEntityTooLarge, "JSON body too large", "", nil}
|
||||||
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitAttachmentBandwidth = &errHTTP{42905, http.StatusTooManyRequests, "limit reached: daily bandwidth reached", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitAttachmentBandwidth = &errHTTP{42905, http.StatusTooManyRequests, "limit reached: daily bandwidth reached", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitAccountCreation = &errHTTP{42906, http.StatusTooManyRequests, "limit reached: too many accounts created", "https://ntfy.sh/docs/publish/#limitations"} // FIXME document limit
|
errHTTPTooManyRequestsLimitAccountCreation = &errHTTP{42906, http.StatusTooManyRequests, "limit reached: too many accounts created", "https://ntfy.sh/docs/publish/#limitations", nil} // FIXME document limit
|
||||||
errHTTPTooManyRequestsLimitReservations = &errHTTP{42907, http.StatusTooManyRequests, "limit reached: too many topic reservations for this user", ""}
|
errHTTPTooManyRequestsLimitReservations = &errHTTP{42907, http.StatusTooManyRequests, "limit reached: too many topic reservations for this user", "", nil}
|
||||||
errHTTPTooManyRequestsLimitMessages = &errHTTP{42908, http.StatusTooManyRequests, "limit reached: daily message quota reached", "https://ntfy.sh/docs/publish/#limitations"}
|
errHTTPTooManyRequestsLimitMessages = &errHTTP{42908, http.StatusTooManyRequests, "limit reached: daily message quota reached", "https://ntfy.sh/docs/publish/#limitations", nil}
|
||||||
errHTTPTooManyRequestsLimitAuthFailure = &errHTTP{42909, http.StatusTooManyRequests, "limit reached: too many auth failures", "https://ntfy.sh/docs/publish/#limitations"} // FIXME document limit
|
errHTTPTooManyRequestsLimitAuthFailure = &errHTTP{42909, http.StatusTooManyRequests, "limit reached: too many auth failures", "https://ntfy.sh/docs/publish/#limitations", nil} // FIXME document limit
|
||||||
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
|
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", "", nil}
|
||||||
errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", ""}
|
errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil}
|
||||||
errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/"}
|
errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil}
|
||||||
errHTTPInsufficientStorage = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without previously active subscriber", ""}
|
errHTTPInsufficientStorage = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
|
||||||
)
|
)
|
||||||
|
|
|
@ -582,11 +582,11 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
|
||||||
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
|
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
|
||||||
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
|
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
|
||||||
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
|
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
|
||||||
return nil, errHTTPInsufficientStorage
|
return nil, errHTTPInsufficientStorage.With(t)
|
||||||
} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
|
} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
|
||||||
return nil, errHTTPTooManyRequestsLimitMessages
|
return nil, errHTTPTooManyRequestsLimitMessages.With(t)
|
||||||
} else if email != "" && !vrate.EmailAllowed() {
|
} else if email != "" && !vrate.EmailAllowed() {
|
||||||
return nil, errHTTPTooManyRequestsLimitEmails
|
return nil, errHTTPTooManyRequestsLimitEmails.With(t)
|
||||||
}
|
}
|
||||||
if m.PollID != "" {
|
if m.PollID != "" {
|
||||||
m = newPollRequestMessage(t.ID, m.PollID)
|
m = newPollRequestMessage(t.ID, m.PollID)
|
||||||
|
@ -605,6 +605,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
|
||||||
delayed := m.Time > time.Now().Unix()
|
delayed := m.Time > time.Now().Unix()
|
||||||
ev := logvrm(v, r, m).
|
ev := logvrm(v, r, m).
|
||||||
Tag(tagPublish).
|
Tag(tagPublish).
|
||||||
|
With(t).
|
||||||
Fields(log.Context{
|
Fields(log.Context{
|
||||||
"message_delayed": delayed,
|
"message_delayed": delayed,
|
||||||
"message_firebase": firebase,
|
"message_firebase": firebase,
|
||||||
|
@ -781,7 +782,7 @@ func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, fi
|
||||||
if actionsStr != "" {
|
if actionsStr != "" {
|
||||||
m.Actions, err = parseActions(actionsStr)
|
m.Actions, err = parseActions(actionsStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, "", false, wrapErrHTTP(errHTTPBadRequestActionsInvalid, err.Error())
|
return false, false, "", false, errHTTPBadRequestActionsInvalid.Wrap(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
|
unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
|
||||||
|
@ -845,7 +846,7 @@ func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedRead
|
||||||
|
|
||||||
func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
|
func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
|
||||||
if !utf8.Valid(body.PeekedBytes) {
|
if !utf8.Valid(body.PeekedBytes) {
|
||||||
return errHTTPBadRequestMessageNotUTF8
|
return errHTTPBadRequestMessageNotUTF8.With(m)
|
||||||
}
|
}
|
||||||
if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
|
if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
|
||||||
m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
|
m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
|
||||||
|
@ -858,7 +859,7 @@ func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser
|
||||||
|
|
||||||
func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
|
func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
|
||||||
if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
|
if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
|
||||||
return errHTTPBadRequestAttachmentsDisallowed
|
return errHTTPBadRequestAttachmentsDisallowed.With(m)
|
||||||
}
|
}
|
||||||
vinfo, err := v.Info()
|
vinfo, err := v.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -895,7 +896,7 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message,
|
||||||
}
|
}
|
||||||
m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
|
m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
|
||||||
if err == util.ErrLimitReached {
|
if err == util.ErrLimitReached {
|
||||||
return errHTTPEntityTooLargeAttachment
|
return errHTTPEntityTooLargeAttachment.With(m)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1212,7 +1213,7 @@ func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topi
|
||||||
for _, t := range rateTopics {
|
for _, t := range rateTopics {
|
||||||
logvr(v, r).
|
logvr(v, r).
|
||||||
Tag(tagSubscribe).
|
Tag(tagSubscribe).
|
||||||
Field("message_topic", t.ID).
|
With(t).
|
||||||
Debug("Setting visitor as rate visitor for topic %s", t.ID)
|
Debug("Setting visitor as rate visitor for topic %s", t.ID)
|
||||||
t.SetRateVisitor(v)
|
t.SetRateVisitor(v)
|
||||||
}
|
}
|
||||||
|
@ -1558,8 +1559,8 @@ func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc
|
||||||
u := v.User()
|
u := v.User()
|
||||||
for _, t := range topics {
|
for _, t := range topics {
|
||||||
if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
|
if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
|
||||||
logvr(v, r).Err(err).Field("message_topic", t.ID).Debug("Access to topic %s not authorized", t.ID)
|
logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
|
||||||
return errHTTPForbidden
|
return errHTTPForbidden.With(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return next(w, r, v)
|
return next(w, r, v)
|
||||||
|
|
|
@ -290,6 +290,7 @@ func TestAccount_ChangePassword_NoAccount(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAccount_ExtendToken(t *testing.T) {
|
func TestAccount_ExtendToken(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
s := newTestServer(t, newTestConfigWithAuthFile(t))
|
s := newTestServer(t, newTestConfigWithAuthFile(t))
|
||||||
defer s.closeDatabases()
|
defer s.closeDatabases()
|
||||||
|
|
||||||
|
@ -611,6 +612,7 @@ func TestAccount_Reservation_PublishByAnonymousFails(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
|
func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
conf := newTestConfigWithAuthFile(t)
|
conf := newTestConfigWithAuthFile(t)
|
||||||
conf.AuthDefault = user.PermissionReadWrite
|
conf.AuthDefault = user.PermissionReadWrite
|
||||||
s := newTestServer(t, conf)
|
s := newTestServer(t, conf)
|
||||||
|
@ -685,6 +687,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) {
|
func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
conf := newTestConfigWithAuthFile(t)
|
conf := newTestConfigWithAuthFile(t)
|
||||||
conf.AuthDefault = user.PermissionReadWrite
|
conf.AuthDefault = user.PermissionReadWrite
|
||||||
conf.EnableSignup = true
|
conf.EnableSignup = true
|
||||||
|
@ -766,6 +769,7 @@ func TestAccount_Reservation_Add_Kills_Other_Subscribers(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
|
func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
conf := newTestConfigWithAuthFile(t)
|
conf := newTestConfigWithAuthFile(t)
|
||||||
conf.AuthDefault = user.PermissionReadWrite
|
conf.AuthDefault = user.PermissionReadWrite
|
||||||
conf.AuthStatsQueueWriterInterval = 200 * time.Millisecond
|
conf.AuthStatsQueueWriterInterval = 200 * time.Millisecond
|
||||||
|
|
|
@ -44,16 +44,11 @@ func (s *Server) execManager() {
|
||||||
"rate_visitor_user_id": vrate.MaybeUserID(),
|
"rate_visitor_user_id": vrate.MaybeUserID(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
ev.
|
ev.With(t).Trace("- topic %s: %d subscribers", t.ID, subs)
|
||||||
Fields(log.Context{
|
|
||||||
"message_topic": t.ID,
|
|
||||||
"message_topic_subscribers": subs,
|
|
||||||
}).
|
|
||||||
Trace("- topic %s: %d subscribers", t.ID, subs)
|
|
||||||
}
|
}
|
||||||
msgs, exists := messageCounts[t.ID]
|
msgs, exists := messageCounts[t.ID]
|
||||||
if t.Stale() && (!exists || msgs == 0) {
|
if t.Stale() && (!exists || msgs == 0) {
|
||||||
log.Tag(tagManager).Field("message_topic", t.ID).Trace("Deleting empty topic %s", t.ID)
|
log.Tag(tagManager).With(t).Trace("Deleting empty topic %s", t.ID)
|
||||||
emptyTopics++
|
emptyTopics++
|
||||||
delete(s.topics, t.ID)
|
delete(s.topics, t.ID)
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -188,13 +188,13 @@ func (s *Server) handleAccountBillingSubscriptionCreateSuccess(w http.ResponseWr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if sess.Customer == nil || sess.Subscription == nil || sess.ClientReferenceID == "" {
|
} else if sess.Customer == nil || sess.Subscription == nil || sess.ClientReferenceID == "" {
|
||||||
return wrapErrHTTP(errHTTPBadRequestBillingRequestInvalid, "customer or subscription not found")
|
return errHTTPBadRequestBillingRequestInvalid.Wrap("customer or subscription not found")
|
||||||
}
|
}
|
||||||
sub, err := s.stripe.GetSubscription(sess.Subscription.ID)
|
sub, err := s.stripe.GetSubscription(sess.Subscription.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if sub.Items == nil || len(sub.Items.Data) != 1 || sub.Items.Data[0].Price == nil || sub.Items.Data[0].Price.Recurring == nil {
|
} else if sub.Items == nil || len(sub.Items.Data) != 1 || sub.Items.Data[0].Price == nil || sub.Items.Data[0].Price.Recurring == nil {
|
||||||
return wrapErrHTTP(errHTTPBadRequestBillingRequestInvalid, "more than one line item in existing subscription")
|
return errHTTPBadRequestBillingRequestInvalid.Wrap("more than one line item in existing subscription")
|
||||||
}
|
}
|
||||||
priceID, interval := sub.Items.Data[0].Price.ID, sub.Items.Data[0].Price.Recurring.Interval
|
priceID, interval := sub.Items.Data[0].Price.ID, sub.Items.Data[0].Price.Recurring.Interval
|
||||||
tier, err := s.userManager.TierByStripePrice(priceID)
|
tier, err := s.userManager.TierByStripePrice(priceID)
|
||||||
|
@ -273,7 +273,7 @@ func (s *Server) handleAccountBillingSubscriptionUpdate(w http.ResponseWriter, r
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if sub.Items == nil || len(sub.Items.Data) != 1 {
|
} else if sub.Items == nil || len(sub.Items.Data) != 1 {
|
||||||
return wrapErrHTTP(errHTTPBadRequestBillingRequestInvalid, "no items, or more than one item")
|
return errHTTPBadRequestBillingRequestInvalid.Wrap("no items, or more than one item")
|
||||||
}
|
}
|
||||||
params := &stripe.SubscriptionParams{
|
params := &stripe.SubscriptionParams{
|
||||||
CancelAtPeriodEnd: stripe.Bool(false),
|
CancelAtPeriodEnd: stripe.Bool(false),
|
||||||
|
|
|
@ -415,6 +415,8 @@ func TestPayments_Checkout_Success_And_Increase_Rate_Limits_Reset_Visitor(t *tes
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active(t *testing.T) {
|
func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
// This tests incoming webhooks from Stripe to update a subscription:
|
// This tests incoming webhooks from Stripe to update a subscription:
|
||||||
// - All Stripe columns are updated in the user table
|
// - All Stripe columns are updated in the user table
|
||||||
// - When downgrading, excess reservations are deleted, including messages and attachments in
|
// - When downgrading, excess reservations are deleted, including messages and attachments in
|
||||||
|
|
|
@ -84,6 +84,7 @@ func TestServer_PublishWithFirebase(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
|
func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.KeepaliveInterval = time.Second
|
c.KeepaliveInterval = time.Second
|
||||||
s := newTestServer(t, c)
|
s := newTestServer(t, c)
|
||||||
|
@ -122,6 +123,7 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAndSubscribe(t *testing.T) {
|
func TestServer_PublishAndSubscribe(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
s := newTestServer(t, newTestConfig(t))
|
s := newTestServer(t, newTestConfig(t))
|
||||||
|
|
||||||
subscribeRR := httptest.NewRecorder()
|
subscribeRR := httptest.NewRecorder()
|
||||||
|
@ -297,6 +299,7 @@ func TestServer_PublishNoCache(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAt(t *testing.T) {
|
func TestServer_PublishAt(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.MinDelay = time.Second
|
c.MinDelay = time.Second
|
||||||
c.DelayedSenderInterval = 100 * time.Millisecond
|
c.DelayedSenderInterval = 100 * time.Millisecond
|
||||||
|
@ -452,6 +455,7 @@ func TestServer_PublishWithNopCache(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAndPollSince(t *testing.T) {
|
func TestServer_PublishAndPollSince(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
s := newTestServer(t, newTestConfig(t))
|
s := newTestServer(t, newTestConfig(t))
|
||||||
|
|
||||||
request(t, s, "PUT", "/mytopic", "test 1", nil)
|
request(t, s, "PUT", "/mytopic", "test 1", nil)
|
||||||
|
@ -632,6 +636,7 @@ func TestServer_PollWithQueryFilters(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_SubscribeWithQueryFilters(t *testing.T) {
|
func TestServer_SubscribeWithQueryFilters(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.KeepaliveInterval = 800 * time.Millisecond
|
c.KeepaliveInterval = 800 * time.Millisecond
|
||||||
s := newTestServer(t, c)
|
s := newTestServer(t, c)
|
||||||
|
@ -816,6 +821,7 @@ func TestServer_Auth_NonBasicHeader(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_StatsResetter(t *testing.T) {
|
func TestServer_StatsResetter(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
// This tests the stats resetter for
|
// This tests the stats resetter for
|
||||||
// - an anonymous user
|
// - an anonymous user
|
||||||
// - a user without a tier (treated like the same as the anonymous user)
|
// - a user without a tier (treated like the same as the anonymous user)
|
||||||
|
@ -956,6 +962,8 @@ func TestServer_StatsResetter_MessageLimiter_EmailsLimiter(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_DailyMessageQuotaFromDatabase(t *testing.T) {
|
func TestServer_DailyMessageQuotaFromDatabase(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
// This tests that the daily message quota is prefilled originally from the database,
|
// This tests that the daily message quota is prefilled originally from the database,
|
||||||
// if the visitor is unknown
|
// if the visitor is unknown
|
||||||
|
|
||||||
|
@ -1050,6 +1058,7 @@ func TestServer_PublishTooRequests_Defaults_ExemptHosts_MessageDailyLimit(t *tes
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishTooRequests_ShortReplenish(t *testing.T) {
|
func TestServer_PublishTooRequests_ShortReplenish(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.VisitorRequestLimitBurst = 60
|
c.VisitorRequestLimitBurst = 60
|
||||||
c.VisitorRequestLimitReplenish = time.Second
|
c.VisitorRequestLimitReplenish = time.Second
|
||||||
|
@ -1082,6 +1091,7 @@ func TestServer_PublishTooManyEmails_Defaults(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishTooManyEmails_Replenish(t *testing.T) {
|
func TestServer_PublishTooManyEmails_Replenish(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.VisitorEmailLimitReplenish = 500 * time.Millisecond
|
c.VisitorEmailLimitReplenish = 500 * time.Millisecond
|
||||||
s := newTestServer(t, c)
|
s := newTestServer(t, c)
|
||||||
|
@ -1349,6 +1359,7 @@ func TestServer_PublishAsJSON_RateLimit_MessageDailyLimit(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAsJSON_WithEmail(t *testing.T) {
|
func TestServer_PublishAsJSON_WithEmail(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
mailer := &testMailer{}
|
mailer := &testMailer{}
|
||||||
s := newTestServer(t, newTestConfig(t))
|
s := newTestServer(t, newTestConfig(t))
|
||||||
s.smtpSender = mailer
|
s.smtpSender = mailer
|
||||||
|
@ -1604,6 +1615,7 @@ func TestServer_PublishAttachmentTooLargeBodyVisitorAttachmentTotalSizeLimit(t *
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAttachmentAndExpire(t *testing.T) {
|
func TestServer_PublishAttachmentAndExpire(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
content := util.RandomString(5000) // > 4096
|
content := util.RandomString(5000) // > 4096
|
||||||
|
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
|
@ -1631,6 +1643,7 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishAttachmentWithTierBasedExpiry(t *testing.T) {
|
func TestServer_PublishAttachmentWithTierBasedExpiry(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
content := util.RandomString(5000) // > 4096
|
content := util.RandomString(5000) // > 4096
|
||||||
|
|
||||||
c := newTestConfigWithAuthFile(t)
|
c := newTestConfigWithAuthFile(t)
|
||||||
|
@ -1898,6 +1911,7 @@ func TestServer_Visitor_XForwardedFor_Multiple(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
|
func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
count := 50000
|
count := 50000
|
||||||
c := newTestConfig(t)
|
c := newTestConfig(t)
|
||||||
c.TotalTopicLimit = 50001
|
c.TotalTopicLimit = 50001
|
||||||
|
|
|
@ -115,8 +115,8 @@ func (t *topic) CancelSubscribers(exceptUserID string) {
|
||||||
if s.userID != exceptUserID {
|
if s.userID != exceptUserID {
|
||||||
log.
|
log.
|
||||||
Tag(tagSubscribe).
|
Tag(tagSubscribe).
|
||||||
|
With(t).
|
||||||
Fields(log.Context{
|
Fields(log.Context{
|
||||||
"message_topic": t.ID,
|
|
||||||
"user_id": s.userID,
|
"user_id": s.userID,
|
||||||
}).
|
}).
|
||||||
Debug("Canceling subscriber %s", s.userID)
|
Debug("Canceling subscriber %s", s.userID)
|
||||||
|
@ -125,6 +125,20 @@ func (t *topic) CancelSubscribers(exceptUserID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *topic) Context() log.Context {
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
fields := map[string]any{
|
||||||
|
"topic": t.ID,
|
||||||
|
"topic_subscribers": len(t.subscribers),
|
||||||
|
}
|
||||||
|
if t.rateVisitor != nil {
|
||||||
|
fields["topic_rate_visitor_ip"] = t.rateVisitor.IP().String()
|
||||||
|
fields["topic_rate_visitor_user_id"] = t.rateVisitor.MaybeUserID()
|
||||||
|
}
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
// subscribersCopy returns a shallow copy of the subscribers map
|
// subscribersCopy returns a shallow copy of the subscribers map
|
||||||
func (t *topic) subscribersCopy() map[int]*topicSubscriber {
|
func (t *topic) subscribersCopy() map[int]*topicSubscriber {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
|
|
@ -45,10 +45,10 @@ type message struct {
|
||||||
|
|
||||||
func (m *message) Context() log.Context {
|
func (m *message) Context() log.Context {
|
||||||
fields := map[string]any{
|
fields := map[string]any{
|
||||||
|
"topic": m.Topic,
|
||||||
"message_id": m.ID,
|
"message_id": m.ID,
|
||||||
"message_time": m.Time,
|
"message_time": m.Time,
|
||||||
"message_event": m.Event,
|
"message_event": m.Event,
|
||||||
"message_topic": m.Topic,
|
|
||||||
"message_body_size": len(m.Message),
|
"message_body_size": len(m.Message),
|
||||||
}
|
}
|
||||||
if m.Sender.IsValid() {
|
if m.Sender.IsValid() {
|
||||||
|
|
Loading…
Reference in New Issue