diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py index 0b7d19bd4ff321041622d12830f4e8b90c40ba48..e8b3821a7f62c4f2eb76d78a2b76ce43fcc3218e 100644 --- a/flamenco_worker/commands.py +++ b/flamenco_worker/commands.py @@ -1299,3 +1299,34 @@ class MoveWithCounterCommand(AbstractCommand): shutil.move(str(src), str(dest)) self.worker.output_produced(dest) + + +@command_executor('create_python_file') +class CreatePythonFile(AbstractCommand): + def validate(self, settings: Settings): + filepath, err = self._setting(settings, 'filepath', True) + if err: + return err + if not filepath: + return 'filepath may not be empty' + if not filepath.endswith('.py'): + return 'filepath must end in .py' + + dest, err = self._setting(settings, 'contents', True) + if err: + return err + + async def execute(self, settings: Settings): + filepath = Path(settings['filepath']) + await self._mkdir_if_not_exists(filepath.parent) + + if filepath.exists(): + msg = f'Overwriting Python file {filepath}' + else: + msg = f'Creating Python file {filepath}' + + self._log.info(msg) + await self.worker.register_log('%s: %s', self.command_name, msg) + await self.worker.register_log('%s: contents:\n%s', self.command_name, settings['contents']) + + filepath.write_text(settings['contents'], encoding='utf-8') diff --git a/tests/test_commands_create_python_file.py b/tests/test_commands_create_python_file.py new file mode 100644 index 0000000000000000000000000000000000000000..cc52713ec72977d3aeff541854f76b4f4f6d8913 --- /dev/null +++ b/tests/test_commands_create_python_file.py @@ -0,0 +1,59 @@ +from pathlib import Path +import os + +from tests.test_runner import AbstractCommandTest + + +class CreatePythonFileTest(AbstractCommandTest): + def setUp(self): + super().setUp() + + from flamenco_worker.commands import CreatePythonFile + import tempfile + + self.tmpdir = tempfile.TemporaryDirectory() + self.tmppath = Path(self.tmpdir.name) + + self.cmd = CreatePythonFile( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + + def tearDown(self): + super().tearDown() + self.tmpdir.cleanup() + + def test_validate_settings(self): + self.assertIn('filepath', self.cmd.validate({'filepath': 12, 'contents': '# comment'})) + self.assertIn('filepath', self.cmd.validate({'filepath': '', 'contents': '# comment'})) + self.assertIn('filepath', self.cmd.validate({'filepath': '/nonpy/path', 'contents': '#'})) + self.assertIn('filepath', self.cmd.validate({'contents': '#'})) + + self.assertIn('content', self.cmd.validate({'filepath': '/valid/path.py', 'contents': 12})) + self.assertIn('content', self.cmd.validate({'filepath': '/valid/path.py'})) + + self.assertTrue(self.cmd.validate({})) + self.assertFalse(self.cmd.validate({'filepath': '/valid/path.py', 'contents': ''})) + self.assertFalse(self.cmd.validate({'filepath': '/valid/path.py', 'contents': '#'})) + self.assertFalse(self.cmd.validate({'filepath': '/valid/path.py', 'contents': '##\na=b\n'})) + + def test_nonexistant_path(self): + filepath = self.tmppath / 'nonexisting-dir' / 'somefile.py' + task = self.cmd.run({'filepath': str(filepath), 'contents': 'aapje'}) + ok = self.loop.run_until_complete(task) + + self.assertTrue(ok) + self.assertTrue(filepath.exists()) + self.assertEqual('aapje', filepath.read_text()) + + def test_existing_path(self): + filepath = self.tmppath / 'existing.py' + filepath.write_text('old content') + + task = self.cmd.run({'filepath': str(filepath), 'contents': 'öpje'}) + ok = self.loop.run_until_complete(task) + + self.assertTrue(ok) + self.assertTrue(filepath.exists()) + self.assertEqual('öpje', filepath.read_text(encoding='utf8'))