Merge branch 'master' into 'dev'

Draft: Fix 'relay inbox follow/unfollow'

See merge request pleroma/relay!49
Valentin R 2022-12-26 07:09:01 +00:00
commit e3bfbbf19e
3 changed files with 103 additions and 73 deletions

.gitignore

@@ -99,3 +99,9 @@ viera.jsonld

 # config file
 relay.yaml
+
+# JSON-LD file
+relay.jsonld
+
+# VS Code Launch Configuration
+launch.json

relay/http_client.py
@@ -3,7 +3,7 @@ import traceback
 from aiohttp import ClientSession, ClientTimeout, TCPConnector
 from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError
-from aputils import Nodeinfo, WellKnownNodeinfo
+from aputils import Nodeinfo, WellKnownNodeinfo, AlgorithmType
 from datetime import datetime
 from cachetools import LRUCache
 from json.decoder import JSONDecodeError
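
The new AlgorithmType import replaces the bare string names ('original', 'hs2019') used further down in post(). For readers without aputils handy, the enum can be pictured roughly like this stand-in (member values are an assumption inferred from the old string literals; the real definition lives in the aputils package):

    from enum import Enum

    class AlgorithmType(Enum):
        # Assumed members mirroring the names used in this diff;
        # aputils is the authoritative source.
        ORIGINAL = 'original'
        HS2019 = 'hs2019'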
@@ -36,34 +36,30 @@ class HttpClient:
         self._conn = None
         self._session = None

     @property
     def limit(self):
         return self.cfg['limit']

     @property
     def timeout(self):
         return self.cfg['timeout']

     async def open(self):
         if self._session:
             return

         self._conn = TCPConnector(
-            limit = self.limit,
-            ttl_dns_cache = 300,
+            limit=self.limit,
+            ttl_dns_cache=300,
         )

         self._session = ClientSession(
-            connector = self._conn,
-            headers = HEADERS,
-            connector_owner = True,
-            timeout = ClientTimeout(total=self.timeout)
+            connector=self._conn,
+            headers=HEADERS,
+            connector_owner=True,
+            timeout=ClientTimeout(total=self.timeout)
         )

     async def close(self):
         if not self._session:
             return
@@ -74,12 +70,13 @@ class HttpClient:
         self._conn = None
         self._session = None

     async def get(self, url, sign_headers=False, loads=None, force=False):
         await self.open()

-        try: url, _ = url.split('#', 1)
-        except: pass
+        try:
+            url, _ = url.split('#', 1)
+        except:
+            pass

         if not force and url in self.cache:
             return self.cache[url]
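
For context, the fragment split above means two URLs that differ only in their fragment (common for ActivityPub actor IDs such as .../actor#main-key) normalize to a single cache key; fragments are never sent to the server anyway. A quick illustration with a placeholder URL:

    url = 'https://example.com/actor#main-key'   # hypothetical actor id
    url, _ = url.split('#', 1)                   # -> 'https://example.com/actor'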
@@ -87,95 +84,113 @@ class HttpClient:
         headers = {}

         if sign_headers:
-            headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
+            headers.update(self.database.signer.sign_headers(
+                'GET', url, algorithm='original'))

-        try:
-            logging.verbose(f'Fetching resource: {url}')
-
-            async with self._session.get(url, headers=headers) as resp:
-                ## Not expecting a response with 202s, so just return
-                if resp.status == 202:
-                    return
-
-                elif resp.status != 200:
-                    logging.verbose(f'Received error when requesting {url}: {resp.status}')
-                    logging.verbose(await resp.read()) # change this to debug
-                    return
-
-                if loads:
-                    message = await resp.json(loads=loads)
-
-                elif resp.content_type == MIMETYPES['activity']:
-                    message = await resp.json(loads=Message.new_from_json)
-
-                elif resp.content_type == MIMETYPES['json']:
-                    message = await resp.json(loads=DotDict.new_from_json)
-
-                else:
-                    ## todo: raise TypeError or something
-                    logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
-                    return logging.debug(f'Response: {resp.read()}')
-
-            logging.debug(f'{url} >> resp {message.to_json(4)}')
-
-            self.cache[url] = message
-            return message
-
-        except JSONDecodeError:
-            logging.verbose(f'Failed to parse JSON')
-
-        except (ClientConnectorError, ServerTimeoutError):
-            logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
-
-        except Exception as e:
-            traceback.print_exc()
-            raise e
+        async with ClientSession(
+                connector=TCPConnector(
+                    limit=self.limit, ttl_dns_cache=300),
+                headers=HEADERS,
+                connector_owner=True,
+                timeout=ClientTimeout(total=self.timeout)) as session:
+            try:
+                logging.verbose(f'Fetching resource: {url}')
+                async with session.get(url, headers=headers) as resp:
+                    # Not expecting a response with 202s, so just return
+                    if resp.status == 202:
+                        return
+
+                    elif resp.status != 200:
+                        logging.verbose(
+                            f'Received error when requesting {url}: {resp.status}')
+                        logging.verbose(await resp.read())  # change this to debug
+                        return
+
+                    if loads:
+                        message = await resp.json(loads=loads)
+
+                    elif resp.content_type == MIMETYPES['activity']:
+                        message = await resp.json(loads=Message.new_from_json)
+
+                    elif resp.content_type == MIMETYPES['json']:
+                        message = await resp.json(loads=DotDict.new_from_json)
+
+                    else:
+                        # todo: raise TypeError or something
+                        logging.verbose(
+                            f'Invalid Content-Type for "{url}": {resp.content_type}')
+                        return logging.debug(f'Response: {resp.read()}')
+
+                logging.debug(f'{url} >> resp {message.to_json(4)}')
+
+                self.cache[url] = message
+                return message
+
+            except JSONDecodeError:
+                logging.verbose(f'Failed to parse JSON')
+
+            except (ClientConnectorError, ServerTimeoutError):
+                logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
+
+            except Exception as e:
+                traceback.print_exc()
+                raise e

-    async def post(self, url, message):
-        await self.open()
-
+    async def post(self, url, message, software=None):
         instance = self.database.get_inbox(url)

-        ## Using the old algo by default is probably a better idea right now
-        if instance and instance.get('software') in {'mastodon'}:
-            algorithm = 'hs2019'
+        # Using the old algo by default is probably a better idea right now
+        if (instance and instance.get('software') in {'mastodon'}) or (software and software in {'mastodon'}):
+            algorithm = AlgorithmType.HS2019

         else:
-            algorithm = 'original'
+            algorithm = AlgorithmType.ORIGINAL

         headers = {'Content-Type': 'application/activity+json'}
-        headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm))
+        headers.update(self.database.signer.sign_headers(
+            'POST', url, message, algorithm=algorithm))

-        try:
-            logging.verbose(f'Sending "{message.type}" to {url}')
-
-            async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
-                ## Not expecting a response, so just return
-                if resp.status in {200, 202}:
-                    return logging.verbose(f'Successfully sent "{message.type}" to {url}')
-
-                logging.verbose(f'Received error when pushing to {url}: {resp.status}')
-                return logging.verbose(await resp.read()) # change this to debug
-
-        except (ClientConnectorError, ServerTimeoutError):
-            logging.verbose(f'Failed to connect to {url}')
-
-        ## prevent workers from being brought down
-        except Exception as e:
-            traceback.print_exc()
+        async with ClientSession(
+                connector=TCPConnector(
+                    limit=self.limit, ttl_dns_cache=300),
+                headers=HEADERS,
+                connector_owner=True,
+                timeout=ClientTimeout(total=self.timeout)) as session:
+            try:
+                logging.verbose(f'Sending "{message.type}" to {url}')
+                logging.verbose(
+                    f'url: {url}\nheaders: {headers}\ndata: {message.to_json()}')
+                async with session.post(url, headers=headers, data=message.to_json()) as resp:
+                    # Not expecting a response, so just return
+                    if resp.status in {200, 202}:
+                        return logging.info(f'Successfully sent "{message.type}" to {url}')
+
+                    logging.info(
+                        f'Received error when pushing to {url}: {resp.status}')
+                    # change this to debug
+                    return logging.info(await resp.read())
+
+            except (ClientConnectorError, ServerTimeoutError):
+                logging.verbose(f'Failed to connect to {url}')
+
+            # prevent workers from being brought down
+            except Exception as e:
+                traceback.print_exc()

     ## Additional methods ##
     async def fetch_nodeinfo(self, domain):
         nodeinfo_url = None

         wk_nodeinfo = await self.get(
             f'https://{domain}/.well-known/nodeinfo',
-            loads = WellKnownNodeinfo.new_from_json
+            loads=WellKnownNodeinfo.new_from_json
         )

         if not wk_nodeinfo:
-            logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
+            logging.verbose(
+                f'Failed to fetch well-known nodeinfo url for domain: {domain}')
             return False

         for version in ['20', '21']:
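
The new post() signature threads a software hint from callers that have just fetched nodeinfo, so hosts without a cached inbox row still get the right signature algorithm. The selection rule above boils down to something like this sketch (pick_algorithm is a hypothetical helper, not part of the patch):

    from aputils import AlgorithmType

    def pick_algorithm(instance, software):
        # Prefer the cached inbox row, then the caller-supplied hint;
        # default to the old algorithm for everything else.
        if (instance and instance.get('software') in {'mastodon'}) \
                or (software and software in {'mastodon'}):
            return AlgorithmType.HS2019
        return AlgorithmType.ORIGINAL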
@@ -186,7 +201,8 @@ class HttpClient:
             pass

         if not nodeinfo_url:
-            logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
+            logging.verbose(
+                f'Failed to fetch nodeinfo url for domain: {domain}')
             return False

         return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
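
Why the throwaway ClientSession inside get() and post(): an aiohttp session is tied to the event loop it was created on, and the CLI below drives each step with its own asyncio.run(), which creates and closes a fresh loop every time. Reusing self._session across those calls is the likely breakage behind follow/unfollow (my reading of the draft; the MR description does not spell it out). A minimal repro of the failure mode, with a placeholder URL:

    import asyncio
    from aiohttp import ClientSession

    session = None

    async def make():
        global session
        session = ClientSession()  # bound to the loop asyncio.run() is about to close

    async def use():
        # Typically fails with RuntimeError('Event loop is closed') because the
        # session's connector still references the first, now-closed loop.
        async with session.get('https://example.com') as resp:
            print(resp.status)

    asyncio.run(make())   # first loop: create the session
    asyncio.run(use())    # second loop: reuse fails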

relay/manage.py
@@ -160,7 +160,11 @@ def cli_inbox_follow(actor):
         actor = actor
     )

-    asyncio.run(app.client.post(inbox, message))
+    # Fetch software to decide on algorithm
+    nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain))
+    software = nodeinfo.sw_name if nodeinfo else None
+    asyncio.run(app.client.post(inbox, message, software))

     click.echo(f'Sent follow message to actor: {actor}')
@@ -198,7 +202,11 @@ def cli_inbox_unfollow(actor):
         }
     )

-    asyncio.run(app.client.post(inbox, message))
+    # Fetch software to decide on algorithm
+    nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain))
+    software = nodeinfo.sw_name if nodeinfo else None
+    asyncio.run(app.client.post(inbox, message, software))

     click.echo(f'Sent unfollow message to: {actor}')
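
Taken together, the two CLI commands now resolve the remote instance's software before posting. Condensed, the new flow is (app, domain, inbox, and message come from the surrounding manage.py code; this is an excerpt mirroring the diff, not a standalone script):

    import asyncio

    # 1. Ask the instance what it runs; fetch_nodeinfo() returns False on failure.
    nodeinfo = asyncio.run(app.client.fetch_nodeinfo(domain))

    # 2. aputils' Nodeinfo exposes the software name as sw_name (e.g. 'mastodon').
    software = nodeinfo.sw_name if nodeinfo else None

    # 3. post() maps 'mastodon' to AlgorithmType.HS2019, everything else to ORIGINAL.
    asyncio.run(app.client.post(inbox, message, software))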