From 9842494fc692f6c6fb0e5b1346d8017207a36969 Mon Sep 17 00:00:00 2001
From: Valentin
Date: Sat, 24 Dec 2022 18:38:02 +0100
Subject: [PATCH] Fix error on 'relay inbox follow'

---
 .gitignore           |   6 +
 relay/http_client.py | 285 ++++++++++++++++++++++++-------------------
 2 files changed, 166 insertions(+), 125 deletions(-)

diff --git a/.gitignore b/.gitignore
index fc8aedd..6fd7d55 100644
--- a/.gitignore
+++ b/.gitignore
@@ -99,3 +99,9 @@ viera.jsonld
 
 # config file
 relay.yaml
+
+# JSON-LD file
+relay.jsonld
+
+# VS Code Launch Configuration
+launch.json
\ No newline at end of file

diff --git a/relay/http_client.py b/relay/http_client.py
index 8802471..87f8493 100644
--- a/relay/http_client.py
+++ b/relay/http_client.py
@@ -11,182 +11,217 @@ from urllib.parse import urlparse
 
 from . import __version__
 from .misc import (
-	MIMETYPES,
-	DotDict,
-	Message
+    MIMETYPES,
+    DotDict,
+    Message
 )
 
 HEADERS = {
-	'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
-	'User-Agent': f'ActivityRelay/{__version__}'
+    'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
+    'User-Agent': f'ActivityRelay/{__version__}'
 }
 
 class Cache(LRUCache):
-	def set_maxsize(self, value):
-		self.__maxsize = int(value)
+    def set_maxsize(self, value):
+        self.__maxsize = int(value)
 
 class HttpClient:
-	def __init__(self, database, limit=100, timeout=10, cache_size=1024):
-		self.database = database
-		self.cache = Cache(cache_size)
-		self.cfg = {'limit': limit, 'timeout': timeout}
-		self._conn = None
-		self._session = None
+    def __init__(self, database, limit=100, timeout=10, cache_size=1024):
+        self.database = database
+        self.cache = Cache(cache_size)
+        self.cfg = {'limit': limit, 'timeout': timeout}
+        self._conn = None
+        self._session = None
+
+    @property
+    def limit(self):
+        return self.cfg['limit']
 
-	@property
-	def limit(self):
-		return self.cfg['limit']
+    @property
+    def timeout(self):
+        return self.cfg['timeout']
+
+    async def open(self):
+        if self._session:
+            return
 
-	@property
-	def timeout(self):
-		return self.cfg['timeout']
+        self._conn = TCPConnector(
+            limit=self.limit,
+            ttl_dns_cache=300,
+        )
+
+        self._session = ClientSession(
+            connector=self._conn,
+            headers=HEADERS,
+            connector_owner=True,
+            timeout=ClientTimeout(total=self.timeout)
+        )
 
-	async def open(self):
-		if self._session:
-			return
+    async def close(self):
+        if not self._session:
+            return
 
-		self._conn = TCPConnector(
-			limit = self.limit,
-			ttl_dns_cache = 300,
-		)
+        await self._session.close()
+        await self._conn.close()
 
-		self._session = ClientSession(
-			connector = self._conn,
-			headers = HEADERS,
-			connector_owner = True,
-			timeout = ClientTimeout(total=self.timeout)
-		)
+        self._conn = None
+        self._session = None
+
+    async def get(self, url, sign_headers=False, loads=None, force=False):
+        await self.open()
 
-	async def close(self):
-		if not self._session:
-			return
+        try:
+            url, _ = url.split('#', 1)
+        except:
+            pass
 
-		await self._session.close()
-		await self._conn.close()
+        if not force and url in self.cache:
+            return self.cache[url]
 
-		self._conn = None
-		self._session = None
+        headers = {}
+        if sign_headers:
+            headers.update(self.database.signer.sign_headers(
+                'GET', url, algorithm='original'))
 
-	async def get(self, url, sign_headers=False, loads=None, force=False):
-		await self.open()
+        try:
+            logging.verbose(f'Fetching resource: {url}')
 
-		try: url, _ = url.split('#', 1)
-		except: pass
+            async with self._session.get(url, headers=headers) as resp:
+                # Not expecting a response with 202s, so just return
+                if resp.status == 202:
+                    return
-		if not force and url in self.cache:
-			return self.cache[url]
+                elif resp.status != 200:
+                    logging.verbose(
+                        f'Received error when requesting {url}: {resp.status}')
+                    logging.verbose(await resp.read()) # change this to debug
+                    return
 
-		headers = {}
+                if loads:
+                    message = await resp.json(loads=loads)
 
-		if sign_headers:
-			headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
+                elif resp.content_type == MIMETYPES['activity']:
+                    message = await resp.json(loads=Message.new_from_json)
 
-		try:
-			logging.verbose(f'Fetching resource: {url}')
+                elif resp.content_type == MIMETYPES['json']:
+                    message = await resp.json(loads=DotDict.new_from_json)
 
-			async with self._session.get(url, headers=headers) as resp:
-				## Not expecting a response with 202s, so just return
-				if resp.status == 202:
-					return
+                else:
+                    # todo: raise TypeError or something
+                    logging.verbose(
+                        f'Invalid Content-Type for "{url}": {resp.content_type}')
+                    return logging.debug(f'Response: {resp.read()}')
 
-				elif resp.status != 200:
-					logging.verbose(f'Received error when requesting {url}: {resp.status}')
-					logging.verbose(await resp.read()) # change this to debug
-					return
+                logging.debug(f'{url} >> resp {message.to_json(4)}')
 
-				if loads:
-					message = await resp.json(loads=loads)
+                self.cache[url] = message
+                return message
 
-				elif resp.content_type == MIMETYPES['activity']:
-					message = await resp.json(loads=Message.new_from_json)
+        except JSONDecodeError:
+            logging.verbose(f'Failed to parse JSON')
 
-				elif resp.content_type == MIMETYPES['json']:
-					message = await resp.json(loads=DotDict.new_from_json)
+        except (ClientConnectorError, ServerTimeoutError):
+            logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
 
-				else:
-					# todo: raise TypeError or something
-					logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
-					return logging.debug(f'Response: {resp.read()}')
+        except Exception as e:
+            traceback.print_exc()
+            raise e
 
-				logging.debug(f'{url} >> resp {message.to_json(4)}')
+    async def post(self, url, message):
+        instance = self.database.get_inbox(url)
 
-				self.cache[url] = message
-				return message
+        # Using the old algo by default is probably a better idea right now
+        if instance and instance.get('software') in {'mastodon'}:
+            algorithm = 'hs2019'
 
-		except JSONDecodeError:
-			logging.verbose(f'Failed to parse JSON')
+        else:
+            algorithm = 'original'
 
-		except (ClientConnectorError, ServerTimeoutError):
-			logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
+        headers = {'Content-Type': 'application/activity+json'}
+        headers.update(self.database.signer.sign_headers(
+            'POST', url, message, algorithm=algorithm))
 
-		except Exception as e:
-			traceback.print_exc()
-			raise e
+        try:
+            logging.verbose(f'Sending "{message.type}" to {url}')
+            logging.verbose(
+                f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}')
+
+            # The following does not work and throws exception on 'relay inbox follow':
+            # Traceback (most recent call last):
+            #   File "/home/vriess/Dokumente/Repos/relay-1/relay/http_client.py", line 153, in post
+            #     async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
+            #   File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 1141, in __aenter__
+            #     self._resp = await self._coro
+            #   File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 448, in _request
+            #     handle = tm.start()
+            #   File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/helpers.py", line 651, in start
"/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/helpers.py", line 651, in start + # return self._loop.call_at(when, self.__call__) + # File "/usr/lib/python3.10/asyncio/base_events.py", line 732, in call_at + # self._check_closed() + # File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed + # raise RuntimeError('Event loop is closed') + # RuntimeError: Event loop is closed + # ↓↓↓↓ + # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: + # ## Not expecting a response, so just return + # if resp.status in {200, 202}: + # return logging.info(f'Successfully sent "{message.type}" to {url}') - async def post(self, url, message): - await self.open() + # logging.info(f'Received error when pushing to {url}: {resp.status}') + # return logging.info(await resp.read()) # change this to debug - instance = self.database.get_inbox(url) + # Creating a session here works for some reason and does not throw an error + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: + async with session.post(url, headers=headers, data=message.to_json()) as resp: + # Not expecting a response, so just return + if resp.status in {200, 202}: + return logging.info(f'Successfully sent "{message.type}" to {url}') - ## Using the old algo by default is probably a better idea right now - if instance and instance.get('software') in {'mastodon'}: - algorithm = 'hs2019' + logging.info( + f'Received error when pushing to {url}: {resp.status}') + # change this to debug + return logging.info(await resp.read()) - else: - algorithm = 'original' + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {url}') - headers = {'Content-Type': 'application/activity+json'} - headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm)) + # prevent workers from being brought down + except Exception as e: + traceback.print_exc() - try: - logging.verbose(f'Sending "{message.type}" to {url}') + ## Additional methods ## - async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - ## Not expecting a response, so just return - if resp.status in {200, 202}: - return logging.verbose(f'Successfully sent "{message.type}" to {url}') + async def fetch_nodeinfo(self, domain): + nodeinfo_url = None + wk_nodeinfo = await self.get( + f'https://{domain}/.well-known/nodeinfo', + loads=WellKnownNodeinfo.new_from_json + ) - logging.verbose(f'Received error when pushing to {url}: {resp.status}') - return logging.verbose(await resp.read()) # change this to debug + if not wk_nodeinfo: + logging.verbose( + f'Failed to fetch well-known nodeinfo url for domain: {domain}') + return False - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {url}') + for version in ['20', '21']: + try: + nodeinfo_url = wk_nodeinfo.get_url(version) - ## prevent workers from being brought down - except Exception as e: - traceback.print_exc() + except KeyError: + pass + if not nodeinfo_url: + logging.verbose( + f'Failed to fetch nodeinfo url for domain: {domain}') + return False - ## Additional methods ## - async def fetch_nodeinfo(self, domain): - nodeinfo_url = None - wk_nodeinfo = await self.get( - f'https://{domain}/.well-known/nodeinfo', - loads = WellKnownNodeinfo.new_from_json - ) - - if not wk_nodeinfo: - 
-			logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
-			return False
-
-		for version in ['20', '21']:
-			try:
-				nodeinfo_url = wk_nodeinfo.get_url(version)
-
-			except KeyError:
-				pass
-
-		if not nodeinfo_url:
-			logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
-			return False
-
-		return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
+
+        return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
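
A note on the root cause, so the workaround above is less mysterious: asyncio.run() creates a fresh event loop and closes it when it returns, while HttpClient caches its ClientSession across calls. An aiohttp session is bound to the loop that is running when it is created, and the quoted traceback fails inside loop.call_at(), which suggests 'relay inbox follow' drives more than one event loop over the life of the process: by the time post() runs, the cached session still references an earlier, already-closed loop. A minimal standalone sketch of that failure mode, assuming only aiohttp (the class and the URL are illustrative, not taken from the relay codebase):

import asyncio
from aiohttp import ClientSession


class Client:
    def __init__(self):
        self._session = None

    async def open(self):
        # The session binds to whichever event loop is running right now.
        if not self._session:
            self._session = ClientSession()

    async def fetch(self, url):
        await self.open()
        async with self._session.get(url) as resp:
            return resp.status


client = Client()

# First call: asyncio.run() starts loop #1, the session is created on it,
# and the loop is closed as soon as run() returns.
asyncio.run(client.fetch('https://example.com'))

# Second call: loop #2 is new, but the cached session still references
# loop #1, so aiohttp raises RuntimeError('Event loop is closed').
asyncio.run(client.fetch('https://example.com'))

If that diagnosis is right, it also explains why creating the session inside post() "works for some reason": the fresh session binds to whichever loop is current at send time.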
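
The per-delivery session fixes the crash at the cost of connection pooling, since every post() now builds and tears down its own TCPConnector. Under the same assumption, a loop-aware open() would be an alternative worth testing; this is an untested sketch, not part of the patch:

import asyncio
from aiohttp import ClientSession, ClientTimeout, TCPConnector


class LoopAwareClient:
    def __init__(self, limit=100, timeout=10):
        self.limit = limit
        self.timeout = timeout
        self._loop = None
        self._session = None

    async def open(self):
        loop = asyncio.get_running_loop()

        # Reuse the session only if it belongs to the loop running right now.
        if self._session and self._loop is loop:
            return

        # A previous session died with its loop; drop the reference.
        # (aiohttp will log an 'Unclosed client session' warning for it.)
        self._loop = loop
        self._session = ClientSession(
            connector=TCPConnector(limit=self.limit, ttl_dns_cache=300),
            timeout=ClientTimeout(total=self.timeout)
        )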
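
Unrelated to the event-loop issue, for readers new to fetch_nodeinfo(): the well-known document it walks is defined by the NodeInfo protocol and looks roughly like the sketch below (the example.com values are placeholders). That shape is why get_url() is tried for both '20' and '21' and why a KeyError is swallowed when an instance publishes only one of the two versions.

# Typical response from https://example.com/.well-known/nodeinfo
WELL_KNOWN_NODEINFO = {
    'links': [
        {
            'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
            'href': 'https://example.com/nodeinfo/2.0'
        },
        {
            'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.1',
            'href': 'https://example.com/nodeinfo/2.1'
        }
    ]
}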