From 94e669cf7d7fe827c05d09c883934b44307377ea Mon Sep 17 00:00:00 2001 From: gled Date: Tue, 11 Jun 2019 10:54:05 -0700 Subject: [PATCH] use global semaphore in push_to_actor to prevent overloading system with outbound connections --- relay/actor.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/relay/actor.py b/relay/actor.py index b2117fd..7eefab7 100644 --- a/relay/actor.py +++ b/relay/actor.py @@ -30,6 +30,7 @@ if "actorKeys" not in DATABASE: PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"]) PUBKEY = PRIVKEY.publickey() +sem = asyncio.Semaphore(500) from . import app, CONFIG from .remote_actor import fetch_actor @@ -77,7 +78,6 @@ get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', ac async def push_message_to_actor(actor, message, our_key_id): inbox = get_actor_inbox(actor) - url = urlsplit(inbox) # XXX: Digest @@ -93,15 +93,17 @@ async def push_message_to_actor(actor, message, our_key_id): logging.debug('%r >> %r', inbox, message) - try: - async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session: - async with session.post(inbox, data=data, headers=headers) as resp: - if resp.status == 202: - return - resp_payload = await resp.text() - logging.debug('%r >> resp %r', inbox, resp_payload) - except Exception as e: - logging.info('Caught %r while pushing to %r.', e, inbox) + global sem + async with sem: + try: + async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session: + async with session.post(inbox, data=data, headers=headers) as resp: + if resp.status == 202: + return + resp_payload = await resp.text() + logging.debug('%r >> resp %r', inbox, resp_payload) + except Exception as e: + logging.info('Caught %r while pushing to %r.', e, inbox) async def follow_remote_actor(actor_uri): @@ -308,5 +310,4 @@ async def inbox(request): return aiohttp.web.Response(body=b'{}', content_type='application/activity+json') - app.router.add_post('/inbox', inbox)