Compare commits

..

5 commits

Author SHA1 Message Date
Izalia Mae 0709d8deb9 remove unnecessary ActorView.validate_signature method 2024-03-27 12:52:58 -04:00
Izalia Mae 8ed1daeae5 allow HttpClient.post to accept bytes objects 2024-03-27 12:50:53 -04:00
Izalia Mae a1ceb0cb4b update aputils to 0.2.0 2024-03-27 12:50:07 -04:00
Izalia Mae e700f9baa2 use right algorithm for inbox pushes
* return instance rows from distill_inboxes
* pass the row of the instance being POSTed to the instance parameter of HttpClient.post
2024-03-27 12:48:22 -04:00
Izalia Mae 938d3f419e properly detect if running via systemd 2024-03-27 11:48:05 -04:00
8 changed files with 46 additions and 55 deletions

View file

@ -4,6 +4,7 @@ Description=ActivityPub Relay
[Service]
WorkingDirectory=/home/relay/relay
ExecStart=/usr/bin/python3 -m relay run
Environment="IS_SYSTEMD=1"
[Install]
WantedBy=multi-user.target

View file

@ -65,7 +65,7 @@ class Application(web.Application):
Application.DEFAULT = self
self['running'] = None
self['running'] = False
self['signer'] = None
self['start_time'] = None
self['cleanup_thread'] = None
@ -142,7 +142,7 @@ class Application(web.Application):
return timedelta(seconds=uptime.seconds)
def push_message(self, inbox: str, message: Message, instance: Row) -> None:
def push_message(self, inbox: str, message: Message | bytes, instance: Row) -> None:
self['push_queue'].put((inbox, message, instance))

View file

@ -42,15 +42,15 @@ class Connection(SqlConnection):
return get_app()
def distill_inboxes(self, message: Message) -> Iterator[str]:
def distill_inboxes(self, message: Message) -> Iterator[Row]:
src_domains = {
message.domain,
urlparse(message.object_id).netloc
}
for inbox in self.get_inboxes():
if inbox['domain'] not in src_domains:
yield inbox['inbox']
for instance in self.get_inboxes():
if instance['domain'] not in src_domains:
yield instance
def get_config(self, key: str) -> Any:

View file

@ -7,7 +7,7 @@ import typing
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectionError, ClientSSLError
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
from aputils import JsonBase, Nodeinfo, WellKnownNodeinfo
from aputils import AlgorithmType, JsonBase, Nodeinfo, ObjectType, WellKnownNodeinfo
from json.decoder import JSONDecodeError
from urllib.parse import urlparse
@ -111,7 +111,7 @@ class HttpClient:
headers = {}
if sign_headers:
headers = self.signer.sign_headers('GET', url, algorithm = 'original')
headers = self.signer.sign_headers('GET', url, algorithm = AlgorithmType.HS2019)
try:
logging.debug('Fetching resource: %s', url)
@ -164,31 +164,50 @@ class HttpClient:
return cls.parse(data)
async def post(self, url: str, message: Message, instance: Row | None = None) -> None:
async def post(self, url: str, data: Message | bytes, instance: Row | None = None) -> None:
if not self._session:
raise RuntimeError('Client not open')
# Using the old algo by default is probably a better idea right now
# akkoma and pleroma do not support HS2019 and other software still needs to be tested
if instance and instance['software'] in {'mastodon'}:
algorithm = 'hs2019'
algorithm = AlgorithmType.HS2019
else:
algorithm = 'original'
algorithm = AlgorithmType.RSASHA256
headers = {'Content-Type': 'application/activity+json'}
headers.update(get_app().signer.sign_headers('POST', url, message, algorithm=algorithm))
body: bytes
message: Message
if isinstance(data, bytes):
body = data
message = Message.parse(data)
else:
body = data.to_json().encode("utf-8")
message = data
mtype = message.type.value if isinstance(message.type, ObjectType) else message.type
headers = self.signer.sign_headers(
'POST',
url,
body,
headers = {'Content-Type': 'application/activity+json'},
algorithm = algorithm
)
try:
logging.verbose('Sending "%s" to %s', message.type.value, url)
logging.verbose('Sending "%s" to %s', mtype, url)
async with self._session.post(url, headers = headers, data = message.to_json()) as resp:
async with self._session.post(url, headers = headers, data = body) as resp:
# Not expecting a response, so just return
if resp.status in {200, 202}:
logging.verbose('Successfully sent "%s" to %s', message.type.value, url)
logging.verbose('Successfully sent "%s" to %s', mtype, url)
return
logging.verbose('Received error when pushing to %s: %i', url, resp.status)
logging.debug(await resp.read())
logging.debug("message: %s", body.decode("utf-8"))
logging.debug("headers: %s", json.dumps(headers, indent = 4))
return
except ClientSSLError:

View file

@ -87,7 +87,7 @@ handlers: list[Any] = [logging.StreamHandler()]
if env_log_file:
handlers.append(logging.FileHandler(env_log_file))
if os.environ.get('INVOCATION_ID'):
if os.environ.get('IS_SYSTEMD'):
logging_format = '%(levelname)s: %(message)s'
else:

View file

@ -35,8 +35,8 @@ async def handle_relay(view: ActorView, conn: Connection) -> None:
message = Message.new_announce(view.config.domain, view.message.object_id)
logging.debug('>> relay: %s', message)
for inbox in conn.distill_inboxes(view.message):
view.app.push_message(inbox, message, view.instance)
for instance in conn.distill_inboxes(view.message):
view.app.push_message(instance["inbox"], message, instance)
view.cache.set('handle-relay', view.message.object_id, message.id, 'str')
@ -53,8 +53,8 @@ async def handle_forward(view: ActorView, conn: Connection) -> None:
message = Message.new_announce(view.config.domain, view.message)
logging.debug('>> forward: %s', message)
for inbox in conn.distill_inboxes(view.message):
view.app.push_message(inbox, message, view.instance)
for instance in conn.distill_inboxes(view.message):
view.app.push_message(instance["inbox"], await view.request.read(), instance)
view.cache.set('handle-relay', view.message.id, message.id, 'str')

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import aputils
import traceback
import typing
import json
from .base import View, register_route
@ -71,7 +72,7 @@ class ActorView(View):
async def get_post_data(self) -> Response | None:
try:
self.signature = aputils.Signature.new_from_signature(self.request.headers['signature'])
self.signature = aputils.Signature.parse(self.request.headers['signature'])
except KeyError:
logging.verbose('Missing signature header')
@ -116,7 +117,7 @@ class ActorView(View):
return Response.new_error(400, 'actor missing public key', 'json')
try:
self.validate_signature(await self.request.read())
await self.signer.validate_aiohttp_request(self.request)
except aputils.SignatureFailureError as e:
logging.verbose('signature validation failed for "%s": %s', self.actor.id, e)
@ -125,36 +126,6 @@ class ActorView(View):
return None
def validate_signature(self, body: bytes) -> None:
headers = {key.lower(): value for key, value in self.request.headers.items()}
headers["(request-target)"] = " ".join([self.request.method.lower(), self.request.path])
if (digest := aputils.Digest.new_from_digest(headers.get("digest"))):
if not body:
raise aputils.SignatureFailureError("Missing body for digest verification")
if not digest.validate(body):
raise aputils.SignatureFailureError("Body digest does not match")
if self.signature.algorithm_type == aputils.AlgorithmType.HS2019:
if self.signature.created is None or self.signature.expires is None:
raise aputils.SignatureFailureError("Missing 'created' or 'expireds' parameter")
current_timestamp = aputils.HttpDate.new_utc().timestamp()
if self.signature.created > current_timestamp:
raise aputils.SignatureFailureError("Creation date after current date")
if self.signature.expires < current_timestamp:
raise aputils.SignatureFailureError("Signature has expired")
headers["(created)"] = str(self.signature.created)
headers["(expires)"] = str(self.signature.expires)
if not self.signer._validate_signature(headers, self.signature):
raise aputils.SignatureFailureError("Signature does not match")
@register_route('/.well-known/webfinger')
class WebfingerView(View):
async def get(self, request: Request) -> Response:

View file

@ -1,4 +1,4 @@
activitypub-utils == 0.1.9
activitypub-utils == 0.2.0
aiohttp >= 3.9.1
aiohttp-swagger[performance] == 1.0.16
argon2-cffi == 23.1.0