Compare commits

..

No commits in common. "master" and "0.1.0" have entirely different histories.

31 changed files with 1049 additions and 2564 deletions

View file

@ -1,28 +1,11 @@
FROM python:3-alpine
# install build deps for pycryptodome and other c-based python modules
WORKDIR /workdir
RUN apk add alpine-sdk autoconf automake libtool gcc
# add env var to let the relay know it's in a container
ENV DOCKER_RUNNING=true
# setup various container properties
VOLUME ["/data"]
CMD ["python", "-m", "relay"]
EXPOSE 8080/tcp
WORKDIR /opt/activityrelay
# install and update important python modules
RUN pip3 install -U setuptools wheel pip
# only copy necessary files
COPY relay ./relay
COPY LICENSE .
COPY README.md .
COPY requirements.txt .
COPY setup.cfg .
COPY setup.py .
COPY .git ./.git
# install relay deps
ADD requirements.txt /workdir/
RUN pip3 install -r requirements.txt
ADD . /workdir/
CMD ["python", "-m", "relay"]
VOLUME ["/workdir/data"]

View file

@ -10,14 +10,72 @@ Affero General Public License version 3 (AGPLv3) license. You can find a copy o
in this package as the `LICENSE` file.
## Setup
You need at least Python 3.6 (latest version of 3.x recommended) to make use of this software.
It simply will not run on older Python versions.
Download the project and install with pip (`pip3 install .`).
Copy `relay.yaml.example` to `relay.yaml` and edit it as appropriate:
$ cp relay.yaml.example relay.yaml
$ $EDITOR relay.yaml
Finally, you can launch the relay:
$ python3 -m relay
It is suggested to run this under some sort of supervisor, such as runit, daemontools,
s6 or systemd. Configuration of the supervisor is not covered here, as it is different
depending on which system you have available.
The bot runs a webserver, internally, on localhost at port 8080. This needs to be
forwarded by nginx or similar. The webserver is used to receive ActivityPub messages,
and needs to be secured with an SSL certificate inside nginx or similar. Configuration
of your webserver is not discussed here, but any guide explaining how to configure a
modern non-PHP web application should cover it.
## Getting Started
Normally, you would direct your LitePub instance software to follow the LitePub actor
found on the relay. In Pleroma this would be something like:
$ MIX_ENV=prod mix relay_follow https://your.relay.hostname/actor
$ MIX_ENV=prod mix relay_follow https://your.relay.hostname/actor
Mastodon uses an entirely different relay protocol but supports LitePub relay protocol
as well when the Mastodon relay handshake is used. In these cases, Mastodon relay
clients should follow `http://your.relay.hostname/inbox` as they would with Mastodon's
own relay software.
## Documentation
## Performance
To install or manage your relay, check the [documentation](docs/index.md)
Performance is very good, with all data being stored in memory and serialized to a
JSON-LD object graph. Worker coroutines are spawned in the background to distribute
the messages in a scatter-gather pattern. Performance is comparable to, if not
superior to, the Mastodon relay software, with improved memory efficiency.
## Management
You can perform a few management tasks such as peering or depeering other relays by
invoking the `relay.manage` module.
This will show the available management tasks:
$ python3 -m relay.manage
When following remote relays, you should use the `/actor` endpoint as you would in
Pleroma and other LitePub-compliant software.
## Docker
You can run ActivityRelay with docker. Edit `relay.yaml` so that the database
location is set to `./data/relay.jsonld` and then build and run the docker
image :
$ docker volume create activityrelay-data
$ docker build -t activityrelay .
$ docker run -d -p 8080:8080 -v activityrelay-data:/workdir/data activityrelay

View file

@ -1,66 +0,0 @@
#!/usr/bin/env bash
case $1 in
install)
docker build -f Dockerfile -t activityrelay . && \
docker volume create activityrelay-data && \
docker run -it -p 8080:8080 -v activityrelay-data:/data --name activityrelay activityrelay
;;
uninstall)
docker stop activityrelay && \
docker container rm activityrelay && \
docker volume rm activityrelay-data && \
docker image rm activityrelay
;;
start)
docker start activityrelay
;;
stop)
docker stop activityrelay
;;
manage)
shift
docker exec -it activityrelay python3 -m relay "$@"
;;
shell)
docker exec -it activityrelay bash
;;
rescue)
docker run -it --rm --entrypoint bash -v activityrelay-data:/data activityrelay
;;
edit)
if [ -z ${EDITOR} ]; then
echo "EDITOR environmental variable not set"
exit
fi
CONFIG="/tmp/relay-$(date +"%T").yaml"
docker cp activityrelay:/data/relay.yaml $CONFIG && \
$EDITOR $CONFIG && \
docker cp $CONFIG activityrelay:/data/relay.yaml && \
rm $CONFIG
;;
*)
COLS="%-22s %s\n"
echo "Valid commands:"
printf "$COLS" "- start" "Run the relay in the background"
printf "$COLS" "- stop" "Stop the relay"
printf "$COLS" "- manage <cmd> [args]" "Run a relay management command"
printf "$COLS" "- edit" "Edit the relay's config in \$EDITOR"
printf "$COLS" "- shell" "Drop into a bash shell on the running container"
printf "$COLS" "- rescue" "Drop into a bash shell on a temp container with the data volume mounted"
printf "$COLS" "- install" "Build the image, create a new container and volume, and run relay setup"
printf "$COLS" "- uninstall" "Delete the relay image, container, and volume"
;;
esac

View file

@ -1,189 +0,0 @@
# Commands
There are a number of commands to manage your relay's database and config. You can add `--help` to
any category or command to get help on that specific option (ex. `activityrelay inbox --help`).
Note: Unless specified, it is recommended to run any commands while the relay is shutdown.
Note 2: `activityrelay` is only available via pip or pipx if `~/.local/bin` is in `$PATH`. If it
isn't, use `python3 -m relay` if installed via pip or `~/.local/bin/activityrelay` if installed
via pipx
## Run
Run the relay.
activityrelay run
## Setup
Run the setup wizard to configure your relay.
activityrelay setup
## Config
Manage the relay config
activityrelay config
### List
List the current config key/value pairs
activityrelay config list
### Set
Set a value for a config option
activityrelay config set <key> <value>
## Inbox
Manage the list of subscribed instances.
### List
List the currently subscribed instances or relays.
activityrelay inbox list
### Add
Add an inbox to the database. If a domain is specified, it will default to `https://{domain}/inbox`.
If the added instance is not following the relay, expect errors when pushing messages.
activityrelay inbox add <inbox or domain>
### Remove
Remove an inbox from the database. An inbox or domain can be specified.
activityrelay inbox remove <inbox or domain>
### Follow
Follow an instance or relay actor and add it to the database. If a domain is specified, it will
default to `https://{domain}/actor`.
activityrelay inbox follow <actor or domain>
Note: The relay must be running for this command to work.
### Unfollow
Unfollow an instance or relay actor and remove it from the database. If the instance or relay does
not exist anymore, use the `inbox remove` command instead.
activityrelay inbox unfollow <domain, actor, or inbox>
Note: The relay must be running for this command to work.
## Whitelist
Manage the whitelisted domains.
### List
List the current whitelist.
activityrelay whitelist list
### Add
Add a domain to the whitelist.
activityrelay whitelist add <domain>
### Remove
Remove a domain from the whitelist.
activityrelay whitelist remove <domain>
### Import
Add all current inboxes to the whitelist
activityrelay whitelist import
## Instance
Manage the instance ban list.
### List
List the currently banned instances
activityrelay instance list
### Ban
Add an instance to the ban list. If the instance is currently subscribed, remove it from the
database.
activityrelay instance ban <domain>
### Unban
Remove an instance from the ban list.
activityrelay instance unban <domain>
## Software
Manage the software ban list. To get the correct name, check the software's nodeinfo endpoint.
You can find it at nodeinfo\['software']\['name'].
### List
List the currently banned software.
activityrelay software list
### Ban
Add a software name to the ban list.
If `-f` or `--fetch-nodeinfo` is set, treat the name as a domain and try to fetch the software
name via nodeinfo.
If the name is `RELAYS` (case-sensitive), add all known relay software names to the list.
activityrelay software ban [-f/--fetch-nodeinfo] <name, domain, or RELAYS>
### Unban
Remove a software name from the ban list.
If `-f` or `--fetch-nodeinfo` is set, treat the name as a domain and try to fetch the software
name via nodeinfo.
If the name is `RELAYS` (case-sensitive), remove all known relay software names from the list.
activityrelay unban [-f/--fetch-nodeinfo] <name, domain, or RELAYS>

View file

@ -1,105 +0,0 @@
# Configuration
## General
### DB
The path to the database. It contains the relay actor private key and all subscribed
instances. If the path is not absolute, it is relative to the working directory.
db: relay.jsonld
### Listener
The address and port the relay will listen on. If the reverse proxy (nginx, apache, caddy, etc)
is running on the same host, it is recommended to change `listen` to `localhost`
listen: 0.0.0.0
port: 8080
### Note
A small blurb to describe your relay instance. This will show up on the relay's home page.
note: "Make a note about your instance here."
### Post Limit
The maximum number of messages to send out at once. For each incoming message, a message will be
sent out to every subscribed instance minus the instance which sent the message. This limit
is to prevent too many outgoing connections from being made, so adjust if necessary.
Note: If the `workers` option is set to anything above 0, this limit will be per worker.
push_limit: 512
### Push Workers
The relay can be configured to use threads to push messages out. For smaller relays, this isn't
necessary, but bigger ones (>100 instances) will want to set this to the number of available cpu
threads.
workers: 0
### JSON GET cache limit
JSON objects (actors, nodeinfo, etc) will get cached when fetched. This will set the max number of
objects to keep in the cache.
json_cache: 1024
## AP
Various ActivityPub-related settings
### Host
The domain your relay will use to identify itself.
host: relay.example.com
### Whitelist Enabled
If set to `true`, only instances in the whitelist can follow the relay. Any subscribed instances
not in the whitelist will be removed from the inbox list on startup.
whitelist_enabled: false
### Whitelist
A list of domains of instances which are allowed to subscribe to your relay.
whitelist:
- bad-instance.example.com
- another-bad-instance.example.com
### Blocked Instances
A list of instances which are unable to follow the instance. If a subscribed instance is added to
the block list, it will be removed from the inbox list on startup.
blocked_instances:
- bad-instance.example.com
- another-bad-instance.example.com
### Blocked Software
A list of ActivityPub software which cannot follow your relay. This list is empty by default, but
setting this to the below list will block all other relays and prevent relay chains
blocked_software:
- activityrelay
- aoderelay
- social.seattle.wa.us-relay
- unciarelay

View file

@ -1,9 +0,0 @@
# ActivityRelay Documentation
ActivityRelay is a small ActivityPub server that relays messages to subscribed instances.
[Installation](installation.md)
[Configuration](configuration.md)
[Commands](commands.md)

View file

@ -1,67 +0,0 @@
# Installation
There are a few ways to install ActivityRelay. Follow one of the methods below, setup a reverse
proxy, and setup the relay to run via a supervisor. Example configs for caddy, nginx, and systemd
in `installation/`
## Pipx
Pipx uses pip and a custom venv implementation to automatically install modules into a Python
environment and is the recommended method. Install pipx if it isn't installed already. Check out
the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in-depth instructions.
python3 -m pip install pipx
Now simply install ActivityRelay directly from git
pipx install git+https://git.pleroma.social/pleroma/relay@0.2.4
Or from a cloned git repo.
pipx install .
Once finished, you can set up the relay via the setup command. It will ask a few questions to fill
out config options for your relay
~/.local/bin/activityrelay setup
Finally start it up with the run command.
~/.local/bin/activityrelay run
Note: Pipx requires python 3.7+. If your distro doesn't have a compatible version of python, it can
be installed via [pyenv](https://github.com/pyenv/pyenv).
## Pip
The instructions for installation via pip are very similar to pipx. Installation can be done from
git
python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.4
or a cloned git repo.
python3 -m pip install .
Now run the configuration wizard
python3 -m relay setup
And start the relay when finished
python3 -m relay run
## Docker
Installation and management via Docker can be handled with the `docker.sh` script. To install
ActivityRelay, run the install command. Once the image is built and the container is created,
your will be asked to fill out some config options for your relay.
./docker.sh install
Finally start it up. It will be listening on TCP port 8080.
./docker.sh start

View file

@ -1,50 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['relay/__main__.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'aputils.enums',
'aputils.errors',
'aputils.misc',
'aputils.objects',
'aputils.signer'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='activityrelay',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

View file

@ -9,31 +9,18 @@ port: 8080
# Note
note: "Make a note about your instance here."
# Number of worker threads to start. If 0, use asyncio futures instead of threads.
workers: 0
# Maximum number of inbox posts to do at once
# If workers is set to 1 or above, this is the max for each worker
push_limit: 512
# The amount of json objects to cache from GET requests
json_cache: 1024
# this section is for ActivityPub
ap:
# This is used for generating activitypub messages, as well as instructions for
# linking AP identities. It should be an SSL-enabled domain reachable by https.
# this is used for generating activitypub messages, as well as instructions for
# linking AP identities. it should be an SSL-enabled domain reachable by https.
host: 'relay.example.com'
blocked_instances:
- 'bad-instance.example.com'
- 'another-bad-instance.example.com'
whitelist_enabled: false
whitelist:
- 'good-instance.example.com'
- 'another.good-instance.example.com'
# uncomment the lines below to prevent certain activitypub software from posting
# to the relay (all known relays by default). this uses the software name in nodeinfo
#blocked_software:

View file

@ -1,3 +1,58 @@
__version__ = '0.2.4'
from . import logging
from . import logger
import asyncio
import aiohttp
import aiohttp.web
import yaml
import argparse
parser = argparse.ArgumentParser(
description="A generic LitePub relay (works with all LitePub consumers and Mastodon).",
prog="python -m relay")
parser.add_argument("-c", "--config", type=str, default="relay.yaml",
metavar="<path>", help="the path to your config file")
args = parser.parse_args()
def load_config():
with open(args.config) as f:
options = {}
## Prevent a warning message for pyyaml 5.1+
if getattr(yaml, 'FullLoader', None):
options['Loader'] = yaml.FullLoader
yaml_file = yaml.load(f, **options)
config = {
'db': yaml_file.get('db', 'relay.jsonld'),
'listen': yaml_file.get('listen', '0.0.0.0'),
'port': int(yaml_file.get('port', 8080)),
'note': yaml_file.get('note', 'Make a note about your instance here.'),
'ap': {
'blocked_software': [v.lower() for v in yaml_file['ap'].get('blocked_software', [])],
'blocked_instances': yaml_file['ap'].get('blocked_instances', []),
'host': yaml_file['ap'].get('host', 'localhost'),
'whitelist': yaml_file['ap'].get('whitelist', []),
'whitelist_enabled': yaml_file['ap'].get('whitelist_enabled', False)
}
}
return config
CONFIG = load_config()
from .http_signatures import http_signatures_middleware
app = aiohttp.web.Application(middlewares=[
http_signatures_middleware
])
from . import database
from . import actor
from . import webfinger
from . import default
from . import nodeinfo
from . import http_stats

View file

@ -1,5 +1,55 @@
from relay.manage import main
import asyncio
import aiohttp.web
import logging
import platform
import sys
import Crypto
import time
from . import app, CONFIG
def crypto_check():
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ != '2.6.1':
return
if int(vers_split[1]) > 7 and Crypto.__version__ == '2.6.1':
logging.error('PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting in 10 sec...')
logging.error(pip_command)
time.sleep(10)
sys.exit()
else:
logging.warning('PyCrypto is old and should be replaced with pycryptodome')
logging.warning(pip_command)
async def start_webserver():
runner = aiohttp.web.AppRunner(app)
await runner.setup()
try:
listen = CONFIG['listen']
except:
listen = 'localhost'
try:
port = CONFIG['port']
except:
port = 8080
logging.info('Starting webserver at {listen}:{port}'.format(listen=listen,port=port))
site = aiohttp.web.TCPSite(runner, listen, port)
await site.start()
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(start_webserver())
loop.run_forever()
if __name__ == '__main__':
main()
crypto_check()
main()

347
relay/actor.py Normal file
View file

@ -0,0 +1,347 @@
import aiohttp
import aiohttp.web
import asyncio
import logging
import uuid
import re
import simplejson as json
import cgi
import datetime
from urllib.parse import urlsplit
from Crypto.PublicKey import RSA
from cachetools import LFUCache
from . import app, CONFIG
from .database import DATABASE
from .http_debug import http_debug
from .remote_actor import fetch_actor
from .http_signatures import sign_headers, generate_body_digest
# generate actor keys if not present
if "actorKeys" not in DATABASE:
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
privkey = RSA.generate(4096)
pubkey = privkey.publickey()
DATABASE["actorKeys"] = {
"publicKey": pubkey.exportKey('PEM').decode('utf-8'),
"privateKey": privkey.exportKey('PEM').decode('utf-8')
}
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
AP_CONFIG = CONFIG['ap']
CACHE_SIZE = CONFIG.get('cache-size', 16384)
CACHE = LFUCache(CACHE_SIZE)
sem = asyncio.Semaphore(500)
async def actor(request):
data = {
"@context": "https://www.w3.org/ns/activitystreams",
"endpoints": {
"sharedInbox": "https://{}/inbox".format(request.host)
},
"followers": "https://{}/followers".format(request.host),
"following": "https://{}/following".format(request.host),
"inbox": "https://{}/inbox".format(request.host),
"name": "ActivityRelay",
"type": "Application",
"id": "https://{}/actor".format(request.host),
"publicKey": {
"id": "https://{}/actor#main-key".format(request.host),
"owner": "https://{}/actor".format(request.host),
"publicKeyPem": DATABASE["actorKeys"]["publicKey"]
},
"summary": "ActivityRelay bot",
"preferredUsername": "relay",
"url": "https://{}/actor".format(request.host)
}
return aiohttp.web.json_response(data, content_type='application/activity+json')
app.router.add_get('/actor', actor)
get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
async def push_message_to_actor(actor, message, our_key_id):
inbox = get_actor_inbox(actor)
url = urlsplit(inbox)
# XXX: Digest
data = json.dumps(message)
headers = {
'(request-target)': 'post {}'.format(url.path),
'Content-Length': str(len(data)),
'Content-Type': 'application/activity+json',
'User-Agent': 'ActivityRelay',
'Host': url.netloc,
'Digest': 'SHA-256={}'.format(generate_body_digest(data)),
'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
}
headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
headers.pop('(request-target)')
headers.pop('Host')
logging.debug('%r >> %r', inbox, message)
global sem
async with sem:
try:
async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
async with session.post(inbox, data=data, headers=headers) as resp:
if resp.status == 202:
return
resp_payload = await resp.text()
logging.debug('%r >> resp %r', inbox, resp_payload)
except Exception as e:
logging.info('Caught %r while pushing to %r.', e, inbox)
async def fetch_nodeinfo(domain):
headers = {'Accept': 'application/json'}
nodeinfo_url = None
wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers)
if not wk_nodeinfo:
return
for link in wk_nodeinfo.get('links', ''):
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
if not nodeinfo_url:
return
nodeinfo_data = await fetch_actor(nodeinfo_url, headers=headers)
software = nodeinfo_data.get('software')
return software.get('name') if software else None
async def follow_remote_actor(actor_uri):
actor = await fetch_actor(actor_uri)
if not actor:
logging.info('failed to fetch actor at: %r', actor_uri)
return
if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
return
logging.info('following: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Follow",
"to": [actor['id']],
"object": actor['id'],
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
"actor": "https://{}/actor".format(AP_CONFIG['host'])
}
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
async def unfollow_remote_actor(actor_uri):
actor = await fetch_actor(actor_uri)
if not actor:
logging.info('failed to fetch actor at: %r', actor_uri)
return
logging.info('unfollowing: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Undo",
"to": [actor['id']],
"object": {
"type": "Follow",
"object": actor_uri,
"actor": actor['id'],
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4())
},
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
"actor": "https://{}/actor".format(AP_CONFIG['host'])
}
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')
def strip_html(data):
no_tags = tag_re.sub('', data)
return cgi.escape(no_tags)
def distill_inboxes(actor, object_id):
global DATABASE
origin_hostname = urlsplit(object_id).hostname
inbox = get_actor_inbox(actor)
targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
hostnames = [urlsplit(target).hostname for target in targets]
assert inbox not in targets
assert origin_hostname not in hostnames
return targets
def distill_object_id(activity):
logging.debug('>> determining object ID for %r', activity['object'])
obj = activity['object']
if isinstance(obj, str):
return obj
return obj['id']
async def handle_relay(actor, data, request):
global CACHE
object_id = distill_object_id(data)
if object_id in CACHE:
logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
return
activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"to": ["https://{}/followers".format(request.host)],
"actor": "https://{}/actor".format(request.host),
"object": object_id,
"id": activity_id
}
logging.debug('>> relay: %r', message)
inboxes = distill_inboxes(actor, object_id)
futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
CACHE[object_id] = activity_id
async def handle_forward(actor, data, request):
object_id = distill_object_id(data)
logging.debug('>> Relay %r', data)
inboxes = distill_inboxes(actor, object_id)
futures = [
push_message_to_actor(
{'inbox': inbox},
data,
'https://{}/actor#main-key'.format(request.host))
for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
async def handle_follow(actor, data, request):
global DATABASE
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
return
if inbox not in following:
following += [inbox]
DATABASE['relay-list'] = following
asyncio.ensure_future(follow_remote_actor(actor['id']))
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Accept",
"to": [actor["id"]],
"actor": "https://{}/actor".format(request.host),
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
"object": {
"type": "Follow",
"id": data["id"],
"object": "https://{}/actor".format(request.host),
"actor": actor["id"]
},
"id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
}
asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
async def handle_undo(actor, data, request):
global DATABASE
child = data['object']
if child['type'] == 'Follow':
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
if inbox in following:
following.remove(inbox)
DATABASE['relay-list'] = following
await unfollow_remote_actor(actor['id'])
processors = {
'Announce': handle_relay,
'Create': handle_relay,
'Delete': handle_forward,
'Follow': handle_follow,
'Undo': handle_undo,
'Update': handle_forward,
}
async def inbox(request):
data = await request.json()
instance = urlsplit(data['actor']).hostname
if AP_CONFIG['blocked_software']:
software = await fetch_nodeinfo(instance)
if software and software.lower() in AP_CONFIG['blocked_software']:
raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain')
if 'actor' not in data or not request['validated']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
actor = await fetch_actor(data["actor"])
actor_uri = 'https://{}/actor'.format(request.host)
logging.debug(">> payload %r", data)
processor = processors.get(data['type'], None)
if processor:
await processor(actor, data, request)
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
app.router.add_post('/inbox', inbox)

