Fix local streaming
parent
a4904109e4
commit
9fa63b511c
63
streaming.go
63
streaming.go
|
@ -86,7 +86,15 @@ func (c *Client) streaming(ctx context.Context, p string, params url.Values) (ch
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = path.Join(u.Path, "/api/v1/streaming/"+p)
|
||||
u.Path = path.Join(u.Path, "/api/v1/streaming", p)
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.Header.Set("Authorization", "Bearer "+c.config.AccessToken)
|
||||
|
||||
var resp *http.Response
|
||||
|
||||
|
@ -95,18 +103,9 @@ func (c *Client) streaming(ctx context.Context, p string, params url.Values) (ch
|
|||
defer ctx.Done()
|
||||
|
||||
for {
|
||||
var in io.Reader
|
||||
if params != nil {
|
||||
in = strings.NewReader(params.Encode())
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodGet, u.String(), in)
|
||||
if err == nil {
|
||||
req = req.WithContext(ctx)
|
||||
req.Header.Set("Authorization", "Bearer "+c.config.AccessToken)
|
||||
resp, err = c.Do(req)
|
||||
if resp != nil && resp.StatusCode != http.StatusOK {
|
||||
err = parseAPIError("bad request", resp)
|
||||
}
|
||||
resp, err = c.Do(req)
|
||||
if resp != nil && resp.StatusCode != http.StatusOK {
|
||||
err = parseAPIError("bad request", resp)
|
||||
}
|
||||
if err == nil {
|
||||
err = handleReader(ctx, q, resp.Body)
|
||||
|
@ -129,28 +128,30 @@ func (c *Client) streaming(ctx context.Context, p string, params url.Values) (ch
|
|||
return q, nil
|
||||
}
|
||||
|
||||
// StreamingPublic return channel to read events on public.
|
||||
func (c *Client) StreamingPublic(ctx context.Context) (chan Event, error) {
|
||||
params := url.Values{}
|
||||
return c.streaming(ctx, "public", params)
|
||||
}
|
||||
|
||||
// StreamingPublicLocal return channel to read events on public.
|
||||
func (c *Client) StreamingPublicLocal(ctx context.Context) (chan Event, error) {
|
||||
params := url.Values{}
|
||||
return c.streaming(ctx, "public/local", params)
|
||||
}
|
||||
|
||||
// StreamingUser return channel to read events on home.
|
||||
func (c *Client) StreamingUser(ctx context.Context, user string) (chan Event, error) {
|
||||
params := url.Values{}
|
||||
params.Set("user", user)
|
||||
return c.streaming(ctx, "user", params)
|
||||
func (c *Client) StreamingUser(ctx context.Context) (chan Event, error) {
|
||||
return c.streaming(ctx, "user", nil)
|
||||
}
|
||||
|
||||
// StreamingPublic return channel to read events on public.
|
||||
func (c *Client) StreamingPublic(ctx context.Context, isLocal bool) (chan Event, error) {
|
||||
p := "public"
|
||||
if isLocal {
|
||||
p = path.Join(p, "local")
|
||||
}
|
||||
|
||||
return c.streaming(ctx, p, nil)
|
||||
}
|
||||
|
||||
// StreamingHashtag return channel to read events on tagged timeline.
|
||||
func (c *Client) StreamingHashtag(ctx context.Context, tag string) (chan Event, error) {
|
||||
func (c *Client) StreamingHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error) {
|
||||
params := url.Values{}
|
||||
params.Set("tag", tag)
|
||||
return c.streaming(ctx, "hashtag", params)
|
||||
|
||||
p := "hashtag"
|
||||
if isLocal {
|
||||
p = path.Join(p, "local")
|
||||
}
|
||||
|
||||
return c.streaming(ctx, p, params)
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ data: {"content": "bar"}
|
|||
AccessToken: "zoo",
|
||||
})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
q, err := client.StreamingPublic(ctx)
|
||||
q, err := client.StreamingPublic(ctx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("should not be fail: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue