mirror of
https://git.pleroma.social/pleroma/relay.git
synced 2024-11-10 02:17:59 +00:00
add optional push worker threads
This commit is contained in:
parent
c049657765
commit
9839da906c
|
@ -1,7 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
import signal
|
import signal
|
||||||
|
import threading
|
||||||
|
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from cachetools import LRUCache
|
from cachetools import LRUCache
|
||||||
|
@ -9,7 +11,7 @@ from datetime import datetime, timedelta
|
||||||
|
|
||||||
from .config import RelayConfig
|
from .config import RelayConfig
|
||||||
from .database import RelayDatabase
|
from .database import RelayDatabase
|
||||||
from .misc import DotDict, check_open_port, fetch_nodeinfo, set_app
|
from .misc import DotDict, check_open_port, request, set_app
|
||||||
from .views import routes
|
from .views import routes
|
||||||
|
|
||||||
|
|
||||||
|
@ -27,6 +29,8 @@ class Application(web.Application):
|
||||||
|
|
||||||
self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys})
|
self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys})
|
||||||
self['semaphore'] = asyncio.Semaphore(self['config'].push_limit)
|
self['semaphore'] = asyncio.Semaphore(self['config'].push_limit)
|
||||||
|
self['workers'] = []
|
||||||
|
self['last_worker'] = 0
|
||||||
|
|
||||||
set_app(self)
|
set_app(self)
|
||||||
|
|
||||||
|
@ -71,6 +75,16 @@ class Application(web.Application):
|
||||||
return timedelta(seconds=uptime.seconds)
|
return timedelta(seconds=uptime.seconds)
|
||||||
|
|
||||||
|
|
||||||
|
def push_message(self, inbox, message):
|
||||||
|
worker = self['workers'][self['last_worker']]
|
||||||
|
worker.queue.put((inbox, message))
|
||||||
|
|
||||||
|
self['last_worker'] += 1
|
||||||
|
|
||||||
|
if self['last_worker'] >= len(self['workers']):
|
||||||
|
self['last_worker'] = 0
|
||||||
|
|
||||||
|
|
||||||
def set_signal_handler(self):
|
def set_signal_handler(self):
|
||||||
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
|
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
|
||||||
try:
|
try:
|
||||||
|
@ -102,6 +116,13 @@ class Application(web.Application):
|
||||||
async def handle_run(self):
|
async def handle_run(self):
|
||||||
self['running'] = True
|
self['running'] = True
|
||||||
|
|
||||||
|
if self.config.workers > 0:
|
||||||
|
for i in range(self.config.workers):
|
||||||
|
worker = PushWorker(self)
|
||||||
|
worker.start()
|
||||||
|
|
||||||
|
self['workers'].append(worker)
|
||||||
|
|
||||||
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
|
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
|
||||||
await runner.setup()
|
await runner.setup()
|
||||||
|
|
||||||
|
@ -121,6 +142,7 @@ class Application(web.Application):
|
||||||
|
|
||||||
self['starttime'] = None
|
self['starttime'] = None
|
||||||
self['running'] = False
|
self['running'] = False
|
||||||
|
self['workers'].clear()
|
||||||
|
|
||||||
|
|
||||||
class Cache(LRUCache):
|
class Cache(LRUCache):
|
||||||
|
@ -128,6 +150,30 @@ class Cache(LRUCache):
|
||||||
self.__maxsize = int(value)
|
self.__maxsize = int(value)
|
||||||
|
|
||||||
|
|
||||||
|
class PushWorker(threading.Thread):
|
||||||
|
def __init__(self, app):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.app = app
|
||||||
|
self.queue = queue.Queue()
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
asyncio.run(self.handle_queue())
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_queue(self):
|
||||||
|
while self.app['running']:
|
||||||
|
try:
|
||||||
|
inbox, message = self.queue.get(block=True, timeout=0.25)
|
||||||
|
self.queue.task_done()
|
||||||
|
await request(inbox, message)
|
||||||
|
|
||||||
|
logging.verbose(f'New push from Thread-{threading.get_ident()}')
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
## Can't sub-class web.Request, so let's just add some properties
|
## Can't sub-class web.Request, so let's just add some properties
|
||||||
def request_actor(self):
|
def request_actor(self):
|
||||||
try: return self['actor']
|
try: return self['actor']
|
||||||
|
|
|
@ -50,7 +50,7 @@ class RelayConfig(DotDict):
|
||||||
if key in ['blocked_instances', 'blocked_software', 'whitelist']:
|
if key in ['blocked_instances', 'blocked_software', 'whitelist']:
|
||||||
assert isinstance(value, (list, set, tuple))
|
assert isinstance(value, (list, set, tuple))
|
||||||
|
|
||||||
elif key in ['port', 'json', 'objects', 'digests']:
|
elif key in ['port', 'workers', 'json', 'objects', 'digests']:
|
||||||
assert isinstance(value, (int))
|
assert isinstance(value, (int))
|
||||||
|
|
||||||
elif key == 'whitelist_enabled':
|
elif key == 'whitelist_enabled':
|
||||||
|
@ -92,6 +92,7 @@ class RelayConfig(DotDict):
|
||||||
'port': 8080,
|
'port': 8080,
|
||||||
'note': 'Make a note about your instance here.',
|
'note': 'Make a note about your instance here.',
|
||||||
'push_limit': 512,
|
'push_limit': 512,
|
||||||
|
'workers': 0,
|
||||||
'host': 'relay.example.com',
|
'host': 'relay.example.com',
|
||||||
'blocked_software': [],
|
'blocked_software': [],
|
||||||
'blocked_instances': [],
|
'blocked_instances': [],
|
||||||
|
@ -233,6 +234,7 @@ class RelayConfig(DotDict):
|
||||||
'port': self.port,
|
'port': self.port,
|
||||||
'note': self.note,
|
'note': self.note,
|
||||||
'push_limit': self.push_limit,
|
'push_limit': self.push_limit,
|
||||||
|
'workers': self.workers,
|
||||||
'ap': {key: self[key] for key in self.apkeys},
|
'ap': {key: self[key] for key in self.apkeys},
|
||||||
'cache': {key: self[key] for key in self.cachekeys}
|
'cache': {key: self[key] for key in self.cachekeys}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,20 +11,24 @@ async def handle_relay(request):
|
||||||
logging.verbose(f'already relayed {request.message.objectid}')
|
logging.verbose(f'already relayed {request.message.objectid}')
|
||||||
return
|
return
|
||||||
|
|
||||||
logging.verbose(f'Relaying post from {request.message.actorid}')
|
|
||||||
|
|
||||||
message = misc.Message.new_announce(
|
message = misc.Message.new_announce(
|
||||||
host = request.config.host,
|
host = request.config.host,
|
||||||
object = request.message.objectid
|
object = request.message.objectid
|
||||||
)
|
)
|
||||||
|
|
||||||
|
request.cache.objects[request.message.objectid] = message.id
|
||||||
|
logging.verbose(f'Relaying post from {request.message.actorid}')
|
||||||
logging.debug(f'>> relay: {message}')
|
logging.debug(f'>> relay: {message}')
|
||||||
|
|
||||||
inboxes = misc.distill_inboxes(request.actor, request.message.objectid)
|
inboxes = misc.distill_inboxes(request.actor, request.message.objectid)
|
||||||
futures = [misc.request(inbox, data=message) for inbox in inboxes]
|
|
||||||
|
|
||||||
asyncio.ensure_future(asyncio.gather(*futures))
|
if request.config.workers > 0:
|
||||||
request.cache.objects[request.message.objectid] = message.id
|
for inbox in inboxes:
|
||||||
|
request.app.push_message(inbox, message)
|
||||||
|
|
||||||
|
else:
|
||||||
|
futures = [misc.request(inbox, data=message) for inbox in inboxes]
|
||||||
|
asyncio.ensure_future(asyncio.gather(*futures))
|
||||||
|
|
||||||
|
|
||||||
async def handle_forward(request):
|
async def handle_forward(request):
|
||||||
|
@ -37,14 +41,19 @@ async def handle_forward(request):
|
||||||
object = request.message
|
object = request.message
|
||||||
)
|
)
|
||||||
|
|
||||||
|
request.cache.objects[request.message.id] = message.id
|
||||||
logging.verbose(f'Forwarding post from {request.actor.id}')
|
logging.verbose(f'Forwarding post from {request.actor.id}')
|
||||||
logging.debug(f'>> Relay {request.message}')
|
logging.debug(f'>> Relay {request.message}')
|
||||||
|
|
||||||
inboxes = misc.distill_inboxes(request.actor, request.message.id)
|
inboxes = misc.distill_inboxes(request.actor, request.message.objectid)
|
||||||
futures = [misc.request(inbox, data=message) for inbox in inboxes]
|
|
||||||
|
|
||||||
asyncio.ensure_future(asyncio.gather(*futures))
|
if request.config.workers > 0:
|
||||||
request.cache.objects[request.message.id] = message.id
|
for inbox in inboxes:
|
||||||
|
request.app.push_message(inbox, message)
|
||||||
|
|
||||||
|
else:
|
||||||
|
futures = [misc.request(inbox, data=message) for inbox in inboxes]
|
||||||
|
asyncio.ensure_future(asyncio.gather(*futures))
|
||||||
|
|
||||||
|
|
||||||
async def handle_follow(request):
|
async def handle_follow(request):
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -137,7 +138,7 @@ async def inbox(request):
|
||||||
|
|
||||||
logging.debug(f">> payload {request.message.to_json(4)}")
|
logging.debug(f">> payload {request.message.to_json(4)}")
|
||||||
|
|
||||||
await run_processor(request)
|
asyncio.ensure_future(run_processor(request))
|
||||||
return Response.new(status=202)
|
return Response.new(status=202)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue