# gear-orders/mastodon.py
#
# 68 lines
# 1.9 KiB
# Python
# Raw Normal View History

import os
import logging
# 2025-11-14 04:03:20 +00:00
from settings import MASTODON_INSTANCE, MASTODON_ACCESS_TOKEN
# Path of the statuses endpoint; Mastodon.statusPost() POSTs to it.
API_STATUSES = '/api/v1/statuses'
# Path template for a status's context. It is filled with a %-style mapping
# that must supply the ``id`` key (see Mastodon.statusContext()).
API_STATUS_CONTEXT = '/api/v1/statuses/%(id)s/context'
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Mastodon:
    """Small asynchronous client for a Mastodon instance's HTTP API.

    All requests go through the session object supplied by the caller. The
    session must provide ``get`` and ``post`` methods that return async
    context managers yielding a response with ``ok``, ``status`` and an
    awaitable ``json()`` (presumably an ``aiohttp.ClientSession`` -- confirm
    against the caller).
    """

    def __init__(self, aiosession, instance=None, access_token=None):
        """Store the session and the connection settings.

        Args:
            aiosession: Session object used to perform the HTTP requests.
            instance: Host name of the Mastodon instance. ``None`` selects
                ``MASTODON_INSTANCE`` from the settings module.
            access_token: Bearer token for the API. ``None`` selects
                ``MASTODON_ACCESS_TOKEN`` from the settings module; a falsy
                value such as ``''`` makes requests unauthenticated.
        """
        self.aiosession = aiosession
        self.instance = MASTODON_INSTANCE if instance is None else instance
        self.access_token = (
            MASTODON_ACCESS_TOKEN if access_token is None else access_token
        )

    @staticmethod
    def _url(instance, request):
        """Return the HTTPS URL for the path *request* on host *instance*."""
        return f'https://{instance}{request}'

    def _auth_headers(self):
        """Return the bearer-token header, or no headers without a token."""
        if not self.access_token:
            return {}
        return {'Authorization': f'Bearer {self.access_token}'}

    async def get(self, instance, request, params=None):
        """Send a GET request and return the decoded JSON body.

        Args:
            instance: Host name to send the request to.
            request: Request path, appended directly after the host name.
            params: Optional query parameters. ``None`` (the default) sends
                an empty mapping.

        Returns:
            The result of ``response.json()``.

        Raises:
            Exception: If ``response.ok`` is false; the message holds the
                request path and the response status.
        """
        # A fresh dict per call: a ``{}`` default in the signature would be
        # one object shared by every call.
        query = {} if params is None else params
        async with self.aiosession.get(
            self._url(instance, request),
            params=query,
            headers=self._auth_headers(),
        ) as response:
            if not response.ok:
                message = '{} {}'.format(request, response.status)
                logger.error(message)
                raise Exception(message)
            return await response.json()

    async def post(self, instance, request, data):
        """Send *data* as a JSON POST request and return the decoded JSON body.

        Args:
            instance: Host name to send the request to.
            request: Request path, appended directly after the host name.
            data: Object passed to the session as the JSON request body.

        Returns:
            The result of ``response.json()``.

        Raises:
            Exception: If ``response.ok`` is false; the message holds the
                host name, the request path and the response status.
        """
        async with self.aiosession.post(
            self._url(instance, request),
            json=data,
            headers=self._auth_headers(),
        ) as response:
            if not response.ok:
                message = '{} {} {}'.format(instance, request, response.status)
                logger.error(message)
                raise Exception(message)
            return await response.json()

    async def statusPost(self, status, in_reply_to_id=None, visibility='direct'):
        """Post a new status on the configured instance.

        Args:
            status: Text of the status.
            in_reply_to_id: Identifier of the status being replied to. It is
                always included in the request body, as ``None`` when absent.
            visibility: Value sent as the request's ``visibility`` field.
                Defaults to ``'direct'``.

        Returns:
            The decoded JSON response of the POST request.

        Raises:
            Exception: If the request does not succeed (see ``post``).
        """
        payload = {
            'status': status,
            'visibility': visibility,
            'in_reply_to_id': in_reply_to_id,
        }
        return await self.post(self.instance, API_STATUSES, data=payload)

    async def statusContext(self, id):
        """Fetch the context of the status identified by *id*.

        Args:
            id: Identifier substituted into the status-context path. The
                name shadows the builtin but is kept so existing keyword
                callers continue to work.

        Returns:
            The decoded JSON response of the GET request.

        Raises:
            Exception: If the request does not succeed (see ``get``).
        """
        path = API_STATUS_CONTEXT % {'id': id}
        return await self.get(self.instance, path)