Commit b9a25c2f authored by Sybren A. Stüvel

Added pre-task sanity check

This check is performed every time before trying to fetch a task. It checks
for readability or writability of certain filesystem paths, to prevent
tasks from failing when the shared storage hasn't been mounted. Instead,
the worker will go to `error` status and sleep for 10 minutes before trying
again.
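
As a rough sketch, the new `pre_task_check` section of the worker configuration looks like this (the `write` paths mirror the example config added by this commit; the `read.0` path is purely illustrative):

    [pre_task_check]
    # Paths that must be writable before a task is fetched.
    write.0 = /render/_flamenco
    write.1 = /render/spring/frames
    # Paths that must be readable; the suffix after read/write only keeps the keys unique.
    read.0 = /render/spring/assets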
parent 33860727
@@ -17,6 +17,11 @@ changed functionality, fixed bugs).
 - Added a new `log_a_lot` command and task type `debug` to aid in debugging.
 - Fixed bug where task updates would be sent in an infinite loop when the Manager didn't
   know the task, blocking all other task updates.
+- Added a `pre_task_check` section to the configuration file, which can contain `write.N` and
+  `read.N` keys (where `N` can be anything to make the keys unique). Every value is a path to be
+  checked for writability or readability. Note that write checks are lossy, and bytes are appended
+  to any existing file used to check writability. When such a check fails, the Worker will go to
+  status `error` and sleep for 10 minutes before trying again.
 ## Version 2.1.0 (2018-01-04)
...
@@ -11,6 +11,10 @@ push_log_max_interval_seconds = 30
 push_log_max_entries = 2000
 push_act_max_interval_seconds = 15
+
+[pre_task_check]
+write.0 = /render/_flamenco
+write.1 = /render/spring/frames
 [loggers]
 keys = root,flamenco_worker
...
@@ -6,6 +6,7 @@ import logging
 import logging.config
 import os
 import pathlib
+import typing
 import requests
@@ -102,6 +103,8 @@ def main():
     trunner = runner.TaskRunner(
         shutdown_future=shutdown_future)
+    pretask_check_params = parse_pretask_check_config(confparser, log)
+
     fworker = worker.FlamencoWorker(
         manager=fmanager,
         trunner=trunner,
@@ -116,6 +119,7 @@ def main():
         push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
         initial_state='testing' if args.test else 'awake',
         run_single_task=args.single,
+        pretask_check_params=pretask_check_params,
     )
     mir = may_i_run.MayIRun(
@@ -199,6 +203,31 @@ def main():
     log.warning('Flamenco Worker is shut down')
+def parse_pretask_check_config(confparser, log):
+    """Parse the [pre_task_check] config section.
+
+    :rtype: flamenco.worker.PreTaskCheckParams
+    """
+    from . import worker
+
+    check_read: typing.List[pathlib.Path] = []
+    check_write: typing.List[pathlib.Path] = []
+    for name, value in confparser.items(section='pre_task_check'):
+        if name.startswith('write'):
+            check_write.append(pathlib.Path(value))
+        elif name.startswith('read'):
+            check_read.append(pathlib.Path(value))
+        else:
+            log.fatal('Config section "pre_task_check" should only have keys starting with '
+                      '"read" or "write"; found %r', value)
+            raise SystemExit(47)
+
+    pretask_check_params = worker.PreTaskCheckParams(
+        pre_task_check_write=tuple(check_write),
+        pre_task_check_read=tuple(check_read),
+    )
+    return pretask_check_params
+
 def asyncio_report_tasks(signum=0, stackframe=None):
     """Runs the garbage collector, then reports all AsyncIO tasks on the log.
...
@@ -25,7 +25,8 @@ DEFAULT_CONFIG = {
         ('push_log_max_interval_seconds', str(worker.PUSH_LOG_MAX_INTERVAL.total_seconds())),
         ('push_log_max_entries', str(worker.PUSH_LOG_MAX_ENTRIES)),
         ('push_act_max_interval_seconds', str(worker.PUSH_ACT_MAX_INTERVAL.total_seconds())),
-    ])
+    ]),
+    'pre_task_check': collections.OrderedDict([]),
 }
 # Will be assigned to the config key 'task_types' when started with --test CLI arg.
@@ -92,12 +93,12 @@ def load_config(config_file: pathlib.Path = None,
         log.info('Loading configuration from %s', config_file)
         if not config_file.exists():
             log.fatal('Config file %s does not exist', config_file)
-            raise SystemExit()
+            raise SystemExit(47)
         loaded = confparser.read(str(config_file), encoding='utf8')
     else:
         if not GLOBAL_CONFIG_FILE.exists():
             log.fatal('Config file %s does not exist', GLOBAL_CONFIG_FILE)
-            raise SystemExit()
+            raise SystemExit(47)
         config_files = [GLOBAL_CONFIG_FILE, HOME_CONFIG_FILE]
         filenames = [str(f.absolute()) for f in config_files]
...
 import asyncio
 import datetime
 import enum
+import itertools
 import pathlib
+import tempfile
 import typing
 import attr
@@ -16,6 +18,7 @@ REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30
 FETCH_TASK_FAILED_RETRY_DELAY = 10  # when we failed obtaining a task
 FETCH_TASK_EMPTY_RETRY_DELAY = 5  # when there are no tasks to perform
 FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3  # after a task is completed
+ERROR_RETRY_DELAY = 600  # after the pre-task sanity check failed
 PUSH_LOG_MAX_ENTRIES = 1000
 PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30)
@@ -40,9 +43,20 @@ class WorkerState(enum.Enum):
     STARTING = 'starting'
     AWAKE = 'awake'
     ASLEEP = 'asleep'
+    ERROR = 'error'
     SHUTTING_DOWN = 'shutting-down'
+
+
+@attr.s(auto_attribs=True)
+class PreTaskCheckParams:
+    pre_task_check_write: typing.Tuple[pathlib.Path] = ()
+    pre_task_check_read: typing.Tuple[pathlib.Path] = ()
+
+
+class PreTaskCheckFailed(PermissionError):
+    """Raised when the pre-task sanity check fails."""
 @attr.s
 class FlamencoWorker:
     manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
@@ -110,6 +124,9 @@ class FlamencoWorker:
     push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL,
                                     validator=attr.validators.instance_of(datetime.timedelta))
+    pretask_check_params = attr.ib(factory=PreTaskCheckParams,
+                                   validator=attr.validators.instance_of(PreTaskCheckParams))
+
     # Futures that represent delayed calls to push_to_manager().
     # They are scheduled when logs & activities are registered but not yet pushed. They are
     # cancelled when a push_to_manager() actually happens for another reason. There are different
@@ -160,7 +177,6 @@ class FlamencoWorker:
         self.schedule_fetch_task()
     @staticmethod
     def hostname() -> str:
         import socket
@@ -265,7 +281,8 @@ class FlamencoWorker:
             self._log.warning('Shutting down, not scheduling another fetch-task task.')
             return
-        self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay), loop=self.loop)
+        self.single_iteration_task = asyncio.ensure_future(self.single_iteration(delay),
+                                                           loop=self.loop)
     async def stop_current_task(self):
         """Stops the current task by canceling the AsyncIO task.
@@ -373,6 +390,13 @@ class FlamencoWorker:
             self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
             return
+        try:
+            self.pre_task_sanity_check()
+        except PreTaskCheckFailed as ex:
+            self._log.exception('Pre-task sanity check failed: %s, waiting until it succeeds', ex)
+            self.go_to_state_error()
+            return
+
         task_info = await self.fetch_task()
         if task_info is None:
             return
@@ -559,7 +583,7 @@ class FlamencoWorker:
             self._push_act_to_manager = asyncio.ensure_future(
                 self.push_to_manager(delay=self.push_act_max_interval))
-    async def register_log(self, log_entry, *fmt_args):
+    async def register_log(self, log_entry: str, *fmt_args):
         """Registers a log entry, and possibly sends all queued log entries to upstream Manager.
         Supports variable arguments, just like the logger.{info,warn,error}(...) family
@@ -620,6 +644,7 @@ class FlamencoWorker:
             'asleep': self.go_to_state_asleep,
             'awake': self.go_to_state_awake,
             'shutdown': self.go_to_state_shutdown,
+            'error': self.go_to_state_error,
         }
         try:
@@ -678,6 +703,13 @@ class FlamencoWorker:
         # to asleep status when we come back online.
         self.loop.stop()
+    def go_to_state_error(self):
+        """Go to the error state and try going to active after a delay."""
+
+        self.state = WorkerState.ERROR
+        self._log.warning('Going to state %r', self.state.value)
+        self.ack_status_change(self.state.value)
+        self.sleeping_task = self.loop.create_task(self.sleeping_for_error())
+
     def stop_sleeping(self):
         """Stops the asyncio task for sleeping."""
         if self.sleeping_task is None or self.sleeping_task.done():
@@ -717,6 +749,82 @@ class FlamencoWorker:
         except:
             self._log.exception('problems while sleeping')
+
+    async def sleeping_for_error(self):
+        """After a delay go to active mode to see if any errors are now resolved."""
+
+        try:
+            await asyncio.sleep(ERROR_RETRY_DELAY)
+        except asyncio.CancelledError:
+            self._log.info('Error-sleeping ended')
+            return
+        except:
+            self._log.exception('problems while error-sleeping')
+            return
+
+        self._log.warning('Error sleep is done, going to try to become active again')
+        self.go_to_state_awake()
+
+    def pre_task_sanity_check(self):
+        """Perform readability and writability checks before fetching tasks."""
+
+        self._pre_task_check_read()
+        self._pre_task_check_write()
+        self._log.debug('Pre-task sanity check OK')
+
+    def _pre_task_check_read(self):
+        pre_task_check_read = self.pretask_check_params.pre_task_check_read
+        if not pre_task_check_read:
+            return
+
+        self._log.debug('Performing pre-task read check')
+        for read_name in pre_task_check_read:
+            read_path = pathlib.Path(read_name).absolute()
+            self._log.debug(' - Read check on %s', read_path)
+            if not read_path.exists():
+                raise PreTaskCheckFailed('%s does not exist' % read_path) from None
+            if read_path.is_dir():
+                try:
+                    # Globbing an unreadable/nonexistent directory just
+                    # returns an empty iterator. Globbing something nonexistent
+                    # inside a non-readable directory raises a PermissionError.
+                    glob = (read_path / 'anything').glob('*')
+                    list(itertools.islice(glob, 1))
+                except PermissionError:
+                    raise PreTaskCheckFailed('%s is not readable' % read_path) from None
+            else:
+                try:
+                    with read_path.open(mode='r') as the_file:
+                        the_file.read(1)
+                except IOError:
+                    raise PreTaskCheckFailed('%s is not readable' % read_path) from None
+
+    def _pre_task_check_write(self):
+        pre_task_check_write = self.pretask_check_params.pre_task_check_write
+        if not pre_task_check_write:
+            return
+
+        self._log.debug('Performing pre-task write check')
+        for write_name in pre_task_check_write:
+            write_path = pathlib.Path(write_name).absolute()
+            self._log.debug(' - Write check on %s', write_path)
+            post_delete = False
+            try:
+                if write_path.is_dir():
+                    testfile = tempfile.TemporaryFile('w', dir=write_path)
+                else:
+                    post_delete = not write_path.exists()
+                    testfile = write_path.open('a+')
+                with testfile as outfile:
+                    outfile.write('')
+            except PermissionError:
+                raise PreTaskCheckFailed('%s is not writable' % write_path) from None
+
+            if post_delete:
+                try:
+                    write_path.unlink()
+                except PermissionError:
+                    self._log.warning('Unable to delete write-test-file %s', write_path)
 def generate_secret() -> str:
     """Generates a 64-character secret key."""
...