View file

@ -1,216 +0,0 @@
import asyncio
import logging
import os
import queue
import signal
import threading
import traceback
from aiohttp import web
from datetime import datetime, timedelta
from .config import RelayConfig
from .database import RelayDatabase
from .http_client import HttpClient
from .misc import DotDict, check_open_port, set_app
from .views import routes
class Application(web.Application):
def __init__(self, cfgpath):
web.Application.__init__(self)
self['starttime'] = None
self['running'] = False
self['config'] = RelayConfig(cfgpath)
if not self['config'].load():
self['config'].save()
if self.config.is_docker:
self.config.update({
'db': '/data/relay.jsonld',
'listen': '0.0.0.0',
'port': 8080
})
self['workers'] = []
self['last_worker'] = 0
set_app(self)
self['database'] = RelayDatabase(self['config'])
self['database'].load()
self['client'] = HttpClient(
database = self.database,
limit = self.config.push_limit,
timeout = self.config.timeout,
cache_size = self.config.json_cache
)
self.set_signal_handler()
@property
def client(self):
return self['client']
@property
def config(self):
return self['config']
@property
def database(self):
return self['database']
@property
def uptime(self):
if not self['starttime']:
return timedelta(seconds=0)
uptime = datetime.now() - self['starttime']
return timedelta(seconds=uptime.seconds)
def push_message(self, inbox, message):
if self.config.workers <= 0:
return asyncio.ensure_future(self.client.post(inbox, message))
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message))
self['last_worker'] += 1
if self['last_worker'] >= len(self['workers']):
self['last_worker'] = 0
def set_signal_handler(self):
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
try:
signal.signal(getattr(signal, sig), self.stop)
# some signals don't exist in windows, so skip them
except AttributeError:
pass
def run(self):
if not check_open_port(self.config.listen, self.config.port):
return logging.error(f'A server is already running on port {self.config.port}')
for route in routes:
self.router.add_route(*route)
logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
asyncio.run(self.handle_run())
def stop(self, *_):
self['running'] = False
async def handle_run(self):
self['running'] = True
if self.config.workers > 0:
for i in range(self.config.workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['starttime'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
self['starttime'] = None
self['running'] = False
self['workers'].clear()
class PushWorker(threading.Thread):
def __init__(self, app):
threading.Thread.__init__(self)
self.app = app
self.queue = queue.Queue()
def run(self):
self.client = HttpClient(
database = self.app.database,
limit = self.app.config.push_limit,
timeout = self.app.config.timeout,
cache_size = self.app.config.json_cache
)
asyncio.run(self.handle_queue())
async def handle_queue(self):
while self.app['running']:
try:
inbox, message = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose(f'New push from Thread-{threading.get_ident()}')
await self.client.post(inbox, message)
except queue.Empty:
pass
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
await self.client.close()
## Can't sub-class web.Request, so let's just add some properties
def request_actor(self):
try: return self['actor']
except KeyError: pass
def request_instance(self):
try: return self['instance']
except KeyError: pass
def request_message(self):
try: return self['message']
except KeyError: pass
def request_signature(self):
if 'signature' not in self._state:
try: self['signature'] = DotDict.new_from_signature(self.headers['signature'])
except KeyError: return
return self['signature']
setattr(web.Request, 'actor', property(request_actor))
setattr(web.Request, 'instance', property(request_instance))
setattr(web.Request, 'message', property(request_message))
setattr(web.Request, 'signature', property(request_signature))
setattr(web.Request, 'config', property(lambda self: self.app.config))
setattr(web.Request, 'database', property(lambda self: self.app.database))

View file

@ -1,245 +0,0 @@
import json
import os
import yaml
from functools import cached_property
from pathlib import Path
from urllib.parse import urlparse
from .misc import DotDict, boolean
RELAY_SOFTWARE = [
'activityrelay', # https://git.pleroma.social/pleroma/relay
'aoderelay', # https://git.asonix.dog/asonix/relay
'feditools-relay' # https://git.ptzo.gdn/feditools/relay
]
APKEYS = [
'host',
'whitelist_enabled',
'blocked_software',
'blocked_instances',
'whitelist'
]
class RelayConfig(DotDict):
def __init__(self, path):
DotDict.__init__(self, {})
if self.is_docker:
path = '/data/config.yaml'
self._path = Path(path).expanduser()
self.reset()
def __setitem__(self, key, value):
if key in ['blocked_instances', 'blocked_software', 'whitelist']:
assert isinstance(value, (list, set, tuple))
elif key in ['port', 'workers', 'json_cache', 'timeout']:
if not isinstance(value, int):
value = int(value)
elif key == 'whitelist_enabled':
if not isinstance(value, bool):
value = boolean(value)
super().__setitem__(key, value)
@property
def db(self):
return Path(self['db']).expanduser().resolve()
@property
def path(self):
return self._path
@property
def actor(self):
return f'https://{self.host}/actor'
@property
def inbox(self):
return f'https://{self.host}/inbox'
@property
def keyid(self):
return f'{self.actor}#main-key'
@cached_property
def is_docker(self):
return bool(os.environ.get('DOCKER_RUNNING'))
def reset(self):
self.clear()
self.update({
'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')),
'listen': '0.0.0.0',
'port': 8080,
'note': 'Make a note about your instance here.',
'push_limit': 512,
'json_cache': 1024,
'timeout': 10,
'workers': 0,
'host': 'relay.example.com',
'whitelist_enabled': False,
'blocked_software': [],
'blocked_instances': [],
'whitelist': []
})
def ban_instance(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
if self.is_banned(instance):
return False
self.blocked_instances.append(instance)
return True
def unban_instance(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
try:
self.blocked_instances.remove(instance)
return True
except:
return False
def ban_software(self, software):
if self.is_banned_software(software):
return False
self.blocked_software.append(software)
return True
def unban_software(self, software):
try:
self.blocked_software.remove(software)
return True
except:
return False
def add_whitelist(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
if self.is_whitelisted(instance):
return False
self.whitelist.append(instance)
return True
def del_whitelist(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
try:
self.whitelist.remove(instance)
return True
except:
return False
def is_banned(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
return instance in self.blocked_instances
def is_banned_software(self, software):
if not software:
return False
return software.lower() in self.blocked_software
def is_whitelisted(self, instance):
if instance.startswith('http'):
instance = urlparse(instance).hostname
return instance in self.whitelist
def load(self):
self.reset()
options = {}
try:
options['Loader'] = yaml.FullLoader
except AttributeError:
pass
try:
with open(self.path) as fd:
config = yaml.load(fd, **options)
except FileNotFoundError:
return False
if not config:
return False
for key, value in config.items():
if key in ['ap']:
for k, v in value.items():
if k not in self:
continue
self[k] = v
continue
elif key not in self:
continue
self[key] = value
if self.host.endswith('example.com'):
return False
return True
def save(self):
config = {
# just turning config.db into a string is good enough for now
'db': str(self.db),
'listen': self.listen,
'port': self.port,
'note': self.note,
'push_limit': self.push_limit,
'workers': self.workers,
'json_cache': self.json_cache,
'timeout': self.timeout,
'ap': {key: self[key] for key in APKEYS}
}
with open(self._path, 'w') as fd:
yaml.dump(config, fd, sort_keys=False)
return config

View file

@ -1,197 +1,43 @@
import aputils
import asyncio
import json
import logging
import traceback
from urllib.parse import urlparse
import urllib.parse
import simplejson as json
from sys import exit
class RelayDatabase(dict):
def __init__(self, config):
dict.__init__(self, {
'relay-list': {},
'private-key': None,
'follow-requests': {},
'version': 1
})
from . import CONFIG
AP_CONFIG = CONFIG['ap']
self.config = config
self.signer = None
try:
with open(CONFIG['db']) as f:
DATABASE = json.load(f)
except FileNotFoundError:
logging.info('No database was found, making a new one.')
DATABASE = {}
except json.decoder.JSONDecodeError:
logging.info('Invalid JSON in db. Exiting...')
exit(1)
following = DATABASE.get('relay-list', [])
for inbox in following:
if urllib.parse.urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
following.remove(inbox)
elif AP_CONFIG['whitelist_enabled'] is True and urllib.parse.urlsplit(inbox).hostname not in AP_CONFIG['whitelist']:
following.remove(inbox)
DATABASE['relay-list'] = following
if 'actors' in DATABASE:
DATABASE.pop('actors')
async def database_save():
while True:
with open(CONFIG['db'], 'w') as f:
json.dump(DATABASE, f)
await asyncio.sleep(30)
@property
def hostnames(self):
return tuple(self['relay-list'].keys())
@property
def inboxes(self):
return tuple(data['inbox'] for data in self['relay-list'].values())
def load(self):
new_db = True
try:
with self.config.db.open() as fd:
data = json.load(fd)
self['version'] = data.get('version', None)
self['private-key'] = data.get('private-key')
if self['version'] == None:
self['version'] = 1
if 'actorKeys' in data:
self['private-key'] = data['actorKeys']['privateKey']
for item in data.get('relay-list', []):
domain = urlparse(item).hostname
self['relay-list'][domain] = {
'domain': domain,
'inbox': item,
'followid': None
}
else:
self['relay-list'] = data.get('relay-list', {})
for domain, instance in self['relay-list'].items():
if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)):
self.del_inbox(domain)
continue
if not instance.get('domain'):
instance['domain'] = domain
new_db = False
except FileNotFoundError:
pass
except json.decoder.JSONDecodeError as e:
if self.config.db.stat().st_size > 0:
raise e from None
if not self['private-key']:
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
self.signer = aputils.Signer.new(self.config.keyid, size=4096)
self['private-key'] = self.signer.export()
else:
self.signer = aputils.Signer(self['private-key'], self.config.keyid)
self.save()
return not new_db
def save(self):
with self.config.db.open('w') as fd:
json.dump(self, fd, indent=4)
def get_inbox(self, domain, fail=False):
if domain.startswith('http'):
domain = urlparse(domain).hostname
inbox = self['relay-list'].get(domain)
if inbox:
return inbox
if fail:
raise KeyError(domain)
def add_inbox(self, inbox, followid=None, software=None):
assert inbox.startswith('https'), 'Inbox must be a url'
domain = urlparse(inbox).hostname
instance = self.get_inbox(domain)
if instance:
if followid:
instance['followid'] = followid
if software:
instance['software'] = software
return instance
self['relay-list'][domain] = {
'domain': domain,
'inbox': inbox,
'followid': followid,
'software': software
}
logging.verbose(f'Added inbox to database: {inbox}')
return self['relay-list'][domain]
def del_inbox(self, domain, followid=None, fail=False):
data = self.get_inbox(domain, fail=False)
if not data:
if fail:
raise KeyError(domain)
return False
if not data['followid'] or not followid or data['followid'] == followid:
del self['relay-list'][data['domain']]
logging.verbose(f'Removed inbox from database: {data["inbox"]}')
return True
if fail:
raise ValueError('Follow IDs do not match')
logging.debug(f'Follow ID does not match: db = {data["followid"]}, object = {followid}')
return False
def get_request(self, domain, fail=True):
if domain.startswith('http'):
domain = urlparse(domain).hostname
try:
return self['follow-requests'][domain]
except KeyError as e:
if fail:
raise e
def add_request(self, actor, inbox, followid):
domain = urlparse(inbox).hostname
try:
request = self.get_request(domain)
request['followid'] = followid
except KeyError:
pass
self['follow-requests'][domain] = {
'actor': actor,
'inbox': inbox,
'followid': followid
}
def del_request(self, domain):
if domain.startswith('http'):
domain = urlparse(inbox).hostname
del self['follow-requests'][domain]
def distill_inboxes(self, message):
src_domains = {
message.domain,
urlparse(message.objectid).netloc
}
for domain, instance in self['relay-list'].items():
if domain not in src_domains:
yield instance['inbox']
asyncio.ensure_future(database_save())

36
relay/default.py Normal file
View file

@ -0,0 +1,36 @@
import aiohttp.web
import urllib.parse
from . import app, CONFIG
from .database import DATABASE
host = CONFIG['ap']['host']
note = CONFIG['note']
inboxes = DATABASE.get('relay-list', [])
async def default(request):
targets = '<br>'.join([urllib.parse.urlsplit(target).hostname for target in inboxes])
return aiohttp.web.Response(
status=200,
content_type="text/html",
charset="utf-8",
text="""
<html><head>
<title>ActivityPub Relay at {host}</title>
<style>
p {{ color: #FFFFFF; font-family: monospace, arial; font-size: 100%; }}
body {{ background-color: #000000; }}
</style>
</head>
<body>
<p>This is an Activity Relay for fediverse instances.</p>
<p>{note}</p>
<p>For Mastodon and Misskey instances, you may subscribe to this relay with the address: <a href="https://{host}/inbox">https://{host}/inbox</a></p>
<p>For Pleroma and other instances, you may subscribe to this relay with the address: <a href="https://{host}/actor">https://{host}/actor</a></p>
<p>To host your own relay, you may download the code at this address: <a href="https://git.pleroma.social/pleroma/relay">https://git.pleroma.social/pleroma/relay</a></p>
<br><p>List of {count} registered instances:<br>{targets}</p>
</body></html>
""".format(host=host, note=note,targets=targets,count=len(inboxes)))
app.router.add_get('/', default)

View file

@ -1,222 +0,0 @@
import logging
import traceback
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectionError, ClientSSLError
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
from aputils import Nodeinfo, WellKnownNodeinfo
from datetime import datetime
from cachetools import LRUCache
from json.decoder import JSONDecodeError
from urllib.parse import urlparse
from . import __version__
from .misc import (
MIMETYPES,
DotDict,
Message
)
HEADERS = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': f'ActivityRelay/{__version__}'
}
class Cache(LRUCache):
def set_maxsize(self, value):
self.__maxsize = int(value)
class HttpClient:
def __init__(self, database, limit=100, timeout=10, cache_size=1024):
self.database = database
self.cache = Cache(cache_size)
self.cfg = {'limit': limit, 'timeout': timeout}
self._conn = None
self._session = None
async def __aenter__(self):
await self.open()
return self
async def __aexit__(self, *_):
await self.close()
@property
def limit(self):
return self.cfg['limit']
@property
def timeout(self):
return self.cfg['timeout']
async def open(self):
if self._session:
return
self._conn = TCPConnector(
limit = self.limit,
ttl_dns_cache = 300,
)
self._session = ClientSession(
connector = self._conn,
headers = HEADERS,
connector_owner = True,
timeout = ClientTimeout(total=self.timeout)
)
async def close(self):
if not self._session:
return
await self._session.close()
await self._conn.close()
self._conn = None
self._session = None
async def get(self, url, sign_headers=False, loads=None, force=False):
await self.open()
try: url, _ = url.split('#', 1)
except: pass
if not force and url in self.cache:
return self.cache[url]
headers = {}
if sign_headers:
headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
try:
logging.verbose(f'Fetching resource: {url}')
async with self._session.get(url, headers=headers) as resp:
## Not expecting a response with 202s, so just return
if resp.status == 202:
return
elif resp.status != 200:
logging.verbose(f'Received error when requesting {url}: {resp.status}')
logging.verbose(await resp.read()) # change this to debug
return
if loads:
message = await resp.json(loads=loads)
elif resp.content_type == MIMETYPES['activity']:
message = await resp.json(loads=Message.new_from_json)
elif resp.content_type == MIMETYPES['json']:
message = await resp.json(loads=DotDict.new_from_json)
else:
# todo: raise TypeError or something
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp.read()}')
logging.debug(f'{url} >> resp {message.to_json(4)}')
self.cache[url] = message
return message
except JSONDecodeError:
logging.verbose(f'Failed to parse JSON')
except ClientSSLError:
logging.verbose(f'SSL error when connecting to {urlparse(url).netloc}')
except (AsyncTimeoutError, ClientConnectionError):
logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
except Exception as e:
traceback.print_exc()
async def post(self, url, message):
await self.open()
instance = self.database.get_inbox(url)
## Using the old algo by default is probably a better idea right now
if instance and instance.get('software') in {'mastodon'}:
algorithm = 'hs2019'
else:
algorithm = 'original'
headers = {'Content-Type': 'application/activity+json'}
headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm))
try:
logging.verbose(f'Sending "{message.type}" to {url}')
async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
## Not expecting a response, so just return
if resp.status in {200, 202}:
return logging.verbose(f'Successfully sent "{message.type}" to {url}')
logging.verbose(f'Received error when pushing to {url}: {resp.status}')
return logging.verbose(await resp.read()) # change this to debug
except ClientSSLError:
logging.warning(f'SSL error when pushing to {urlparse(url).netloc}')
except (AsyncTimeoutError, ClientConnectionError):
logging.warning(f'Failed to connect to {urlparse(url).netloc} for message push')
## prevent workers from being brought down
except Exception as e:
traceback.print_exc()
## Additional methods ##
async def fetch_nodeinfo(self, domain):
nodeinfo_url = None
wk_nodeinfo = await self.get(
f'https://{domain}/.well-known/nodeinfo',
loads = WellKnownNodeinfo.new_from_json
)
if not wk_nodeinfo:
logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
return False
for version in ['20', '21']:
try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url:
logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
async def get(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.get(*args, **kwargs)
async def post(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.post(*args, **kwargs)
async def fetch_nodeinfo(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.fetch_nodeinfo(*args, **kwargs)

66
relay/http_debug.py Normal file
View file

@ -0,0 +1,66 @@
import logging
import aiohttp
import aiohttp.web
from collections import defaultdict
STATS = {
'requests': defaultdict(int),
'response_codes': defaultdict(int),
'response_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_codes': defaultdict(int),
'delivery_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'exceptions': defaultdict(int),
'exceptions_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_exceptions': defaultdict(int),
'delivery_exceptions_per_domain': defaultdict(lambda: defaultdict(int))
}
async def on_request_start(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP START [%r], [%r]", session, params)
STATS['requests'][params.url.host] += 1
async def on_request_end(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP END [%r], [%r]", session, params)
host = params.url.host
status = params.response.status
STATS['response_codes'][status] += 1
STATS['response_codes_per_domain'][host][status] += 1
if params.method == 'POST':
STATS['delivery_codes'][status] += 1
STATS['delivery_codes_per_domain'][host][status] += 1
async def on_request_exception(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP EXCEPTION [%r], [%r]", session, params)
host = params.url.host
exception = repr(params.exception)
STATS['exceptions'][exception] += 1
STATS['exceptions_per_domain'][host][exception] += 1
if params.method == 'POST':
STATS['delivery_exceptions'][exception] += 1
STATS['delivery_exceptions_per_domain'][host][exception] += 1
def http_debug():
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
trace_config.on_request_exception.append(on_request_exception)
return trace_config

148
relay/http_signatures.py Normal file
View file

@ -0,0 +1,148 @@
import aiohttp
import aiohttp.web
import base64
import logging
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.Signature import PKCS1_v1_5
from cachetools import LFUCache
from async_lru import alru_cache
from .remote_actor import fetch_actor
HASHES = {
'sha1': SHA,
'sha256': SHA256,
'sha512': SHA512
}
def split_signature(sig):
default = {"headers": "date"}
sig = sig.strip().split(',')
for chunk in sig:
k, _, v = chunk.partition('=')
v = v.strip('\"')
default[k] = v
default['headers'] = default['headers'].split()
return default
def build_signing_string(headers, used_headers):
return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers))
SIGSTRING_CACHE = LFUCache(1024)
def sign_signing_string(sigstring, key):
if sigstring in SIGSTRING_CACHE:
return SIGSTRING_CACHE[sigstring]
pkcs = PKCS1_v1_5.new(key)
h = SHA256.new()
h.update(sigstring.encode('ascii'))
sigdata = pkcs.sign(h)
sigdata = base64.b64encode(sigdata)
SIGSTRING_CACHE[sigstring] = sigdata.decode('ascii')
return SIGSTRING_CACHE[sigstring]
def generate_body_digest(body):
bodyhash = SIGSTRING_CACHE.get(body)
if not bodyhash:
h = SHA256.new(body.encode('utf-8'))
bodyhash = base64.b64encode(h.digest()).decode('utf-8')
SIGSTRING_CACHE[body] = bodyhash
return bodyhash
def sign_headers(headers, key, key_id):
headers = {x.lower(): y for x, y in headers.items()}
used_headers = headers.keys()
sig = {
'keyId': key_id,
'algorithm': 'rsa-sha256',
'headers': ' '.join(used_headers)
}
sigstring = build_signing_string(headers, used_headers)
sig['signature'] = sign_signing_string(sigstring, key)
chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()]
return ','.join(chunks)
@alru_cache(maxsize=16384)
async def fetch_actor_key(actor):
actor_data = await fetch_actor(actor)
if not actor_data:
return None
try:
return RSA.importKey(actor_data['publicKey']['publicKeyPem'])
except Exception as e:
logging.debug(f'Exception occured while fetching actor key: {e}')
async def validate(actor, request):
pubkey = await fetch_actor_key(actor)
if not pubkey:
return False
logging.debug('actor key: %r', pubkey)
headers = request.headers.copy()
headers['(request-target)'] = ' '.join([request.method.lower(), request.path])
sig = split_signature(headers['signature'])
logging.debug('sigdata: %r', sig)
sigstring = build_signing_string(headers, sig['headers'])
logging.debug('sigstring: %r', sigstring)
sign_alg, _, hash_alg = sig['algorithm'].partition('-')
logging.debug('sign alg: %r, hash alg: %r', sign_alg, hash_alg)
sigdata = base64.b64decode(sig['signature'])
pkcs = PKCS1_v1_5.new(pubkey)
h = HASHES[hash_alg].new()
h.update(sigstring.encode('ascii'))
result = pkcs.verify(h, sigdata)
request['validated'] = result
logging.debug('validates? %r', result)
return result
async def http_signatures_middleware(app, handler):
async def http_signatures_handler(request):
request['validated'] = False
if 'signature' in request.headers and request.method == 'POST':
data = await request.json()
if 'actor' not in data:
raise aiohttp.web.HTTPUnauthorized(body='signature check failed, no actor in message')
actor = data["actor"]
if not (await validate(actor, request)):
logging.info('Signature validation failed for: %r', actor)
raise aiohttp.web.HTTPUnauthorized(body='signature check failed, signature did not match key')
return (await handler(request))
return (await handler(request))
return http_signatures_handler

11
relay/http_stats.py Normal file
View file

@ -0,0 +1,11 @@
import aiohttp.web
from . import app
from .http_debug import STATS
async def stats(request):
return aiohttp.web.json_response(STATS)
app.router.add_get('/stats', stats)

View file

@ -1,47 +0,0 @@
import logging
import os
from pathlib import Path
## Add the verbose logging level
def verbose(message, *args, **kwargs):
if not logging.root.isEnabledFor(logging.VERBOSE):
return
logging.log(logging.VERBOSE, message, *args, **kwargs)
setattr(logging, 'verbose', verbose)
setattr(logging, 'VERBOSE', 15)
logging.addLevelName(15, 'VERBOSE')
## Get log level and file from environment if possible
env_log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
try:
env_log_file = Path(os.environ.get('LOG_FILE')).expanduser().resolve()
except TypeError:
env_log_file = None
## Make sure the level from the environment is valid
try:
log_level = getattr(logging, env_log_level)
except AttributeError:
log_level = logging.INFO
## Set logging config
handlers = [logging.StreamHandler()]
if env_log_file:
handlers.append(logging.FileHandler(env_log_file))
logging.basicConfig(
level = log_level,
format = "[%(asctime)s] %(levelname)s: %(message)s",
handlers = handlers
)

8
relay/logging.py Normal file
View file

@ -0,0 +1,8 @@
import logging
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
handlers=[logging.StreamHandler()]
)

View file

@ -1,428 +1,83 @@
import Crypto
import asyncio
import click
import logging
import platform
import sys
import simplejson as json
from urllib.parse import urlparse
from .actor import follow_remote_actor, unfollow_remote_actor
from . import CONFIG
from .database import DATABASE
from . import misc, __version__
from . import http_client as http
from .application import Application
from .config import RELAY_SOFTWARE
def relay_list():
print('Connected to the following instances or relays:')
[print('-', relay) for relay in DATABASE['relay-list']]
app = None
CONFIG_IGNORE = {'blocked_software', 'blocked_instances', 'whitelist'}
def relay_follow():
if len(sys.argv) < 3:
print('usage: python3 -m relay.manage follow <target>')
exit()
@click.group('cli', context_settings={'show_default': True}, invoke_without_command=True)
@click.option('--config', '-c', default='relay.yaml', help='path to the relay\'s config')
@click.version_option(version=__version__, prog_name='ActivityRelay')
@click.pass_context
def cli(ctx, config):
global app
app = Application(config)
target = sys.argv[2]
if not ctx.invoked_subcommand:
if app.config.host.endswith('example.com'):
cli_setup.callback()
loop = asyncio.get_event_loop()
loop.run_until_complete(follow_remote_actor(target))
else:
cli_run.callback()
print('Sent follow message to:', target)
@cli.command('setup')
def cli_setup():
'Generate a new config'
def relay_unfollow():
if len(sys.argv) < 3:
print('usage: python3 -m relay.manage unfollow <target>')
exit()
while True:
app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
target = sys.argv[2]
if not app.config.host.endswith('example.com'):
break
loop = asyncio.get_event_loop()
loop.run_until_complete(unfollow_remote_actor(target))
click.echo('The domain must not be example.com')
print('Sent unfollow message to:', target)
if not app.config.is_docker:
app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
def relay_forceremove():
if len(sys.argv) < 3:
print('usage: python3 -m relay.manage force-remove <target>')
exit()
while True:
app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
break
target = sys.argv[2]
app.config.save()
following = DATABASE.get('relay-list', [])
if not app.config.is_docker and click.confirm('Relay all setup! Would you like to run it now?'):
cli_run.callback()
if target in following:
following.remove(target)
DATABASE['relay-list'] = following
with open('relay.jsonld', 'w') as f:
json.dump(DATABASE, f)
print('Removed target from DB:', target)
@cli.command('run')
def cli_run():
'Run the relay'
TASKS = {
'list': relay_list,
'follow': relay_follow,
'unfollow': relay_unfollow,
'force-remove': relay_forceremove
}
if app.config.host.endswith('example.com'):
return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ == '2.6.1':
if int(vers_split[1]) > 7:
click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
return click.echo(pip_command)
else:
click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
return click.echo(pip_command)
if not misc.check_open_port(app.config.listen, app.config.port):
return click.echo(f'Error: A server is already running on port {app.config.port}')
app.run()
# todo: add config default command for resetting config key
@cli.group('config')
def cli_config():
'Manage the relay config'
pass
@cli_config.command('list')
def cli_config_list():
'List the current relay config'
click.echo('Relay Config:')
for key, value in app.config.items():
if key not in CONFIG_IGNORE:
key = f'{key}:'.ljust(20)
click.echo(f'- {key} {value}')
@cli_config.command('set')
@click.argument('key')
@click.argument('value')
def cli_config_set(key, value):
'Set a config value'
app.config[key] = value
app.config.save()
print(f'{key}: {app.config[key]}')
@cli.group('inbox')
def cli_inbox():
'Manage the inboxes in the database'
pass
@cli_inbox.command('list')
def cli_inbox_list():
'List the connected instances or relays'
click.echo('Connected to the following instances or relays:')
for inbox in app.database.inboxes:
click.echo(f'- {inbox}')
@cli_inbox.command('follow')
@click.argument('actor')
def cli_inbox_follow(actor):
'Follow an actor (Relay must be running)'
if app.config.is_banned(actor):
return click.echo(f'Error: Refusing to follow banned actor: {actor}')
if not actor.startswith('http'):
domain = actor
actor = f'https://{actor}/actor'
else:
domain = urlparse(actor).hostname
try:
inbox_data = app.database['relay-list'][domain]
inbox = inbox_data['inbox']
except KeyError:
actor_data = asyncio.run(http.get(app.database, actor, sign_headers=True))
if not actor_data:
return click.echo(f'Failed to fetch actor: {actor}')
inbox = actor_data.shared_inbox
message = misc.Message.new_follow(
host = app.config.host,
actor = actor
)
asyncio.run(http.post(app.database, inbox, message))
click.echo(f'Sent follow message to actor: {actor}')
@cli_inbox.command('unfollow')
@click.argument('actor')
def cli_inbox_unfollow(actor):
'Unfollow an actor (Relay must be running)'
if not actor.startswith('http'):
domain = actor
actor = f'https://{actor}/actor'
else:
domain = urlparse(actor).hostname
try:
inbox_data = app.database['relay-list'][domain]
inbox = inbox_data['inbox']
message = misc.Message.new_unfollow(
host = app.config.host,
actor = actor,
follow = inbox_data['followid']
)
except KeyError:
actor_data = asyncio.run(http.get(app.database, actor, sign_headers=True))
inbox = actor_data.shared_inbox
message = misc.Message.new_unfollow(
host = app.config.host,
actor = actor,
follow = {
'type': 'Follow',
'object': actor,
'actor': f'https://{app.config.host}/actor'
}
)
asyncio.run(http.post(app.database, inbox, message))
click.echo(f'Sent unfollow message to: {actor}')
@cli_inbox.command('add')
@click.argument('inbox')
def cli_inbox_add(inbox):
'Add an inbox to the database'
if not inbox.startswith('http'):
inbox = f'https://{inbox}/inbox'
if app.config.is_banned(inbox):
return click.echo(f'Error: Refusing to add banned inbox: {inbox}')
if app.database.get_inbox(inbox):
return click.echo(f'Error: Inbox already in database: {inbox}')
app.database.add_inbox(inbox)
app.database.save()
click.echo(f'Added inbox to the database: {inbox}')
@cli_inbox.command('remove')
@click.argument('inbox')
def cli_inbox_remove(inbox):
'Remove an inbox from the database'
try:
dbinbox = app.database.get_inbox(inbox, fail=True)
except KeyError:
click.echo(f'Error: Inbox does not exist: {inbox}')
return
app.database.del_inbox(dbinbox['domain'])
app.database.save()
click.echo(f'Removed inbox from the database: {inbox}')
@cli.group('instance')
def cli_instance():
'Manage instance bans'
pass
@cli_instance.command('list')
def cli_instance_list():
'List all banned instances'
click.echo('Banned instances or relays:')
for domain in app.config.blocked_instances:
click.echo(f'- {domain}')
@cli_instance.command('ban')
@click.argument('target')
def cli_instance_ban(target):
'Ban an instance and remove the associated inbox if it exists'
if target.startswith('http'):
target = urlparse(target).hostname
if app.config.ban_instance(target):
app.config.save()
if app.database.del_inbox(target):
app.database.save()
click.echo(f'Banned instance: {target}')
return
click.echo(f'Instance already banned: {target}')
@cli_instance.command('unban')
@click.argument('target')
def cli_instance_unban(target):
'Unban an instance'
if app.config.unban_instance(target):
app.config.save()
click.echo(f'Unbanned instance: {target}')
return
click.echo(f'Instance wasn\'t banned: {target}')
@cli.group('software')
def cli_software():
'Manage banned software'
pass
@cli_software.command('list')
def cli_software_list():
'List all banned software'
click.echo('Banned software:')
for software in app.config.blocked_software:
click.echo(f'- {software}')
@cli_software.command('ban')
@click.option('--fetch-nodeinfo/--ignore-nodeinfo', '-f', 'fetch_nodeinfo', default=False,
help='Treat NAME like a domain and try to fet the software name from nodeinfo'
)
@click.argument('name')
def cli_software_ban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to ban relays'
if name == 'RELAYS':
for name in RELAY_SOFTWARE:
app.config.ban_software(name)
app.config.save()
return click.echo('Banned all relay software')
if fetch_nodeinfo:
nodeinfo = asyncio.run(http.fetch_nodeinfo(app.database, name))
if not nodeinfo:
click.echo(f'Failed to fetch software name from domain: {name}')
name = nodeinfo.sw_name
if app.config.ban_software(name):
app.config.save()
return click.echo(f'Banned software: {name}')
click.echo(f'Software already banned: {name}')
@cli_software.command('unban')
@click.option('--fetch-nodeinfo/--ignore-nodeinfo', '-f', 'fetch_nodeinfo', default=False,
help='Treat NAME like a domain and try to fet the software name from nodeinfo'
)
@click.argument('name')
def cli_software_unban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to unban relays'
if name == 'RELAYS':
for name in RELAY_SOFTWARE:
app.config.unban_software(name)
app.config.save()
return click.echo('Unbanned all relay software')
if fetch_nodeinfo:
nodeinfo = asyncio.run(http.fetch_nodeinfo(app.database, name))
if not nodeinfo:
click.echo(f'Failed to fetch software name from domain: {name}')
name = nodeinfo.sw_name
if app.config.unban_software(name):
app.config.save()
return click.echo(f'Unbanned software: {name}')
click.echo(f'Software wasn\'t banned: {name}')
@cli.group('whitelist')
def cli_whitelist():
'Manage the instance whitelist'
pass
@cli_whitelist.command('list')
def cli_whitelist_list():
'List all the instances in the whitelist'
click.echo('Current whitelisted domains')
for domain in app.config.whitelist:
click.echo(f'- {domain}')
@cli_whitelist.command('add')
@click.argument('instance')
def cli_whitelist_add(instance):
'Add an instance to the whitelist'
if not app.config.add_whitelist(instance):
return click.echo(f'Instance already in the whitelist: {instance}')
app.config.save()
click.echo(f'Instance added to the whitelist: {instance}')
@cli_whitelist.command('remove')
@click.argument('instance')
def cli_whitelist_remove(instance):
'Remove an instance from the whitelist'
if not app.config.del_whitelist(instance):
return click.echo(f'Instance not in the whitelist: {instance}')
app.config.save()
if app.config.whitelist_enabled:
if app.database.del_inbox(instance):
app.database.save()
click.echo(f'Removed instance from the whitelist: {instance}')
@cli_whitelist.command('import')
def cli_whitelist_import():
'Add all current inboxes to the whitelist'
for domain in app.database.hostnames:
cli_whitelist_add.callback(domain)
def usage():
print('usage: python3 -m relay.manage <task> [...]')
print('tasks:')
[print('-', task) for task in TASKS.keys()]
exit()
def main():
cli(prog_name='relay')
if len(sys.argv) < 2:
usage()
if sys.argv[1] in TASKS:
TASKS[sys.argv[1]]()
else:
usage()
if __name__ == '__main__':
click.echo('Running relay.manage is depreciated. Run `activityrelay [command]` instead.')
main()

View file

@ -1,340 +0,0 @@
import aputils
import asyncio
import base64
import json
import logging
import socket
import traceback
import uuid
from aiohttp.hdrs import METH_ALL as METHODS
from aiohttp.web import Response as AiohttpResponse, View as AiohttpView
from datetime import datetime
from json.decoder import JSONDecodeError
from urllib.parse import urlparse
from uuid import uuid4
app = None
MIMETYPES = {
'activity': 'application/activity+json',
'html': 'text/html',
'json': 'application/json',
'text': 'text/plain'
}
NODEINFO_NS = {
'20': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
'21': 'http://nodeinfo.diaspora.software/ns/schema/2.1'
}
def set_app(new_app):
global app
app = new_app
def boolean(value):
if isinstance(value, str):
if value.lower() in ['on', 'y', 'yes', 'true', 'enable', 'enabled', '1']:
return True
elif value.lower() in ['off', 'n', 'no', 'false', 'disable', 'disable', '0']:
return False
else:
raise TypeError(f'Cannot parse string "{value}" as a boolean')
elif isinstance(value, int):
if value == 1:
return True
elif value == 0:
return False
else:
raise ValueError('Integer value must be 1 or 0')
elif value == None:
return False
try:
return value.__bool__()
except AttributeError:
raise TypeError(f'Cannot convert object of type "{clsname(value)}"')
def check_open_port(host, port):
if host == '0.0.0.0':
host = '127.0.0.1'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
return s.connect_ex((host , port)) != 0
except socket.error as e:
return False
class DotDict(dict):
def __init__(self, _data, **kwargs):
dict.__init__(self)
self.update(_data, **kwargs)
def __getattr__(self, k):
try:
return self[k]
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def __setattr__(self, k, v):
if k.startswith('_'):
super().__setattr__(k, v)
else:
self[k] = v
def __setitem__(self, k, v):
if type(v) == dict:
v = DotDict(v)
super().__setitem__(k, v)
def __delattr__(self, k):
try:
dict.__delitem__(self, k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
@classmethod
def new_from_json(cls, data):
if not data:
raise JSONDecodeError('Empty body', data, 1)
try:
return cls(json.loads(data))
except ValueError:
raise JSONDecodeError('Invalid body', data, 1)
@classmethod
def new_from_signature(cls, sig):
data = cls({})
for chunk in sig.strip().split(','):
key, value = chunk.split('=', 1)
value = value.strip('\"')
if key == 'headers':
value = value.split()
data[key.lower()] = value
return data
def to_json(self, indent=None):
return json.dumps(self, indent=indent)
def update(self, _data, **kwargs):
if isinstance(_data, dict):
for key, value in _data.items():
self[key] = value
elif isinstance(_data, (list, tuple, set)):
for key, value in _data:
self[key] = value
for key, value in kwargs.items():
self[key] = value
class Message(DotDict):
@classmethod
def new_actor(cls, host, pubkey, description=None):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/actor',
'type': 'Application',
'preferredUsername': 'relay',
'name': 'ActivityRelay',
'summary': description or 'ActivityRelay bot',
'followers': f'https://{host}/followers',
'following': f'https://{host}/following',
'inbox': f'https://{host}/inbox',
'url': f'https://{host}/inbox',
'endpoints': {
'sharedInbox': f'https://{host}/inbox'
},
'publicKey': {
'id': f'https://{host}/actor#main-key',
'owner': f'https://{host}/actor',
'publicKeyPem': pubkey
}
})
@classmethod
def new_announce(cls, host, object):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Announce',
'to': [f'https://{host}/followers'],
'actor': f'https://{host}/actor',
'object': object
})
@classmethod
def new_follow(cls, host, actor):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'type': 'Follow',
'to': [actor],
'object': actor,
'id': f'https://{host}/activities/{uuid.uuid4()}',
'actor': f'https://{host}/actor'
})
@classmethod
def new_unfollow(cls, host, actor, follow):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Undo',
'to': [actor],
'actor': f'https://{host}/actor',
'object': follow
})
@classmethod
def new_response(cls, host, actor, followid, accept):
return cls({
'@context': 'https://www.w3.org/ns/activitystreams',
'id': f'https://{host}/activities/{uuid.uuid4()}',
'type': 'Accept' if accept else 'Reject',
'to': [actor],
'actor': f'https://{host}/actor',
'object': {
'id': followid,
'type': 'Follow',
'object': f'https://{host}/actor',
'actor': actor
}
})
# misc properties
@property
def domain(self):
return urlparse(self.id).hostname
# actor properties
@property
def shared_inbox(self):
return self.get('endpoints', {}).get('sharedInbox', self.inbox)
# activity properties
@property
def actorid(self):
if isinstance(self.actor, dict):
return self.actor.id
return self.actor
@property
def objectid(self):
if isinstance(self.object, dict):
return self.object.id
return self.object
@property
def signer(self):
return aputils.Signer.new_from_actor(self)
class Response(AiohttpResponse):
@classmethod
def new(cls, body='', status=200, headers=None, ctype='text'):
kwargs = {
'status': status,
'headers': headers,
'content_type': MIMETYPES[ctype]
}
if isinstance(body, bytes):
kwargs['body'] = body
elif isinstance(body, dict) and ctype in {'json', 'activity'}:
kwargs['text'] = json.dumps(body)
else:
kwargs['text'] = body
return cls(**kwargs)
@classmethod
def new_error(cls, status, body, ctype='text'):
if ctype == 'json':
body = json.dumps({'status': status, 'error': body})
return cls.new(body=body, status=status, ctype=ctype)
@property
def location(self):
return self.headers.get('Location')
@location.setter
def location(self, value):
self.headers['Location'] = value
class View(AiohttpView):
async def _iter(self):
if self.request.method not in METHODS:
self._raise_allowed_methods()
method = getattr(self, self.request.method.lower(), None)
if method is None:
self._raise_allowed_methods()
return await method(**self.request.match_info)
@property
def app(self):
return self._request.app
@property
def config(self):
return self.app.config
@property
def database(self):
return self.app.database

67
relay/nodeinfo.py Normal file
View file

@ -0,0 +1,67 @@
import subprocess
import urllib.parse
import aiohttp.web
from . import app
from .database import DATABASE
try:
commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
except:
commit_label = '???'
nodeinfo_template = {
# XXX - is this valid for a relay?
'openRegistrations': True,
'protocols': ['activitypub'],
'services': {
'inbound': [],
'outbound': []
},
'software': {
'name': 'activityrelay',
'version': '0.1 {}'.format(commit_label)
},
'usage': {
'localPosts': 0,
'users': {
'total': 1
}
},
'version': '2.0'
}
def get_peers():
global DATABASE
return [urllib.parse.urlsplit(inbox).hostname for inbox in DATABASE.get('relay-list', [])]
async def nodeinfo_2_0(request):
data = nodeinfo_template.copy()
data['metadata'] = {
'peers': get_peers()
}
return aiohttp.web.json_response(data)
app.router.add_get('/nodeinfo/2.0.json', nodeinfo_2_0)
async def nodeinfo_wellknown(request):
data = {
'links': [
{
'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
'href': 'https://{}/nodeinfo/2.0.json'.format(request.host)
}
]
}
return aiohttp.web.json_response(data)
app.router.add_get('/.well-known/nodeinfo', nodeinfo_wellknown)

View file

@ -1,160 +0,0 @@
import asyncio
import logging
from cachetools import LRUCache
from uuid import uuid4
from .misc import Message
cache = LRUCache(1024)
def person_check(actor, software):
## pleroma and akkoma may use Person for the actor type for some reason
if software in {'akkoma', 'pleroma'} and actor.id == f'https://{actor.domain}/relay':
return False
## make sure the actor is an application
if actor.type != 'Application':
return True
async def handle_relay(request):
if request.message.objectid in cache:
logging.verbose(f'already relayed {request.message.objectid}')
return
message = Message.new_announce(
host = request.config.host,
object = request.message.objectid
)
cache[request.message.objectid] = message.id
logging.debug(f'>> relay: {message}')
inboxes = request.database.distill_inboxes(request.message)
for inbox in inboxes:
request.app.push_message(inbox, message)
async def handle_forward(request):
if request.message.id in cache:
logging.verbose(f'already forwarded {request.message.id}')
return
message = Message.new_announce(
host = request.config.host,
object = request.message
)
cache[request.message.id] = message.id
logging.debug(f'>> forward: {message}')
inboxes = request.database.distill_inboxes(request.message)
for inbox in inboxes:
request.app.push_message(inbox, message)
async def handle_follow(request):
nodeinfo = await request.app.client.fetch_nodeinfo(request.actor.domain)
software = nodeinfo.sw_name if nodeinfo else None
## reject if software used by actor is banned
if request.config.is_banned_software(software):
request.app.push_message(
request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = False
)
)
return logging.verbose(f'Rejected follow from actor for using specific software: actor={request.actor.id}, software={software}')
## reject if the actor is not an instance actor
if person_check(request.actor, software):
request.app.push_message(
request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = False
)
)
return logging.verbose(f'Non-application actor tried to follow: {request.actor.id}')
request.database.add_inbox(request.actor.shared_inbox, request.message.id, software)
request.database.save()
request.app.push_message(
request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = True
)
)
# Are Akkoma and Pleroma the only two that expect a follow back?
# Ignoring only Mastodon for now
if software != 'mastodon':
request.app.push_message(
request.actor.shared_inbox,
Message.new_follow(
host = request.config.host,
actor = request.actor.id
)
)
async def handle_undo(request):
## If the object is not a Follow, forward it
if request.message.object.type != 'Follow':
return await handle_forward(request)
if not request.database.del_inbox(request.actor.domain, request.message.id):
return
request.database.save()
request.app.push_message(
request.actor.shared_inbox,
Message.new_unfollow(
host = request.config.host,
actor = request.actor.id,
follow = request.message
)
)
processors = {
'Announce': handle_relay,
'Create': handle_relay,
'Delete': handle_forward,
'Follow': handle_follow,
'Undo': handle_undo,
'Update': handle_forward,
}
async def run_processor(request):
if request.message.type not in processors:
return
if request.instance and not request.instance.get('software'):
nodeinfo = await request.app.client.fetch_nodeinfo(request.instance['domain'])
if nodeinfo:
request.instance['software'] = nodeinfo.sw_name
request.database.save()
logging.verbose(f'New "{request.message.type}" from actor: {request.actor.id}')
return await processors[request.message.type](request)

56
relay/remote_actor.py Normal file
View file

@ -0,0 +1,56 @@
import logging
import aiohttp
from cachetools import TTLCache
from datetime import datetime
from urllib.parse import urlsplit
from . import CONFIG
from .http_debug import http_debug
CACHE_SIZE = CONFIG.get('cache-size', 16384)
CACHE_TTL = CONFIG.get('cache-ttl', 3600)
ACTORS = TTLCache(CACHE_SIZE, CACHE_TTL)
async def fetch_actor(uri, headers={}, force=False, sign_headers=True):
if uri in ACTORS and not force:
return ACTORS[uri]
from .actor import PRIVKEY
from .http_signatures import sign_headers
url = urlsplit(uri)
key_id = 'https://{}/actor#main-key'.format(CONFIG['ap']['host'])
headers.update({
'Accept': 'application/activity+json',
'User-Agent': 'ActivityRelay'
})
if sign_headers:
headers.update({
'(request-target)': 'get {}'.format(url.path),
'Date': datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'),
'Host': url.netloc
})
headers['signature'] = sign_headers(headers, PRIVKEY, key_id)
headers.pop('(request-target)')
headers.pop('Host')
try:
async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
async with session.get(uri, headers=headers) as resp:
if resp.status != 200:
return None
ACTORS[uri] = (await resp.json(encoding='utf-8', content_type=None))
return ACTORS[uri]
except Exception as e:
logging.info('Caught %r while fetching actor %r.', e, uri)
return None

