From 517247284ec67d251e94b2b81331f4b01e5f35b0 Mon Sep 17 00:00:00 2001 From: 178inaba <178inaba@users.noreply.github.com> Date: Sun, 30 Apr 2017 03:12:55 +0900 Subject: [PATCH] Optimize streaming --- streaming.go | 71 ++++++++++++++++++++++++++--------------------- streaming_test.go | 70 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 102 insertions(+), 39 deletions(-) diff --git a/streaming.go b/streaming.go index 897be4c..060c9e7 100644 --- a/streaming.go +++ b/streaming.go @@ -8,8 +8,8 @@ import ( "net/http" "net/url" "path" + "strconv" "strings" - "time" ) // UpdateEvent is struct for passing status event to app. @@ -42,8 +42,8 @@ type Event interface { event() } -func handleReader(ctx context.Context, q chan Event, r io.Reader) error { - name := "" +func handleReader(q chan Event, r io.Reader) error { + var name string s := bufio.NewScanner(r) for s.Scan() { line := s.Text() @@ -55,30 +55,33 @@ func handleReader(ctx context.Context, q chan Event, r io.Reader) error { case "event": name = strings.TrimSpace(token[1]) case "data": + var err error switch name { case "update": var status Status - err := json.Unmarshal([]byte(token[1]), &status) + err = json.Unmarshal([]byte(token[1]), &status) if err == nil { q <- &UpdateEvent{&status} } case "notification": var notification Notification - err := json.Unmarshal([]byte(token[1]), ¬ification) + err = json.Unmarshal([]byte(token[1]), ¬ification) if err == nil { q <- &NotificationEvent{¬ification} } case "delete": var id int64 - err := json.Unmarshal([]byte(token[1]), &id) + id, err = strconv.ParseInt(strings.TrimSpace(token[1]), 10, 64) if err == nil { q <- &DeleteEvent{id} } } - default: + if err != nil { + q <- &ErrorEvent{err} + } } } - return ctx.Err() + return s.Err() } func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) { @@ -96,38 +99,42 @@ func (c *Client) streaming(ctx context.Context, p string, params url.Values) (ch req = req.WithContext(ctx) req.Header.Set("Authorization", "Bearer "+c.config.AccessToken) - var resp *http.Response - - q := make(chan Event, 10) + q := make(chan Event) go func() { - defer ctx.Done() - + defer close(q) for { - resp, err = c.Do(req) - if resp != nil && resp.StatusCode != http.StatusOK { - err = parseAPIError("bad request", resp) + select { + case <-ctx.Done(): + q <- &ErrorEvent{ctx.Err()} + return + default: } - if err == nil { - err = handleReader(ctx, q, resp.Body) - if err == nil { - break - } - } else { - q <- &ErrorEvent{err} - } - resp.Body.Close() - time.Sleep(3 * time.Second) - } - }() - go func() { - <-ctx.Done() - if resp != nil && resp.Body != nil { - resp.Body.Close() + + c.doStreaming(req, q) } }() return q, nil } +func (c *Client) doStreaming(req *http.Request, q chan Event) { + resp, err := c.Do(req) + if err != nil { + q <- &ErrorEvent{err} + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + q <- &ErrorEvent{parseAPIError("bad request", resp)} + return + } + + err = handleReader(q, resp.Body) + if err != nil { + q <- &ErrorEvent{err} + } +} + // StreamingUser return channel to read events on home. func (c *Client) StreamingUser(ctx context.Context) (chan Event, error) { return c.streaming(ctx, "user", nil) diff --git a/streaming_test.go b/streaming_test.go index b5252d7..a417580 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -23,7 +23,7 @@ data: 1234567 `) go func() { defer close(q) - err := handleReader(context.Background(), q, r) + err := handleReader(q, r) if err != nil { t.Fatalf("should not be fail: %v", err) } @@ -54,9 +54,64 @@ data: 1234567 } } -func TestStreamingPublic(t *testing.T) { +func TestStreaming(t *testing.T) { + canErr := true ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/api/v1/streaming/public" { + if canErr { + canErr = false + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + f := w.(http.Flusher) + fmt.Fprintln(w, ` +event: update +data: {"content": "foo"} + `) + f.Flush() + })) + defer ts.Close() + + c := NewClient(&Config{Server: ":"}) + _, err := c.streaming(context.Background(), "", nil) + if err == nil { + t.Fatalf("should be fail: %v", err) + } + + c = NewClient(&Config{Server: ts.URL}) + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Second, func() { + cancel() + }) + q, err := c.streaming(ctx, "", nil) + if err != nil { + t.Fatalf("should not be fail: %v", err) + } + var passError, passUpdate bool + for e := range q { + switch event := e.(type) { + case *ErrorEvent: + passError = true + if event.err == nil { + t.Fatalf("should be fail: %v", event.err) + } + case *UpdateEvent: + passUpdate = true + if event.Status.Content != "foo" { + t.Fatalf("want %q but %q", "foo", event.Status.Content) + } + } + } + if !passError || !passUpdate { + t.Fatalf("have not passed through somewhere: error %t, update %t", passError, passUpdate) + } +} + +func TestStreamingPublic(t *testing.T) { + var isEnd bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isEnd { + return + } else if r.URL.Path != "/api/v1/streaming/public" { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } @@ -72,7 +127,7 @@ event: update data: {"content": "bar"} `) f.Flush() - return + isEnd = true })) defer ts.Close() @@ -87,13 +142,14 @@ data: {"content": "bar"} if err != nil { t.Fatalf("should not be fail: %v", err) } - time.AfterFunc(3*time.Second, func() { + time.AfterFunc(time.Second, func() { cancel() - close(q) }) events := []Event{} for e := range q { - events = append(events, e) + if _, ok := e.(*ErrorEvent); !ok { + events = append(events, e) + } } if len(events) != 2 { t.Fatalf("result should be two: %d", len(events))