diff --git a/CHANGELOG.md b/CHANGELOG.md index 6aaaedcfa43f81b95ab835b26a738dc3db5d963f..d79d74c103a6b023626cbc644fb4fa6ada3fb196 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ changed functionality, fixed bugs). - Added a new `log_a_lot` command and task type `debug` to aid in debugging. - Fixed bug where task updates would be sent in an infinite loop when the Manager didn't know the task, blocking all other task updates. +- Added a `pre_task_check` section to the configuration file, which can contain `write.N` and + `read.N` keys (where `N` can be anything to make the keys unique). Every value is a path to be + checked for writability or readability. Note that write checks are lossy, and bytes are appended + to any existing file used to check writability. When such a check fails, the Worker will go to + status `error` and sleep for 10 minutes before trying again. ## Version 2.1.0 (2018-01-04) diff --git a/flamenco-worker.cfg b/flamenco-worker.cfg index df03106a031c1a00dee950950323a10718031a56..488286eec0031f3846249ab91c3720ba78556cbd 100644 --- a/flamenco-worker.cfg +++ b/flamenco-worker.cfg @@ -11,6 +11,10 @@ push_log_max_interval_seconds = 30 push_log_max_entries = 2000 push_act_max_interval_seconds = 15 +[pre_task_check] +write.0 = /render/_flamenco +write.1 = /render/spring/frames + [loggers] keys = root,flamenco_worker diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index f737598870de0d9414ee3652b84d3e524e757ad0..2b91fbaa1c520cc1040eed23d2f4588758d0b20c 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -6,6 +6,7 @@ import logging import logging.config import os import pathlib +import typing import requests @@ -102,6 +103,8 @@ def main(): trunner = runner.TaskRunner( shutdown_future=shutdown_future) + pretask_check_params = parse_pretask_check_config(confparser, log) + fworker = worker.FlamencoWorker( manager=fmanager, trunner=trunner, @@ -116,6 +119,7 @@ def main(): 
push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'), initial_state='testing' if args.test else 'awake', run_single_task=args.single, + pretask_check_params=pretask_check_params, ) mir = may_i_run.MayIRun( @@ -199,6 +203,31 @@ def main(): log.warning('Flamenco Worker is shut down') +def parse_pretask_check_config(confparser, log): + """Parse the [pre_task_check] config section; exits with code 47 on an unknown key. + + :rtype: flamenco_worker.worker.PreTaskCheckParams + """ + from . import worker + + check_read: typing.List[pathlib.Path] = [] + check_write: typing.List[pathlib.Path] = [] + for name, value in confparser.items(section='pre_task_check'): + if name.startswith('write'): + check_write.append(pathlib.Path(value)) + elif name.startswith('read'): + check_read.append(pathlib.Path(value)) + else: + log.fatal('Config section "pre_task_check" should only have keys starting with ' + '"read" or "write"; found %r', name) + raise SystemExit(47) + pretask_check_params = worker.PreTaskCheckParams( + pre_task_check_write=tuple(check_write), + pre_task_check_read=tuple(check_read), + ) + return pretask_check_params + + def asyncio_report_tasks(signum=0, stackframe=None): """Runs the garbage collector, then reports all AsyncIO tasks on the log. diff --git a/flamenco_worker/config.py b/flamenco_worker/config.py index 0638771166d0b9381fd8007f5d2f8c4d3b393423..8d19cd7fa099b2e26327e4fff2b0a3669bb4132e 100644 --- a/flamenco_worker/config.py +++ b/flamenco_worker/config.py @@ -25,7 +25,8 @@ DEFAULT_CONFIG = { ('push_log_max_interval_seconds', str(worker.PUSH_LOG_MAX_INTERVAL.total_seconds())), ('push_log_max_entries', str(worker.PUSH_LOG_MAX_ENTRIES)), ('push_act_max_interval_seconds', str(worker.PUSH_ACT_MAX_INTERVAL.total_seconds())), - ]) + ]), + 'pre_task_check': collections.OrderedDict([]), } # Will be assigned to the config key 'task_types' when started with --test CLI arg. 
@@ -92,12 +93,12 @@ def load_config(config_file: pathlib.Path = None, log.info('Loading configuration from %s', config_file) if not config_file.exists(): log.fatal('Config file %s does not exist', config_file) - raise SystemExit() + raise SystemExit(47) loaded = confparser.read(str(config_file), encoding='utf8') else: if not GLOBAL_CONFIG_FILE.exists(): log.fatal('Config file %s does not exist', GLOBAL_CONFIG_FILE) - raise SystemExit() + raise SystemExit(47) config_files = [GLOBAL_CONFIG_FILE, HOME_CONFIG_FILE] filenames = [str(f.absolute()) for f in config_files] diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index acd3c5cbc8490aa659a9d203b999029bbb40a6db..d258334120b8a270878fa154be61a38f15fb34f8 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -1,7 +1,9 @@ import asyncio import datetime import enum +import itertools import pathlib +import tempfile import typing import attr @@ -16,6 +18,7 @@ REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30 FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed +ERROR_RETRY_DELAY = 600 # seconds to wait after the pre-task sanity check failed PUSH_LOG_MAX_ENTRIES = 1000 PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30) @@ -40,9 +43,20 @@ class WorkerState(enum.Enum): STARTING = 'starting' AWAKE = 'awake' ASLEEP = 'asleep' + ERROR = 'error' SHUTTING_DOWN = 'shutting-down' +@attr.s(auto_attribs=True) +class PreTaskCheckParams: + pre_task_check_write: typing.Tuple[pathlib.Path, ...] = () + pre_task_check_read: typing.Tuple[pathlib.Path, ...] = () + + +class PreTaskCheckFailed(PermissionError): + """Raised when the pre-task sanity check fails.""" + + @attr.s class FlamencoWorker: manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager)) @@ -110,6 +124,9 @@ class FlamencoWorker: push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL, 
validator=attr.validators.instance_of(datetime.timedelta)) + pretask_check_params = attr.ib(factory=PreTaskCheckParams, + validator=attr.validators.instance_of(PreTaskCheckParams)) + # Futures that represent delayed calls to push_to_manager(). # They are scheduled when logs & activities are registered but not yet pushed. They are # cancelled when a push_to_manager() actually happens for another reason. There are different @@ -160,7 +177,6 @@ class FlamencoWorker: self.schedule_fetch_task() - @staticmethod def hostname() -> str: import socket @@ -265,7 +281,8 @@ class FlamencoWorker: self._log.warning('Shutting down, not scheduling another fetch-task task.') return - self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay), loop=self.loop) + self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay), + loop=self.loop) async def stop_current_task(self): """Stops the current task by canceling the AsyncIO task. @@ -373,6 +390,13 @@ class FlamencoWorker: self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY) return + try: + self.pre_task_sanity_check() + except PreTaskCheckFailed as ex: + self._log.exception('Pre-task sanity check failed: %s, waiting until it succeeds', ex) + self.go_to_state_error() + return + task_info = await self.fetch_task() if task_info is None: return @@ -559,7 +583,7 @@ class FlamencoWorker: self._push_act_to_manager = asyncio.ensure_future( self.push_to_manager(delay=self.push_act_max_interval)) - async def register_log(self, log_entry, *fmt_args): + async def register_log(self, log_entry: str, *fmt_args): """Registers a log entry, and possibly sends all queued log entries to upstream Manager. Supports variable arguments, just like the logger.{info,warn,error}(...) 
family @@ -620,6 +644,7 @@ class FlamencoWorker: 'asleep': self.go_to_state_asleep, 'awake': self.go_to_state_awake, 'shutdown': self.go_to_state_shutdown, + 'error': self.go_to_state_error, } try: @@ -678,6 +703,13 @@ class FlamencoWorker: # to asleep status when we come back online. self.loop.stop() + def go_to_state_error(self): + """Go to the error state; sleeping_for_error() later tries to go back to awake.""" + self.state = WorkerState.ERROR + self._log.warning('Going to state %r', self.state.value) + self.ack_status_change(self.state.value) + self.sleeping_task = self.loop.create_task(self.sleeping_for_error()) + def stop_sleeping(self): """Stops the asyncio task for sleeping.""" if self.sleeping_task is None or self.sleeping_task.done(): @@ -717,6 +749,82 @@ class FlamencoWorker: except: self._log.exception('problems while sleeping') + async def sleeping_for_error(self): + """Sleep ERROR_RETRY_DELAY seconds, then go to state awake to retry; stops if cancelled.""" + + try: + await asyncio.sleep(ERROR_RETRY_DELAY) + except asyncio.CancelledError: + self._log.info('Error-sleeping ended') + return + except: + self._log.exception('problems while error-sleeping') + return + + self._log.warning('Error sleep is done, going to try to become active again') + self.go_to_state_awake() + + def pre_task_sanity_check(self): + """Check readability and writability before fetching tasks; raises PreTaskCheckFailed.""" + + self._pre_task_check_read() + self._pre_task_check_write() + self._log.debug('Pre-task sanity check OK') + + def _pre_task_check_read(self): + pre_task_check_read = self.pretask_check_params.pre_task_check_read + if not pre_task_check_read: + return + + self._log.debug('Performing pre-task read check') + for read_name in pre_task_check_read: + read_path = pathlib.Path(read_name).absolute() + self._log.debug(' - Read check on %s', read_path) + if not read_path.exists(): + raise PreTaskCheckFailed('%s does not exist' % read_path) from None + if read_path.is_dir(): + try: + # Globbing an 
unreadable/nonexistent directory just + # returns an empty iterator. Globbing something nonexistent + # inside a non-readable directory raises a PermissionError. + glob = (read_path / 'anything').glob('*') + list(itertools.islice(glob, 1)) + except PermissionError: + raise PreTaskCheckFailed('%s is not readable' % read_path) from None + else: + try: + with read_path.open(mode='rb') as the_file: # binary: no decode errors + the_file.read(1) + except IOError: + raise PreTaskCheckFailed('%s is not readable' % read_path) from None + + def _pre_task_check_write(self): + pre_task_check_write = self.pretask_check_params.pre_task_check_write + if not pre_task_check_write: + return + + self._log.debug('Performing pre-task write check') + for write_name in pre_task_check_write: + write_path = pathlib.Path(write_name).absolute() + self._log.debug(' - Write check on %s', write_path) + + post_delete = False + try: + if write_path.is_dir(): + testfile = tempfile.TemporaryFile('w', dir=write_path, encoding='utf8') + else: + post_delete = not write_path.exists() + testfile = write_path.open('a+', encoding='utf8') + with testfile as outfile: + outfile.write('♥') # non-ASCII, so the files are opened as UTF-8 + except OSError: # also covers a missing parent directory or read-only filesystem + raise PreTaskCheckFailed('%s is not writable' % write_path) from None + if post_delete: + try: + write_path.unlink() + except OSError: + self._log.warning('Unable to delete write-test-file %s', write_path) + def generate_secret() -> str: """Generates a 64-character secret key."""