rework final draft

This commit is contained in:
Izalia Mae 2022-04-09 03:07:16 -04:00
parent 0808414cb7
commit 0d77614bdb
9 changed files with 434 additions and 192 deletions

View file

@ -12,13 +12,3 @@ def set_app(app):
global APP global APP
APP = app APP = app
#import argparse
#parser = argparse.ArgumentParser(
#description="A generic LitePub relay (works with all LitePub consumers and Mastodon).",
#prog="python -m relay")
#parser.add_argument("-c", "--config", type=str, default="relay.yaml",
#metavar="<path>", help="the path to your config file")
#args = parser.parse_args()

View file

@ -1,63 +1,5 @@
import Crypto from .manage import main
import asyncio
import logging
import platform
import sys
from aiohttp.web import AppRunner, TCPSite
from . import views
from .application import app
def crypto_check():
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ != '2.6.1':
return
if int(vers_split[1]) > 7 and Crypto.__version__ == '2.6.1':
logging.error('PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
logging.error(pip_command)
sys.exit()
else:
logging.warning('PyCrypto is old and should be replaced with pycryptodome')
logging.warning(pip_command)
async def start_webserver():
config = app['config']
runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"')
logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})')
await runner.setup()
site = TCPSite(runner, config.listen, config.port)
await site.start()
def main():
# web pages
app.router.add_get('/', views.home)
# endpoints
app.router.add_post('/actor', views.inbox)
app.router.add_post('/inbox', views.inbox)
app.router.add_get('/actor', views.actor)
app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0)
app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown)
app.router.add_get('/.well-known/webfinger', views.webfinger)
if logging.DEBUG <= logging.root.level:
app.router.add_get('/stats', views.stats)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.ensure_future(start_webserver(), loop=loop)
loop.run_forever()
if __name__ == '__main__': if __name__ == '__main__':
crypto_check()
main() main()

View file

@ -3,12 +3,9 @@ import logging
import sys import sys
from aiohttp.web import Application from aiohttp.web import Application
from cachetools import LRUCache
from . import set_app from . import set_app
from . import views from . import views
from .config import DotDict, RelayConfig
from .database import RelayDatabase
from .middleware import http_signatures_middleware from .middleware import http_signatures_middleware
@ -16,21 +13,4 @@ app = Application(middlewares=[
http_signatures_middleware http_signatures_middleware
]) ])
app['config'] = RelayConfig('relay.yaml')
if not app['config'].load():
app['config'].save()
logging.error('Relay is not setup. Change the host in relay.yaml at the least and try again')
sys.exit(1)
app['database'] = RelayDatabase(app['config'])
app['database'].load()
app['cache'] = DotDict()
app['semaphore'] = asyncio.Semaphore(app['config']['push_limit'])
for key in app['config'].cachekeys:
app['cache'][key] = LRUCache(app['config'][key])
set_app(app) set_app(app)

View file

