Support pending updates

This commit is contained in:
c0re100 2022-02-06 11:22:20 +08:00
parent a339293774
commit 6400f59b1b
No known key found for this signature in database
GPG key ID: 7C3B3004FE745AAF
2 changed files with 179 additions and 28 deletions

View file

@ -9,10 +9,13 @@ import (
"time"
)
var pendingUpdateType []Type
type Client struct {
jsonClient *JsonClient
extraGenerator ExtraGenerator
responses chan *Response
pendingResp chan *Response
listenerStore *listenerStore
catchersStore *sync.Map
successMsgStore *sync.Map
@ -56,10 +59,18 @@ func SetFilePath(path string) {
})
}
// Keep specific update type in memory when listener is not ready.
func SetPendingUpdateType(update ...Type) {
for _, v := range update {
pendingUpdateType = append(pendingUpdateType, v)
}
}
func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) {
client := &Client{
jsonClient: NewJsonClient(),
responses: make(chan *Response, 1000),
pendingResp: make(chan *Response, 1000),
listenerStore: newListenerStore(),
catchersStore: &sync.Map{},
successMsgStore: &sync.Map{},
@ -74,6 +85,7 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
tdlibInstance.addClient(client)
go client.processPendingResponse()
go client.receiver()
err := Authorize(client, authorizationStateHandler)
@ -84,40 +96,72 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
return client, nil
}
func (client *Client) processResponse(response *Response) {
if response.Extra != "" {
value, ok := client.catchersStore.Load(response.Extra)
if ok {
value.(chan *Response) <- response
}
}
typ, err := UnmarshalType(response.Data)
if err != nil {
return
}
if typ.GetType() == (&UpdateMessageSendSucceeded{}).GetType() {
value, ok := client.successMsgStore.Load(typ.(*UpdateMessageSendSucceeded).OldMessageId)
if ok {
value.(chan *Response) <- response
}
}
if len(client.listenerStore.Listeners()) == 0 {
for _, p := range pendingUpdateType {
if typ.GetType() == p.GetType() {
client.pendingResp <- response
}
}
}
needGc := false
for _, listener := range client.listenerStore.Listeners() {
if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter
listener.Updates <- typ
} else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty
listener.RawUpdates <- typ
} else if !listener.IsActive() { // GC inactive listener
needGc = true
}
}
if needGc {
client.listenerStore.gc()
}
}
func (client *Client) receiver() {
for response := range client.responses {
if response.Extra != "" {
value, ok := client.catchersStore.Load(response.Extra)
if ok {
value.(chan *Response) <- response
}
}
client.processResponse(response)
}
}
typ, err := UnmarshalType(response.Data)
if err != nil {
continue
}
func (client *Client) processPendingResponse() {
// No need to process pending response if no pending list.
if len(pendingUpdateType) == 0 {
return
}
if typ.GetType() == (&UpdateMessageSendSucceeded{}).GetType() {
value, ok := client.successMsgStore.Load(typ.(*UpdateMessageSendSucceeded).OldMessageId)
if ok {
value.(chan *Response) <- response
}
// Wait for listener to be ready.
for {
if len(client.listenerStore.Listeners()) > 0 {
break
}
time.Sleep(1 * time.Second)
}
needGc := false
for _, listener := range client.listenerStore.Listeners() {
if listener.IsActive() && listener.Updates != nil && typ.GetType() == listener.Filter.GetType() { // All updates go to Updates channel if type == filter
listener.Updates <- typ
} else if listener.IsActive() && listener.RawUpdates != nil { // All updates go to RawUpdates channel if filter is empty
listener.RawUpdates <- typ
} else if !listener.IsActive() { // GC inactive listener
needGc = true
}
}
if needGc {
client.listenerStore.gc()
}
// Start processing pending response
for response := range client.pendingResp {
client.processResponse(response)
}
}