create Message class

This commit is contained in:
Izalia Mae 2022-11-07 05:30:13 -05:00
parent 3b85e2c2f2
commit c66f9d34b3
4 changed files with 229 additions and 146 deletions

View file

@ -81,9 +81,7 @@ def cli_inbox_follow(actor):
if not actor_data: if not actor_data:
return click.echo(f'Error: Failed to fetch actor: {actor}') return click.echo(f'Error: Failed to fetch actor: {actor}')
inbox = misc.get_actor_inbox(actor_data) database.add_inbox(actor_data.shared_inbox)
database.add_inbox(inbox)
database.save() database.save()
run_in_loop(misc.follow_remote_actor, actor) run_in_loop(misc.follow_remote_actor, actor)

View file

@ -4,6 +4,7 @@ import json
import logging import logging
import socket import socket
import traceback import traceback
import uuid
from Crypto.Hash import SHA, SHA256, SHA512 from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
@ -57,27 +58,12 @@ def create_signature_header(headers):
return ','.join(chunks) return ','.join(chunks)
def distill_object_id(activity):
logging.debug(f'>> determining object ID for {activity["object"]}')
try:
return activity['object']['id']
except TypeError:
return activity['object']
def distill_inboxes(actor, object_id): def distill_inboxes(actor, object_id):
database = app['database'] database = app['database']
origin_hostname = urlparse(object_id).hostname
actor_inbox = get_actor_inbox(actor)
targets = []
for inbox in database.inboxes: for inbox in database.inboxes:
if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname: if inbox != actor.shared_inbox or urlparse(inbox).hostname != urlparse(object_id).hostname:
targets.append(inbox) yield inbox
return targets
def generate_body_digest(body): def generate_body_digest(body):
@ -93,10 +79,6 @@ def generate_body_digest(body):
return bodyhash return bodyhash
def get_actor_inbox(actor):
return actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
def sign_signing_string(sigstring, key): def sign_signing_string(sigstring, key):
pkcs = PKCS1_v1_5.new(key) pkcs = PKCS1_v1_5.new(key)
h = SHA256.new() h = SHA256.new()
@ -162,14 +144,11 @@ async def follow_remote_actor(actor_uri):
config = app['config'] config = app['config']
actor = await request(actor_uri) actor = await request(actor_uri)
inbox = get_actor_inbox(actor)
if not actor: if not actor:
logging.error(f'failed to fetch actor at: {actor_uri}') logging.error(f'failed to fetch actor at: {actor_uri}')
return return
logging.verbose(f'sending follow request: {actor_uri}')
message = { message = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
"type": "Follow", "type": "Follow",
@ -179,7 +158,8 @@ async def follow_remote_actor(actor_uri):
"actor": f"https://{config.host}/actor" "actor": f"https://{config.host}/actor"
} }
await request(inbox, message) logging.verbose(f'sending follow request: {actor_uri}')
await request(actor.shared_inbox, message)
async def unfollow_remote_actor(actor_uri): async def unfollow_remote_actor(actor_uri):
@ -191,9 +171,6 @@ async def unfollow_remote_actor(actor_uri):
logging.error(f'failed to fetch actor: {actor_uri}') logging.error(f'failed to fetch actor: {actor_uri}')
return return
inbox = get_actor_inbox(actor)
logging.verbose(f'sending unfollow request to inbox: {inbox}')
message = { message = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
"type": "Undo", "type": "Undo",
@ -208,7 +185,8 @@ async def unfollow_remote_actor(actor_uri):
"actor": f"https://{config.host}/actor" "actor": f"https://{config.host}/actor"
} }
await request(inbox, message) logging.verbose(f'sending unfollow request to inbox: {actor.shared_inbox}')
await request(actor.shared_inbox, message)
async def request(uri, data=None, force=False, sign_headers=True, activity=True): async def request(uri, data=None, force=False, sign_headers=True, activity=True):
@ -265,15 +243,27 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True)
async with session.request(method, uri, headers=headers, data=data) as resp: async with session.request(method, uri, headers=headers, data=data) as resp:
## aiohttp has been known to leak if the response hasn't been read, ## aiohttp has been known to leak if the response hasn't been read,
## so we're just gonna read the request no matter what ## so we're just gonna read the request no matter what
resp_data = await resp.json() resp_data = await resp.read()
if resp.status not in [200, 202]: ## Not expecting a response, so just return
if resp.status == 202:
return
elif resp.status != 200:
if not resp_data: if not resp_data:
logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}')
return
logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}')
return
if resp.content_type == 'application/activity+json':
resp_data = await resp.json(loads=Message.new_from_json)
elif resp.content_type == 'application/json':
resp_data = await resp.json(loads=DotDict.new_from_json)
else:
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp_data}')
logging.debug(f'{uri} >> resp {resp_data}') logging.debug(f'{uri} >> resp {resp_data}')
@ -354,8 +344,127 @@ class DotDict(dict):
@classmethod @classmethod
def new_from_json(cls, data): def new_from_json(cls, data):
if not data:
raise JSONDecodeError('Empty body', data, 1)
try:
return cls(json.loads(data)) return cls(json.loads(data))
except ValueError:
raise JSONDecodeError('Invalid body', data, 1)
def to_json(self, indent=None): def to_json(self, indent=None):
return json.dumps(self, indent=indent) return json.dumps(self, indent=indent)
class Message(DotDict):
@classmethod
def new_actor(cls, host, pubkey, description=None):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/actor',
'type': 'Application',
'preferredUsername': 'relay',
'name': 'ActivityRelay',
'summary': description or 'ActivityRelay bot',
'followers': f'https://{host}/followers',
'following': f'https://{host}/following',
'inbox': f'https://{host}/inbox',
'url': f'https://{host}/inbox',
'endpoints': {
'sharedInbox': f'https://{host}/inbox'
},
'publicKey': {
'id': f'https://{host}/actor#main-key',
'owner': f'https://{host}/actor',
'publicKeyPem': pubkey
}
})
@classmethod
def new_announce(cls, host, object):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Announce',
'to': [f'https://{host}/followers'],
'actor': f'https://{host}/actor',
'object': object
})
@classmethod
def new_follow(cls, host, actor):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'type': 'Follow',
'to': [actor],
'object': actor,
'id': f'https://{host}/activities/{uuid.uuid4()}',
'actor': f'https://{host}/actor'
})
@classmethod
def new_unfollow(cls, host, actor, follow):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Undo',
'to': [actor],
'actor': f'https://{host}/actor',
'object': follow
})
@classmethod
def new_response(cls, host, actor, followid, accept):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Accept' if accept else 'Reject',
'to': [actor],
'actor': f'https://{host}/actor',
'object': {
'id': followid,
'type': 'Follow',
'object': f'https://{host}/actor',
'actor': actor
}
})
# misc properties
@property
def domain(self):
return urlparse(self.id).hostname
# actor properties
@property
def pubkey(self):
return self.publicKey.publicKeyPem
@property
def shared_inbox(self):
return self.get('endpoints', {}).get('sharedInbox', self.inbox)
# activity properties
@property
def actorid(self):
if isinstance(self.actor, dict):
return self.actor.id
return self.actor
@property
def objectid(self):
if isinstance(self.object, dict):
return self.object.id
return self.object

