Compare commits


101 commits

Author SHA1 Message Date
8d5b097ac4 Add donation link 2023-02-26 05:27:32 +00:00
8c49d92aea added version info 2023-02-10 01:15:20 +00:00
d06f51fca6 Upload files to "relay"
added version info
2023-02-10 01:14:32 +00:00
818a3573ae Merge pull request 'localization' (#1) from pch_xyz-patch-1 into master
Reviewed-on: #1
2023-02-10 00:54:07 +00:00
3dea5c030b localization 2023-02-10 00:51:40 +00:00
Izalia Mae
15b1324df2 Merge branch 'zen-master-patch-50595' into 'master'
Do not check instance's actor.type in case of Pleroma/Akkoma

See merge request pleroma/relay!50
2023-01-11 03:43:59 +00:00
Dmytro Poltavchenko
006efc1ba4 Do not check instance's actor.type in case of Pleroma/Akkoma 2023-01-08 00:23:36 +00:00
Izalia Mae
f4698aa4dc fix RuntimeError when running commands involving http client 2022-12-29 07:27:35 -05:00
Izalia Mae
0940921383 handle more client connection errors 2022-12-26 02:02:57 -05:00
Izalia Mae
af7fcc66fd fix missing modules when building via pyinstaller 2022-12-11 09:15:03 -05:00
Izalia Mae
bbdc151ed3 Merge branch 'dev' into 'master'
Version 0.2.4

See merge request pleroma/relay!46
2022-12-11 00:01:17 +00:00
Izalia Mae
04368c782d replace aputils git url with tar.gz 2022-12-10 02:44:07 -05:00
Izalia Mae
a742e7fb30 update setup.cfg and requirements.txt
* move deps to requirements.txt
* reference deps from requirements.txt in setup.cfg
* bump minimum python version to 3.7
* set version in setup.cfg from attribute
2022-12-10 02:10:56 -05:00
Izalia Mae
17f3e6be55 version 0.2.4 2022-12-08 04:17:17 -05:00
Izalia Mae
0e45763eff remove unnecessary config update 2022-12-08 03:53:13 -05:00
Izalia Mae
3968799d6f make sure exceptions don't bring down workers 2022-12-08 03:51:10 -05:00
Izalia Mae
aa8090eebb don't prompt for ignored settings in docker instances 2022-12-08 03:31:47 -05:00
Izalia Mae
f287b84ea3 update aputils 2022-12-07 23:23:13 -05:00
Izalia Mae
dc74bfb588 force certain config values in docker installs 2022-12-07 23:16:48 -05:00
Izalia Mae
e281a06e7f correctly call aputils.Signer.new 2022-12-07 23:15:54 -05:00
Izalia Mae
8f16cab048 prevent errors in post and fetch_nodeinfo 2022-12-07 23:15:31 -05:00
Izalia Mae
7d37ec8145 remove await from push_message calls and reject non-system actors 2022-12-04 04:40:40 -05:00
Izalia Mae
9f58c88e9f Fix NameError when getting nodeinfo software name in processors 2022-12-04 04:16:50 -05:00
Izalia Mae
6b86bb7d98 remove leftover semaphore property 2022-12-04 02:13:13 -05:00
Izalia Mae
90234a9724 move apkeys out of RelayConfig and rename relay_software_names 2022-12-04 01:20:17 -05:00
Izalia Mae
b0851c0652 remove http_debug 2022-12-04 01:15:28 -05:00
Izalia Mae
eab8a31001 document new commands 2022-12-04 01:12:58 -05:00
Izalia Mae
3b89aa5e84 sort out cli
added `whitelist import` command which adds all current inboxes to the whitelist
added `config list`
fixed a few errors
2022-12-04 01:09:45 -05:00
Izalia Mae
f7e1c6b0b8 make sure db config is a string when saving 2022-12-02 11:43:39 -05:00
Izalia Mae
dcca1eb0dc fix HttpClient fetch_nodeinfo and get 2022-12-02 00:52:15 -05:00
Izalia Mae
d5b9053f71 replace various classes with aputils classes 2022-12-02 00:50:57 -05:00
Izalia Mae
d172439fac update aputils 2022-12-02 00:11:22 -05:00
Izalia Mae
1a7abb4ecb fix distill_inboxes 2022-11-29 17:41:04 -05:00
Izalia Mae
5397bb4653 only use hs2019 for mastodon 2022-11-27 17:25:54 -05:00
Izalia Mae
a640db8f06 update list of active relay software 2022-11-26 23:41:57 -05:00
Izalia Mae
ce9e0c4d00 remove unnecessary print 2022-11-26 23:11:51 -05:00
Izalia Mae
335146a970 fix NameError in cli_setup 2022-11-26 23:01:18 -05:00
Izalia Mae
27914a7d27 Merge branch 'master' into dev 2022-11-26 22:53:43 -05:00
Izalia Mae
5d01211a34 add aputils module for hs2019 support 2022-11-26 22:16:14 -05:00
Izalia Mae
130111c847 update documentation 2022-11-26 20:53:06 -05:00
Izalia Mae
10301ecbde update example config file 2022-11-26 20:25:20 -05:00
Izalia Mae
15b314922c fix running via docker 2022-11-26 19:59:20 -05:00
Izalia Mae
b85b4ab80b create HttpClient class to avoid creating a new session every request 2022-11-26 18:56:34 -05:00
Izalia Mae
32764a1f93 make sure domain key exists for inboxes 2022-11-25 13:39:52 -05:00
Izalia Mae
fbe5746a18 fix NameError in cli_whitelist_remove 2022-11-25 13:29:45 -05:00
Izalia Mae
017363ecd5 fix nodeinfo fetching in run_processor 2022-11-25 13:19:29 -05:00
Izalia Mae
8541f63762 add timeout option to misc.request 2022-11-24 16:01:23 -05:00
Izalia Mae
ca36a765ea Merge branch 'fish-master-patch-76139' into 'master'
fix host check in setup

See merge request pleroma/relay!43
2022-11-24 06:24:01 +00:00
GQ Qin
6a3a35182e fix host check in setup 2022-11-23 03:46:34 +00:00
Izalia Mae
da56d4bb61 add extra logging in misc.request 2022-11-22 18:11:41 -05:00
Izalia Mae
a838e4324b fix NameError in inbox 2022-11-22 18:09:25 -05:00
Izalia Mae
242052386e use correct actor variable for cli_inbox_follow 2022-11-20 22:24:36 -05:00
Izalia Mae
395971914b organize manage.py 2022-11-20 06:24:33 -05:00
Izalia Mae
c96640bfd7 add config cli commands 2022-11-20 06:14:37 -05:00
Izalia Mae
9839da906c add optional push worker threads 2022-11-20 05:50:14 -05:00
Izalia Mae
c049657765 fetch nodeinfo software name on inbox request instead of startup 2022-11-20 05:22:57 -05:00
Izalia Mae
ffe14bead3 ignore account Deletes 2022-11-20 05:12:11 -05:00
Izalia Mae
85c4df7d8c remove unecessary method 2022-11-18 16:57:34 -05:00
Izalia Mae
ba9f2718aa use new request properties and only fetch nodeinfo on follow 2022-11-18 16:41:14 -05:00
Izalia Mae
4a8a8da740 add software kwarg to RelayDatabase.add_inbox 2022-11-18 16:39:53 -05:00
Izalia Mae
306b526808 add properties to aiohttp.web.Request 2022-11-18 16:38:39 -05:00
Izalia Mae
4ea6a040fb optimize RelayDatabase.get_inbox 2022-11-18 14:36:30 -05:00
Izalia Mae
9369b598fa add software name for inboxes 2022-11-18 14:10:39 -05:00
Izalia Mae
d4955828d4 return Nodeinfo object from fetch_nodeinfo 2022-11-18 13:45:26 -05:00
Izalia Mae
6e494ee671 Merge branch 'dev' into 'master'
v0.2.3

See merge request pleroma/relay!42
2022-11-18 17:39:31 +00:00
Izalia Mae
22b6e6b406 cleanup 2022-11-18 11:58:27 -05:00
Izalia Mae
6960c8d6c0 views.webfinger: return 400 error on missing resource 2022-11-18 11:50:12 -05:00
Izalia Mae
2b2e311be4 update example config 2022-11-18 06:26:58 -05:00
Izalia Mae
d08bd6625a use signature keyid instead of object actor to fetch actor 2022-11-17 16:30:56 -05:00
Izalia Mae
d2b243d88a await misc.request in handle_follow 2022-11-16 14:22:50 -05:00
Izalia Mae
e3b06d29ab ignore signals that don't exist 2022-11-16 13:26:47 -05:00
Izalia Mae
b87e52347b add spec file for building with pyinstaller 2022-11-16 11:18:31 -05:00
Izalia Mae
ef5d4bc579 only fetch commit hash if in running from git repo 2022-11-16 10:41:00 -05:00
Izalia Mae
8fd712c849 always fetch nodeinfo software name 2022-11-16 10:31:21 -05:00
Izalia Mae
c88e4e748a version bump 2022-11-16 10:26:08 -05:00
Izalia Mae
d615380610 Merge branch 'dev' of ssh://pleroma/pleroma/relay into dev 2022-11-16 10:24:35 -05:00
Izalia Mae
689fa1f8b4 Merge branch 'fix-newerror' into 'dev'
Fix Response.new_error

See merge request pleroma/relay!40
2022-11-16 15:23:58 +00:00
Izalia Mae
ec325f9f08 skip raising a KeyError on missing actor 2022-11-16 09:12:23 -05:00
Izalia Mae
4bdd2b031b prevent error in inbox 2022-11-16 09:10:52 -05:00
Jeong Arm
e6d7c60a5a Fix Response.new_error 2022-11-13 15:00:53 +09:00
Izalia Mae
7732a860e9 use right variable for inbox 2022-11-10 13:08:25 -05:00
Izalia Mae
3305a25da4 create View class and fix Response.new_error 2022-11-10 12:40:48 -05:00
Izalia Mae
c1c4b24b0a add ability to change cache size 2022-11-10 12:39:37 -05:00
Izalia Mae
f397e10b04 reset config on load 2022-11-10 12:38:08 -05:00
Izalia Mae
78ce1763e0 fix a couple nodeinfo values 2022-11-09 06:11:16 -05:00
Izalia Mae
ff95a3033d create Response class 2022-11-09 05:58:35 -05:00
Izalia Mae
6af9c8e6fe add follow request management methods to database 2022-11-09 04:54:46 -05:00
Izalia Mae
0b9281bec1 make sure sub-dicts in DotDict are DotDict objects 2022-11-09 04:35:57 -05:00
Izalia Mae
76476d1d03 add missing import 2022-11-07 16:14:00 -05:00
Izalia Mae
b275b7cd0b remove (un)follow_remote_actor 2022-11-07 09:53:04 -05:00
Izalia Mae
58ebefa3bd fix WKNodeinfo.get_url 2022-11-07 08:24:03 -05:00
Izalia Mae
e3bf4258aa create WKNodeinfo class and add nodeinfo 2.1 path 2022-11-07 08:18:25 -05:00
Izalia Mae
8d17749a50 create Application class 2022-11-07 07:54:32 -05:00
Izalia Mae
70e4870ba9 remove run_in_loop function 2022-11-07 05:40:08 -05:00
Izalia Mae
c66f9d34b3 create Message class 2022-11-07 05:30:13 -05:00
Izalia Mae
3b85e2c2f2 move DotDict to misc 2022-11-06 01:11:54 -05:00
Izalia Mae
f713f54306 announce forwarded messages 2022-11-06 01:11:36 -05:00
Izalia Mae
dcb7980c50 prevent old unfollows from booting instances 2022-11-05 22:15:37 -04:00
Izalia Mae
4d121adaa2 forward all non-Follow undos 2022-11-05 20:15:40 -04:00
Izalia Mae
c0d55cebb0 cache activity id for forwards 2022-11-05 20:10:01 -04:00
Izalia Mae
8ca198b611 simplify misc.request 2022-11-05 20:07:44 -04:00
18 changed files with 1485 additions and 951 deletions

docs/commands.md

@@ -24,6 +24,27 @@ Run the setup wizard to configure your relay.
     activityrelay setup
+## Config
+Manage the relay config
+    activityrelay config
+### List
+List the current config key/value pairs
+    activityrelay config list
+### Set
+Set a value for a config option
+    activityrelay config set <key> <value>
 ## Inbox
 Manage the list of subscribed instances.

@@ -97,6 +118,13 @@ Remove a domain from the whitelist.
     activityrelay whitelist remove <domain>
+### Import
+Add all current inboxes to the whitelist
+    activityrelay whitelist import
 ## Instance
 Manage the instance ban list.

docs/configuration.md

@@ -1,6 +1,8 @@
 # Configuration
-## DB
+## General
+### DB
 The path to the database. It contains the relay actor private key and all subscribed
 instances. If the path is not absolute, it is relative to the working directory.

@@ -8,7 +10,7 @@ instances. If the path is not absolute, it is relative to the working directory.
     db: relay.jsonld
-## Listener
+### Listener
 The address and port the relay will listen on. If the reverse proxy (nginx, apache, caddy, etc)
 is running on the same host, it is recommended to change `listen` to `localhost`

@@ -17,22 +19,41 @@ is running on the same host, it is recommended to change `listen` to `localhost`
     port: 8080
-## Note
+### Note
 A small blurb to describe your relay instance. This will show up on the relay's home page.
     note: "Make a note about your instance here."
-## Post Limit
+### Post Limit
 The maximum number of messages to send out at once. For each incoming message, a message will be
 sent out to every subscribed instance minus the instance which sent the message. This limit
 is to prevent too many outgoing connections from being made, so adjust if necessary.
+Note: If the `workers` option is set to anything above 0, this limit will be per worker.
     push_limit: 512
+### Push Workers
+The relay can be configured to use threads to push messages out. For smaller relays, this isn't
+necessary, but bigger ones (>100 instances) will want to set this to the number of available cpu
+threads.
+    workers: 0
+### JSON GET cache limit
+JSON objects (actors, nodeinfo, etc) will get cached when fetched. This will set the max number of
+objects to keep in the cache.
+    json_cache: 1024
 ## AP
 Various ActivityPub-related settings

@@ -82,29 +103,3 @@ setting this to the below list will block all other relays and prevent relay chains
 - aoderelay
 - social.seattle.wa.us-relay
 - unciarelay
-## Cache
-These are object limits for various caches. Only change if you know what you're doing.
-### Objects
-The urls of messages which have been processed by the relay.
-    objects: 1024
-### Actors
-The ActivityPub actors of incoming messages.
-    actors: 1024
-### Actors
-The base64 encoded hashes of messages.
-    digests: 1024

docs/installation.md

@@ -15,7 +15,7 @@ the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in-depth instructions.
 Now simply install ActivityRelay directly from git
-    pipx install git+https://git.pleroma.social/pleroma/relay@0.2.0
+    pipx install git+https://git.pleroma.social/pleroma/relay@0.2.4
 Or from a cloned git repo.

@@ -39,7 +39,7 @@ be installed via [pyenv](https://github.com/pyenv/pyenv).
 The instructions for installation via pip are very similar to pipx. Installation can be done from
 git
-    python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.0
+    python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.4
 or a cloned git repo.

relay.spec (new file, 50 lines)

@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['relay/__main__.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'aputils.enums',
'aputils.errors',
'aputils.misc',
'aputils.objects',
'aputils.signer'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='activityrelay',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
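The aputils submodules are listed under hiddenimports because PyInstaller's static analysis does not detect them (see the "fix missing modules when building via pyinstaller" commit above). Assuming PyInstaller is installed, building the standalone binary should be the stock invocation against this spec:

    pyinstaller relay.spec

With runtime_tmpdir=None and console=True this yields a single-file console executable named activityrelay.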

relay.yaml.example

@@ -9,13 +9,19 @@ port: 8080
 # Note
 note: "Make a note about your instance here."
-# maximum number of inbox posts to do at once
-post_limit: 512
+# Number of worker threads to start. If 0, use asyncio futures instead of threads.
+workers: 0
+# Maximum number of inbox posts to do at once
+# If workers is set to 1 or above, this is the max for each worker
+push_limit: 512
+# The amount of json objects to cache from GET requests
+json_cache: 1024
+# this section is for ActivityPub
 ap:
-  # this is used for generating activitypub messages, as well as instructions for
-  # linking AP identities. it should be an SSL-enabled domain reachable by https.
+  # This is used for generating activitypub messages, as well as instructions for
+  # linking AP identities. It should be an SSL-enabled domain reachable by https.
   host: 'relay.example.com'
   blocked_instances:

@@ -35,9 +41,3 @@ ap:
   #- 'aoderelay'
   #- 'social.seattle.wa.us-relay'
   #- 'unciarelay'
-# cache limits as number of items. only change this if you know what you're doing
-cache:
-  objects: 1024
-  actors: 1024
-  digests: 1024

relay/__init__.py

@@ -1,8 +1,3 @@
-__version__ = '0.2.2'
+__version__ = '0.2.4'

-from aiohttp.web import Application

 from . import logger

-app = Application()

relay/__main__.py

@@ -1,4 +1,4 @@
-from .manage import main
+from relay.manage import main

 if __name__ == '__main__':

relay/application.py (new file, 216 lines)

@@ -0,0 +1,216 @@
import asyncio
import logging
import os
import queue
import signal
import threading
import traceback
from aiohttp import web
from datetime import datetime, timedelta
from .config import RelayConfig
from .database import RelayDatabase
from .http_client import HttpClient
from .misc import DotDict, check_open_port, set_app
from .views import routes
class Application(web.Application):
def __init__(self, cfgpath):
web.Application.__init__(self)
self['starttime'] = None
self['running'] = False
self['config'] = RelayConfig(cfgpath)
if not self['config'].load():
self['config'].save()
if self.config.is_docker:
self.config.update({
'db': '/data/relay.jsonld',
'listen': '0.0.0.0',
'port': 8080
})
self['workers'] = []
self['last_worker'] = 0
set_app(self)
self['database'] = RelayDatabase(self['config'])
self['database'].load()
self['client'] = HttpClient(
database = self.database,
limit = self.config.push_limit,
timeout = self.config.timeout,
cache_size = self.config.json_cache
)
self.set_signal_handler()
@property
def client(self):
return self['client']
@property
def config(self):
return self['config']
@property
def database(self):
return self['database']
@property
def uptime(self):
if not self['starttime']:
return timedelta(seconds=0)
uptime = datetime.now() - self['starttime']
return timedelta(seconds=uptime.seconds)
def push_message(self, inbox, message):
if self.config.workers <= 0:
return asyncio.ensure_future(self.client.post(inbox, message))
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message))
self['last_worker'] += 1
if self['last_worker'] >= len(self['workers']):
self['last_worker'] = 0
def set_signal_handler(self):
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
try:
signal.signal(getattr(signal, sig), self.stop)
# some signals don't exist in windows, so skip them
except AttributeError:
pass
def run(self):
if not check_open_port(self.config.listen, self.config.port):
return logging.error(f'A server is already running on port {self.config.port}')
for route in routes:
self.router.add_route(*route)
logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
asyncio.run(self.handle_run())
def stop(self, *_):
self['running'] = False
async def handle_run(self):
self['running'] = True
if self.config.workers > 0:
for i in range(self.config.workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['starttime'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
self['starttime'] = None
self['running'] = False
self['workers'].clear()
class PushWorker(threading.Thread):
def __init__(self, app):
threading.Thread.__init__(self)
self.app = app
self.queue = queue.Queue()
def run(self):
self.client = HttpClient(
database = self.app.database,
limit = self.app.config.push_limit,
timeout = self.app.config.timeout,
cache_size = self.app.config.json_cache
)
asyncio.run(self.handle_queue())
async def handle_queue(self):
while self.app['running']:
try:
inbox, message = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose(f'New push from Thread-{threading.get_ident()}')
await self.client.post(inbox, message)
except queue.Empty:
pass
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
await self.client.close()
## Can't sub-class web.Request, so let's just add some properties
def request_actor(self):
try: return self['actor']
except KeyError: pass
def request_instance(self):
try: return self['instance']
except KeyError: pass
def request_message(self):
try: return self['message']
except KeyError: pass
def request_signature(self):
if 'signature' not in self._state:
try: self['signature'] = DotDict.new_from_signature(self.headers['signature'])
except KeyError: return
return self['signature']
setattr(web.Request, 'actor', property(request_actor))
setattr(web.Request, 'instance', property(request_instance))
setattr(web.Request, 'message', property(request_message))
setattr(web.Request, 'signature', property(request_signature))
setattr(web.Request, 'config', property(lambda self: self.app.config))
setattr(web.Request, 'database', property(lambda self: self.app.database))
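Since the properties above are patched onto web.Request rather than a subclass, any route handler can read relay state straight off the request. A minimal hypothetical handler to illustrate (not part of this changeset; it assumes the module's own aiohttp web import):

    async def example_view(request):
        # 'signature' is parsed lazily from the Signature header
        if not request.signature:
            return web.json_response({'error': 'missing signature'}, status=401)

        # 'config' and 'database' proxy to the running Application
        return web.json_response({
            'host': request.config.host,
            'inboxes': len(request.database.inboxes)
        })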

relay/config.py

@@ -1,109 +1,51 @@
 import json
+import os
 import yaml

+from functools import cached_property
 from pathlib import Path
 from urllib.parse import urlparse

+from .misc import DotDict, boolean

-relay_software_names = [
-    'activityrelay',
-    'aoderelay',
-    'social.seattle.wa.us-relay',
-    'unciarelay'
-]
+RELAY_SOFTWARE = [
+    'activityrelay', # https://git.pleroma.social/pleroma/relay
+    'aoderelay', # https://git.asonix.dog/asonix/relay
+    'feditools-relay' # https://git.ptzo.gdn/feditools/relay
+]

+APKEYS = [
+    'host',
+    'whitelist_enabled',
+    'blocked_software',
+    'blocked_instances',
+    'whitelist'
+]

-class DotDict(dict):
-    def __getattr__(self, k):
-        try:
-            return self[k]
-        except KeyError:
-            raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
-
-    def __setattr__(self, k, v):
-        try:
-            if k in self._ignore_keys:
-                super().__setattr__(k, v)
-        except AttributeError:
-            pass
-        if k.startswith('_'):
-            super().__setattr__(k, v)
-        else:
-            self[k] = v
-
-    def __setitem__(self, k, v):
-        if type(v) == dict:
-            v = DotDict(v)
-        super().__setitem__(k, v)
-
-    def __delattr__(self, k):
-        try:
-            dict.__delitem__(self, k)
-        except KeyError:
-            raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None

 class RelayConfig(DotDict):
-    apkeys = {
-        'host',
-        'whitelist_enabled',
-        'blocked_software',
-        'blocked_instances',
-        'whitelist'
-    }
-
-    cachekeys = {
-        'json',
-        'objects',
-        'digests'
-    }
-
-    def __init__(self, path, is_docker):
-        if is_docker:
-            path = '/data/relay.yaml'
-
-        self._isdocker = is_docker
+    def __init__(self, path):
+        DotDict.__init__(self, {})
+
+        if self.is_docker:
+            path = '/data/config.yaml'
+
         self._path = Path(path).expanduser()
+        self.reset()

-        super().__init__({
-            'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')),
-            'listen': '0.0.0.0',
-            'port': 8080,
-            'note': 'Make a note about your instance here.',
-            'push_limit': 512,
-            'host': 'relay.example.com',
-            'blocked_software': [],
-            'blocked_instances': [],
-            'whitelist': [],
-            'whitelist_enabled': False,
-            'json': 1024,
-            'objects': 1024,
-            'digests': 1024
-        })

     def __setitem__(self, key, value):
-        if self._isdocker and key in ['db', 'listen', 'port']:
-            return
-
         if key in ['blocked_instances', 'blocked_software', 'whitelist']:
             assert isinstance(value, (list, set, tuple))

-        elif key in ['port', 'json', 'objects', 'digests']:
-            assert isinstance(value, (int))
+        elif key in ['port', 'workers', 'json_cache', 'timeout']:
+            if not isinstance(value, int):
+                value = int(value)

         elif key == 'whitelist_enabled':
-            assert isinstance(value, bool)
+            if not isinstance(value, bool):
+                value = boolean(value)

         super().__setitem__(key, value)

@@ -133,6 +75,30 @@ class RelayConfig(DotDict):
         return f'{self.actor}#main-key'

+    @cached_property
+    def is_docker(self):
+        return bool(os.environ.get('DOCKER_RUNNING'))

+    def reset(self):
+        self.clear()
+        self.update({
+            'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')),
+            'listen': '0.0.0.0',
+            'port': 8080,
+            'note': 'Make a note about your instance here.',
+            'push_limit': 512,
+            'json_cache': 1024,
+            'timeout': 10,
+            'workers': 0,
+            'host': 'relay.example.com',
+            'whitelist_enabled': False,
+            'blocked_software': [],
+            'blocked_instances': [],
+            'whitelist': []
+        })

     def ban_instance(self, instance):
         if instance.startswith('http'):
             instance = urlparse(instance).hostname

@@ -218,6 +184,8 @@ class RelayConfig(DotDict):
     def load(self):
+        self.reset()

         options = {}

         try:

@@ -237,13 +205,15 @@ class RelayConfig(DotDict):
             return False

         for key, value in config.items():
-            if key in ['ap', 'cache']:
+            if key in ['ap']:
                 for k, v in value.items():
                     if k not in self:
                         continue
                     self[k] = v
+                continue

             elif key not in self:
                 continue

@@ -257,13 +227,16 @@ class RelayConfig(DotDict):
     def save(self):
         config = {
-            'db': self['db'],
+            # just turning config.db into a string is good enough for now
+            'db': str(self.db),
             'listen': self.listen,
             'port': self.port,
             'note': self.note,
             'push_limit': self.push_limit,
-            'ap': {key: self[key] for key in self.apkeys},
-            'cache': {key: self[key] for key in self.cachekeys}
+            'workers': self.workers,
+            'json_cache': self.json_cache,
+            'timeout': self.timeout,
+            'ap': {key: self[key] for key in APKEYS}
         }

         with open(self._path, 'w') as fd:
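The move from assert to coercion in __setitem__ matters because the new `config set` CLI command hands every value over as a string. A rough sketch of the resulting behavior (illustrative values, using the boolean() helper from misc.py below):

    cfg = RelayConfig('relay.yaml')
    cfg['port'] = '8080'              # coerced with int(value)
    cfg['whitelist_enabled'] = 'yes'  # coerced with boolean(value)
    assert cfg.port == 8080 and cfg.whitelist_enabled is True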

relay/database.py

@@ -1,50 +1,33 @@
+import aputils
+import asyncio
 import json
 import logging
 import traceback

-from Crypto.PublicKey import RSA
 from urllib.parse import urlparse

-class RelayDatabase:
+class RelayDatabase(dict):
     def __init__(self, config):
+        dict.__init__(self, {
+            'relay-list': {},
+            'private-key': None,
+            'follow-requests': {},
+            'version': 1
+        })
+
         self.config = config
-        self.data = None
-        self.PRIVKEY = None
+        self.signer = None

-    @property
-    def PUBKEY(self):
-        return self.PRIVKEY.publickey()
-
-    @property
-    def pubkey(self):
-        return self.PUBKEY.exportKey('PEM').decode('utf-8')
-
-    @property
-    def privkey(self):
-        try:
-            return self.data['private-key']
-        except KeyError:
-            return False

     @property
     def hostnames(self):
-        return [urlparse(inbox).hostname for inbox in self.inboxes]
+        return tuple(self['relay-list'].keys())

     @property
     def inboxes(self):
-        return self.data.get('relay-list', [])
+        return tuple(data['inbox'] for data in self['relay-list'].values())

-    def generate_key(self):
-        self.PRIVKEY = RSA.generate(4096)
-        self.data['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8')

     def load(self):

@@ -52,14 +35,36 @@ class RelayDatabase:
         try:
             with self.config.db.open() as fd:
-                self.data = json.load(fd)
+                data = json.load(fd)

-            key = self.data.pop('actorKeys', None)
+            self['version'] = data.get('version', None)
+            self['private-key'] = data.get('private-key')

-            if key:
-                self.data['private-key'] = key.get('privateKey')
+            if self['version'] == None:
+                self['version'] = 1
+
+                if 'actorKeys' in data:
+                    self['private-key'] = data['actorKeys']['privateKey']
+
+                for item in data.get('relay-list', []):
+                    domain = urlparse(item).hostname
+                    self['relay-list'][domain] = {
+                        'domain': domain,
+                        'inbox': item,
+                        'followid': None
+                    }
+
+            else:
+                self['relay-list'] = data.get('relay-list', {})
+
+            for domain, instance in self['relay-list'].items():
+                if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)):
+                    self.del_inbox(domain)
+                    continue
+
+                if not instance.get('domain'):
+                    instance['domain'] = domain

-            self.data.pop('actors', None)
             new_db = False

         except FileNotFoundError:

@@ -69,20 +74,13 @@ class RelayDatabase:
             if self.config.db.stat().st_size > 0:
                 raise e from None

-        if not self.data:
-            logging.info('No database was found. Making a new one.')
-            self.data = {}
-
-        for inbox in self.inboxes:
-            if self.config.is_banned(inbox) or (self.config.whitelist_enabled and not self.config.is_whitelisted(inbox)):
-                self.del_inbox(inbox)
-
-        if not self.privkey:
+        if not self['private-key']:
             logging.info("No actor keys present, generating 4096-bit RSA keypair.")
-            self.generate_key()
+            self.signer = aputils.Signer.new(self.config.keyid, size=4096)
+            self['private-key'] = self.signer.export()

         else:
-            self.PRIVKEY = RSA.importKey(self.privkey)
+            self.signer = aputils.Signer(self['private-key'], self.config.keyid)

         self.save()
         return not new_db

@@ -90,34 +88,110 @@ class RelayDatabase:
     def save(self):
         with self.config.db.open('w') as fd:
-            data = {
-                'relay-list': self.inboxes,
-                'private-key': self.privkey
-            }
-
-            json.dump(data, fd, indent=4)
+            json.dump(self, fd, indent=4)

-    def get_inbox(self, domain):
+    def get_inbox(self, domain, fail=False):
         if domain.startswith('http'):
             domain = urlparse(domain).hostname

-        for inbox in self.inboxes:
-            if domain == urlparse(inbox).hostname:
-                return inbox
+        inbox = self['relay-list'].get(domain)
+
+        if inbox:
+            return inbox
+
+        if fail:
+            raise KeyError(domain)

-    def add_inbox(self, inbox):
-        assert inbox.startswith('https')
-        assert not self.get_inbox(inbox)
-        self.data['relay-list'].append(inbox)
+    def add_inbox(self, inbox, followid=None, software=None):
+        assert inbox.startswith('https'), 'Inbox must be a url'
+        domain = urlparse(inbox).hostname
+        instance = self.get_inbox(domain)
+
+        if instance:
+            if followid:
+                instance['followid'] = followid
+
+            if software:
+                instance['software'] = software
+
+            return instance
+
+        self['relay-list'][domain] = {
+            'domain': domain,
+            'inbox': inbox,
+            'followid': followid,
+            'software': software
+        }
+
+        logging.verbose(f'Added inbox to database: {inbox}')
+        return self['relay-list'][domain]

-    def del_inbox(self, inbox_url):
-        inbox = self.get_inbox(inbox_url)
-
-        if not inbox:
-            raise KeyError(inbox_url)
-
-        self.data['relay-list'].remove(inbox)
+    def del_inbox(self, domain, followid=None, fail=False):
+        data = self.get_inbox(domain, fail=False)
+
+        if not data:
+            if fail:
+                raise KeyError(domain)
+            return False
+
+        if not data['followid'] or not followid or data['followid'] == followid:
+            del self['relay-list'][data['domain']]
+            logging.verbose(f'Removed inbox from database: {data["inbox"]}')
+            return True
+
+        if fail:
+            raise ValueError('Follow IDs do not match')
+
+        logging.debug(f'Follow ID does not match: db = {data["followid"]}, object = {followid}')
+        return False

+    def get_request(self, domain, fail=True):
+        if domain.startswith('http'):
+            domain = urlparse(domain).hostname
+
+        try:
+            return self['follow-requests'][domain]
+        except KeyError as e:
+            if fail:
+                raise e

+    def add_request(self, actor, inbox, followid):
+        domain = urlparse(inbox).hostname
+
+        try:
+            request = self.get_request(domain)
+            request['followid'] = followid
+        except KeyError:
+            pass
+
+        self['follow-requests'][domain] = {
+            'actor': actor,
+            'inbox': inbox,
+            'followid': followid
+        }

+    def del_request(self, domain):
+        if domain.startswith('http'):
+            domain = urlparse(inbox).hostname
+
+        del self['follow-requests'][domain]

+    def distill_inboxes(self, message):
+        src_domains = {
+            message.domain,
+            urlparse(message.objectid).netloc
+        }
+
+        for domain, instance in self['relay-list'].items():
+            if domain not in src_domains:
+                yield instance['inbox']
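The version check in load() migrates the old flat relay-list (a bare list of inbox urls) into per-domain records, which the followid-aware unfollows and per-software signing depend on. Roughly, the relay.jsonld on disk goes from the old shape to something like the following (illustrative values; the 'software' field is only filled in later by add_inbox):

    old format:
    {"relay-list": ["https://example.com/inbox"], "private-key": "..."}

    new format:
    {
        "version": 1,
        "private-key": "...",
        "follow-requests": {},
        "relay-list": {
            "example.com": {
                "domain": "example.com",
                "inbox": "https://example.com/inbox",
                "followid": null,
                "software": "pleroma"
            }
        }
    }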

relay/http_client.py (new file, 222 lines)

@@ -0,0 +1,222 @@
import logging
import traceback
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectionError, ClientSSLError
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
from aputils import Nodeinfo, WellKnownNodeinfo
from datetime import datetime
from cachetools import LRUCache
from json.decoder import JSONDecodeError
from urllib.parse import urlparse
from . import __version__
from .misc import (
MIMETYPES,
DotDict,
Message
)
HEADERS = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': f'ActivityRelay/{__version__}'
}
class Cache(LRUCache):
def set_maxsize(self, value):
self.__maxsize = int(value)
class HttpClient:
def __init__(self, database, limit=100, timeout=10, cache_size=1024):
self.database = database
self.cache = Cache(cache_size)
self.cfg = {'limit': limit, 'timeout': timeout}
self._conn = None
self._session = None
async def __aenter__(self):
await self.open()
return self
async def __aexit__(self, *_):
await self.close()
@property
def limit(self):
return self.cfg['limit']
@property
def timeout(self):
return self.cfg['timeout']
async def open(self):
if self._session:
return
self._conn = TCPConnector(
limit = self.limit,
ttl_dns_cache = 300,
)
self._session = ClientSession(
connector = self._conn,
headers = HEADERS,
connector_owner = True,
timeout = ClientTimeout(total=self.timeout)
)
async def close(self):
if not self._session:
return
await self._session.close()
await self._conn.close()
self._conn = None
self._session = None
async def get(self, url, sign_headers=False, loads=None, force=False):
await self.open()
try: url, _ = url.split('#', 1)
except: pass
if not force and url in self.cache:
return self.cache[url]
headers = {}
if sign_headers:
headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
try:
logging.verbose(f'Fetching resource: {url}')
async with self._session.get(url, headers=headers) as resp:
## Not expecting a response with 202s, so just return
if resp.status == 202:
return
elif resp.status != 200:
logging.verbose(f'Received error when requesting {url}: {resp.status}')
logging.verbose(await resp.read()) # change this to debug
return
if loads:
message = await resp.json(loads=loads)
elif resp.content_type == MIMETYPES['activity']:
message = await resp.json(loads=Message.new_from_json)
elif resp.content_type == MIMETYPES['json']:
message = await resp.json(loads=DotDict.new_from_json)
else:
# todo: raise TypeError or something
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp.read()}')
logging.debug(f'{url} >> resp {message.to_json(4)}')
self.cache[url] = message
return message
except JSONDecodeError:
logging.verbose(f'Failed to parse JSON')
except ClientSSLError:
logging.verbose(f'SSL error when connecting to {urlparse(url).netloc}')
except (AsyncTimeoutError, ClientConnectionError):
logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
except Exception as e:
traceback.print_exc()
async def post(self, url, message):
await self.open()
instance = self.database.get_inbox(url)
## Using the old algo by default is probably a better idea right now
if instance and instance.get('software') in {'mastodon'}:
algorithm = 'hs2019'
else:
algorithm = 'original'
headers = {'Content-Type': 'application/activity+json'}
headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm))
try:
logging.verbose(f'Sending "{message.type}" to {url}')
async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
## Not expecting a response, so just return
if resp.status in {200, 202}:
return logging.verbose(f'Successfully sent "{message.type}" to {url}')
logging.verbose(f'Received error when pushing to {url}: {resp.status}')
return logging.verbose(await resp.read()) # change this to debug
except ClientSSLError:
logging.warning(f'SSL error when pushing to {urlparse(url).netloc}')
except (AsyncTimeoutError, ClientConnectionError):
logging.warning(f'Failed to connect to {urlparse(url).netloc} for message push')
## prevent workers from being brought down
except Exception as e:
traceback.print_exc()
## Additional methods ##
async def fetch_nodeinfo(self, domain):
nodeinfo_url = None
wk_nodeinfo = await self.get(
f'https://{domain}/.well-known/nodeinfo',
loads = WellKnownNodeinfo.new_from_json
)
if not wk_nodeinfo:
logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
return False
for version in ['20', '21']:
try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url:
logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
async def get(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.get(*args, **kwargs)
async def post(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.post(*args, **kwargs)
async def fetch_nodeinfo(database, *args, **kwargs):
async with HttpClient(database) as client:
return await client.fetch_nodeinfo(*args, **kwargs)
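HttpClient doubles as an async context manager (via __aenter__/__aexit__ above), which is what the module-level get/post/fetch_nodeinfo helpers rely on to open a short-lived client per call. A minimal usage sketch, assuming db is a loaded RelayDatabase:

    import asyncio

    async def example(db):
        # one session is reused for both requests, then closed on exit
        async with HttpClient(db) as client:
            actor = await client.get('https://example.com/actor', sign_headers=True)
            nodeinfo = await client.fetch_nodeinfo('example.com')
        return actor, nodeinfo

    # asyncio.run(example(db))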

relay/http_debug.py (deleted, 68 lines)

@@ -1,68 +0,0 @@
import logging
import aiohttp
from collections import defaultdict
STATS = {
'requests': defaultdict(int),
'response_codes': defaultdict(int),
'response_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_codes': defaultdict(int),
'delivery_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'exceptions': defaultdict(int),
'exceptions_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_exceptions': defaultdict(int),
'delivery_exceptions_per_domain': defaultdict(lambda: defaultdict(int))
}
async def on_request_start(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP START [%r], [%r]", session, params)
STATS['requests'][params.url.host] += 1
async def on_request_end(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP END [%r], [%r]", session, params)
host = params.url.host
status = params.response.status
STATS['response_codes'][status] += 1
STATS['response_codes_per_domain'][host][status] += 1
if params.method == 'POST':
STATS['delivery_codes'][status] += 1
STATS['delivery_codes_per_domain'][host][status] += 1
async def on_request_exception(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP EXCEPTION [%r], [%r]", session, params)
host = params.url.host
exception = repr(params.exception)
STATS['exceptions'][exception] += 1
STATS['exceptions_per_domain'][host][exception] += 1
if params.method == 'POST':
STATS['delivery_exceptions'][exception] += 1
STATS['delivery_exceptions_per_domain'][host][exception] += 1
def http_debug():
if logging.DEBUG >= logging.root.level:
return
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
trace_config.on_request_exception.append(on_request_exception)
return [trace_config]

relay/manage.py

@@ -1,17 +1,19 @@
 import Crypto
 import asyncio
 import click
-import json
 import logging
-import os
 import platform

-from aiohttp.web import AppRunner, TCPSite
-from cachetools import LRUCache
+from urllib.parse import urlparse

-from . import app, misc, views, __version__
-from .config import DotDict, RelayConfig, relay_software_names
-from .database import RelayDatabase
+from . import misc, __version__
+from . import http_client as http
+from .application import Application
+from .config import RELAY_SOFTWARE

+app = None
+CONFIG_IGNORE = {'blocked_software', 'blocked_instances', 'whitelist'}

@@ -19,32 +21,100 @@ from .database import RelayDatabase
 @click.version_option(version=__version__, prog_name='ActivityRelay')
 @click.pass_context
 def cli(ctx, config):
-    app['is_docker'] = bool(os.environ.get('DOCKER_RUNNING'))
-    app['config'] = RelayConfig(config, app['is_docker'])
-
-    if not app['config'].load():
-        app['config'].save()
-
-    app['database'] = RelayDatabase(app['config'])
-    app['database'].load()
-
-    app['cache'] = DotDict()
-    app['semaphore'] = asyncio.Semaphore(app['config']['push_limit'])
-
-    for key in app['config'].cachekeys:
-        app['cache'][key] = LRUCache(app['config'][key])
+    global app
+    app = Application(config)

     if not ctx.invoked_subcommand:
-        if app['config'].host.endswith('example.com'):
-            relay_setup.callback()
+        if app.config.host.endswith('example.com'):
+            cli_setup.callback()
         else:
-            relay_run.callback()
+            cli_run.callback()

+@cli.command('setup')
+def cli_setup():
+    'Generate a new config'
+
+    while True:
+        app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
+
+        if not app.config.host.endswith('example.com'):
+            break
+
+        click.echo('The domain must not be example.com')
+
+    if not app.config.is_docker:
+        app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
+
+        while True:
+            app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
+            break
+
+    app.config.save()
+
+    if not app.config.is_docker and click.confirm('Relay all setup! Would you like to run it now?'):
+        cli_run.callback()

+@cli.command('run')
+def cli_run():
+    'Run the relay'
+
+    if app.config.host.endswith('example.com'):
+        return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
+
+    vers_split = platform.python_version().split('.')
+    pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
+
+    if Crypto.__version__ == '2.6.1':
+        if int(vers_split[1]) > 7:
+            click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
+            return click.echo(pip_command)
+        else:
+            click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
+            return click.echo(pip_command)
+
+    if not misc.check_open_port(app.config.listen, app.config.port):
+        return click.echo(f'Error: A server is already running on port {app.config.port}')
+
+    app.run()

+# todo: add config default command for resetting config key
+@cli.group('config')
+def cli_config():
+    'Manage the relay config'
+    pass

+@cli_config.command('list')
+def cli_config_list():
+    'List the current relay config'
+
+    click.echo('Relay Config:')
+
+    for key, value in app.config.items():
+        if key not in CONFIG_IGNORE:
+            key = f'{key}:'.ljust(20)
+            click.echo(f'- {key} {value}')

+@cli_config.command('set')
+@click.argument('key')
+@click.argument('value')
+def cli_config_set(key, value):
+    'Set a config value'
+
+    app.config[key] = value
+    app.config.save()
+
+    print(f'{key}: {app.config[key]}')

 @cli.group('inbox')
-@click.pass_context
-def cli_inbox(ctx):
+def cli_inbox():
     'Manage the inboxes in the database'
     pass

@@ -55,7 +125,7 @@ def cli_inbox_list():
     click.echo('Connected to the following instances or relays:')

-    for inbox in app['database'].inboxes:
+    for inbox in app.database.inboxes:
         click.echo(f'- {inbox}')

@@ -64,29 +134,34 @@ def cli_inbox_list():
 def cli_inbox_follow(actor):
     'Follow an actor (Relay must be running)'

-    config = app['config']
-    database = app['database']
-
-    if config.is_banned(actor):
+    if app.config.is_banned(actor):
         return click.echo(f'Error: Refusing to follow banned actor: {actor}')

     if not actor.startswith('http'):
+        domain = actor
         actor = f'https://{actor}/actor'

-    if database.get_inbox(actor):
-        return click.echo(f'Error: Already following actor: {actor}')
+    else:
+        domain = urlparse(actor).hostname

-    actor_data = run_in_loop(misc.request, actor, sign_headers=True)
+    try:
+        inbox_data = app.database['relay-list'][domain]
+        inbox = inbox_data['inbox']

-    if not actor_data:
-        return click.echo(f'Error: Failed to fetch actor: {actor}')
+    except KeyError:
+        actor_data = asyncio.run(http.get(app.database, actor, sign_headers=True))

-    inbox = misc.get_actor_inbox(actor_data)
+        if not actor_data:
+            return click.echo(f'Failed to fetch actor: {actor}')

-    database.add_inbox(inbox)
-    database.save()
+        inbox = actor_data.shared_inbox

-    run_in_loop(misc.follow_remote_actor, actor)
+    message = misc.Message.new_follow(
+        host = app.config.host,
+        actor = actor
+    )
+
+    asyncio.run(http.post(app.database, inbox, message))
     click.echo(f'Sent follow message to actor: {actor}')

@@ -95,18 +170,36 @@ def cli_inbox_follow(actor):
 def cli_inbox_unfollow(actor):
     'Unfollow an actor (Relay must be running)'

-    database = app['database']
-
     if not actor.startswith('http'):
+        domain = actor
         actor = f'https://{actor}/actor'

-    if not database.get_inbox(actor):
-        return click.echo(f'Error: Not following actor: {actor}')
+    else:
+        domain = urlparse(actor).hostname

-    database.del_inbox(actor)
-    database.save()
+    try:
+        inbox_data = app.database['relay-list'][domain]
+        inbox = inbox_data['inbox']
+        message = misc.Message.new_unfollow(
+            host = app.config.host,
+            actor = actor,
+            follow = inbox_data['followid']
+        )

-    run_in_loop(misc.unfollow_remote_actor, actor)
+    except KeyError:
+        actor_data = asyncio.run(http.get(app.database, actor, sign_headers=True))
+        inbox = actor_data.shared_inbox
+        message = misc.Message.new_unfollow(
+            host = app.config.host,
+            actor = actor,
+            follow = {
+                'type': 'Follow',
+                'object': actor,
+                'actor': f'https://{app.config.host}/actor'
+            }
+        )
+
+    asyncio.run(http.post(app.database, inbox, message))
     click.echo(f'Sent unfollow message to: {actor}')

@@ -115,22 +208,18 @@ def cli_inbox_unfollow(actor):
 def cli_inbox_add(inbox):
     'Add an inbox to the database'

-    database = app['database']
-    config = app['config']
-
     if not inbox.startswith('http'):
         inbox = f'https://{inbox}/inbox'

-    if database.get_inbox(inbox):
-        click.echo(f'Error: Inbox already in database: {inbox}')
-        return
+    if app.config.is_banned(inbox):
+        return click.echo(f'Error: Refusing to add banned inbox: {inbox}')

-    if config.is_banned(inbox):
-        click.echo(f'Error: Refusing to add banned inbox: {inbox}')
-        return
+    if app.database.get_inbox(inbox):
+        return click.echo(f'Error: Inbox already in database: {inbox}')

-    database.add_inbox(inbox)
-    database.save()
+    app.database.add_inbox(inbox)
+    app.database.save()

     click.echo(f'Added inbox to the database: {inbox}')

@@ -139,15 +228,16 @@ def cli_inbox_add(inbox):
 def cli_inbox_remove(inbox):
     'Remove an inbox from the database'

-    database = app['database']
-    dbinbox = database.get_inbox(inbox)
+    try:
+        dbinbox = app.database.get_inbox(inbox, fail=True)

-    if not dbinbox:
+    except KeyError:
         click.echo(f'Error: Inbox does not exist: {inbox}')
         return

-    database.del_inbox(dbinbox)
-    database.save()
+    app.database.del_inbox(dbinbox['domain'])
+    app.database.save()

     click.echo(f'Removed inbox from the database: {inbox}')

@@ -163,7 +253,7 @@ def cli_instance_list():
     click.echo('Banned instances or relays:')

-    for domain in app['config'].blocked_instances:
+    for domain in app.config.blocked_instances:
         click.echo(f'- {domain}')

@@ -172,16 +262,14 @@ def cli_instance_list():
 def cli_instance_ban(target):
     'Ban an instance and remove the associated inbox if it exists'

-    config = app['config']
-    database = app['database']
-    inbox = database.get_inbox(target)
+    if target.startswith('http'):
+        target = urlparse(target).hostname

-    if config.ban_instance(target):
-        config.save()
+    if app.config.ban_instance(target):
+        app.config.save()

-        if inbox:
-            database.del_inbox(inbox)
-            database.save()
+        if app.database.del_inbox(target):
+            app.database.save()

         click.echo(f'Banned instance: {target}')
         return

@@ -194,10 +282,8 @@ def cli_instance_ban(target):
 def cli_instance_unban(target):
     'Unban an instance'

-    config = app['config']
-
-    if config.unban_instance(target):
-        config.save()
+    if app.config.unban_instance(target):
+        app.config.save()

         click.echo(f'Unbanned instance: {target}')
         return

@@ -217,7 +303,7 @@ def cli_software_list():
     click.echo('Banned software:')

-    for software in app['config'].blocked_software:
+    for software in app.config.blocked_software:
         click.echo(f'- {software}')

@@ -229,25 +315,23 @@ def cli_software_list():
 def cli_software_ban(name, fetch_nodeinfo):
     'Ban software. Use RELAYS for NAME to ban relays'

-    config = app['config']
-
     if name == 'RELAYS':
-        for name in relay_software_names:
-            config.ban_software(name)
+        for name in RELAY_SOFTWARE:
+            app.config.ban_software(name)

-        config.save()
+        app.config.save()
         return click.echo('Banned all relay software')

     if fetch_nodeinfo:
-        software = run_in_loop(fetch_nodeinfo, name)
+        nodeinfo = asyncio.run(http.fetch_nodeinfo(app.database, name))

-        if not software:
+        if not nodeinfo:
             click.echo(f'Failed to fetch software name from domain: {name}')

-        name = software
+        name = nodeinfo.sw_name

-    if config.ban_software(name):
-        config.save()
+    if app.config.ban_software(name):
+        app.config.save()
         return click.echo(f'Banned software: {name}')

     click.echo(f'Software already banned: {name}')

@@ -261,31 +345,28 @@ def cli_software_ban(name, fetch_nodeinfo):
 def cli_software_unban(name, fetch_nodeinfo):
     'Ban software. Use RELAYS for NAME to unban relays'

-    config = app['config']
-
     if name == 'RELAYS':
-        for name in relay_software_names:
-            config.unban_software(name)
+        for name in RELAY_SOFTWARE:
+            app.config.unban_software(name)

-        config.save()
+        app.config.save()
         return click.echo('Unbanned all relay software')

     if fetch_nodeinfo:
-        software = run_in_loop(fetch_nodeinfo, name)
+        nodeinfo = asyncio.run(http.fetch_nodeinfo(app.database, name))

-        if not software:
+        if not nodeinfo:
             click.echo(f'Failed to fetch software name from domain: {name}')

-        name = software
+        name = nodeinfo.sw_name

-    if config.unban_software(name):
-        config.save()
+    if app.config.unban_software(name):
+        app.config.save()
         return click.echo(f'Unbanned software: {name}')

     click.echo(f'Software wasn\'t banned: {name}')

 @cli.group('whitelist')
 def cli_whitelist():
     'Manage the instance whitelist'

@@ -294,9 +375,11 @@ def cli_whitelist():
 @cli_whitelist.command('list')
 def cli_whitelist_list():
+    'List all the instances in the whitelist'
+
     click.echo('Current whitelisted domains')

-    for domain in app['config'].whitelist:
+    for domain in app.config.whitelist:
         click.echo(f'- {domain}')

@@ -305,12 +388,10 @@ def cli_whitelist_list():
 def cli_whitelist_add(instance):
     'Add an instance to the whitelist'

-    config = app['config']
-
-    if not config.add_whitelist(instance):
+    if not app.config.add_whitelist(instance):
         return click.echo(f'Instance already in the whitelist: {instance}')

-    config.save()
+    app.config.save()
     click.echo(f'Instance added to the whitelist: {instance}')

@@ -319,106 +400,24 @@ def cli_whitelist_add(instance):
 def cli_whitelist_remove(instance):
     'Remove an instance from the whitelist'

-    config = app['config']
-    database = app['database']
-    inbox = database.get_inbox(instance)
-
-    if not config.del_whitelist(instance):
+    if not app.config.del_whitelist(instance):
         return click.echo(f'Instance not in the whitelist: {instance}')

-    config.save()
+    app.config.save()

-    if inbox and config.whitelist_enabled:
-        database.del_inbox(inbox)
-        database.save()
+    if app.config.whitelist_enabled:
+        if app.database.del_inbox(instance):
+            app.database.save()

     click.echo(f'Removed instance from the whitelist: {instance}')

-@cli.command('setup')
-def relay_setup():
-    'Generate a new config'
-
-    config = app['config']
-
-    while True:
-        config.host = click.prompt('What domain will the relay be hosted on?', default=config.host)
-
-        if not config.host.endswith('example.com'):
-            break
-
-        click.echo('The domain must not be example.com')
-
-    config.listen = click.prompt('Which address should the relay listen on?', default=config.listen)
-
-    while True:
-        config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int)
-        break
-
-    config.save()
-
-    if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'):
-        relay_run.callback()
-
-@cli.command('run')
-def relay_run():
-    'Run the relay'
-
-    config = app['config']
-
-    if config.host.endswith('example.com'):
-        return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
-
-    vers_split = platform.python_version().split('.')
-    pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
-
-    if Crypto.__version__ == '2.6.1':
-        if int(vers_split[1]) > 7:
-            click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
-            return click.echo(pip_command)
-        else:
-            click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
-            return click.echo(pip_command)
-
-    if not misc.check_open_port(config.listen, config.port):
-        return click.echo(f'Error: A server is already running on port {config.port}')
-
-    # web pages
-    app.router.add_get('/', views.home)
-
-    # endpoints
-    app.router.add_post('/actor', views.inbox)
-    app.router.add_post('/inbox', views.inbox)
-    app.router.add_get('/actor', views.actor)
-    app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0)
-    app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown)
-    app.router.add_get('/.well-known/webfinger', views.webfinger)
-
-    if logging.DEBUG >= logging.root.level:
-        app.router.add_get('/stats', views.stats)
-
-    loop = asyncio.new_event_loop()
-    asyncio.set_event_loop(loop)
-    asyncio.ensure_future(handle_start_webserver(), loop=loop)
-    loop.run_forever()
-
-def run_in_loop(func, *args, **kwargs):
-    loop = asyncio.new_event_loop()
-    return loop.run_until_complete(func(*args, **kwargs))
-
-async def handle_start_webserver():
-    config = app['config']
-    runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"')
-
-    logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})')
-    await runner.setup()
-
-    site = TCPSite(runner, config.listen, config.port)
-    await site.start()

+@cli_whitelist.command('import')
+def cli_whitelist_import():
+    'Add all current inboxes to the whitelist'
+
+    for domain in app.database.hostnames:
+        cli_whitelist_add.callback(domain)

 def main():

View file

@ -1,32 +1,69 @@
+import aputils
 import asyncio
 import base64
 import json
 import logging
 import socket
 import traceback
+import uuid
 
-from Crypto.Hash import SHA, SHA256, SHA512
-from Crypto.PublicKey import RSA
-from Crypto.Signature import PKCS1_v1_5
-from aiohttp import ClientSession
+from aiohttp.hdrs import METH_ALL as METHODS
+from aiohttp.web import Response as AiohttpResponse, View as AiohttpView
 from datetime import datetime
 from json.decoder import JSONDecodeError
 from urllib.parse import urlparse
 from uuid import uuid4
 
-from . import app
-from .http_debug import http_debug
+app = None
 
-HASHES = {
-	'sha1': SHA,
-	'sha256': SHA256,
-	'sha512': SHA512
+MIMETYPES = {
+	'activity': 'application/activity+json',
+	'html': 'text/html',
+	'json': 'application/json',
+	'text': 'text/plain'
+}
+
+NODEINFO_NS = {
+	'20': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
+	'21': 'http://nodeinfo.diaspora.software/ns/schema/2.1'
 }
 
-def build_signing_string(headers, used_headers):
-	return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers))
+def set_app(new_app):
+	global app
+	app = new_app
+
+def boolean(value):
+	if isinstance(value, str):
+		if value.lower() in ['on', 'y', 'yes', 'true', 'enable', 'enabled', '1']:
+			return True
+		elif value.lower() in ['off', 'n', 'no', 'false', 'disable', 'disabled', '0']:
+			return False
+		else:
+			raise TypeError(f'Cannot parse string "{value}" as a boolean')
+	elif isinstance(value, int):
+		if value == 1:
+			return True
+		elif value == 0:
+			return False
+		else:
+			raise ValueError('Integer value must be 1 or 0')
+	elif value == None:
+		return False
+	try:
+		return value.__bool__()
+	except AttributeError:
+		raise TypeError(f'Cannot convert object of type "{clsname(value)}"')
 
 def check_open_port(host, port):
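The added `boolean()` helper normalizes the truthy/falsy spellings that show up in YAML config and CLI input. A rough usage sketch, assuming the function behaves as committed above:

    assert boolean('yes') is True    # recognized true spellings
    assert boolean('Off') is False   # matching is case-insensitive
    assert boolean(1) is True        # integers must be exactly 1 or 0
    assert boolean(None) is False    # None is treated as False

    try:
        boolean('maybe')
    except TypeError as err:
        print(err)  # Cannot parse string "maybe" as a boolean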
@@ -41,279 +78,263 @@ def check_open_port(host, port):
 	return False
 
-def create_signature_header(headers):
-	headers = {k.lower(): v for k, v in headers.items()}
-	used_headers = headers.keys()
-	sigstring = build_signing_string(headers, used_headers)
-
-	sig = {
-		'keyId': app['config'].keyid,
-		'algorithm': 'rsa-sha256',
-		'headers': ' '.join(used_headers),
-		'signature': sign_signing_string(sigstring, app['database'].PRIVKEY)
-	}
-
-	chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()]
-	return ','.join(chunks)
-
-def distill_object_id(activity):
-	logging.debug(f'>> determining object ID for {activity["object"]}')
-
-	try:
-		return activity['object']['id']
-	except TypeError:
-		return activity['object']
-
-def distill_inboxes(actor, object_id):
-	database = app['database']
-	origin_hostname = urlparse(object_id).hostname
-	actor_inbox = get_actor_inbox(actor)
-	targets = []
-
-	for inbox in database.inboxes:
-		if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname:
-			targets.append(inbox)
-
-	return targets
-
-def generate_body_digest(body):
-	bodyhash = app['cache'].digests.get(body)
-
-	if bodyhash:
-		return bodyhash
-
-	h = SHA256.new(body.encode('utf-8'))
-	bodyhash = base64.b64encode(h.digest()).decode('utf-8')
-	app['cache'].digests[body] = bodyhash
-
-	return bodyhash
-
-def get_actor_inbox(actor):
-	return actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
-
-def sign_signing_string(sigstring, key):
-	pkcs = PKCS1_v1_5.new(key)
-	h = SHA256.new()
-	h.update(sigstring.encode('ascii'))
-	sigdata = pkcs.sign(h)
-
-	return base64.b64encode(sigdata).decode('utf-8')
-
-def split_signature(sig):
-	default = {"headers": "date"}
-
-	sig = sig.strip().split(',')
-
-	for chunk in sig:
-		k, _, v = chunk.partition('=')
-		v = v.strip('\"')
-		default[k] = v
-
-	default['headers'] = default['headers'].split()
-	return default
-
-async def fetch_actor_key(actor):
-	actor_data = await request(actor)
-
-	if not actor_data:
-		return None
-
-	try:
-		return RSA.importKey(actor_data['publicKey']['publicKeyPem'])
-	except Exception as e:
-		logging.debug(f'Exception occured while fetching actor key: {e}')
-
-async def fetch_nodeinfo(domain):
-	nodeinfo_url = None
-	wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False)
-
-	if not wk_nodeinfo:
-		return
-
-	for link in wk_nodeinfo.get('links', ''):
-		if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
-			nodeinfo_url = link['href']
-			break
-
-	if not nodeinfo_url:
-		return
-
-	nodeinfo_data = await request(nodeinfo_url, sign_headers=False, activity=False)
-
-	try:
-		return nodeinfo_data['software']['name']
-	except KeyError:
-		return False
-
-async def follow_remote_actor(actor_uri):
-	config = app['config']
-
-	actor = await request(actor_uri)
-	inbox = get_actor_inbox(actor)
-
-	if not actor:
-		logging.error(f'failed to fetch actor at: {actor_uri}')
-		return
-
-	logging.verbose(f'sending follow request: {actor_uri}')
-
-	message = {
-		"@context": "https://www.w3.org/ns/activitystreams",
-		"type": "Follow",
-		"to": [actor['id']],
-		"object": actor['id'],
-		"id": f"https://{config.host}/activities/{uuid4()}",
-		"actor": f"https://{config.host}/actor"
-	}
-
-	await request(inbox, message)
-
-async def unfollow_remote_actor(actor_uri):
-	config = app['config']
-
-	actor = await request(actor_uri)
-
-	if not actor:
-		logging.error(f'failed to fetch actor: {actor_uri}')
-		return
-
-	inbox = get_actor_inbox(actor)
-	logging.verbose(f'sending unfollow request to inbox: {inbox}')
-
-	message = {
-		"@context": "https://www.w3.org/ns/activitystreams",
-		"type": "Undo",
-		"to": [actor_uri],
-		"object": {
-			"type": "Follow",
-			"object": actor_uri,
-			"actor": actor_uri,
-			"id": f"https://{config.host}/activities/{uuid4()}"
-		},
-		"id": f"https://{config.host}/activities/{uuid4()}",
-		"actor": f"https://{config.host}/actor"
-	}
-
-	await request(inbox, message)
-
-async def request(uri, data=None, force=False, sign_headers=True, activity=True):
-	## If a get request and not force, try to use the cache first
-	if not data and not force:
-		try:
-			return app['cache'].json[uri]
-		except KeyError:
-			pass
-
-	url = urlparse(uri)
-	method = 'POST' if data else 'GET'
-	headers = {'User-Agent': 'ActivityRelay'}
-	mimetype = 'application/activity+json' if activity else 'application/json'
-
-	## Set the content type for a POST
-	if data and 'Content-Type' not in headers:
-		headers['Content-Type'] = mimetype
-
-	## Set the accepted content type for a GET
-	elif not data and 'Accept' not in headers:
-		headers['Accept'] = mimetype
-
-	if sign_headers:
-		signing_headers = {
-			'(request-target)': f'{method.lower()} {url.path}',
-			'Date': datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'),
-			'Host': url.netloc
-		}
-
-		if data:
-			assert isinstance(data, dict)
-			action = data.get('type')
-			data = json.dumps(data)
-			signing_headers.update({
-				'Digest': f'SHA-256={generate_body_digest(data)}',
-				'Content-Length': str(len(data.encode('utf-8')))
-			})
-
-		signing_headers['Signature'] = create_signature_header(signing_headers)
-
-		del signing_headers['(request-target)']
-		del signing_headers['Host']
-
-		headers.update(signing_headers)
-
-	try:
-		# json_serializer=DotDict maybe?
-		async with ClientSession(trace_configs=http_debug()) as session, app['semaphore']:
-			async with session.request(method, uri, headers=headers, data=data) as resp:
-				## aiohttp has been known to leak if the response hasn't been read,
-				## so we're just gonna read the request no matter what
-				resp_data = await resp.read()
-				resp_payload = json.loads(resp_data.decode('utf-8'))
-
-				if resp.status not in [200, 202]:
-					if not data:
-						logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}')
-						return
-
-					logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}')
-					return
-
-				logging.debug(f'{uri} >> resp {resp_payload}')
-
-				app['cache'].json[uri] = resp_payload
-				return resp_payload
-
-	except JSONDecodeError:
-		return
-
-	except Exception:
-		traceback.print_exc()
-
-async def validate_signature(actor, http_request):
-	pubkey = await fetch_actor_key(actor)
-
-	if not pubkey:
-		return False
-
-	logging.debug(f'actor key: {pubkey}')
-
-	headers = {key.lower(): value for key, value in http_request.headers.items()}
-	headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path])
-
-	sig = split_signature(headers['signature'])
-	logging.debug(f'sigdata: {sig}')
-
-	sigstring = build_signing_string(headers, sig['headers'])
-	logging.debug(f'sigstring: {sigstring}')
-
-	sign_alg, _, hash_alg = sig['algorithm'].partition('-')
-	logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}')
-
-	sigdata = base64.b64decode(sig['signature'])
-
-	pkcs = PKCS1_v1_5.new(pubkey)
-	h = HASHES[hash_alg].new()
-	h.update(sigstring.encode('ascii'))
-	result = pkcs.verify(h, sigdata)
-
-	http_request['validated'] = result
-
-	logging.debug(f'validates? {result}')
-	return result
+class DotDict(dict):
+	def __init__(self, _data, **kwargs):
+		dict.__init__(self)
+		self.update(_data, **kwargs)
+
+	def __getattr__(self, k):
+		try:
+			return self[k]
+		except KeyError:
+			raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
+
+	def __setattr__(self, k, v):
+		if k.startswith('_'):
+			super().__setattr__(k, v)
+		else:
+			self[k] = v
+
+	def __setitem__(self, k, v):
+		if type(v) == dict:
+			v = DotDict(v)
+
+		super().__setitem__(k, v)
+
+	def __delattr__(self, k):
+		try:
+			dict.__delitem__(self, k)
+		except KeyError:
+			raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
+
+	@classmethod
+	def new_from_json(cls, data):
+		if not data:
+			raise JSONDecodeError('Empty body', data, 1)
+
+		try:
+			return cls(json.loads(data))
+		except ValueError:
+			raise JSONDecodeError('Invalid body', data, 1)
+
+	@classmethod
+	def new_from_signature(cls, sig):
+		data = cls({})
+
+		for chunk in sig.strip().split(','):
+			key, value = chunk.split('=', 1)
+			value = value.strip('\"')
+
+			if key == 'headers':
+				value = value.split()
+
+			data[key.lower()] = value
+
+		return data
+
+	def to_json(self, indent=None):
+		return json.dumps(self, indent=indent)
+
+	def update(self, _data, **kwargs):
+		if isinstance(_data, dict):
+			for key, value in _data.items():
+				self[key] = value
+
+		elif isinstance(_data, (list, tuple, set)):
+			for key, value in _data:
+				self[key] = value
+
+		for key, value in kwargs.items():
+			self[key] = value
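`DotDict` gives dictionary data attribute access and recursively wraps nested dicts on assignment, which is what lets later code write things like `request.message.objectid`. A quick standalone sketch, assuming the class as committed above:

    data = DotDict({'actor': {'id': 'https://relay.example/actor'}})
    print(data.actor.id)   # nested dict was converted to a DotDict by __setitem__

    data.object = 'https://example.social/notes/1'
    print(data['object'])  # attribute writes land in the dict itself

    sig = DotDict.new_from_signature('keyId="https://a.example/actor#main-key",headers="(request-target) host date"')
    print(sig.keyid)    # keys are lowercased
    print(sig.headers)  # ['(request-target)', 'host', 'date']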
+class Message(DotDict):
+	@classmethod
+	def new_actor(cls, host, pubkey, description=None):
+		return cls({
+			'@context': 'https://www.w3.org/ns/activitystreams',
+			'id': f'https://{host}/actor',
+			'type': 'Application',
+			'preferredUsername': 'relay',
+			'name': 'ActivityRelay',
+			'summary': description or 'ActivityRelay bot',
+			'followers': f'https://{host}/followers',
+			'following': f'https://{host}/following',
+			'inbox': f'https://{host}/inbox',
+			'url': f'https://{host}/inbox',
+			'endpoints': {
+				'sharedInbox': f'https://{host}/inbox'
+			},
+			'publicKey': {
+				'id': f'https://{host}/actor#main-key',
+				'owner': f'https://{host}/actor',
+				'publicKeyPem': pubkey
+			}
+		})
+
+	@classmethod
+	def new_announce(cls, host, object):
+		return cls({
+			'@context': 'https://www.w3.org/ns/activitystreams',
+			'id': f'https://{host}/activities/{uuid.uuid4()}',
+			'type': 'Announce',
+			'to': [f'https://{host}/followers'],
+			'actor': f'https://{host}/actor',
+			'object': object
+		})
+
+	@classmethod
+	def new_follow(cls, host, actor):
+		return cls({
+			'@context': 'https://www.w3.org/ns/activitystreams',
+			'type': 'Follow',
+			'to': [actor],
+			'object': actor,
+			'id': f'https://{host}/activities/{uuid.uuid4()}',
+			'actor': f'https://{host}/actor'
+		})
+
+	@classmethod
+	def new_unfollow(cls, host, actor, follow):
+		return cls({
+			'@context': 'https://www.w3.org/ns/activitystreams',
+			'id': f'https://{host}/activities/{uuid.uuid4()}',
+			'type': 'Undo',
+			'to': [actor],
+			'actor': f'https://{host}/actor',
+			'object': follow
+		})
+
+	@classmethod
+	def new_response(cls, host, actor, followid, accept):
+		return cls({
+			'@context': 'https://www.w3.org/ns/activitystreams',
+			'id': f'https://{host}/activities/{uuid.uuid4()}',
+			'type': 'Accept' if accept else 'Reject',
+			'to': [actor],
+			'actor': f'https://{host}/actor',
+			'object': {
+				'id': followid,
+				'type': 'Follow',
+				'object': f'https://{host}/actor',
+				'actor': actor
+			}
+		})
+
+	# misc properties
+	@property
+	def domain(self):
+		return urlparse(self.id).hostname
+
+	# actor properties
+	@property
+	def shared_inbox(self):
+		return self.get('endpoints', {}).get('sharedInbox', self.inbox)
+
+	# activity properties
+	@property
+	def actorid(self):
+		if isinstance(self.actor, dict):
+			return self.actor.id
+
+		return self.actor
+
+	@property
+	def objectid(self):
+		if isinstance(self.object, dict):
+			return self.object.id
+
+		return self.object
+
+	@property
+	def signer(self):
+		return aputils.Signer.new_from_actor(self)
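The `Message` constructors replace the hand-built activity dicts from the old code, so building a follow activity is now a single call. A sketch, assuming the class above:

    msg = Message.new_follow(
        host = 'relay.example',
        actor = 'https://example.social/actor'
    )

    print(msg.type)        # Follow
    print(msg.domain)      # relay.example, parsed from the generated id
    print(msg.to_json(4))  # still a plain dict underneath, so it serializes directly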
+class Response(AiohttpResponse):
+	@classmethod
+	def new(cls, body='', status=200, headers=None, ctype='text'):
+		kwargs = {
+			'status': status,
+			'headers': headers,
+			'content_type': MIMETYPES[ctype]
+		}
+
+		if isinstance(body, bytes):
+			kwargs['body'] = body
+
+		elif isinstance(body, dict) and ctype in {'json', 'activity'}:
+			kwargs['text'] = json.dumps(body)
+
+		else:
+			kwargs['text'] = body
+
+		return cls(**kwargs)
+
+	@classmethod
+	def new_error(cls, status, body, ctype='text'):
+		if ctype == 'json':
+			body = json.dumps({'status': status, 'error': body})
+
+		return cls.new(body=body, status=status, ctype=ctype)
+
+	@property
+	def location(self):
+		return self.headers.get('Location')
+
+	@location.setter
+	def location(self, value):
+		self.headers['Location'] = value
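`Response.new` keys the content type off the `MIMETYPES` table and JSON-encodes dict bodies, while `new_error` wraps the body in a `{status, error}` object. A sketch, under the assumption both behave as committed:

    ok = Response.new({'status': 'ok'}, ctype='json')       # dict body is JSON-encoded
    err = Response.new_error(403, 'access denied', 'json')  # body becomes {"status": 403, "error": "access denied"}

    print(ok.content_type)  # application/json
    print(err.status)       # 403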
+class View(AiohttpView):
+	async def _iter(self):
+		if self.request.method not in METHODS:
+			self._raise_allowed_methods()
+
+		method = getattr(self, self.request.method.lower(), None)
+
+		if method is None:
+			self._raise_allowed_methods()
+
+		return await method(**self.request.match_info)
+
+	@property
+	def app(self):
+		return self._request.app
+
+	@property
+	def config(self):
+		return self.app.config
+
+	@property
+	def database(self):
+		return self.app.database
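`View._iter` dispatches on the lowercased HTTP method name, so an endpoint class only needs to define `get` or `post` coroutines. A hypothetical subclass for illustration (not part of this diff):

    class HealthView(View):  # hypothetical; not in the relay codebase
        async def get(self):
            # self.config and self.database come from the properties above
            return Response.new({'host': self.config.host}, ctype='json')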

relay/processors.py

@@ -1,110 +1,138 @@
 import asyncio
 import logging
 
+from cachetools import LRUCache
 from uuid import uuid4
 
-from . import app, misc
+from .misc import Message
 
+cache = LRUCache(1024)
 
-async def handle_relay(actor, data, request):
-	cache = app['cache'].objects
-	object_id = misc.distill_object_id(data)
-
-	if object_id in cache:
-		logging.verbose(f'already relayed {object_id} as {cache[object_id]}')
-		return
-
-	logging.verbose(f'Relaying post from {actor["id"]}')
-
-	activity_id = f"https://{request.host}/activities/{uuid4()}"
-
-	message = {
-		"@context": "https://www.w3.org/ns/activitystreams",
-		"type": "Announce",
-		"to": [f"https://{request.host}/followers"],
-		"actor": f"https://{request.host}/actor",
-		"object": object_id,
-		"id": activity_id
-	}
-
-	logging.debug(f'>> relay: {message}')
-
-	inboxes = misc.distill_inboxes(actor, object_id)
-	futures = [misc.request(inbox, data=message) for inbox in inboxes]
-
-	asyncio.ensure_future(asyncio.gather(*futures))
-	cache[object_id] = activity_id
+def person_check(actor, software):
+	## pleroma and akkoma may use Person for the actor type for some reason
+	if software in {'akkoma', 'pleroma'} and actor.id == f'https://{actor.domain}/relay':
+		return False
+
+	## make sure the actor is an application
+	if actor.type != 'Application':
+		return True
 
-async def handle_forward(actor, data, request):
-	cache = app['cache'].objects
-	object_id = misc.distill_object_id(data)
-
-	if object_id in cache:
-		logging.verbose(f'already forwarded {object_id}')
-		return
-
-	logging.verbose(f'Forwarding post from {actor["id"]}')
-	logging.debug(f'>> Relay {data}')
-
-	inboxes = misc.distill_inboxes(actor, object_id)
-
-	futures = [misc.request(inbox, data=data) for inbox in inboxes]
-	asyncio.ensure_future(asyncio.gather(*futures))
-
-	cache[object_id] = object_id
+async def handle_relay(request):
+	if request.message.objectid in cache:
+		logging.verbose(f'already relayed {request.message.objectid}')
+		return
+
+	message = Message.new_announce(
+		host = request.config.host,
+		object = request.message.objectid
+	)
+
+	cache[request.message.objectid] = message.id
+	logging.debug(f'>> relay: {message}')
+
+	inboxes = request.database.distill_inboxes(request.message)
+
+	for inbox in inboxes:
+		request.app.push_message(inbox, message)
+
+async def handle_forward(request):
+	if request.message.id in cache:
+		logging.verbose(f'already forwarded {request.message.id}')
+		return
+
+	message = Message.new_announce(
+		host = request.config.host,
+		object = request.message
+	)
+
+	cache[request.message.id] = message.id
+	logging.debug(f'>> forward: {message}')
+
+	inboxes = request.database.distill_inboxes(request.message)
+
+	for inbox in inboxes:
+		request.app.push_message(inbox, message)
 
-async def handle_follow(actor, data, request):
-	config = app['config']
-	database = app['database']
-	inbox = misc.get_actor_inbox(actor)
-
-	if inbox not in database.inboxes:
-		database.add_inbox(inbox)
-		database.save()
-		asyncio.ensure_future(misc.follow_remote_actor(actor['id']))
-
-	message = {
-		"@context": "https://www.w3.org/ns/activitystreams",
-		"type": "Accept",
-		"to": [actor["id"]],
-		"actor": config.actor,
-
-		# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
-		"object": {
-			"type": "Follow",
-			"id": data["id"],
-			"object": config.actor,
-			"actor": actor["id"]
-		},
-
-		"id": f"https://{request.host}/activities/{uuid4()}",
-	}
-
-	asyncio.ensure_future(misc.request(inbox, message))
+async def handle_follow(request):
+	nodeinfo = await request.app.client.fetch_nodeinfo(request.actor.domain)
+	software = nodeinfo.sw_name if nodeinfo else None
+
+	## reject if software used by actor is banned
+	if request.config.is_banned_software(software):
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_response(
+				host = request.config.host,
+				actor = request.actor.id,
+				followid = request.message.id,
+				accept = False
+			)
+		)
+
+		return logging.verbose(f'Rejected follow from actor for using specific software: actor={request.actor.id}, software={software}')
+
+	## reject if the actor is not an instance actor
+	if person_check(request.actor, software):
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_response(
+				host = request.config.host,
+				actor = request.actor.id,
+				followid = request.message.id,
+				accept = False
+			)
+		)
+
+		return logging.verbose(f'Non-application actor tried to follow: {request.actor.id}')
+
+	request.database.add_inbox(request.actor.shared_inbox, request.message.id, software)
+	request.database.save()
+
+	request.app.push_message(
+		request.actor.shared_inbox,
+		Message.new_response(
+			host = request.config.host,
+			actor = request.actor.id,
+			followid = request.message.id,
+			accept = True
+		)
+	)
+
+	# Are Akkoma and Pleroma the only two that expect a follow back?
+	# Ignoring only Mastodon for now
+	if software != 'mastodon':
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_follow(
+				host = request.config.host,
+				actor = request.actor.id
+			)
+		)
 
-async def handle_undo(actor, data, request):
-	## If the activity being undone is an Announce, forward it instead
-	if data['object']['type'] == 'Announce':
-		await handle_forward(actor, data, request)
-		return
-
-	elif data['object']['type'] != 'Follow':
-		return
-
-	database = app['database']
-	inbox = database.get_inbox(actor['id'])
-
-	if not inbox:
-		return
-
-	database.del_inbox(inbox)
-	database.save()
-
-	await misc.unfollow_remote_actor(actor['id'])
+async def handle_undo(request):
+	## If the object is not a Follow, forward it
+	if request.message.object.type != 'Follow':
+		return await handle_forward(request)
+
+	if not request.database.del_inbox(request.actor.domain, request.message.id):
+		return
+
+	request.database.save()
+
+	request.app.push_message(
+		request.actor.shared_inbox,
+		Message.new_unfollow(
+			host = request.config.host,
+			actor = request.actor.id,
+			follow = request.message
+		)
+	)
 
 processors = {
@@ -117,9 +145,16 @@ processors = {
 }
 
-async def run_processor(request, data, actor):
-	if data['type'] not in processors:
-		return
-
-	logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}')
-	return await processors[data['type']](actor, data, request)
+async def run_processor(request):
+	if request.message.type not in processors:
+		return
+
+	if request.instance and not request.instance.get('software'):
+		nodeinfo = await request.app.client.fetch_nodeinfo(request.instance['domain'])
+
+		if nodeinfo:
+			request.instance['software'] = nodeinfo.sw_name
+			request.database.save()
+
+	logging.verbose(f'New "{request.message.type}" from actor: {request.actor.id}')
+	return await processors[request.message.type](request)
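The `processors` table itself is collapsed in this view; judging from the handlers above, it presumably maps activity types to the new single-argument coroutines, roughly:

    # Rough reconstruction; the exact table is collapsed in the diff above.
    processors = {
        'Announce': handle_relay,
        'Create': handle_relay,
        'Delete': handle_forward,
        'Follow': handle_follow,
        'Undo': handle_undo,
        'Update': handle_forward
    }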

relay/views.py

@@ -1,28 +1,47 @@
+import aputils
+import asyncio
 import logging
 import subprocess
 import traceback
 
-from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response
-from urllib.parse import urlparse
+from pathlib import Path
 
-from . import __version__, app, misc
-from .http_debug import STATS
+from . import __version__, misc
+from .misc import DotDict, Message, Response
 from .processors import run_processor
 
+routes = []
+version = __version__
 
-try:
-	commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
-	version = f'{__version__} {commit_label}'
-
-except:
-	version = __version__
+if Path(__file__).parent.parent.joinpath('.git').exists():
+	try:
+		commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
+		version = f'{__version__} {commit_label}'
+
+	except:
+		pass
 
+def register_route(method, path):
+	def wrapper(func):
+		routes.append([method, path, func])
+		return func
+
+	return wrapper
 
+@register_route('GET', '/')
 async def home(request):
-	targets = '<br>'.join(app['database'].hostnames)
-	text = """
+	targets = '<br>'.join(request.database.hostnames)
+	note = request.config.note
+	count = len(request.database.hostnames)
+	host = request.config.host
+
+	text = f"""
 <html><head>
-<title>ActivityPub Relay at {host}</title>
+<title>SEDI Relay</title>
 <style>
 p {{ color: #FFFFFF; font-family: monospace, arial; font-size: 100%; }}
 body {{ background-color: #000000; }}
@@ -34,170 +53,142 @@ a:hover {{ color: #8AF; }}
 <body>
 <p>This is an Activity Relay for fediverse instances.</p>
 <p>{note}</p>
-<p>You may subscribe to this relay with the address: <a href="https://{host}/actor">https://{host}/actor</a></p>
-<p>To host your own relay, you may download the code at this address: <a href="https://git.pleroma.social/pleroma/relay">https://git.pleroma.social/pleroma/relay</a></p>
-<br><p>List of {count} registered instances:<br>{targets}</p>
-</body></html>""".format(host=request.host, note=app['config'].note, targets=targets, count=len(app['database'].inboxes))
+<p>Misskey and Mastodon admins, please subscribe to this address: <a href="https://{host}/inbox">https://{host}/inbox</a></p>
+<p>Pleroma and Friendica admins, please subscribe to this address: <a href="https://{host}/actor">https://{host}/actor</a></p>
+<p>Source code: <a href="https://git.seediqbale.xyz/pch_xyz/sedi-relay">https://git.seediqbale.xyz/pch_xyz/sedi-relay</a></p>
+<p>Buy me a coffee: <a href="https://buymeacoffee.com/SEDI">https://buymeacoffee.com/SEDI</a></p>
+<p>activityrelay v0.2.4</p>
+<br><p>{count} instances subscribed:<br>{targets}</p>
+</body></html>"""
-	return Response(
-		status = 200,
-		content_type = 'text/html',
-		charset = 'utf-8',
-		text = text
-	)
+	return Response.new(text, ctype='html')
 
-async def actor(request):
-	database = app['database']
-
-	data = {
-		"@context": "https://www.w3.org/ns/activitystreams",
-		"endpoints": {
-			"sharedInbox": f"https://{request.host}/inbox"
-		},
-		"followers": f"https://{request.host}/followers",
-		"following": f"https://{request.host}/following",
-		"inbox": f"https://{request.host}/inbox",
-		"name": "ActivityRelay",
-		"type": "Application",
-		"id": f"https://{request.host}/actor",
-		"publicKey": {
-			"id": f"https://{request.host}/actor#main-key",
-			"owner": f"https://{request.host}/actor",
-			"publicKeyPem": database.pubkey
-		},
-		"summary": "ActivityRelay bot",
-		"preferredUsername": "relay",
-		"url": f"https://{request.host}/actor"
-	}
-
-	return json_response(data, content_type='application/activity+json')
+@register_route('GET', '/inbox')
+@register_route('GET', '/actor')
+async def actor(request):
+	data = Message.new_actor(
+		host = request.config.host,
+		pubkey = request.database.signer.pubkey
+	)
+
+	return Response.new(data, ctype='activity')
+@register_route('POST', '/inbox')
+@register_route('POST', '/actor')
 async def inbox(request):
-	config = app['config']
-	database = app['database']
+	config = request.config
+	database = request.database
 
 	## reject if missing signature header
-	if 'signature' not in request.headers:
+	if not request.signature:
 		logging.verbose('Actor missing signature header')
 		raise HTTPUnauthorized(body='missing signature')
 
-	## read message and get actor id and domain
 	try:
-		data = await request.json()
-		actor_id = data['actor']
-		actor_domain = urlparse(actor_id).hostname
-
-	except KeyError:
-		logging.verbose('actor not in data')
-		raise HTTPUnauthorized(body='no actor in message')
+		request['message'] = await request.json(loads=Message.new_from_json)
+
+		## reject if there is no message
+		if not request.message:
+			logging.verbose('empty message')
+			return Response.new_error(400, 'missing message', 'json')
+
+		## reject if there is no actor in the message
+		if 'actor' not in request.message:
+			logging.verbose('actor not in message')
+			return Response.new_error(400, 'no actor in message', 'json')
 
 	except:
+		## this code should hopefully never get called
 		traceback.print_exc()
 		logging.verbose('Failed to parse inbox message')
-		raise HTTPUnauthorized(body='failed to parse message')
+		return Response.new_error(400, 'failed to parse message', 'json')
 
-	actor = await misc.request(actor_id)
+	request['actor'] = await request.app.client.get(request.signature.keyid, sign_headers=True)
 
 	## reject if actor is empty
-	if not actor:
-		logging.verbose(f'Failed to fetch actor: {actor_id}')
-		raise HTTPUnauthorized('failed to fetch actor')
+	if not request.actor:
+		## ld signatures aren't handled atm, so just ignore it
+		if request['message'].type == 'Delete':
+			logging.verbose(f'Instance sent a delete which cannot be handled')
+			return Response.new(status=202)
+
+		logging.verbose(f'Failed to fetch actor: {request.signature.keyid}')
+		return Response.new_error(400, 'failed to fetch actor', 'json')
+
+	request['instance'] = request.database.get_inbox(request['actor'].inbox)
 
 	## reject if the actor isn't whitelisted while the whitelist is enabled
-	elif config.whitelist_enabled and not config.is_whitelisted(actor_id):
-		logging.verbose(f'Rejected actor for not being in the whitelist: {actor_id}')
-		raise HTTPForbidden(body='access denied')
+	if config.whitelist_enabled and not config.is_whitelisted(request.actor.domain):
+		logging.verbose(f'Rejected actor for not being in the whitelist: {request.actor.id}')
+		return Response.new_error(403, 'access denied', 'json')
 
 	## reject if actor is banned
-	if app['config'].is_banned(actor_id):
-		logging.verbose(f'Ignored request from banned actor: {actor_id}')
-		raise HTTPForbidden(body='access denied')
+	if request.config.is_banned(request.actor.domain):
+		logging.verbose(f'Ignored request from banned actor: {request.actor.id}')
+		return Response.new_error(403, 'access denied', 'json')
 
-	## reject if software used by actor is banned
-	if len(config.blocked_software):
-		software = await misc.fetch_nodeinfo(actor_domain)
-
-		if config.is_banned_software(software):
-			logging.verbose(f'Rejected actor for using specific software: {software}')
-			raise HTTPForbidden(body='access denied')
 
 	## reject if the signature is invalid
-	if not (await misc.validate_signature(actor_id, request)):
-		logging.verbose(f'signature validation failed for: {actor_id}')
-		raise HTTPUnauthorized(body='signature check failed, signature did not match key')
+	try:
+		await request.actor.signer.validate_aiohttp_request(request)
+
+	except aputils.SignatureValidationError as e:
+		logging.verbose(f'signature validation failed for: {request.actor.id}')
+		logging.debug(str(e))
+		return Response.new_error(401, str(e), 'json')
 
 	## reject if activity type isn't 'Follow' and the actor isn't following
-	if data['type'] != 'Follow' and not database.get_inbox(actor_domain):
-		logging.verbose(f'Rejected actor for trying to post while not following: {actor_id}')
-		raise HTTPUnauthorized(body='access denied')
+	if request.message.type != 'Follow' and not database.get_inbox(request.actor.domain):
+		logging.verbose(f'Rejected actor for trying to post while not following: {request.actor.id}')
+		return Response.new_error(401, 'access denied', 'json')
 
-	logging.debug(f">> payload {data}")
+	logging.debug(f">> payload {request.message.to_json(4)}")
 
-	await run_processor(request, data, actor)
-	return Response(body=b'{}', content_type='application/activity+json')
+	asyncio.ensure_future(run_processor(request))
+	return Response.new(status=202)
+@register_route('GET', '/.well-known/webfinger')
 async def webfinger(request):
-	config = app['config']
-	subject = request.query['resource']
-
-	if subject != f'acct:relay@{request.host}':
-		return json_response({'error': 'user not found'}, status=404)
-
-	data = {
-		'subject': subject,
-		'aliases': [config.actor],
-		'links': [
-			{'href': config.actor, 'rel': 'self', 'type': 'application/activity+json'},
-			{'href': config.actor, 'rel': 'self', 'type': 'application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"'}
-		]
-	}
-
-	return json_response(data)
+	try:
+		subject = request.query['resource']
+
+	except KeyError:
+		return Response.new_error(400, 'missing \'resource\' query key', 'json')
+
+	if subject != f'acct:relay@{request.config.host}':
+		return Response.new_error(404, 'user not found', 'json')
+
+	data = aputils.Webfinger.new(
+		handle = 'relay',
+		domain = request.config.host,
+		actor = request.config.actor
+	)
+
+	return Response.new(data, ctype='json')
-async def nodeinfo_2_0(request):
-	data = {
-		# XXX - is this valid for a relay?
-		'openRegistrations': True,
-		'protocols': ['activitypub'],
-		'services': {
-			'inbound': [],
-			'outbound': []
-		},
-		'software': {
-			'name': 'activityrelay',
-			'version': version
-		},
-		'usage': {
-			'localPosts': 0,
-			'users': {
-				'total': 1
-			}
-		},
-		'metadata': {
-			'peers': app['database'].hostnames
-		},
-		'version': '2.0'
-	}
-
-	return json_response(data)
+@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
+async def nodeinfo(request):
+	niversion = request.match_info['version'][:3]
+
+	data = dict(
+		name = 'activityrelay',
+		version = version,
+		protocols = ['activitypub'],
+		open_regs = not request.config.whitelist_enabled,
+		users = 1,
+		metadata = {'peers': request.database.hostnames}
+	)
+
+	if niversion == '2.1':
+		data['repo'] = 'https://git.pleroma.social/pleroma/relay'
+
+	return Response.new(aputils.Nodeinfo.new(**data), ctype='json')
 
+@register_route('GET', '/.well-known/nodeinfo')
 async def nodeinfo_wellknown(request):
-	data = {
-		'links': [
-			{
-				'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0',
-				'href': f'https://{request.host}/nodeinfo/2.0.json'
-			}
-		]
-	}
-	return json_response(data)
-
-async def stats(request):
-	return json_response(STATS)
+	data = aputils.WellKnownNodeinfo.new_template(request.config.host)
+	return Response.new(data, ctype='json')
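`register_route` only accumulates `[method, path, func]` triples in `routes`; the application presumably walks that list at startup in place of the per-route `app.router.add_*` calls removed from the manage module. A sketch of that wiring (assumed, not shown in this diff):

    from aiohttp import web

    def setup_routes(app):
        # register every handler collected by the @register_route decorators
        for method, path, handler in routes:
            app.router.add_route(method, path, handler)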

requirements.txt

@@ -1 +1,5 @@
-.
+aiohttp>=3.8.0
+aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.3.tar.gz
+cachetools>=5.2.0
+click>=8.1.2
+pyyaml>=6.0

setup.cfg

@@ -1,6 +1,6 @@
 [metadata]
 name = relay
-version = 0.2.2
+version = attr: relay.__version__
 description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
 long_description = file: README.md
 long_description_content_type = text/markdown; charset=UTF-8
@@ -22,13 +22,12 @@ project_urls =
 [options]
 zip_safe = False
 packages = find:
-install_requires =
-	aiohttp >= 3.8.0
-	cachetools >= 5.0.0
-	click >= 8.1.2
-	pycryptodome >= 3.14.1
-	PyYAML >= 5.0.0
-python_requires = >=3.6
+install_requires = file: requirements.txt
+python_requires = >=3.7
+
+[options.extras_require]
+dev =
+	pyinstaller >= 5.6.0
 
 [options.entry_points]
 console_scripts =
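With `version = attr: relay.__version__`, setuptools reads the version string from the package at build time instead of a value hard-coded in setup.cfg, so bumping a release only touches one place. A sketch of the corresponding attribute (0.2.4 matches the version shown on the home page above):

    # relay/__init__.py (sketch)
    __version__ = '0.2.4'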