sedi-relay/relay/http_client.py

233 lines
5.7 KiB
Python
Raw Normal View History

import logging
import traceback
from aiohttp import ClientSession, ClientTimeout, TCPConnector
2022-12-26 07:02:57 +00:00
from aiohttp.client_exceptions import ClientConnectionError, ClientSSLError
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
from aputils import Nodeinfo, WellKnownNodeinfo
from datetime import datetime
from cachetools import LRUCache
from json.decoder import JSONDecodeError
2022-12-13 13:27:09 +00:00
from urllib.error import HTTPError
from urllib.parse import urlparse
2022-12-13 13:27:09 +00:00
from urllib.request import Request, urlopen
from . import __version__
from .misc import (
MIMETYPES,
2022-12-13 13:27:09 +00:00
AppBase,
DotDict,
2022-11-27 03:16:14 +00:00
Message
)
# Default headers for every aiohttp session created by HttpClient.open():
# ActivityStreams JSON is preferred, plain JSON is accepted at q=0.9.
HEADERS = {
    'Accept': '{}, {};q=0.9'.format(MIMETYPES['activity'], MIMETYPES['json']),
    'User-Agent': 'ActivityRelay/{}'.format(__version__),
}
class Cache(LRUCache):
    """An LRU cache whose maximum size can be changed after construction."""

    def set_maxsize(self, value):
        """Change the maximum size of the cache.

        If the cache shrinks below its current size, least-recently-used
        entries are evicted right away instead of on the next insertion.

        :param value: the new maximum size; anything accepted by ``int()``.
        """
        # cachetools keeps the limit in a private, name-mangled attribute of
        # its own ``Cache`` base class (``_Cache__maxsize``). Writing
        # ``self.__maxsize`` only reached it because this subclass happens to
        # share that class name, so spell the mangled name out explicitly.
        self._Cache__maxsize = int(value)

        # cachetools only evicts while inserting; trim any excess now. The
        # emptiness check avoids popitem() on an empty cache if a negative
        # size is ever passed.
        while self and self.currsize > self.maxsize:
            self.popitem()