@ -47,10 +47,10 @@ class DotDict(dict):
class RelayConfig(DotDict): class RelayConfig(DotDict):
apkeys = { apkeys = {
'host', 'host',
'whitelist_enabled',
'blocked_software', 'blocked_software',
'blocked_instances', 'blocked_instances',
'whitelist', 'whitelist'
'whitelist_enabled'
} }
cachekeys = { cachekeys = {
@ -61,13 +61,15 @@ class RelayConfig(DotDict):
def __init__(self, path): def __init__(self, path):
self._path = Path(path).expanduser().resolve()
super().__init__({ super().__init__({
'db': 'relay.jsonld', 'db': f'{self._path.stem}.jsonld',
'listen': '0.0.0.0', 'listen': '0.0.0.0',
'port': 8080, 'port': 8080,
'note': 'Make a note about your instance here.', 'note': 'Make a note about your instance here.',
'push_limit': 512, 'push_limit': 512,
'host': 'example.com', 'host': 'relay.example.com',
'blocked_software': [], 'blocked_software': [],
'blocked_instances': [], 'blocked_instances': [],
'whitelist': [], 'whitelist': [],
@ -77,8 +79,6 @@ class RelayConfig(DotDict):
'digests': 1024 'digests': 1024
}) })
self._path = Path(path).expanduser().resolve()
def __setitem__(self, key, value): def __setitem__(self, key, value):
if key in ['blocked_instances', 'blocked_software', 'whitelist']: if key in ['blocked_instances', 'blocked_software', 'whitelist']:
@ -118,12 +118,85 @@ class RelayConfig(DotDict):
return f'{self.actor}#main-key' return f'{self.actor}#main-key'
def is_banned(self, inbox): def ban_instance(self, instance):
return urlparse(inbox).hostname in self.blocked_instances if instance.startswith('http'):
instance = urlparse(instance).hostname
if self.is_banned(instance):
return False
self.blocked_instances.append(instance)
return True
def is_whitelisted(self, inbox): def unban_instance(self, instance):
return urlparse(inbox).hostname in self.whitelist if instance.startswith('http'):
instance = urlparse(instance).hostname
try:
self.blocked_instances.remove(instance)
return True
except:
return False
def ban_software(self, software):
if self.is_banned_software(software):
return False
self.blocked_software.append(software)
return True
def unban_software(self, software):
try:
self.blocked_software.remove(software)
return True
except:
return False
def add_whitelist(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
if self.is_whitelisted(instance):
return False
self.whitelist.append(instance)
return True
def del_whitelist(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
try:
self.whitelist.remove(instance)
return True
except:
return False
def is_banned(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
return instance in self.blocked_instances
def is_banned_software(self, software):
return software in self.blocked_software
def is_whitelisted(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
return instance in self.whitelist
def load(self): def load(self):
@ -166,7 +239,7 @@ class RelayConfig(DotDict):
def save(self): def save(self):
config = { config = {
'db': self.db, 'db': self['db'],
'listen': self.listen, 'listen': self.listen,
'port': self.port, 'port': self.port,
'note': self.note, 'note': self.note,

View file

@ -58,7 +58,7 @@ async def on_request_exception(session, trace_config_ctx, params):
def http_debug(): def http_debug():
if logging.DEBUG <= logging.root.level: if logging.DEBUG >= logging.root.level:
return return
trace_config = aiohttp.TraceConfig() trace_config = aiohttp.TraceConfig()

View file

@ -1,80 +1,331 @@
import Crypto
import asyncio import asyncio
import sys import click
import simplejson as json import json
import logging
import platform
from aiohttp.web import AppRunner, TCPSite
from cachetools import LRUCache
from . import misc, views
from .application import app from .application import app
from .config import DotDict, RelayConfig
from .database import RelayDatabase
from .misc import follow_remote_actor, unfollow_remote_actor from .misc import follow_remote_actor, unfollow_remote_actor
def relay_list(): @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True)
print('Connected to the following instances or relays:') @click.option('--config', '-c', default='relay.yaml', help='path to the relay\'s config')
[print('-', inbox) for inbox in app['database'].inboxes] @click.pass_context
def cli(ctx, config):
app['config'] = RelayConfig(config)
if not app['config'].load():
app['config'].save()
app['database'] = RelayDatabase(app['config'])
app['database'].load()
app['cache'] = DotDict()
app['semaphore'] = asyncio.Semaphore(app['config']['push_limit'])
for key in app['config'].cachekeys:
app['cache'][key] = LRUCache(app['config'][key])
if not ctx.invoked_subcommand:
relay_run.callback()
def relay_follow(): @cli.command('list')
if len(sys.argv) < 3: @click.argument('type', required=False, default='inbox')
print('usage: python3 -m relay.manage follow <target>') def relay_list(type):
exit() 'List all following instances'
target = sys.argv[2] assert type in [None, 'inbox', 'ban', 'whitelist']
loop = asyncio.get_event_loop() config = app['config']
loop.run_until_complete(follow_remote_actor(target)) database = app['database']
print('Sent follow message to:', target) if not type or type == 'inbox':
click.echo('Connected to the following instances or relays:')
for inbox in database.inboxes:
click.echo(f'- {inbox}')
elif type == 'ban':
click.echo('Banned instances:')
for instance in config.blocked_instances:
click.echo(f'- {instance}')
click.echo('\nBanned software:')
for software in config.blocked_software:
click.echo(f'- {software}')
elif type == 'whitelist':
click.echo('Whitelisted instances:')
for instance in config.whitelist:
click.echo(f'- {instance}')
def relay_unfollow(): @cli.command('follow')
if len(sys.argv) < 3: @click.argument('actor')
print('usage: python3 -m relay.manage unfollow <target>') def relay_follow(actor):
exit() 'Follow an actor (Relay must be running)'
target = sys.argv[2] loop = asyncio.new_event_loop()
loop.run_until_complete(handle_follow_actor(actor))
loop = asyncio.get_event_loop()
loop.run_until_complete(unfollow_remote_actor(target))
print('Sent unfollow message to:', target)
def relay_forceremove():
if len(sys.argv) < 3:
print('usage: python3 -m relay.manage force-remove <target>')
exit()
target = sys.argv[2]
try:
app['database'].del_inbox(target)
print('Removed inbox from DB:', target)
except KeyError:
print('Failed to remove inbox from DB:', target)
TASKS = { @cli.command('unfollow')
'list': relay_list, @click.argument('actor')
'follow': relay_follow, def relay_follow(actor):
'unfollow': relay_unfollow, 'Unfollow an actor (Relay must be running)'
'force-remove': relay_forceremove
} loop = asyncio.new_event_loop()
loop.run_until_complete(handle_unfollow_actor(actor))
def usage(): @cli.command('add')
print('usage: python3 -m relay.manage <task> [...]') @click.argument('inbox')
print('tasks:') def relay_add(inbox):
[print('-', task) for task in TASKS.keys()] 'Add an inbox to the database'
exit()
database = app['database']
config = app['config']
if not inbox.startswith('http'):
inbox = f'https://{inbox}/inbox'
if database.get_inbox(inbox):
click.echo(f'Error: Inbox already in database: {inbox}')
return
if database.get_inbox(inbox):
click.echo(f'Error: Already added inbox: {inbox}')
return
if config.is_banned(inbox):
click.echo(f'Error: Refusing to add banned inbox: {inbox}')
return
database.add_inbox(inbox)
database.save()
click.echo(f'Added inbox to the database: {inbox}')
@cli.command('remove')
@click.argument('inbox')
def relay_remove(inbox):
'Remove an inbox from the database'
database = app['database']
dbinbox = database.get_inbox(inbox)
if not dbinbox:
click.echo(f'Error: Inbox does not exist: {inbox}')
return
database.del_inbox(dbinbox)
database.save()
click.echo(f'Removed inbox from the database: {inbox}')
# todo: add nested groups
@cli.command('ban')
@click.argument('type')
@click.argument('target')
def relay_ban(type, target):
'Ban an instance or software'
assert type in ['instance', 'software']
config = app['config']
database = app['database']
inbox = database.get_inbox(target)
bancmd = getattr(config, f'ban_{type}')
if bancmd(target):
config.save()
if inbox:
database.del_inbox(inbox)
database.save()
click.echo(f'Banned {type}: {target}')
return
click.echo(f'{type.title()} already banned: {target}')
@cli.command('unban')
@click.argument('type')
@click.argument('target')
def relay_unban(type, target):
'Unban an instance or software'
assert type in ['instance', 'software']
config = app['config']
database = app['database']
unbancmd = getattr(config, f'unban_{type}')
if unbancmd(target):
config.save()
return click.echo(f'Unbanned {type}: {target}')
return click.echo(f'{type.title()} is not banned: {target}')
@cli.command('allow')
@click.argument('instance')
def relay_allow(instance):
'Add an instance to the whitelist'
config = app['config']
if not config.add_whitelist(instance):
return click.echo(f'Instance already in the whitelist: {instance}')
config.save()
click.echo(f'Instance added to the whitelist: {instance}')
@cli.command('deny')
@click.argument('instance')
def relay_deny(instance):
'Remove an instance from the whitelist'
config = app['config']
database = app['database']
inbox = database.get_inbox(instance)
if not config.del_whitelist(instance):
return click.echo(f'Instance not in the whitelist: {instance}')
config.save()
if inbox and config.whitelist_enabled:
database.del_inbox(inbox)
database.save()
click.echo(f'Removed instance from the whitelist: {instance}')
@cli.command('setup')
def relay_setup():
'Generate a new config'
config = app['config']
while True:
config.host = click.prompt('What domain will the relay be hosted on?', default=config.host)
if not config.host.endswith('example.com'):
break
click.echo('The domain must not be example.com')
config.listen = click.prompt('Which address should the relay listen on?', default=config.listen)
while True:
config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int)
break
config.save()
if click.confirm('Relay all setup! Would you like to run it now?'):
relay_run.callback()
@cli.command('run')
def relay_run():
'Run the relay'
if app['config'].host.endswith('example.com'):
return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ == '2.6.1':
if int(vers_split[1]) > 7:
click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
return click.echo(pip_command)
else:
click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
return click.echo(pip_command)
# web pages
app.router.add_get('/', views.home)
# endpoints
app.router.add_post('/actor', views.inbox)
app.router.add_post('/inbox', views.inbox)
app.router.add_get('/actor', views.actor)
app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0)
app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown)
app.router.add_get('/.well-known/webfinger', views.webfinger)
if logging.DEBUG >= logging.root.level:
app.router.add_get('/stats', views.stats)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.ensure_future(handle_start_webserver(), loop=loop)
loop.run_forever()
async def handle_follow_actor(app, target):
database = app['database']
config = app['config']
if not target.startswith('http'):
target = f'https://{target}/actor'
if database.get_inbox(target):
return click.echo(f'Error: Already following actor: {target}')
if config.is_banned(target):
return click.echo(f'Error: Refusing to follow banned actor: {target}')
if config.whitelist_enabled and not config.is_whitelisted(target):
return click.echo(f'Error: Refusing to follow non-whitelisted actor: {target}')
await misc.follow_remote_actor(target)
click.echo(f'Sent follow message to: {target}')
async def handle_unfollow_actor(app, target):
database = app['database']
if not target.startswith('http'):
target = f'https://{target}/actor'
if not database.get_inbox(target):
return click.echo(f'Error: Not following actor: {target}')
await misc.unfollow_remote_actor(target)
click.echo(f'Sent unfollow message to: {target}')
async def handle_start_webserver():
config = app['config']
runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"')
logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})')
await runner.setup()
site = TCPSite(runner, config.listen, config.port)
await site.start()
def main(): def main():
if len(sys.argv) < 2: cli(prog_name='relay')
usage()
if sys.argv[1] in TASKS:
TASKS[sys.argv[1]]()
else:
usage()
if __name__ == '__main__':
main()

View file

@ -9,6 +9,7 @@ from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5 from Crypto.Signature import PKCS1_v1_5
from aiohttp import ClientSession from aiohttp import ClientSession
from datetime import datetime from datetime import datetime
from json.decoder import JSONDecodeError
from urllib.parse import urlparse from urllib.parse import urlparse
from uuid import uuid4 from uuid import uuid4
@ -60,7 +61,7 @@ def distill_inboxes(actor, object_id):
targets = [] targets = []
for inbox in database.inboxes: for inbox in database.inboxes:
if inbox != actor_inbox or urlparse(inbox).hostname != original_hostname: if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname:
targets.append(inbox) targets.append(inbox)
return targets return targets
@ -245,11 +246,15 @@ async def request(uri, data=None, force=False, sign_headers=True):
headers.update(signing_headers) headers.update(signing_headers)
try: try:
# json_serializer=DotDict maybe?
async with ClientSession(trace_configs=http_debug()) as session, get_app()['semaphore']: async with ClientSession(trace_configs=http_debug()) as session, get_app()['semaphore']:
async with session.request(method, uri, headers=headers, data=data) as resp: async with session.request(method, uri, headers=headers, data=data) as resp:
if resp.status not in [200, 202]: ## aiohttp has been known to leak if the response hasn't been read,
resp_payload = await resp.text(encoding='utf-8') ## so we're just gonna read the request no matter what
resp_data = await resp.read()
resp_payload = json.loads(resp_data.decode('utf-8'))
if resp.status not in [200, 202]:
if not data: if not data:
logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}') logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}')
return return
@ -257,20 +262,16 @@ async def request(uri, data=None, force=False, sign_headers=True):
logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}') logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}')
return return
try:
resp_payload = await resp.json(encoding='utf-8', content_type=None)
except:
return
logging.debug(f'{uri} >> resp {resp_payload}') logging.debug(f'{uri} >> resp {resp_payload}')
get_app()['cache'].json[uri] = resp_payload get_app()['cache'].json[uri] = resp_payload
return resp_payload return resp_payload
except Exception as e: except JSONDecodeError:
return
except Exception:
traceback.print_exc() traceback.print_exc()
return None
async def validate_signature(actor, http_request): async def validate_signature(actor, http_request):

