"""Flamenco Worker: fetches tasks from a Flamenco Manager and executes them."""
import asyncio
import datetime
import enum
import functools
import pathlib
import tempfile
import time
import traceback
import typing

import attr
import requests.exceptions
from dateutil import tz

from . import attrs_extra, documents, upstream, upstream_update_queue
# All durations/delays/etc are in seconds.
REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30
FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task
FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform
FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed
ERROR_RETRY_DELAY = 600 # after the pre-task sanity check failed
# Thresholds for pushing queued log entries / activity updates to the Manager;
# a push happens when either the entry count or the elapsed interval is exceeded.
PUSH_LOG_MAX_ENTRIES = 1000
PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30)
PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=15)
# How often to poll the Manager for a requested status change while asleep.
ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY = 30
# If there are more than this number of queued task updates, we won't ask
# the Manager for another task to execute. Task execution is delayed until
# the queue size is below this threshold.
QUEUE_SIZE_THRESHOLD = 10
class UnableToRegisterError(Exception):
    """Raised when the worker cannot register itself at the Manager.

    Results in an immediate shutdown of the worker.
    """
class WorkerState(enum.Enum):
    """States of the Worker, as reported to (and requested by) the Manager."""
    STARTING = 'starting'
    AWAKE = 'awake'
    ASLEEP = 'asleep'
    # BUG FIX: go_to_state_error() assigns WorkerState.ERROR, which did not
    # exist and would raise AttributeError at runtime.
    ERROR = 'error'
    SHUTTING_DOWN = 'shutting-down'
@attr.s(auto_attribs=True)
class PreTaskCheckParams:
    """Filesystem paths to check for readability/writability before fetching tasks.

    NOTE(review): the class header was missing from the mangled source; it is
    reconstructed from the uses of ``PreTaskCheckParams`` and
    ``self.pretask_check_params.pre_task_check_read/write`` elsewhere in this
    file — confirm against version control.
    """
    # Empty tuples instead of shared mutable list defaults; both are falsy,
    # which is all the check code relies on.
    pre_task_check_write: typing.Iterable[str] = ()
    pre_task_check_read: typing.Iterable[str] = ()
class PreTaskCheckFailed(PermissionError):
    """Raised when a pre-task sanity check (readable/writable paths) fails."""
@attr.s
class FlamencoWorker:
    """Main Flamenco Worker logic: fetches tasks from the Manager and runs them.

    NOTE(review): this attribute block was mangled in the source (interleaved
    VCS-blame noise, duplicated and orphaned ``attr.ib`` fragments); the field
    definitions for ``single_iteration_fut``, ``asyncio_execution_fut``,
    ``sleeping_fut`` and ``last_log_push`` are reconstructed from their uses in
    the methods below — confirm against version control.
    """

    manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
    trunner = attr.ib()  # Instance of flamenco_worker.runner.TaskRunner
    tuqueue = attr.ib(validator=attr.validators.instance_of(upstream_update_queue.TaskUpdateQueue))
    task_types = attr.ib(validator=attr.validators.instance_of(list))
    worker_id = attr.ib(validator=attr.validators.instance_of(str))
    worker_secret = attr.ib(validator=attr.validators.instance_of(str))
    worker_dir = attr.ib(validator=attr.validators.instance_of(str))
    worker_blender_param = attr.ib(validator=attr.validators.instance_of(str))
    worker_blender_cmd = attr.ib(validator=attr.validators.instance_of(str))

    loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
    shutdown_future = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))

    state = attr.ib(default=WorkerState.STARTING,
                    validator=attr.validators.instance_of(WorkerState))

    # Indicates the state in which the Worker should start.
    initial_state = attr.ib(validator=attr.validators.instance_of(str), default='awake')

    # When True, the Worker shuts down after finishing a single task.
    run_single_task = attr.ib(validator=attr.validators.instance_of(bool), default=False)

    # When Manager tells us we may no longer run our current task, this is set to True.
    # As a result, the cancelled state isn't pushed to Manager any more. It is reset
    # to False when a new task is started.
    task_is_silently_aborting = attr.ib(default=False, init=False,
                                        validator=attr.validators.instance_of(bool))

    # AsyncIO futures: the task-fetch loop iteration, the currently executing
    # task, and the sleep-state poller.
    single_iteration_fut = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
    asyncio_execution_fut = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
    sleeping_fut = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))

    task_id = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str))
    )
    current_task_status = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str))
    )
    _queued_log_entries = attr.ib(default=attr.Factory(list), init=False)  # type: typing.List[str]
    _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
    last_log_push = attr.ib(
        default=attr.Factory(datetime.datetime.now),
        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))
    last_activity_push = attr.ib(
        default=attr.Factory(datetime.datetime.now),
        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))

    # Kept in sync with the task updates we send to upstream Manager, so that we can send
    # a complete Activity each time.
    last_task_activity = attr.ib(default=attr.Factory(documents.Activity))

    # Configuration
    push_log_max_interval = attr.ib(default=PUSH_LOG_MAX_INTERVAL,
                                    validator=attr.validators.instance_of(datetime.timedelta))
    push_log_max_entries = attr.ib(default=PUSH_LOG_MAX_ENTRIES,
                                   validator=attr.validators.instance_of(int))
    push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL,
                                    validator=attr.validators.instance_of(datetime.timedelta))
    pretask_check_params = attr.ib(factory=PreTaskCheckParams,
                                   validator=attr.validators.instance_of(PreTaskCheckParams))

    # Futures that represent delayed calls to push_to_manager().
    # They are scheduled when logs & activities are registered but not yet pushed. They are
    # cancelled when a push_to_manager() actually happens for another reason. There are different
    # futures for activity and log pushing, as these can have different max intervals.
    _push_log_to_manager = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
    _push_act_to_manager = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))

    # When the worker is shutting down, the currently running task will be
    # handed back to the manager for re-scheduling. In such a situation,
    # an abort is expected and acceptable.
    failures_are_acceptable = attr.ib(default=False, init=False,
                                      validator=attr.validators.instance_of(bool))

    _log = attrs_extra.log('%s.FlamencoWorker' % __name__)

    # Seconds since epoch of the last /output-produced POST; used for throttling.
    _last_output_produced = 0.0
