tgsupportbot/supportbot/bot.py
2026-06-30 15:53:08 +01:00

535 lines
16 KiB
Python

import telebot
import logging
import time
import json
import os
import threading
from datetime import datetime, timedelta
from typing import Optional
# Sentinel ban end date: a ban reaching this date is treated as permanent.
LONG_LONG_TIME = datetime(2100, 1, 1)
# A user's info header is re-posted to the group once they have been silent this long.
ID_REMIND_DURATION = timedelta(days=2)
# Replies to a banned user are refused only if their last message is at least this old.
BAN_NOTSENT_WARNING = timedelta(minutes=10)
REMINDER_CHECK_INTERVAL = 300 # seconds between checks
# Content types the message handler is registered for.
ALL_CONTENT_TYPES = ('animation', 'audio', 'contact', 'dice', 'document',
'game', 'location', 'photo', 'sticker', 'story', 'text', 'venue', 'video',
'video_note', 'voice')
TMessage = telebot.types.Message
# Module-level state below is populated by init().
bot: telebot.TeleBot = None
db: 'JsonDb' = None
# Guards every direct access to `db`.
db_lock = threading.Lock()
bot_self_id: int = None
# Chat id of the staff group that user messages are forwarded to.
target_group: Optional[int] = None
# Optional thread id passed along with every message sent to the group.
message_thread_id: Optional[int] = None
welcome_text: str = None
reply_text: str = None
# Optional %-format string (receives the user id) appended to the user info text.
integration_fmt: Optional[str] = None
# Minimum time between two reminders for the same pending user.
reply_reminder_interval: timedelta = timedelta(hours=6)
# Hours of the day (compared against datetime.hour) between which no reminders are sent.
quiet_hours_start: Optional[int] = None
quiet_hours_end: Optional[int] = None
# True while the most recent reminder check fell inside quiet hours.
_in_quiet_hours: bool = False
def init(config: dict, _db):
    """Populate the module-level bot state from ``config`` and register the handler.

    Args:
        config: Settings mapping. ``bot_token``, ``welcome_text`` and
            ``reply_text`` are required; ``target_group``,
            ``message_thread_id``, ``integration_fmt``,
            ``reminder_interval_minutes``, ``quiet_hours_start`` and
            ``quiet_hours_end`` are optional.
        _db: Storage object that becomes the module-level ``db``.

    Raises:
        SystemExit: With code 1 when no bot token is configured.
        KeyError: When ``welcome_text`` or ``reply_text`` is missing.
    """
    global bot, db, bot_self_id, target_group, message_thread_id
    global welcome_text, reply_text, integration_fmt
    global reply_reminder_interval, quiet_hours_start, quiet_hours_end
    global _in_quiet_hours

    if not config.get("bot_token"):
        logging.error("No telegram token specified.")
        # exit() is a helper injected by the `site` module for interactive
        # use and may be absent (python -S, frozen builds); raise directly.
        raise SystemExit(1)

    logging.getLogger("urllib3").setLevel(logging.WARNING)  # very noisy with debug otherwise

    bot = telebot.TeleBot(config["bot_token"], threaded=False)
    db = _db

    if config.get("target_group"):
        target_group = int(config["target_group"])
    if config.get("message_thread_id"):
        message_thread_id = int(config["message_thread_id"])
    welcome_text = config["welcome_text"]
    reply_text = config["reply_text"]
    integration_fmt = config.get("integration_fmt")
    if config.get("reminder_interval_minutes"):
        reply_reminder_interval = timedelta(minutes=int(config["reminder_interval_minutes"]))
    # `is not None` rather than truthiness: hour 0 is a valid setting
    if config.get("quiet_hours_start") is not None:
        quiet_hours_start = int(config["quiet_hours_start"])
    if config.get("quiet_hours_end") is not None:
        quiet_hours_end = int(config["quiet_hours_end"])
    _in_quiet_hours = _is_quiet_hour(datetime.now())

    set_handler(handle_msg, content_types=ALL_CONTENT_TYPES)
    bot_self_id = bot.get_me().id
    logging.info("Startup OK")
