import datetime import json from peewee import JOIN, fn from util import sqlite_time from .models import database, User, OrdersPool, DomSubUsers, Repeat, SkipDay, OrderStatus, MastodonServer, TimelineEvent def initdb(): database.connect() database.create_tables([ Repeat, SkipDay, OrderStatus ]) def user_add(username, chat_id): return User.create( telegram_username=username, telegram_chat_id=chat_id ) def user_get(username): return User.get( telegram_username=username ) def user_mastodon_server_set(id, mastodon_server): q = User.update( mastodon_server=mastodon_server ).where( User.id == id ) return q.execute() def user_mastodon_user_set(id, mastodon_username, mastodon_access_token): q = User.update( mastodon_username=mastodon_username, mastodon_access_token=mastodon_access_token ).where( User.id == id ) return q.execute() def user_preferences_set(id, mastodon_post_public, mastodon_attn_list): q = User.update( mastodon_attn_list=mastodon_attn_list, mastodon_post_public=mastodon_post_public ).where( User.id == id ) return q.execute() def user_has_doms(id): return DomSubUsers.select().where(DomSubUsers.sub_id == id).count() > 0 def mastodon_server_get(name): return MastodonServer.get(name=name) def mastodon_server_put(name, client_id, client_secret): return MastodonServer.create( name=name, client_id=client_id, client_secret=client_secret ) def orders_pool_list(user_id): return OrdersPool.select().where(OrdersPool.user_id == user_id) def orders_pool(user_id, set_id): return OrdersPool.get(OrdersPool.user_id == user_id, OrdersPool.id == set_id) def orders_pool_by_id(pool_id): return OrdersPool.get(OrdersPool.id == pool_id) def orders_pool_since(dt): return OrdersPool.select().where(OrdersPool.updated_at > dt) def orders_pool_scheduled(): return OrdersPool.select().where(OrdersPool.scheduled == True) def domsubusers_add(sub, dom): return DomSubUsers.create( sub=sub, dom=dom ) def domsubusers_delete(sub, dom): q = DomSubUsers.get( sub=sub, dom=dom ).delete() return q.execute() def domsubusers_list(dom): return DomSubUsers.select().where(DomSubUsers.dom == dom) def domsubusers_doms(sub): return [dsu.dom for dsu in DomSubUsers.select(DomSubUsers.dom).where(DomSubUsers.sub_id == sub.id)] def repeat_get(orders_pool_id): try: return Repeat.get(orders_pool_id = orders_pool_id) except Repeat.DoesNotExist: return None def repeat_increment(id): r = Repeat.get(id=id) q = Repeat.update( count = r.count + 1 ) q.execute() def repeat_put(orders_pool_id, probability, orders): return Repeat.create( orders_pool_id=orders_pool_id, probability=probability, orders=orders ) def repeat_clear(id): q = Repeat.delete() q.execute() def skip_day_put(user, date, order_pool=None): return SkipDay.create( user=user, date=date, pool=order_pool ) def skip_day_delete(user, date): q = SkipDay.delete().where((SkipDay.user == user) & (SkipDay.date == date)) return q.execute() def skip_days_upcoming(user): return (SkipDay.select() .where((SkipDay.user == user) & (SkipDay.date >= datetime.date.today())) .order_by(SkipDay.date) .limit(10)) def skip_day_get(user, date): return SkipDay.select().where((SkipDay.user == user) & (SkipDay.date == date)).get_or_none() def order_status_put(orders_pool, user, mastodon_id, created_at, due_at, text, punishment_for=None, verify_at=None): return OrderStatus.create( orders_pool_id=orders_pool.id, user_id=user.id, mastodon_id=mastodon_id, created_at=created_at, due_at=due_at, text=text, punishment_for=punishment_for, verify_at=verify_at ) def order_status_by_id(order_status_id): return OrderStatus.get(id=order_status_id) def order_status_confirm(id, confirmed_at): q = OrderStatus.update( confirmed_at=confirmed_at ).where( OrderStatus.id == id ) return q.execute() def order_status_outstanding(): Punishment = OrderStatus.alias() return (OrderStatus .select(OrderStatus) .where(OrderStatus.due_at.is_null(False) & OrderStatus.confirmed_at.is_null()) .join(Punishment, JOIN.LEFT_OUTER, on=(OrderStatus.id == Punishment.punishment_for)) .group_by(OrderStatus) .having(fn.COUNT(Punishment.id) == 0) ) def timeline_event_put(type, text, user, orders_pool=None, order_status=None, actor=None, extra=None): return TimelineEvent.create( updated_at=sqlite_time(datetime.datetime.now(datetime.UTC)), type=type, text=text, user=user, orders_pool=orders_pool, order_status=order_status, actor=actor, extra=json.dumps(extra) if extra is not None else None ) def timeline_event_recent(user_ids, actor_ids=None, limit=5): return (TimelineEvent .select() .where(( TimelineEvent.user_id.in_(user_ids) | (TimelineEvent.actor_user_id.in_(actor_ids) if actor_ids is not None else True) )) .order_by(TimelineEvent.updated_at.desc()) .limit(limit) )