@property
def active_task_id(self) -> typing.Optional[str]:
    """Return the current task ID, but only while it is executing; None otherwise."""
    if self.asyncio_execution_fut is None or self.asyncio_execution_fut.done():
        return None
    return self.task_id
async def startup(self, *, may_retry_loop=True):
    """Register/sign on at the Manager and schedule the first task fetch.

    :param may_retry_loop: when True, failed HTTP calls are retried in a loop
        instead of aborting immediately.
    """
    must_register = not self.worker_id or not self.worker_secret
    if must_register:
        await self.register_at_manager(may_retry_loop=may_retry_loop)

    # Once we know our ID and secret, update the manager object so that we
    # don't have to pass our authentication info on each and every call.
    self.manager.auth = (self.worker_id, self.worker_secret)

    # Signing on is only necessary when we didn't just register; it also
    # requires self.manager.auth to be set first.
    if not must_register:
        await self.signon(may_retry_loop=may_retry_loop)

    # If we're not supposed to start in 'awake' state, let the Manager know.
    if self.initial_state != 'awake':
        self._log.info('Telling Manager we are in state %r', self.initial_state)
        self.ack_status_change(self.initial_state)

    self.schedule_fetch_task()
@property
def nickname(self) -> str:
    """The worker's nickname, as reported to the Manager.

    BUG FIX: the previous ``@functools.lru_cache`` stacked on top of
    ``@property`` was broken — decorators apply bottom-up, so ``lru_cache``
    wrapped the property *object* and attribute access no longer behaved as a
    property (and caching a method keeps ``self`` alive; see flake8-bugbear
    B019). The hostname lookup is cheap enough to perform on each access.
    """
    return self.hostname()

@property
def identifier(self) -> str:
    """Worker ID plus nickname, for use in log and activity messages."""
    return f'{self.worker_id} ({self.nickname})'
async def _keep_posting_to_manager(self, url: str, json: dict, *, use_auth=True,
                                   may_retry_loop: bool) -> requests.Response:
    """POST to the Manager, optionally retrying until the POST succeeds.

    NOTE(review): the parameter list was truncated in the mangled source; the
    ``may_retry_loop`` keyword parameter is reconstructed from the call sites
    in signon() and register_at_manager() — confirm against version control.

    :param url: URL to POST to, relative to the Manager root.
    :param json: the request payload.
    :param use_auth: when False, explicitly sends no authentication.
    :param may_retry_loop: when False, a failed POST raises
        UnableToRegisterError; when True, it is retried indefinitely.
    :returns: the successful response.
    """
    post_kwargs = {
        'json': json,
        'loop': self.loop,
    }
    if not use_auth:
        post_kwargs['auth'] = None

    while True:
        try:
            resp = await self.manager.post(url, **post_kwargs)
            resp.raise_for_status()
        except requests.RequestException as ex:
            if not may_retry_loop:
                self._log.error('Unable to POST to manager %s: %s', url, ex)
                raise UnableToRegisterError()

            self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s',
                              url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex)
            await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY)
        else:
            return resp
async def signon(self, *, may_retry_loop: bool):
    """Sign on at the Manager.

    Only needed when this worker was already registered in an earlier run.
    """
    self._log.info('Signing on at manager.')

    payload = {
        'supported_task_types': self.task_types,
        'nickname': self.hostname(),
    }
    await self._keep_posting_to_manager('/sign-on', json=payload,
                                        may_retry_loop=may_retry_loop)

    self._log.info('Manager accepted sign-on.')
async def register_at_manager(self, *, may_retry_loop: bool):
    """Register as a new worker, storing the Manager-assigned ID and a fresh secret."""
    self._log.info('Registering at manager')

    self.worker_secret = generate_secret()
    payload = {
        'secret': self.worker_secret,
        'platform': detect_platform(),
        'supported_task_types': self.task_types,
        'nickname': self.hostname(),
    }
    resp = await self._keep_posting_to_manager(
        '/register-worker',
        json=payload,
        use_auth=False,  # explicitly do not use authentication
        may_retry_loop=may_retry_loop,
    )

    result = resp.json()
    self._log.info('Response: %s', result)
    self.worker_id = result['_id']

    self.write_registration_info()
def write_registration_info(self):
    """Persist the current worker ID and secret to the home-dir configuration."""
    from . import config

    registration = {
        'worker_id': self.worker_id,
        'worker_secret': self.worker_secret,
    }
    config.merge_with_home_config(registration)
def mainloop(self):
    """Run the asyncio event loop until something stops it."""
    self._log.info('Entering main loop')

    # TODO: add "watchdog" task that checks the asyncio loop and ensures there is
    # always either a task being executed or a task fetch scheduled.
    self.loop.run_forever()
def schedule_fetch_task(self, delay=0):
    """Schedules a task fetch.

    If a task fetch was already queued, that one is cancelled.

    NOTE(review): the ``def`` line was missing from the mangled source; the
    signature is reconstructed from the call sites, which pass zero or one
    positional delay argument — confirm against version control. Note also
    that the code shown does not actually cancel a previously queued fetch,
    despite the docstring's claim.

    :param delay: delay in seconds, after which the task fetch will be performed.
    """
    # The current task may still be running, as single_iteration() calls schedule_fetch_task() to
    # schedule a future run. This may result in the task not being awaited when we are
    # shutting down.
    if self.shutdown_future is not None and self.shutdown_future.done():
        self._log.warning('Shutting down, not scheduling another fetch-task task.')
        return

    self.single_iteration_fut = asyncio.ensure_future(self.single_iteration(delay),
                                                      loop=self.loop)
async def stop_current_task(self, task_id: str):
    """Stops the current task by canceling the AsyncIO task.

    This causes a CancelledError in the self.single_iteration() function, which then takes care
    of the task status change and subsequent activity push.

    :param task_id: the task ID to stop. Will only perform a stop if it
        matches the currently executing task. This is to avoid race
        conditions.
    """
    if not self.asyncio_execution_fut or self.asyncio_execution_fut.done():
        self._log.warning('stop_current_task() called but no task is running.')
        return

    if self.task_id != task_id:
        self._log.warning('stop_current_task(%r) called, but current task is %r, not stopping',
                          task_id, self.task_id)
        return

    self._log.warning('Stopping task %s', self.task_id)

    # Suppress pushing the 'canceled' status; the Manager already knows we are
    # no longer allowed to run this task.
    self.task_is_silently_aborting = True

    try:
        await self.trunner.abort_current_task()
    except asyncio.CancelledError:
        self._log.info('asyncio task was canceled for task runner task %s', self.task_id)

    await self.register_log('Worker %s stopped running this task,'
                            ' no longer allowed to run by Manager', self.identifier)
    await self.requeue_task_on_manager(task_id)
async def requeue_task_on_manager(self, task_id: str):
    """Return a task to the Manager's queue for later execution.

    Pushes pending updates first, so the Manager has up-to-date information
    before the task is re-queued.
    """
    self._log.info('Returning task %s to the Manager queue', task_id)

    await self.push_to_manager()
    await self.tuqueue.flush_and_report(loop=self.loop)

    url = f'/tasks/{task_id}/return'
    try:
        resp = await self.manager.post(url, loop=self.loop)
    except IOError:
        self._log.exception('Exception POSTing to %s', url)
        return

    if resp.status_code != 204:
        # BUG FIX: the format string has three placeholders but only two
        # arguments were passed; the task ID was missing.
        self._log.warning('Error %d returning task %s to Manager: %s',
                          resp.status_code, task_id, resp.json())
        await self.register_log('Worker %s could not return this task to the Manager queue',
                                self.identifier)
        return

    await self.register_log('Worker %s returned this task to the Manager queue.',
                            self.identifier)
def shutdown(self):
    """Gracefully shut down asynchronous tasks and sign off at the Manager."""
    self._log.warning('Shutting down')

    self.state = WorkerState.SHUTTING_DOWN
    self.failures_are_acceptable = True
    self.stop_fetching_tasks()
    self.stop_sleeping()

    # Stop the task runner.
    self.loop.run_until_complete(self.trunner.abort_current_task())

    # Queue anything that should still be pushed to the Manager.
    has_pending_act = self._push_act_to_manager is not None \
                      and not self._push_act_to_manager.done()
    has_pending_log = self._push_log_to_manager is not None \
                      and not self._push_log_to_manager.done()
    if has_pending_act or has_pending_log:
        # Try to push queued task updates to manager before shutting down.
        self._log.info('shutdown(): pushing queued updates to manager')
        self.loop.run_until_complete(self.push_to_manager())

    # Try to do a final push of queued updates to the Manager.
    self.loop.run_until_complete(self.tuqueue.flush_and_report(loop=self.loop))

    # Let the Manager know we're shutting down.
    self._log.info('shutdown(): signing off at Manager')
    try:
        self.loop.run_until_complete(self.manager.post('/sign-off', loop=self.loop))
    except Exception as ex:
        self._log.warning('Error signing off. Continuing with shutdown. %s', ex)
    self.failures_are_acceptable = False
def stop_fetching_tasks(self):
    """Stops the delayed task-fetching from running.

    Used in shutdown and when we're going to status 'asleep'.

    NOTE(review): in the mangled source the guard below had no early
    ``return``, so a None future would have crashed on ``cancel()``;
    reconstructed — confirm against version control.
    """
    if self.single_iteration_fut is None or self.single_iteration_fut.done():
        return

    self._log.info('stopping task fetching task %s', self.single_iteration_fut)
    self.single_iteration_fut.cancel()

    # This prevents a 'Task was destroyed but it is pending!' warning on the console.
    # Sybren: I've only seen this in unit tests, so maybe this code should be moved
    # there, instead.
    try:
        if not self.loop.is_running():
            self.loop.run_until_complete(self.single_iteration_fut)
    except asyncio.CancelledError:
        pass
async def single_iteration(self, delay: float):
    """Fetches a single task to perform from Flamenco Manager, and executes it.

    :param delay: waits this many seconds before fetching a task.
    """
    self.state = WorkerState.AWAKE
    self._cleanup_state_for_new_task()

    await asyncio.sleep(delay)

    # Prevent outgoing queue overflowing by waiting until it's below the
    # threshold before starting another task.
    # TODO(sybren): introduce another worker state for this, and handle there.
    queue_size = self.tuqueue.queue_size()
    if queue_size > QUEUE_SIZE_THRESHOLD:
        self._log.info('Task Update Queue size too large (%d > %d), waiting until it shrinks.',
                       queue_size, QUEUE_SIZE_THRESHOLD)
        self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
        return

    try:
        self.pre_task_sanity_check()
    except PreTaskCheckFailed as ex:
        self._log.exception('Pre-task sanity check failed: %s, waiting until it succeeds', ex)
        self.go_to_state_error()
        return

    task_info = await self.fetch_task()
    if task_info is None:
        return

    await self.execute_task(task_info)
async def fetch_task(self) -> typing.Optional[dict]:
    """Fetch a task from the Manager, returning its document, or None.

    On any non-OK response this schedules a retry (or performs the requested
    status change) and returns None.
    """
    # TODO: use exponential backoff instead of retrying every fixed N seconds.
    log = self._log.getChild('fetch_task')
    log.debug('Fetching task')

    try:
        resp = await self.manager.post('/task', loop=self.loop)
    except requests.exceptions.RequestException as ex:
        log.warning('Error fetching new task, will retry in %i seconds: %s',
                    FETCH_TASK_FAILED_RETRY_DELAY, ex)
        self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
        return None

    if resp.status_code == 204:
        # BUG FIX: the retry-delay argument to this log call was missing in
        # the mangled source.
        log.debug('No tasks available, will retry in %i seconds.',
                  FETCH_TASK_EMPTY_RETRY_DELAY)
        self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY)
        return None

    if resp.status_code == 423:
        status_change = documents.StatusChangeRequest(**resp.json())
        log.info('status change to %r requested when fetching new task',
                 status_change.status_requested)
        self.change_status(status_change.status_requested)
        return None

    if resp.status_code != 200:
        log.warning('Error %i fetching new task, will retry in %i seconds.',
                    resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY)
        self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
        return None

    task_info = resp.json()
    self.task_id = task_info['_id']
    log.info('Received task: %s', self.task_id)
    log.debug('Received task: %s', task_info)
    return task_info
async def execute_task(self, task_info: dict) -> None:
    """Feed a task to the task runner and monitor for exceptions.

    NOTE(review): the ``try:`` opening this block was missing from the mangled
    source (the ``except`` clauses were orphaned); reconstructed — confirm
    against version control.
    """
    await self.register_task_update(task_status='active')
    try:
        self.asyncio_execution_fut = asyncio.ensure_future(
            self.trunner.execute(task_info, self),
            loop=self.loop)
        ok = await self.asyncio_execution_fut
        if ok:
            await self.register_task_update(
                task_status='completed',
                activity='Task completed',
            )
        elif self.failures_are_acceptable:
            self._log.warning('Task %s failed, but ignoring it since we are shutting down.',
                              self.task_id)
        else:
            self._log.error('Task %s failed', self.task_id)
            await self.register_task_update(task_status='failed')
    except asyncio.CancelledError:
        if self.failures_are_acceptable:
            self._log.warning('Task %s was cancelled, but ignoring it since '
                              'we are shutting down.', self.task_id)
        elif self.task_is_silently_aborting:
            self._log.warning('Task %s was cancelled, but ignoring it since '
                              'we are no longer allowed to run it.', self.task_id)
        else:
            self._log.warning('Task %s was cancelled', self.task_id)
            await self.register_task_update(task_status='canceled',
                                            activity='Task was canceled')
    except Exception as ex:
        self._log.exception('Uncaught exception executing task %s' % self.task_id)
        try:
            # Such a failure will always result in a failed task, even when
            # self.failures_are_acceptable = True; only expected failures are
            # acceptable then.
            self._queued_log_entries.append(traceback.format_exc())
            await self.register_task_update(
                task_status='failed',
                activity='Uncaught exception: %s %s' % (type(ex).__name__, ex),
            )
        except Exception:
            self._log.exception('While notifying manager of failure, another error happened.')

    if self.run_single_task:
        self._log.info('Running in single-task mode, exiting.')
        self.go_to_state_shutdown()
        return

    if self.state == WorkerState.AWAKE:
        # Schedule a new task run unless shutting down or sleeping; after a little delay to
        # not hammer the world when we're in some infinite failure loop.
        self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
def _cleanup_state_for_new_task(self):
    """Reset per-task internal state, ready for the next task to be executed."""
    self.last_task_activity = documents.Activity()
    self.task_is_silently_aborting = False
    self.current_task_status = ''
async def push_to_manager(self, *, delay: datetime.timedelta = None):
    """Updates a task's status and activity on the Manager.

    Uses the TaskUpdateQueue to handle persistent queueing.

    :param delay: optional period to sleep before pushing.
    """
    if delay is not None:
        delay_sec = delay.total_seconds()
        self._log.debug('Scheduled delayed push to manager in %r seconds', delay_sec)
        await asyncio.sleep(delay_sec)

        if self.shutdown_future.done():
            self._log.info('Shutting down, not pushing changes to manager.')
            # NOTE(review): this early return was missing from the mangled
            # source; without it the push would happen anyway — confirm
            # against version control.
            return

    self._log.info('Updating task %s with status %r and activity %r',
                   self.task_id, self.current_task_status, self.last_task_activity)

    payload: typing.MutableMapping[str, typing.Any] = {}
    if self.task_is_silently_aborting:
        self._log.info('push_to_manager: task is silently aborting, will only push logs')
    else:
        payload = attr.asdict(self.last_task_activity)
        if self.current_task_status:
            payload['task_status'] = self.current_task_status

    now = datetime.datetime.now()
    self.last_activity_push = now

    # Cancel any pending push task, as we're pushing activities now.
    if self._push_act_to_manager is not None:
        self._push_act_to_manager.cancel()

    if self._queued_log_entries:
        payload['log'] = '\n'.join(self._queued_log_entries)
        self._queued_log_entries.clear()
        self.last_log_push = now

        # Cancel any pending push task, as we're pushing logs now.
        if self._push_log_to_manager is not None:
            self._push_log_to_manager.cancel()

    if not payload:
        self._log.debug('push_to_manager: nothing to push')
        return

    self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload)
async def register_task_update(self, *,
                               task_status: str = None,
                               **kwargs):
    """Stores the task status and activity, and possibly sends to Flamenco Manager.

    If the last update to Manager was long enough ago, or the task status changed,
    the info is sent to Manager. This way we can update command progress percentage
    hundreds of times per second, without worrying about network overhead.
    """
    self._log.debug('Task update: task_status=%s, %s', task_status, kwargs)

    # Update the current activity.
    for key, value in kwargs.items():
        setattr(self.last_task_activity, key, value)

    if task_status is None:
        task_status_changed = False
    else:
        task_status_changed = self.current_task_status != task_status
        self.current_task_status = task_status

    if task_status_changed:
        self._log.info('Task changed status to %s, pushing to manager', task_status)
        await self.push_to_manager()
    elif datetime.datetime.now() - self.last_activity_push > self.push_act_max_interval:
        self._log.info('More than %s since last activity update, pushing to manager',
                       self.push_act_max_interval)
        await self.push_to_manager()
    elif self._push_act_to_manager is None or self._push_act_to_manager.done():
        # Schedule a future push to manager.
        self._push_act_to_manager = asyncio.ensure_future(
            self.push_to_manager(delay=self.push_act_max_interval))
async def register_log(self, log_entry: str, *fmt_args):
    """Registers a log entry, and possibly sends all queued log entries to upstream Manager.

    Supports variable arguments, just like the logger.{info,warn,error}(...) family
    of methods.
    """
    if fmt_args:
        log_entry %= fmt_args

    # Timestamp each entry in UTC, ISO-8601 formatted.
    now = datetime.datetime.now(tz.tzutc()).isoformat()
    self._queued_log_entries.append('%s: %s' % (now, log_entry))
    queue_size = len(self._queued_log_entries)

    if queue_size > self.push_log_max_entries:
        self._log.info('Queued up %i > %i log entries, pushing to manager',
                       queue_size, self.push_log_max_entries)
        await self.push_to_manager()
    elif datetime.datetime.now() - self.last_log_push > self.push_log_max_interval:
        self._log.info('More than %s since last log update, pushing to manager',
                       self.push_log_max_interval)
        await self.push_to_manager()
    elif self._push_log_to_manager is None or self._push_log_to_manager.done():
        # Schedule a future push to manager.
        self._push_log_to_manager = asyncio.ensure_future(
            self.push_to_manager(delay=self.push_log_max_interval))
def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]):
    """Register a produced output (e.g. rendered frame) with the Manager.

    This performs a HTTP POST in a background task, returning as soon as
    the task is scheduled. Only sends an update every X seconds, to avoid
    sending too many requests when we output frames rapidly.
    """
    now = time.time()
    if now - self._last_output_produced < 30:
        self._log.debug('Throttling POST to Manager /output-produced endpoint')
        return
    self._last_output_produced = now

    async def do_post():
        try:
            self._log.info('Sending %i path(s) to Manager', len(paths))
            payload = {'paths': [str(p) for p in paths]}
            resp = await self.manager.post('/output-produced',
                                           json=payload,
                                           loop=self.loop)
            if resp.status_code == 204:
                self._log.info('Manager accepted our output notification for %s', paths)
            else:
                self._log.warning('Manager rejected our output notification: %d %s',
                                  resp.status_code, resp.text)
        except Exception:
            self._log.exception('error POSTing to manager /output-produced')

    self.loop.create_task(do_post())