def set_handler(func, *args, **kwargs):
    """Register ``func`` as a bot message handler that never lets exceptions escape.

    ``args`` and ``kwargs`` are handed to ``bot.message_handler``; whatever
    ``func`` raises (any ``Exception``) is logged with its traceback and
    swallowed.
    """
    def guarded(*call_args, **call_kwargs):
        try:
            func(*call_args, **call_kwargs)
        except Exception:
            logging.exception("Exception raised in event handler")

    register = bot.message_handler(*args, **kwargs)
    register(guarded)
def run():
    """Poll Telegram forever, restarting the poll whenever it raises."""
    assert not bot.threaded
    while True:
        try:
            bot.polling(non_stop=True, long_polling_timeout=60)
        except Exception as exc:
            # you're not supposed to call .polling() more than once but I'm left with no choice
            error_name = type(exc).__name__
            logging.warning("%s while polling Telegram, retrying.", error_name)
            # NOTE(review): pause assumed to belong to the error path only — confirm
            time.sleep(1)
def callwrapper_ex(f):
    """Call ``f`` with Telegram error handling; report status and error text.

    Returns ``(None, None)`` once ``f`` completes without an API exception.
    Otherwise the exception is classified by ``check_telegram_exc``: a falsy
    classification means "try again", anything else is returned together
    with the error description (the raw response text if the body cannot be
    parsed).
    """
    while True:
        try:
            f()
        except telebot.apihelper.ApiException as exc:
            outcome = check_telegram_exc(exc)
            if outcome:
                raw = exc.result.text
                try:
                    detail = json.loads(raw).get("description", raw)
                except Exception:
                    detail = raw
                return outcome, detail
            # falsy outcome: loop around and call f() again
        else:
            return None, None
def callwrapper(f) -> Optional[str]:
    """Call ``f`` with Telegram error handling; return the failure status, if any.

    Returns ``None`` once ``f`` completes without an API exception. An API
    exception is classified by ``check_telegram_exc``; a falsy classification
    triggers another attempt, any other value is returned as-is.
    """
    while True:
        try:
            f()
        except telebot.apihelper.ApiException as exc:
            outcome = check_telegram_exc(exc)
            if outcome:
                return outcome
            # falsy outcome: loop around and call f() again
        else:
            return None
def check_telegram_exc(e, max_wait=30, fallback_wait=1):
    """Classify a Telegram API exception by its response text.

    Args:
        e: Exception whose ``result.text`` holds the response body.
        max_wait: Upper bound, in seconds, for the rate-limit pause.
        fallback_wait: Pause used when the rate-limit response carries no
            usable ``parameters.retry_after`` value.

    Returns:
        ``"blocked"`` if the recipient cannot be messaged, ``False`` after
        sleeping out a rate limit (the caller should retry), and
        ``"exception"`` for any other error, which is also logged.
    """
    body = e.result.text
    errmsgs = ("bot was blocked by the user", "user is deactivated",
               "PEER_ID_INVALID", "bot can't initiate conversation")
    if any(msg in body for msg in errmsgs):
        return "blocked"

    if "Too Many Requests" in body:
        try:
            delay = int(json.loads(body)["parameters"]["retry_after"])
        except (ValueError, KeyError, TypeError):
            # body is not JSON or lacks the retry hint
            delay = fallback_wait
        # supposedly this is in seconds, but you sometimes get 100 or even 2000;
        # the lower clamp keeps time.sleep() from rejecting a negative value
        delay = max(0, min(delay, max_wait))
        logging.warning("API rate limit hit, waiting for %ds", delay)
        time.sleep(delay)
        return False  # retry

    logging.error("Telegram API error: %s", body)
    return "exception"