View file

@ -6,113 +6,103 @@ from uuid import uuid4
from . import app, misc from . import app, misc
async def handle_relay(actor, data, request): async def handle_relay(request, actor, data, software):
cache = app['cache'].objects cache = app['cache'].objects
object_id = misc.distill_object_id(data) config = app['config']
if object_id in cache: if data.objectid in cache:
logging.verbose(f'already relayed {object_id} as {cache[object_id]}') logging.verbose(f'already relayed {data.objectid} as {cache[data.objectid]}')
return return
logging.verbose(f'Relaying post from {actor["id"]}') logging.verbose(f'Relaying post from {data.actorid}')
activity_id = f"https://{request.host}/activities/{uuid4()}" message = misc.Message.new_announce(
host = config.host,
message = { object = data.objectid
"@context": "https://www.w3.org/ns/activitystreams", )
"type": "Announce",
"to": [f"https://{request.host}/followers"],
"actor": f"https://{request.host}/actor",
"object": object_id,
"id": activity_id
}
logging.debug(f'>> relay: {message}') logging.debug(f'>> relay: {message}')
inboxes = misc.distill_inboxes(actor, object_id) inboxes = misc.distill_inboxes(actor, data.objectid)
futures = [misc.request(inbox, data=message) for inbox in inboxes] futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) asyncio.ensure_future(asyncio.gather(*futures))
cache[object_id] = activity_id cache[data.objectid] = message.id
async def handle_forward(actor, data, request): async def handle_forward(request, actor, data, software):
cache = app['cache'].objects cache = app['cache'].objects
object_id = data['id'] config = app['config']
if object_id in cache: if data.id in cache:
logging.verbose(f'already forwarded {object_id}') logging.verbose(f'already forwarded {data.id}')
return return
activity_id = f"https://{request.host}/activities/{uuid4()}" message = misc.Message.new_announce(
host = config.host,
object = data
)
message = { logging.verbose(f'Forwarding post from {actor.id}')
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"to": [f"https://{request.host}/followers"],
"actor": f"https://{request.host}/actor",
"object": data,
"id": activity_id
}
logging.verbose(f'Forwarding post from {actor["id"]}')
logging.debug(f'>> Relay {data}') logging.debug(f'>> Relay {data}')
inboxes = misc.distill_inboxes(actor, object_id) inboxes = misc.distill_inboxes(actor, data.id)
futures = [misc.request(inbox, data=message) for inbox in inboxes] futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) asyncio.ensure_future(asyncio.gather(*futures))
cache[data.id] = message.id
cache[object_id] = activity_id
async def handle_follow(actor, data, request): async def handle_follow(request, actor, data, software):
config = app['config'] config = app['config']
database = app['database'] database = app['database']
inbox = misc.get_actor_inbox(actor) if database.add_inbox(inbox, data.id):
dbinbox = database.get_inbox(inbox) database.set_followid(actor.id, data.id)
if not database.add_inbox(inbox, data['id']):
database.set_followid(inbox, data['id'])
database.save() database.save()
asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) await misc.request(
actor.shared_inbox,
misc.Message.new_response(
host = config.host,
actor = actor.id,
followid = data.id,
accept = True
)
)
message = { # Are Akkoma and Pleroma the only two that expect a follow back?
"@context": "https://www.w3.org/ns/activitystreams", # Ignoring only Mastodon for now
"type": "Accept", if software != 'mastodon':
"to": [actor["id"]], misc.request(
"actor": config.actor, actor.shared_inbox,
misc.Message.new_follow(
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile. host = config.host,
"object": { actor = actor.id
"type": "Follow", )
"id": data["id"], )
"object": config.actor,
"actor": actor["id"]
},
"id": f"https://{request.host}/activities/{uuid4()}",
}
asyncio.ensure_future(misc.request(inbox, message))
async def handle_undo(actor, data, request): async def handle_undo(request, actor, data, software):
## If the object is not a Follow, forward it ## If the object is not a Follow, forward it
if data['object']['type'] != 'Follow': if data['object']['type'] != 'Follow':
return await handle_forward(actor, data, request) return await handle_forward(request, actor, data, software)
database = app['database'] database = app['database']
objectid = misc.distill_object_id(data)
if not database.del_inbox(actor['id'], objectid): if not database.del_inbox(actor.domain, data.id):
return return
database.save() database.save()
await misc.unfollow_remote_actor(actor['id']) message = misc.Message.new_unfollow(
host = config.host,
actor = actor.id,
follow = data
)
await misc.request(actor.shared_inbox, message)
processors = { processors = {
@ -125,9 +115,9 @@ processors = {
} }
async def run_processor(request, data, actor): async def run_processor(request, actor, data, software):
if data['type'] not in processors: if data.type not in processors:
return return
logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}') logging.verbose(f'New "{data.type}" from actor: {actor.id}')
return await processors[data['type']](actor, data, request) return await processors[data.type](request, actor, data, software)

