From af7fcc66fd9d5d85fb180c10c0b5892dbd63befe Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sun, 11 Dec 2022 09:15:03 -0500 Subject: [PATCH 1/6] fix missing modules when building via pyinstaller --- relay.spec | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/relay.spec b/relay.spec index 57fedc7..c21a829 100644 --- a/relay.spec +++ b/relay.spec @@ -9,7 +9,13 @@ a = Analysis( pathex=[], binaries=[], datas=[], - hiddenimports=[], + hiddenimports=[ + 'aputils.enums', + 'aputils.errors', + 'aputils.misc', + 'aputils.objects', + 'aputils.signer' + ], hookspath=[], hooksconfig={}, runtime_hooks=[], From 9842494fc692f6c6fb0e5b1346d8017207a36969 Mon Sep 17 00:00:00 2001 From: Valentin Date: Sat, 24 Dec 2022 18:38:02 +0100 Subject: [PATCH 2/6] Fix error on 'relay inbox follow' --- .gitignore | 6 + relay/http_client.py | 285 ++++++++++++++++++++++++------------------- 2 files changed, 166 insertions(+), 125 deletions(-) diff --git a/.gitignore b/.gitignore index fc8aedd..6fd7d55 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,9 @@ viera.jsonld # config file relay.yaml + +# JSON-LD file +relay.jsonld + +# VS Code Launch Configuration +launch.json \ No newline at end of file diff --git a/relay/http_client.py b/relay/http_client.py index 8802471..87f8493 100644 --- a/relay/http_client.py +++ b/relay/http_client.py @@ -11,182 +11,217 @@ from urllib.parse import urlparse from . import __version__ from .misc import ( - MIMETYPES, - DotDict, - Message + MIMETYPES, + DotDict, + Message ) HEADERS = { - 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', - 'User-Agent': f'ActivityRelay/{__version__}' + 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', + 'User-Agent': f'ActivityRelay/{__version__}' } class Cache(LRUCache): - def set_maxsize(self, value): - self.__maxsize = int(value) + def set_maxsize(self, value): + self.__maxsize = int(value) class HttpClient: - def __init__(self, database, limit=100, timeout=10, cache_size=1024): - self.database = database - self.cache = Cache(cache_size) - self.cfg = {'limit': limit, 'timeout': timeout} - self._conn = None - self._session = None + def __init__(self, database, limit=100, timeout=10, cache_size=1024): + self.database = database + self.cache = Cache(cache_size) + self.cfg = {'limit': limit, 'timeout': timeout} + self._conn = None + self._session = None + @property + def limit(self): + return self.cfg['limit'] - @property - def limit(self): - return self.cfg['limit'] + @property + def timeout(self): + return self.cfg['timeout'] + async def open(self): + if self._session: + return - @property - def timeout(self): - return self.cfg['timeout'] + self._conn = TCPConnector( + limit=self.limit, + ttl_dns_cache=300, + ) + self._session = ClientSession( + connector=self._conn, + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout) + ) - async def open(self): - if self._session: - return + async def close(self): + if not self._session: + return - self._conn = TCPConnector( - limit = self.limit, - ttl_dns_cache = 300, - ) + await self._session.close() + await self._conn.close() - self._session = ClientSession( - connector = self._conn, - headers = HEADERS, - connector_owner = True, - timeout = ClientTimeout(total=self.timeout) - ) + self._conn = None + self._session = None + async def get(self, url, sign_headers=False, loads=None, force=False): + await self.open() - async def close(self): - if not self._session: - return + try: + url, _ = url.split('#', 1) + except: + pass - 
await self._session.close() - await self._conn.close() + if not force and url in self.cache: + return self.cache[url] - self._conn = None - self._session = None + headers = {} + if sign_headers: + headers.update(self.database.signer.sign_headers( + 'GET', url, algorithm='original')) - async def get(self, url, sign_headers=False, loads=None, force=False): - await self.open() + try: + logging.verbose(f'Fetching resource: {url}') - try: url, _ = url.split('#', 1) - except: pass + async with self._session.get(url, headers=headers) as resp: + # Not expecting a response with 202s, so just return + if resp.status == 202: + return - if not force and url in self.cache: - return self.cache[url] + elif resp.status != 200: + logging.verbose( + f'Received error when requesting {url}: {resp.status}') + logging.verbose(await resp.read()) # change this to debug + return - headers = {} + if loads: + message = await resp.json(loads=loads) - if sign_headers: - headers.update(self.database.signer.sign_headers('GET', url, algorithm='original')) + elif resp.content_type == MIMETYPES['activity']: + message = await resp.json(loads=Message.new_from_json) - try: - logging.verbose(f'Fetching resource: {url}') + elif resp.content_type == MIMETYPES['json']: + message = await resp.json(loads=DotDict.new_from_json) - async with self._session.get(url, headers=headers) as resp: - ## Not expecting a response with 202s, so just return - if resp.status == 202: - return + else: + # todo: raise TypeError or something + logging.verbose( + f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp.read()}') - elif resp.status != 200: - logging.verbose(f'Received error when requesting {url}: {resp.status}') - logging.verbose(await resp.read()) # change this to debug - return + logging.debug(f'{url} >> resp {message.to_json(4)}') - if loads: - message = await resp.json(loads=loads) + self.cache[url] = message + return message - elif resp.content_type == MIMETYPES['activity']: - message = await resp.json(loads=Message.new_from_json) + except JSONDecodeError: + logging.verbose(f'Failed to parse JSON') - elif resp.content_type == MIMETYPES['json']: - message = await resp.json(loads=DotDict.new_from_json) + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {urlparse(url).netloc}') - else: - # todo: raise TypeError or something - logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') - return logging.debug(f'Response: {resp.read()}') + except Exception as e: + traceback.print_exc() + raise e - logging.debug(f'{url} >> resp {message.to_json(4)}') + async def post(self, url, message): + instance = self.database.get_inbox(url) - self.cache[url] = message - return message + # Using the old algo by default is probably a better idea right now + if instance and instance.get('software') in {'mastodon'}: + algorithm = 'hs2019' - except JSONDecodeError: - logging.verbose(f'Failed to parse JSON') + else: + algorithm = 'original' - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {urlparse(url).netloc}') + headers = {'Content-Type': 'application/activity+json'} + headers.update(self.database.signer.sign_headers( + 'POST', url, message, algorithm=algorithm)) - except Exception as e: - traceback.print_exc() - raise e + try: + logging.verbose(f'Sending "{message.type}" to {url}') + logging.verbose( + f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') + # The following does not work and throws 
exception on 'relay inbox follow': + # Traceback (most recent call last): + # File "/home/vriess/Dokumente/Repos/relay-1/relay/http_client.py", line 153, in post + # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: + # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 1141, in __aenter__ + # self._resp = await self._coro + # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 448, in _request + # handle = tm.start() + # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/helpers.py", line 651, in start + # return self._loop.call_at(when, self.__call__) + # File "/usr/lib/python3.10/asyncio/base_events.py", line 732, in call_at + # self._check_closed() + # File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed + # raise RuntimeError('Event loop is closed') + # RuntimeError: Event loop is closed + # ↓↓↓↓ + # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: + # ## Not expecting a response, so just return + # if resp.status in {200, 202}: + # return logging.info(f'Successfully sent "{message.type}" to {url}') - async def post(self, url, message): - await self.open() + # logging.info(f'Received error when pushing to {url}: {resp.status}') + # return logging.info(await resp.read()) # change this to debug - instance = self.database.get_inbox(url) + # Creating a session here works for some reason and does not throw an error + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: + async with session.post(url, headers=headers, data=message.to_json()) as resp: + # Not expecting a response, so just return + if resp.status in {200, 202}: + return logging.info(f'Successfully sent "{message.type}" to {url}') - ## Using the old algo by default is probably a better idea right now - if instance and instance.get('software') in {'mastodon'}: - algorithm = 'hs2019' + logging.info( + f'Received error when pushing to {url}: {resp.status}') + # change this to debug + return logging.info(await resp.read()) - else: - algorithm = 'original' + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {url}') - headers = {'Content-Type': 'application/activity+json'} - headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm)) + # prevent workers from being brought down + except Exception as e: + traceback.print_exc() - try: - logging.verbose(f'Sending "{message.type}" to {url}') + ## Additional methods ## - async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - ## Not expecting a response, so just return - if resp.status in {200, 202}: - return logging.verbose(f'Successfully sent "{message.type}" to {url}') + async def fetch_nodeinfo(self, domain): + nodeinfo_url = None + wk_nodeinfo = await self.get( + f'https://{domain}/.well-known/nodeinfo', + loads=WellKnownNodeinfo.new_from_json + ) - logging.verbose(f'Received error when pushing to {url}: {resp.status}') - return logging.verbose(await resp.read()) # change this to debug + if not wk_nodeinfo: + logging.verbose( + f'Failed to fetch well-known nodeinfo url for domain: {domain}') + return False - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {url}') + for 
version in ['20', '21']: + try: + nodeinfo_url = wk_nodeinfo.get_url(version) - ## prevent workers from being brought down - except Exception as e: - traceback.print_exc() + except KeyError: + pass + if not nodeinfo_url: + logging.verbose( + f'Failed to fetch nodeinfo url for domain: {domain}') + return False - ## Additional methods ## - async def fetch_nodeinfo(self, domain): - nodeinfo_url = None - wk_nodeinfo = await self.get( - f'https://{domain}/.well-known/nodeinfo', - loads = WellKnownNodeinfo.new_from_json - ) - - if not wk_nodeinfo: - logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}') - return False - - for version in ['20', '21']: - try: - nodeinfo_url = wk_nodeinfo.get_url(version) - - except KeyError: - pass - - if not nodeinfo_url: - logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}') - return False - - return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False + return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False From 841d3e0ad9073f83750ba19519e6852665b8e11f Mon Sep 17 00:00:00 2001 From: Valentin Date: Sat, 24 Dec 2022 21:31:06 +0100 Subject: [PATCH 3/6] Revert converting tabs to whitespaces --- relay/http_client.py | 286 +++++++++++++++++++++---------------------- 1 file changed, 143 insertions(+), 143 deletions(-) diff --git a/relay/http_client.py b/relay/http_client.py index 87f8493..ef1242d 100644 --- a/relay/http_client.py +++ b/relay/http_client.py @@ -11,144 +11,144 @@ from urllib.parse import urlparse from . import __version__ from .misc import ( - MIMETYPES, - DotDict, - Message + MIMETYPES, + DotDict, + Message ) HEADERS = { - 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', - 'User-Agent': f'ActivityRelay/{__version__}' + 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', + 'User-Agent': f'ActivityRelay/{__version__}' } class Cache(LRUCache): - def set_maxsize(self, value): - self.__maxsize = int(value) + def set_maxsize(self, value): + self.__maxsize = int(value) class HttpClient: - def __init__(self, database, limit=100, timeout=10, cache_size=1024): - self.database = database - self.cache = Cache(cache_size) - self.cfg = {'limit': limit, 'timeout': timeout} - self._conn = None - self._session = None + def __init__(self, database, limit=100, timeout=10, cache_size=1024): + self.database = database + self.cache = Cache(cache_size) + self.cfg = {'limit': limit, 'timeout': timeout} + self._conn = None + self._session = None - @property - def limit(self): - return self.cfg['limit'] + @property + def limit(self): + return self.cfg['limit'] - @property - def timeout(self): - return self.cfg['timeout'] + @property + def timeout(self): + return self.cfg['timeout'] - async def open(self): - if self._session: - return + async def open(self): + if self._session: + return - self._conn = TCPConnector( - limit=self.limit, - ttl_dns_cache=300, - ) + self._conn = TCPConnector( + limit=self.limit, + ttl_dns_cache=300, + ) - self._session = ClientSession( - connector=self._conn, - headers=HEADERS, - connector_owner=True, - timeout=ClientTimeout(total=self.timeout) - ) + self._session = ClientSession( + connector=self._conn, + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout) + ) - async def close(self): - if not self._session: - return + async def close(self): + if not self._session: + return - await self._session.close() - await self._conn.close() + await self._session.close() + await self._conn.close() - self._conn = None - 
self._session = None + self._conn = None + self._session = None - async def get(self, url, sign_headers=False, loads=None, force=False): - await self.open() + async def get(self, url, sign_headers=False, loads=None, force=False): + await self.open() - try: - url, _ = url.split('#', 1) - except: - pass + try: + url, _ = url.split('#', 1) + except: + pass - if not force and url in self.cache: - return self.cache[url] + if not force and url in self.cache: + return self.cache[url] - headers = {} + headers = {} - if sign_headers: - headers.update(self.database.signer.sign_headers( - 'GET', url, algorithm='original')) + if sign_headers: + headers.update(self.database.signer.sign_headers( + 'GET', url, algorithm='original')) - try: - logging.verbose(f'Fetching resource: {url}') + try: + logging.verbose(f'Fetching resource: {url}') - async with self._session.get(url, headers=headers) as resp: - # Not expecting a response with 202s, so just return - if resp.status == 202: - return + async with self._session.get(url, headers=headers) as resp: + # Not expecting a response with 202s, so just return + if resp.status == 202: + return - elif resp.status != 200: - logging.verbose( - f'Received error when requesting {url}: {resp.status}') - logging.verbose(await resp.read()) # change this to debug - return + elif resp.status != 200: + logging.verbose( + f'Received error when requesting {url}: {resp.status}') + logging.verbose(await resp.read()) # change this to debug + return - if loads: - message = await resp.json(loads=loads) + if loads: + message = await resp.json(loads=loads) - elif resp.content_type == MIMETYPES['activity']: - message = await resp.json(loads=Message.new_from_json) + elif resp.content_type == MIMETYPES['activity']: + message = await resp.json(loads=Message.new_from_json) - elif resp.content_type == MIMETYPES['json']: - message = await resp.json(loads=DotDict.new_from_json) + elif resp.content_type == MIMETYPES['json']: + message = await resp.json(loads=DotDict.new_from_json) - else: - # todo: raise TypeError or something - logging.verbose( - f'Invalid Content-Type for "{url}": {resp.content_type}') - return logging.debug(f'Response: {resp.read()}') + else: + # todo: raise TypeError or something + logging.verbose( + f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp.read()}') - logging.debug(f'{url} >> resp {message.to_json(4)}') + logging.debug(f'{url} >> resp {message.to_json(4)}') - self.cache[url] = message - return message + self.cache[url] = message + return message - except JSONDecodeError: - logging.verbose(f'Failed to parse JSON') + except JSONDecodeError: + logging.verbose(f'Failed to parse JSON') - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {urlparse(url).netloc}') + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {urlparse(url).netloc}') - except Exception as e: - traceback.print_exc() - raise e + except Exception as e: + traceback.print_exc() + raise e - async def post(self, url, message): - instance = self.database.get_inbox(url) + async def post(self, url, message): + instance = self.database.get_inbox(url) - # Using the old algo by default is probably a better idea right now - if instance and instance.get('software') in {'mastodon'}: - algorithm = 'hs2019' + # Using the old algo by default is probably a better idea right now + if instance and instance.get('software') in {'mastodon'}: + algorithm = 'hs2019' - else: - algorithm = 
'original' + else: + algorithm = 'original' - headers = {'Content-Type': 'application/activity+json'} - headers.update(self.database.signer.sign_headers( - 'POST', url, message, algorithm=algorithm)) + headers = {'Content-Type': 'application/activity+json'} + headers.update(self.database.signer.sign_headers( + 'POST', url, message, algorithm=algorithm)) - try: - logging.verbose(f'Sending "{message.type}" to {url}') - logging.verbose( - f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') + try: + logging.verbose(f'Sending "{message.type}" to {url}') + logging.verbose( + f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') # The following does not work and throws exception on 'relay inbox follow': # Traceback (most recent call last): @@ -166,62 +166,62 @@ class HttpClient: # raise RuntimeError('Event loop is closed') # RuntimeError: Event loop is closed # ↓↓↓↓ - # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - # ## Not expecting a response, so just return - # if resp.status in {200, 202}: - # return logging.info(f'Successfully sent "{message.type}" to {url}') + # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: + # ## Not expecting a response, so just return + # if resp.status in {200, 202}: + # return logging.info(f'Successfully sent "{message.type}" to {url}') - # logging.info(f'Received error when pushing to {url}: {resp.status}') - # return logging.info(await resp.read()) # change this to debug + # logging.info(f'Received error when pushing to {url}: {resp.status}') + # return logging.info(await resp.read()) # change this to debug # Creating a session here works for some reason and does not throw an error - async with ClientSession( - connector=TCPConnector( - limit=self.limit, ttl_dns_cache=300), - headers=HEADERS, - connector_owner=True, - timeout=ClientTimeout(total=self.timeout)) as session: - async with session.post(url, headers=headers, data=message.to_json()) as resp: - # Not expecting a response, so just return - if resp.status in {200, 202}: - return logging.info(f'Successfully sent "{message.type}" to {url}') + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: + async with session.post(url, headers=headers, data=message.to_json()) as resp: + # Not expecting a response, so just return + if resp.status in {200, 202}: + return logging.info(f'Successfully sent "{message.type}" to {url}') - logging.info( - f'Received error when pushing to {url}: {resp.status}') - # change this to debug - return logging.info(await resp.read()) + logging.info( + f'Received error when pushing to {url}: {resp.status}') + # change this to debug + return logging.info(await resp.read()) - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {url}') + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {url}') - # prevent workers from being brought down - except Exception as e: - traceback.print_exc() + # prevent workers from being brought down + except Exception as e: + traceback.print_exc() - ## Additional methods ## + ## Additional methods ## - async def fetch_nodeinfo(self, domain): - nodeinfo_url = None - wk_nodeinfo = await self.get( - f'https://{domain}/.well-known/nodeinfo', - loads=WellKnownNodeinfo.new_from_json - ) + async def fetch_nodeinfo(self, domain): + nodeinfo_url = None + 
wk_nodeinfo = await self.get( + f'https://{domain}/.well-known/nodeinfo', + loads=WellKnownNodeinfo.new_from_json + ) - if not wk_nodeinfo: - logging.verbose( - f'Failed to fetch well-known nodeinfo url for domain: {domain}') - return False + if not wk_nodeinfo: + logging.verbose( + f'Failed to fetch well-known nodeinfo url for domain: {domain}') + return False - for version in ['20', '21']: - try: - nodeinfo_url = wk_nodeinfo.get_url(version) + for version in ['20', '21']: + try: + nodeinfo_url = wk_nodeinfo.get_url(version) - except KeyError: - pass + except KeyError: + pass - if not nodeinfo_url: - logging.verbose( - f'Failed to fetch nodeinfo url for domain: {domain}') - return False + if not nodeinfo_url: + logging.verbose( + f'Failed to fetch nodeinfo url for domain: {domain}') + return False - return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False + return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False From 3d60ae2bbc8fe9c28cac0cfe28495b9b8c20e237 Mon Sep 17 00:00:00 2001 From: Valentin Date: Sat, 24 Dec 2022 21:40:37 +0100 Subject: [PATCH 4/6] Remove hiddenimports --- relay.spec | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/relay.spec b/relay.spec index c21a829..57fedc7 100644 --- a/relay.spec +++ b/relay.spec @@ -9,13 +9,7 @@ a = Analysis( pathex=[], binaries=[], datas=[], - hiddenimports=[ - 'aputils.enums', - 'aputils.errors', - 'aputils.misc', - 'aputils.objects', - 'aputils.signer' - ], + hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], From 6a0c1fe7267a1265e8d332ae5d5ef35448c2053a Mon Sep 17 00:00:00 2001 From: Valentin Date: Sat, 24 Dec 2022 22:23:32 +0100 Subject: [PATCH 5/6] Add software check on follow/unfollow --- relay/http_client.py | 129 ++++++++++++++++++------------------------- relay/manage.py | 12 +++- 2 files changed, 65 insertions(+), 76 deletions(-) diff --git a/relay/http_client.py b/relay/http_client.py index ef1242d..79ac8b9 100644 --- a/relay/http_client.py +++ b/relay/http_client.py @@ -87,55 +87,60 @@ class HttpClient: headers.update(self.database.signer.sign_headers( 'GET', url, algorithm='original')) - try: - logging.verbose(f'Fetching resource: {url}') + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: + try: + logging.verbose(f'Fetching resource: {url}') + async with session.get(url, headers=headers) as resp: + # Not expecting a response with 202s, so just return + if resp.status == 202: + return - async with self._session.get(url, headers=headers) as resp: - # Not expecting a response with 202s, so just return - if resp.status == 202: - return + elif resp.status != 200: + logging.verbose( + f'Received error when requesting {url}: {resp.status}') + logging.verbose(await resp.read()) # change this to debug + return - elif resp.status != 200: - logging.verbose( - f'Received error when requesting {url}: {resp.status}') - logging.verbose(await resp.read()) # change this to debug - return + if loads: + message = await resp.json(loads=loads) - if loads: - message = await resp.json(loads=loads) + elif resp.content_type == MIMETYPES['activity']: + message = await resp.json(loads=Message.new_from_json) - elif resp.content_type == MIMETYPES['activity']: - message = await resp.json(loads=Message.new_from_json) + elif resp.content_type == MIMETYPES['json']: + message = await resp.json(loads=DotDict.new_from_json) - elif resp.content_type 
== MIMETYPES['json']: - message = await resp.json(loads=DotDict.new_from_json) + else: + # todo: raise TypeError or something + logging.verbose( + f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp.read()}') - else: - # todo: raise TypeError or something - logging.verbose( - f'Invalid Content-Type for "{url}": {resp.content_type}') - return logging.debug(f'Response: {resp.read()}') + logging.debug(f'{url} >> resp {message.to_json(4)}') - logging.debug(f'{url} >> resp {message.to_json(4)}') + self.cache[url] = message + return message - self.cache[url] = message - return message + except JSONDecodeError: + logging.verbose(f'Failed to parse JSON') - except JSONDecodeError: - logging.verbose(f'Failed to parse JSON') + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {urlparse(url).netloc}') - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {urlparse(url).netloc}') + except Exception as e: + traceback.print_exc() + raise e - except Exception as e: - traceback.print_exc() - raise e - - async def post(self, url, message): + async def post(self, url, message, software=None): instance = self.database.get_inbox(url) # Using the old algo by default is probably a better idea right now - if instance and instance.get('software') in {'mastodon'}: + if (instance and instance.get('software') in {'mastodon'}) or (software and software in {'mastodon'}): algorithm = 'hs2019' else: @@ -145,42 +150,18 @@ class HttpClient: headers.update(self.database.signer.sign_headers( 'POST', url, message, algorithm=algorithm)) - try: - logging.verbose(f'Sending "{message.type}" to {url}') - logging.verbose( - f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') + async with ClientSession( + connector=TCPConnector( + limit=self.limit, ttl_dns_cache=300), + headers=HEADERS, + connector_owner=True, + timeout=ClientTimeout(total=self.timeout)) as session: - # The following does not work and throws exception on 'relay inbox follow': - # Traceback (most recent call last): - # File "/home/vriess/Dokumente/Repos/relay-1/relay/http_client.py", line 153, in post - # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 1141, in __aenter__ - # self._resp = await self._coro - # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 448, in _request - # handle = tm.start() - # File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/helpers.py", line 651, in start - # return self._loop.call_at(when, self.__call__) - # File "/usr/lib/python3.10/asyncio/base_events.py", line 732, in call_at - # self._check_closed() - # File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed - # raise RuntimeError('Event loop is closed') - # RuntimeError: Event loop is closed - # ↓↓↓↓ - # async with self._session.post(url, headers=headers, data=message.to_json()) as resp: - # ## Not expecting a response, so just return - # if resp.status in {200, 202}: - # return logging.info(f'Successfully sent "{message.type}" to {url}') + try: + logging.verbose(f'Sending "{message.type}" to {url}') + logging.verbose( + f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}') - # logging.info(f'Received error when pushing to {url}: {resp.status}') - # return logging.info(await resp.read()) # 
change this to debug - - # Creating a session here works for some reason and does not throw an error - async with ClientSession( - connector=TCPConnector( - limit=self.limit, ttl_dns_cache=300), - headers=HEADERS, - connector_owner=True, - timeout=ClientTimeout(total=self.timeout)) as session: async with session.post(url, headers=headers, data=message.to_json()) as resp: # Not expecting a response, so just return if resp.status in {200, 202}: @@ -191,12 +172,12 @@ class HttpClient: # change this to debug return logging.info(await resp.read()) - except (ClientConnectorError, ServerTimeoutError): - logging.verbose(f'Failed to connect to {url}') + except (ClientConnectorError, ServerTimeoutError): + logging.verbose(f'Failed to connect to {url}') - # prevent workers from being brought down - except Exception as e: - traceback.print_exc() + # prevent workers from being brought down + except Exception as e: + traceback.print_exc() ## Additional methods ## diff --git a/relay/manage.py b/relay/manage.py index 0d7decc..5eb0a77 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -160,7 +160,11 @@ def cli_inbox_follow(actor): actor = actor ) - asyncio.run(app.client.post(inbox, message)) + # Fetch software to decide on algorithm + nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain)) + software = nodeinfo.sw_name if nodeinfo else None + + asyncio.run(app.client.post(inbox, message, software)) click.echo(f'Sent follow message to actor: {actor}') @@ -198,7 +202,11 @@ def cli_inbox_unfollow(actor): } ) - asyncio.run(app.client.post(inbox, message)) + # Fetch software to decide on algorithm + nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain)) + software = nodeinfo.sw_name if nodeinfo else None + + asyncio.run(app.client.post(inbox, message, software)) click.echo(f'Sent unfollow message to: {actor}') From acd38ce9f72b1f9a14da02bb1e57edd1c0da3ae7 Mon Sep 17 00:00:00 2001 From: Valentin Date: Sat, 24 Dec 2022 22:41:32 +0100 Subject: [PATCH 6/6] Fix algorithm reference --- relay/http_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relay/http_client.py b/relay/http_client.py index 79ac8b9..57c0562 100644 --- a/relay/http_client.py +++ b/relay/http_client.py @@ -3,7 +3,7 @@ import traceback from aiohttp import ClientSession, ClientTimeout, TCPConnector from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError -from aputils import Nodeinfo, WellKnownNodeinfo +from aputils import Nodeinfo, WellKnownNodeinfo, AlgorithmType from datetime import datetime from cachetools import LRUCache from json.decoder import JSONDecodeError @@ -141,10 +141,10 @@ class HttpClient: # Using the old algo by default is probably a better idea right now if (instance and instance.get('software') in {'mastodon'}) or (software and software in {'mastodon'}): - algorithm = 'hs2019' + algorithm = AlgorithmType.HS2019 else: - algorithm = 'original' + algorithm = AlgorithmType.ORIGINAL headers = {'Content-Type': 'application/activity+json'} headers.update(self.database.signer.sign_headers(
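
A note on the underlying bug, since it threads through patches 2 through 5: every 'relay' CLI command drives HttpClient from its own asyncio.run() call, i.e. a fresh event loop each time, while HttpClient cached a single aiohttp ClientSession across calls. A ClientSession is bound to the loop it was created in, so once the first loop closes, reusing the cached session fails with the "RuntimeError: Event loop is closed" captured in the traceback above. Opening a session per request, as the patches do, sidesteps this. Below is a minimal, self-contained sketch of the failure and the workaround -- it is not code from the patches, assumes only asyncio and aiohttp, and the URL and function names are illustrative:

import asyncio
from aiohttp import ClientSession

_session = None  # one session reused across calls, like the old HttpClient._session

async def fetch_shared(url):
    # Broken pattern: the lazily created session (and its connector/timers)
    # stays bound to whichever event loop happened to be running first.
    global _session
    if _session is None:
        _session = ClientSession()
    async with _session.get(url) as resp:
        return resp.status

async def fetch_per_call(url):
    # Workaround used in the patches: create the session inside the
    # currently running loop and close it before that loop shuts down.
    async with ClientSession() as session:
        async with session.get(url) as resp:
            return resp.status

# Each CLI command is a separate asyncio.run(), i.e. a separate event loop:
asyncio.run(fetch_per_call('https://example.com'))  # works
asyncio.run(fetch_per_call('https://example.com'))  # works again
asyncio.run(fetch_shared('https://example.com'))    # works; session binds to this loop
asyncio.run(fetch_shared('https://example.com'))    # RuntimeError: Event loop is closed

The per-request session gives up connection reuse (a new TCPConnector and pool on every call), which is a reasonable trade for one-shot commands like 'relay inbox follow'; a long-lived server process would instead create the session once inside its running loop. The software check in patches 5 and 6 is independent of this: it fetches nodeinfo up front so the signature algorithm (hs2019 for Mastodon, the original draft otherwise) can be chosen before posting.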