### db
class JsonDb:
    """Small JSON-file store holding user records and the message-to-user map.

    The document has two sections: ``"users"`` (user id as string mapped to
    the user's dict form) and ``"msg_map"`` (message id as string mapped to
    a user id). Every mutation rewrites the whole file.
    """

    def __init__(self, path: str):
        """Load ``path`` if it exists, otherwise start with an empty store."""
        self.path = path
        self._data = {"users": {}, "msg_map": {}}
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                self._data = json.load(f)
            # tolerate a file that lacks one of the top-level sections
            self._data.setdefault("users", {})
            self._data.setdefault("msg_map", {})

    def _save(self):
        """Persist the store atomically.

        The data is written to a sibling temp file which then replaces the
        real file, so an interrupted write cannot leave a truncated database.
        """
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self._data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, self.path)

    def get_user(self, user_id: int) -> Optional['User']:
        """Return the stored user with this id, or None if there is none."""
        d = self._data["users"].get(str(user_id))
        return User.from_dict(d) if d is not None else None

    def set_user(self, user: 'User'):
        """Insert or overwrite ``user`` and save."""
        self._data["users"][str(user.id)] = user.to_dict()
        self._save()

    def get_msg_user(self, msg_id: int) -> Optional[int]:
        """Return the user id recorded for ``msg_id``, or None."""
        return self._data["msg_map"].get(str(msg_id))

    def set_msg_user(self, msg_id: int, user_id: int):
        """Record that ``msg_id`` belongs to ``user_id`` and save."""
        self._data["msg_map"][str(msg_id)] = user_id
        self._save()

    def all_users(self) -> list:
        """Return every stored user as a ``User`` object."""
        return [User.from_dict(d) for d in self._data["users"].values()]
class ModificationContext:
    """Context manager that writes an object back to ``db`` after a clean exit.

    The ``with`` body receives the wrapped object. If the body raises,
    nothing is stored and the exception propagates.
    """

    def __init__(self, obj):
        self.obj = obj

    def __enter__(self) -> 'User':
        return self.obj

    def __exit__(self, exc_type, *_):
        if exc_type is not None:
            # body failed: discard the changes
            return
        with db_lock:
            db.set_user(self.obj)
