"""Minimal Telegram bot: a thin HTTP client plus a long-polling command loop.

Updates are fetched with ``getUpdates``; messages coming from the configured
chat are matched against the registered commands and each command's reply
(if any) is sent back with ``sendMessage``.
"""
import asyncio
import inspect
import logging
import re

from settings import TELEGRAM_API_TOKEN, TELEGRAM_CHAT_ID
from db import Database
from util import make_session

logger = logging.getLogger(__name__)

TELEGRAM_API_URL = 'https://api.telegram.org/bot'


class Telegram:
    """Small asynchronous wrapper around the Telegram Bot HTTP API."""

    def __init__(self, aiosession):
        """Store the HTTP session used for every request.

        Args:
            aiosession: session object exposing ``get``/``post`` as async
                context managers (presumably an aiohttp ClientSession --
                confirm against ``util.make_session``).

        Raises:
            Exception: if ``TELEGRAM_API_TOKEN`` is not configured.
        """
        if TELEGRAM_API_TOKEN is None:
            raise Exception("TELEGRAM_API_TOKEN is required")
        self.aiosession = aiosession

    @staticmethod
    def _url(request):
        """Build the full endpoint URL for the Bot API method *request*."""
        return '{}{}/{}'.format(TELEGRAM_API_URL, TELEGRAM_API_TOKEN, request)

    @staticmethod
    async def _json_or_raise(request, response):
        """Return the decoded JSON body, or log and raise on a non-ok status."""
        if not response.ok:
            failure = '{} {}'.format(request, response.status)
            logger.error(failure)
            raise Exception(failure)
        return await response.json()

    async def get(self, request, params):
        """Issue a GET for the API method *request* with query *params*.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: if the response status is not ok.
        """
        async with self.aiosession.get(
            self._url(request), params=params
        ) as response:
            return await self._json_or_raise(request, response)

    async def post(self, request, data):
        """Issue a POST for the API method *request* with *data* as JSON.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: if the response status is not ok.
        """
        async with self.aiosession.post(
            self._url(request), json=data
        ) as response:
            return await self._json_or_raise(request, response)

    async def message_send(self, text):
        """Send *text* to the configured chat (``TELEGRAM_CHAT_ID``)."""
        data = {
            'chat_id': TELEGRAM_CHAT_ID,
            'text': text,
        }
        return await self.post('sendMessage', data=data)

    async def updates_get(self, offset=0, timeout=240):
        """Call ``getUpdates`` with the given *offset* and *timeout*.

        Both values are forwarded unchanged as query parameters.
        """
        return await self.get('getUpdates', params={
            'offset': offset,
            'timeout': timeout,
        })


class TelegramCommand():
    """Base class for bot commands.

    Subclasses set ``command_text`` (prefix match) and/or ``command_regex``
    (a compiled pattern) and override ``exec``.
    """

    command_regex = None
    command_text = None
    # Not read anywhere in this file.
    authorized = True

    def matches(self, text):
        """Return True if *text* starts with ``command_text`` or matches
        ``command_regex``."""
        return (
            (self.command_text is not None
             and text.startswith(self.command_text))
            or (self.command_regex is not None
                and self.command_regex.match(text) is not None)
        )

    async def exec(self, text, update, session):
        """Run the command.

        Implementations may be plain coroutines returning a reply (or None),
        or async generators yielding several replies; the dispatcher in this
        module handles both. The base implementation does nothing.
        """
        pass


class SkipDayAddCommand(TelegramCommand):
    """``/skip_day YYYY-MM-DD`` -- store a skip day in the database."""

    # NOTE(review): the group name was missing from the pattern as found
    # ("(?P\d{4}..." does not compile); "date" is a reconstruction.
    command_regex = re.compile(
        r"^\/skip_day( (?P<date>\d{4}-\d{2}-\d{2}))?$"
    )

    async def exec(self, text, update, session):
        """Store the date given in *text* and yield a confirmation.

        The pattern allows the date to be omitted; in that case nothing is
        stored and a usage hint is yielded instead of failing.
        """
        found = self.command_regex.match(text)
        date_str = found.group('date') if found is not None else None
        if date_str is None:
            yield "Usage: /skip_day YYYY-MM-DD"
            return
        db = Database()
        db.skip_day_put(date_str)
        yield f"Added skip day {date_str}"


commands = [
    SkipDayAddCommand()
]


async def do_reply(data, t):
    """Send *data* through the Telegram client *t* if it is a string.

    Any non-string value (for example ``None``) is ignored.
    """
    if isinstance(data, str):
        await t.message_send(data)


async def _dispatch_update(update, commands, t, session):
    """Run the first command matching *update*'s message text, if any.

    Updates without a message, from a chat other than ``TELEGRAM_CHAT_ID``,
    or without text are ignored.
    """
    if 'message' not in update:
        return
    message = update['message']
    if message['chat']['id'] != TELEGRAM_CHAT_ID:
        return
    # Messages without text (photos, stickers, ...) cannot match a command.
    text = message.get('text')
    if not isinstance(text, str):
        return

    for command in commands:
        if not command.matches(text):
            continue
        try:
            result = command.exec(text, update, session)
            if inspect.isasyncgen(result):
                async for reply in result:
                    await do_reply(reply, t)
            else:
                await do_reply(await result, t)
        except Exception:
            # Log first so the traceback is kept even if the reply fails.
            logger.exception('Problem while executing a command')
            await t.message_send('There was a problem with your command')
        # Only the first matching command runs.
        break


async def handle_commands(commands=commands):
    """Poll Telegram forever and dispatch incoming messages to *commands*.

    Args:
        commands: command objects providing ``matches`` and ``exec``;
            defaults to the module-level ``commands`` list.

    A failed poll (exception or a response whose ``ok`` is false) is logged
    and followed by a 30 second pause before the next attempt.
    """
    async with make_session() as session:
        t = Telegram(session)
        offset = 0
        while True:
            try:
                updates = await t.updates_get(offset=offset)
                if not updates['ok']:
                    logger.warning(
                        "getUpdates was not ok {}".format(updates)
                    )
                    await asyncio.sleep(30)
                    continue
                for update in updates['result']:
                    await _dispatch_update(update, commands, t, session)
                    # Advance past every handled update, including ignored
                    # ones, so the next poll does not return it again.
                    offset = update['update_id'] + 1
            except Exception as e:
                logger.exception("getUpdates exception {}".format(e))
                await asyncio.sleep(30)