import logging
import traceback

from aiohttp import ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError
from aputils import Nodeinfo, WellKnownNodeinfo
from datetime import datetime
from cachetools import LRUCache
from json.decoder import JSONDecodeError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen

from . import __version__
from .misc import (
    MIMETYPES,
    AppBase,
    DotDict,
    Message
)


# Default headers attached to every request made through the shared session.
HEADERS = {
    'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
    'User-Agent': f'ActivityRelay/{__version__}'
}


class Cache(LRUCache):
    """LRU cache whose maximum size can be changed after construction."""

    def set_maxsize(self, value):
        """Set the maximum number of cached entries to ``int(value)``."""
        # Inside a class named ``Cache`` this name is mangled to
        # ``_Cache__maxsize``.
        # NOTE(review): presumably this is meant to hit the private attribute
        # of cachetools' own ``Cache`` base class, which only works because
        # this subclass shares that class name -- do not rename this class
        # without confirming against the installed cachetools version.
        self.__maxsize = int(value)


class HttpClient(AppBase):
    """Async HTTP client with a response cache for fetching and pushing
    ActivityPub documents.

    Can be used as an async context manager, which opens the session on entry
    and closes it on exit.
    """

    def __init__(self, limit=100, timeout=10, cache_size=1024):
        """Create the client without opening any connection.

        Args:
            limit: Value passed as ``limit`` to the ``TCPConnector``.
            timeout: Value passed as ``total`` to the ``ClientTimeout``.
            cache_size: Maximum number of entries in the response cache.
        """
        self.cache = Cache(cache_size)
        self.cfg = {'limit': limit, 'timeout': timeout}
        self._conn = None
        self._session = None

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, *_):
        await self.close()

    @property
    def limit(self):
        """Configured connector limit."""
        return self.cfg['limit']

    @property
    def timeout(self):
        """Configured total request timeout."""
        return self.cfg['timeout']

    def setup(self):
        """Load the connection limit, timeout and cache size from the
        database configuration.
        """
        with self.database.session as s:
            config = s.get_config_all()

            # NOTE(review): values are written through ``self.client`` rather
            # than ``self``; presumably ``AppBase`` resolves that to the
            # application's shared client -- confirm.
            self.client.cfg['limit'] = config.push_limit
            self.client.cfg['timeout'] = config.http_timeout
            self.client.cache.set_maxsize(config.json_cache)

    async def open(self):
        """Create the connector and session unless a usable session exists."""
        # Reuse the existing session only while its event loop is running.
        if self._session and self._session._loop.is_running():
            return

        self._conn = TCPConnector(
            limit=self.limit,
            ttl_dns_cache=300,
        )

        self._session = ClientSession(
            connector=self._conn,
            headers=HEADERS,
            connector_owner=True,
            timeout=ClientTimeout(total=self.timeout)
        )

    async def close(self):
        """Close the session and connector, if a session is open."""
        if not self._session:
            return

        await self._session.close()
        await self._conn.close()

        self._conn = None
        self._session = None

    async def get(self, url, sign_headers=False, loads=None, force=False):
        """Fetch and parse a JSON resource, caching successful results by URL.

        Args:
            url: Resource location; any ``#fragment`` is dropped first.
            sign_headers: When true, add signature headers produced by
                ``self.signer`` for a ``GET`` of ``url``.
            loads: Optional callable used to parse the response body. When
                omitted, the parser is chosen from the response Content-Type.
            force: When true, skip the cache lookup and always make a request.

        Returns:
            The parsed message, or ``None`` when the response status is not
            200, the Content-Type is not recognised, the body is not valid
            JSON, or the connection fails.

        Raises:
            Exception: Any other error is printed and re-raised.
        """
        await self.open()

        # Strip the fragment so variants of one document share a cache entry.
        # Non-string values are passed through untouched.
        if isinstance(url, str):
            url = url.partition('#')[0]

        if not force and url in self.cache:
            return self.cache[url]

        headers = {}

        if sign_headers:
            headers.update(self.signer.sign_headers('GET', url, algorithm='original'))

        try:
            logging.verbose(f'Fetching resource: {url}')

            async with self._session.get(url, headers=headers) as resp:
                ## Not expecting a response with 202s, so just return
                if resp.status == 202:
                    return

                elif resp.status != 200:
                    logging.verbose(f'Received error when requesting {url}: {resp.status}')
                    logging.verbose(await resp.read())  # change this to debug
                    return

                if loads:
                    message = await resp.json(loads=loads)

                elif resp.content_type == MIMETYPES['activity']:
                    message = await resp.json(loads=Message.new_from_json)

                elif resp.content_type == MIMETYPES['json']:
                    message = await resp.json(loads=DotDict.new_from_json)

                else:
                    # todo: raise TypeError or something
                    logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')

                    # ``resp.read()`` is a coroutine and must be awaited;
                    # otherwise the coroutine object itself gets logged.
                    body = await resp.read()
                    logging.debug(f'Response: {body}')
                    return

                logging.debug(f'{url} >> resp {message.to_json(4)}')

                self.cache[url] = message
                return message

        except JSONDecodeError:
            logging.verbose(f'Failed to parse JSON')

        except (ClientConnectorError, ServerTimeoutError):
            logging.verbose(f'Failed to connect to {urlparse(url).netloc}')

        except Exception:
            traceback.print_exc()
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    async def post(self, inbox, message):
        """Sign ``message`` and POST it to ``inbox``.

        Outcomes are only logged; nothing is returned and no exception is
        raised from the request itself.

        Args:
            inbox: URL to deliver the message to.
            message: Object providing ``type`` and ``to_json()``.
        """
        await self.open()

        with self.database.session as s:
            instance = s.get_instance(inbox)

            ## Using the old algo by default is probably a better idea right now
            if instance and instance['software'] in {'mastodon'}:
                algorithm = 'hs2019'

            else:
                algorithm = 'original'

        headers = {'Content-Type': 'application/activity+json'}
        headers.update(self.signer.sign_headers('POST', inbox, message, algorithm=algorithm))

        try:
            logging.verbose(f'Sending "{message.type}" to {inbox}')

            async with self._session.post(inbox, headers=headers, data=message.to_json()) as resp:
                ## Not expecting a response, so just return
                if resp.status in {200, 202}:
                    return logging.verbose(f'Successfully sent "{message.type}" to {inbox}')

                logging.verbose(f'Received error when pushing to {inbox}: {resp.status}')
                return logging.verbose(await resp.read())  # change this to debug

        except (ClientConnectorError, ServerTimeoutError):
            logging.verbose(f'Failed to connect to {inbox}')

        ## prevent workers from being brought down
        except Exception:
            traceback.print_exc()

    ## Additional methods ##
    async def fetch_nodeinfo(self, domain):
        """Fetch the nodeinfo document advertised by ``domain``.

        Args:
            domain: Host name queried at ``/.well-known/nodeinfo``.

        Returns:
            The parsed nodeinfo, or ``False`` when the well-known document,
            a nodeinfo URL, or the nodeinfo itself cannot be obtained.
        """
        nodeinfo_url = None
        wk_nodeinfo = await self.get(
            f'https://{domain}/.well-known/nodeinfo',
            loads=WellKnownNodeinfo.new_from_json
        )

        if not wk_nodeinfo:
            logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
            return False

        # Later versions overwrite earlier ones, so '21' wins when both exist.
        for version in ['20', '21']:
            try:
                nodeinfo_url = wk_nodeinfo.get_url(version)

            except KeyError:
                pass

        if not nodeinfo_url:
            logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
            return False

        return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False


## http client methods can't be called directly from manage.py,
## so here's some wrapper functions
async def get(*args, **kwargs):
    """Run ``HttpClient.get`` on a temporary client."""
    async with HttpClient() as client:
        return await client.get(*args, **kwargs)


async def post(*args, **kwargs):
    """Run ``HttpClient.post`` on a temporary client."""
    async with HttpClient() as client:
        return await client.post(*args, **kwargs)


async def fetch_nodeinfo(*args, **kwargs):
    """Run ``HttpClient.fetch_nodeinfo`` on a temporary client."""
    async with HttpClient() as client:
        return await client.fetch_nodeinfo(*args, **kwargs)