Skip to content
Snippets Groups Projects
Commit c410d5aa authored by Milan Jaros's avatar Milan Jaros
Browse files

merge

parents 1420660c dcc36017
Branches
No related tags found
No related merge requests found
tests/test_frames/000108.png

10.3 KiB

tests/test_frames/000109.png

10.3 KiB

tests/test_frames/000110.png

10.3 KiB

tests/test_frames/000111.png

10.3 KiB

tests/test_frames/000112.png

10.2 KiB

tests/test_frames/000113.png

10.3 KiB

tests/test_frames/000114.png

10.3 KiB

File added
File added
from unittest.mock import Mock
from abstract_worker_test import AbstractWorkerTest
from tests.abstract_worker_test import AbstractWorkerTest
class MayIRunTest(AbstractWorkerTest):
......
import pathlib
import unittest
import mypy.api
test_modules = ['flamenco_worker', 'tests']
class MypyRunnerTest(unittest.TestCase):
def test_run_mypy(self):
proj_root = pathlib.Path(__file__).parent.parent
args = ['--incremental', '--ignore-missing-imports'] + [str(proj_root / dirname) for dirname
in test_modules]
result = mypy.api.run(args)
stdout, stderr, status = result
messages = []
if stderr:
messages.append(stderr)
if stdout:
messages.append(stdout)
if status:
messages.append('Mypy failed with status %d' % status)
if messages:
self.fail('\n'.join(['Mypy errors:'] + messages))
import contextlib
import tempfile
from pathlib import Path
from unittest import mock
from tests.test_worker import AbstractFWorkerTest
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
@mock.patch('flamenco_worker.config.merge_with_home_config', new=lambda *args: None)
@mock.patch('socket.gethostname', new=lambda: 'ws-unittest')
class PretaskWriteCheckTest(AbstractFWorkerTest):
def test_not_writable_dir(self):
with self.write_check() as tdir:
unwritable_dir = tdir / 'unwritable'
unwritable_dir.mkdir(0o555)
self.worker.pretask_check_params.pre_task_check_write = (unwritable_dir, )
def test_not_writable_file(self):
with self.write_check() as tdir:
unwritable_dir = tdir / 'unwritable'
unwritable_dir.mkdir(0o555)
unwritable_file = unwritable_dir / 'testfile.txt'
self.worker.pretask_check_params.pre_task_check_write = (unwritable_file, )
def test_write_file_exists(self):
def post_run():
self.assertTrue(existing.exists(), '%s should not have been deleted' % existing)
with self.write_check(post_run) as tdir:
existing = tdir / 'unwritable-testfile.txt'
existing.write_bytes(b'x')
existing.chmod(0o444) # only readable
self.worker.pretask_check_params.pre_task_check_write = (existing, )
def test_happy_remove_file(self):
from tests.mock_responses import EmptyResponse, CoroMock
self.manager.post = CoroMock(return_value=EmptyResponse())
with tempfile.TemporaryDirectory() as tdir_name:
tdir = Path(tdir_name)
testfile = tdir / 'writable-testfile.txt'
self.worker.pretask_check_params.pre_task_check_write = (testfile, )
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
self.assertIsNone(self.worker.sleeping_fut)
def test_happy_not_remove_file(self):
from tests.mock_responses import EmptyResponse, CoroMock
self.manager.post = CoroMock(return_value=EmptyResponse())
with tempfile.TemporaryDirectory() as tdir_name:
tdir = Path(tdir_name)
testfile = tdir / 'writable-testfile.txt'
# The file exists before, so it shouldn't be deleted afterwards.
with testfile.open('wb') as outfile:
outfile.write(b'x')
self.worker.pretask_check_params.pre_task_check_write = (testfile, )
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
self.assertIsNone(self.worker.sleeping_fut)
@contextlib.contextmanager
def write_check(self, post_run=None):
from tests.mock_responses import EmptyResponse, CoroMock
self.manager.post = CoroMock(return_value=EmptyResponse())
with tempfile.TemporaryDirectory() as tdir_name:
tdir = Path(tdir_name)
yield tdir
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
if post_run is not None:
post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
self.assertFalse(self.worker.sleeping_fut.done())
# Mock merge_with_home_config() so that it doesn't overread actual config.
@mock.patch('flamenco_worker.config.merge_with_home_config', new=lambda *args: None)
@mock.patch('socket.gethostname', new=lambda: 'ws-unittest')
class PretaskReadCheckTest(AbstractFWorkerTest):
def test_not_readable_dir(self):
def cleanup():
unreadable_dir.chmod(0o755)
with self.read_check(cleanup) as tdir:
unreadable_dir = tdir / 'unreadable'
unreadable_dir.mkdir(0o000)
self.worker.pretask_check_params.pre_task_check_read = (unreadable_dir, )
def test_read_file_exists(self):
def post_run():
self.assertTrue(existing.exists(), '%s should not have been deleted' % existing)
with self.read_check(post_run) as tdir:
existing = tdir / 'unreadable-testfile.txt'
existing.write_bytes(b'x')
existing.chmod(0o222) # only writable
self.worker.pretask_check_params.pre_task_check_read = (existing, )
def test_read_file_not_exists(self):
with self.read_check() as tdir:
nonexistant = tdir / 'nonexistant-testfile.txt'
self.worker.pretask_check_params.pre_task_check_read = (nonexistant, )
@contextlib.contextmanager
def read_check(self, post_run=None):
from tests.mock_responses import EmptyResponse, CoroMock
self.manager.post = CoroMock(return_value=EmptyResponse())
with tempfile.TemporaryDirectory() as tdir_name:
tdir = Path(tdir_name)
yield tdir
self.worker.schedule_fetch_task()
self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut)
if post_run is not None:
post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
self.assertFalse(self.worker.sleeping_fut.done())
import asyncio
from unittest.mock import Mock, call
from abstract_worker_test import AbstractWorkerTest
from tests.abstract_worker_test import AbstractWorkerTest
class AbstractCommandTest(AbstractWorkerTest):
def setUp(self):
from mock_responses import CoroMock
from tests.mock_responses import CoroMock
from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner
from flamenco_worker.cli import construct_asyncio_loop
self.loop = construct_asyncio_loop()
self.fworker = Mock(spec=FlamencoWorker)
self.fworker.trunner = Mock(spec=TaskRunner)
self.fworker.trunner.subprocess_pid_file = None
self.fworker.register_log = CoroMock()
self.fworker.register_task_update = CoroMock()
......@@ -83,14 +86,15 @@ class ExecCommandTest(AbstractCommandTest):
0.6
))
self.assertTrue(ok)
pid = cmd.proc.pid
# Check that both lines have been reported.
self.fworker.register_log.assert_has_calls([
call('exec: Starting'),
call('Executing %s',
'%s -c \'print("hello, this is two lines\\nYes, really.")\'' % sys.executable),
call('> hello, this is two lines'), # note the logged line doesn't end in a newline
call('> Yes, really.'), # note the logged line doesn't end in a newline
call('pid=%d > hello, this is two lines' % pid),
call('pid=%d > Yes, really.' % pid), # note the logged line doesn't end in a newline
call('exec: Finished'),
])
......@@ -119,9 +123,10 @@ class ExecCommandTest(AbstractCommandTest):
self.assertFalse(ok)
# Check that the error has been reported.
decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command produced " \
"non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 in " \
"position 0: invalid start byte"
pid = cmd.proc.pid
decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command pid=%d " \
"produced non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 "\
"in position 0: invalid start byte" % pid
self.fworker.register_log.assert_has_calls([
call('exec: Starting'),
call('Executing %s',
......@@ -148,19 +153,20 @@ class ExecCommandTest(AbstractCommandTest):
0.6
))
self.assertFalse(ok)
pid = cmd.proc.pid
# Check that the execution error has been reported.
self.fworker.register_log.assert_has_calls([
call('exec: Starting'),
call('Executing %s',
'%s -c \'raise SystemExit("¡FAIL!")\'' % sys.executable),
call('> ¡FAIL!'), # note the logged line doesn't end in a newline
call('pid=%d > ¡FAIL!' % pid), # note the logged line doesn't end in a newline
call('exec.(task_id=12345, command_idx=0): Error executing: '
'Command failed with status 1')
'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid))
])
# The update should NOT contain a new task status -- that is left to the Worker.
self.fworker.register_task_update.assert_called_with(
activity='exec.(task_id=12345, command_idx=0): Error executing: '
'Command failed with status 1',
'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid),
)
......@@ -6,7 +6,7 @@ from unittest.mock import Mock
import requests
from abstract_worker_test import AbstractWorkerTest
from tests.abstract_worker_test import AbstractWorkerTest
class TaskUpdateQueueTest(AbstractWorkerTest):
......@@ -14,7 +14,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
from flamenco_worker.upstream import FlamencoManager
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
from flamenco_worker.cli import construct_asyncio_loop
from mock_responses import CoroMock
from tests.mock_responses import CoroMock
self.asyncio_loop = construct_asyncio_loop()
self.shutdown_future = self.asyncio_loop.create_future()
......@@ -39,7 +39,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
Also tests connection errors and other HTTP error statuses.
"""
from mock_responses import JsonResponse, EmptyResponse
from tests.mock_responses import JsonResponse, EmptyResponse
# Try different value types
payload = {'key': 'value',
......@@ -95,7 +95,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
def test_queue_persistence(self):
"""Check that updates are pushed, even when the process is stopped & restarted."""
from mock_responses import EmptyResponse
from tests.mock_responses import EmptyResponse
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
# Try different value types
......@@ -151,7 +151,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
"""A 409 Conflict response should discard a queued task update.
"""
from mock_responses import TextResponse
from tests.mock_responses import TextResponse
# Try different value types
payload = {'key': 'value',
......@@ -182,3 +182,42 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
# There should only be one attempt at delivering this payload.
self.assertEqual(1, tries)
self.assertEqual([], list(self.tuqueue._queue()))
def test_task_gone(self):
"""A 404 Not Found response should discard a queued task update.
This can happen in a race condition, when a task is deleted/archived
while there are still updates in the local queue.
"""
from tests.mock_responses import TextResponse
# Try different value types
payload = {'key': 'value',
'sub': {'some': 13,
'values': datetime.datetime.now()}}
tries = 0
async def push_callback(url, *, json, loop):
nonlocal tries
tries += 1
self.shutdown_future.cancel()
return TextResponse("no", status_code=404)
self.manager.post.side_effect = push_callback
self.tuqueue.queue('/push/here', payload)
# Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling
# the actual payload.
self.asyncio_loop.run_until_complete(
asyncio.wait_for(
self.tuqueue.work(loop=self.asyncio_loop),
timeout=2
)
)
# There should only be one attempt at delivering this payload.
self.assertEqual(1, tries)
self.assertEqual([], list(self.tuqueue._queue()))
......@@ -6,7 +6,7 @@ from unittest.mock import Mock
import asyncio
import requests
from abstract_worker_test import AbstractWorkerTest
from tests.abstract_worker_test import AbstractWorkerTest
class AbstractFWorkerTest(AbstractWorkerTest):
......@@ -16,7 +16,7 @@ class AbstractFWorkerTest(AbstractWorkerTest):
from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
from mock_responses import CoroMock
from tests.mock_responses import CoroMock
self.asyncio_loop = construct_asyncio_loop()
self.asyncio_loop.set_debug(True)
......@@ -44,6 +44,11 @@ class AbstractFWorkerTest(AbstractWorkerTest):
shutdown_future=self.shutdown_future,
)
# Prime the LRU cache to always return this hostname.
with unittest.mock.patch('socket.gethostname') as gethostname:
gethostname.return_value = 'ws-unittest'
self.worker.hostname()
def tearDown(self):
if self.worker._push_act_to_manager is not None:
try:
......@@ -70,12 +75,10 @@ class AbstractFWorkerTest(AbstractWorkerTest):
class WorkerStartupTest(AbstractFWorkerTest):
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
@unittest.mock.patch('socket.gethostname')
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname):
from mock_responses import EmptyResponse, CoroMock
def test_startup_already_registered(self, mock_merge_with_home_config):
from tests.mock_responses import EmptyResponse, CoroMock
mock_gethostname.return_value = 'ws-unittest'
self.manager.post = CoroMock(return_value=EmptyResponse())
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False))
......@@ -90,14 +93,12 @@ class WorkerStartupTest(AbstractFWorkerTest):
)
self.tuqueue.queue.assert_not_called()
@unittest.mock.patch('socket.gethostname')
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_registration(self, mock_merge_with_home_config, mock_gethostname):
def test_startup_registration(self, mock_merge_with_home_config):
from flamenco_worker.worker import detect_platform
from mock_responses import JsonResponse, CoroMock
from tests.mock_responses import JsonResponse, CoroMock
self.worker.worker_id = None
mock_gethostname.return_value = 'ws-unittest'
self.manager.post = CoroMock(return_value=JsonResponse({
'_id': '5555',
......@@ -128,7 +129,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
"""Test that startup is aborted when the worker can't register."""
from flamenco_worker.worker import detect_platform, UnableToRegisterError
from mock_responses import JsonResponse, CoroMock
from tests.mock_responses import JsonResponse, CoroMock
self.worker.worker_id = None
mock_gethostname.return_value = 'ws-unittest'
......@@ -167,7 +168,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
def test_fetch_task_happy(self):
from unittest.mock import call
from mock_responses import JsonResponse, CoroMock
from tests.mock_responses import JsonResponse, CoroMock
self.manager.post = CoroMock()
# response when fetching a task
......@@ -199,11 +200,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
self.worker.schedule_fetch_task()
self.manager.post.assert_not_called()
interesting_task = self.worker.fetch_task_task
self.loop.run_until_complete(self.worker.fetch_task_task)
interesting_task = self.worker.single_iteration_fut
self.loop.run_until_complete(self.worker.single_iteration_fut)
# Another fetch-task task should have been scheduled.
self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
self.assertNotEqual(self.worker.single_iteration_fut, interesting_task)
self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.tuqueue.queue.assert_has_calls([
......@@ -223,7 +224,70 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
def test_stop_current_task(self):
"""Test that stopped tasks get status 'canceled'."""
from mock_responses import JsonResponse, CoroMock
from tests.mock_responses import JsonResponse, CoroMock, EmptyResponse
self.manager.post = CoroMock()
self.manager.post.coro.side_effect = [
# response when fetching a task
JsonResponse({
'_id': '58514d1e9837734f2e71b479',
'job': '58514d1e9837734f2e71b477',
'manager': '585a795698377345814d2f68',
'project': '',
'user': '580f8c66983773759afdb20e',
'name': 'sleep-14-26',
'status': 'processing',
'priority': 50,
'job_type': 'unittest',
'task_type': 'sleep',
'commands': [
{'name': 'sleep', 'settings': {'time_in_seconds': 3}}
]
}),
EmptyResponse(), # stopping (and thus returning) a task.
EmptyResponse(), # signing off
]
self.worker.schedule_fetch_task()
stop_called = False
async def stop():
nonlocal stop_called
stop_called = True
await asyncio.sleep(0.2)
await self.worker.stop_current_task(self.worker.task_id)
asyncio.ensure_future(stop(), loop=self.loop)
self.loop.run_until_complete(self.worker.single_iteration_fut)
self.assertTrue(stop_called)
self.manager.post.assert_has_calls([
unittest.mock.call('/task', loop=self.asyncio_loop),
unittest.mock.call(f'/tasks/{self.worker.task_id}/return', loop=self.asyncio_loop),
])
self.tuqueue.queue.assert_any_call(
'/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'active',
'current_command_idx': 0},
)
# A bit clunky because we don't know which timestamp is included in the log line.
last_args, last_kwargs = self.tuqueue.queue.call_args
self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update')
self.assertEqual(last_kwargs, {})
self.assertIn('log', last_args[1])
self.assertIn(
'Worker 1234 (ws-unittest) stopped running this task, no longer '
'allowed to run by Manager', last_args[1]['log'])
self.assertEqual(self.tuqueue.queue.call_count, 2)
def test_stop_current_task_mismatch(self):
from tests.mock_responses import JsonResponse, CoroMock
self.manager.post = CoroMock()
# response when fetching a task
......@@ -252,10 +316,10 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
stop_called = True
await asyncio.sleep(0.2)
await self.worker.stop_current_task()
await self.worker.stop_current_task('other-task-id')
asyncio.ensure_future(stop(), loop=self.loop)
self.loop.run_until_complete(self.worker.fetch_task_task)
self.loop.run_until_complete(self.worker.single_iteration_fut)
self.assertTrue(stop_called)
......@@ -267,13 +331,12 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
'current_command_idx': 0},
)
# A bit clunky because we don't know which timestamp is included in the log line.
# The task shouldn't be stopped, because the wrong task ID was requested to stop.
last_args, last_kwargs = self.tuqueue.queue.call_args
self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update')
self.assertEqual(last_kwargs, {})
self.assertIn('log', last_args[1])
self.assertTrue(last_args[1]['log'].endswith(
'Worker 1234 stopped running this task, no longer allowed to run by Manager'))
self.assertIn('activity', last_args[1])
self.assertEqual(last_args[1]['activity'], 'Task completed')
self.assertEqual(self.tuqueue.queue.call_count, 2)
......@@ -395,7 +458,7 @@ class WorkerShutdownTest(AbstractWorkerTest):
from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
from mock_responses import CoroMock
from tests.mock_responses import CoroMock
self.asyncio_loop = construct_asyncio_loop()
self.asyncio_loop.set_debug(True)
......@@ -439,7 +502,7 @@ class WorkerSleepingTest(AbstractFWorkerTest):
self.worker.loop = self.loop
def test_stop_current_task_go_sleep(self):
from mock_responses import JsonResponse, CoroMock
from tests.mock_responses import JsonResponse, CoroMock
self.manager.post = CoroMock()
# response when fetching a task
......@@ -449,8 +512,8 @@ class WorkerSleepingTest(AbstractFWorkerTest):
self.worker.schedule_fetch_task()
with self.assertRaises(concurrent.futures.CancelledError):
self.loop.run_until_complete(self.worker.fetch_task_task)
self.loop.run_until_complete(self.worker.single_iteration_fut)
self.assertIsNotNone(self.worker.sleeping_task)
self.assertFalse(self.worker.sleeping_task.done())
self.assertTrue(self.worker.fetch_task_task.done())
self.assertIsNotNone(self.worker.sleeping_fut)
self.assertFalse(self.worker.sleeping_fut.done())
self.assertTrue(self.worker.single_iteration_fut.done())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment