Skip to content
Snippets Groups Projects
test_runner.py 6.07 KiB
Newer Older
  • Learn to ignore specific revisions
  • import shlex
    import sys
    from pathlib import Path
    
    from unittest.mock import Mock, call
    
    
    from tests.abstract_worker_test import AbstractWorkerTest
    
    executable = Path(sys.executable).as_posix()
    
    
    
    class AbstractCommandTest(AbstractWorkerTest):
        def setUp(self):
    
            from tests.mock_responses import CoroMock
    
            from flamenco_worker.worker import FlamencoWorker
    
            from flamenco_worker.runner import TaskRunner
    
            from flamenco_worker.cli import construct_asyncio_loop
    
            self.loop = construct_asyncio_loop()
            self.fworker = Mock(spec=FlamencoWorker)
    
            self.fworker.trunner = Mock(spec=TaskRunner)
            self.fworker.trunner.subprocess_pid_file = None
    
            self.fworker.register_log = CoroMock()
            self.fworker.register_task_update = CoroMock()
    
    
            logging.getLogger('flamenco_worker.commands').setLevel(logging.DEBUG)
    
    
        def tearDown(self):
            # This is required for subprocesses, otherwise unregistering signal handlers goes wrong.
            self.loop.close()
    
    
    class SleepCommandTest(AbstractCommandTest):
        def test_sleep(self):
            import time
    
            from flamenco_worker.commands import SleepCommand
    
    
            cmd = SleepCommand(
                worker=self.fworker,
                task_id='12345',
                command_idx=0,
            )
    
            time_before = time.time()
            ok = self.loop.run_until_complete(asyncio.wait_for(
                cmd.run({'time_in_seconds': 0.5}),
                0.6  # the 'sleep' should be over in not more than 0.1 seconds extra
            ))
            duration = time.time() - time_before
            self.assertGreaterEqual(duration, 0.5)
            self.assertTrue(ok)
    
    
    class ExecCommandTest(AbstractCommandTest):
        def construct(self):
    
            from flamenco_worker.commands import ExecCommand
    
            cmd = ExecCommand(
                worker=self.fworker,
                task_id='12345',
                command_idx=0,
            )
            return cmd
    
        def test_bad_settings(self):
            cmd = self.construct()
    
            settings = {'cmd': [1, 2, 3]}
    
            ok = self.loop.run_until_complete(asyncio.wait_for(
                cmd.run(settings),
                0.6
            ))
    
            self.assertFalse(ok)
            self.fworker.register_task_update.assert_called_once_with(
                task_status='failed',
    
                activity='exec.(task_id=12345, command_idx=0): Invalid settings: "cmd" must be a string'
    
            )
    
        def test_exec_python(self):
            cmd = self.construct()
    
            # Use shlex to quote strings like this, so we're sure it's done well.
    
            args = [executable, '-c', r'print("hello, this is two lines\nYes, really.")']
    
            settings = {
                'cmd': ' '.join(shlex.quote(s) for s in args)
            }
    
            ok = self.loop.run_until_complete(asyncio.wait_for(
                cmd.run(settings),
                0.6
            ))
            self.assertTrue(ok)
    
            pid = cmd.proc.pid
    
    
            # Check that both lines have been reported.
            self.fworker.register_log.assert_has_calls([
                call('exec: Starting'),
                call('Executing %s',
    
                     '%s -c \'print("hello, this is two lines\\nYes, really.")\'' % executable),
    
    Sybren A. Stüvel's avatar
    Sybren A. Stüvel committed
                call('pid=%d > hello, this is two lines' % pid),
                call('pid=%d > Yes, really.' % pid),  # note the logged line doesn't end in a newline
    
                call('exec: Finished'),
            ])
    
            self.fworker.register_task_update.assert_called_with(
                activity='finished exec',
                command_progress_percentage=100,
                current_command_idx=0
            )
    
        def test_exec_invalid_utf(self):
            cmd = self.construct()
    
            # Use shlex to quote strings like this, so we're sure it's done well.
            # Writes an invalid sequence of continuation bytes.
    
            args = [executable, '-c', r'import sys; sys.stdout.buffer.write(bytes((0x80, 0x80, 0x80)))']
    
            settings = {
                'cmd': ' '.join(shlex.quote(s) for s in args)
            }
    
            ok = self.loop.run_until_complete(asyncio.wait_for(
                cmd.run(settings),
                0.6
            ))
            self.assertFalse(ok)
    
            # Check that the error has been reported.
    
    Sybren A. Stüvel's avatar
    Sybren A. Stüvel committed
            pid = cmd.proc.pid
            decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command pid=%d " \
                         "produced non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 "\
                         "in position 0: invalid start byte" % pid
    
            self.fworker.register_log.assert_has_calls([
                call('exec: Starting'),
                call('Executing %s',
    
                     '%s -c \'import sys; sys.stdout.buffer.write(bytes((0x80, 0x80, 0x80)))\'' % executable),
    
            # The update should NOT contain a new task status -- that is left to the Worker.
            self.fworker.register_task_update.assert_called_with(activity=decode_err)
    
        def test_exec_python_fails(self):
            cmd = self.construct()
    
            # Use shlex to quote strings like this, so we're sure it's done well.
    
            args = [executable, '-c', r'raise SystemExit("FAIL")']
    
            settings = {
                'cmd': ' '.join(shlex.quote(s) for s in args)
            }
    
            ok = self.loop.run_until_complete(asyncio.wait_for(
                cmd.run(settings),
                0.6
            ))
            self.assertFalse(ok)
    
            pid = cmd.proc.pid
    
    
            # Check that the execution error has been reported.
            self.fworker.register_log.assert_has_calls([
                call('exec: Starting'),
                call('Executing %s',
    
                     '%s -c \'raise SystemExit("FAIL")\'' % executable),
                call('pid=%d > FAIL' % pid),  # note the logged line doesn't end in a newline
    
                call('exec.(task_id=12345, command_idx=0): Error executing: '
    
                     'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid))
    
            ])
    
            # The update should NOT contain a new task status -- that is left to the Worker.
    
            self.fworker.register_task_update.assert_called_with(
    
                activity='exec.(task_id=12345, command_idx=0): Error executing: '
    
                         'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid),