From 5d86bf140c3c93a0432ad7fa408ea202fc3e3645 Mon Sep 17 00:00:00 2001 From: sfan5 Date: Mon, 1 Jul 2024 20:50:12 +0200 Subject: [PATCH] . --- requirements.txt | 2 +- supportbot => supportbot/__main__.py | 35 +++++---- src/core.py => supportbot/bot.py | 106 +++++++++------------------ 3 files changed, 56 insertions(+), 87 deletions(-) rename supportbot => supportbot/__main__.py (68%) rename src/core.py => supportbot/bot.py (73%) diff --git a/requirements.txt b/requirements.txt index e1e1441..f2f86e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -pyTelegramBotAPI>=3.5.1 +pyTelegramBotAPI>=4.19.0 pyYAML>=3.12 diff --git a/supportbot b/supportbot/__main__.py similarity index 68% rename from supportbot rename to supportbot/__main__.py index 4f078f3..e109535 100755 --- a/supportbot +++ b/supportbot/__main__.py @@ -6,10 +6,11 @@ import sys import os import shelve import getopt +from pickle import Unpickler -import src.core as core +from . import bot -def start_new_thread(func, join=False, args=(), kwargs={}): +def start_new_thread(func, join=False, args=(), kwargs=None): t = threading.Thread(target=func, args=args, kwargs=kwargs) if not join: t.daemon = True @@ -17,22 +18,19 @@ def start_new_thread(func, join=False, args=(), kwargs={}): if join: t.join() -def readopt(name): - global opts - for e in opts: - if e[0] == name: - return e[1] - return None - def usage(): - print("Usage: %s [-q|-d] [-c config.yaml]" % sys.argv[0]) + print("Usage: %s [-q|-d] [-c file]" % sys.argv[0]) print("Options:") print(" -h Display this text") print(" -q Quiet, set log level to WARNING") print(" -c Location of config file (default: ./config.yaml)") -def open_db(config): - return shelve.open(config["database"]) +# well this is just dumb +class RenamingUnpickler(Unpickler): + def find_class(self, module, name): + if module == "src.core": + module = "supportbot.bot" + return super().find_class(module, name) def main(configpath, loglevel=logging.INFO): with open(configpath, "r") as f: @@ -40,12 +38,13 @@ def main(configpath, loglevel=logging.INFO): logging.basicConfig(format="%(levelname)-7s [%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=loglevel) - db = open_db(config) + shelve.Unpickler = RenamingUnpickler + db = shelve.open(config["database"]) - core.init(config, db) + bot.init(config, db) try: - start_new_thread(core.run, join=True) + start_new_thread(bot.run, join=True) except KeyboardInterrupt: logging.info("Interrupted, exiting") db.close() @@ -57,7 +56,12 @@ if __name__ == "__main__": except getopt.GetoptError as e: print(str(e)) exit(1) + # Process command line args + def readopt(name): + for e in opts: + if e[0] == name: + return e[1] if readopt("-h") is not None or readopt("--help") is not None: usage() exit(0) @@ -67,5 +71,6 @@ if __name__ == "__main__": configpath = "./config.yaml" if readopt("-c") is not None: configpath = readopt("-c") + # Run the actual program main(configpath, loglevel) diff --git a/src/core.py b/supportbot/bot.py similarity index 73% rename from src/core.py rename to supportbot/bot.py index cec2262..ad17055 100644 --- a/src/core.py +++ b/supportbot/bot.py @@ -3,21 +3,26 @@ import logging import time import json from datetime import datetime, timedelta +from typing import Optional LONG_LONG_TIME = datetime(2100, 1, 1) ID_REMIND_DURATION = timedelta(days=2) -ALL_CONTENT_TYPES = ("text", "location", "venue", "contact", "animation", - "audio", "document", "photo", "sticker", "video", "video_note", "voice") +BAN_NOTSENT_WARNING = timedelta(minutes=10) +ALL_CONTENT_TYPES = ('animation', 'audio', 'contact', 'dice', 'document', + 'game', 'location', 'photo', 'sticker', 'story', 'text', 'venue', 'video', + 'video_note', 'voice') -bot = None +TMessage = telebot.types.Message + +bot: telebot.TeleBot = None db = None -bot_self_id = None -target_group = None -welcome_text = None -reply_text = None +bot_self_id: int = None +target_group: Optional[int] = None +welcome_text: str = None +reply_text: str = None -def init(config, _db): +def init(config: dict, _db): global bot, db, bot_self_id, target_group, welcome_text, reply_text if not config.get("bot_token"): logging.error("No telegram token specified.") @@ -45,15 +50,16 @@ def set_handler(func, *args, **kwargs): bot.message_handler(*args, **kwargs)(wrapper) def run(): + assert not bot.threaded while True: try: - bot.polling(none_stop=True, long_polling_timeout=60) + bot.polling(non_stop=True, long_polling_timeout=60) except Exception as e: # you're not supposed to call .polling() more than once but I'm left with no choice logging.warning("%s while polling Telegram, retrying.", type(e).__name__) time.sleep(1) -def callwrapper(f): +def callwrapper(f) -> Optional[str]: while True: try: f() @@ -86,13 +92,18 @@ class ModificationContext(): def __init__(self, key, obj): self.key = key self.obj = obj - def __enter__(self): + def __enter__(self) -> 'User': return self.obj def __exit__(self, exc_type, *_): if exc_type is None: db[self.key] = self.obj class User(): + id: int + username: str + realname: str + last_messaged: datetime + banned_until: Optional[datetime] def __init__(self): self.id = None self.username = None @@ -116,13 +127,14 @@ def db_auto_sync(): if now > db_last_sync + 15: db_last_sync = now db.sync() +# def db_get_user(id) -> User: return db["u%d" % id] -def db_modify_user(id, allow_new=False): +def db_modify_user(id, allow_new=False) -> ModificationContext: key = "u%d" % id - obj: User = db.get(key) + obj = db.get(key) if obj is None: if allow_new: obj = User() @@ -132,7 +144,7 @@ def db_modify_user(id, allow_new=False): ### Main stuff -def handle_msg(ev): +def handle_msg(ev: TMessage): db_auto_sync() if ev.chat.type in ("group", "supergroup"): if ev.chat.id == target_group: @@ -142,7 +154,7 @@ def handle_msg(ev): elif ev.chat.type == "private": return handle_private(ev) -def handle_group(ev): +def handle_group(ev: TMessage): if ev.reply_to_message is None: return if ev.reply_to_message.from_user.id != bot_self_id: @@ -160,17 +172,18 @@ def handle_group(ev): return handle_group_command(ev, user_id, c, arg) user = db_get_user(user_id) - if user.banned_until is not None and (user.banned_until >= datetime.now() and - datetime.now() - user.last_messaged >= timedelta(minutes=10)): + now = datetime.now() + if user.banned_until is not None and (user.banned_until >= now and + now - user.last_messaged >= BAN_NOTSENT_WARNING): msg = "Message was not delivered, unban recipient first." return callwrapper(lambda: bot.send_message(target_group, msg)) # deliver message - res = callwrapper(lambda: resend_message(user_id, ev)) + res = callwrapper(lambda: bot.copy_message(user_id, ev.chat.id, ev.message_id)) if res == "blocked": callwrapper(lambda: bot.send_message(target_group, "Bot was blocked by user.")) -def handle_group_command(ev, user_id, c, arg): +def handle_group_command(ev: TMessage, user_id: int, c: str, arg: str): if c == "info": msg = format_user_info(db_get_user(user_id)) return callwrapper(lambda: bot.send_message(target_group, msg, parse_mode="HTML")) @@ -195,7 +208,7 @@ def handle_group_command(ev, user_id, c, arg): msg = "User was unbanned." return callwrapper(lambda: bot.send_message(target_group, msg)) -def handle_private(ev): +def handle_private(ev: TMessage): if target_group is None: logging.error("Target group not set, dropping message from user!") return @@ -231,8 +244,7 @@ def handle_private(ev): return # deliver message - if (ev.forward_from is not None or ev.forward_from_chat is not None - or ev.json.get("forward_sender_name") is not None): + if ev.forward_origin is not None: msg = "It is not possible to forward messages here." return callwrapper(lambda: bot.send_message(ev.chat.id, msg)) @@ -252,7 +264,7 @@ def handle_private(ev): with db_modify_user(user.id) as user: user.last_messaged = now -def handle_private_command(ev, user, c): +def handle_private_command(ev: TMessage, user, c): if c == "start": callwrapper(lambda: bot.send_message(ev.chat.id, welcome_text, parse_mode="HTML")) return True @@ -295,51 +307,3 @@ def format_user_info(user): s += " (@%s)" % escape_html(user.username) s += "\nID: %d" % user.id return s - -def resend_message(chat_id, ev): - # re-send message based on content type - if ev.content_type == "text": - return bot.send_message(chat_id, ev.text) - elif ev.content_type == "photo": - photo = sorted(ev.photo, key=lambda e: e.width*e.height, reverse=True)[0] - return bot.send_photo(chat_id, photo.file_id, caption=ev.caption) - elif ev.content_type == "audio": - kwargs = { - "caption": ev.caption, - "performer": ev.audio.performer, - "title": ev.audio.performer, - } - return bot.send_audio(chat_id, ev.audio.file_id, **kwargs) - elif ev.content_type == "animation": - return bot.send_animation(chat_id, ev.animation.file_id, caption=ev.caption) - elif ev.content_type == "document": - return bot.send_document(chat_id, ev.document.file_id, caption=ev.caption) - elif ev.content_type == "video": - return bot.send_video(chat_id, ev.video.file_id, caption=ev.caption) - elif ev.content_type == "voice": - return bot.send_voice(chat_id, ev.voice.file_id, caption=ev.caption) - elif ev.content_type == "video_note": - return bot.send_video_note(chat_id, ev.video_note.file_id) - elif ev.content_type == "location": - kwargs = { - "latitude": ev.location.latitude, - "longitude": ev.location.longitude, - } - return bot.send_location(chat_id, **kwargs) - elif ev.content_type == "venue": - kwargs = { - "latitude": ev.venue.location.latitude, - "longitude": ev.venue.location.longitude, - } - for prop in ("title", "address", "foursquare_id", "foursquare_type", "google_place_id", "google_place_type"): - kwargs[prop] = getattr(ev.venue, prop) - return bot.send_venue(chat_id, **kwargs) - elif ev.content_type == "contact": - kwargs = {} - for prop in ("phone_number", "first_name", "last_name"): - kwargs[prop] = getattr(ev.contact, prop) - return bot.send_contact(chat_id, **kwargs) - elif ev.content_type == "sticker": - return bot.send_sticker(chat_id, ev.sticker.file_id) - else: - raise NotImplementedError("content_type = %s" % ev.content_type)