"""Minimal Telegram Bot API client and a long-polling command dispatcher."""
import logging
import asyncio

from settings import ENV, TELEGRAM_API_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_COMMAND_TIMEOUT
from util import make_session

logger = logging.getLogger(__name__)

TELEGRAM_API_URL = 'https://api.telegram.org/bot'


class Telegram:
    """Thin async wrapper around the Telegram Bot HTTP API.

    ``aiosession`` must offer ``get``/``post`` async context managers whose
    responses expose ``ok``, ``status`` and ``json()`` (presumably an
    ``aiohttp.ClientSession`` -- confirm against ``util.make_session``).
    """

    def __init__(self, aiosession):
        """Store the HTTP session; raise if no bot token is configured."""
        if TELEGRAM_API_TOKEN is None:
            raise Exception("TELEGRAM_API_TOKEN is required")
        self.aiosession = aiosession

    @staticmethod
    def _url(request):
        """Return the full endpoint URL for the Bot API method ``request``."""
        return '{}{}/{}'.format(TELEGRAM_API_URL, TELEGRAM_API_TOKEN, request)

    @staticmethod
    def _check(request, response):
        """Log and raise ``Exception`` when the HTTP response is not OK."""
        if not response.ok:
            logger.error('%s %s', request, response.status)
            raise Exception('{} {}'.format(request, response.status))

    async def get(self, request, params):
        """GET the Bot API method ``request`` and return the decoded JSON.

        Raises:
            Exception: if the HTTP status is not OK.
        """
        async with self.aiosession.get(self._url(request), params=params) as response:
            self._check(request, response)
            return await response.json()

    async def post(self, request, data):
        """POST ``data`` as JSON to the method ``request``; return the JSON reply.

        Raises:
            Exception: if the HTTP status is not OK.
        """
        async with self.aiosession.post(self._url(request), json=data) as response:
            self._check(request, response)
            return await response.json()

    async def message_send(self, text, chat_id=None):
        """Send ``text`` to ``chat_id`` (default: ``TELEGRAM_CHAT_ID``).

        When ``ENV == 'dev'`` a message aimed at any other chat is redirected
        to ``TELEGRAM_CHAT_ID`` and prefixed with a note naming the intended
        recipient.
        """
        chat_id_actual = chat_id if chat_id is not None else TELEGRAM_CHAT_ID
        text_actual = text
        # Compare the *resolved* id: omitting chat_id already targets the
        # default chat, so no redirect notice is needed in that case.
        if ENV == 'dev' and chat_id_actual != TELEGRAM_CHAT_ID:
            text_actual = f"⚠️ Message intended for chat id {chat_id}\n\n" + text
            chat_id_actual = TELEGRAM_CHAT_ID
        data = {
            'chat_id': chat_id_actual,
            'text': text_actual,
        }
        return await self.post('sendMessage', data=data)

    async def updates_get(self, offset=0, timeout=240):
        """Call ``getUpdates`` with the given ``offset`` and ``timeout``."""
        return await self.get('getUpdates', params={
            'offset': offset,
            'timeout': timeout,
        })


class TelegramCommand:
    """Base class for bot commands.

    Subclasses set ``command_text`` (exact match) and/or ``command_regex``
    (an object with ``match``) and override ``exec_inner``.
    """

    command_regex = None
    command_text = None
    # Not consulted anywhere in this module; presumably read by callers.
    authorized = True

    def matches(self, text):
        """Return True if ``text`` equals ``command_text`` or matches ``command_regex``."""
        return (
            (self.command_text is not None and text == self.command_text)
            or (self.command_regex is not None
                and self.command_regex.match(text) is not None)
        )

    async def exec(self, *args, **kwargs):
        """Yield the replies of ``exec_inner``, shielding callers from errors.

        Any exception from ``exec_inner`` is logged and replaced by a single
        generic failure reply.
        """
        try:
            async for reply in self.exec_inner(*args, **kwargs):
                yield reply
        except Exception as e:
            # Log before yielding: the consumer may stop iterating after the
            # reply, in which case code after the yield would never run.
            logger.exception("%s %s", self.__class__.__name__, e)
            yield "There was a problem with your command"

    async def exec_inner(self, text, update, session, forResponse=None):
        """Async generator producing reply strings; must be overridden.

        ``forResponse`` is the zero-argument callable supplied by
        ``handle_commands``; it returns a future that resolves to the next
        text message from the chat.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} must override exec_inner"
        )
        yield  # pragma: no cover -- makes this function an async generator


async def handle_commands(commands=None, loop=None):
    """Long-poll Telegram forever and dispatch messages to ``commands``.

    Only text messages from ``TELEGRAM_CHAT_ID`` are considered. The first
    command whose ``matches`` accepts the text is run as a background task
    bounded by ``TELEGRAM_COMMAND_TIMEOUT``; every reply it yields is sent
    back to the chat. While a command awaits ``forResponse()``, the next text
    message from the chat is delivered to it instead of being dispatched.

    Args:
        commands: iterable of ``TelegramCommand``-like objects (default: none).
        loop: event loop used to create futures (default: the running loop).
    """
    if commands is None:
        commands = []
    if loop is None:
        loop = asyncio.get_running_loop()

    async with make_session() as session:
        t = Telegram(session)
        command_tasks = set()  # strong refs so tasks are not garbage-collected
        command_futures = {}   # chat_id -> future awaiting the next message

        async def run_command(command, update, chat_id):
            """Run one command; arguments are bound here, not late-bound."""
            text = update['message']['text']
            pending = []  # futures requested by this command run

            def forResponse():
                f = loop.create_future()
                command_futures[chat_id] = f
                pending.append(f)
                return f

            try:
                try:
                    async with asyncio.timeout(TELEGRAM_COMMAND_TIMEOUT):
                        async for reply in command.exec(
                            text, update, session, forResponse=forResponse
                        ):
                            await t.message_send(reply, chat_id)
                finally:
                    # Drop our own pending prompt however the command ended,
                    # so the user's next message is not swallowed.
                    if command_futures.get(chat_id) in pending:
                        del command_futures[chat_id]
            except TimeoutError:
                await t.message_send("Your command has timed out", chat_id)
            except Exception:
                logger.exception('Problem while executing a command')
                await t.message_send('There was a problem with your command', chat_id)

        offset = 0
        while True:
            try:
                updates = await t.updates_get(offset=offset)
                if not updates['ok']:
                    logger.warning("getUpdates was not ok %s", updates)
                    await asyncio.sleep(30)
                    continue

                for update in updates['result']:
                    # Acknowledge every update, including ignored ones;
                    # otherwise Telegram keeps returning it.
                    offset = update['update_id'] + 1

                    if 'message' not in update:
                        continue
                    message = update['message']
                    chat_id = message['chat']['id']
                    if chat_id != TELEGRAM_CHAT_ID:
                        continue
                    if 'text' not in message:
                        continue
                    text = message['text']

                    # A running command is waiting for this chat's next message.
                    future = command_futures.pop(chat_id, None)
                    if future is not None and not future.done():
                        future.set_result(text)
                        continue

                    for command in commands:
                        if command.matches(text):
                            task = asyncio.create_task(
                                run_command(command, update, chat_id)
                            )
                            command_tasks.add(task)
                            task.add_done_callback(command_tasks.discard)
                            break
            except Exception as e:
                logger.exception("getUpdates exception %s", e)
                await asyncio.sleep(30)