From 32a185060baf920ed969500ecc0ea0c9e4effdbe Mon Sep 17 00:00:00 2001 From: Johnny Gear Date: Sun, 16 Nov 2025 10:34:52 -0600 Subject: [PATCH] Asynchronous commands --- main.py | 8 ++- orders.py | 2 +- settings.py | 1 + telegram/__init__.py | 0 telegram/commands.py | 31 +++++++++++ telegram.py => telegram/telegram.py | 84 +++++++++++++++++------------ 6 files changed, 88 insertions(+), 38 deletions(-) create mode 100644 telegram/__init__.py create mode 100644 telegram/commands.py rename telegram.py => telegram/telegram.py (56%) diff --git a/main.py b/main.py index 58c841c..2792bae 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,8 @@ import asyncio from scheduling import OrderScheduler from orders import order_issue, order_check -from telegram import handle_commands +from telegram.telegram import handle_commands +from telegram.commands import commands from db.queries import initdb logger = logging.getLogger(__name__) @@ -41,5 +42,8 @@ if __name__=='__main__': else: loop = asyncio.new_event_loop() s = OrderScheduler(loop) - loop.run_until_complete(handle_commands()) + loop.run_until_complete(handle_commands( + commands=commands, + loop=loop + )) loop.run_forever() diff --git a/orders.py b/orders.py index 5585b90..d2b9834 100644 --- a/orders.py +++ b/orders.py @@ -5,7 +5,7 @@ from util import make_session from generate import generate_order, generate_punishment from db.queries import order_status_put, punishment_status_put, order_status_outstanding, order_status_confirm from mastodon import Mastodon -from telegram import Telegram +from telegram.telegram import Telegram from settings import MASTODON_USERNAME, ORDER_TIMEOUT, ENV from util import timezone diff --git a/settings.py b/settings.py index f3bd9e9..3d0d741 100644 --- a/settings.py +++ b/settings.py @@ -15,6 +15,7 @@ MASTODON_VISIBILITY = os.environ.get('MASTODON_VISIBILITY', 'direct') TELEGRAM_API_TOKEN = os.environ.get('TELEGRAM_API_TOKEN') TELEGRAM_CHAT_ID = int(os.environ.get('TELEGRAM_CHAT_ID')) +TELEGRAM_COMMAND_TIMEOUT = int(os.environ.get('TELEGRAM_COMMAND_TIMEOUT', 120)) SQLITE_DB = os.environ.get('SQLITE_DB', 'db.sqlite3') diff --git a/telegram/__init__.py b/telegram/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/telegram/commands.py b/telegram/commands.py new file mode 100644 index 0000000..5e0a95b --- /dev/null +++ b/telegram/commands.py @@ -0,0 +1,31 @@ +import re +import logging +import peewee + +from db.queries import skip_day_put +from .telegram import TelegramCommand + +logger = logging.getLogger(__name__) + +class SkipDayAddCommand(TelegramCommand): + command_text = "/skip_day" + date_regex = re.compile(r"^(?P\d{4}-\d{2}-\d{2})$") + + async def exec_inner(self, text, update, session, forResponse=None, reply=None): + try: + yield "Please enter a skip day" + + response = await forResponse() + + m = self.date_regex.match(response) + if(m is not None): + skip_day_put(m.group('date')) + yield f"Skip day {m.group('date')} has been added" + else: + yield "Please enter a valid date" + except peewee.IntegrityError: + yield "That day has already been added" + +commands = [ + SkipDayAddCommand() +] diff --git a/telegram.py b/telegram/telegram.py similarity index 56% rename from telegram.py rename to telegram/telegram.py index b93b51e..c8021a9 100644 --- a/telegram.py +++ b/telegram/telegram.py @@ -1,9 +1,7 @@ -import re import logging import asyncio -from settings import TELEGRAM_API_TOKEN, TELEGRAM_CHAT_ID -from db.queries import skip_day_put +from settings import TELEGRAM_API_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_COMMAND_TIMEOUT from util import make_session logger = logging.getLogger(__name__) @@ -66,33 +64,28 @@ class TelegramCommand(): (self.command_regex is not None and self.command_regex.match(text) is not None) ) - async def exec(self, text, update, session): + async def exec(self, *args, **kwargs): + try: + async for reply in self.exec_inner(*args, **kwargs): + yield reply + except Exception as e: + yield "There was a problem with your command" + logger.error(f"{self.__class__.__name__} {e}") + + async def exec_inner(self, text, update, session): pass -class SkipDayAddCommand(TelegramCommand): - command_regex = re.compile(r"^\/skip_day( (?P\d{4}-\d{2}-\d{2}))?$") - - async def exec(self, text, update, session): - date_str = text.split(' ')[1] - - skip_day_put(date_str) - - yield f"Added skip day {date_str}" - -commands = [ - SkipDayAddCommand() -] - -async def do_reply(data, t): - if(type(data) is str): - await t.message_send( - data - ) - -async def handle_commands(commands=commands): +async def handle_commands(commands=[], loop=None): async with make_session() as session: t = Telegram(session) + command_tasks = set() + command_futures = {} + def forResponse(): + f = loop.create_future() + command_futures[chat_id] = f + return f + offset = 0 while True: try: @@ -106,19 +99,40 @@ async def handle_commands(commands=commands): if(chat_id != TELEGRAM_CHAT_ID): continue + if(chat_id in command_futures): + command_futures[chat_id].set_result( + update['message']['text'] + ) + del command_futures[chat_id] + continue + for command in commands: if(command.matches(update['message']['text'])): - try: - result = command.exec(update['message']['text'], update, session) + async def timeoutTask(): + try: + async with asyncio.timeout(TELEGRAM_COMMAND_TIMEOUT): + async for reply in command.exec( + update['message']['text'], + update, + session, + forResponse=forResponse + ): + await t.message_send(reply) + except TimeoutError: + if chat_id in command_futures: + del command_futures[chat_id] + + await t.message_send("Your command has timed out") + except Exception: + await t.message_send('There was a problem with your command') + logger.exception('Problem while executing a command') + + task = asyncio.create_task( + timeoutTask() + ) - if result.__class__.__name__ == 'async_generator': - async for r in result: - await do_reply(r, t) - else: - await do_reply(await result, t) - except: - await t.message_send('There was a problem with your command') - logger.exception('Problem while executing a command') + command_tasks.add(task) + task.add_done_callback(command_tasks.discard) break offset = update['update_id'] + 1