class User():
    """A person who messaged the bot, together with their ticket state.

    Two users compare (and hash) equal when their ids match.
    """
    id: Optional[int]  # None until the record is first filled in
    username: Optional[str]
    realname: Optional[str]
    last_messaged: Optional[datetime]
    banned_until: Optional[datetime]  # None means not banned
    replied_to: bool  # False while the user awaits a reply
    last_reminder_sent: Optional[datetime]
    last_fwd_msg_id: Optional[int]  # group-side id of the last forwarded message

    def __init__(self):
        self.id = None
        self.username = None
        self.realname = None
        self.last_messaged = None
        self.banned_until = None
        self.replied_to = True
        self.last_reminder_sent = None
        self.last_fwd_msg_id = None

    def __eq__(self, other):
        if isinstance(other, User):
            return self.id == other.id
        return NotImplemented

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable
        return hash(self.id)

    def __str__(self):
        # %s rather than %d so a record without an id can still be printed
        return "<User id=%s>" % (self.id,)

    def defaults(self):
        """Reset the ticket fields to the state of a brand-new user."""
        self.last_messaged = datetime(1970, 1, 1)
        self.replied_to = True
        self.last_reminder_sent = None

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; datetimes become ISO strings."""
        def iso(value):
            return value.isoformat() if value else None

        return {
            "id": self.id,
            "username": self.username,
            "realname": self.realname,
            "last_messaged": iso(self.last_messaged),
            "banned_until": iso(self.banned_until),
            "replied_to": self.replied_to,
            "last_reminder_sent": iso(self.last_reminder_sent),
            "last_fwd_msg_id": self.last_fwd_msg_id,
        }

    @classmethod
    def from_dict(cls, d: dict) -> 'User':
        """Build a user from ``to_dict`` output.

        Only ``"id"`` is required. A missing ``last_messaged`` becomes
        1970-01-01 and a missing ``replied_to`` becomes True.

        Raises:
            KeyError: If ``d`` has no ``"id"``.
        """
        def parse(key, fallback=None):
            raw = d.get(key)
            return datetime.fromisoformat(raw) if raw else fallback

        u = cls()
        u.id = d["id"]
        u.username = d.get("username")
        u.realname = d.get("realname")
        u.last_messaged = parse("last_messaged", datetime(1970, 1, 1))
        u.banned_until = parse("banned_until")
        u.replied_to = d.get("replied_to", True)
        u.last_reminder_sent = parse("last_reminder_sent")
        u.last_fwd_msg_id = d.get("last_fwd_msg_id")
        return u
def db_get_user(id) -> User:
    """Return the stored user with this id.

    Raises:
        KeyError: If no such user exists.
    """
    with db_lock:
        found = db.get_user(id)
    if found is not None:
        return found
    raise KeyError(id)
def db_modify_user(id, allow_new=False) -> ModificationContext:
    """Return a context manager that saves the user when its body succeeds.

    With ``allow_new`` set, an unknown id yields a blank ``User`` (the
    caller fills it in); otherwise an unknown id is an error.

    Raises:
        KeyError: If the user does not exist and ``allow_new`` is false.
    """
    with db_lock:
        existing = db.get_user(id)
    if existing is not None:
        return ModificationContext(existing)
    if not allow_new:
        raise KeyError(id)
    return ModificationContext(User())
### Main stuff
def handle_msg(ev: TMessage):
    """Route an incoming message by chat type.

    Private chats go to ``handle_private`` and the configured target group
    goes to ``handle_group``. Any other group triggers a warning; every
    other chat type is ignored.
    """
    chat = ev.chat
    if chat.type == "private":
        return handle_private(ev)
    if chat.type not in ("group", "supergroup"):
        return None
    if chat.id != target_group:
        logging.warning("Got message from group %d which "
                        "we're not supposed to be in", chat.id)
        return None
    return handle_group(ev)
def handle_group(ev: TMessage):
    """Handle a message posted in the target group.

    Only replies to the bot's own messages are considered. The replied-to
    message id is looked up to find the user it came from; a text starting
    with ``/`` is dispatched to ``handle_group_command``, anything else is
    copied to that user. On a successful copy the user is marked as replied
    to; on failure the error is reported back in the group.
    """
    replied = ev.reply_to_message
    if replied is None:
        return
    # from_user is optional in the Bot API; without it this is not ours
    if replied.from_user is None or replied.from_user.id != bot_self_id:
        return

    with db_lock:
        user_id = db.get_msg_user(replied.message_id)
    logging.debug("found id = %d mapped to user %s", replied.message_id, user_id)
    if user_id is None:
        logging.warning("Couldn't find replied to message in target group")
        return

    # handle commands
    if ev.content_type == "text" and ev.text.startswith("/"):
        c, _, arg = ev.text[1:].partition(" ")
        # in groups, clients may send "/cmd@BotName"; keep only the command
        c = c.partition("@")[0]
        return handle_group_command(ev, user_id, c, arg.strip())

    user = db_get_user(user_id)
    now = datetime.now()
    # refuse delivery to a banned user unless they wrote very recently
    if user.banned_until is not None and (user.banned_until >= now and
            now - user.last_messaged >= BAN_NOTSENT_WARNING):
        msg = "Message was not delivered, unban recipient first."
        return callwrapper(lambda: bot.send_message(target_group, message_thread_id=message_thread_id, text=msg))

    # deliver message
    def copy_and_log():
        result = bot.copy_message(user_id, ev.chat.id, ev.message_id)
        logging.info("copy_message response: %s", result)

    res, err_text = callwrapper_ex(copy_and_log)
    if res:
        reply_id = ev.message_id
        msg = "Failed to deliver: %s" % err_text
        callwrapper(lambda: bot.send_message(target_group, message_thread_id=message_thread_id,
            text=msg, reply_to_message_id=reply_id))
    else:
        with db_modify_user(user_id) as u:
            u.replied_to = True
def handle_group_command(ev: TMessage, user_id: int, c: str, arg: str):
    """Execute a staff command aimed at the user behind a forwarded message.

    Supported commands:
        status: post the user's info and ticket state.
        ban [duration]: ban for ``duration`` (see ``parse_timedelta``), or
            permanently when no duration is given.
        unban: lift an active ban.
        finished: mark the ticket as replied and stop reminders.

    Unknown commands are ignored. Returns the ``callwrapper`` status of the
    confirmation message, or None for an unknown command.
    """
    def reply(text, **extra):
        return callwrapper(lambda: bot.send_message(
            target_group, message_thread_id=message_thread_id, text=text, **extra))

    if c == "status":
        user = db_get_user(user_id)
        msg = format_user_info(user) + "\n" + format_ticket_info(user)
        return reply(msg, parse_mode="HTML")
    elif c == "ban":
        arg = arg.strip()
        delta = parse_timedelta(arg)
        if arg and not delta:
            # a typo or zero duration must not silently become a permanent ban
            return reply("Invalid ban duration, use e.g. 30m, 12h, 7d or 2w "
                         "(or no argument for a permanent ban).")
        if not delta:
            until = LONG_LONG_TIME
            msg = "User banned permanently."
        else:
            until = datetime.now() + delta
            msg = "User banned until %s." % format_datetime(until)
        with db_modify_user(user_id) as user:
            user.banned_until = until
        return reply(msg)
    elif c == "unban":
        with db_modify_user(user_id) as user:
            if user.banned_until is None or user.banned_until < datetime.now():
                msg = "User was not banned or ban expired already."
            else:
                user.banned_until = None
                msg = "User was unbanned."
        return reply(msg)
    elif c == "finished":
        with db_modify_user(user_id) as user:
            user.replied_to = True
            user.last_reminder_sent = None
        return reply("Marked as replied. Reminders stopped.")
def handle_private(ev: TMessage):
    """Handle a private message: refresh the sender's record and relay it.

    Steps, in order: create or refresh the user record; reject banned
    users; run private commands; refuse forwarded messages; post the user
    info header if the user has been silent for ``ID_REMIND_DURATION``;
    forward the message to the target group and remember which user it
    belongs to; send the confirmation text; mark the user as awaiting a
    reply.

    If forwarding fails for any reason, the user is neither sent the
    confirmation nor marked as pending.
    """
    if target_group is None:
        logging.error("Target group not set, dropping message from user!")
        return

    # refresh user in db
    now = datetime.now()
    with db_modify_user(ev.chat.id, allow_new=True) as user:
        if user.id is None:
            user.defaults()
            user.id = ev.chat.id
        assert ev.chat.id == ev.from_user.id
        user.username = ev.from_user.username
        user.realname = ev.from_user.first_name
        if ev.from_user.last_name:
            user.realname += " " + ev.from_user.last_name

    # check things
    error = None
    if user.banned_until is not None:
        if now >= user.banned_until:
            pass  # ban has expired
        elif user.banned_until >= LONG_LONG_TIME:
            error = "You cannot message the support bot."
        else:
            error = "You cannot message the support bot now, try again later."
    if error is not None:
        return callwrapper(lambda: bot.send_message(ev.chat.id, error))

    # handle commands
    if ev.content_type == "text" and ev.text.startswith("/"):
        c = ev.text[1:].split(" ", 1)[0]
        if handle_private_command(ev, user, c):
            return

    # deliver message
    if ev.forward_origin is not None:
        msg = "It is not possible to forward messages here."
        return callwrapper(lambda: bot.send_message(ev.chat.id, msg))

    if now - user.last_messaged >= ID_REMIND_DURATION:
        msg = "---------------------------------------\n"
        msg += format_user_info(user)
        callwrapper(lambda: bot.send_message(chat_id=target_group, message_thread_id=message_thread_id, text=msg, parse_mode="HTML"))

    def f(user_id=user.id):
        ev2 = bot.forward_message(chat_id=target_group, from_chat_id=ev.chat.id, message_id=ev.message_id, message_thread_id=message_thread_id)
        with db_lock:
            db.set_msg_user(ev2.message_id, user_id)
        with db_modify_user(user_id) as u:
            u.last_fwd_msg_id = ev2.message_id
        logging.debug("delivered msg from %s -> id = %d", user, ev2.message_id)

    res = callwrapper(f)
    if res:
        # "blocked" or "exception": the message never reached the group, so
        # do not confirm receipt or start reminders for it
        logging.warning("Could not forward message from %s to target group (%s)", user, res)
        return

    if reply_text:
        callwrapper(lambda: bot.send_message(ev.chat.id, reply_text, parse_mode="HTML"))

    # re-read the record so last_fwd_msg_id written by f() is not overwritten
    with db_modify_user(user.id) as user:
        user.last_messaged = now
        user.replied_to = False
        user.last_reminder_sent = now  # first reminder fires reply_reminder_interval from now
def handle_private_command(ev: TMessage, user: User, c):
    """Run a command sent in private chat.

    ``start`` sends the welcome text and ``stop`` is accepted without doing
    anything. Returns True for those two and None for any other command.
    """
    if c == "stop":
        return True
    if c != "start":
        return None
    callwrapper(lambda: bot.send_message(ev.chat.id, welcome_text, parse_mode="HTML"))
    return True
### Reminders
def reminder_loop():
    """Run the reminder check every ``REMINDER_CHECK_INTERVAL`` seconds, forever.

    A failing check is logged with its traceback and does not stop the loop.
    """
    def check_once():
        try:
            _send_pending_reminders()
        except Exception:
            logging.exception("Exception in reminder loop")

    while True:
        check_once()
        time.sleep(REMINDER_CHECK_INTERVAL)
def _is_quiet_hour(now: datetime) -> bool:
    """Tell whether ``now`` falls inside the configured quiet hours.

    The window is ``[quiet_hours_start, quiet_hours_end)`` in whole hours
    and may wrap past midnight. Always False unless both bounds are set.
    """
    start, end = quiet_hours_start, quiet_hours_end
    if start is None or end is None:
        return False
    hour = now.hour
    if start > end:
        # wraps midnight
        return hour >= start or hour < end
    return start <= hour < end
def _send_pending_reminders():
    """Send reminders for users who are still waiting for a reply.

    Does nothing without a target group or during quiet hours. On the first
    check after quiet hours, all pending users are covered by one batch
    message; otherwise each user whose last reminder is at least
    ``reply_reminder_interval`` old (or who has none) gets an individual one.
    """
    global _in_quiet_hours
    if target_group is None:
        return

    now = datetime.now()
    if _is_quiet_hour(now):
        _in_quiet_hours = True
        return

    with db_lock:
        users = db.all_users()

    all_pending = [u for u in users if not u.replied_to]
    due_for_reminder = [
        u for u in all_pending
        if u.last_reminder_sent is None
        or now - u.last_reminder_sent >= reply_reminder_interval
    ]

    leaving_quiet = _in_quiet_hours
    _in_quiet_hours = False

    if leaving_quiet and all_pending:
        _send_batch_reminder(all_pending, now)
        return

    for user in due_for_reminder:
        try:
            _send_reminder(user, now)
        except Exception:
            logging.exception("Failed to send reminder for user %d", user.id)
def _send_batch_reminder(users: list, now: datetime):
    """Post one message linking to every pending user's last forwarded message.

    Users without a forwarded message id are skipped. Reminder timestamps
    are updated only if the message was actually sent.

    Args:
        users: Pending ``User`` objects.
        now: Timestamp stored as each user's ``last_reminder_sent``.
    """
    group_str = str(target_group)
    # t.me/c links use the chat id without the "-100" supergroup prefix
    channel_id = group_str[4:] if group_str.startswith('-100') else group_str.lstrip('-')

    linked = [user for user in users if user.last_fwd_msg_id is not None]
    if not linked:
        return

    lines = ["\U0001f514 Pending replies:"]
    lines.extend("https://t.me/c/%s/%d" % (channel_id, user.last_fwd_msg_id)
                 for user in linked)
    msg_text = "\n".join(lines)

    status = callwrapper(lambda: bot.send_message(chat_id=target_group, message_thread_id=message_thread_id,
        text=msg_text))
    if status:
        # nothing was posted, so leave the timestamps alone and retry later
        logging.warning("Batch reminder failed: %s", status)
        return

    for user in linked:
        with db_modify_user(user.id) as u:
            # replied_to is deliberately left untouched: a reply recorded
            # while the message was being sent must not be undone
            u.last_reminder_sent = now
    logging.info("Sent batch reminder for %d users", len(linked))
def _send_reminder(user: User, now: datetime):
    """Post a "pending reply" note as a reply to the user's last forwarded message.

    Does nothing for a user without a forwarded message id. The reminder
    timestamp is updated only if the note was actually sent.

    Args:
        user: The pending user.
        now: Timestamp stored as ``last_reminder_sent`` on success.
    """
    if user.last_fwd_msg_id is None:
        return

    def f():
        bot.send_message(chat_id=target_group, message_thread_id=message_thread_id,
            text="\U0001f514 Pending reply", reply_to_message_id=user.last_fwd_msg_id)

    status = callwrapper(f)
    if status:
        logging.warning("Reminder failed for user %d: %s", user.id, status)
        return

    with db_modify_user(user.id) as u:
        # replied_to is deliberately left untouched: a reply recorded while
        # the reminder was being sent must not be undone
        u.last_reminder_sent = now
    logging.info("Sent reminder for user %d", user.id)
### Helpers
def str_is_printable(s):
    """Return True if ``s`` contains at least one visibly printable character.

    Spaces and a few blank-looking code points (Hangul fillers) do not
    count, although ``str.isprintable`` accepts them.
    """
    blank_codepoints = (0x20, 0x115f, 0x1160, 0x3164, 0xffa0)
    for ch in s:
        if ch.isprintable() and ord(ch) not in blank_codepoints:
            return True
    return False
def parse_timedelta(s):
    """Parse a duration such as ``"30m"`` or ``"2w"`` into a ``timedelta``.

    The format is a non-negative integer followed by one unit letter
    (case-insensitive): ``s``, ``m``, ``h``, ``d`` or ``w``. Surrounding
    whitespace is ignored.

    Returns:
        The parsed ``timedelta``, or None if ``s`` is not a valid duration
        or is too large to represent.
    """
    s = s.strip()
    amount, unit = s[:-1], s[-1:].lower()
    # isdecimal() rather than isdigit(): the latter accepts characters such
    # as superscript digits, which int() refuses to parse
    if not amount.isdecimal():
        return None
    scale = {"s": 1, "m": 60, "h": 60*60, "d": 24*60*60, "w": 7*24*60*60}.get(unit)
    if scale is None:
        return None
    try:
        return timedelta(seconds=int(amount) * scale)
    except OverflowError:
        return None
def escape_html(s):
    """Replace ``<``, ``>`` and ``&`` in ``s`` with numeric character references."""
    replacements = {ord(ch): "&#%d;" % ord(ch) for ch in "<>&"}
    return s.translate(replacements)
def format_datetime(dt):
    """Render ``dt`` as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{dt:%Y-%m-%d %H:%M:%S}"
