From 87278bda2ea5c5c316659bbc9da906110a1cf74b Mon Sep 17 00:00:00 2001 From: Alexander Bakker Date: Sun, 1 May 2022 14:15:40 +0200 Subject: [PATCH] Use bufio.Reader instead of bufio.Scanner when streaming I occasionally run into "bufio.Scanner: token too long" while streaming. This change should prevent that from happening. --- streaming.go | 28 +++++++++++++++++++++++----- streaming_test.go | 27 ++++++++++++++++++--------- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/streaming.go b/streaming.go index 2148ed0..615efec 100644 --- a/streaming.go +++ b/streaming.go @@ -2,8 +2,10 @@ package mastodon import ( "bufio" + "bytes" "context" "encoding/json" + "errors" "io" "net/http" "net/url" @@ -43,10 +45,27 @@ type Event interface { func handleReader(q chan Event, r io.Reader) error { var name string - s := bufio.NewScanner(r) - for s.Scan() { - line := s.Text() - token := strings.SplitN(line, ":", 2) + var lineBuf bytes.Buffer + br := bufio.NewReader(r) + for { + line, isPrefix, err := br.ReadLine() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + if isPrefix { + lineBuf.Write(line) + continue + } + if lineBuf.Len() > 0 { + lineBuf.Write(line) + line = lineBuf.Bytes() + lineBuf.Reset() + } + + token := strings.SplitN(string(line), ":", 2) if len(token) != 2 { continue } @@ -76,7 +95,6 @@ func handleReader(q chan Event, r io.Reader) error { } } } - return s.Err() } func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) { diff --git a/streaming_test.go b/streaming_test.go index 0bd5ccc..fd61f9c 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -1,6 +1,7 @@ package mastodon import ( + "bufio" "context" "fmt" "net/http" @@ -11,18 +12,23 @@ import ( ) func TestHandleReader(t *testing.T) { + large := "large" + largeContent := strings.Repeat(large, 2*(bufio.MaxScanTokenSize/len(large))) + q := make(chan Event) - r := strings.NewReader(` + r := strings.NewReader(fmt.Sprintf(` event: update data: {content: error} event: update data: {"content": "foo"} +event: update +data: {"content": "%s"} event: notification data: {"type": "mention"} event: delete data: 1234567 :thump - `) + `, largeContent)) go func() { defer close(q) err := handleReader(q, r) @@ -30,13 +36,16 @@ data: 1234567 t.Fatalf("should not be fail: %v", err) } }() - var passUpdate, passNotification, passDelete, passError bool + var passUpdate, passUpdateLarge, passNotification, passDelete, passError bool for e := range q { switch event := e.(type) { case *UpdateEvent: - passUpdate = true - if event.Status.Content != "foo" { - t.Fatalf("want %q but %q", "foo", event.Status.Content) + if event.Status.Content == "foo" { + passUpdate = true + } else if event.Status.Content == largeContent { + passUpdateLarge = true + } else { + t.Fatalf("bad update content: %q", event.Status.Content) } case *NotificationEvent: passNotification = true @@ -55,10 +64,10 @@ data: 1234567 } } } - if !passUpdate || !passNotification || !passDelete || !passError { + if !passUpdate || !passUpdateLarge || !passNotification || !passDelete || !passError { t.Fatalf("have not passed through somewhere: "+ - "update %t, notification %t, delete %t, error %t", - passUpdate, passNotification, passDelete, passError) + "update: %t, update (large): %t, notification: %t, delete: %t, error: %t", + passUpdate, passUpdateLarge, passNotification, passDelete, passError) } }