2022-12-13 13:27:09 +00:00
class HttpClient(AppBase):
    """aiohttp-based client used to fetch and deliver ActivityPub documents.

    Successful GET responses are memoised in an LRU cache keyed by URL
    (with any fragment removed). Request signing goes through
    ``self.signer`` and database access through ``self.database``; both are
    presumably provided by ``AppBase`` (not visible here).
    """

    def __init__(self, limit=100, timeout=10, cache_size=1024):
        """Create a client. The aiohttp session itself is created lazily.

        :param limit: maximum number of simultaneous connections.
        :param timeout: total per-request timeout, in seconds.
        :param cache_size: maximum number of cached GET responses.
        """
        self.cache = Cache(cache_size)
        self.cfg = {'limit': limit, 'timeout': timeout}
        self._conn = None
        self._session = None

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, *_):
        await self.close()

    @property
    def limit(self):
        """Connection limit used when a new session is opened."""
        return self.cfg['limit']

    @property
    def timeout(self):
        """Total request timeout (seconds) used when a new session is opened."""
        return self.cfg['timeout']

    def setup(self):
        """Load the connection limit, timeout and cache size from the database.

        The limit and timeout only apply to sessions created by later calls
        to :meth:`open`; an already-open session keeps its old settings.
        """
        with self.database.session as s:
            config = s.get_config_all()

            # Configure this instance itself: ``self.client`` (from AppBase)
            # is not guaranteed to be the same object as ``self``.
            self.cfg['limit'] = config.push_limit
            self.cfg['timeout'] = config.http_timeout
            self.cache.set_maxsize(config.json_cache)

    async def open(self):
        """Create the connector and session unless a usable session exists."""
        # An aiohttp session is bound to the event loop it was created on, so
        # it is only reused while that loop is still running.
        if self._session and self._session._loop.is_running():
            return

        self._conn = TCPConnector(
            limit = self.limit,
            ttl_dns_cache = 300,
        )

        self._session = ClientSession(
            connector = self._conn,
            headers = HEADERS,
            connector_owner = True,
            timeout = ClientTimeout(total=self.timeout)
        )

    async def close(self):
        """Close the session and connector. This is a no-op when nothing is open."""
        if not self._session:
            return

        await self._session.close()
        await self._conn.close()

        self._conn = None
        self._session = None

    async def get(self, url, sign_headers=False, loads=None, force=False):
        """Fetch and parse a JSON document, consulting the cache first.

        :param url: resource URL (a ``str``); any ``#fragment`` is removed.
        :param sign_headers: add signature headers from ``self.signer``.
        :param loads: JSON parser passed to ``resp.json``. When omitted, the
            parser is chosen from the response Content-Type.
        :param force: skip the cache lookup and always fetch.
        :returns: the parsed document. Returns ``None`` on a 202, a non-200
            status, an unsupported Content-Type, or any request/parse error
            (errors are logged, not raised).
        """
        await self.open()

        # Drop the fragment so e.g. ``.../actor#main-key`` and ``.../actor``
        # share one cache entry.
        url = url.split('#', 1)[0]

        if not force and url in self.cache:
            return self.cache[url]

        headers = {}

        if sign_headers:
            headers.update(self.signer.sign_headers('GET', url, algorithm='original'))

        try:
            logging.verbose(f'Fetching resource: {url}')

            async with self._session.get(url, headers=headers) as resp:
                ## Not expecting a response with 202s, so just return
                if resp.status == 202:
                    return

                elif resp.status != 200:
                    logging.verbose(f'Received error when requesting {url}: {resp.status}')
                    logging.verbose(await resp.read()) # change this to debug
                    return

                if loads:
                    message = await resp.json(loads=loads)

                elif resp.content_type == MIMETYPES['activity']:
                    message = await resp.json(loads=Message.new_from_json)

                elif resp.content_type == MIMETYPES['json']:
                    message = await resp.json(loads=DotDict.new_from_json)

                else:
                    # todo: raise TypeError or something
                    logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
                    # read() is a coroutine and must be awaited; otherwise the
                    # coroutine object itself would be logged and never run.
                    logging.debug(f'Response: {await resp.read()}')
                    return

                logging.debug(f'{url} >> resp {message.to_json(4)}')

                self.cache[url] = message
                return message

        except JSONDecodeError:
            logging.verbose('Failed to parse JSON')

        except ClientSSLError:
            logging.verbose(f'SSL error when connecting to {urlparse(url).netloc}')

        except (AsyncTimeoutError, ClientConnectionError):
            logging.verbose(f'Failed to connect to {urlparse(url).netloc}')

        except Exception:
            traceback.print_exc()

    async def post(self, inbox, message):
        """Sign and deliver ``message`` to ``inbox``.

        Request failures are logged rather than raised, so a failed delivery
        does not bring down the calling worker. Errors while looking up the
        instance or signing happen before the guarded section and do
        propagate.

        :returns: ``None`` in every case.
        """
        await self.open()

        with self.database.session as s:
            instance = s.get_instance(inbox)

            ## Using the old algo by default is probably a better idea right now
            if instance and instance['software'] in {'mastodon'}:
                algorithm = 'hs2019'

            else:
                algorithm = 'original'

        headers = {'Content-Type': 'application/activity+json'}
        headers.update(self.signer.sign_headers('POST', inbox, message, algorithm=algorithm))

        try:
            logging.verbose(f'Sending "{message.type}" to {inbox}')

            async with self._session.post(inbox, headers=headers, data=message.to_json()) as resp:
                ## Not expecting a response, so just return
                if resp.status in {200, 202}:
                    return logging.verbose(f'Successfully sent "{message.type}" to {inbox}')

                logging.verbose(f'Received error when pushing to {inbox}: {resp.status}')
                return logging.verbose(await resp.read()) # change this to debug

        except ClientSSLError:
            logging.verbose(f'SSL error when connecting to {urlparse(inbox).netloc}')

        # Only imported names may appear here: an undefined name in an except
        # clause raises NameError as soon as any exception reaches it.
        # aiohttp's ClientConnectorError and ServerTimeoutError derive from
        # ClientConnectionError / asyncio.TimeoutError, so both are covered.
        except (AsyncTimeoutError, ClientConnectionError):
            logging.verbose(f'Failed to connect to {inbox}')

        ## prevent workers from being brought down
        except Exception:
            traceback.print_exc()

    ## Additional methods ##
    async def fetch_nodeinfo(self, domain):
        """Fetch the nodeinfo document advertised by ``domain``.

        The ``/.well-known/nodeinfo`` index is read first. The link for
        version key ``'21'`` is preferred, with a fallback to ``'20'``
        (presumably nodeinfo 2.1 / 2.0).

        :returns: the parsed nodeinfo, or ``False`` if any step fails.
        """
        wk_nodeinfo = await self.get(
            f'https://{domain}/.well-known/nodeinfo',
            loads = WellKnownNodeinfo.new_from_json
        )

        if not wk_nodeinfo:
            logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
            return False

        nodeinfo_url = None

        # get_url() raises KeyError for a version the index does not list;
        # stop at the first (most preferred) version that is present.
        for version in ('21', '20'):
            try:
                nodeinfo_url = wk_nodeinfo.get_url(version)
                break

            except KeyError:
                continue

        if not nodeinfo_url:
            logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
            return False

        return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
2022-12-13 13:27:09 +00:00
2022-12-14 00:44:08 +00:00
## http client methods can't be called directly from manage.py,
## so here's some wrapper functions
2022-12-13 13:27:09 +00:00
async def get(*args, **kwargs):
    """Call ``HttpClient.get`` on a short-lived client and return its result.

    The client is built with its constructor defaults (``setup()`` is not
    called) and is closed again before returning.
    """
    client = HttpClient()

    async with client:
        response = await client.get(*args, **kwargs)

    return response
async def post(*args, **kwargs):
    """Call ``HttpClient.post`` on a short-lived client and return its result.

    The client is built with its constructor defaults (``setup()`` is not
    called) and is closed again before returning.
    """
    client = HttpClient()

    async with client:
        outcome = await client.post(*args, **kwargs)

    return outcome
async def fetch_nodeinfo(*args, **kwargs):
    """Call ``HttpClient.fetch_nodeinfo`` on a short-lived client.

    The client is built with its constructor defaults (``setup()`` is not
    called) and is closed again before returning.
    """
    client = HttpClient()

    async with client:
        nodeinfo = await client.fetch_nodeinfo(*args, **kwargs)

    return nodeinfo