Merge branch 'dev' into 'master'

v0.2.3

See merge request pleroma/relay!42
Author: Izalia Mae
Date: 2022-11-18 17:39:31 +00:00
Commit: 6e494ee671
13 changed files with 964 additions and 573 deletions

docs/installation.md

@@ -15,7 +15,7 @@ the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in-
 Now simply install ActivityRelay directly from git
-pipx install git+https://git.pleroma.social/pleroma/relay@0.2.0
+pipx install git+https://git.pleroma.social/pleroma/relay@0.2.3
 Or from a cloned git repo.
@@ -39,7 +39,7 @@ be installed via [pyenv](https://github.com/pyenv/pyenv).
 The instructions for installation via pip are very similar to pipx. Installation can be done from
 git
-python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.0
+python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.3
 or a cloned git repo.

relay.spec (new file, 44 lines)

@ -0,0 +1,44 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['relay/__main__.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='activityrelay',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
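Note: relay.spec is a standard PyInstaller spec file, and the dev extra added to setup.cfg further down pins pyinstaller >= 5.6.0. As a rough sketch of how it would be consumed (this invocation is an assumption, not part of the commit; the path is relative to the repo root):

import PyInstaller.__main__

# Build the single-file 'activityrelay' executable described by relay.spec above.
PyInstaller.__main__.run(['relay.spec'])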

relay.yaml.example

@@ -10,7 +10,7 @@ port: 8080
 note: "Make a note about your instance here."
 # maximum number of inbox posts to do at once
-post_limit: 512
+push_limit: 512
 # this section is for ActivityPub
 ap:
@@ -39,5 +39,5 @@ ap:
 # cache limits as number of items. only change this if you know what you're doing
 cache:
 objects: 1024
-actors: 1024
+json: 1024
 digests: 1024
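Note: this hunk renames two example-config keys: post_limit becomes push_limit and the actors cache becomes json, matching the RelayConfig defaults in relay/config.py below. A minimal sketch of reading the renamed keys back (the config path is illustrative; attribute access comes from DotDict):

from relay.config import RelayConfig

config = RelayConfig('relay.yaml', is_docker=False)
config.load()

print(config.push_limit)  # formerly 'post_limit'
print(config.json)        # cache size, formerly 'actors'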

relay/__init__.py

@@ -1,8 +1,3 @@
-__version__ = '0.2.2'
+__version__ = '0.2.3'
-from aiohttp.web import Application
 from . import logger
-app = Application()

relay/__main__.py

@@ -1,4 +1,4 @@
-from .manage import main
+from relay.manage import main
 if __name__ == '__main__':

relay/application.py (new file, 127 lines)

@ -0,0 +1,127 @@
import asyncio
import logging
import os
import signal
from aiohttp import web
from cachetools import LRUCache
from datetime import datetime, timedelta
from .config import RelayConfig
from .database import RelayDatabase
from .misc import DotDict, check_open_port, set_app
from .views import routes
class Application(web.Application):
def __init__(self, cfgpath):
web.Application.__init__(self)
self['starttime'] = None
self['running'] = False
self['is_docker'] = bool(os.environ.get('DOCKER_RUNNING'))
self['config'] = RelayConfig(cfgpath, self['is_docker'])
if not self['config'].load():
self['config'].save()
self['database'] = RelayDatabase(self['config'])
self['database'].load()
self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys})
self['semaphore'] = asyncio.Semaphore(self['config'].push_limit)
self.set_signal_handler()
set_app(self)
@property
def cache(self):
return self['cache']
@property
def config(self):
return self['config']
@property
def database(self):
return self['database']
@property
def is_docker(self):
return self['is_docker']
@property
def semaphore(self):
return self['semaphore']
@property
def uptime(self):
if not self['starttime']:
return timedelta(seconds=0)
uptime = datetime.now() - self['starttime']
return timedelta(seconds=uptime.seconds)
def set_signal_handler(self):
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
try:
signal.signal(getattr(signal, sig), self.stop)
# some signals don't exist in windows, so skip them
except AttributeError:
pass
def run(self):
if not check_open_port(self.config.listen, self.config.port):
return logging.error(f'A server is already running on port {self.config.port}')
for route in routes:
if route[1] == '/stats' and logging.DEBUG < logging.root.level:
continue
self.router.add_route(*route)
logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
asyncio.run(self.handle_run())
def stop(self, *_):
self['running'] = False
async def handle_run(self):
self['running'] = True
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['starttime'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
self['starttime'] = None
self['running'] = False
class Cache(LRUCache):
def set_maxsize(self, value):
self.__maxsize = int(value)
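Note: this new Application class is what the CLI now constructs (app = Application(config) in relay/manage.py below) and what relay_run ultimately starts via app.run(). A minimal usage sketch, assuming a relay.yaml in the working directory:

from relay.application import Application

# Loads (or creates) the config, loads the database, builds the caches and signal handlers.
app = Application('relay.yaml')

# Registers the routes from relay/views.py and serves until SIGINT/SIGTERM/SIGQUIT.
app.run()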

relay/config.py

@@ -4,6 +4,8 @@ import yaml
 from pathlib import Path
 from urllib.parse import urlparse
+from .misc import DotDict
 relay_software_names = [
 'activityrelay',
@@ -13,45 +15,6 @@ relay_software_names = [
 ]
-class DotDict(dict):
-def __getattr__(self, k):
-try:
-return self[k]
-except KeyError:
-raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
-def __setattr__(self, k, v):
-try:
-if k in self._ignore_keys:
-super().__setattr__(k, v)
-except AttributeError:
-pass
-if k.startswith('_'):
-super().__setattr__(k, v)
-else:
-self[k] = v
-def __setitem__(self, k, v):
-if type(v) == dict:
-v = DotDict(v)
-super().__setitem__(k, v)
-def __delattr__(self, k):
-try:
-dict.__delitem__(self, k)
-except KeyError:
-raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
 class RelayConfig(DotDict):
 apkeys = {
 'host',
@@ -69,27 +32,15 @@ class RelayConfig(DotDict):
 def __init__(self, path, is_docker):
-DotDict.__init__(self, {})
 if is_docker:
 path = '/data/relay.yaml'
 self._isdocker = is_docker
 self._path = Path(path).expanduser()
-super().__init__({
-'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')),
-'listen': '0.0.0.0',
-'port': 8080,
-'note': 'Make a note about your instance here.',
-'push_limit': 512,
-'host': 'relay.example.com',
-'blocked_software': [],
-'blocked_instances': [],
-'whitelist': [],
-'whitelist_enabled': False,
-'json': 1024,
-'objects': 1024,
-'digests': 1024
-})
+self.reset()
 def __setitem__(self, key, value):
@@ -133,6 +84,24 @@ class RelayConfig(DotDict):
 return f'{self.actor}#main-key'
+def reset(self):
+self.clear()
+self.update({
+'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')),
+'listen': '0.0.0.0',
+'port': 8080,
+'note': 'Make a note about your instance here.',
+'push_limit': 512,
+'host': 'relay.example.com',
+'blocked_software': [],
+'blocked_instances': [],
+'whitelist': [],
+'whitelist_enabled': False,
+'json': 1024,
+'objects': 1024,
+'digests': 1024
+})
 def ban_instance(self, instance):
 if instance.startswith('http'):
 instance = urlparse(instance).hostname
@@ -218,6 +187,8 @@ class RelayConfig(DotDict):
 def load(self):
+self.reset()
 options = {}
 try:

relay/database.py

@@ -6,10 +6,16 @@ from Crypto.PublicKey import RSA
 from urllib.parse import urlparse
-class RelayDatabase:
+class RelayDatabase(dict):
 def __init__(self, config):
+dict.__init__(self, {
+'relay-list': {},
+'private-key': None,
+'follow-requests': {},
+'version': 1
+})
 self.config = config
-self.data = None
 self.PRIVKEY = None
@@ -25,26 +31,22 @@ class RelayDatabase:
 @property
 def privkey(self):
-try:
-return self.data['private-key']
-except KeyError:
-return False
+return self['private-key']
 @property
 def hostnames(self):
-return [urlparse(inbox).hostname for inbox in self.inboxes]
+return tuple(self['relay-list'].keys())
 @property
 def inboxes(self):
-return self.data.get('relay-list', [])
+return tuple(data['inbox'] for data in self['relay-list'].values())
 def generate_key(self):
 self.PRIVKEY = RSA.generate(4096)
-self.data['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8')
+self['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8')
 def load(self):
@@ -52,14 +54,31 @@ class RelayDatabase:
 try:
 with self.config.db.open() as fd:
-self.data = json.load(fd)
-key = self.data.pop('actorKeys', None)
-if key:
-self.data['private-key'] = key.get('privateKey')
-self.data.pop('actors', None)
+data = json.load(fd)
+self['version'] = data.get('version', None)
+self['private-key'] = data.get('private-key')
+if self['version'] == None:
+self['version'] = 1
+if 'actorKeys' in data:
+self['private-key'] = data['actorKeys']['privateKey']
+for item in data.get('relay-list', []):
+domain = urlparse(item).hostname
+self['relay-list'][domain] = {
+'inbox': item,
+'followid': None
+}
+else:
+self['relay-list'] = data.get('relay-list', {})
+for domain in self['relay-list'].keys():
+if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)):
+self.del_inbox(domain)
 new_db = False
 except FileNotFoundError:
@@ -69,14 +88,6 @@ class RelayDatabase:
 if self.config.db.stat().st_size > 0:
 raise e from None
-if not self.data:
-logging.info('No database was found. Making a new one.')
-self.data = {}
-for inbox in self.inboxes:
-if self.config.is_banned(inbox) or (self.config.whitelist_enabled and not self.config.is_whitelisted(inbox)):
-self.del_inbox(inbox)
 if not self.privkey:
 logging.info("No actor keys present, generating 4096-bit RSA keypair.")
 self.generate_key()
@@ -90,34 +101,99 @@ class RelayDatabase:
 def save(self):
 with self.config.db.open('w') as fd:
-data = {
-'relay-list': self.inboxes,
-'private-key': self.privkey
-}
-json.dump(data, fd, indent=4)
+json.dump(self, fd, indent=4)
-def get_inbox(self, domain):
+def get_inbox(self, domain, fail=False):
 if domain.startswith('http'):
 domain = urlparse(domain).hostname
-for inbox in self.inboxes:
-if domain == urlparse(inbox).hostname:
-return inbox
+if domain not in self['relay-list']:
+if fail:
+raise KeyError(domain)
+return
+return self['relay-list'][domain]
-def add_inbox(self, inbox):
-assert inbox.startswith('https')
-assert not self.get_inbox(inbox)
-self.data['relay-list'].append(inbox)
+def add_inbox(self, inbox, followid=None, fail=False):
+assert inbox.startswith('https'), 'Inbox must be a url'
+domain = urlparse(inbox).hostname
+if self.get_inbox(domain):
+if fail:
+raise KeyError(domain)
+return False
+self['relay-list'][domain] = {
+'domain': domain,
+'inbox': inbox,
+'followid': followid
+}
+logging.verbose(f'Added inbox to database: {inbox}')
+return self['relay-list'][domain]
-def del_inbox(self, inbox_url):
-inbox = self.get_inbox(inbox_url)
-if not inbox:
-raise KeyError(inbox_url)
-self.data['relay-list'].remove(inbox)
+def del_inbox(self, domain, followid=None, fail=False):
+data = self.get_inbox(domain, fail=False)
+if not data:
+if fail:
+raise KeyError(domain)
+return False
+if not data['followid'] or not followid or data['followid'] == followid:
+del self['relay-list'][data['domain']]
+logging.verbose(f'Removed inbox from database: {data["inbox"]}')
+return True
+if fail:
+raise ValueError('Follow IDs do not match')
+logging.debug(f'Follow ID does not match: db = {data["followid"]}, object = {followid}')
+return False
def set_followid(self, domain, followid):
data = self.get_inbox(domain, fail=True)
data['followid'] = followid
def get_request(self, domain, fail=True):
if domain.startswith('http'):
domain = urlparse(domain).hostname
try:
return self['follow-requests'][domain]
except KeyError as e:
if fail:
raise e
def add_request(self, actor, inbox, followid):
domain = urlparse(inbox).hostname
try:
request = self.get_request(domain)
request['followid'] = followid
except KeyError:
pass
self['follow-requests'][domain] = {
'actor': actor,
'inbox': inbox,
'followid': followid
}
def del_request(self, domain):
if domain.startswith('http'):
domain = urlparse(inbox).hostname
del self['follow-requests'][domain]
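Note: RelayDatabase is now a dict subclass and save() serializes it directly with json.dump(self, fd, indent=4), so the on-disk JSON mirrors the in-memory structure. An illustrative sketch of the new layout (values are placeholders; version-1 files key the relay list by domain instead of storing a flat list of inbox URLs):

db = {
    'version': 1,
    'private-key': '-----BEGIN RSA PRIVATE KEY----- ...',
    'relay-list': {
        'example.com': {
            'domain': 'example.com',
            'inbox': 'https://example.com/inbox',
            'followid': None
        }
    },
    'follow-requests': {}
}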

relay/manage.py

@@ -1,17 +1,17 @@
 import Crypto
 import asyncio
 import click
-import json
 import logging
-import os
 import platform
-from aiohttp.web import AppRunner, TCPSite
-from cachetools import LRUCache
-from . import app, misc, views, __version__
-from .config import DotDict, RelayConfig, relay_software_names
-from .database import RelayDatabase
+from urllib.parse import urlparse
+from . import misc, __version__
+from .application import Application
+from .config import relay_software_names
+app = None
 @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True)
@ -19,23 +19,11 @@ from .database import RelayDatabase
@click.version_option(version=__version__, prog_name='ActivityRelay') @click.version_option(version=__version__, prog_name='ActivityRelay')
@click.pass_context @click.pass_context
def cli(ctx, config): def cli(ctx, config):
app['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) global app
app['config'] = RelayConfig(config, app['is_docker']) app = Application(config)
if not app['config'].load():
app['config'].save()
app['database'] = RelayDatabase(app['config'])
app['database'].load()
app['cache'] = DotDict()
app['semaphore'] = asyncio.Semaphore(app['config']['push_limit'])
for key in app['config'].cachekeys:
app['cache'][key] = LRUCache(app['config'][key])
if not ctx.invoked_subcommand: if not ctx.invoked_subcommand:
if app['config'].host.endswith('example.com'): if app.config.host.endswith('example.com'):
relay_setup.callback() relay_setup.callback()
else: else:
@ -55,7 +43,7 @@ def cli_inbox_list():
click.echo('Connected to the following instances or relays:') click.echo('Connected to the following instances or relays:')
for inbox in app['database'].inboxes: for inbox in app.database.inboxes:
click.echo(f'- {inbox}') click.echo(f'- {inbox}')
@ -64,29 +52,30 @@ def cli_inbox_list():
def cli_inbox_follow(actor): def cli_inbox_follow(actor):
'Follow an actor (Relay must be running)' 'Follow an actor (Relay must be running)'
config = app['config'] if app.config.is_banned(actor):
database = app['database']
if config.is_banned(actor):
return click.echo(f'Error: Refusing to follow banned actor: {actor}') return click.echo(f'Error: Refusing to follow banned actor: {actor}')
if not actor.startswith('http'): if not actor.startswith('http'):
domain = actor
actor = f'https://{actor}/actor' actor = f'https://{actor}/actor'
if database.get_inbox(actor): else:
return click.echo(f'Error: Already following actor: {actor}') domain = urlparse(actor).hostname
actor_data = run_in_loop(misc.request, actor, sign_headers=True) try:
inbox_data = app.database['relay-list'][domain]
inbox = inbox_data['inbox']
if not actor_data: except KeyError:
return click.echo(f'Error: Failed to fetch actor: {actor}') actor_data = asyncio.run(misc.request(actor))
inbox = actor_data.shared_inbox
inbox = misc.get_actor_inbox(actor_data) message = misc.Message.new_follow(
host = app.config.host,
actor = actor.id
)
database.add_inbox(inbox) asyncio.run(misc.request(inbox, message))
database.save()
run_in_loop(misc.follow_remote_actor, actor)
click.echo(f'Sent follow message to actor: {actor}') click.echo(f'Sent follow message to actor: {actor}')
@ -95,18 +84,36 @@ def cli_inbox_follow(actor):
def cli_inbox_unfollow(actor): def cli_inbox_unfollow(actor):
'Unfollow an actor (Relay must be running)' 'Unfollow an actor (Relay must be running)'
database = app['database']
if not actor.startswith('http'): if not actor.startswith('http'):
domain = actor
actor = f'https://{actor}/actor' actor = f'https://{actor}/actor'
if not database.get_inbox(actor): else:
return click.echo(f'Error: Not following actor: {actor}') domain = urlparse(actor).hostname
database.del_inbox(actor) try:
database.save() inbox_data = app.database['relay-list'][domain]
inbox = inbox_data['inbox']
message = misc.Message.new_unfollow(
host = app.config.host,
actor = actor,
follow = inbox_data['followid']
)
run_in_loop(misc.unfollow_remote_actor, actor) except KeyError:
actor_data = asyncio.run(misc.request(actor))
inbox = actor_data.shared_inbox
message = misc.Message.new_unfollow(
host = app.config.host,
actor = actor,
follow = {
'type': 'Follow',
'object': actor,
'actor': f'https://{app.config.host}/actor'
}
)
asyncio.run(misc.request(inbox, message))
click.echo(f'Sent unfollow message to: {actor}') click.echo(f'Sent unfollow message to: {actor}')
@ -115,23 +122,17 @@ def cli_inbox_unfollow(actor):
def cli_inbox_add(inbox): def cli_inbox_add(inbox):
'Add an inbox to the database' 'Add an inbox to the database'
database = app['database']
config = app['config']
if not inbox.startswith('http'): if not inbox.startswith('http'):
inbox = f'https://{inbox}/inbox' inbox = f'https://{inbox}/inbox'
if database.get_inbox(inbox): if app.config.is_banned(inbox):
click.echo(f'Error: Inbox already in database: {inbox}') return click.echo(f'Error: Refusing to add banned inbox: {inbox}')
return
if config.is_banned(inbox): if app.database.add_inbox(inbox):
click.echo(f'Error: Refusing to add banned inbox: {inbox}') app.database.save()
return return click.echo(f'Added inbox to the database: {inbox}')
database.add_inbox(inbox) click.echo(f'Error: Inbox already in database: {inbox}')
database.save()
click.echo(f'Added inbox to the database: {inbox}')
@cli_inbox.command('remove') @cli_inbox.command('remove')
@ -139,15 +140,16 @@ def cli_inbox_add(inbox):
def cli_inbox_remove(inbox): def cli_inbox_remove(inbox):
'Remove an inbox from the database' 'Remove an inbox from the database'
database = app['database'] try:
dbinbox = database.get_inbox(inbox) dbinbox = app.database.get_inbox(inbox, fail=True)
if not dbinbox: except KeyError:
click.echo(f'Error: Inbox does not exist: {inbox}') click.echo(f'Error: Inbox does not exist: {inbox}')
return return
database.del_inbox(dbinbox) app.database.del_inbox(dbinbox['domain'])
database.save() app.database.save()
click.echo(f'Removed inbox from the database: {inbox}') click.echo(f'Removed inbox from the database: {inbox}')
@ -163,7 +165,7 @@ def cli_instance_list():
click.echo('Banned instances or relays:') click.echo('Banned instances or relays:')
for domain in app['config'].blocked_instances: for domain in app.config.blocked_instances:
click.echo(f'- {domain}') click.echo(f'- {domain}')
@ -172,16 +174,14 @@ def cli_instance_list():
def cli_instance_ban(target): def cli_instance_ban(target):
'Ban an instance and remove the associated inbox if it exists' 'Ban an instance and remove the associated inbox if it exists'
config = app['config'] if target.startswith('http'):
database = app['database'] target = urlparse(target).hostname
inbox = database.get_inbox(target)
if config.ban_instance(target): if app.config.ban_instance(target):
config.save() app.config.save()
if inbox: if app.database.del_inbox(target):
database.del_inbox(inbox) app.database.save()
database.save()
click.echo(f'Banned instance: {target}') click.echo(f'Banned instance: {target}')
return return
@ -194,10 +194,8 @@ def cli_instance_ban(target):
def cli_instance_unban(target): def cli_instance_unban(target):
'Unban an instance' 'Unban an instance'
config = app['config'] if app.config.unban_instance(target):
app.config.save()
if config.unban_instance(target):
config.save()
click.echo(f'Unbanned instance: {target}') click.echo(f'Unbanned instance: {target}')
return return
@ -217,7 +215,7 @@ def cli_software_list():
click.echo('Banned software:') click.echo('Banned software:')
for software in app['config'].blocked_software: for software in app.config.blocked_software:
click.echo(f'- {software}') click.echo(f'- {software}')
@ -229,17 +227,15 @@ def cli_software_list():
def cli_software_ban(name, fetch_nodeinfo): def cli_software_ban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to ban relays' 'Ban software. Use RELAYS for NAME to ban relays'
config = app['config']
if name == 'RELAYS': if name == 'RELAYS':
for name in relay_software_names: for name in relay_software_names:
config.ban_software(name) app.config.ban_software(name)
config.save() app.config.save()
return click.echo('Banned all relay software') return click.echo('Banned all relay software')
if fetch_nodeinfo: if fetch_nodeinfo:
software = run_in_loop(fetch_nodeinfo, name) software = asyncio.run(misc.fetch_nodeinfo(name))
if not software: if not software:
click.echo(f'Failed to fetch software name from domain: {name}') click.echo(f'Failed to fetch software name from domain: {name}')
@ -247,7 +243,7 @@ def cli_software_ban(name, fetch_nodeinfo):
name = software name = software
if config.ban_software(name): if config.ban_software(name):
config.save() app.config.save()
return click.echo(f'Banned software: {name}') return click.echo(f'Banned software: {name}')
click.echo(f'Software already banned: {name}') click.echo(f'Software already banned: {name}')
@ -261,25 +257,23 @@ def cli_software_ban(name, fetch_nodeinfo):
def cli_software_unban(name, fetch_nodeinfo): def cli_software_unban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to unban relays' 'Ban software. Use RELAYS for NAME to unban relays'
config = app['config']
if name == 'RELAYS': if name == 'RELAYS':
for name in relay_software_names: for name in relay_software_names:
config.unban_software(name) app.config.unban_software(name)
config.save() config.save()
return click.echo('Unbanned all relay software') return click.echo('Unbanned all relay software')
if fetch_nodeinfo: if fetch_nodeinfo:
software = run_in_loop(fetch_nodeinfo, name) software = asyncio.run(misc.fetch_nodeinfo(name))
if not software: if not software:
click.echo(f'Failed to fetch software name from domain: {name}') click.echo(f'Failed to fetch software name from domain: {name}')
name = software name = software
if config.unban_software(name): if app.config.unban_software(name):
config.save() app.config.save()
return click.echo(f'Unbanned software: {name}') return click.echo(f'Unbanned software: {name}')
click.echo(f'Software wasn\'t banned: {name}') click.echo(f'Software wasn\'t banned: {name}')
@ -296,7 +290,7 @@ def cli_whitelist():
def cli_whitelist_list(): def cli_whitelist_list():
click.echo('Current whitelisted domains') click.echo('Current whitelisted domains')
for domain in app['config'].whitelist: for domain in app.config.whitelist:
click.echo(f'- {domain}') click.echo(f'- {domain}')
@ -305,12 +299,10 @@ def cli_whitelist_list():
def cli_whitelist_add(instance): def cli_whitelist_add(instance):
'Add an instance to the whitelist' 'Add an instance to the whitelist'
config = app['config'] if not app.config.add_whitelist(instance):
if not config.add_whitelist(instance):
return click.echo(f'Instance already in the whitelist: {instance}') return click.echo(f'Instance already in the whitelist: {instance}')
config.save() app.config.save()
click.echo(f'Instance added to the whitelist: {instance}') click.echo(f'Instance added to the whitelist: {instance}')
@ -319,18 +311,14 @@ def cli_whitelist_add(instance):
def cli_whitelist_remove(instance): def cli_whitelist_remove(instance):
'Remove an instance from the whitelist' 'Remove an instance from the whitelist'
config = app['config'] if not app.config.del_whitelist(instance):
database = app['database']
inbox = database.get_inbox(instance)
if not config.del_whitelist(instance):
return click.echo(f'Instance not in the whitelist: {instance}') return click.echo(f'Instance not in the whitelist: {instance}')
config.save() app.config.save()
if inbox and config.whitelist_enabled: if app.config.whitelist_enabled:
database.del_inbox(inbox) if app.database.del_inbox(inbox):
database.save() app.database.save()
click.echo(f'Removed instance from the whitelist: {instance}') click.echo(f'Removed instance from the whitelist: {instance}')
@ -339,23 +327,21 @@ def cli_whitelist_remove(instance):
def relay_setup(): def relay_setup():
'Generate a new config' 'Generate a new config'
config = app['config']
while True: while True:
config.host = click.prompt('What domain will the relay be hosted on?', default=config.host) app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
if not config.host.endswith('example.com'): if not config.host.endswith('example.com'):
break break
click.echo('The domain must not be example.com') click.echo('The domain must not be example.com')
config.listen = click.prompt('Which address should the relay listen on?', default=config.listen) app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
while True: while True:
config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int) app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
break break
config.save() app.config.save()
if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'): if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'):
relay_run.callback() relay_run.callback()
@ -365,9 +351,7 @@ def relay_setup():
def relay_run(): def relay_run():
'Run the relay' 'Run the relay'
config = app['config'] if app.config.host.endswith('example.com'):
if config.host.endswith('example.com'):
return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".') return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
vers_split = platform.python_version().split('.') vers_split = platform.python_version().split('.')
@ -382,43 +366,10 @@ def relay_run():
click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome') click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
return click.echo(pip_command) return click.echo(pip_command)
if not misc.check_open_port(config.listen, config.port): if not misc.check_open_port(app.config.listen, app.config.port):
return click.echo(f'Error: A server is already running on port {config.port}') return click.echo(f'Error: A server is already running on port {app.config.port}')
# web pages app.run()
app.router.add_get('/', views.home)
# endpoints
app.router.add_post('/actor', views.inbox)
app.router.add_post('/inbox', views.inbox)
app.router.add_get('/actor', views.actor)
app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0)
app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown)
app.router.add_get('/.well-known/webfinger', views.webfinger)
if logging.DEBUG >= logging.root.level:
app.router.add_get('/stats', views.stats)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.ensure_future(handle_start_webserver(), loop=loop)
loop.run_forever()
def run_in_loop(func, *args, **kwargs):
loop = asyncio.new_event_loop()
return loop.run_until_complete(func(*args, **kwargs))
async def handle_start_webserver():
config = app['config']
runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"')
logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})')
await runner.setup()
site = TCPSite(runner, config.listen, config.port)
await site.start()
def main():
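Note on the pattern used throughout the rewritten CLI above: the old run_in_loop() helper is gone and one-shot coroutines are now awaited with asyncio.run(), e.g. software = asyncio.run(misc.fetch_nodeinfo(name)). A minimal sketch (the domain and config path are placeholders; constructing Application first matters because it registers itself with relay.misc via set_app):

import asyncio

from relay import misc
from relay.application import Application

app = Application('relay.yaml')  # side effect: misc.set_app(app)

# Returns the software name from nodeinfo, or False/None if it could not be fetched.
software = asyncio.run(misc.fetch_nodeinfo('example.com'))
print(software)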

relay/misc.py

@@ -4,26 +4,47 @@ import json
 import logging
 import socket
 import traceback
+import uuid
 from Crypto.Hash import SHA, SHA256, SHA512
 from Crypto.PublicKey import RSA
 from Crypto.Signature import PKCS1_v1_5
 from aiohttp import ClientSession
+from aiohttp.hdrs import METH_ALL as METHODS
+from aiohttp.web import Response as AiohttpResponse, View as AiohttpView
 from datetime import datetime
 from json.decoder import JSONDecodeError
 from urllib.parse import urlparse
 from uuid import uuid4
-from . import app
 from .http_debug import http_debug
+app = None
 HASHES = {
 'sha1': SHA,
 'sha256': SHA256,
 'sha512': SHA512
 }
+MIMETYPES = {
+'activity': 'application/activity+json',
+'html': 'text/html',
+'json': 'application/json',
+'text': 'text/plain'
+}
+NODEINFO_NS = {
+'20': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
+'21': 'http://nodeinfo.diaspora.software/ns/schema/2.1'
+}
+def set_app(new_app):
+global app
+app = new_app
 def build_signing_string(headers, used_headers):
 return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers))
@ -47,56 +68,35 @@ def create_signature_header(headers):
sigstring = build_signing_string(headers, used_headers) sigstring = build_signing_string(headers, used_headers)
sig = { sig = {
'keyId': app['config'].keyid, 'keyId': app.config.keyid,
'algorithm': 'rsa-sha256', 'algorithm': 'rsa-sha256',
'headers': ' '.join(used_headers), 'headers': ' '.join(used_headers),
'signature': sign_signing_string(sigstring, app['database'].PRIVKEY) 'signature': sign_signing_string(sigstring, app.database.PRIVKEY)
} }
chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()] chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()]
return ','.join(chunks) return ','.join(chunks)
def distill_object_id(activity):
logging.debug(f'>> determining object ID for {activity["object"]}')
try:
return activity['object']['id']
except TypeError:
return activity['object']
def distill_inboxes(actor, object_id): def distill_inboxes(actor, object_id):
database = app['database'] for inbox in app.database.inboxes:
origin_hostname = urlparse(object_id).hostname if inbox != actor.shared_inbox and urlparse(inbox).hostname != urlparse(object_id).hostname:
actor_inbox = get_actor_inbox(actor) yield inbox
targets = []
for inbox in database.inboxes:
if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname:
targets.append(inbox)
return targets
def generate_body_digest(body): def generate_body_digest(body):
bodyhash = app['cache'].digests.get(body) bodyhash = app.cache.digests.get(body)
if bodyhash: if bodyhash:
return bodyhash return bodyhash
h = SHA256.new(body.encode('utf-8')) h = SHA256.new(body.encode('utf-8'))
bodyhash = base64.b64encode(h.digest()).decode('utf-8') bodyhash = base64.b64encode(h.digest()).decode('utf-8')
app['cache'].digests[body] = bodyhash app.cache.digests[body] = bodyhash
return bodyhash return bodyhash
def get_actor_inbox(actor):
return actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
def sign_signing_string(sigstring, key): def sign_signing_string(sigstring, key):
pkcs = PKCS1_v1_5.new(key) pkcs = PKCS1_v1_5.new(key)
h = SHA256.new() h = SHA256.new()
@ -106,20 +106,6 @@ def sign_signing_string(sigstring, key):
return base64.b64encode(sigdata).decode('utf-8') return base64.b64encode(sigdata).decode('utf-8')
def split_signature(sig):
default = {"headers": "date"}
sig = sig.strip().split(',')
for chunk in sig:
k, _, v = chunk.partition('=')
v = v.strip('\"')
default[k] = v
default['headers'] = default['headers'].split()
return default
async def fetch_actor_key(actor): async def fetch_actor_key(actor):
actor_data = await request(actor) actor_data = await request(actor)
@ -135,103 +121,52 @@ async def fetch_actor_key(actor):
async def fetch_nodeinfo(domain): async def fetch_nodeinfo(domain):
nodeinfo_url = None nodeinfo_url = None
wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False) wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False)
if not wk_nodeinfo: if not wk_nodeinfo:
return return
for link in wk_nodeinfo.get('links', ''): wk_nodeinfo = WKNodeinfo(wk_nodeinfo)
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href'] for version in ['20', '21']:
break try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url: if not nodeinfo_url:
return logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
nodeinfo_data = await request(nodeinfo_url, sign_headers=False, activity=False) nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False)
try: try:
return nodeinfo_data['software']['name'] return nodeinfo['software']['name']
except KeyError: except KeyError:
return False return False
async def follow_remote_actor(actor_uri):
config = app['config']
actor = await request(actor_uri)
inbox = get_actor_inbox(actor)
if not actor:
logging.error(f'failed to fetch actor at: {actor_uri}')
return
logging.verbose(f'sending follow request: {actor_uri}')
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Follow",
"to": [actor['id']],
"object": actor['id'],
"id": f"https://{config.host}/activities/{uuid4()}",
"actor": f"https://{config.host}/actor"
}
await request(inbox, message)
async def unfollow_remote_actor(actor_uri):
config = app['config']
actor = await request(actor_uri)
if not actor:
logging.error(f'failed to fetch actor: {actor_uri}')
return
inbox = get_actor_inbox(actor)
logging.verbose(f'sending unfollow request to inbox: {inbox}')
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Undo",
"to": [actor_uri],
"object": {
"type": "Follow",
"object": actor_uri,
"actor": actor_uri,
"id": f"https://{config.host}/activities/{uuid4()}"
},
"id": f"https://{config.host}/activities/{uuid4()}",
"actor": f"https://{config.host}/actor"
}
await request(inbox, message)
async def request(uri, data=None, force=False, sign_headers=True, activity=True): async def request(uri, data=None, force=False, sign_headers=True, activity=True):
## If a get request and not force, try to use the cache first ## If a get request and not force, try to use the cache first
if not data and not force: if not data and not force:
try: try:
return app['cache'].json[uri] return app.cache.json[uri]
except KeyError: except KeyError:
pass pass
url = urlparse(uri) url = urlparse(uri)
method = 'POST' if data else 'GET' method = 'POST' if data else 'GET'
headers = {'User-Agent': 'ActivityRelay'} action = data.get('type') if data else None
mimetype = 'application/activity+json' if activity else 'application/json' headers = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': 'ActivityRelay',
}
## Set the content type for a POST if data:
if data and 'Content-Type' not in headers: headers['Content-Type'] = MIMETYPES['activity' if activity else 'json']
headers['Content-Type'] = mimetype
## Set the accepted content type for a GET
elif not data and 'Accept' not in headers:
headers['Accept'] = mimetype
if sign_headers: if sign_headers:
signing_headers = { signing_headers = {
@ -243,7 +178,6 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True)
if data: if data:
assert isinstance(data, dict) assert isinstance(data, dict)
action = data.get('type')
data = json.dumps(data) data = json.dumps(data)
signing_headers.update({ signing_headers.update({
'Digest': f'SHA-256={generate_body_digest(data)}', 'Digest': f'SHA-256={generate_body_digest(data)}',
@ -258,26 +192,42 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True)
headers.update(signing_headers) headers.update(signing_headers)
try: try:
# json_serializer=DotDict maybe? if data:
async with ClientSession(trace_configs=http_debug()) as session, app['semaphore']: logging.verbose(f'Sending "{action}" to inbox: {uri}')
else:
logging.verbose(f'Sending GET request to url: {uri}')
async with ClientSession(trace_configs=http_debug()) as session, app.semaphore:
async with session.request(method, uri, headers=headers, data=data) as resp: async with session.request(method, uri, headers=headers, data=data) as resp:
## aiohttp has been known to leak if the response hasn't been read, ## aiohttp has been known to leak if the response hasn't been read,
## so we're just gonna read the request no matter what ## so we're just gonna read the request no matter what
resp_data = await resp.read() resp_data = await resp.read()
resp_payload = json.loads(resp_data.decode('utf-8'))
if resp.status not in [200, 202]: ## Not expecting a response, so just return
if not data: if resp.status == 202:
logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}')
return
logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}')
return return
logging.debug(f'{uri} >> resp {resp_payload}') elif resp.status != 200:
if not resp_data:
return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}')
app['cache'].json[uri] = resp_payload return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}')
return resp_payload
if resp.content_type == MIMETYPES['activity']:
resp_data = await resp.json(loads=Message.new_from_json)
elif resp.content_type == MIMETYPES['json']:
resp_data = await resp.json(loads=DotDict.new_from_json)
else:
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp_data}')
logging.debug(f'{uri} >> resp {resp_data}')
app.cache.json[uri] = resp_data
return resp_data
except JSONDecodeError: except JSONDecodeError:
return return
@ -286,29 +236,19 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True)
traceback.print_exc() traceback.print_exc()
async def validate_signature(actor, http_request): async def validate_signature(actor, signature, http_request):
pubkey = await fetch_actor_key(actor)
if not pubkey:
return False
logging.debug(f'actor key: {pubkey}')
headers = {key.lower(): value for key, value in http_request.headers.items()} headers = {key.lower(): value for key, value in http_request.headers.items()}
headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path]) headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path])
sig = split_signature(headers['signature']) sigstring = build_signing_string(headers, signature['headers'])
logging.debug(f'sigdata: {sig}')
sigstring = build_signing_string(headers, sig['headers'])
logging.debug(f'sigstring: {sigstring}') logging.debug(f'sigstring: {sigstring}')
sign_alg, _, hash_alg = sig['algorithm'].partition('-') sign_alg, _, hash_alg = signature['algorithm'].partition('-')
logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}') logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}')
sigdata = base64.b64decode(sig['signature']) sigdata = base64.b64decode(signature['signature'])
pkcs = PKCS1_v1_5.new(pubkey) pkcs = PKCS1_v1_5.new(actor.PUBKEY)
h = HASHES[hash_alg].new() h = HASHES[hash_alg].new()
h.update(sigstring.encode('ascii')) h.update(sigstring.encode('ascii'))
result = pkcs.verify(h, sigdata) result = pkcs.verify(h, sigdata)
@ -317,3 +257,294 @@ async def validate_signature(actor, http_request):
logging.debug(f'validates? {result}') logging.debug(f'validates? {result}')
return result return result
class DotDict(dict):
def __init__(self, _data, **kwargs):
dict.__init__(self)
self.update(_data, **kwargs)
def __getattr__(self, k):
try:
return self[k]
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def __setattr__(self, k, v):
if k.startswith('_'):
super().__setattr__(k, v)
else:
self[k] = v
def __setitem__(self, k, v):
if type(v) == dict:
v = DotDict(v)
super().__setitem__(k, v)
def __delattr__(self, k):
try:
dict.__delitem__(self, k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
@classmethod
def new_from_json(cls, data):
if not data:
raise JSONDecodeError('Empty body', data, 1)
try:
return cls(json.loads(data))
except ValueError:
raise JSONDecodeError('Invalid body', data, 1)
@classmethod
def new_from_signature(cls, sig):
data = cls({})
for chunk in sig.strip().split(','):
key, value = chunk.split('=', 1)
value = value.strip('\"')
if key == 'headers':
value = value.split()
data[key.lower()] = value
return data
def to_json(self, indent=None):
return json.dumps(self, indent=indent)
def update(self, _data, **kwargs):
if isinstance(_data, dict):
for key, value in _data.items():
self[key] = value
elif isinstance(_data, (list, tuple, set)):
for key, value in _data:
self[key] = value
for key, value in kwargs.items():
self[key] = value
class Message(DotDict):
@classmethod
def new_actor(cls, host, pubkey, description=None):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/actor',
'type': 'Application',
'preferredUsername': 'relay',
'name': 'ActivityRelay',
'summary': description or 'ActivityRelay bot',
'followers': f'https://{host}/followers',
'following': f'https://{host}/following',
'inbox': f'https://{host}/inbox',
'url': f'https://{host}/inbox',
'endpoints': {
'sharedInbox': f'https://{host}/inbox'
},
'publicKey': {
'id': f'https://{host}/actor#main-key',
'owner': f'https://{host}/actor',
'publicKeyPem': pubkey
}
})
@classmethod
def new_announce(cls, host, object):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Announce',
'to': [f'https://{host}/followers'],
'actor': f'https://{host}/actor',
'object': object
})
@classmethod
def new_follow(cls, host, actor):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'type': 'Follow',
'to': [actor],
'object': actor,
'id': f'https://{host}/activities/{uuid.uuid4()}',
'actor': f'https://{host}/actor'
})
@classmethod
def new_unfollow(cls, host, actor, follow):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Undo',
'to': [actor],
'actor': f'https://{host}/actor',
'object': follow
})
@classmethod
def new_response(cls, host, actor, followid, accept):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Accept' if accept else 'Reject',
'to': [actor],
'actor': f'https://{host}/actor',
'object': {
'id': followid,
'type': 'Follow',
'object': f'https://{host}/actor',
'actor': actor
}
})
# misc properties
@property
def domain(self):
return urlparse(self.id).hostname
# actor properties
@property
def PUBKEY(self):
return RSA.import_key(self.pubkey)
@property
def pubkey(self):
return self.publicKey.publicKeyPem
@property
def shared_inbox(self):
return self.get('endpoints', {}).get('sharedInbox', self.inbox)
# activity properties
@property
def actorid(self):
if isinstance(self.actor, dict):
return self.actor.id
return self.actor
@property
def objectid(self):
if isinstance(self.object, dict):
return self.object.id
return self.object
class Response(AiohttpResponse):
@classmethod
def new(cls, body='', status=200, headers=None, ctype='text'):
kwargs = {
'status': status,
'headers': headers,
'content_type': MIMETYPES[ctype]
}
if isinstance(body, bytes):
kwargs['body'] = body
elif isinstance(body, dict) and ctype in {'json', 'activity'}:
kwargs['text'] = json.dumps(body)
else:
kwargs['text'] = body
return cls(**kwargs)
@classmethod
def new_error(cls, status, body, ctype='text'):
if ctype == 'json':
body = json.dumps({'status': status, 'error': body})
return cls.new(body=body, status=status, ctype=ctype)
@property
def location(self):
return self.headers.get('Location')
@location.setter
def location(self, value):
self.headers['Location'] = value
class View(AiohttpView):
async def _iter(self):
if self.request.method not in METHODS:
self._raise_allowed_methods()
method = getattr(self, self.request.method.lower(), None)
if method is None:
self._raise_allowed_methods()
return await method(**self.request.match_info)
@property
def app(self):
return self._request.app
@property
def cache(self):
return self.app.cache
@property
def config(self):
return self.app.config
@property
def database(self):
return self.app.database
class WKNodeinfo(DotDict):
@classmethod
def new(cls, v20, v21):
return cls({
'links': [
{'rel': NODEINFO_NS['20'], 'href': v20},
{'rel': NODEINFO_NS['21'], 'href': v21}
]
})
def get_url(self, version='20'):
for item in self.links:
if item['rel'] == NODEINFO_NS[version]:
return item['href']
raise KeyError(version)
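Note: the new DotDict/Message/Response/View/WKNodeinfo helpers above replace the ad-hoc dicts used previously; for example, the CLI follow command now builds its activity with Message.new_follow before posting it with misc.request. A minimal sketch (host and actor URL are placeholders):

from relay import misc

message = misc.Message.new_follow(
    host = 'relay.example.com',
    actor = 'https://example.com/actor'
)

# DotDict gives attribute access and JSON helpers.
print(message.objectid)           # the followed actor URL
print(message.to_json(indent=2))  # serialized activity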

relay/processors.py

@@ -3,108 +3,95 @@ import logging
 from uuid import uuid4
-from . import app, misc
+from . import misc
async def handle_relay(actor, data, request): async def handle_relay(request, actor, data, software):
cache = app['cache'].objects if data.objectid in request.app.cache.objects:
object_id = misc.distill_object_id(data) logging.verbose(f'already relayed {data.objectid}')
if object_id in cache:
logging.verbose(f'already relayed {object_id} as {cache[object_id]}')
return return
logging.verbose(f'Relaying post from {actor["id"]}') logging.verbose(f'Relaying post from {data.actorid}')
activity_id = f"https://{request.host}/activities/{uuid4()}" message = misc.Message.new_announce(
host = request.app.config.host,
message = { object = data.objectid
"@context": "https://www.w3.org/ns/activitystreams", )
"type": "Announce",
"to": [f"https://{request.host}/followers"],
"actor": f"https://{request.host}/actor",
"object": object_id,
"id": activity_id
}
logging.debug(f'>> relay: {message}') logging.debug(f'>> relay: {message}')
inboxes = misc.distill_inboxes(actor, object_id) inboxes = misc.distill_inboxes(actor, data.objectid)
futures = [misc.request(inbox, data=message) for inbox in inboxes] futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) asyncio.ensure_future(asyncio.gather(*futures))
cache[object_id] = activity_id request.app.cache.objects[data.objectid] = message.id
async def handle_forward(actor, data, request): async def handle_forward(request, actor, data, software):
cache = app['cache'].objects if data.id in request.app.cache.objects:
object_id = misc.distill_object_id(data) logging.verbose(f'already forwarded {data.id}')
if object_id in cache:
logging.verbose(f'already forwarded {object_id}')
return return
logging.verbose(f'Forwarding post from {actor["id"]}') message = misc.Message.new_announce(
host = request.app.config.host,
object = data
)
logging.verbose(f'Forwarding post from {actor.id}')
logging.debug(f'>> Relay {data}') logging.debug(f'>> Relay {data}')
inboxes = misc.distill_inboxes(actor, object_id) inboxes = misc.distill_inboxes(actor, data.id)
futures = [misc.request(inbox, data=message) for inbox in inboxes]
futures = [misc.request(inbox, data=data) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) asyncio.ensure_future(asyncio.gather(*futures))
request.app.cache.objects[data.id] = message.id
cache[object_id] = object_id
async def handle_follow(actor, data, request): async def handle_follow(request, actor, data, software):
config = app['config'] if not request.app.database.add_inbox(actor.shared_inbox, data.id):
database = app['database'] request.app.database.set_followid(actor.id, data.id)
inbox = misc.get_actor_inbox(actor) request.app.database.save()
if inbox not in database.inboxes: await misc.request(
database.add_inbox(inbox) actor.shared_inbox,
database.save() misc.Message.new_response(
asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) host = request.app.config.host,
actor = actor.id,
followid = data.id,
accept = True
)
)
message = { # Are Akkoma and Pleroma the only two that expect a follow back?
"@context": "https://www.w3.org/ns/activitystreams", # Ignoring only Mastodon for now
"type": "Accept", if software != 'mastodon':
"to": [actor["id"]], await misc.request(
"actor": config.actor, actor.shared_inbox,
misc.Message.new_follow(
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile. host = request.app.config.host,
"object": { actor = actor.id
"type": "Follow", )
"id": data["id"], )
"object": config.actor,
"actor": actor["id"]
},
"id": f"https://{request.host}/activities/{uuid4()}",
}
asyncio.ensure_future(misc.request(inbox, message))
async def handle_undo(actor, data, request): async def handle_undo(request, actor, data, software):
## If the activity being undone is an Announce, forward it insteead ## If the object is not a Follow, forward it
if data['object']['type'] == 'Announce': if data['object']['type'] != 'Follow':
await handle_forward(actor, data, request) return await handle_forward(request, actor, data, software)
if not request.app.database.del_inbox(actor.domain, data.id):
return return
elif data['object']['type'] != 'Follow': request.app.database.save()
return
database = app['database'] message = misc.Message.new_unfollow(
inbox = database.get_inbox(actor['id']) host = request.app.config.host,
actor = actor.id,
follow = data
)
if not inbox: await misc.request(actor.shared_inbox, message)
return
database.del_inbox(inbox)
database.save()
await misc.unfollow_remote_actor(actor['id'])
processors = { processors = {
@ -117,9 +104,9 @@ processors = {
} }
async def run_processor(request, data, actor): async def run_processor(request, actor, data, software):
if data['type'] not in processors: if data.type not in processors:
return return
logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}') logging.verbose(f'New "{data.type}" from actor: {actor.id}')
return await processors[data['type']](actor, data, request) return await processors[data.type](request, actor, data, software)

relay/views.py

@@ -2,25 +2,43 @@ import logging
 import subprocess
 import traceback
-from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response
-from urllib.parse import urlparse
-from . import __version__, app, misc
+from pathlib import Path
+from . import __version__, misc
 from .http_debug import STATS
+from .misc import DotDict, Message, Response, WKNodeinfo
 from .processors import run_processor
-try:
-commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
-version = f'{__version__} {commit_label}'
-except:
-version = __version__
+routes = []
+version = __version__
+if Path(__file__).parent.parent.joinpath('.git').exists():
+try:
+commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
+version = f'{__version__} {commit_label}'
+except:
+pass
+def register_route(method, path):
+def wrapper(func):
+routes.append([method, path, func])
+return func
+return wrapper
@register_route('GET', '/')
async def home(request): async def home(request):
targets = '<br>'.join(app['database'].hostnames) targets = '<br>'.join(request.app.database.hostnames)
text = """ note = request.app.config.note
count = len(request.app.database.hostnames)
host = request.app.config.host
text = f"""
<html><head> <html><head>
<title>ActivityPub Relay at {host}</title> <title>ActivityPub Relay at {host}</title>
<style> <style>
@ -37,132 +55,117 @@ a:hover {{ color: #8AF; }}
<p>You may subscribe to this relay with the address: <a href="https://{host}/actor">https://{host}/actor</a></p> <p>You may subscribe to this relay with the address: <a href="https://{host}/actor">https://{host}/actor</a></p>
<p>To host your own relay, you may download the code at this address: <a href="https://git.pleroma.social/pleroma/relay">https://git.pleroma.social/pleroma/relay</a></p> <p>To host your own relay, you may download the code at this address: <a href="https://git.pleroma.social/pleroma/relay">https://git.pleroma.social/pleroma/relay</a></p>
<br><p>List of {count} registered instances:<br>{targets}</p> <br><p>List of {count} registered instances:<br>{targets}</p>
</body></html>""".format(host=request.host, note=app['config'].note, targets=targets, count=len(app['database'].inboxes)) </body></html>"""
return Response( return Response.new(text, ctype='html')
status = 200,
content_type = 'text/html',
charset = 'utf-8', @register_route('GET', '/inbox')
text = text @register_route('GET', '/actor')
async def actor(request):
data = Message.new_actor(
host = request.app.config.host,
pubkey = request.app.database.pubkey
) )
return Response.new(data, ctype='activity')
async def actor(request):
database = app['database']
data = {
"@context": "https://www.w3.org/ns/activitystreams",
"endpoints": {
"sharedInbox": f"https://{request.host}/inbox"
},
"followers": f"https://{request.host}/followers",
"following": f"https://{request.host}/following",
"inbox": f"https://{request.host}/inbox",
"name": "ActivityRelay",
"type": "Application",
"id": f"https://{request.host}/actor",
"publicKey": {
"id": f"https://{request.host}/actor#main-key",
"owner": f"https://{request.host}/actor",
"publicKeyPem": database.pubkey
},
"summary": "ActivityRelay bot",
"preferredUsername": "relay",
"url": f"https://{request.host}/actor"
}
return json_response(data, content_type='application/activity+json')
@register_route('POST', '/inbox')
@register_route('POST', '/actor')
async def inbox(request): async def inbox(request):
config = app['config'] config = request.app.config
database = app['database'] database = request.app.database
## reject if missing signature header ## reject if missing signature header
if 'signature' not in request.headers: try:
signature = DotDict.new_from_signature(request.headers['signature'])
except KeyError:
logging.verbose('Actor missing signature header') logging.verbose('Actor missing signature header')
raise HTTPUnauthorized(body='missing signature') raise HTTPUnauthorized(body='missing signature')
## read message and get actor id and domain ## read message
try: try:
data = await request.json() data = await request.json(loads=Message.new_from_json)
actor_id = data['actor']
actor_domain = urlparse(actor_id).hostname
except KeyError: ## reject if there is no actor in the message
logging.verbose('actor not in data') if 'actor' not in data:
raise HTTPUnauthorized(body='no actor in message') logging.verbose('actor not in data')
return Response.new_error(400, 'no actor in message', 'json')
## reject if there is no actor in the message
except: except:
traceback.print_exc() traceback.print_exc()
logging.verbose('Failed to parse inbox message') logging.verbose('Failed to parse inbox message')
raise HTTPUnauthorized(body='failed to parse message') return Response.new_error(400, 'failed to parse message', 'json')
actor = await misc.request(actor_id) actor = await misc.request(signature.keyid)
software = await misc.fetch_nodeinfo(actor.domain)
## reject if actor is empty ## reject if actor is empty
if not actor: if not actor:
logging.verbose(f'Failed to fetch actor: {actor_id}') logging.verbose(f'Failed to fetch actor: {actor.id}')
raise HTTPUnauthorized('failed to fetch actor') return Response.new_error(400, 'failed to fetch actor', 'json')
## reject if the actor isn't whitelisted while the whiltelist is enabled ## reject if the actor isn't whitelisted while the whiltelist is enabled
elif config.whitelist_enabled and not config.is_whitelisted(actor_id): elif config.whitelist_enabled and not config.is_whitelisted(actor.domain):
logging.verbose(f'Rejected actor for not being in the whitelist: {actor_id}') logging.verbose(f'Rejected actor for not being in the whitelist: {actor.id}')
raise HTTPForbidden(body='access denied') return Response.new_error(403, 'access denied', 'json')
## reject if actor is banned ## reject if actor is banned
if app['config'].is_banned(actor_id): if request.app['config'].is_banned(actor.domain):
logging.verbose(f'Ignored request from banned actor: {actor_id}') logging.verbose(f'Ignored request from banned actor: {actor.id}')
raise HTTPForbidden(body='access denied') return Response.new_error(403, 'access denied', 'json')
## reject if software used by actor is banned ## reject if software used by actor is banned
if len(config.blocked_software): if config.is_banned_software(software):
software = await misc.fetch_nodeinfo(actor_domain) logging.verbose(f'Rejected actor for using specific software: {software}')
return Response.new_error(403, 'access denied', 'json')
if config.is_banned_software(software):
logging.verbose(f'Rejected actor for using specific software: {software}')
raise HTTPForbidden(body='access denied')
## reject if the signature is invalid ## reject if the signature is invalid
if not (await misc.validate_signature(actor_id, request)): if not (await misc.validate_signature(actor, signature, request)):
logging.verbose(f'signature validation failed for: {actor_id}') logging.verbose(f'signature validation failed for: {actor.id}')
raise HTTPUnauthorized(body='signature check failed, signature did not match key') return Response.new_error(401, 'signature check failed', 'json')
## reject if activity type isn't 'Follow' and the actor isn't following ## reject if activity type isn't 'Follow' and the actor isn't following
if data['type'] != 'Follow' and not database.get_inbox(actor_domain): if data['type'] != 'Follow' and not database.get_inbox(actor.domain):
logging.verbose(f'Rejected actor for trying to post while not following: {actor_id}') logging.verbose(f'Rejected actor for trying to post while not following: {actor.id}')
raise HTTPUnauthorized(body='access denied') return Response.new_error(401, 'access denied', 'json')
logging.debug(f">> payload {data}") logging.debug(f">> payload {data}")
await run_processor(request, data, actor) await run_processor(request, actor, data, software)
return Response(body=b'{}', content_type='application/activity+json') return Response.new(status=202)
@register_route('GET', '/.well-known/webfinger')
async def webfinger(request): async def webfinger(request):
config = app['config'] try:
subject = request.query['resource'] subject = request.query['resource']
if subject != f'acct:relay@{request.host}': except KeyError:
return json_response({'error': 'user not found'}, status=404) return Response.new_error(400, 'missing \'resource\' query key', 'json')
if subject != f'acct:relay@{request.app.config.host}':
return Response.new_error(404, 'user not found', 'json')
data = { data = {
'subject': subject, 'subject': subject,
'aliases': [config.actor], 'aliases': [request.app.config.actor],
'links': [ 'links': [
{'href': config.actor, 'rel': 'self', 'type': 'application/activity+json'}, {'href': request.app.config.actor, 'rel': 'self', 'type': 'application/activity+json'},
{'href': config.actor, 'rel': 'self', 'type': 'application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"'} {'href': request.app.config.actor, 'rel': 'self', 'type': 'application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"'}
] ]
} }
return json_response(data) return Response.new(data, ctype='json')
@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
async def nodeinfo_2_0(request): async def nodeinfo_2_0(request):
niversion = request.match_info['version'][:3]
data = { data = {
# XXX - is this valid for a relay? 'openRegistrations': not request.app.config.whitelist_enabled,
'openRegistrations': True,
'protocols': ['activitypub'], 'protocols': ['activitypub'],
'services': { 'services': {
'inbound': [], 'inbound': [],
@ -179,25 +182,27 @@ async def nodeinfo_2_0(request):
} }
}, },
'metadata': { 'metadata': {
'peers': app['database'].hostnames 'peers': request.app.database.hostnames
}, },
'version': '2.0' 'version': niversion
} }
return json_response(data) if version == '2.1':
data['software']['repository'] = 'https://git.pleroma.social/pleroma/relay'
return Response.new(data, ctype='json')
@register_route('GET', '/.well-known/nodeinfo')
async def nodeinfo_wellknown(request): async def nodeinfo_wellknown(request):
data = { data = WKNodeinfo.new(
'links': [ v20 = f'https://{request.app.config.host}/nodeinfo/2.0.json',
{ v21 = f'https://{request.app.config.host}/nodeinfo/2.1.json'
'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0', )
'href': f'https://{request.host}/nodeinfo/2.0.json'
} return Response.new(data, ctype='json')
]
}
return json_response(data)
+@register_route('GET', '/stats')
 async def stats(request):
-return json_response(STATS)
+return Response.new(STATS, ctype='json')
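Note: the @register_route decorator above only appends [method, path, func] entries to the module-level routes list; the actual wiring happens in Application.run (relay/application.py earlier in this diff), which skips /stats unless debug logging is enabled and registers the rest. A minimal sketch of that same consumption, outside the Application class:

from aiohttp import web

from relay.views import routes

app = web.Application()

for method, path, handler in routes:
    # each entry is [method, path, func], exactly as appended by register_route
    app.router.add_route(method, path, handler)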

setup.cfg

@@ -1,6 +1,6 @@
 [metadata]
 name = relay
-version = 0.2.2
+version = 0.2.3
 description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
 long_description = file: README.md
 long_description_content_type = text/markdown; charset=UTF-8
@@ -30,6 +30,10 @@ install_requires =
 PyYAML >= 5.0.0
 python_requires = >=3.6
+[options.extras_require]
+dev =
+pyinstaller >= 5.6.0
 [options.entry_points]
 console_scripts =
 activityrelay = relay.manage:main