def format_ticket_info(user: User):
    """Build the HTML ticket summary for ``user``.

    The result holds a reply-status line, the time of the last reminder for
    a pending ticket, and a ban line if the user has ever been banned.
    Lines are joined with newlines.
    """
    now = datetime.now()
    if user.replied_to:
        lines = ["Status: replied"]
    else:
        lines = ["Status: <b>pending reply</b>"]
        # NOTE(review): assumed to apply to pending tickets only — confirm
        if user.last_reminder_sent:
            lines.append("Last reminder: %s" % format_datetime(user.last_reminder_sent))

    ban_end = user.banned_until
    if ban_end is not None:
        if ban_end >= LONG_LONG_TIME:
            ban_line = "Ban: permanent"
        elif ban_end > now:
            ban_line = "Ban: until %s" % format_datetime(ban_end)
        else:
            ban_line = "Ban: expired"
        lines.append(ban_line)

    return "\n".join(lines)
def format_user_info(user: User):
    """Build the HTML description of ``user`` shown to staff.

    The result holds a ``tg://user`` link labelled with the real name (or
    a placeholder when the name is missing or blank), the @username if
    there is one, the optional ``integration_fmt`` line, and the numeric id.
    """
    realname = user.realname
    # realname may be None on a record that was never filled in
    if not realname or not str_is_printable(realname):
        realname = "<empty name>"
    s = "User: <a href=\"tg://user?id=%d\">%s</a>" % (
        user.id, escape_html(realname))
    if user.username is not None:
        s += " (@%s)" % escape_html(user.username)
    if integration_fmt:
        s += "\n" + (integration_fmt % user.id)
    s += "\nID: <code>%d</code>" % user.id
    return s