View file

@ -13,10 +13,12 @@ async def handle_relay(actor, data, request):
object_id = misc.distill_object_id(data) object_id = misc.distill_object_id(data)
if object_id in cache: if object_id in cache:
logging.debug(f'>> already relayed {object_id} as {cache[object_id]}') logging.verbose(f'already relayed {object_id} as {cache[object_id]}')
return return
activity_id = f"https://{request.host}/activities/{uuid.uuid4()}" logging.verbose(f'Relaying post from {actor["id"]}')
activity_id = f"https://{request.host}/activities/{uuid4()}"
message = { message = {
"@context": "https://www.w3.org/ns/activitystreams", "@context": "https://www.w3.org/ns/activitystreams",
@ -29,11 +31,10 @@ async def handle_relay(actor, data, request):
logging.debug(f'>> relay: {message}') logging.debug(f'>> relay: {message}')
inboxes = misc.distill_inboxes(actor['id'], object_id) inboxes = misc.distill_inboxes(actor, object_id)
futures = [misc.request(inbox, data=message) for inbox in inboxes] futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
asyncio.ensure_future(asyncio.gather(*futures))
cache[object_id] = activity_id cache[object_id] = activity_id
@ -42,9 +43,10 @@ async def handle_forward(actor, data, request):
object_id = misc.distill_object_id(data) object_id = misc.distill_object_id(data)
if object_id in cache: if object_id in cache:
logging.debug(f'>> already forwarded {object_id}.') logging.verbose(f'already forwarded {object_id}')
return return
logging.verbose(f'Forwarding post from {actor["id"]}')
logging.debug(f'>> Relay {data}') logging.debug(f'>> Relay {data}')
inboxes = misc.distill_inboxes(actor['id'], object_id) inboxes = misc.distill_inboxes(actor['id'], object_id)
@ -87,7 +89,12 @@ async def handle_follow(actor, data, request):
async def handle_undo(actor, data, request): async def handle_undo(actor, data, request):
if data['object']['type'] != 'Follow': ## If the activity being undone is an Announce, forward it insteead
if data['object']['type'] == 'Announce':
await handle_forward(actor, data, request)
return
elif data['object']['type'] != 'Follow':
return return
database = app['database'] database = app['database']
@ -113,4 +120,5 @@ processors = {
async def run_processor(request, data, actor): async def run_processor(request, data, actor):
logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}')
return await processors[data['type']](actor, data, request) return await processors[data['type']](actor, data, request)

View file

@ -1,6 +1,6 @@
[metadata] [metadata]
name = relay name = relay
version = 0.1.0 version = 0.2.0
description = Generic LitePub relay (works with all LitePub consumers and Mastodon) description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
long_description = file: README.md long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8 long_description_content_type = text/markdown; charset=UTF-8
@ -23,17 +23,14 @@ project_urls =
zip_safe = False zip_safe = False
packages = find: packages = find:
install_requires = install_requires =
aiohttp>=3.5.4 aiohttp >= 3.8.0
async-timeout>=3.0.0 cachetools >= 5.0.0
attrs>=18.1.0 click >= 8.1.2
chardet>=3.0.4 pycryptodome >= 3.14.1
idna>=2.7 PyYAML >= 5.0.0
idna-ssl>=1.1.0; python_version < "3.7"
multidict>=4.3.1
pycryptodome>=3.9.4
PyYAML>=5.1
simplejson>=3.16.0
yarl>=1.2.6
cachetools
async_lru
python_requires = >=3.6 python_requires = >=3.6
[options.entry_points]
console_scripts =
activityrelay = relay.manage:main