Fix error on 'relay inbox follow'

This commit is contained in:
Valentin 2022-12-24 18:38:02 +01:00
parent af7fcc66fd
commit 9842494fc6
No known key found for this signature in database
GPG key ID: AA2F52BADD7EF624
2 changed files with 166 additions and 125 deletions

6
.gitignore vendored
View file

@ -99,3 +99,9 @@ viera.jsonld
# config file # config file
relay.yaml relay.yaml
# JSON-LD file
relay.jsonld
# VS Code Launch Configuration
launch.json

View file

@ -11,182 +11,217 @@ from urllib.parse import urlparse
from . import __version__ from . import __version__
from .misc import ( from .misc import (
MIMETYPES, MIMETYPES,
DotDict, DotDict,
Message Message
) )
HEADERS = { HEADERS = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': f'ActivityRelay/{__version__}' 'User-Agent': f'ActivityRelay/{__version__}'
} }
class Cache(LRUCache): class Cache(LRUCache):
def set_maxsize(self, value): def set_maxsize(self, value):
self.__maxsize = int(value) self.__maxsize = int(value)
class HttpClient: class HttpClient:
def __init__(self, database, limit=100, timeout=10, cache_size=1024): def __init__(self, database, limit=100, timeout=10, cache_size=1024):
self.database = database self.database = database
self.cache = Cache(cache_size) self.cache = Cache(cache_size)
self.cfg = {'limit': limit, 'timeout': timeout} self.cfg = {'limit': limit, 'timeout': timeout}
self._conn = None self._conn = None
self._session = None self._session = None
@property
def limit(self):
return self.cfg['limit']
@property @property
def limit(self): def timeout(self):
return self.cfg['limit'] return self.cfg['timeout']
async def open(self):
if self._session:
return
@property self._conn = TCPConnector(
def timeout(self): limit=self.limit,
return self.cfg['timeout'] ttl_dns_cache=300,
)
self._session = ClientSession(
connector=self._conn,
headers=HEADERS,
connector_owner=True,
timeout=ClientTimeout(total=self.timeout)
)
async def open(self): async def close(self):
if self._session: if not self._session:
return return
self._conn = TCPConnector( await self._session.close()
limit = self.limit, await self._conn.close()
ttl_dns_cache = 300,
)
self._session = ClientSession( self._conn = None
connector = self._conn, self._session = None
headers = HEADERS,
connector_owner = True,
timeout = ClientTimeout(total=self.timeout)
)
async def get(self, url, sign_headers=False, loads=None, force=False):
await self.open()
async def close(self): try:
if not self._session: url, _ = url.split('#', 1)
return except:
pass
await self._session.close() if not force and url in self.cache:
await self._conn.close() return self.cache[url]
self._conn = None headers = {}
self._session = None
if sign_headers:
headers.update(self.database.signer.sign_headers(
'GET', url, algorithm='original'))
async def get(self, url, sign_headers=False, loads=None, force=False): try:
await self.open() logging.verbose(f'Fetching resource: {url}')
try: url, _ = url.split('#', 1) async with self._session.get(url, headers=headers) as resp:
except: pass # Not expecting a response with 202s, so just return
if resp.status == 202:
return
if not force and url in self.cache: elif resp.status != 200:
return self.cache[url] logging.verbose(
f'Received error when requesting {url}: {resp.status}')
logging.verbose(await resp.read()) # change this to debug
return
headers = {} if loads:
message = await resp.json(loads=loads)
if sign_headers: elif resp.content_type == MIMETYPES['activity']:
headers.update(self.database.signer.sign_headers('GET', url, algorithm='original')) message = await resp.json(loads=Message.new_from_json)
try: elif resp.content_type == MIMETYPES['json']:
logging.verbose(f'Fetching resource: {url}') message = await resp.json(loads=DotDict.new_from_json)
async with self._session.get(url, headers=headers) as resp: else:
## Not expecting a response with 202s, so just return # todo: raise TypeError or something
if resp.status == 202: logging.verbose(
return f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp.read()}')
elif resp.status != 200: logging.debug(f'{url} >> resp {message.to_json(4)}')
logging.verbose(f'Received error when requesting {url}: {resp.status}')
logging.verbose(await resp.read()) # change this to debug
return
if loads: self.cache[url] = message
message = await resp.json(loads=loads) return message
elif resp.content_type == MIMETYPES['activity']: except JSONDecodeError:
message = await resp.json(loads=Message.new_from_json) logging.verbose(f'Failed to parse JSON')
elif resp.content_type == MIMETYPES['json']: except (ClientConnectorError, ServerTimeoutError):
message = await resp.json(loads=DotDict.new_from_json) logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
else: except Exception as e:
# todo: raise TypeError or something traceback.print_exc()
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') raise e
return logging.debug(f'Response: {resp.read()}')
logging.debug(f'{url} >> resp {message.to_json(4)}') async def post(self, url, message):
instance = self.database.get_inbox(url)
self.cache[url] = message # Using the old algo by default is probably a better idea right now
return message if instance and instance.get('software') in {'mastodon'}:
algorithm = 'hs2019'
except JSONDecodeError: else:
logging.verbose(f'Failed to parse JSON') algorithm = 'original'
except (ClientConnectorError, ServerTimeoutError): headers = {'Content-Type': 'application/activity+json'}
logging.verbose(f'Failed to connect to {urlparse(url).netloc}') headers.update(self.database.signer.sign_headers(
'POST', url, message, algorithm=algorithm))
except Exception as e: try:
traceback.print_exc() logging.verbose(f'Sending "{message.type}" to {url}')
raise e logging.verbose(
f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}')
# The following does not work and throws exception on 'relay inbox follow':
# Traceback (most recent call last):
# File "/home/vriess/Dokumente/Repos/relay-1/relay/http_client.py", line 153, in post
# async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
# File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 1141, in __aenter__
# self._resp = await self._coro
# File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 448, in _request
# handle = tm.start()
# File "/home/vriess/Dokumente/Repos/relay-1/.venv/lib/python3.10/site-packages/aiohttp/helpers.py", line 651, in start
# return self._loop.call_at(when, self.__call__)
# File "/usr/lib/python3.10/asyncio/base_events.py", line 732, in call_at
# self._check_closed()
# File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
# raise RuntimeError('Event loop is closed')
# RuntimeError: Event loop is closed
# ↓↓↓↓
# async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
# ## Not expecting a response, so just return
# if resp.status in {200, 202}:
# return logging.info(f'Successfully sent "{message.type}" to {url}')
async def post(self, url, message): # logging.info(f'Received error when pushing to {url}: {resp.status}')
await self.open() # return logging.info(await resp.read()) # change this to debug
instance = self.database.get_inbox(url) # Creating a session here works for some reason and does not throw an error
async with ClientSession(
connector=TCPConnector(
limit=self.limit, ttl_dns_cache=300),
headers=HEADERS,
connector_owner=True,
timeout=ClientTimeout(total=self.timeout)) as session:
async with session.post(url, headers=headers, data=message.to_json()) as resp:
# Not expecting a response, so just return
if resp.status in {200, 202}:
return logging.info(f'Successfully sent "{message.type}" to {url}')
## Using the old algo by default is probably a better idea right now logging.info(
if instance and instance.get('software') in {'mastodon'}: f'Received error when pushing to {url}: {resp.status}')
algorithm = 'hs2019' # change this to debug
return logging.info(await resp.read())
else: except (ClientConnectorError, ServerTimeoutError):
algorithm = 'original' logging.verbose(f'Failed to connect to {url}')
headers = {'Content-Type': 'application/activity+json'} # prevent workers from being brought down
headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm)) except Exception as e:
traceback.print_exc()
try: ## Additional methods ##
logging.verbose(f'Sending "{message.type}" to {url}')
async with self._session.post(url, headers=headers, data=message.to_json()) as resp: async def fetch_nodeinfo(self, domain):
## Not expecting a response, so just return nodeinfo_url = None
if resp.status in {200, 202}: wk_nodeinfo = await self.get(
return logging.verbose(f'Successfully sent "{message.type}" to {url}') f'https://{domain}/.well-known/nodeinfo',
loads=WellKnownNodeinfo.new_from_json
)
logging.verbose(f'Received error when pushing to {url}: {resp.status}') if not wk_nodeinfo:
return logging.verbose(await resp.read()) # change this to debug logging.verbose(
f'Failed to fetch well-known nodeinfo url for domain: {domain}')
return False
except (ClientConnectorError, ServerTimeoutError): for version in ['20', '21']:
logging.verbose(f'Failed to connect to {url}') try:
nodeinfo_url = wk_nodeinfo.get_url(version)
## prevent workers from being brought down except KeyError:
except Exception as e: pass
traceback.print_exc()
if not nodeinfo_url:
logging.verbose(
f'Failed to fetch nodeinfo url for domain: {domain}')
return False
## Additional methods ## return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
async def fetch_nodeinfo(self, domain):
nodeinfo_url = None
wk_nodeinfo = await self.get(
f'https://{domain}/.well-known/nodeinfo',
loads = WellKnownNodeinfo.new_from_json
)
if not wk_nodeinfo:
logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
return False
for version in ['20', '21']:
try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url:
logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False