View file

@ -1,194 +0,0 @@
import aputils
import asyncio
import logging
import subprocess
import traceback
from pathlib import Path
from . import __version__, misc
from .misc import DotDict, Message, Response
from .processors import run_processor
routes = []
version = __version__
if Path(__file__).parent.parent.joinpath('.git').exists():
try:
commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
version = f'{__version__} {commit_label}'
except:
pass
def register_route(method, path):
def wrapper(func):
routes.append([method, path, func])
return func
return wrapper
@register_route('GET', '/')
async def home(request):
targets = '<br>'.join(request.database.hostnames)
note = request.config.note
count = len(request.database.hostnames)
host = request.config.host
text = f"""
<html><head>
<title>SEDI中繼器</title>
<style>
p {{ color: #FFFFFF; font-family: monospace, arial; font-size: 100%; }}
body {{ background-color: #000000; }}
a {{ color: #26F; }}
a:visited {{ color: #46C; }}
a:hover {{ color: #8AF; }}
</style>
</head>
<body>
<p>This is an Activity Relay for fediverse instances.</p>
<p>{note}</p>
<p>Misskey及Mastodon站長請訂閱這個地址<a href="https://{host}/inbox">https://{host}/inbox</a></p>
<p>Pleroma及Friendica站長請訂閱這個地址<a href="https://{host}/actor">https://{host}/actor</a></p>
<p>原始碼<a href="https://git.seediqbale.xyz/pch_xyz/sedi-relay">https://git.seediqbale.xyz/pch_xyz/sedi-relay</a></p>
<p>請我喝杯咖啡<a href="https://buymeacoffee.com/SEDI">https://buymeacoffee.com/SEDI</a></p>
<p>activityrelay v0.2.4</p>
<br><p> {count} 個實例訂閱中<br>{targets}</p>
</body></html>"""
return Response.new(text, ctype='html')
@register_route('GET', '/inbox')
@register_route('GET', '/actor')
async def actor(request):
data = Message.new_actor(
host = request.config.host,
pubkey = request.database.signer.pubkey
)
return Response.new(data, ctype='activity')
@register_route('POST', '/inbox')
@register_route('POST', '/actor')
async def inbox(request):
config = request.config
database = request.database
## reject if missing signature header
if not request.signature:
logging.verbose('Actor missing signature header')
raise HTTPUnauthorized(body='missing signature')
try:
request['message'] = await request.json(loads=Message.new_from_json)
## reject if there is no message
if not request.message:
logging.verbose('empty message')
return Response.new_error(400, 'missing message', 'json')
## reject if there is no actor in the message
if 'actor' not in request.message:
logging.verbose('actor not in message')
return Response.new_error(400, 'no actor in message', 'json')
except:
## this code should hopefully never get called
traceback.print_exc()
logging.verbose('Failed to parse inbox message')
return Response.new_error(400, 'failed to parse message', 'json')
request['actor'] = await request.app.client.get(request.signature.keyid, sign_headers=True)
## reject if actor is empty
if not request.actor:
## ld signatures aren't handled atm, so just ignore it
if request['message'].type == 'Delete':
logging.verbose(f'Instance sent a delete which cannot be handled')
return Response.new(status=202)
logging.verbose(f'Failed to fetch actor: {request.signature.keyid}')
return Response.new_error(400, 'failed to fetch actor', 'json')
request['instance'] = request.database.get_inbox(request['actor'].inbox)
## reject if the actor isn't whitelisted while the whiltelist is enabled
if config.whitelist_enabled and not config.is_whitelisted(request.actor.domain):
logging.verbose(f'Rejected actor for not being in the whitelist: {request.actor.id}')
return Response.new_error(403, 'access denied', 'json')
## reject if actor is banned
if request.config.is_banned(request.actor.domain):
logging.verbose(f'Ignored request from banned actor: {actor.id}')
return Response.new_error(403, 'access denied', 'json')
## reject if the signature is invalid
try:
await request.actor.signer.validate_aiohttp_request(request)
except aputils.SignatureValidationError as e:
logging.verbose(f'signature validation failed for: {actor.id}')
logging.debug(str(e))
return Response.new_error(401, str(e), 'json')
## reject if activity type isn't 'Follow' and the actor isn't following
if request.message.type != 'Follow' and not database.get_inbox(request.actor.domain):
logging.verbose(f'Rejected actor for trying to post while not following: {request.actor.id}')
return Response.new_error(401, 'access denied', 'json')
logging.debug(f">> payload {request.message.to_json(4)}")
asyncio.ensure_future(run_processor(request))
return Response.new(status=202)
@register_route('GET', '/.well-known/webfinger')
async def webfinger(request):
try:
subject = request.query['resource']
except KeyError:
return Response.new_error(400, 'missing \'resource\' query key', 'json')
if subject != f'acct:relay@{request.config.host}':
return Response.new_error(404, 'user not found', 'json')
data = aputils.Webfinger.new(
handle = 'relay',
domain = request.config.host,
actor = request.config.actor
)
return Response.new(data, ctype='json')
@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
async def nodeinfo(request):
niversion = request.match_info['version'][:3]
data = dict(
name = 'activityrelay',
version = version,
protocols = ['activitypub'],
open_regs = not request.config.whitelist_enabled,
users = 1,
metadata = {'peers': request.database.hostnames}
)
if niversion == '2.1':
data['repo'] = 'https://git.pleroma.social/pleroma/relay'
return Response.new(aputils.Nodeinfo.new(**data), ctype='json')
@register_route('GET', '/.well-known/nodeinfo')
async def nodeinfo_wellknown(request):
data = aputils.WellKnownNodeinfo.new_template(request.config.host)
return Response.new(data, ctype='json')

24
relay/webfinger.py Normal file
View file

@ -0,0 +1,24 @@
import aiohttp.web
from . import app
async def webfinger(request):
subject = request.query['resource']
if subject != 'acct:relay@{}'.format(request.host):
return aiohttp.web.json_response({'error': 'user not found'}, status=404)
actor_uri = "https://{}/actor".format(request.host)
data = {
"aliases": [actor_uri],
"links": [
{"href": actor_uri, "rel": "self", "type": "application/activity+json"},
{"href": actor_uri, "rel": "self", "type": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""}
],
"subject": subject
}
return aiohttp.web.json_response(data)
app.router.add_get('/.well-known/webfinger', webfinger)

View file

@ -1,5 +0,0 @@
aiohttp>=3.8.0
aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.3.tar.gz
cachetools>=5.2.0
click>=8.1.2
pyyaml>=6.0

View file

@ -1,6 +1,5 @@
[metadata]
name = relay
version = attr: relay.__version__
description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8
@ -22,14 +21,18 @@ project_urls =
[options]
zip_safe = False
packages = find:
install_requires = file: requirements.txt
python_requires = >=3.7
[options.extras_require]
dev =
pyinstaller >= 5.6.0
[options.entry_points]
console_scripts =
activityrelay = relay.manage:main
install_requires =
aiohttp>=3.5.4
async-timeout>=3.0.0
attrs>=18.1.0
chardet>=3.0.4
idna>=2.7
idna-ssl>=1.1.0; python_version < "3.7"
multidict>=4.3.1
pycryptodome>=3.9.4
PyYAML>=5.1
simplejson>=3.16.0
yarl>=1.2.6
cachetools
async_lru
python_requires = >=3.6