def change_status(self, new_status: str):
    """Handle a status change requested by the Flamenco Manager."""
    self._log.info('Manager requested we go to status %r', new_status)

    handlers = {
        'asleep': self.go_to_state_asleep,
        'awake': self.go_to_state_awake,
        'shutdown': self.go_to_state_shutdown,
    }
    handler = handlers.get(new_status)
    if handler is None:
        # Unknown statuses default to sleeping, so the worker stays manageable.
        self._log.error('We have no way to go to status %r, going to sleep instead', new_status)
        handler = self.go_to_state_asleep
    handler()
def ack_status_change(self, new_status: str) -> typing.Optional[asyncio.Task]:
    """Confirm to the Manager that we are now in a certain state.

    This ACK can be given without a request from the server, for example to support
    state changes originating from UNIX signals.

    :returns: the scheduled asyncio task, or None when scheduling failed.
    """
    try:
        post_coro = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop)
        return self.loop.create_task(post_coro)
    except Exception:
        self._log.exception('unable to notify Manager')
        return None
def go_to_state_asleep(self):
    """Go to the asleep state and start polling for wake-up calls."""
    self._log.info('Going to sleep')
    self.state = WorkerState.ASLEEP
    self.stop_fetching_tasks()

    self.sleeping_fut = self.loop.create_task(self.sleeping())
    self._log.debug('Created task %s', self.sleeping_fut)
    self.ack_status_change('asleep')
def go_to_state_awake(self):
    """Wake up: restart the task-fetching asyncio task."""
    self._log.info('Waking up')
    self.state = WorkerState.AWAKE

    self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
    self.ack_status_change('awake')
def go_to_state_shutdown(self):
    """Shut down the Flamenco Worker by stopping the event loop.

    Whether it comes back up depends on the environment. For example,
    using systemd on Linux with Restart=always will do this.
    """
    self._log.info('Shutting down by request of the Manager or due to single-task mode')
    self.state = WorkerState.SHUTTING_DOWN

    # Don't bother acknowledging this status, as we'll push an "offline" status anyway.
    # This also makes sure that when we're asleep and told to shut down, the Manager
    # sees an asleep → offline status change, and can remember that we should go back
    # to asleep status when we come back online.
    self.loop.stop()
def go_to_state_error(self):
    """Go to the error state, then try to become active again after a delay."""
    self.state = WorkerState.ERROR
    self._log.warning('Going to state %r', self.state.value)

    self.ack_status_change(self.state.value)
    self.sleeping_fut = self.loop.create_task(self.sleeping_for_error())
def stop_sleeping(self):
    """Stops the asyncio task that polls the Manager while sleeping.

    NOTE(review): parts of this method were lost in the mangled source (the
    guard's early ``return`` and the ``cancel()``/``result()`` calls); the
    body is reconstructed around the surviving exception handlers — confirm
    against version control.
    """
    if self.sleeping_fut is None or self.sleeping_fut.done():
        return

    self.sleeping_fut.cancel()
    try:
        self.sleeping_fut.result()
    except (asyncio.CancelledError, asyncio.InvalidStateError):
        pass
    except Exception:
        self._log.exception('Unexpected exception in sleeping() task.')
async def sleeping(self):
    """Regularly polls the Manager to see if we're allowed to wake up again."""
    while True:
        try:
            await asyncio.sleep(ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY)
            resp = await self.manager.get('/status-change', loop=self.loop)

            if resp.status_code == 204:
                # No change, don't do anything
                self._log.debug('status the same, continuing sleeping')
            elif resp.status_code == 200:
                # There is a status change
                self._log.debug('/status-change: %s', resp.json())
                new_status = resp.json()['status_requested']
                self.change_status(new_status)
                return
            else:
                self._log.error(
                    'Error %d trying to fetch /status-change on Manager, will retry later.',
                    resp.status_code)
        except asyncio.CancelledError:
            self._log.info('Sleeping ended')
            return
        except Exception:
            # BUG FIX: narrowed from a bare ``except:``, which would also have
            # swallowed SystemExit and KeyboardInterrupt.
            self._log.exception('problems while sleeping')