View file

@ -3,10 +3,10 @@ import subprocess
import traceback import traceback
from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response
from urllib.parse import urlparse
from . import __version__, app, misc from . import __version__, app, misc
from .http_debug import STATS from .http_debug import STATS
from .misc import Message
from .processors import run_processor from .processors import run_processor
@ -48,28 +48,13 @@ a:hover {{ color: #8AF; }}
async def actor(request): async def actor(request):
config = app['config']
database = app['database'] database = app['database']
data = { data = Message.new_actor(
"@context": "https://www.w3.org/ns/activitystreams", host = config.host,
"endpoints": { pubkey = database.pubkey
"sharedInbox": f"https://{request.host}/inbox" )
},
"followers": f"https://{request.host}/followers",
"following": f"https://{request.host}/following",
"inbox": f"https://{request.host}/inbox",
"name": "ActivityRelay",
"type": "Application",
"id": f"https://{request.host}/actor",
"publicKey": {
"id": f"https://{request.host}/actor#main-key",
"owner": f"https://{request.host}/actor",
"publicKeyPem": database.pubkey
},
"summary": "ActivityRelay bot",
"preferredUsername": "relay",
"url": f"https://{request.host}/actor"
}
return json_response(data, content_type='application/activity+json') return json_response(data, content_type='application/activity+json')
@ -85,9 +70,10 @@ async def inbox(request):
## read message and get actor id and domain ## read message and get actor id and domain
try: try:
data = await request.json() data = await request.json(loads=Message.new_from_json)
actor_id = data['actor']
actor_domain = urlparse(actor_id).hostname if 'actor' not in data:
raise KeyError('actor')
## reject if there is no actor in the message ## reject if there is no actor in the message
except KeyError: except KeyError:
@ -99,44 +85,44 @@ async def inbox(request):
logging.verbose('Failed to parse inbox message') logging.verbose('Failed to parse inbox message')
raise HTTPUnauthorized(body='failed to parse message') raise HTTPUnauthorized(body='failed to parse message')
actor = await misc.request(actor_id) actor = await misc.request(data.actorid)
## reject if actor is empty ## reject if actor is empty
if not actor: if not actor:
logging.verbose(f'Failed to fetch actor: {actor_id}') logging.verbose(f'Failed to fetch actor: {data.actorid}')
raise HTTPUnauthorized('failed to fetch actor') raise HTTPUnauthorized('failed to fetch actor')
## reject if the actor isn't whitelisted while the whiltelist is enabled ## reject if the actor isn't whitelisted while the whiltelist is enabled
elif config.whitelist_enabled and not config.is_whitelisted(actor_id): elif config.whitelist_enabled and not config.is_whitelisted(data.domain):
logging.verbose(f'Rejected actor for not being in the whitelist: {actor_id}') logging.verbose(f'Rejected actor for not being in the whitelist: {data.actorid}')
raise HTTPForbidden(body='access denied') raise HTTPForbidden(body='access denied')
## reject if actor is banned ## reject if actor is banned
if app['config'].is_banned(actor_id): if app['config'].is_banned(data.domain):
logging.verbose(f'Ignored request from banned actor: {actor_id}') logging.verbose(f'Ignored request from banned actor: {data.actorid}')
raise HTTPForbidden(body='access denied') raise HTTPForbidden(body='access denied')
## reject if software used by actor is banned ## reject if software used by actor is banned
if len(config.blocked_software): if len(config.blocked_software):
software = await misc.fetch_nodeinfo(actor_domain) software = await misc.fetch_nodeinfo(data.domain)
if config.is_banned_software(software): if config.is_banned_software(software):
logging.verbose(f'Rejected actor for using specific software: {software}') logging.verbose(f'Rejected actor for using specific software: {software}')
raise HTTPForbidden(body='access denied') raise HTTPForbidden(body='access denied')
## reject if the signature is invalid ## reject if the signature is invalid
if not (await misc.validate_signature(actor_id, request)): if not (await misc.validate_signature(data.actorid, request)):
logging.verbose(f'signature validation failed for: {actor_id}') logging.verbose(f'signature validation failed for: {data.actorid}')
raise HTTPUnauthorized(body='signature check failed, signature did not match key') raise HTTPUnauthorized(body='signature check failed, signature did not match key')
## reject if activity type isn't 'Follow' and the actor isn't following ## reject if activity type isn't 'Follow' and the actor isn't following
if data['type'] != 'Follow' and not database.get_inbox(actor_domain): if data['type'] != 'Follow' and not database.get_inbox(data.domain):
logging.verbose(f'Rejected actor for trying to post while not following: {actor_id}') logging.verbose(f'Rejected actor for trying to post while not following: {data.actorid}')
raise HTTPUnauthorized(body='access denied') raise HTTPUnauthorized(body='access denied')
logging.debug(f">> payload {data}") logging.debug(f">> payload {data}")
await run_processor(request, data, actor) await run_processor(request, actor, data, software)
return Response(body=b'{}', content_type='application/activity+json') return Response(body=b'{}', content_type='application/activity+json')