Move background tasks to functions
This commit is contained in:
		
							parent
							
								
									bd09fb4c54
								
							
						
					
					
						commit
						9cb48dbb60
					
				
					 1 changed files with 35 additions and 28 deletions
				
			
		|  | @ -188,39 +188,14 @@ func createFirebaseSubscriber(conf *config.Config) (subscriber, error) { | ||||||
| // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts | // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts | ||||||
| // a manager go routine to print stats and prune messages. | // a manager go routine to print stats and prune messages. | ||||||
| func (s *Server) Run() error { | func (s *Server) Run() error { | ||||||
| 	go func() { | 	go s.runManager() | ||||||
| 		ticker := time.NewTicker(s.config.ManagerInterval) | 	go s.runAtSender() | ||||||
| 		for { | 	go s.runFirebaseKeepliver() | ||||||
| 			<-ticker.C |  | ||||||
| 			s.updateStatsAndPrune() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 	go func() { |  | ||||||
| 		ticker := time.NewTicker(s.config.AtSenderInterval) |  | ||||||
| 		for { |  | ||||||
| 			<-ticker.C |  | ||||||
| 			if err := s.sendDelayedMessages(); err != nil { |  | ||||||
| 				log.Printf("error sending scheduled messages: %s", err.Error()) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 	if s.firebase != nil { |  | ||||||
| 		go func() { |  | ||||||
| 			ticker := time.NewTicker(s.config.FirebaseKeepaliveInterval) |  | ||||||
| 			for { |  | ||||||
| 				<-ticker.C |  | ||||||
| 				if err := s.firebase(newKeepaliveMessage(firebaseControlTopic)); err != nil { |  | ||||||
| 					log.Printf("error sending Firebase keepalive message: %s", err.Error()) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		}() |  | ||||||
| 	} |  | ||||||
| 	listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP) | 	listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP) | ||||||
| 	if s.config.ListenHTTPS != "" { | 	if s.config.ListenHTTPS != "" { | ||||||
| 		listenStr += fmt.Sprintf(" %s/https", s.config.ListenHTTPS) | 		listenStr += fmt.Sprintf(" %s/https", s.config.ListenHTTPS) | ||||||
| 	} | 	} | ||||||
| 	log.Printf("Listening on %s", listenStr) | 	log.Printf("Listening on %s", listenStr) | ||||||
| 
 |  | ||||||
| 	http.HandleFunc("/", s.handle) | 	http.HandleFunc("/", s.handle) | ||||||
| 	errChan := make(chan error) | 	errChan := make(chan error) | ||||||
| 	go func() { | 	go func() { | ||||||
|  | @ -611,6 +586,38 @@ func (s *Server) updateStatsAndPrune() { | ||||||
| 		s.messages, len(s.topics), subscribers, messages, len(s.visitors)) | 		s.messages, len(s.topics), subscribers, messages, len(s.visitors)) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (s *Server) runManager() { | ||||||
|  | 	func() { | ||||||
|  | 		ticker := time.NewTicker(s.config.ManagerInterval) | ||||||
|  | 		for { | ||||||
|  | 			<-ticker.C | ||||||
|  | 			s.updateStatsAndPrune() | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *Server) runAtSender() { | ||||||
|  | 	ticker := time.NewTicker(s.config.AtSenderInterval) | ||||||
|  | 	for { | ||||||
|  | 		<-ticker.C | ||||||
|  | 		if err := s.sendDelayedMessages(); err != nil { | ||||||
|  | 			log.Printf("error sending scheduled messages: %s", err.Error()) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *Server) runFirebaseKeepliver() { | ||||||
|  | 	if s.firebase == nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	ticker := time.NewTicker(s.config.FirebaseKeepaliveInterval) | ||||||
|  | 	for { | ||||||
|  | 		<-ticker.C | ||||||
|  | 		if err := s.firebase(newKeepaliveMessage(firebaseControlTopic)); err != nil { | ||||||
|  | 			log.Printf("error sending Firebase keepalive message: %s", err.Error()) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
| func (s *Server) sendDelayedMessages() error { | func (s *Server) sendDelayedMessages() error { | ||||||
| 	s.mu.Lock() | 	s.mu.Lock() | ||||||
| 	defer s.mu.Unlock() | 	defer s.mu.Unlock() | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue