diff --git a/.gitignore b/.gitignore index fc8aedd..6fd7d55 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,9 @@ viera.jsonld # config file relay.yaml + +# JSON-LD file +relay.jsonld + +# VS Code Launch Configuration +launch.json \ No newline at end of file diff --git a/relay/http_client.py b/relay/http_client.py index 8802471..57c0562 100644 --- a/relay/http_client.py +++ b/relay/http_client.py @@ -3,7 +3,7 @@ import traceback from aiohttp import ClientSession, ClientTimeout, TCPConnector from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError -from aputils import Nodeinfo, WellKnownNodeinfo +from aputils import Nodeinfo, WellKnownNodeinfo, AlgorithmType from datetime import datetime from cachetools import LRUCache from json.decoder import JSONDecodeError @@ -36,34 +36,30 @@ class HttpClient: self._conn = None self._session = None - @property def limit(self): return self.cfg['limit'] - @property def timeout(self): return self.cfg['timeout'] - async def open(self): if self._session: return self._conn = TCPConnector( - limit = self.limit, - ttl_dns_cache = 300, + limit=self.limit, + ttl_dns_cache=300, ) self._session = ClientSession( - connector = self._conn, - headers = HEADERS, - connector_owner = True, - timeout = ClientTimeout(total=self.timeout) + connector=self._conn, + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout) ) - async def close(self): if not self._session: return @@ -74,12 +70,13 @@ class HttpClient: self._conn = None self._session = None - async def get(self, url, sign_headers=False, loads=None, force=False): await self.open() - try: url, _ = url.split('#', 1) - except: pass + try: + url, _ = url.split('#', 1) + except: + pass if not force and url in self.cache: return self.cache[url] @@ -87,95 +84,113 @@ class HttpClient: headers = {} if sign_headers: - headers.update(self.database.signer.sign_headers('GET', url, algorithm='original')) + headers.update(self.database.signer.sign_headers( + 'GET', url, algorithm='original')) - try: - logging.verbose(f'Fetching resource: {url}') + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: + try: + logging.verbose(f'Fetching resource: {url}') + async with session.get(url, headers=headers) as resp: + # Not expecting a response with 202s, so just return + if resp.status == 202: + return - async with self._session.get(url, headers=headers) as resp: - ## Not expecting a response with 202s, so just return - if resp.status == 202: - return + elif resp.status != 200: + logging.verbose( + f'Received error when requesting {url}: {resp.status}') + logging.verbose(await resp.read()) # change this to debug + return - elif resp.status != 200: - logging.verbose(f'Received error when requesting {url}: {resp.status}') - logging.verbose(await resp.read()) # change this to debug - return + if loads: + message = await resp.json(loads=loads) - if loads: - message = await resp.json(loads=loads) + elif resp.content_type == MIMETYPES['activity']: + message = await resp.json(loads=Message.new_from_json) - elif resp.content_type == MIMETYPES['activity']: - message = await resp.json(loads=Message.new_from_json) + elif resp.content_type == MIMETYPES['json']: + message = await resp.json(loads=DotDict.new_from_json) - elif resp.content_type == MIMETYPES['json']: - message = await resp.json(loads=DotDict.new_from_json) + else: + # todo: raise TypeError or something + logging.verbose( + f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp.read()}') - else: - # todo: raise TypeError or something - logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') - return logging.debug(f'Response: {resp.read()}') + logging.debug(f'{url} >> resp {message.to_json(4)}') - logging.debug(f'{url} >> resp {message.to_json(4)}') + self.cache[url] = message + return message - self.cache[url] = message - return message + except JSONDecodeError: + logging.verbose(f'Failed to parse JSON') - except JSONDecodeError: - logging.verbose(f'Failed to parse JSON') + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {urlparse(url).netloc}') - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {urlparse(url).netloc}') - - except Exception as e: - traceback.print_exc() - raise e - - - async def post(self, url, message): - await self.open() + except Exception as e: + traceback.print_exc() + raise e + async def post(self, url, message, software=None): instance = self.database.get_inbox(url) - ## Using the old algo by default is probably a better idea right now - if instance and instance.get('software') in {'mastodon'}: - algorithm = 'hs2019' + # Using the old algo by default is probably a better idea right now + if (instance and instance.get('software') in {'mastodon'}) or (software and software in {'mastodon'}): + algorithm = AlgorithmType.HS2019 else: - algorithm = 'original' + algorithm = AlgorithmType.ORIGINAL headers = {'Content-Type': 'application/activity+json'} - headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm)) + headers.update(self.database.signer.sign_headers( + 'POST', url, message, algorithm=algorithm)) - try: - logging.verbose(f'Sending "{message.type}" to {url}') + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: - async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - ## Not expecting a response, so just return - if resp.status in {200, 202}: - return logging.verbose(f'Successfully sent "{message.type}" to {url}') + try: + logging.verbose(f'Sending "{message.type}" to {url}') + logging.verbose( + f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') - logging.verbose(f'Received error when pushing to {url}: {resp.status}') - return logging.verbose(await resp.read()) # change this to debug + async with session.post(url, headers=headers, data=message.to_json()) as resp: + # Not expecting a response, so just return + if resp.status in {200, 202}: + return logging.info(f'Successfully sent "{message.type}" to {url}') - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {url}') + logging.info( + f'Received error when pushing to {url}: {resp.status}') + # change this to debug + return logging.info(await resp.read()) - ## prevent workers from being brought down - except Exception as e: - traceback.print_exc() + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {url}') + # prevent workers from being brought down + except Exception as e: + traceback.print_exc() ## Additional methods ## + async def fetch_nodeinfo(self, domain): nodeinfo_url = None wk_nodeinfo = await self.get( f'https://{domain}/.well-known/nodeinfo', - loads = WellKnownNodeinfo.new_from_json + loads=WellKnownNodeinfo.new_from_json ) if not wk_nodeinfo: - logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}') + logging.verbose( + f'Failed to fetch well-known nodeinfo url for domain: {domain}') return False for version in ['20', '21']: @@ -186,7 +201,8 @@ class HttpClient: pass if not nodeinfo_url: - logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}') + logging.verbose( + f'Failed to fetch nodeinfo url for domain: {domain}') return False return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False diff --git a/relay/manage.py b/relay/manage.py index 0d7decc..5eb0a77 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -160,7 +160,11 @@ def cli_inbox_follow(actor): actor = actor ) - asyncio.run(app.client.post(inbox, message)) + # Fetch software to decide on algorithm + nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain)) + software = nodeinfo.sw_name if nodeinfo else None + + asyncio.run(app.client.post(inbox, message, software)) click.echo(f'Sent follow message to actor: {actor}') @@ -198,7 +202,11 @@ def cli_inbox_unfollow(actor): } ) - asyncio.run(app.client.post(inbox, message)) + # Fetch software to decide on algorithm + nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain)) + software = nodeinfo.sw_name if nodeinfo else None + + asyncio.run(app.client.post(inbox, message, software)) click.echo(f'Sent unfollow message to: {actor}')