From c66c3aaa2298a47e724259457385fe7a80345b21 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sat, 29 Apr 2017 03:42:45 +0900 Subject: [PATCH 1/7] Add TestHandleReader --- streaming_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/streaming_test.go b/streaming_test.go index 6e6153d..b5252d7 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -5,10 +5,55 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" ) +func TestHandleReader(t *testing.T) { + q := make(chan Event) + r := strings.NewReader(` +event: update +data: {"content": "foo"} +event: notification +data: {"type": "mention"} +event: delete +data: 1234567 +:thump + `) + go func() { + defer close(q) + err := handleReader(context.Background(), q, r) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + }() + var passUpdate, passNotification, passDelete bool + for e := range q { + switch event := e.(type) { + case *UpdateEvent: + passUpdate = true + if event.Status.Content != "foo" { + t.Fatalf("want %q but %q", "foo", event.Status.Content) + } + case *NotificationEvent: + passNotification = true + if event.Notification.Type != "mention" { + t.Fatalf("want %q but %q", "mention", event.Notification.Type) + } + case *DeleteEvent: + passDelete = true + if event.ID != 1234567 { + t.Fatalf("want %d but %d", 1234567, event.ID) + } + } + } + if !passUpdate || !passNotification || !passDelete { + t.Fatalf("have not passed through somewhere: update %t, notification %t, delete %t", + passUpdate, passNotification, passDelete) + } +} + func TestStreamingPublic(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/api/v1/streaming/public" { From 517247284ec67d251e94b2b81331f4b01e5f35b0 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 03:12:55 +0900 Subject: [PATCH 2/7] Optimize streaming --- streaming.go | 71 ++++++++++++++++++++++++++--------------------- streaming_test.go | 70 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 102 insertions(+), 39 deletions(-) diff --git a/streaming.go b/streaming.go index 897be4c..060c9e7 100644 --- a/streaming.go +++ b/streaming.go @@ -8,8 +8,8 @@ import ( "net/http" "net/url" "path" + "strconv" "strings" - "time" ) // UpdateEvent is struct for passing status event to app. @@ -42,8 +42,8 @@ type Event interface { event() } -func handleReader(ctx context.Context, q chan Event, r io.Reader) error { - name := "" +func handleReader(q chan Event, r io.Reader) error { + var name string s := bufio.NewScanner(r) for s.Scan() { line := s.Text() @@ -55,30 +55,33 @@ func handleReader(ctx context.Context, q chan Event, r io.Reader) error { case "event": name = strings.TrimSpace(token[1]) case "data": + var err error switch name { case "update": var status Status - err := json.Unmarshal([]byte(token[1]), &status) + err = json.Unmarshal([]byte(token[1]), &status) if err == nil { q <- &UpdateEvent{&status} } case "notification": var notification Notification - err := json.Unmarshal([]byte(token[1]), ¬ification) + err = json.Unmarshal([]byte(token[1]), ¬ification) if err == nil { q <- &NotificationEvent{¬ification} } case "delete": var id int64 - err := json.Unmarshal([]byte(token[1]), &id) + id, err = strconv.ParseInt(strings.TrimSpace(token[1]), 10, 64) if err == nil { q <- &DeleteEvent{id} } } - default: + if err != nil { + q <- &ErrorEvent{err} + } } } - return ctx.Err() + return s.Err() } func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) { @@ -96,38 +99,42 @@ func (c *Client) streaming(ctx context.Context, p string, params url.Values) (ch req = req.WithContext(ctx) req.Header.Set("Authorization", "Bearer "+c.config.AccessToken) - var resp *http.Response - - q := make(chan Event, 10) + q := make(chan Event) go func() { - defer ctx.Done() - + defer close(q) for { - resp, err = c.Do(req) - if resp != nil && resp.StatusCode != http.StatusOK { - err = parseAPIError("bad request", resp) + select { + case <-ctx.Done(): + q <- &ErrorEvent{ctx.Err()} + return + default: } - if err == nil { - err = handleReader(ctx, q, resp.Body) - if err == nil { - break - } - } else { - q <- &ErrorEvent{err} - } - resp.Body.Close() - time.Sleep(3 * time.Second) - } - }() - go func() { - <-ctx.Done() - if resp != nil && resp.Body != nil { - resp.Body.Close() + + c.doStreaming(req, q) } }() return q, nil } +func (c *Client) doStreaming(req *http.Request, q chan Event) { + resp, err := c.Do(req) + if err != nil { + q <- &ErrorEvent{err} + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + q <- &ErrorEvent{parseAPIError("bad request", resp)} + return + } + + err = handleReader(q, resp.Body) + if err != nil { + q <- &ErrorEvent{err} + } +} + // StreamingUser return channel to read events on home. func (c *Client) StreamingUser(ctx context.Context) (chan Event, error) { return c.streaming(ctx, "user", nil) diff --git a/streaming_test.go b/streaming_test.go index b5252d7..a417580 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -23,7 +23,7 @@ data: 1234567 `) go func() { defer close(q) - err := handleReader(context.Background(), q, r) + err := handleReader(q, r) if err != nil { t.Fatalf("should not be fail: %v", err) } @@ -54,9 +54,64 @@ data: 1234567 } } -func TestStreamingPublic(t *testing.T) { +func TestStreaming(t *testing.T) { + canErr := true ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/api/v1/streaming/public" { + if canErr { + canErr = false + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + f := w.(http.Flusher) + fmt.Fprintln(w, ` +event: update +data: {"content": "foo"} + `) + f.Flush() + })) + defer ts.Close() + + c := NewClient(&Config{Server: ":"}) + _, err := c.streaming(context.Background(), "", nil) + if err == nil { + t.Fatalf("should be fail: %v", err) + } + + c = NewClient(&Config{Server: ts.URL}) + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Second, func() { + cancel() + }) + q, err := c.streaming(ctx, "", nil) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + var passError, passUpdate bool + for e := range q { + switch event := e.(type) { + case *ErrorEvent: + passError = true + if event.err == nil { + t.Fatalf("should be fail: %v", event.err) + } + case *UpdateEvent: + passUpdate = true + if event.Status.Content != "foo" { + t.Fatalf("want %q but %q", "foo", event.Status.Content) + } + } + } + if !passError || !passUpdate { + t.Fatalf("have not passed through somewhere: error %t, update %t", passError, passUpdate) + } +} + +func TestStreamingPublic(t *testing.T) { + var isEnd bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isEnd { + return + } else if r.URL.Path != "/api/v1/streaming/public" { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } @@ -72,7 +127,7 @@ event: update data: {"content": "bar"} `) f.Flush() - return + isEnd = true })) defer ts.Close() @@ -87,13 +142,14 @@ data: {"content": "bar"} if err != nil { t.Fatalf("should not be fail: %v", err) } - time.AfterFunc(3*time.Second, func() { + time.AfterFunc(time.Second, func() { cancel() - close(q) }) events := []Event{} for e := range q { - events = append(events, e) + if _, ok := e.(*ErrorEvent); !ok { + events = append(events, e) + } } if len(events) != 2 { t.Fatalf("result should be two: %d", len(events)) From 088cb0b74d36a101f78c5b756d9c6797d3d76ff2 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 03:19:58 +0900 Subject: [PATCH 3/7] Fix TestHandleReader --- streaming_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/streaming_test.go b/streaming_test.go index a417580..1ee1655 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -14,6 +14,8 @@ func TestHandleReader(t *testing.T) { q := make(chan Event) r := strings.NewReader(` event: update +data: {content: error} +event: update data: {"content": "foo"} event: notification data: {"type": "mention"} @@ -28,7 +30,7 @@ data: 1234567 t.Fatalf("should not be fail: %v", err) } }() - var passUpdate, passNotification, passDelete bool + var passUpdate, passNotification, passDelete, passError bool for e := range q { switch event := e.(type) { case *UpdateEvent: @@ -46,11 +48,17 @@ data: 1234567 if event.ID != 1234567 { t.Fatalf("want %d but %d", 1234567, event.ID) } + case *ErrorEvent: + passError = true + if event.err == nil { + t.Fatalf("should be fail: %v", event.err) + } } } - if !passUpdate || !passNotification || !passDelete { - t.Fatalf("have not passed through somewhere: update %t, notification %t, delete %t", - passUpdate, passNotification, passDelete) + if !passUpdate || !passNotification || !passDelete || !passError { + t.Fatalf("have not passed through somewhere: "+ + "update %t, notification %t, delete %t, error %t", + passUpdate, passNotification, passDelete, passError) } } From 9b39dd033ffc10a18fd104cfe9b9ccf3b637750d Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 04:30:04 +0900 Subject: [PATCH 4/7] Add TestDoStreaming --- streaming_test.go | 64 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/streaming_test.go b/streaming_test.go index 1ee1655..da960a3 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -63,9 +63,12 @@ data: 1234567 } func TestStreaming(t *testing.T) { + var isEnd bool canErr := true ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if canErr { + if isEnd { + return + } else if canErr { canErr = false http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return @@ -76,6 +79,7 @@ event: update data: {"content": "foo"} `) f.Flush() + isEnd = true })) defer ts.Close() @@ -87,13 +91,12 @@ data: {"content": "foo"} c = NewClient(&Config{Server: ts.URL}) ctx, cancel := context.WithCancel(context.Background()) - time.AfterFunc(time.Second, func() { - cancel() - }) + time.AfterFunc(time.Second, cancel) q, err := c.streaming(ctx, "", nil) if err != nil { t.Fatalf("should not be fail: %v", err) } + var cnt int var passError, passUpdate bool for e := range q { switch event := e.(type) { @@ -103,17 +106,63 @@ data: {"content": "foo"} t.Fatalf("should be fail: %v", event.err) } case *UpdateEvent: + cnt++ passUpdate = true if event.Status.Content != "foo" { t.Fatalf("want %q but %q", "foo", event.Status.Content) } } } + if cnt != 1 { + t.Fatalf("result should be one: %d", cnt) + } if !passError || !passUpdate { t.Fatalf("have not passed through somewhere: error %t, update %t", passError, passUpdate) } } +func TestDoStreaming(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.(http.Flusher).Flush() + time.Sleep(time.Second) + })) + defer ts.Close() + + c := NewClient(&Config{Server: ts.URL}) + + req, err := http.NewRequest(http.MethodGet, ts.URL, nil) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond, cancel) + req = req.WithContext(ctx) + + q := make(chan Event) + go func() { + defer close(q) + c.doStreaming(req, q) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + }() + var passError bool + for e := range q { + if event, ok := e.(*ErrorEvent); ok { + passError = true + if event.err == nil { + t.Fatalf("should be fail: %v", event.err) + } + } + } + if !passError { + t.Fatalf("have not passed through: error %t", passError) + } +} + +func TestStreamingUser(t *testing.T) { +} + func TestStreamingPublic(t *testing.T) { var isEnd bool ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -150,9 +199,7 @@ data: {"content": "bar"} if err != nil { t.Fatalf("should not be fail: %v", err) } - time.AfterFunc(time.Second, func() { - cancel() - }) + time.AfterFunc(time.Second, cancel) events := []Event{} for e := range q { if _, ok := e.(*ErrorEvent); !ok { @@ -169,3 +216,6 @@ data: {"content": "bar"} t.Fatalf("want %q but %q", "bar", events[1].(*UpdateEvent).Status.Content) } } + +func TestStreamingHashtag(t *testing.T) { +} From e2a440b1703a13b879e4eae4783dc0a1fa5a76e9 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 04:41:56 +0900 Subject: [PATCH 5/7] Add TestStreamingUser --- streaming_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/streaming_test.go b/streaming_test.go index da960a3..f2cfa86 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -161,6 +161,40 @@ func TestDoStreaming(t *testing.T) { } func TestStreamingUser(t *testing.T) { + var isEnd bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isEnd { + return + } + f, _ := w.(http.Flusher) + fmt.Fprintln(w, ` +event: update +data: {"content": "foo"} + `) + f.Flush() + isEnd = true + })) + defer ts.Close() + + c := NewClient(&Config{Server: ts.URL}) + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Second, cancel) + q, err := c.StreamingUser(ctx) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + events := []Event{} + for e := range q { + if _, ok := e.(*ErrorEvent); !ok { + events = append(events, e) + } + } + if len(events) != 1 { + t.Fatalf("result should be one: %d", len(events)) + } + if events[0].(*UpdateEvent).Status.Content != "foo" { + t.Fatalf("want %q but %q", "foo", events[0].(*UpdateEvent).Status.Content) + } } func TestStreamingPublic(t *testing.T) { From c32612c3c98c05f682bf31d81727b5537156abe6 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 04:53:13 +0900 Subject: [PATCH 6/7] Add TestStreamingHashtag --- streaming_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/streaming_test.go b/streaming_test.go index f2cfa86..9f5b715 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -165,6 +165,9 @@ func TestStreamingUser(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if isEnd { return + } else if r.URL.Path != "/api/v1/streaming/user" { + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + return } f, _ := w.(http.Flusher) fmt.Fprintln(w, ` @@ -202,7 +205,7 @@ func TestStreamingPublic(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if isEnd { return - } else if r.URL.Path != "/api/v1/streaming/public" { + } else if r.URL.Path != "/api/v1/streaming/public/local" { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } @@ -229,7 +232,7 @@ data: {"content": "bar"} AccessToken: "zoo", }) ctx, cancel := context.WithCancel(context.Background()) - q, err := client.StreamingPublic(ctx, false) + q, err := client.StreamingPublic(ctx, true) if err != nil { t.Fatalf("should not be fail: %v", err) } @@ -252,4 +255,41 @@ data: {"content": "bar"} } func TestStreamingHashtag(t *testing.T) { + var isEnd bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isEnd { + return + } else if r.URL.Path != "/api/v1/streaming/hashtag/local" { + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + return + } + f, _ := w.(http.Flusher) + fmt.Fprintln(w, ` +event: update +data: {"content": "foo"} + `) + f.Flush() + isEnd = true + })) + defer ts.Close() + + client := NewClient(&Config{Server: ts.URL}) + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Second, cancel) + q, err := client.StreamingHashtag(ctx, "hashtag", true) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + events := []Event{} + for e := range q { + if _, ok := e.(*ErrorEvent); !ok { + events = append(events, e) + } + } + if len(events) != 1 { + t.Fatalf("result should be one: %d", len(events)) + } + if events[0].(*UpdateEvent).Status.Content != "foo" { + t.Fatalf("want %q but %q", "foo", events[0].(*UpdateEvent).Status.Content) + } } From c88e2df363e5c9bade8322b6020e4cc100b0d208 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 13:48:08 +0900 Subject: [PATCH 7/7] Remove channel close of mstdn stream command --- cmd/mstdn/cmd_stream.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/mstdn/cmd_stream.go b/cmd/mstdn/cmd_stream.go index dac8865..f2ca59c 100644 --- a/cmd/mstdn/cmd_stream.go +++ b/cmd/mstdn/cmd_stream.go @@ -80,7 +80,6 @@ func cmdStream(c *cli.Context) error { go func() { <-sc cancel() - close(q) }() c.App.Metadata["signal"] = sc