Use bufio.Reader instead of bufio.Scanner when streaming

I occasionally run into "bufio.Scanner: token too long" while
streaming. This change should prevent that from happening.
pull/147/head
Alexander Bakker 2022-05-01 14:15:40 +02:00 committed by mattn
parent f436c5397c
commit 87278bda2e
2 changed files with 41 additions and 14 deletions

View File

@ -2,8 +2,10 @@ package mastodon
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@ -43,10 +45,27 @@ type Event interface {
func handleReader(q chan Event, r io.Reader) error { func handleReader(q chan Event, r io.Reader) error {
var name string var name string
s := bufio.NewScanner(r) var lineBuf bytes.Buffer
for s.Scan() { br := bufio.NewReader(r)
line := s.Text() for {
token := strings.SplitN(line, ":", 2) line, isPrefix, err := br.ReadLine()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
if isPrefix {
lineBuf.Write(line)
continue
}
if lineBuf.Len() > 0 {
lineBuf.Write(line)
line = lineBuf.Bytes()
lineBuf.Reset()
}
token := strings.SplitN(string(line), ":", 2)
if len(token) != 2 { if len(token) != 2 {
continue continue
} }
@ -76,7 +95,6 @@ func handleReader(q chan Event, r io.Reader) error {
} }
} }
} }
return s.Err()
} }
func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) { func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) {

View File

@ -1,6 +1,7 @@
package mastodon package mastodon
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
@ -11,18 +12,23 @@ import (
) )
func TestHandleReader(t *testing.T) { func TestHandleReader(t *testing.T) {
large := "large"
largeContent := strings.Repeat(large, 2*(bufio.MaxScanTokenSize/len(large)))
q := make(chan Event) q := make(chan Event)
r := strings.NewReader(` r := strings.NewReader(fmt.Sprintf(`
event: update event: update
data: {content: error} data: {content: error}
event: update event: update
data: {"content": "foo"} data: {"content": "foo"}
event: update
data: {"content": "%s"}
event: notification event: notification
data: {"type": "mention"} data: {"type": "mention"}
event: delete event: delete
data: 1234567 data: 1234567
:thump :thump
`) `, largeContent))
go func() { go func() {
defer close(q) defer close(q)
err := handleReader(q, r) err := handleReader(q, r)
@ -30,13 +36,16 @@ data: 1234567
t.Fatalf("should not be fail: %v", err) t.Fatalf("should not be fail: %v", err)
} }
}() }()
var passUpdate, passNotification, passDelete, passError bool var passUpdate, passUpdateLarge, passNotification, passDelete, passError bool
for e := range q { for e := range q {
switch event := e.(type) { switch event := e.(type) {
case *UpdateEvent: case *UpdateEvent:
passUpdate = true if event.Status.Content == "foo" {
if event.Status.Content != "foo" { passUpdate = true
t.Fatalf("want %q but %q", "foo", event.Status.Content) } else if event.Status.Content == largeContent {
passUpdateLarge = true
} else {
t.Fatalf("bad update content: %q", event.Status.Content)
} }
case *NotificationEvent: case *NotificationEvent:
passNotification = true passNotification = true
@ -55,10 +64,10 @@ data: 1234567
} }
} }
} }
if !passUpdate || !passNotification || !passDelete || !passError { if !passUpdate || !passUpdateLarge || !passNotification || !passDelete || !passError {
t.Fatalf("have not passed through somewhere: "+ t.Fatalf("have not passed through somewhere: "+
"update %t, notification %t, delete %t, error %t", "update: %t, update (large): %t, notification: %t, delete: %t, error: %t",
passUpdate, passNotification, passDelete, passError) passUpdate, passUpdateLarge, passNotification, passDelete, passError)
} }
} }