async def sleeping_for_error(self):
    """After a delay, go to active mode to see if any errors are now resolved."""
    try:
        await asyncio.sleep(ERROR_RETRY_DELAY)
    except asyncio.CancelledError:
        self._log.info('Error-sleeping ended')
        return
    except Exception:
        # BUG FIX: narrowed from a bare ``except:`` clause.
        self._log.exception('problems while error-sleeping')
        return

    self._log.warning('Error sleep is done, going to try to become active again')
    self.go_to_state_awake()
def pre_task_sanity_check(self):
    """Perform readability and writability checks before fetching tasks.

    :raises PreTaskCheckFailed: when any configured path is not accessible.
    """
    self._pre_task_check_read()
    self._pre_task_check_write()
    self._log.getChild('sanity_check').debug('Pre-task sanity check OK')
def _pre_task_check_read(self):
    """Check that every configured pre-task-check path is readable.

    :raises PreTaskCheckFailed: when a path does not exist or is not readable.
    """
    pre_task_check_read = self.pretask_check_params.pre_task_check_read
    if not pre_task_check_read:
        return

    log = self._log.getChild('sanity_check')
    log.debug('Performing pre-task read check')
    for read_name in pre_task_check_read:
        read_path = pathlib.Path(read_name).absolute()
        log.debug(' - Read check on %s', read_path)
        if not read_path.exists():
            raise PreTaskCheckFailed('%s does not exist' % read_path) from None
        if read_path.is_dir():
            # NOTE(review): the statement inside this ``try`` was lost in the
            # mangled source. Probing a nonexistent entry exercises directory
            # readability; FileNotFoundError is the expected outcome, per the
            # surviving handlers below — confirm against version control.
            try:
                (read_path / '.flamenco-worker-read-check').stat()
            except PermissionError:
                raise PreTaskCheckFailed('%s is not readable' % read_path) from None
            except FileNotFoundError:
                # This is expected.
                pass
            except Exception:
                # BUG FIX: narrowed from a bare ``except:`` clause.
                log.exception('Unexpected shit happened')
        else:
            try:
                with read_path.open(mode='r') as the_file:
                    the_file.read(1)
            except IOError:
                raise PreTaskCheckFailed('%s is not readable' % read_path) from None
def _pre_task_check_write(self):
    """Check that every configured pre-task-check path is writable.

    :raises PreTaskCheckFailed: when a path cannot be written to.
    """
    pre_task_check_write = self.pretask_check_params.pre_task_check_write
    if not pre_task_check_write:
        return

    log = self._log.getChild('sanity_check')
    log.debug('Performing pre-task write check')
    for write_name in pre_task_check_write:
        write_path = pathlib.Path(write_name).absolute()
        log.debug(' - Write check on %s', write_path)

        remove_after = False
        try:
            if write_path.is_dir():
                # Write into a throw-away temporary file inside the directory.
                testfile = tempfile.TemporaryFile('w', dir=str(write_path))
            else:
                # Append to the file itself; delete it afterwards if we created it.
                remove_after = not write_path.exists()
                testfile = write_path.open('a+')
            with testfile as outfile:
                outfile.write('♥')
        except PermissionError:
            raise PreTaskCheckFailed('%s is not writable' % write_path) from None

        if remove_after:
            try:
                write_path.unlink()
            except PermissionError:
                log.warning('Unable to delete write-test-file %s', write_path)
def generate_secret() -> str:
    """Generate a 64-character alphanumeric secret key.

    Uses the OS's cryptographically secure randomness source via
    random.SystemRandom.
    """
    import random
    import string

    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(64))
def detect_platform() -> str:
    """Detect the platform, returning 'linux', 'windows' or 'darwin'.

    :raises EnvironmentError: when the current platform cannot be detected
        as one of those three.
    """
    import platform

    plat = platform.system().lower()
    if not plat:
        raise EnvironmentError('Unable to determine platform.')
    if plat in {'linux', 'windows', 'darwin'}:
        return plat
    # BUG FIX: the platform name was previously passed logging-style
    # (message, plat) to EnvironmentError, which never interpolates %r;
    # use %-formatting so the message actually contains the platform name.
    raise EnvironmentError('Unable to determine platform; unknown platform %r' % plat)