add --watch option to lint dev command

This commit is contained in:
Izalia Mae 2024-03-31 11:00:31 -04:00
parent f7e31d9387
commit 3421846111

View file

@ -4,9 +4,10 @@ import subprocess
import sys import sys
import time import time
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Sequence
from . import __version__ from . import __version__
from . import logger as logging from . import logger as logging
@ -47,14 +48,24 @@ def cli_install():
@cli.command('lint') @cli.command('lint')
@click.argument('path', required = False, default = 'relay') @click.argument('path', required = False, default = 'relay')
@click.option('--strict', '-s', is_flag = True, help = 'Enable strict mode for mypy') @click.option('--strict', '-s', is_flag = True, help = 'Enable strict mode for mypy')
def cli_lint(path: str, strict: bool) -> None: @click.option('--watch', '-w', is_flag = True,
cmd: list[str] = [sys.executable, '-m', 'mypy'] help = 'Automatically, re-run the linters on source change')
def cli_lint(path: str, strict: bool, watch: bool) -> None:
flake8 = [sys.executable, '-m', 'flake8', path]
mypy = [sys.executable, '-m', 'mypy', path]
if strict: if strict:
cmd.append('--strict') mypy.append('--strict')
subprocess.run([*cmd, path], check = False) if watch:
subprocess.run([sys.executable, '-m', 'flake8', path]) handle_run_watcher(mypy, flake8, wait = True)
return
click.echo('----- flake8 -----')
subprocess.run(flake8)
click.echo('\n\n----- mypy -----')
subprocess.run(mypy)
@cli.command('build') @cli.command('build')
@ -91,8 +102,17 @@ def cli_build():
def cli_run(dev: bool): def cli_run(dev: bool):
print('Starting process watcher') print('Starting process watcher')
handler = WatchHandler(dev) cmd = [sys.executable, '-m', 'relay', 'run']
handler.run_proc()
if dev:
cmd.append('-d')
handle_run_watcher(cmd)
def handle_run_watcher(*commands: Sequence[str], wait: bool = False):
handler = WatchHandler(*commands, wait = wait)
handler.run_procs()
watcher = Observer() watcher = Observer()
watcher.schedule(handler, str(SCRIPT), recursive=True) watcher.schedule(handler, str(SCRIPT), recursive=True)
@ -100,13 +120,12 @@ def cli_run(dev: bool):
try: try:
while True: while True:
handler.proc.stdin.write(sys.stdin.read().encode('UTF-8')) # type: ignore time.sleep(1)
handler.proc.stdin.flush() # type: ignore
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
handler.kill_proc() handler.kill_procs()
watcher.stop() watcher.stop()
watcher.join() watcher.join()
@ -114,61 +133,65 @@ def cli_run(dev: bool):
class WatchHandler(PatternMatchingEventHandler): class WatchHandler(PatternMatchingEventHandler):
patterns = ['*.py'] patterns = ['*.py']
cmd = [sys.executable, '-m', 'relay', 'run']
def __init__(self, dev: bool): def __init__(self, *commands: Sequence[str], wait: bool = False):
PatternMatchingEventHandler.__init__(self) PatternMatchingEventHandler.__init__(self)
self.dev: bool = dev self.commands: Sequence[Sequence[str]] = commands
self.proc: subprocess.Popen | None = None self.wait: bool = wait
self.last_restart: datetime | None = None self.procs: list[subprocess.Popen] = []
self.last_restart: datetime = datetime.now()
def kill_proc(self): def kill_procs(self):
if not self.proc or self.proc.poll() is not None: for proc in self.procs:
return if proc.poll() is not None:
continue
logging.info(f'Terminating process {self.proc.pid}') logging.info(f'Terminating process {proc.pid}')
self.proc.terminate() proc.terminate()
sec = 0.0 sec = 0.0
while self.proc.poll() is None: while proc.poll() is None:
time.sleep(0.1) time.sleep(0.1)
sec += 0.1 sec += 0.1
if sec >= 5: if sec >= 5:
logging.error('Failed to terminate. Killing process...') logging.error('Failed to terminate. Killing process...')
self.proc.kill() proc.kill()
break break
logging.info('Process terminated') logging.info('Process terminated')
def run_proc(self, restart=False): def run_procs(self, restart: bool = False):
timestamp = datetime.timestamp(datetime.now()) if restart:
self.last_restart = timestamp if not self.last_restart else 0 if datetime.now() - timedelta(seconds = 3) < self.last_restart:
if restart and self.proc.pid != '':
if timestamp - 3 < self.last_restart:
return return
self.kill_proc() self.kill_procs()
cmd = [*self.cmd, '-d'] if self.dev else self.cmd self.last_restart = datetime.now()
self.proc = subprocess.Popen(cmd, stdin = subprocess.PIPE) if self.wait:
self.last_restart = timestamp self.procs = []
logging.info('Started process with PID %i', self.proc.pid) for cmd in self.commands:
logging.info('Command: %s', ' '.join(cmd)) logging.info('Running command: %s', ' '.join(cmd))
subprocess.run(cmd)
else:
self.procs = list(subprocess.Popen(cmd) for cmd in self.commands)
pids = (str(proc.pid) for proc in self.procs)
logging.info('Started processes with PIDs: %s', ', '.join(pids))
def on_any_event(self, event): def on_any_event(self, event):
if event.event_type not in ['modified', 'created', 'deleted']: if event.event_type not in ['modified', 'created', 'deleted']:
return return
self.run_proc(restart = True) self.run_procs(restart = True)
if __name__ == '__main__': if __name__ == '__main__':