diff --git a/.gitignore b/.gitignore index af7e0a22088e133df1c0c025b13bae90463c9ccb..90ecd3fb2222ee25dc94d0a32de72a6e9d1601a7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,9 +4,10 @@ .cache/ .coverage +.mypy_cache/ /flamenco_worker.egg-info/ /flamenco-worker.db /build/ /dist/ -/flamenco-worker*.log +/flamenco-worker*.log* diff --git a/CHANGELOG.md b/CHANGELOG.md index 75389d1aa3878abfe2b44207c9d9bc66ef4403ce..653515a463f1ace4c4a2df3ebfd082554e2ea373 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ changed functionality, fixed bugs). ## Version 2.2 (in development) - Always log the version of Flamenco Worker. +- Requires Flamenco Manager 2.2 or newer. - Include missing merge-exr.blend, required for progressive rendering, in the distribution bundle. - Include `exr-merge` task type in default configuration, which is required for progressive rendering. @@ -15,6 +16,28 @@ changed functionality, fixed bugs). amounts of logs before pushing to Flamenco Manager). - Fixed a memory leak in the task update queue. - Added a new `log_a_lot` command and task type `debug` to aid in debugging. +- Fixed a bug where task updates would be sent in an infinite loop when the Manager didn't + know the task, blocking all other task updates. +- Added a `pre_task_check` section to the configuration file, which can contain `write.N` and + `read.N` keys (where `N` can be anything to make the keys unique). Every value is a path to be + checked for writability or readability. Note that write checks are lossy, and bytes are appended + to any existing file used to check writability. When such a check fails, the Worker will go to + status `error` and sleep for 10 minutes before trying again. +- Subprocess commands now write the spawned process PID to a text file, and refuse to run if there + already is such a file with an alive PID. +- Log lines produced by subprocesses are now prefixed with 'pid=nnn'. +- Moved from pip-installing requirements.txt to Pipenv. +- Upgraded Python from 3.5 to 3.7. +- Added a new command `create_video` which uses FFmpeg to create a video after rendering an image + sequence. It's up to Flamenco Server to include (or not) this command in a render job. +- Explicitly return tasks to the Manager queue when stopping them (that is, when going to sleep or + shutting down). Requires Flamenco Manager 2.2 or newer.
+- Added support for commands used in the blender-video-chunks job type: + - blender_render_audio + - concat_videos + - create_video + - move_with_counter + - mux_audio ## Version 2.1.0 (2018-01-04) diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000000000000000000000000000000000000..8742be27630f0c81b28fe915cc53bba64b2bbe4a --- /dev/null +++ b/Pipfile @@ -0,0 +1,21 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +attrs = "*" +requests = "*" +psutil = "*" + +[dev-packages] +pytest = "<4" # pytest-cov uses deprecated function (removed in pytest 4) when using --no-cov +pytest-cov = "*" +wheel = "*" +pyinstaller = "*" +ipython = "*" +pathlib2 = {version = "*", markers="python_version < '3.6'"} +mypy = "*" + +[requires] +python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000000000000000000000000000000000000..39a4591cf071e111f9fb6e6183b700345cf54d67 --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,364 @@ +{ + "_meta": { + "hash": { + "sha256": "b0736d28b87f6c3f3541d2bab2740817ad8f51db7f8e92ce7bd6068ab2ac32c8" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.7" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "attrs": { + "hashes": [ + "sha256:10cbf6e27dbce8c30807caf056c8eb50917e0eaafe86347671b57254006c3e69", + "sha256:ca4be454458f9dec299268d472aaa5a11f67a4ff70093396e1ceae9c76cf4bbb" + ], + "index": "pypi", + "version": "==18.2.0" + }, + "certifi": { + "hashes": [ + "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c", + "sha256:6d58c986d22b038c8c0df30d639f23a3e6d172a05c3583e766f4c0b785c0986a" + ], + "version": "==2018.10.15" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, + "idna": { + "hashes": [ + "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", + "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" + ], + "version": "==2.7" + }, + "psutil": { + "hashes": [ + "sha256:1c19957883e0b93d081d41687089ad630e370e26dc49fd9df6951d6c891c4736", + "sha256:1c71b9716790e202a00ab0931a6d1e25db1aa1198bcacaea2f5329f75d257fff", + "sha256:3b7a4daf4223dae171a67a89314ac5ca0738e94064a78d99cfd751c55d05f315", + "sha256:3e19be3441134445347af3767fa7770137d472a484070840eee6653b94ac5576", + "sha256:6e265c8f3da00b015d24b842bfeb111f856b13d24f2c57036582568dc650d6c3", + "sha256:809c9cef0402e3e48b5a1dddc390a8a6ff58b15362ea5714494073fa46c3d293", + "sha256:b4d1b735bf5b120813f4c89db8ac22d89162c558cbd7fdd298866125fe906219", + "sha256:bbffac64cfd01c6bcf90eb1bedc6c80501c4dae8aef4ad6d6dd49f8f05f6fc5a", + "sha256:bfcea4f189177b2d2ce4a34b03c4ac32c5b4c22e21f5b093d9d315e6e253cd81" + ], + "index": "pypi", + "version": "==5.4.8" + }, + "requests": { + "hashes": [ + "sha256:65b3a120e4329e33c9889db89c80976c5272f56ea92d3e74da8a463992e3ff54", + "sha256:ea881206e59f41dbd0bd445437d792e43906703fff75ca8ff43ccdb11f33f263" + ], + "index": "pypi", + "version": "==2.20.1" + }, + "urllib3": { + "hashes": [ + "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39", + "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22" + ], + "version": "==1.24.1" + } + }, + "develop": { + "altgraph": { + "hashes": [ + "sha256:d6814989f242b2b43025cba7161fc1b8fb487a62cd49c49245d6fd01c18ac997", + 
"sha256:ddf5320017147ba7b810198e0b6619bd7b5563aa034da388cea8546b877f9b0c" + ], + "version": "==0.16.1" + }, + "atomicwrites": { + "hashes": [ + "sha256:0312ad34fcad8fac3704d441f7b317e50af620823353ec657a53e981f92920c0", + "sha256:ec9ae8adaae229e4f8446952d204a3e4b5fdd2d099f9be3aaf556120135fb3ee" + ], + "version": "==1.2.1" + }, + "attrs": { + "hashes": [ + "sha256:10cbf6e27dbce8c30807caf056c8eb50917e0eaafe86347671b57254006c3e69", + "sha256:ca4be454458f9dec299268d472aaa5a11f67a4ff70093396e1ceae9c76cf4bbb" + ], + "index": "pypi", + "version": "==18.2.0" + }, + "backcall": { + "hashes": [ + "sha256:38ecd85be2c1e78f77fd91700c76e14667dc21e2713b63876c0eb901196e01e4", + "sha256:bbbf4b1e5cd2bdb08f915895b51081c041bac22394fdfcfdfbe9f14b77c08bf2" + ], + "version": "==0.1.0" + }, + "coverage": { + "hashes": [ + "sha256:09e47c529ff77bf042ecfe858fb55c3e3eb97aac2c87f0349ab5a7efd6b3939f", + "sha256:0a1f9b0eb3aa15c990c328535655847b3420231af299386cfe5efc98f9c250fe", + "sha256:0cc941b37b8c2ececfed341444a456912e740ecf515d560de58b9a76562d966d", + "sha256:10e8af18d1315de936d67775d3a814cc81d0747a1a0312d84e27ae5610e313b0", + "sha256:1b4276550b86caa60606bd3572b52769860a81a70754a54acc8ba789ce74d607", + "sha256:1e8a2627c48266c7b813975335cfdea58c706fe36f607c97d9392e61502dc79d", + "sha256:2b224052bfd801beb7478b03e8a66f3f25ea56ea488922e98903914ac9ac930b", + "sha256:447c450a093766744ab53bf1e7063ec82866f27bcb4f4c907da25ad293bba7e3", + "sha256:46101fc20c6f6568561cdd15a54018bb42980954b79aa46da8ae6f008066a30e", + "sha256:4710dc676bb4b779c4361b54eb308bc84d64a2fa3d78e5f7228921eccce5d815", + "sha256:510986f9a280cd05189b42eee2b69fecdf5bf9651d4cd315ea21d24a964a3c36", + "sha256:5535dda5739257effef56e49a1c51c71f1d37a6e5607bb25a5eee507c59580d1", + "sha256:5a7524042014642b39b1fcae85fb37556c200e64ec90824ae9ecf7b667ccfc14", + "sha256:5f55028169ef85e1fa8e4b8b1b91c0b3b0fa3297c4fb22990d46ff01d22c2d6c", + "sha256:6694d5573e7790a0e8d3d177d7a416ca5f5c150742ee703f3c18df76260de794", + "sha256:6831e1ac20ac52634da606b658b0b2712d26984999c9d93f0c6e59fe62ca741b", + "sha256:77f0d9fa5e10d03aa4528436e33423bfa3718b86c646615f04616294c935f840", + "sha256:828ad813c7cdc2e71dcf141912c685bfe4b548c0e6d9540db6418b807c345ddd", + "sha256:85a06c61598b14b015d4df233d249cd5abfa61084ef5b9f64a48e997fd829a82", + "sha256:8cb4febad0f0b26c6f62e1628f2053954ad2c555d67660f28dfb1b0496711952", + "sha256:a5c58664b23b248b16b96253880b2868fb34358911400a7ba39d7f6399935389", + "sha256:aaa0f296e503cda4bc07566f592cd7a28779d433f3a23c48082af425d6d5a78f", + "sha256:ab235d9fe64833f12d1334d29b558aacedfbca2356dfb9691f2d0d38a8a7bfb4", + "sha256:b3b0c8f660fae65eac74fbf003f3103769b90012ae7a460863010539bb7a80da", + "sha256:bab8e6d510d2ea0f1d14f12642e3f35cefa47a9b2e4c7cea1852b52bc9c49647", + "sha256:c45297bbdbc8bb79b02cf41417d63352b70bcb76f1bbb1ee7d47b3e89e42f95d", + "sha256:d19bca47c8a01b92640c614a9147b081a1974f69168ecd494687c827109e8f42", + "sha256:d64b4340a0c488a9e79b66ec9f9d77d02b99b772c8b8afd46c1294c1d39ca478", + "sha256:da969da069a82bbb5300b59161d8d7c8d423bc4ccd3b410a9b4d8932aeefc14b", + "sha256:ed02c7539705696ecb7dc9d476d861f3904a8d2b7e894bd418994920935d36bb", + "sha256:ee5b8abc35b549012e03a7b1e86c09491457dba6c94112a2482b18589cc2bdb9" + ], + "version": "==4.5.2" + }, + "decorator": { + "hashes": [ + "sha256:2c51dff8ef3c447388fe5e4453d24a2bf128d3a4c32af3fabef1f01c6851ab82", + "sha256:c39efa13fbdeb4506c476c9b3babf6a718da943dab7811c206005a4a956c080c" + ], + "version": "==4.3.0" + }, + "future": { + "hashes": [ + "sha256:67045236dcfd6816dc439556d009594abf643e5eb48992e36beac09c2ca659b8" + ], + 
"version": "==0.17.1" + }, + "ipython": { + "hashes": [ + "sha256:a5781d6934a3341a1f9acb4ea5acdc7ea0a0855e689dbe755d070ca51e995435", + "sha256:b10a7ddd03657c761fc503495bc36471c8158e3fc948573fb9fe82a7029d8efd" + ], + "index": "pypi", + "version": "==7.1.1" + }, + "ipython-genutils": { + "hashes": [ + "sha256:72dd37233799e619666c9f639a9da83c34013a73e8bbc79a7a6348d93c61fab8", + "sha256:eb2e116e75ecef9d4d228fdc66af54269afa26ab4463042e33785b887c628ba8" + ], + "version": "==0.2.0" + }, + "jedi": { + "hashes": [ + "sha256:0191c447165f798e6a730285f2eee783fff81b0d3df261945ecb80983b5c3ca7", + "sha256:b7493f73a2febe0dc33d51c99b474547f7f6c0b2c8fb2b21f453eef204c12148" + ], + "version": "==0.13.1" + }, + "macholib": { + "hashes": [ + "sha256:ac02d29898cf66f27510d8f39e9112ae00590adb4a48ec57b25028d6962b1ae1", + "sha256:c4180ffc6f909bf8db6cd81cff4b6f601d575568f4d5dee148c830e9851eb9db" + ], + "version": "==1.11" + }, + "more-itertools": { + "hashes": [ + "sha256:c187a73da93e7a8acc0001572aebc7e3c69daf7bf6881a2cea10650bd4420092", + "sha256:c476b5d3a34e12d40130bc2f935028b5f636df8f372dc2c1c01dc19681b2039e", + "sha256:fcbfeaea0be121980e15bc97b3817b5202ca73d0eae185b4550cbfce2a3ebb3d" + ], + "version": "==4.3.0" + }, + "mypy": { + "hashes": [ + "sha256:8e071ec32cc226e948a34bbb3d196eb0fd96f3ac69b6843a5aff9bd4efa14455", + "sha256:fb90c804b84cfd8133d3ddfbd630252694d11ccc1eb0166a1b2efb5da37ecab2" + ], + "index": "pypi", + "version": "==0.641" + }, + "mypy-extensions": { + "hashes": [ + "sha256:37e0e956f41369209a3d5f34580150bcacfabaa57b33a15c0b25f4b5725e0812", + "sha256:b16cabe759f55e3409a7d231ebd2841378fb0c27a5d1994719e340e4f429ac3e" + ], + "version": "==0.4.1" + }, + "parso": { + "hashes": [ + "sha256:35704a43a3c113cce4de228ddb39aab374b8004f4f2407d070b6a2ca784ce8a2", + "sha256:895c63e93b94ac1e1690f5fdd40b65f07c8171e3e53cbd7793b5b96c0e0a7f24" + ], + "version": "==0.3.1" + }, + "pathlib2": { + "hashes": [ + "sha256:8eb170f8d0d61825e09a95b38be068299ddeda82f35e96c3301a8a5e7604cb83", + "sha256:d1aa2a11ba7b8f7b21ab852b1fb5afb277e1bb99d5dfc663380b5015c0d80c5a" + ], + "index": "pypi", + "markers": "python_version < '3.6'", + "version": "==2.3.2" + }, + "pefile": { + "hashes": [ + "sha256:4c5b7e2de0c8cb6c504592167acf83115cbbde01fe4a507c16a1422850e86cd6" + ], + "version": "==2018.8.8" + }, + "pexpect": { + "hashes": [ + "sha256:2a8e88259839571d1251d278476f3eec5db26deb73a70be5ed5dc5435e418aba", + "sha256:3fbd41d4caf27fa4a377bfd16fef87271099463e6fa73e92a52f92dfee5d425b" + ], + "markers": "sys_platform != 'win32'", + "version": "==4.6.0" + }, + "pickleshare": { + "hashes": [ + "sha256:87683d47965c1da65cdacaf31c8441d12b8044cdec9aca500cd78fc2c683afca", + "sha256:9649af414d74d4df115d5d718f82acb59c9d418196b7b4290ed47a12ce62df56" + ], + "version": "==0.7.5" + }, + "pluggy": { + "hashes": [ + "sha256:447ba94990e8014ee25ec853339faf7b0fc8050cdc3289d4d71f7f410fb90095", + "sha256:bde19360a8ec4dfd8a20dcb811780a30998101f078fc7ded6162f0076f50508f" + ], + "version": "==0.8.0" + }, + "prompt-toolkit": { + "hashes": [ + "sha256:c1d6aff5252ab2ef391c2fe498ed8c088066f66bc64a8d5c095bbf795d9fec34", + "sha256:d4c47f79b635a0e70b84fdb97ebd9a274203706b1ee5ed44c10da62755cf3ec9", + "sha256:fd17048d8335c1e6d5ee403c3569953ba3eb8555d710bfc548faf0712666ea39" + ], + "version": "==2.0.7" + }, + "ptyprocess": { + "hashes": [ + "sha256:923f299cc5ad920c68f2bc0bc98b75b9f838b93b599941a6b63ddbc2476394c0", + "sha256:d7cc528d76e76342423ca640335bd3633420dc1366f258cb31d05e865ef5ca1f" + ], + "version": "==0.6.0" + }, + "py": { + "hashes": [ + 
"sha256:bf92637198836372b520efcba9e020c330123be8ce527e535d185ed4b6f45694", + "sha256:e76826342cefe3c3d5f7e8ee4316b80d1dd8a300781612ddbc765c17ba25a6c6" + ], + "version": "==1.7.0" + }, + "pygments": { + "hashes": [ + "sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d", + "sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc" + ], + "version": "==2.2.0" + }, + "pyinstaller": { + "hashes": [ + "sha256:a5a6e04a66abfcf8761e89a2ebad937919c6be33a7b8963e1a961b55cb35986b" + ], + "index": "pypi", + "version": "==3.4" + }, + "pytest": { + "hashes": [ + "sha256:3f193df1cfe1d1609d4c583838bea3d532b18d6160fd3f55c9447fdca30848ec", + "sha256:e246cf173c01169b9617fc07264b7b1316e78d7a650055235d6d897bc80d9660" + ], + "index": "pypi", + "version": "==3.10.1" + }, + "pytest-cov": { + "hashes": [ + "sha256:513c425e931a0344944f84ea47f3956be0e416d95acbd897a44970c8d926d5d7", + "sha256:e360f048b7dae3f2f2a9a4d067b2dd6b6a015d384d1577c994a43f3f7cbad762" + ], + "index": "pypi", + "version": "==2.6.0" + }, + "six": { + "hashes": [ + "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9", + "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb" + ], + "version": "==1.11.0" + }, + "traitlets": { + "hashes": [ + "sha256:9c4bd2d267b7153df9152698efb1050a5d84982d3384a37b2c1f7723ba3e7835", + "sha256:c6cb5e6f57c5a9bdaa40fa71ce7b4af30298fbab9ece9815b5d995ab6217c7d9" + ], + "version": "==4.3.2" + }, + "typed-ast": { + "hashes": [ + "sha256:0948004fa228ae071054f5208840a1e88747a357ec1101c17217bfe99b299d58", + "sha256:10703d3cec8dcd9eef5a630a04056bbc898abc19bac5691612acba7d1325b66d", + "sha256:1f6c4bd0bdc0f14246fd41262df7dfc018d65bb05f6e16390b7ea26ca454a291", + "sha256:25d8feefe27eb0303b73545416b13d108c6067b846b543738a25ff304824ed9a", + "sha256:29464a177d56e4e055b5f7b629935af7f49c196be47528cc94e0a7bf83fbc2b9", + "sha256:2e214b72168ea0275efd6c884b114ab42e316de3ffa125b267e732ed2abda892", + "sha256:3e0d5e48e3a23e9a4d1a9f698e32a542a4a288c871d33ed8df1b092a40f3a0f9", + "sha256:519425deca5c2b2bdac49f77b2c5625781abbaf9a809d727d3a5596b30bb4ded", + "sha256:57fe287f0cdd9ceaf69e7b71a2e94a24b5d268b35df251a88fef5cc241bf73aa", + "sha256:668d0cec391d9aed1c6a388b0d5b97cd22e6073eaa5fbaa6d2946603b4871efe", + "sha256:68ba70684990f59497680ff90d18e756a47bf4863c604098f10de9716b2c0bdd", + "sha256:6de012d2b166fe7a4cdf505eee3aaa12192f7ba365beeefaca4ec10e31241a85", + "sha256:79b91ebe5a28d349b6d0d323023350133e927b4de5b651a8aa2db69c761420c6", + "sha256:8550177fa5d4c1f09b5e5f524411c44633c80ec69b24e0e98906dd761941ca46", + "sha256:898f818399cafcdb93cbbe15fc83a33d05f18e29fb498ddc09b0214cdfc7cd51", + "sha256:94b091dc0f19291adcb279a108f5d38de2430411068b219f41b343c03b28fb1f", + "sha256:a26863198902cda15ab4503991e8cf1ca874219e0118cbf07c126bce7c4db129", + "sha256:a8034021801bc0440f2e027c354b4eafd95891b573e12ff0418dec385c76785c", + "sha256:bc978ac17468fe868ee589c795d06777f75496b1ed576d308002c8a5756fb9ea", + "sha256:c05b41bc1deade9f90ddc5d988fe506208019ebba9f2578c622516fd201f5863", + "sha256:c9b060bd1e5a26ab6e8267fd46fc9e02b54eb15fffb16d112d4c7b1c12987559", + "sha256:edb04bdd45bfd76c8292c4d9654568efaedf76fe78eb246dde69bdb13b2dad87", + "sha256:f19f2a4f547505fe9072e15f6f4ae714af51b5a681a97f187971f50c283193b6" + ], + "version": "==1.1.0" + }, + "wcwidth": { + "hashes": [ + "sha256:3df37372226d6e63e1b1e1eda15c594bca98a22d33a23832a90998faa96bc65e", + "sha256:f4ebe71925af7b40a864553f761ed559b43544f8f71746c2d756c7fe788ade7c" + ], + "version": "==0.1.7" + }, + "wheel": { + "hashes": [ + 
"sha256:196c9842d79262bb66fcf59faa4bd0deb27da911dbc7c6cdca931080eb1f0783", + "sha256:c93e2d711f5f9841e17f53b0e6c0ff85593f3b416b6eec7a9452041a59a42688" + ], + "index": "pypi", + "version": "==0.32.2" + } + } +} diff --git a/README.md b/README.md index 19cb3e1d8479ce26bd30de40c917a5e8a99f89f3..560d5a5548c1202c0fbdc4a80c8e6be81834214b 100644 --- a/README.md +++ b/README.md @@ -6,15 +6,14 @@ Author: Sybren A. Stüvel <sybren@blender.studio> ## Installation -Before you begin, make sure you have Flamenco Manager up and running. - -There are two ways to install Flamenco Worker: - -- If you have a distributable zip file (see [Packaging for distribution](#packaging-for-distribution)) - unzip it, `cd` into it, then run `./flamenco-worker` (or `flamenco-worker.exe` on Windows). - -- If you have a copy of the source files, run `pip3 install -e .` then run `flamenco-worker`. This - requires Python 3.5.2 or newer. +- Make sure you have Flamenco Manager up and running. +- Install [FFmpeg](https://ffmpeg.org/) and make sure the `ffmpeg` binary is on `$PATH`. +- Install Flamenco Worker in one of two ways: + - If you have a distributable zip file (see + [Packaging for distribution](#packaging-for-distribution)) unzip it, `cd` into it, + then run `./flamenco-worker` (or `flamenco-worker.exe` on Windows). + - If you have a copy of the source files, run `pipenv install` then run `flamenco-worker`. + This requires Python 3.7 or newer. ## Upgrading diff --git a/deploy_via_biflamanager.sh b/deploy_via_biflamanager.sh new file mode 100755 index 0000000000000000000000000000000000000000..6538157acbbec5754075ac245cb45adb8751a599 --- /dev/null +++ b/deploy_via_biflamanager.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +ssh -o ClearAllForwardings=yes biflamanager -T <<EOT +set -e +cd \$HOME/flamenco-worker + +git reset --hard +git pull +pipenv install --dev --deploy +pipenv run ./mkdistfile.py + +last_file=\$(ls -rt dist/flamenco-worker* | tail -n 1) +dirname=\$(basename \$last_file | sed s/-linux.*//) +tar_path=\$(pwd)/\$last_file + +echo +echo "--------------------------------------------------------------" +echo "Deploying \$last_file" +echo "--------------------------------------------------------------" + +cd /shared/bin/flamenco-worker +tar zxvf \$tar_path +rm -f flamenco-worker +ln -s \$dirname/flamenco-worker . + +echo +echo "--------------------------------------------------------------" +echo "Done! Now restart workers to pick up the changes." +echo "--------------------------------------------------------------" +EOT diff --git a/flamenco-worker.cfg b/flamenco-worker.cfg index bacbe88c7b73d085f0d1b9f6df7b1c6460720931..e230405eae2e19a8c8bd7186115516c74d79928f 100644 --- a/flamenco-worker.cfg +++ b/flamenco-worker.cfg @@ -3,17 +3,22 @@ # The URL of the Flamenco Manager. Leave empty for auto-discovery via UPnP/SSDP. manager_url = http://LOCALHOST:5983 -task_types = sleep blender-render file-management +# Add the 'video-encoding' task type if you have ffmpeg installed. 
+task_types = sleep blender-render file-management exr-merge debug task_update_queue_db = flamenco-worker.db may_i_run_interval_seconds = 5 -push_log_max_interval_seconds = 20 -push_log_max_entries = 200 -push_act_max_interval_seconds = 10 +push_log_max_interval_seconds = 30 +push_log_max_entries = 2000 +push_act_max_interval_seconds = 15 worker_storage_dir = /home/milanjaros/work/temp/public/flamenco/render/in worker_output_dir = /home/milanjaros/work/temp/public/flamenco/render/out worker_blender_cmd = ./run_icc_mpi.sh +[pre_task_check] +write.0 = /home/milanjaros/work/temp/public/flamenco/render/_flamenco +write.1 = /home/milanjaros/work/temp/public/flamenco/render/spring/frames + [loggers] keys = root,flamenco_worker diff --git a/flamenco_worker/__init__.py b/flamenco_worker/__init__.py index 9b882979a61d61100247083788d72b4fe82c3120..53c32dd210eaf5cdf5c8728274f6522fbcead10d 100644 --- a/flamenco_worker/__init__.py +++ b/flamenco_worker/__init__.py @@ -1 +1 @@ -__version__ = '2.2-dev1' +__version__ = '2.2-dev10' diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index e89b9e35bb5d0bd01ae36da89023784e41be30ad..660d63101b6c67d020683c47ec62d5b83a227b4f 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -6,6 +6,7 @@ import logging import logging.config import os import pathlib +import typing import requests @@ -100,7 +101,11 @@ def main(): shutdown_future=shutdown_future, ) trunner = runner.TaskRunner( - shutdown_future=shutdown_future) + shutdown_future=shutdown_future, + subprocess_pid_file=confparser.value('subprocess_pid_file'), + ) + + pretask_check_params = parse_pretask_check_config(confparser, log) fworker = worker.FlamencoWorker( manager=fmanager, @@ -120,6 +125,7 @@ def main(): worker_blender_cmd=confparser.value('worker_blender_cmd'), initial_state='testing' if args.test else 'awake', run_single_task=args.single, + pretask_check_params=pretask_check_params, ) mir = may_i_run.MayIRun( @@ -171,6 +177,9 @@ def main(): loop.run_until_complete(asyncio.wait_for(mir_work_task, 5)) except requests.exceptions.ConnectionError: log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.") + except asyncio.TimeoutError: + log.debug("Timeout waiting for may-I-run task, " + "but that's fine as we're shutting down.") fworker.shutdown() @@ -203,6 +212,31 @@ def main(): log.warning('Flamenco Worker is shut down') +def parse_pretask_check_config(confparser, log): + """Parse the [pre_task_check] config section. + + :rtype: flamenco_worker.worker.PreTaskCheckParams + """ + from . import worker + + check_read = [] + check_write = [] + for name, value in confparser.items(section='pre_task_check'): + if name.startswith('write'): + check_write.append(pathlib.Path(value)) + elif name.startswith('read'): + check_read.append(pathlib.Path(value)) + else: + log.fatal('Config section "pre_task_check" should only have keys starting with ' + '"read" or "write"; found %r', name) + raise SystemExit(47) + pretask_check_params = worker.PreTaskCheckParams( + pre_task_check_write=tuple(check_write), + pre_task_check_read=tuple(check_read), + ) + return pretask_check_params + + def asyncio_report_tasks(signum=0, stackframe=None): """Runs the garbage collector, then reports all AsyncIO tasks on the log.
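For illustration, here is a minimal sketch of how the new `[pre_task_check]` section is consumed. It uses the standard-library `configparser` rather than the project's own `ConfigParser` subclass, and the sample paths are hypothetical; the key handling mirrors `parse_pretask_check_config()` in the cli.py hunk above:

```python
import configparser
import pathlib

# Hypothetical sample configuration; only the 'read'/'write' key prefixes
# matter, the rest of each key merely keeps it unique within the section.
SAMPLE = """
[pre_task_check]
write.0 = /render/out
read.frames = /render/in
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE)

check_read, check_write = [], []
for name, value in parser.items('pre_task_check'):
    if name.startswith('write'):
        check_write.append(pathlib.Path(value))
    elif name.startswith('read'):
        check_read.append(pathlib.Path(value))
    else:
        raise SystemExit(47)  # same exit code the CLI uses for config errors

print(check_write)  # on POSIX: [PosixPath('/render/out')]
print(check_read)   # on POSIX: [PosixPath('/render/in')]
```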
diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py index 3f776eb378cd648d9fed595ed0c809d073db0ee8..500882b10ac5d7596b003250e99122842239bcfe 100644 --- a/flamenco_worker/commands.py +++ b/flamenco_worker/commands.py @@ -3,17 +3,31 @@ import abc import asyncio import asyncio.subprocess +import datetime import logging +import pathlib import re +import shlex +import shutil +import subprocess +import tempfile + import time import typing from pathlib import Path import attr +import psutil from . import worker -command_handlers = {} +command_handlers = {} # type: typing.Mapping[str, typing.Type['AbstractCommand']] + +# Some type declarations. +Settings = typing.MutableMapping[str, typing.Any] +# This is the type of the 2nd arg for isinstance(a, b) +InstanceOfType = typing.Union[type, typing.Tuple[typing.Union[type, typing.Tuple[typing.Any, ...]], + ...]] # Timeout of subprocess.stdout.readline() call. SUBPROC_READLINE_TIMEOUT = 3600 # seconds @@ -38,6 +52,8 @@ scene.render.filepath = "%(tmpdir)s/preview.jpg" bpy.ops.render.render(write_still=True) """ +log = logging.getLogger(__name__) + def command_executor(cmdname): """Class decorator, registers a command executor.""" @@ -75,17 +91,17 @@ class AbstractCommand(metaclass=abc.ABCMeta): # Set by __attr_post_init__() identifier = attr.ib(default=None, init=False, validator=attr.validators.optional(attr.validators.instance_of(str))) - _log = attr.ib(default=None, init=False, - validator=attr.validators.optional(attr.validators.instance_of(logging.Logger))) + _log = attr.ib(init=False, default=logging.getLogger('AbstractCommand'), + validator=attr.validators.instance_of(logging.Logger)) def __attrs_post_init__(self): self.identifier = '%s.(task_id=%s, command_idx=%s)' % ( self.command_name, self.task_id, self.command_idx) - self._log = logging.getLogger('%s.%s' % (__name__, self.identifier)) + self._log = log.getChild(self.identifier) - async def run(self, settings: dict) -> bool: + async def run(self, settings: Settings) -> bool: """Runs the command, parsing output and sending it back to the worker. Returns True when the command was successful, and False otherwise. @@ -93,7 +109,7 @@ class AbstractCommand(metaclass=abc.ABCMeta): verr = self.validate(settings) if verr is not None: - self._log.warning('%s: Error in settings: %s', self.identifier, verr) + self._log.warning('Error in settings: %s', verr) await self.worker.register_log('%s: Error in settings: %s', self.identifier, verr) await self.worker.register_task_update( task_status='failed', @@ -150,13 +166,13 @@ class AbstractCommand(metaclass=abc.ABCMeta): ) @abc.abstractmethod - async def execute(self, settings: dict): + async def execute(self, settings: Settings) -> None: """Executes the command. An error should be indicated by an exception. """ - def validate(self, settings: dict): + def validate(self, settings: Settings) -> typing.Optional[str]: """Validates the settings for this command. If there is an error, a description of the error is returned.
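To illustrate the registry pattern this module is built around: a command plugs in by subclassing `AbstractCommand` and registering itself with the `command_executor` decorator, after which the Worker dispatches to it by name. The `touch_file` command below is purely hypothetical (not part of this change) and assumes the definitions from the hunks above and below, used the same way the existing commands use them:

```python
@command_executor('touch_file')
class TouchFileCommand(AbstractCommand):
    """Hypothetical command: create an empty file at 'path'."""

    def validate(self, settings: Settings) -> typing.Optional[str]:
        # _setting() returns (value, None) on success or (None, error message).
        path, err = self._setting(settings, 'path', is_required=True)
        if err:
            return err
        if not path:
            return "'path' may not be empty"
        return None

    async def execute(self, settings: Settings) -> None:
        path = Path(settings['path'])
        await self._mkdir_if_not_exists(path.parent)
        await self.worker.register_log('%s: touching %s', self.command_name, path)
        path.touch()
```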
@@ -167,8 +183,10 @@ class AbstractCommand(metaclass=abc.ABCMeta): return None - def _setting(self, settings: dict, key: str, is_required: bool, valtype: typing.Type = str) -> ( - typing.Any, typing.Optional[str]): + def _setting(self, settings: Settings, key: str, is_required: bool, + valtype: InstanceOfType = str, + default: typing.Any = None) \ + -> typing.Tuple[typing.Any, typing.Optional[str]]: """Parses a setting, returns either (value, None) or (None, errormsg)""" try: @@ -176,20 +194,34 @@ class AbstractCommand(metaclass=abc.ABCMeta): except KeyError: if is_required: return None, 'Missing "%s"' % key - return None, None + settings.setdefault(key, default) + return default, None if value is None and not is_required: - return None, None + settings.setdefault(key, default) + return default, None if not isinstance(value, valtype): return None, '"%s" must be a %s, not a %s' % (key, valtype, type(value)) return value, None + async def _mkdir_if_not_exists(self, dirpath: Path): + """Create a directory if it doesn't exist yet. + + Also logs a message to the Worker to indicate the directory was created. + """ + if dirpath.exists(): + return + + await self.worker.register_log('%s: Directory %s does not exist; creating.', + self.command_name, dirpath) + dirpath.mkdir(parents=True) + @command_executor('echo') class EchoCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: msg = settings['message'] except KeyError: @@ -198,20 +230,20 @@ class EchoCommand(AbstractCommand): if not isinstance(msg, str): return 'Message must be a string' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): await self.worker.register_log(settings['message']) @command_executor('log_a_lot') class LogALotCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): lines = settings.get('lines', 20000) if isinstance(lines, float): lines = int(lines) if not isinstance(lines, int): return '"lines" setting must be an integer, not %s' % type(lines) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): lines = settings.get('lines', 20000) await self.worker.register_task_update(activity='logging %d lines' % lines) @@ -221,7 +253,7 @@ class LogALotCommand(AbstractCommand): @command_executor('sleep') class SleepCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: sleeptime = settings['time_in_seconds'] except KeyError: @@ -230,7 +262,7 @@ class SleepCommand(AbstractCommand): if not isinstance(sleeptime, (int, float)): return 'time_in_seconds must be an int or float' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): time_in_seconds = settings['time_in_seconds'] await self.worker.register_log('Sleeping for %s seconds' % time_in_seconds) await asyncio.sleep(time_in_seconds) @@ -240,13 +272,11 @@ class SleepCommand(AbstractCommand): def _timestamped_path(path: Path) -> Path: """Returns the path with its modification time appended to the name.""" - from datetime import datetime - mtime = path.stat().st_mtime # Round away the milliseconds, as those aren't all that interesting. # Uniqueness is ensured by calling _unique_path(). - mdatetime = datetime.fromtimestamp(round(mtime)) + mdatetime = datetime.datetime.fromtimestamp(round(mtime)) # Make the ISO-8601 timestamp a bit more eye- and filename-friendly. 
iso = mdatetime.isoformat().replace('T', '_').replace(':', '') @@ -258,8 +288,6 @@ def _timestamped_path(path: Path) -> Path: def _unique_path(path: Path) -> Path: """Returns the path, or if it exists, the path with a unique suffix.""" - import re - suf_re = re.compile(r'~([0-9]+)$') # See which suffixes are in use @@ -271,16 +299,39 @@ def _unique_path(path: Path) -> Path: suffix = m.group(1) try: - suffix = int(suffix) + suffix_value = int(suffix) except ValueError: continue - max_nr = max(max_nr, suffix) + max_nr = max(max_nr, suffix_value) return path.with_name(path.name + '~%i' % (max_nr + 1)) +def _numbered_path(directory: Path, fname_prefix: str, fname_suffix: str) -> Path: + """Return a unique Path with a number between prefix and suffix. + + :return: directory / '{fname_prefix}001{fname_suffix}' where 001 is + replaced by the highest number + 1 if there already is a file with + such a prefix & suffix. + """ + + # See which numbers are already in use + max_nr = 0 + len_prefix = len(fname_prefix) + len_suffix = len(fname_suffix) + for altpath in directory.glob(f'{fname_prefix}*{fname_suffix}'): + num_str: str = altpath.name[len_prefix:-len_suffix] + + try: + num = int(num_str) + except ValueError: + continue + max_nr = max(max_nr, num) + return directory / f'{fname_prefix}{max_nr+1:03}{fname_suffix}' + + @command_executor('move_out_of_way') class MoveOutOfWayCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: src = settings['src'] except KeyError: @@ -289,7 +340,7 @@ if not isinstance(src, str): return 'src must be a string' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): self._log.info('Render output path %s does not exist, not moving out of way', src) @@ -310,12 +361,12 @@ class MoveOutOfWayCommand(AbstractCommand): @command_executor('move_to_final') class MoveToFinalCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): _, err1 = self._setting(settings, 'src', True) _, err2 = self._setting(settings, 'dest', True) return err1 or err2 - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): msg = 'Path %s does not exist, not moving' % src @@ -344,7 +395,7 @@ class MoveToFinalCommand(AbstractCommand): @command_executor('copy_file') class CopyFileCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): src, err = self._setting(settings, 'src', True) if err: return err @@ -356,7 +407,7 @@ if not dest: return 'dest may not be empty' - async def execute(self, settings: dict): + async def execute(self, settings: Settings): src = Path(settings['src']) if not src.exists(): raise CommandExecutionError('Path %s does not exist, unable to copy' % src) @@ -370,25 +421,22 @@ self._log.info('Copying %s to %s', src, dest) await self.worker.register_log('%s: Copying %s to %s', self.command_name, src, dest) - if not dest.parent.exists(): - await self.worker.register_log('%s: Target directory %s does not exist; creating.', - self.command_name, dest.parent) - dest.parent.mkdir(parents=True) + await self._mkdir_if_not_exists(dest.parent) - import shutil shutil.copy(str(src), str(dest)) self.worker.output_produced(dest) + @command_executor('remove_tree') class
RemoveTreeCommand(AbstractCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): path, err = self._setting(settings, 'path', True) if err: return err if not path: return "'path' may not be empty" - async def execute(self, settings: dict): + async def execute(self, settings: Settings): path = Path(settings['path']) if not path.exists(): msg = 'Path %s does not exist, so not removing.' % path @@ -401,26 +449,90 @@ class RemoveTreeCommand(AbstractCommand): await self.worker.register_log(msg) if path.is_dir(): - import shutil shutil.rmtree(str(path)) else: path.unlink() +@command_executor('remove_file') +class RemoveFileCommand(AbstractCommand): + def validate(self, settings: Settings): + path, err = self._setting(settings, 'path', True) + if err: + return err + if not path: + return "Parameter 'path' cannot be empty." + + async def execute(self, settings: Settings): + path = Path(settings['path']) + if not path.exists(): + msg = 'Path %s does not exist, so not removing.' % path + self._log.debug(msg) + await self.worker.register_log(msg) + return + + if path.is_dir(): + raise CommandExecutionError(f'Path {path} is a directory. Cannot remove with ' + 'this command; use remove_tree instead.') + + msg = 'Removing file %s' % path + self._log.info(msg) + await self.worker.register_log(msg) + + path.unlink() + + @attr.s -class AbstractSubprocessCommand(AbstractCommand): +class AbstractSubprocessCommand(AbstractCommand, abc.ABC): readline_timeout = attr.ib(default=SUBPROC_READLINE_TIMEOUT) proc = attr.ib(validator=attr.validators.instance_of(asyncio.subprocess.Process), init=False) - async def subprocess(self, args: list): - import subprocess - import shlex + @property + def subprocess_pid_file(self) -> typing.Optional[pathlib.Path]: + subprocess_pid_file = self.worker.trunner.subprocess_pid_file + if not subprocess_pid_file: + return None + return pathlib.Path(subprocess_pid_file) + + def validate(self, settings: Settings) -> typing.Optional[str]: + supererr = super().validate(settings) + if supererr: + return supererr + + pidfile = self.subprocess_pid_file + if pidfile is None: + self._log.warning('No subprocess PID file configured; this is not recommended.') + return None + + try: + pid_str = pidfile.read_text() + except FileNotFoundError: + # This is expected, as it means no subprocess is running. + return None + if not pid_str: + # This could be an indication that a PID file is being written right now + # (already opened, but the content hasn't been written yet). + return 'Empty PID file %s, refusing to create new subprocess just to be sure' % pidfile + + pid = int(pid_str) + self._log.warning('Found PID file %s with pid=%d', pidfile, pid) + + try: + proc = psutil.Process(pid) + except psutil.NoSuchProcess: + self._log.warning('Deleting pidfile %s for stale pid=%d', pidfile, pid) + pidfile.unlink() + return None + return 'Subprocess from %s is still running: %s' % (pidfile, proc) + async def subprocess(self, args: list): cmd_to_log = ' '.join(shlex.quote(s) for s in args) self._log.info('Executing %s', cmd_to_log) await self.worker.register_log('Executing %s', cmd_to_log) + line_logger = log.getChild(f'line.{self.identifier}') + self.proc = await asyncio.create_subprocess_exec( *args, stdin=subprocess.DEVNULL, @@ -428,55 +540,74 @@ class AbstractSubprocessCommand(AbstractCommand): stderr=subprocess.STDOUT, ) + pid_path = self.subprocess_pid_file + pid = self.proc.pid + if pid_path: + # Require exclusive creation to prevent race conditions. 
+ try: + with pid_path.open('x') as pidfile: + pidfile.write(str(pid)) + except FileExistsError: + self._log.error('PID file %r already exists, killing just-spawned process pid=%d', + pid_path, pid) + await self.abort() + raise try: + assert self.proc.stdout is not None + while not self.proc.stdout.at_eof(): try: - line = await asyncio.wait_for(self.proc.stdout.readline(), - self.readline_timeout) + line_bytes = await asyncio.wait_for(self.proc.stdout.readline(), + self.readline_timeout) except asyncio.TimeoutError: - raise CommandExecutionError('Command timed out after %i seconds' % - self.readline_timeout) + raise CommandExecutionError('Command pid=%d timed out after %i seconds' % + (pid, self.readline_timeout)) - if len(line) == 0: + if len(line_bytes) == 0: # EOF received, so let's bail. break try: - line = line.decode('utf8') + line = line_bytes.decode('utf8') except UnicodeDecodeError as ex: await self.abort() - raise CommandExecutionError('Command produced non-UTF8 output, ' - 'aborting: %s' % ex) + raise CommandExecutionError( + 'Command pid=%d produced non-UTF8 output, aborting: %s' % (pid, ex)) line = line.rstrip() - self._log.debug('Read line: %s', line) - line = await self.process_line(line) - if line is not None: - await self.worker.register_log(line) + line_logger.debug('Read line pid=%d: %s', pid, line) + processed_line = await self.process_line(line) + if processed_line is not None: + await self.worker.register_log(processed_line) retcode = await self.proc.wait() - self._log.info('Command %r stopped with status code %s', args, retcode) + self._log.info('Command %s (pid=%d) stopped with status code %s', + cmd_to_log, pid, retcode) if retcode: - raise CommandExecutionError('Command failed with status %s' % retcode) + raise CommandExecutionError('Command %s (pid=%d) failed with status %s' % + (cmd_to_log, pid, retcode)) except asyncio.CancelledError: - self._log.info('asyncio task got canceled, killing subprocess.') + self._log.info('asyncio task got canceled, killing subprocess pid=%d', pid) await self.abort() raise + finally: + if pid_path: + pid_path.unlink() async def process_line(self, line: str) -> typing.Optional[str]: """Processes the line, returning None to ignore it.""" - return '> %s' % line + return 'pid=%d > %s' % (self.proc.pid, line) async def abort(self): """Aborts the command by killing the subprocess.""" - if self.proc is None or self.proc == attr.NOTHING: + if getattr(self, 'proc', None) is None or self.proc == attr.NOTHING: self._log.debug("No process to kill. That's ok.") return - self._log.info('Terminating subprocess') + self._log.info('Terminating subprocess pid=%d', self.proc.pid) try: self.proc.terminate() @@ -495,27 +626,30 @@ class AbstractSubprocessCommand(AbstractCommand): except asyncio.TimeoutError: pass else: - self._log.info('The process aborted with status code %s', retval) + self._log.info('The process pid=%d aborted with status code %s', self.proc.pid, retval) return - self._log.warning('The process did not stop in %d seconds, going to kill it', timeout) + self._log.warning('The process pid=%d did not stop in %d seconds, going to kill it', + self.proc.pid, timeout) try: self.proc.kill() except ProcessLookupError: - self._log.debug("The process was already stopped, aborting is impossible. That's ok.") + self._log.debug("The process pid=%d was already stopped, aborting is impossible. " + "That's ok.", self.proc.pid) return except AttributeError: # This can happen in some race conditions, it's fine. 
- self._log.debug("The process was not yet started, aborting is impossible. That's ok.") + self._log.debug("The process pid=%d was not yet started, aborting is impossible. " + "That's ok.", self.proc.pid) return retval = await self.proc.wait() - self._log.info('The process aborted with status code %s', retval) + self._log.info('The process %d aborted with status code %s', self.proc.pid, retval) @command_executor('exec') class ExecCommand(AbstractSubprocessCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): try: cmd = settings['cmd'] except KeyError: @@ -525,9 +659,9 @@ class ExecCommand(AbstractSubprocessCommand): return '"cmd" must be a string' if not cmd: return '"cmd" may not be empty' + return super().validate(settings) - async def execute(self, settings: dict): - import shlex + async def execute(self, settings: Settings): await self.subprocess(shlex.split(settings['cmd'])) @@ -540,6 +674,15 @@ class BlenderRenderCommand(AbstractSubprocessCommand): re_path_not_found = attr.ib(init=False) re_file_saved = attr.ib(init=False) + # These lines are produced by Cycles (and other rendering engines) for each + # object, choking the Manager with logs when there are too many objects. + # For now we have some custom code to swallow those lines, in lieu of a + # logging system that can handle those volumes properly. + substring_synchronizing = {'| Synchronizing object |', ' | Syncing '} + seen_synchronizing_line = False + + _last_activity_time: float = 0.0 + def __attrs_post_init__(self): super().__attrs_post_init__() @@ -554,9 +697,9 @@ class BlenderRenderCommand(AbstractSubprocessCommand): self.re_path_not_found = re.compile(r"Warning: Path '.*' not found") self.re_file_saved = re.compile(r"Saved: '(?P<filename>.*)'") - def validate(self, settings: dict): - import shlex + self._last_activity_time = 0.0 + def validate(self, settings: Settings): blender_cmd, err = self._setting(settings, 'blender_cmd', True) if err: return err @@ -594,9 +737,9 @@ class BlenderRenderCommand(AbstractSubprocessCommand): # Ok, now it's fatal. return 'filepath %r does not exist' % filepath - return None + return super().validate(settings) - async def execute(self, settings: dict): + async def execute(self, settings: Settings): cmd = self._build_blender_cmd(settings) await self.worker.register_task_update(activity='Starting Blender') @@ -654,6 +797,10 @@ class BlenderRenderCommand(AbstractSubprocessCommand): return info + def _is_sync_line(self, line: str) -> bool: + return any(substring in line + for substring in self.substring_synchronizing) + async def process_line(self, line: str) -> typing.Optional[str]: """Processes the line, returning None to ignore it.""" @@ -661,8 +808,18 @@ class BlenderRenderCommand(AbstractSubprocessCommand): if 'Warning: Unable to open' in line or self.re_path_not_found.search(line): await self.worker.register_task_update(activity=line) + if self._is_sync_line(line): + if self.seen_synchronizing_line: + return None + self.seen_synchronizing_line = True + return '> %s (NOTE FROM WORKER: only logging this line; skipping the rest of ' \ + 'the Synchronizing Objects lines)' % line + render_info = self.parse_render_line(line) - if render_info: + now = time.time() + # Only update render info every this many seconds, and not for every line Blender produces. + if render_info and now - self._last_activity_time < 30: + self._last_activity_time = now # Render progress. Not interesting to log all of them, but we do use # them to update the render progress. 
# TODO: For now we return this as a string, but at some point we may want @@ -671,7 +828,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): fmt = 'Fra:{fra} Mem:{mem} | Time:{time_sec} | Remaining:{remaining_sec} | {status}' activity = fmt.format(**render_info) else: - self._log.debug('Unable to find remaining time in line: %s', line) + # self._log.debug('Unable to find remaining time in line: %s', line) activity = line await self.worker.register_task_update(activity=activity) @@ -681,12 +838,12 @@ class BlenderRenderCommand(AbstractSubprocessCommand): self.worker.output_produced(m.group('filename')) # Not a render progress line; just log it for now. - return '> %s' % line + return 'pid=%d > %s' % (self.proc.pid, line) @command_executor('blender_render_progressive') class BlenderRenderProgressiveCommand(BlenderRenderCommand): - def validate(self, settings: dict): + def validate(self, settings: Settings): err = super().validate(settings) if err: return err @@ -712,9 +869,7 @@ class BlenderRenderProgressiveCommand(BlenderRenderCommand): @command_executor('merge_progressive_renders') class MergeProgressiveRendersCommand(AbstractSubprocessCommand): - def validate(self, settings: dict): - import shlex - + def validate(self, settings: Settings): blender_cmd, err = self._setting(settings, 'blender_cmd', True) if err: return err @@ -747,9 +902,9 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): _, err = self._setting(settings, 'weight2', True, int) if err: return err - async def execute(self, settings: dict): - import tempfile + return super().validate(settings) + async def execute(self, settings: Settings): blendpath = Path(__file__).with_name('merge-exr.blend') cmd = settings['blender_cmd'][:] @@ -763,7 +918,7 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): # set up node properties and render settings. 
output = Path(settings['output']) - output.parent.mkdir(parents=True, exist_ok=True) + await self._mkdir_if_not_exists(output.parent) with tempfile.TemporaryDirectory(dir=str(output.parent)) as tmpdir: tmppath = Path(tmpdir) @@ -787,8 +942,6 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): async def move(self, src: Path, dst: Path): """Moves a file to another location.""" - import shutil - self._log.info('Moving %s to %s', src, dst) assert src.is_file() @@ -797,3 +950,336 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): await self.worker.register_log('Moving %s to %s', src, dst) shutil.move(str(src), str(dst)) + + +@command_executor('blender_render_audio') +class BlenderRenderAudioCommand(BlenderRenderCommand): + def validate(self, settings: Settings): + err = super().validate(settings) + if err: + return err + + render_output, err = self._setting(settings, 'render_output', True) + if err: + return err + if not render_output: + return "'render_output' is a required setting" + + _, err = self._setting(settings, 'frame_start', False, int) + if err: + return err + _, err = self._setting(settings, 'frame_end', False, int) + if err: + return err + + def _build_blender_cmd(self, settings: Settings) -> typing.List[str]: + frame_start = settings.get('frame_start') + frame_end = settings.get('frame_end') + render_output = settings.get('render_output') + + py_lines = [ + "import bpy" + ] + if frame_start is not None: + py_lines.append(f'bpy.context.scene.frame_start = {frame_start}') + if frame_end is not None: + py_lines.append(f'bpy.context.scene.frame_end = {frame_end}') + + py_lines.append(f"bpy.ops.sound.mixdown(filepath={render_output!r}, " + f"codec='FLAC', container='FLAC', " + f"accuracy=128)") + py_lines.append('bpy.ops.wm.quit_blender()') + py_script = '\n'.join(py_lines) + + return [ + *settings['blender_cmd'], + '--enable-autoexec', + '-noaudio', + '--background', + settings['filepath'], + '--python-exit-code', '47', + '--python-expr', py_script + ] + + +class AbstractFFmpegCommand(AbstractSubprocessCommand, abc.ABC): + + def validate(self, settings: Settings) -> typing.Optional[str]: + # Check that FFmpeg can be found and shlex-split the string. + ffmpeg_cmd, err = self._setting(settings, 'ffmpeg_cmd', is_required=False, default='ffmpeg') + if err: + return err + + cmd = shlex.split(ffmpeg_cmd) + executable_path: typing.Optional[str] = shutil.which(cmd[0]) + if not executable_path: + return f'FFmpeg command {ffmpeg_cmd!r} not found on $PATH' + settings['ffmpeg_cmd'] = cmd + self._log.debug('Found FFmpeg command at %r', executable_path) + return None + + async def execute(self, settings: Settings) -> None: + cmd = self._build_ffmpeg_command(settings) + await self.subprocess(cmd) + + def _build_ffmpeg_command(self, settings: Settings) -> typing.List[str]: + assert isinstance(settings['ffmpeg_cmd'], list), \ + 'run validate() before _build_ffmpeg_command' + cmd = [ + *settings['ffmpeg_cmd'], + *self.ffmpeg_args(settings), + ] + return cmd + + @abc.abstractmethod + def ffmpeg_args(self, settings: Settings) -> typing.List[str]: + """Construct the FFmpeg arguments to execute. + + Does not need to include the FFmpeg command itself, just + its arguments. + """ + pass + + +@command_executor('create_video') +class CreateVideoCommand(AbstractFFmpegCommand): + """Create a video from individual frames. + + Requires FFmpeg to be installed and available with the 'ffmpeg' command. 
+ """ + + codec_video = 'h264' + + # Select some settings that are useful for scrubbing through the video. + constant_rate_factor = 17 # perceptually lossless + keyframe_interval = 1 # GOP size + max_b_frames: typing.Optional[int] = 0 + + def validate(self, settings: Settings) -> typing.Optional[str]: + err = super().validate(settings) + if err: + return err + + # Check that we know our input and output image files. + input_files, err = self._setting(settings, 'input_files', is_required=True) + if err: + return err + self._log.debug('Input files: %s', input_files) + output_file, err = self._setting(settings, 'output_file', is_required=True) + if err: + return err + self._log.debug('Output file: %s', output_file) + + fps, err = self._setting(settings, 'fps', is_required=True, valtype=(int, float)) + if err: + return err + self._log.debug('Frame rate: %r fps', fps) + return None + + def ffmpeg_args(self, settings: Settings) -> typing.List[str]: + args = [ + '-pattern_type', 'glob', + '-r', str(settings['fps']), + '-i', settings['input_files'], + '-c:v', self.codec_video, + '-crf', str(self.constant_rate_factor), + '-g', str(self.keyframe_interval), + '-y', + ] + if self.max_b_frames is not None: + args.extend(['-bf', str(self.max_b_frames)]) + args += [ + settings['output_file'] + ] + return args + + +@command_executor('concatenate_videos') +class ConcatenateVideosCommand(AbstractFFmpegCommand): + """Create a video by concatenating other videos. + + Requires FFmpeg to be installed and available with the 'ffmpeg' command. + """ + + index_file: typing.Optional[Path] = None + + def validate(self, settings: Settings) -> typing.Optional[str]: + err = super().validate(settings) + if err: + return err + + # Check that we know our input and output image files. + input_files, err = self._setting(settings, 'input_files', is_required=True) + if err: + return err + self._log.debug('Input files: %s', input_files) + output_file, err = self._setting(settings, 'output_file', is_required=True) + if err: + return err + self._log.debug('Output file: %s', output_file) + + return None + + async def execute(self, settings: Settings) -> None: + await super().execute(settings) + + if self.index_file is not None and self.index_file.exists(): + try: + self.index_file.unlink() + except IOError: + msg = f'unable to unlink file {self.index_file}, ignoring' + self.worker.register_log(msg) + self._log.warning(msg) + + def ffmpeg_args(self, settings: Settings) -> typing.List[str]: + input_files = Path(settings['input_files']).absolute() + self.index_file = input_files.with_name('ffmpeg-input.txt') + + # Construct the list of filenames for ffmpeg to process. + # The index file needs to sit next to the input files, as + # ffmpeg checks for 'unsafe paths'. + with self.index_file.open('w') as outfile: + for video_path in sorted(input_files.parent.glob(input_files.name)): + escaped = str(video_path.name).replace("'", "\\'") + print("file '%s'" % escaped, file=outfile) + + output_file = Path(settings['output_file']) + self._log.debug('Output file: %s', output_file) + + args = [ + '-f', 'concat', + '-i', str(self.index_file), + '-c', 'copy', + '-y', + str(output_file), + ] + return args + + +@command_executor('mux_audio') +class MuxAudioCommand(AbstractFFmpegCommand): + + def validate(self, settings: Settings) -> typing.Optional[str]: + err = super().validate(settings) + if err: + return err + + # Check that we know our input and output image files. 
+ audio_file, err = self._setting(settings, 'audio_file', is_required=True) + if err: + return err + if not Path(audio_file).exists(): + return f'Audio file {audio_file} does not exist' + self._log.debug('Audio file: %s', audio_file) + + video_file, err = self._setting(settings, 'video_file', is_required=True) + if err: + return err + if not Path(video_file).exists(): + return f'Video file {video_file} does not exist' + self._log.debug('Video file: %s', video_file) + + output_file, err = self._setting(settings, 'output_file', is_required=True) + if err: + return err + self._log.debug('Output file: %s', output_file) + + return None + + def ffmpeg_args(self, settings: Settings) -> typing.List[str]: + audio_file = Path(settings['audio_file']).absolute() + video_file = Path(settings['video_file']).absolute() + output_file = Path(settings['output_file']).absolute() + + args = [ + '-i', str(audio_file), + '-i', str(video_file), + '-c', 'copy', + '-y', + str(output_file), + ] + return args + + +@command_executor('encode_audio') +class EncodeAudioCommand(AbstractFFmpegCommand): + + def validate(self, settings: Settings) -> typing.Optional[str]: + err = super().validate(settings) + if err: + return err + + # Check that we know our input and output audio files. + input_file, err = self._setting(settings, 'input_file', is_required=True) + if err: + return err + if not Path(input_file).exists(): + return f'Audio file {input_file} does not exist' + self._log.debug('Audio file: %s', input_file) + + output_file, err = self._setting(settings, 'output_file', is_required=True) + if err: + return err + self._log.debug('Output file: %s', output_file) + + _, err = self._setting(settings, 'bitrate', is_required=True) + if err: + return err + _, err = self._setting(settings, 'codec', is_required=True) + if err: + return err + return None + + def ffmpeg_args(self, settings: Settings) -> typing.List[str]: + input_file = Path(settings['input_file']).absolute() + output_file = Path(settings['output_file']).absolute() + + args = [ + '-i', str(input_file), + '-c:a', settings['codec'], + '-b:a', settings['bitrate'], + '-y', + str(output_file), + ] + return args + + +@command_executor('move_with_counter') +class MoveWithCounterCommand(AbstractCommand): + # Split '2018_12_06-spring.mkv' into a '2018_12_06' prefix and '-spring.mkv' suffix.
+ filename_parts = re.compile(r'(?P<prefix>^[0-9_]+)(?P<suffix>.*)$') + + def validate(self, settings: Settings): + src, err = self._setting(settings, 'src', True) + if err: + return err + if not src: + return 'src may not be empty' + dest, err = self._setting(settings, 'dest', True) + if err: + return err + if not dest: + return 'dest may not be empty' + + async def execute(self, settings: Settings): + src = Path(settings['src']) + if not src.exists(): + raise CommandExecutionError('Path %s does not exist, unable to move' % src) + + dest = Path(settings['dest']) + fname_parts = self.filename_parts.match(dest.name) + if fname_parts: + prefix = fname_parts.group('prefix') + '_' + suffix = fname_parts.group('suffix') + else: + prefix = dest.stem + '_' + suffix = dest.suffix + self._log.debug('Adding counter to output name between %r and %r', prefix, suffix) + dest = _numbered_path(dest.parent, prefix, suffix) + + self._log.info('Moving %s to %s', src, dest) + await self.worker.register_log('%s: Moving %s to %s', self.command_name, src, dest) + await self._mkdir_if_not_exists(dest.parent) + + shutil.move(str(src), str(dest)) + self.worker.output_produced(dest) diff --git a/flamenco_worker/config.py b/flamenco_worker/config.py index ea8c785c9ab8b6dc404e5aac77591d9d51f0292f..6c09d16b89068d4a8eb0e62f0cc67ee81e5ecb51 100644 --- a/flamenco_worker/config.py +++ b/flamenco_worker/config.py @@ -5,6 +5,7 @@ import configparser import datetime import pathlib import logging +import typing from . import worker @@ -15,9 +16,10 @@ CONFIG_SECTION = 'flamenco-worker' DEFAULT_CONFIG = { 'flamenco-worker': collections.OrderedDict([ ('manager_url', ''), - ('task_types', 'unknown sleep blender-render'), + # The 'video-encoding' tasks require ffmpeg to be installed, so it's not enabled by default. + ('task_types', 'sleep blender-render file-management exr-merge'), ('task_update_queue_db', 'flamenco-worker.db'), - ('task_only_one', 'False'), + ('subprocess_pid_file', 'flamenco-worker-subprocess.pid'), ('may_i_run_interval_seconds', '5'), ('worker_id', ''), ('worker_secret', ''), @@ -26,8 +28,9 @@ DEFAULT_CONFIG = { ('push_log_max_interval_seconds', str(worker.PUSH_LOG_MAX_INTERVAL.total_seconds())), ('push_log_max_entries', str(worker.PUSH_LOG_MAX_ENTRIES)), ('push_act_max_interval_seconds', str(worker.PUSH_ACT_MAX_INTERVAL.total_seconds())), - ]) -} + ]), + 'pre_task_check': collections.OrderedDict([]), +} # type: typing.Mapping[str, typing.Mapping[str, typing.Any]] # Will be assigned to the config key 'task_types' when started with --test CLI arg. 
TESTING_TASK_TYPES = 'test-blender-render' @@ -52,8 +55,8 @@ class ConfigParser(configparser.ConfigParser): secs = self.value(key, float) return datetime.timedelta(seconds=secs) - def erase(self, key: str) -> bool: - return self.set(CONFIG_SECTION, key, '') + def erase(self, key: str) -> None: + self.set(CONFIG_SECTION, key, '') def merge_with_home_config(new_conf: dict): @@ -92,13 +95,13 @@ def load_config(config_file: pathlib.Path = None, if config_file: log.info('Loading configuration from %s', config_file) if not config_file.exists(): - log.fatal('Config file %s does not exist', config_file) - raise SystemExit() + log.error('Config file %s does not exist', config_file) + raise SystemExit(47) loaded = confparser.read(str(config_file), encoding='utf8') else: if not GLOBAL_CONFIG_FILE.exists(): - log.fatal('Config file %s does not exist', GLOBAL_CONFIG_FILE) - raise SystemExit() + log.error('Config file %s does not exist', GLOBAL_CONFIG_FILE) + raise SystemExit(47) config_files = [GLOBAL_CONFIG_FILE, HOME_CONFIG_FILE] filenames = [str(f.absolute()) for f in config_files] diff --git a/flamenco_worker/may_i_run.py b/flamenco_worker/may_i_run.py index 5eec5ffd2d096779157cd3182ed642f4d89ccb27..2540474580a67313cc2a0190f3698dc775ba422c 100644 --- a/flamenco_worker/may_i_run.py +++ b/flamenco_worker/may_i_run.py @@ -38,11 +38,11 @@ class MayIRun: return if await self.may_i_run(task_id): - self._log.debug('Current task may run') + self._log.debug('Current task %s may run', task_id) return self._log.warning('We have to stop task %s', task_id) - await self.worker.stop_current_task() + await self.worker.stop_current_task(task_id) async def may_i_run(self, task_id: str) -> bool: """Asks the Manager whether we are still allowed to run the given task.""" diff --git a/flamenco_worker/runner.py b/flamenco_worker/runner.py index 69f140d280e4f6c0c99dc934eff3befdb4cc81aa..a517e6598e885f6986bd1c665a42ca800d66751b 100644 --- a/flamenco_worker/runner.py +++ b/flamenco_worker/runner.py @@ -17,6 +17,7 @@ class TaskRunner: """Runs tasks, sending updates back to the worker.""" shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future)) + subprocess_pid_file = attr.ib(validator=attr.validators.instance_of(str)) last_command_idx = attr.ib(default=0, init=False) _log = attrs_extra.log('%s.TaskRunner' % __name__) diff --git a/flamenco_worker/ssdp_discover.py b/flamenco_worker/ssdp_discover.py index c1f6c18db00391004069d23bb1db559a89527fd5..29e6c9a80b7db289b84860811a494cf8b608fdce 100644 --- a/flamenco_worker/ssdp_discover.py +++ b/flamenco_worker/ssdp_discover.py @@ -30,9 +30,16 @@ class Response(HTTPResponse): self.fp = BytesIO(payload) self.debuglevel = 0 self.strict = 0 - self.headers = self.msg = None + + # This is also done in the HTTPResponse __init__ function, but + # MyPy still doesn't like it. + self.headers = self.msg = None # type: ignore + self._method = None - self.begin() + + # This function is available on the superclass, but still + # MyPy doesn't think it is. + self.begin() # type: ignore def interface_addresses(): diff --git a/flamenco_worker/tz.py b/flamenco_worker/tz.py index 25ea8c9bed931d50b9001075f38f7c322a2d1f74..4bc360d8e525d59b23be351915f5b0d6b66c4fac 100644 --- a/flamenco_worker/tz.py +++ b/flamenco_worker/tz.py @@ -22,7 +22,9 @@ class tzutc(datetime.tzinfo): return True - __hash__ = None + # Assigning a different type than object.__hash__ (None resp. Callable) + # is not allowed by MyPy. Here it's intentional, though. 
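+ # (Python would do the same implicitly: a class that defines __eq__
+ # without defining __hash__ gets __hash__ = None, so instances are
+ # unhashable either way; the explicit assignment just documents that.)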
+ __hash__ = None # type: ignore def __ne__(self, other): return not (self == other) diff --git a/flamenco_worker/upstream.py b/flamenco_worker/upstream.py index d48ab0b24cb91c4980dbde0c6f840d1ce394aa15..ec623c7a6948c8d9a85e6fb2de608054708263e0 100644 --- a/flamenco_worker/upstream.py +++ b/flamenco_worker/upstream.py @@ -1,6 +1,6 @@ import attr import concurrent.futures -import functools +import json as json_module # to prevent shadowing of 'json' parameter import requests from . import attrs_extra @@ -9,6 +9,13 @@ HTTP_RETRY_COUNT = 5 HTTP_TIMEOUT = 3 # in seconds +def elide(string: str, length: int) -> str: + """Cut the string to be no longer than 'length' characters.""" + if len(string) <= length: + return string + return f'{string[:length-3]}...' + + @attr.s class FlamencoManager: manager_url = attr.ib(validator=attr.validators.instance_of(str)) @@ -78,7 +85,8 @@ class FlamencoManager: if json is None: self._log.debug('%s %s', method, abs_url) else: - self._log.debug('%s %s with JSON: %s', method, abs_url, json) + for_log = elide(json_module.dumps(json), 80) + self._log.debug('%s %s with JSON: %s', method, abs_url, for_log) if headers is None: headers = {} diff --git a/flamenco_worker/upstream_update_queue.py b/flamenco_worker/upstream_update_queue.py index e4d24b66b751e4dde1f5ecd88cf716124a60fb66..c266da3fb265307d3ce13f4cd1fe5a98f6924013 100644 --- a/flamenco_worker/upstream_update_queue.py +++ b/flamenco_worker/upstream_update_queue.py @@ -98,6 +98,7 @@ class TaskUpdateQueue: if self._db is None: self._connect_db() + assert self._db is not None result = self._db.execute(''' SELECT rowid, url, payload @@ -114,6 +115,7 @@ class TaskUpdateQueue: """Return the number of items queued.""" if self._db is None: self._connect_db() + assert self._db is not None result = self._db.execute('SELECT count(*) FROM fworker_queue') count = next(result)[0] @@ -132,7 +134,7 @@ class TaskUpdateQueue: Returns True iff the queue was empty, even before flushing. """ - with (await self._queue_lock): + async with self._queue_lock: queue_is_empty = True queue_size_before = self.queue_size() handled = 0 @@ -142,15 +144,16 @@ class TaskUpdateQueue: queue_size = self.queue_size() self._log.info('Pushing task update to Manager, queue size is %d', queue_size) resp = await self.manager.post(url, json=payload, loop=loop) - if resp.status_code == 409: - # The task was assigned to another worker, so we're not allowed to - # push updates for it. We have to un-queue this update, as it will - # never be accepted. + if resp.status_code in {404, 409}: + # 404: Task doesn't exist (any more). + # 409: The task was assigned to another worker, so we're not allowed to + # push updates for it. We have to un-queue this update, as it will + # never be accepted. self._log.warning('discarding update, Manager says %s', resp.text) # TODO(sybren): delete all queued updates to the same URL? else: resp.raise_for_status() - self._log.debug('Master accepted pushed update.') + self._log.debug('Manager accepted pushed update.') self._unqueue(rowid) handled += 1 diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 6df00177fe24e9526639914c1a205ca858f7790c..2c599f210e41305cdd04fe9b90adb70fdbf99926 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -1,10 +1,16 @@ import asyncio import datetime import enum +import functools +import itertools import pathlib +import tempfile +import time +import traceback import typing import attr +import requests.exceptions from . import attrs_extra from . 
import documents @@ -16,6 +22,7 @@ REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30 FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed +ERROR_RETRY_DELAY = 600 # after the pre-task sanity check failed PUSH_LOG_MAX_ENTRIES = 1000 PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30) @@ -40,9 +47,20 @@ class WorkerState(enum.Enum): STARTING = 'starting' AWAKE = 'awake' ASLEEP = 'asleep' + ERROR = 'error' SHUTTING_DOWN = 'shutting-down' +@attr.s(auto_attribs=True) +class PreTaskCheckParams: + pre_task_check_write: typing.Iterable[str] = [] + pre_task_check_read: typing.Iterable[str] = [] + + +class PreTaskCheckFailed(PermissionError): + """Raised when the pre-task sanity check fails.""" + + @attr.s class FlamencoWorker: manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager)) @@ -74,17 +92,17 @@ class FlamencoWorker: task_is_silently_aborting = attr.ib(default=False, init=False, validator=attr.validators.instance_of(bool)) - fetch_task_task = attr.ib( + single_iteration_fut = attr.ib( default=None, init=False, - validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) - asyncio_execution_task = attr.ib( + validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future))) + asyncio_execution_fut = attr.ib( default=None, init=False, - validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) + validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future))) # See self.sleeping() - sleeping_task = attr.ib( + sleeping_fut = attr.ib( default=None, init=False, - validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) + validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future))) task_id = attr.ib( default=None, init=False, @@ -94,7 +112,7 @@ class FlamencoWorker: default=None, init=False, validator=attr.validators.optional(attr.validators.instance_of(str)) ) - _queued_log_entries = attr.ib(default=attr.Factory(list), init=False) + _queued_log_entries = attr.ib(default=attr.Factory(list), init=False) # type: typing.List[str] _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False) last_log_push = attr.ib( default=attr.Factory(datetime.datetime.now), @@ -115,6 +133,9 @@ class FlamencoWorker: push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL, validator=attr.validators.instance_of(datetime.timedelta)) + pretask_check_params = attr.ib(factory=PreTaskCheckParams, + validator=attr.validators.instance_of(PreTaskCheckParams)) + # Futures that represent delayed calls to push_to_manager(). # They are scheduled when logs & activities are registered but not yet pushed. They are # cancelled when a push_to_manager() actually happens for another reason. 
There are different @@ -134,11 +155,13 @@ class FlamencoWorker: _log = attrs_extra.log('%s.FlamencoWorker' % __name__) + _last_output_produced = 0.0 # seconds since epoch + @property def active_task_id(self) -> typing.Optional[str]: """Returns the task ID, but only if it is currently executing; returns None otherwise.""" - if self.asyncio_execution_task is None or self.asyncio_execution_task.done(): + if self.asyncio_execution_fut is None or self.asyncio_execution_fut.done(): return None return self.task_id @@ -165,16 +188,22 @@ class FlamencoWorker: self.schedule_fetch_task() - @staticmethod + @functools.lru_cache(maxsize=1) def hostname() -> str: import socket return socket.gethostname() - async def _keep_posting_to_manager(self, url: str, json: dict, *, use_auth=True, - may_retry_loop: bool): - import requests + @property + def nickname(self) -> str: + return self.hostname() + + @property + def identifier(self) -> str: + return f'{self.worker_id} ({self.nickname})' + async def _keep_posting_to_manager(self, url: str, json: dict, *, use_auth=True, + may_retry_loop: bool) -> requests.Response: post_kwargs = { 'json': json, 'loop': self.loop, @@ -263,30 +292,36 @@ class FlamencoWorker: :param delay: delay in seconds, after which the task fetch will be performed. """ - if first and self.task_only_one: - self.shutdown() - return - - # The current task may still be running, as fetch_task() calls schedule_fetch_task() to + # The current task may still be running, as single_iteration() calls schedule_fetch_task() to # schedule a future run. This may result in the task not being awaited when we are # shutting down. - if self.shutdown_future.done(): + if self.shutdown_future is not None and self.shutdown_future.done(): self._log.warning('Shutting down, not scheduling another fetch-task task.') return - self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop) + self.single_iteration_fut = asyncio.ensure_future(self.single_iteration(delay), + loop=self.loop) - async def stop_current_task(self): + async def stop_current_task(self, task_id: str): """Stops the current task by canceling the AsyncIO task. - This causes a CancelledError in the self.fetch_task() function, which then takes care + This causes a CancelledError in the self.single_iteration() function, which then takes care of the task status change and subsequent activity push. + + :param task_id: the task ID to stop. Will only perform a stop if it + matches the currently executing task. This is to avoid race + conditions. 
""" - if not self.asyncio_execution_task or self.asyncio_execution_task.done(): + if not self.asyncio_execution_fut or self.asyncio_execution_fut.done(): self._log.warning('stop_current_task() called but no task is running.') return + if self.task_id != task_id: + self._log.warning('stop_current_task(%r) called, but current task is %r, not stopping', + task_id, self.task_id) + return + self._log.warning('Stopping task %s', self.task_id) self.task_is_silently_aborting = True @@ -294,13 +329,38 @@ class FlamencoWorker: await self.trunner.abort_current_task() except asyncio.CancelledError: self._log.info('asyncio task was canceled for task runner task %s', self.task_id) - self.asyncio_execution_task.cancel() + self.asyncio_execution_fut.cancel() await self.register_log('Worker %s stopped running this task,' - ' no longer allowed to run by Manager', self.worker_id) + ' no longer allowed to run by Manager', self.identifier) + await self.requeue_task_on_manager(task_id) + + + async def requeue_task_on_manager(self, task_id: str): + """Return a task to the Manager's queue for later execution.""" + + self._log.info('Returning task %s to the Manager queue', task_id) + await self.push_to_manager() await self.tuqueue.flush_and_report(loop=self.loop) + url = f'/tasks/{task_id}/return' + try: + resp = await self.manager.post(url, loop=self.loop) + except IOError as ex: + self._log.exception('Exception POSTing to %s', url) + return + + if resp.status_code != 204: + self._log.warning('Error %d returning task %s to Manager: %s', + resp.status_code, resp.json()) + await self.register_log('Worker %s could not return this task to the Manager queue', + self.identifier) + return + + await self.register_log('Worker %s returned this task to the Manager queue.', + self.identifier) + def shutdown(self): """Gracefully shuts down any asynchronous tasks.""" @@ -333,6 +393,8 @@ class FlamencoWorker: self.loop.run_until_complete(self.manager.post('/sign-off', loop=self.loop)) except Exception as ex: self._log.warning('Error signing off. Continuing with shutdown. %s', ex) + + # TODO(Sybren): do this in a finally-clause: self.failures_are_acceptable = False def stop_fetching_tasks(self): @@ -341,40 +403,37 @@ class FlamencoWorker: Used in shutdown and when we're going to status 'asleep'. """ - if self.fetch_task_task is None or self.fetch_task_task.done(): + if self.single_iteration_fut is None or self.single_iteration_fut.done(): return - self._log.info('stopping task fetching task %s', self.fetch_task_task) - self.fetch_task_task.cancel() + self._log.info('stopping task fetching task %s', self.single_iteration_fut) + self.single_iteration_fut.cancel() # This prevents a 'Task was destroyed but it is pending!' warning on the console. # Sybren: I've only seen this in unit tests, so maybe this code should be moved # there, instead. try: if not self.loop.is_running(): - self.loop.run_until_complete(self.fetch_task_task) + self.loop.run_until_complete(self.single_iteration_fut) except asyncio.CancelledError: pass - async def fetch_task(self, delay: float): + async def single_iteration(self, delay: float): """Fetches a single task to perform from Flamenco Manager, and executes it. :param delay: waits this many seconds before fetching a task. 
""" - import traceback - import requests - self.state = WorkerState.AWAKE self._cleanup_state_for_new_task() - self._log.debug('Going to fetch task in %s seconds', delay) + # self._log.debug('Going to fetch task in %s seconds', delay) await asyncio.sleep(delay) # Prevent outgoing queue overflowing by waiting until it's below the # threshold before starting another task. # TODO(sybren): introduce another worker state for this, and handle there. - with (await self._queue_lock): + async with self._queue_lock: queue_size = self.tuqueue.queue_size() if queue_size > QUEUE_SIZE_THRESHOLD: self._log.info('Task Update Queue size too large (%d > %d), waiting until it shrinks.', @@ -382,46 +441,64 @@ class FlamencoWorker: self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY) return + try: + self.pre_task_sanity_check() + except PreTaskCheckFailed as ex: + self._log.exception('Pre-task sanity check failed: %s, waiting until it succeeds', ex) + self.go_to_state_error() + return + + task_info = await self.fetch_task() + if task_info is None: + return + + await self.execute_task(task_info) + + async def fetch_task(self) -> typing.Optional[dict]: # TODO: use exponential backoff instead of retrying every fixed N seconds. - self._log.debug('Fetching task') + log = self._log.getChild('fetch_task') + log.debug('Fetching task') try: resp = await self.manager.post('/task', loop=self.loop) except requests.exceptions.RequestException as ex: - self._log.warning('Error fetching new task, will retry in %i seconds: %s', + log.warning('Error fetching new task, will retry in %i seconds: %s', FETCH_TASK_FAILED_RETRY_DELAY, ex) self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY) - return + return None if resp.status_code == 204: - self._log.debug('No tasks available, will retry in %i seconds.', + log.debug('No tasks available, will retry in %i seconds.', FETCH_TASK_EMPTY_RETRY_DELAY) self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY) - return + return None if resp.status_code == 423: status_change = documents.StatusChangeRequest(**resp.json()) - self._log.info('status change to %r requested when fetching new task', + log.info('status change to %r requested when fetching new task', status_change.status_requested) self.change_status(status_change.status_requested) - return + return None if resp.status_code != 200: - self._log.warning('Error %i fetching new task, will retry in %i seconds.', + log.warning('Error %i fetching new task, will retry in %i seconds.', resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY) self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY) - return + return None task_info = resp.json() self.task_id = task_info['_id'] - self._log.info('Received task: %s', self.task_id) - self._log.debug('Received task: %s', task_info) + log.info('Received task: %s', self.task_id) + log.debug('Received task: %s', task_info) + return task_info + async def execute_task(self, task_info: dict) -> None: + """Feed a task to the task runner and monitor for exceptions.""" try: await self.register_task_update(task_status='active') - self.asyncio_execution_task = asyncio.ensure_future( + self.asyncio_execution_fut = asyncio.ensure_future( self.trunner.execute(task_info, self), loop=self.loop) - ok = await self.asyncio_execution_task + ok = await self.asyncio_execution_fut if ok: await self.register_task_update( task_status='completed', @@ -450,7 +527,7 @@ class FlamencoWorker: # Such a failure will always result in a failed task, even when # self.failures_are_acceptable = True; only expected failures are # acceptable 
then. - with (await self._queue_lock): + async with self._queue_lock: self._queued_log_entries.append(traceback.format_exc()) await self.register_task_update( task_status='failed', @@ -487,15 +564,16 @@ class FlamencoWorker: self._log.debug('Scheduled delayed push to manager in %r seconds', delay_sec) await asyncio.sleep(delay_sec) + assert self.shutdown_future is not None if self.shutdown_future.done(): self._log.info('Shutting down, not pushing changes to manager.') self._log.info('Updating task %s with status %r and activity %r', self.task_id, self.current_task_status, self.last_task_activity) + payload: typing.MutableMapping[str, typing.Any] = {} if self.task_is_silently_aborting: self._log.info('push_to_manager: task is silently aborting, will only push logs') - payload = {} else: payload = attr.asdict(self.last_task_activity) if self.current_task_status: @@ -508,7 +586,7 @@ class FlamencoWorker: if self._push_act_to_manager is not None: self._push_act_to_manager.cancel() - with (await self._queue_lock): + async with self._queue_lock: if self._queued_log_entries: payload['log'] = '\n'.join(self._queued_log_entries) self._queued_log_entries.clear() @@ -558,7 +636,7 @@ class FlamencoWorker: self._push_act_to_manager = asyncio.ensure_future( self.push_to_manager(delay=self.push_act_max_interval)) - async def register_log(self, log_entry, *fmt_args): + async def register_log(self, log_entry: str, *fmt_args): """Registers a log entry, and possibly sends all queued log entries to upstream Manager. Supports variable arguments, just like the logger.{info,warn,error}(...) family @@ -571,7 +649,7 @@ class FlamencoWorker: log_entry %= fmt_args now = datetime.datetime.now(tz.tzutc()).isoformat() - with (await self._queue_lock): + async with self._queue_lock: self._queued_log_entries.append('%s: %s' % (now, log_entry)) queue_size = len(self._queued_log_entries) @@ -593,8 +671,17 @@ class FlamencoWorker: This performs a HTTP POST in a background task, returning as soon as the task is scheduled. + + Only sends an update every X seconds, to avoid sending too many + requests when we output frames rapidly. """ + now = time.time() + if now - self._last_output_produced < 30: + self._log.debug('Throttling POST to Manager /output-produced endpoint') + return + self._last_output_produced = now + async def do_post(): try: self._log.info('Sending %i path(s) to Manager', len(paths)) @@ -619,6 +706,7 @@ class FlamencoWorker: 'asleep': self.go_to_state_asleep, 'awake': self.go_to_state_awake, 'shutdown': self.go_to_state_shutdown, + 'error': self.go_to_state_error, } try: @@ -629,7 +717,7 @@ class FlamencoWorker: handler() - def ack_status_change(self, new_status: str) -> asyncio.Task: + def ack_status_change(self, new_status: str) -> typing.Optional[asyncio.Task]: """Confirm that we're now in a certain state. 
This ACK can be given without a request from the server, for example to support @@ -641,6 +729,7 @@ class FlamencoWorker: return self.loop.create_task(post) except Exception: self._log.exception('unable to notify Manager') + return None def go_to_state_asleep(self): """Starts polling for wakeup calls.""" @@ -648,8 +737,8 @@ class FlamencoWorker: self._log.info('Going to sleep') self.state = WorkerState.ASLEEP self.stop_fetching_tasks() - self.sleeping_task = self.loop.create_task(self.sleeping()) - self._log.debug('Created task %s', self.sleeping_task) + self.sleeping_fut = self.loop.create_task(self.sleeping()) + self._log.debug('Created task %s', self.sleeping_fut) self.ack_status_change('asleep') def go_to_state_awake(self): @@ -658,7 +747,7 @@ class FlamencoWorker: self._log.info('Waking up') self.state = WorkerState.AWAKE self.stop_sleeping() - self.schedule_fetch_task(3) + self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY) self.ack_status_change('awake') def go_to_state_shutdown(self): @@ -677,13 +766,20 @@ class FlamencoWorker: # to asleep status when we come back online. self.loop.stop() + def go_to_state_error(self): + """Go to the error state and try going to active after a delay.""" + self.state = WorkerState.ERROR + self._log.warning('Going to state %r', self.state.value) + self.ack_status_change(self.state.value) + self.sleeping_fut = self.loop.create_task(self.sleeping_for_error()) + def stop_sleeping(self): """Stops the asyncio task for sleeping.""" - if self.sleeping_task is None or self.sleeping_task.done(): + if self.sleeping_fut is None or self.sleeping_fut.done(): return - self.sleeping_task.cancel() + self.sleeping_fut.cancel() try: - self.sleeping_task.result() + self.sleeping_fut.result() except (asyncio.CancelledError, asyncio.InvalidStateError): pass except Exception: @@ -716,6 +812,86 @@ class FlamencoWorker: except: self._log.exception('problems while sleeping') + async def sleeping_for_error(self): + """After a delay go to active mode to see if any errors are now resolved.""" + + try: + await asyncio.sleep(ERROR_RETRY_DELAY) + except asyncio.CancelledError: + self._log.info('Error-sleeping ended') + return + except: + self._log.exception('problems while error-sleeping') + return + + self._log.warning('Error sleep is done, going to try to become active again') + self.go_to_state_awake() + + def pre_task_sanity_check(self): + """Perform readability and writability checks before fetching tasks.""" + + self._pre_task_check_read() + self._pre_task_check_write() + self._log.getChild('sanity_check').debug('Pre-task sanity check OK') + + def _pre_task_check_read(self): + pre_task_check_read = self.pretask_check_params.pre_task_check_read + if not pre_task_check_read: + return + + log = self._log.getChild('sanity_check') + log.debug('Performing pre-task read check') + for read_name in pre_task_check_read: + read_path = pathlib.Path(read_name).absolute() + log.debug(' - Read check on %s', read_path) + if not read_path.exists(): + raise PreTaskCheckFailed('%s does not exist' % read_path) from None + if read_path.is_dir(): + try: + (read_path / 'anything').stat() + except PermissionError: + raise PreTaskCheckFailed('%s is not readable' % read_path) from None + except FileNotFoundError: + # This is expected. 
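+ # Getting as far as a FileNotFoundError proves the directory
+ # itself could be read; only a PermissionError fails the check.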
+ pass
+ except Exception:
+ log.exception('Unexpected error during pre-task read check')
+ raise SystemExit(44)
+ else:
+ try:
+ with read_path.open(mode='r') as the_file:
+ the_file.read(1)
+ except IOError:
+ raise PreTaskCheckFailed('%s is not readable' % read_path) from None
+
+ def _pre_task_check_write(self):
+ pre_task_check_write = self.pretask_check_params.pre_task_check_write
+ if not pre_task_check_write:
+ return
+
+ log = self._log.getChild('sanity_check')
+ log.debug('Performing pre-task write check')
+ for write_name in pre_task_check_write:
+ write_path = pathlib.Path(write_name).absolute()
+ log.debug(' - Write check on %s', write_path)
+
+ post_delete = False
+ try:
+ if write_path.is_dir():
+ testfile = tempfile.TemporaryFile('w', dir=str(write_path))
+ else:
+ post_delete = not write_path.exists()
+ testfile = write_path.open('a+')
+ with testfile as outfile:
+ outfile.write('♥')
+ except PermissionError:
+ raise PreTaskCheckFailed('%s is not writable' % write_path) from None
+ if post_delete:
+ try:
+ write_path.unlink()
+ except PermissionError:
+ log.warning('Unable to delete write-test-file %s', write_path)
+
def generate_secret() -> str:
"""Generates a 64-character secret key."""
diff --git a/publish-online.sh b/publish-online.sh
index 8af1d16728dc502890c3dc858ab8e4a61f28cd84..a0a49f06e90d4d66f0321c8ecc313be2b19925f0 100755
--- a/publish-online.sh
+++ b/publish-online.sh
@@ -1,6 +1,6 @@
#!/bin/bash -e
-FLAMENCO_VERSION="2.2-dev1"
+FLAMENCO_VERSION="2.2-dev10"
cd dist
diff --git a/requirements-dev.txt b/requirements-dev.txt
deleted file mode 100644
index 4f85618ef25a7e780deedc045f6993217e2a01d0..0000000000000000000000000000000000000000
--- a/requirements-dev.txt
+++ /dev/null
@@ -1,4 +0,0 @@
--r requirements-test.txt
-ipython
-pyinstaller
-wheel
diff --git a/requirements-test.txt b/requirements-test.txt
deleted file mode 100644
index 98ed669fe6479ac2986a0e3d3ee8ba4acbeaf18b..0000000000000000000000000000000000000000
--- a/requirements-test.txt
+++ /dev/null
@@ -1,9 +0,0 @@
--r requirements.txt
-
-# Primary dependencies
-pytest==3.0.5
-pytest-cov==2.4.0
-
-# Secondary dependencies
-coverage==4.2
-py==1.4.32
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 846e4ce00e6506db510dd08358f16d9062196870..0000000000000000000000000000000000000000
--- a/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-attrs==16.3.0
-requests==2.12.4
diff --git a/setup.cfg b/setup.cfg
index 8a274d060a80911bee79825592bba41437583418..41e185cf81c3826fd38c34077c387658130671c5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -2,10 +2,11 @@
addopts = -v --cov flamenco_worker --cov-report term-missing --ignore node_modules
[mypy]
-python_version = 3.5
+python_version = 3.7
warn_unused_ignores = True
ignore_missing_imports = True
follow_imports = skip
+incremental = True
[pep8]
max-line-length = 100
diff --git a/setup.py b/setup.py
index c159d32320734227a809754e2aac68845d0f64c2..3102383a2e0562a2918d56d3a730b31ad4761526 100755
--- a/setup.py
+++ b/setup.py
@@ -86,7 +86,7 @@ if __name__ == '__main__':
setuptools.setup(
cmdclass={'zip': ZipCommand},
name='flamenco-worker',
- version='2.2-dev1',
+ version='2.2-dev10',
description='Flamenco Worker implementation',
author='Sybren A.
Stüvel', author_email='sybren@blender.studio', diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/test_commands_blender_render.py b/tests/test_commands_blender_render.py index d5a3e84b8694145eff29924915cf2efc8389581e..caa1a560bd1ddb1b051a8b6948076766888ab43d 100644 --- a/tests/test_commands_blender_render.py +++ b/tests/test_commands_blender_render.py @@ -1,9 +1,10 @@ from pathlib import Path import subprocess +from unittest import mock -from unittest.mock import patch, call +from unittest.mock import patch -from test_runner import AbstractCommandTest +from tests.test_runner import AbstractCommandTest class BlenderRenderTest(AbstractCommandTest): @@ -29,6 +30,17 @@ class BlenderRenderTest(AbstractCommandTest): 'hunds': '17', }) + def test_is_sync_line(self): + # Cycles + line = 'Fra:116 Mem:2348.62M (0.00M, Peak 2562.33M) | Time:02:31.54 | Mem:0.00M, ' \ + 'Peak:0.00M | 02_005_A.lighting, R-final ' \ + '| Synchronizing object | GEO-frost_particle.007' + self.assertTrue(self.cmd._is_sync_line(line)) + + # Non-cycles (render engine set to Cycles without Cycles support in Blender). + line = 'Fra:1 Mem:67.05M (0.00M, Peak 98.78M) | Time:00:00.17 | Syncing Suzanne.003' + self.assertTrue(self.cmd._is_sync_line(line)) + def test_parse_render_line(self): line = 'Fra:10 Mem:17.52M (0.00M, Peak 33.47M) | Time:00:04.17 | Remaining:00:00.87 | ' \ 'Mem:1.42M, Peak:1.42M | Scene, RenderLayer | Path Tracing Tile 110/135' @@ -60,6 +72,8 @@ class BlenderRenderTest(AbstractCommandTest): """Missing files should not abort the render.""" line = 'Warning: Unable to open je moeder' + self.cmd.proc = mock.Mock() + self.cmd.proc.pid = 47 self.loop.run_until_complete(self.cmd.process_line(line)) self.fworker.register_task_update.assert_called_once_with(activity=line) @@ -70,7 +84,7 @@ class BlenderRenderTest(AbstractCommandTest): def test_cli_args(self): """Test that CLI arguments in the blender_cmd setting are handled properly.""" - from mock_responses import CoroMock + from tests.mock_responses import CoroMock filepath = str(Path(__file__).parent) settings = { @@ -84,6 +98,7 @@ class BlenderRenderTest(AbstractCommandTest): cse = CoroMock(...) cse.coro.return_value.wait = CoroMock(return_value=0) + cse.coro.return_value.pid = 47 with patch('asyncio.create_subprocess_exec', new=cse) as mock_cse: self.loop.run_until_complete(self.cmd.run(settings)) @@ -103,7 +118,7 @@ class BlenderRenderTest(AbstractCommandTest): ) def test_python_expr(self): - from mock_responses import CoroMock + from tests.mock_responses import CoroMock filepath = str(Path(__file__).parent) settings = { @@ -118,6 +133,7 @@ class BlenderRenderTest(AbstractCommandTest): cse = CoroMock(...) 
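+ # CoroMock makes the mocked create_subprocess_exec awaitable; the fake
+ # process needs a pid attribute because subprocess log lines now carry
+ # a 'pid=NNN' prefix.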
cse.coro.return_value.wait = CoroMock(return_value=0) + cse.coro.return_value.pid = 47 with patch('asyncio.create_subprocess_exec', new=cse) as mock_cse: self.loop.run_until_complete(self.cmd.run(settings)) diff --git a/tests/test_commands_blender_render_audio.py b/tests/test_commands_blender_render_audio.py new file mode 100644 index 0000000000000000000000000000000000000000..dfa3fe27a3285581810492cde6aeb76eca7afb25 --- /dev/null +++ b/tests/test_commands_blender_render_audio.py @@ -0,0 +1,62 @@ +from pathlib import Path +import subprocess +from unittest import mock + +from unittest.mock import patch + +from tests.test_runner import AbstractCommandTest + +expected_script = """ +import bpy +bpy.context.scene.frame_start = 1 +bpy.context.scene.frame_end = 47 +bpy.ops.sound.mixdown(filepath='/tmp/output.flac', codec='FLAC', container='FLAC', accuracy=128) +bpy.ops.wm.quit_blender() +""".strip('\n') + + +class RenderAudioTest(AbstractCommandTest): + def setUp(self): + super().setUp() + + from flamenco_worker.commands import BlenderRenderAudioCommand + + self.cmd = BlenderRenderAudioCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + + def test_cli_args(self): + from tests.mock_responses import CoroMock + + filepath = Path(__file__) + settings = { + # Point blender_cmd to this file so that we're sure it exists. + 'blender_cmd': '%s --with --cli="args for CLI"' % __file__, + 'frame_start': 1, + 'frame_end': 47, + 'filepath': str(filepath), + 'render_output': '/tmp/output.flac', + } + + cse = CoroMock(...) + cse.coro.return_value.wait = CoroMock(return_value=0) + cse.coro.return_value.pid = 47 + with patch('asyncio.create_subprocess_exec', new=cse) as mock_cse: + self.loop.run_until_complete(self.cmd.run(settings)) + + mock_cse.assert_called_once_with( + __file__, + '--with', + '--cli=args for CLI', + '--enable-autoexec', + '-noaudio', + '--background', + str(filepath), + '--python-exit-code', '47', + '--python-expr', expected_script, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) diff --git a/tests/test_commands_concat_videos.py b/tests/test_commands_concat_videos.py new file mode 100644 index 0000000000000000000000000000000000000000..52383c92c6777739dd4628eae4e7d6e659586d3c --- /dev/null +++ b/tests/test_commands_concat_videos.py @@ -0,0 +1,69 @@ +import logging +import shutil +import typing +from pathlib import Path +import shlex +import subprocess +import sys +import tempfile + +from tests.test_runner import AbstractCommandTest + +log = logging.getLogger(__name__) +frame_dir = Path(__file__).with_name('test_frames') + + +class ConcatVideosTest(AbstractCommandTest): + settings: typing.Dict[str, typing.Any] = { + 'ffmpeg_cmd': f'"{sys.executable}" -hide_banner', + 'input_files': str(frame_dir / 'chunk-*.mkv'), + 'output_file': '/tmp/merged.mkv', + } + + def setUp(self): + super().setUp() + + from flamenco_worker.commands import ConcatenateVideosCommand + + self.cmd = ConcatenateVideosCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + self.settings = self.settings.copy() + + def test_build_ffmpeg_cmd(self): + self.cmd.validate(self.settings) + cliargs = self.cmd._build_ffmpeg_command(self.settings) + + self.assertEqual([ + sys.executable, '-hide_banner', + '-f', 'concat', + '-i', str(frame_dir / 'ffmpeg-input.txt'), + '-c', 'copy', + '-y', + '/tmp/merged.mkv', + ], cliargs) + + def test_run_ffmpeg(self): + with tempfile.TemporaryDirectory() as tempdir: + outfile = Path(tempdir) / 'merged.mkv' + settings: 
typing.Dict[str, typing.Any] = { + **self.settings, + 'ffmpeg_cmd': 'ffmpeg', # use the real FFmpeg for this test. + 'output_file': str(outfile), + } + + self.loop.run_until_complete(self.cmd.run(settings)) + self.assertTrue(outfile.exists()) + + ffprobe_cmd = [shutil.which('ffprobe'), '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + str(outfile)] + log.debug('Running %s', ' '.join(shlex.quote(arg) for arg in ffprobe_cmd)) + probe_out = subprocess.check_output(ffprobe_cmd) + probed_duration = float(probe_out) + + # The combined videos are 7 frames @ 24 frames per second. + self.assertAlmostEqual(0.291, probed_duration, places=3) diff --git a/tests/test_commands_copy_file.py b/tests/test_commands_copy_file.py index 78408d30cacb0453440d54e364689ec2e2ccf7f8..7b4b69788e4b9e8f5ba6f0e34e01a017a3c9bdfa 100644 --- a/tests/test_commands_copy_file.py +++ b/tests/test_commands_copy_file.py @@ -1,7 +1,7 @@ from pathlib import Path import os -from test_runner import AbstractCommandTest +from tests.test_runner import AbstractCommandTest class CopyFileTest(AbstractCommandTest): diff --git a/tests/test_commands_create_video.py b/tests/test_commands_create_video.py new file mode 100644 index 0000000000000000000000000000000000000000..02f016936702a50eb5f9215e88670957691e2223 --- /dev/null +++ b/tests/test_commands_create_video.py @@ -0,0 +1,87 @@ +import logging +import shutil +import typing +from pathlib import Path +import shlex +import subprocess +import sys +import tempfile + +from tests.test_runner import AbstractCommandTest + +log = logging.getLogger(__name__) + + +class CreateVideoTest(AbstractCommandTest): + settings: typing.Dict[str, typing.Any] = { + 'ffmpeg_cmd': f'"{sys.executable}" -hide_banner', + 'input_files': '/tmp/*.png', + 'output_file': '/tmp/merged.mkv', + 'fps': 24, + } + + def setUp(self): + super().setUp() + + from flamenco_worker.commands import CreateVideoCommand + + self.cmd = CreateVideoCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + self.settings = self.settings.copy() + + def test_validate(self): + self.assertIn('not found on $PATH', self.cmd.validate({'ffmpeg_cmd': '/does/not/exist'})) + self.assertIsNone(self.cmd.validate(self.settings)) + + def test_validate_without_ffmpeg(self): + settings = self.settings.copy() + del settings['ffmpeg_cmd'] + + self.assertIsNone(self.cmd.validate(settings)) + self.assertEqual(['ffmpeg'], settings['ffmpeg_cmd'], + 'The default setting should be stored in the dict after validation') + + def test_build_ffmpeg_cmd(self): + self.cmd.validate(self.settings) + cliargs = self.cmd._build_ffmpeg_command(self.settings) + + self.assertEqual([ + sys.executable, '-hide_banner', + '-pattern_type', 'glob', + '-r', '24', + '-i', '/tmp/*.png', + '-c:v', 'h264', + '-crf', '17', + '-g', '1', + '-y', + '-bf', '0', + '/tmp/merged.mkv', + ], cliargs) + + def test_run_ffmpeg(self): + with tempfile.TemporaryDirectory() as tempdir: + outfile = Path(tempdir) / 'merged.mkv' + frame_dir = Path(__file__).with_name('test_frames') + settings: typing.Dict[str, typing.Any] = { + **self.settings, + 'ffmpeg_cmd': 'ffmpeg', # use the real FFmpeg for this test. 
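+ # (This test assumes real ffmpeg/ffprobe binaries on $PATH; the
+ # class-level settings point ffmpeg_cmd at sys.executable only so the
+ # command line can be built without FFmpeg installed.)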
+ 'input_files': f'{frame_dir}/*.png', + 'output_file': str(outfile), + } + + self.loop.run_until_complete(self.cmd.run(settings)) + self.assertTrue(outfile.exists()) + + ffprobe_cmd = [shutil.which('ffprobe'), '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + str(outfile)] + log.debug('Running %s', ' '.join(shlex.quote(arg) for arg in ffprobe_cmd)) + probe_out = subprocess.check_output(ffprobe_cmd) + probed_duration = float(probe_out) + fps: int = settings['fps'] + expect_duration = len(list(frame_dir.glob('*.png'))) / fps + self.assertAlmostEqual(expect_duration, probed_duration, places=3) diff --git a/tests/test_commands_encode_audio.py b/tests/test_commands_encode_audio.py new file mode 100644 index 0000000000000000000000000000000000000000..9f39d1245426446533c5a11c39b526dc016b8656 --- /dev/null +++ b/tests/test_commands_encode_audio.py @@ -0,0 +1,48 @@ +import logging +import shutil +import typing +from pathlib import Path +import shlex +import subprocess +import sys +import tempfile + +from tests.test_runner import AbstractCommandTest + +log = logging.getLogger(__name__) +frame_dir = Path(__file__).with_name('test_frames') + + +class EncodeAudioTest(AbstractCommandTest): + settings: typing.Dict[str, typing.Any] = { + 'ffmpeg_cmd': f'"{sys.executable}" -hide_banner', + 'input_file': f'{frame_dir}/audio.flac', + 'codec': 'aac', + 'bitrate': '192k', + 'output_file': f'{frame_dir}/audio.aac', + } + + def setUp(self): + super().setUp() + + from flamenco_worker.commands import EncodeAudioCommand + + self.cmd = EncodeAudioCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + self.settings = self.__class__.settings.copy() + + def test_build_ffmpeg_cmd(self): + self.cmd.validate(self.settings) + cliargs = self.cmd._build_ffmpeg_command(self.settings) + + self.assertEqual([ + sys.executable, '-hide_banner', + '-i', str(frame_dir / 'audio.flac'), + '-c:a', 'aac', + '-b:a', '192k', + '-y', + str(frame_dir / 'audio.aac'), + ], cliargs) diff --git a/tests/test_commands_merge_exr.py b/tests/test_commands_merge_exr.py index 6f2d1371aae683f9ef41964db6b4317b6a296cd7..d34339852b974639795735ca5396ae9b42895adc 100644 --- a/tests/test_commands_merge_exr.py +++ b/tests/test_commands_merge_exr.py @@ -1,6 +1,6 @@ from pathlib import Path -from test_runner import AbstractCommandTest +from tests.test_runner import AbstractCommandTest class MergeProgressiveRendersCommandTest(AbstractCommandTest): diff --git a/tests/test_commands_move_out_of_way.py b/tests/test_commands_move_out_of_way.py index fce01100d1df7edf8e53ef25f97998ea2dab4cc0..e1ffb18e98b578709f64c0f635b1307d68b9453c 100644 --- a/tests/test_commands_move_out_of_way.py +++ b/tests/test_commands_move_out_of_way.py @@ -1,7 +1,7 @@ from pathlib import Path import os -from test_runner import AbstractCommandTest +from tests.test_runner import AbstractCommandTest class MoveOutOfWayTest(AbstractCommandTest): diff --git a/tests/test_commands_move_to_final.py b/tests/test_commands_move_to_final.py index 8b9976b8a625d0b290698b7ca442fde58796fa50..389f0b462ab6f500ea04b9a673d6a49867f6189f 100644 --- a/tests/test_commands_move_to_final.py +++ b/tests/test_commands_move_to_final.py @@ -1,7 +1,7 @@ import os from pathlib import Path -from test_runner import AbstractCommandTest +from tests.test_runner import AbstractCommandTest class MoveToFinalTest(AbstractCommandTest): diff --git a/tests/test_commands_move_with_counter.py b/tests/test_commands_move_with_counter.py new file mode 100644 index 
0000000000000000000000000000000000000000..978578826c19c03caaf1702109c4cbf5f5c69c35 --- /dev/null +++ b/tests/test_commands_move_with_counter.py @@ -0,0 +1,61 @@ +import logging +import shutil +import typing +from pathlib import Path +import shlex +import subprocess +import sys +import tempfile + +from tests.test_runner import AbstractCommandTest + +log = logging.getLogger(__name__) +frame_dir = Path(__file__).with_name('test_frames') + + +class MoveWithCounterTest(AbstractCommandTest): + def setUp(self): + super().setUp() + + from flamenco_worker.commands import MoveWithCounterCommand + + self.cmd = MoveWithCounterCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + + self._tempdir = tempfile.TemporaryDirectory() + self.temppath = Path(self._tempdir.name) + + self.srcpath = (self.temppath / 'somefile.mkv') + self.srcpath.touch() + + (self.temppath / '2018_06_12_001-spring.mkv').touch() + (self.temppath / '2018_06_12_004-spring.mkv').touch() + + def tearDown(self): + self._tempdir.cleanup() + super().tearDown() + + def test_numbers_with_holes(self): + settings = { + 'src': str(self.srcpath), + 'dest': str(self.temppath / '2018_06_12-spring.mkv'), + } + task = self.cmd.execute(settings) + self.loop.run_until_complete(task) + + self.assertFalse(self.srcpath.exists()) + self.assertTrue((self.temppath / '2018_06_12_005-spring.mkv').exists()) + + def test_no_regexp_match(self): + settings = { + 'src': str(self.srcpath), + 'dest': str(self.temppath / 'jemoeder.mkv'), + } + task = self.cmd.execute(settings) + self.loop.run_until_complete(task) + + self.assertFalse(self.srcpath.exists()) + self.assertTrue((self.temppath / 'jemoeder_001.mkv').exists()) diff --git a/tests/test_commands_mux_audio.py b/tests/test_commands_mux_audio.py new file mode 100644 index 0000000000000000000000000000000000000000..c5eae85ef1a0b2701bfe9765121fa409d068e366 --- /dev/null +++ b/tests/test_commands_mux_audio.py @@ -0,0 +1,47 @@ +import logging +import shutil +import typing +from pathlib import Path +import shlex +import subprocess +import sys +import tempfile + +from tests.test_runner import AbstractCommandTest + +log = logging.getLogger(__name__) +frame_dir = Path(__file__).with_name('test_frames') + + +class MuxAudioTest(AbstractCommandTest): + settings: typing.Dict[str, typing.Any] = { + 'ffmpeg_cmd': f'"{sys.executable}" -hide_banner', + 'audio_file': str(frame_dir / 'audio.mkv'), + 'video_file': str(frame_dir / 'video.mkv'), + 'output_file': str(frame_dir / 'muxed.mkv'), + } + + def setUp(self): + super().setUp() + + from flamenco_worker.commands import MuxAudioCommand + + self.cmd = MuxAudioCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + self.settings = self.settings.copy() + + def test_build_ffmpeg_cmd(self): + self.cmd.validate(self.settings) + cliargs = self.cmd._build_ffmpeg_command(self.settings) + + self.assertEqual([ + sys.executable, '-hide_banner', + '-i', str(frame_dir / 'audio.mkv'), + '-i', str(frame_dir / 'video.mkv'), + '-c', 'copy', + '-y', + str(frame_dir / 'muxed.mkv'), + ], cliargs) diff --git a/tests/test_commands_remove_file.py b/tests/test_commands_remove_file.py new file mode 100644 index 0000000000000000000000000000000000000000..fc4bc83e6854852bc24d6ad6e43e697e5ef3ad98 --- /dev/null +++ b/tests/test_commands_remove_file.py @@ -0,0 +1,60 @@ +from pathlib import Path + +from tests.test_runner import AbstractCommandTest + + +class RemoveFileTest(AbstractCommandTest): + def setUp(self): + super().setUp() + + from flamenco_worker.commands import 
RemoveFileCommand
+ import tempfile
+
+ self.tmpdir = tempfile.TemporaryDirectory()
+ self.tmppath = Path(self.tmpdir.name)
+
+ self.cmd = RemoveFileCommand(
+ worker=self.fworker,
+ task_id='12345',
+ command_idx=0,
+ )
+
+ def tearDown(self):
+ super().tearDown()
+ self.tmpdir.cleanup()
+
+ def test_validate_settings(self):
+ self.assertIn('path', self.cmd.validate({'path': 12}))
+ self.assertIn('path', self.cmd.validate({'path': ''}))
+ self.assertIn('path', self.cmd.validate({}))
+ self.assertFalse(self.cmd.validate({'path': '/some/path'}))
+
+ def test_nonexistent_source(self):
+ path = self.tmppath / 'nonexisting'
+ task = self.cmd.run({'path': str(path)})
+ ok = self.loop.run_until_complete(task)
+
+ self.assertTrue(ok)
+ self.assertFalse(path.exists())
+
+ def test_source_file(self):
+ path = self.tmppath / 'existing'
+ path.touch()
+ task = self.cmd.run({'path': str(path)})
+ ok = self.loop.run_until_complete(task)
+
+ self.assertTrue(ok)
+ self.assertFalse(path.exists())
+
+ def test_source_dir_with_files(self):
+ path = self.tmppath / 'dir'
+ path.mkdir()
+ (path / 'a.file').touch()
+ (path / 'b.file').touch()
+ (path / 'c.file').touch()
+
+ task = self.cmd.run({'path': str(path)})
+ ok = self.loop.run_until_complete(task)
+
+ self.assertFalse(ok)
+ self.assertTrue(path.exists())
diff --git a/tests/test_commands_remove_tree.py b/tests/test_commands_remove_tree.py
index 8308612f041a8c56f659b30a4050c3ab84c7a437..5147b5cff2f843db7c3d2319a99d7f9e2b70e44b 100644
--- a/tests/test_commands_remove_tree.py
+++ b/tests/test_commands_remove_tree.py
@@ -1,7 +1,7 @@
from pathlib import Path
import os
-from test_runner import AbstractCommandTest
+from tests.test_runner import AbstractCommandTest
class RemoveTreeTest(AbstractCommandTest):
diff --git a/tests/test_commands_subprocess.py b/tests/test_commands_subprocess.py
new file mode 100644
index 0000000000000000000000000000000000000000..46fe2411c74af2fcc20f3a0bdfeb36ca7028caf9
--- /dev/null
+++ b/tests/test_commands_subprocess.py
@@ -0,0 +1,138 @@
+import asyncio
+import os
+from pathlib import Path
+import random
+import shlex
+import sys
+import tempfile
+import time
+
+import psutil
+
+from tests.test_runner import AbstractCommandTest
+
+
+class PIDFileTest(AbstractCommandTest):
+ def setUp(self):
+ super().setUp()
+
+ from flamenco_worker.commands import ExecCommand
+
+ self.cmd = ExecCommand(
+ worker=self.fworker,
+ task_id='12345',
+ command_idx=0,
+ )
+
+ def test_alive(self):
+ with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile:
+ pidfile = Path(tmpfile.name)
+ my_pid = os.getpid()
+ pidfile.write_text(str(my_pid))
+
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertIn(str(pidfile), msg)
+ self.assertIn(str(psutil.Process(my_pid)), msg)
+
+ def test_alive_newlines(self):
+ with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile:
+ pidfile = Path(tmpfile.name)
+ my_pid = os.getpid()
+ pidfile.write_text('\n%s\n' % my_pid)
+
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertIn(str(pidfile), msg)
+ self.assertIn(str(psutil.Process(my_pid)), msg)
+
+ def test_dead(self):
+ # Find a PID that doesn't exist.
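+ # psutil.Process() raises psutil.NoSuchProcess for a PID that is not
+ # in use, so drawing random PIDs until construction fails yields a
+ # guaranteed-dead one.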
+ for _ in range(1000):
+ pid = random.randint(1, 2**16)
+ try:
+ psutil.Process(pid)
+ except psutil.NoSuchProcess:
+ break
+ else:
+ self.fail('Unable to find unused PID')
+
+ with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
+ tmpdir = Path(tmpname)
+ pidfile = tmpdir / 'stale.pid'
+ pidfile.write_text(str(pid))
+
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertFalse(msg)
+ self.assertFalse(pidfile.exists(), 'Stale PID file should have been deleted')
+
+ def test_nonexistent(self):
+ with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
+ tmpdir = Path(tmpname)
+ pidfile = tmpdir / 'nonexistent.pid'
+
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertFalse(msg)
+
+ def test_empty(self):
+ with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
+ tmpdir = Path(tmpname)
+ pidfile = tmpdir / 'empty.pid'
+ pidfile.write_bytes(b'')
+
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertTrue(msg, "Empty PID file should be treated as 'alive'")
+ self.assertTrue(pidfile.exists(), 'Empty PID file should not have been deleted')
+
+ def test_not_configured(self):
+ self.cmd.worker.trunner.subprocess_pid_file = None
+
+ msg = self.cmd.validate({'cmd': 'echo'})
+ self.assertFalse(msg)
+
+ def test_race_open_exclusive(self):
+ # When there is a race condition such that the exclusive open() of the
+ # subprocess PID file fails, the new subprocess should be killed.
+
+ # Use shlex to quote the arguments, so we're sure the quoting is correct.
+ args = [sys.executable, '-c', 'import time; time.sleep(1)']
+ cmd = ' '.join(shlex.quote(s) for s in args)
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ pidfile = Path(tmpdir) / 'race.pid'
+ my_pid = os.getpid()
+
+ # Set up the race condition: at validation time the PID file doesn't exist yet,
+ # but at execute time it does.
+ self.cmd.worker.trunner.subprocess_pid_file = pidfile
+ msg = self.cmd.validate({'cmd': cmd})
+ self.assertIsNone(msg)
+
+ # Mock an already-running process by writing our own PID.
+ pidfile.write_text(str(my_pid))
+
+ start_time = time.time()
+ with self.assertRaises(FileExistsError):
+ self.loop.run_until_complete(asyncio.wait_for(
+ self.cmd.execute({'cmd': cmd}),
+ 1.3 # no more than 300 ms longer than the actual sleep
+ ))
+ duration = time.time() - start_time
+
+ # This shouldn't take anywhere near the entire sleep time, as that would
+ # mean the command was executed while there was already another one running.
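+ # (The spawned command sleeps for a full second, while detecting the
+ # existing PID file aborts the subprocess almost immediately.)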
+ self.assertLess(duration, 0.8, + "Checking the PID file and killing the process should be fast") + + pid = self.cmd.proc.pid + with self.assertRaises(psutil.NoSuchProcess): + process = psutil.Process(pid) + self.fail(f'Process {process} is still running') diff --git a/tests/test_coro_mock.py b/tests/test_coro_mock.py index 2486600edabe8cc01796dbb8e35e0d9143b9e70f..16a1364be8e7ed4dbdb83f08a998eae9e3b5de9b 100644 --- a/tests/test_coro_mock.py +++ b/tests/test_coro_mock.py @@ -10,7 +10,7 @@ class CoroMockTest(unittest.TestCase): self.loop = construct_asyncio_loop() def test_setting_return_value(self): - from mock_responses import CoroMock + from tests.mock_responses import CoroMock cm = CoroMock() cm.coro.return_value = '123' diff --git a/tests/test_frames/000108.png b/tests/test_frames/000108.png new file mode 100644 index 0000000000000000000000000000000000000000..72ca4998b4b899f2ca2f23702a255d987a05b6b2 Binary files /dev/null and b/tests/test_frames/000108.png differ diff --git a/tests/test_frames/000109.png b/tests/test_frames/000109.png new file mode 100644 index 0000000000000000000000000000000000000000..c7b1bb57c612ceb28ae1d39c8340592f5f430768 Binary files /dev/null and b/tests/test_frames/000109.png differ diff --git a/tests/test_frames/000110.png b/tests/test_frames/000110.png new file mode 100644 index 0000000000000000000000000000000000000000..fc109200bca92a1328aef9b9716bb6d5d750c053 Binary files /dev/null and b/tests/test_frames/000110.png differ diff --git a/tests/test_frames/000111.png b/tests/test_frames/000111.png new file mode 100644 index 0000000000000000000000000000000000000000..6ffea987b1b3670e8bb590445a3e074bdea6e116 Binary files /dev/null and b/tests/test_frames/000111.png differ diff --git a/tests/test_frames/000112.png b/tests/test_frames/000112.png new file mode 100644 index 0000000000000000000000000000000000000000..f5238025d1dd730ff450b2d74dd05d0d97166fce Binary files /dev/null and b/tests/test_frames/000112.png differ diff --git a/tests/test_frames/000113.png b/tests/test_frames/000113.png new file mode 100644 index 0000000000000000000000000000000000000000..7a7f346d1e337e05de02972dc166229bef063ebf Binary files /dev/null and b/tests/test_frames/000113.png differ diff --git a/tests/test_frames/000114.png b/tests/test_frames/000114.png new file mode 100644 index 0000000000000000000000000000000000000000..a56146da7ed061fec185d190a941e1e4646db5b6 Binary files /dev/null and b/tests/test_frames/000114.png differ diff --git a/tests/test_frames/chunk-0001-0004.mkv b/tests/test_frames/chunk-0001-0004.mkv new file mode 100644 index 0000000000000000000000000000000000000000..ac4041c96e3efbed0013923658ca9ce40f27ab2e Binary files /dev/null and b/tests/test_frames/chunk-0001-0004.mkv differ diff --git a/tests/test_frames/chunk-0005-0007.mkv b/tests/test_frames/chunk-0005-0007.mkv new file mode 100644 index 0000000000000000000000000000000000000000..56fe49524f6497e5dae3fe0a5a9151e968ed2ac7 Binary files /dev/null and b/tests/test_frames/chunk-0005-0007.mkv differ diff --git a/tests/test_may_i_run.py b/tests/test_may_i_run.py index 90e23b8bdde853676300076c302295d2a13decff..5c082b2189480fad3a1d15b1d1b76eb9819e943c 100644 --- a/tests/test_may_i_run.py +++ b/tests/test_may_i_run.py @@ -1,6 +1,6 @@ from unittest.mock import Mock -from abstract_worker_test import AbstractWorkerTest +from tests.abstract_worker_test import AbstractWorkerTest class MayIRunTest(AbstractWorkerTest): diff --git a/tests/test_mypy.py b/tests/test_mypy.py new file mode 100644 index 
0000000000000000000000000000000000000000..969fbbb43fa86f294ef257aaedf0ed417ec8fadf --- /dev/null +++ b/tests/test_mypy.py @@ -0,0 +1,27 @@ +import pathlib +import unittest + +import mypy.api + +test_modules = ['flamenco_worker', 'tests'] + + +class MypyRunnerTest(unittest.TestCase): + def test_run_mypy(self): + proj_root = pathlib.Path(__file__).parent.parent + args = ['--incremental', '--ignore-missing-imports'] + [str(proj_root / dirname) for dirname + in test_modules] + + result = mypy.api.run(args) + + stdout, stderr, status = result + + messages = [] + if stderr: + messages.append(stderr) + if stdout: + messages.append(stdout) + if status: + messages.append('Mypy failed with status %d' % status) + if messages: + self.fail('\n'.join(['Mypy errors:'] + messages)) diff --git a/tests/test_pretask_check.py b/tests/test_pretask_check.py new file mode 100644 index 0000000000000000000000000000000000000000..e9f77bae5622cdb0786680fea3b6cf93f76ddf82 --- /dev/null +++ b/tests/test_pretask_check.py @@ -0,0 +1,142 @@ +import contextlib +import tempfile +from pathlib import Path +from unittest import mock + +from tests.test_worker import AbstractFWorkerTest + + +# Mock merge_with_home_config() so that it doesn't overwrite actual config. +@mock.patch('flamenco_worker.config.merge_with_home_config', new=lambda *args: None) +@mock.patch('socket.gethostname', new=lambda: 'ws-unittest') +class PretaskWriteCheckTest(AbstractFWorkerTest): + def test_not_writable_dir(self): + with self.write_check() as tdir: + unwritable_dir = tdir / 'unwritable' + unwritable_dir.mkdir(0o555) + self.worker.pretask_check_params.pre_task_check_write = (unwritable_dir, ) + + def test_not_writable_file(self): + with self.write_check() as tdir: + unwritable_dir = tdir / 'unwritable' + unwritable_dir.mkdir(0o555) + unwritable_file = unwritable_dir / 'testfile.txt' + self.worker.pretask_check_params.pre_task_check_write = (unwritable_file, ) + + def test_write_file_exists(self): + def post_run(): + self.assertTrue(existing.exists(), '%s should not have been deleted' % existing) + + with self.write_check(post_run) as tdir: + existing = tdir / 'unwritable-testfile.txt' + existing.write_bytes(b'x') + existing.chmod(0o444) # only readable + self.worker.pretask_check_params.pre_task_check_write = (existing, ) + + def test_happy_remove_file(self): + from tests.mock_responses import EmptyResponse, CoroMock + + self.manager.post = CoroMock(return_value=EmptyResponse()) + + with tempfile.TemporaryDirectory() as tdir_name: + tdir = Path(tdir_name) + testfile = tdir / 'writable-testfile.txt' + self.worker.pretask_check_params.pre_task_check_write = (testfile, ) + + self.worker.schedule_fetch_task() + self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut) + + self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile) + + self.manager.post.assert_called_once_with('/task', loop=mock.ANY) + self.assertIsNone(self.worker.sleeping_fut) + + def test_happy_not_remove_file(self): + from tests.mock_responses import EmptyResponse, CoroMock + + self.manager.post = CoroMock(return_value=EmptyResponse()) + + with tempfile.TemporaryDirectory() as tdir_name: + tdir = Path(tdir_name) + testfile = tdir / 'writable-testfile.txt' + # The file exists before, so it shouldn't be deleted afterwards. 
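+ # (The write check only deletes probe files it created itself; see
+ # post_delete in _pre_task_check_write.)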
+            with testfile.open('wb') as outfile: + outfile.write(b'x') + self.worker.pretask_check_params.pre_task_check_write = (testfile, ) + + self.worker.schedule_fetch_task() + self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut) + + self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile) + + self.manager.post.assert_called_once_with('/task', loop=mock.ANY) + self.assertIsNone(self.worker.sleeping_fut) + + @contextlib.contextmanager + def write_check(self, post_run=None): + from tests.mock_responses import EmptyResponse, CoroMock + + self.manager.post = CoroMock(return_value=EmptyResponse()) + + with tempfile.TemporaryDirectory() as tdir_name: + tdir = Path(tdir_name) + + yield tdir + + self.worker.schedule_fetch_task() + self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut) + + if post_run is not None: + post_run() + + self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) + self.assertFalse(self.worker.sleeping_fut.done()) + + +# Mock merge_with_home_config() so that it doesn't read the actual config. +@mock.patch('flamenco_worker.config.merge_with_home_config', new=lambda *args: None) +@mock.patch('socket.gethostname', new=lambda: 'ws-unittest') +class PretaskReadCheckTest(AbstractFWorkerTest): + def test_not_readable_dir(self): + def cleanup(): + unreadable_dir.chmod(0o755) + + with self.read_check(cleanup) as tdir: + unreadable_dir = tdir / 'unreadable' + unreadable_dir.mkdir(0o000) + self.worker.pretask_check_params.pre_task_check_read = (unreadable_dir, ) + + def test_read_file_exists(self): + def post_run(): + self.assertTrue(existing.exists(), '%s should not have been deleted' % existing) + + with self.read_check(post_run) as tdir: + existing = tdir / 'unreadable-testfile.txt' + existing.write_bytes(b'x') + existing.chmod(0o222) # only writable + self.worker.pretask_check_params.pre_task_check_read = (existing, ) + + def test_read_file_not_exists(self): + with self.read_check() as tdir: + nonexistent = tdir / 'nonexistent-testfile.txt' + self.worker.pretask_check_params.pre_task_check_read = (nonexistent, ) + + @contextlib.contextmanager + def read_check(self, post_run=None): + from tests.mock_responses import EmptyResponse, CoroMock + + self.manager.post = CoroMock(return_value=EmptyResponse()) + + with tempfile.TemporaryDirectory() as tdir_name: + tdir = Path(tdir_name) + + yield tdir + + self.worker.schedule_fetch_task() + self.asyncio_loop.run_until_complete(self.worker.single_iteration_fut) + + if post_run is not None: + post_run() + + self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) + self.assertFalse(self.worker.sleeping_fut.done()) diff --git a/tests/test_runner.py b/tests/test_runner.py index a4e989aa4b187617ab39a80787b4e050af73fb85..454787bd4e2c3a272a690f0bf8a8e560d6e19167 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,17 +1,20 @@ import asyncio from unittest.mock import Mock, call -from abstract_worker_test import AbstractWorkerTest +from tests.abstract_worker_test import AbstractWorkerTest class AbstractCommandTest(AbstractWorkerTest): def setUp(self): - from mock_responses import CoroMock + from tests.mock_responses import CoroMock from flamenco_worker.worker import FlamencoWorker + from flamenco_worker.runner import TaskRunner from flamenco_worker.cli import construct_asyncio_loop self.loop = construct_asyncio_loop() self.fworker = Mock(spec=FlamencoWorker) + self.fworker.trunner = Mock(spec=TaskRunner) +
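# Presumed to disable the runner's PID-file check, so test commands never refuse to start a subprocess. +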
self.fworker.trunner.subprocess_pid_file = None self.fworker.register_log = CoroMock() self.fworker.register_task_update = CoroMock() @@ -83,14 +86,15 @@ class ExecCommandTest(AbstractCommandTest): 0.6 )) self.assertTrue(ok) + pid = cmd.proc.pid # Check that both lines have been reported. self.fworker.register_log.assert_has_calls([ call('exec: Starting'), call('Executing %s', '%s -c \'print("hello, this is two lines\\nYes, really.")\'' % sys.executable), - call('> hello, this is two lines'), # note the logged line doesn't end in a newline - call('> Yes, really.'), # note the logged line doesn't end in a newline + call('pid=%d > hello, this is two lines' % pid), + call('pid=%d > Yes, really.' % pid), # note the logged line doesn't end in a newline call('exec: Finished'), ]) @@ -119,9 +123,10 @@ class ExecCommandTest(AbstractCommandTest): self.assertFalse(ok) # Check that the error has been reported. - decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command produced " \ - "non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 in " \ - "position 0: invalid start byte" + pid = cmd.proc.pid + decode_err = "exec.(task_id=12345, command_idx=0): Error executing: Command pid=%d " \ + "produced non-UTF8 output, aborting: 'utf-8' codec can't decode byte 0x80 "\ + "in position 0: invalid start byte" % pid self.fworker.register_log.assert_has_calls([ call('exec: Starting'), call('Executing %s', @@ -148,19 +153,20 @@ class ExecCommandTest(AbstractCommandTest): 0.6 )) self.assertFalse(ok) + pid = cmd.proc.pid # Check that the execution error has been reported. self.fworker.register_log.assert_has_calls([ call('exec: Starting'), call('Executing %s', '%s -c \'raise SystemExit("¡FAIL!")\'' % sys.executable), - call('> ¡FAIL!'), # note the logged line doesn't end in a newline + call('pid=%d > ¡FAIL!' % pid), # note the logged line doesn't end in a newline call('exec.(task_id=12345, command_idx=0): Error executing: ' - 'Command failed with status 1') + 'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid)) ]) # The update should NOT contain a new task status -- that is left to the Worker. self.fworker.register_task_update.assert_called_with( activity='exec.(task_id=12345, command_idx=0): Error executing: ' - 'Command failed with status 1', + 'Command %s (pid=%d) failed with status 1' % (settings['cmd'], pid), ) diff --git a/tests/test_upstream_update_queue.py b/tests/test_upstream_update_queue.py index 8843c0b4bbf05f5c968c43c2474e14e29cf4541d..6a7c894ebe65cd82c6c688081bb16729ed4b7d7c 100644 --- a/tests/test_upstream_update_queue.py +++ b/tests/test_upstream_update_queue.py @@ -6,7 +6,7 @@ from unittest.mock import Mock import requests -from abstract_worker_test import AbstractWorkerTest +from tests.abstract_worker_test import AbstractWorkerTest class TaskUpdateQueueTest(AbstractWorkerTest): @@ -14,7 +14,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): from flamenco_worker.upstream import FlamencoManager from flamenco_worker.upstream_update_queue import TaskUpdateQueue from flamenco_worker.cli import construct_asyncio_loop - from mock_responses import CoroMock + from tests.mock_responses import CoroMock self.asyncio_loop = construct_asyncio_loop() self.shutdown_future = self.asyncio_loop.create_future() @@ -39,7 +39,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): Also tests connection errors and other HTTP error statuses. 
""" - from mock_responses import JsonResponse, EmptyResponse + from tests.mock_responses import JsonResponse, EmptyResponse # Try different value types payload = {'key': 'value', @@ -95,7 +95,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): def test_queue_persistence(self): """Check that updates are pushed, even when the process is stopped & restarted.""" - from mock_responses import EmptyResponse + from tests.mock_responses import EmptyResponse from flamenco_worker.upstream_update_queue import TaskUpdateQueue # Try different value types @@ -151,7 +151,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): """A 409 Conflict response should discard a queued task update. """ - from mock_responses import TextResponse + from tests.mock_responses import TextResponse # Try different value types payload = {'key': 'value', @@ -182,3 +182,42 @@ class TaskUpdateQueueTest(AbstractWorkerTest): # There should only be one attempt at delivering this payload. self.assertEqual(1, tries) self.assertEqual([], list(self.tuqueue._queue())) + + def test_task_gone(self): + """A 404 Not Found response should discard a queued task update. + + This can happen in a race condition, when a task is deleted/archived + while there are still updates in the local queue. + """ + + from tests.mock_responses import TextResponse + + # Try different value types + payload = {'key': 'value', + 'sub': {'some': 13, + 'values': datetime.datetime.now()}} + + tries = 0 + + async def push_callback(url, *, json, loop): + nonlocal tries + tries += 1 + self.shutdown_future.cancel() + return TextResponse("no", status_code=404) + + self.manager.post.side_effect = push_callback + + self.tuqueue.queue('/push/here', payload) + + # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling + # the actual payload. + self.asyncio_loop.run_until_complete( + asyncio.wait_for( + self.tuqueue.work(loop=self.asyncio_loop), + timeout=2 + ) + ) + + # There should only be one attempt at delivering this payload. + self.assertEqual(1, tries) + self.assertEqual([], list(self.tuqueue._queue())) diff --git a/tests/test_worker.py b/tests/test_worker.py index 8ff3fa02c12ef89bd01d811dfda8952c3be1c05c..21694032362e252bc0b633ac0dc87e6a892aaedb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -6,7 +6,7 @@ from unittest.mock import Mock import asyncio import requests -from abstract_worker_test import AbstractWorkerTest +from tests.abstract_worker_test import AbstractWorkerTest class AbstractFWorkerTest(AbstractWorkerTest): @@ -16,7 +16,7 @@ class AbstractFWorkerTest(AbstractWorkerTest): from flamenco_worker.worker import FlamencoWorker from flamenco_worker.runner import TaskRunner from flamenco_worker.upstream_update_queue import TaskUpdateQueue - from mock_responses import CoroMock + from tests.mock_responses import CoroMock self.asyncio_loop = construct_asyncio_loop() self.asyncio_loop.set_debug(True) @@ -44,6 +44,11 @@ class AbstractFWorkerTest(AbstractWorkerTest): shutdown_future=self.shutdown_future, ) + # Prime the LRU cache to always return this hostname. + with unittest.mock.patch('socket.gethostname') as gethostname: + gethostname.return_value = 'ws-unittest' + self.worker.hostname() + def tearDown(self): if self.worker._push_act_to_manager is not None: try: @@ -70,12 +75,10 @@ class AbstractFWorkerTest(AbstractWorkerTest): class WorkerStartupTest(AbstractFWorkerTest): # Mock merge_with_home_config() so that it doesn't overwrite actual config. 
- @unittest.mock.patch('socket.gethostname') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') - def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname): - from mock_responses import EmptyResponse, CoroMock + def test_startup_already_registered(self, mock_merge_with_home_config): + from tests.mock_responses import EmptyResponse, CoroMock - mock_gethostname.return_value = 'ws-unittest' self.manager.post = CoroMock(return_value=EmptyResponse()) self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False)) @@ -90,14 +93,12 @@ class WorkerStartupTest(AbstractFWorkerTest): ) self.tuqueue.queue.assert_not_called() - @unittest.mock.patch('socket.gethostname') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') - def test_startup_registration(self, mock_merge_with_home_config, mock_gethostname): + def test_startup_registration(self, mock_merge_with_home_config): from flamenco_worker.worker import detect_platform - from mock_responses import JsonResponse, CoroMock + from tests.mock_responses import JsonResponse, CoroMock self.worker.worker_id = None - mock_gethostname.return_value = 'ws-unittest' self.manager.post = CoroMock(return_value=JsonResponse({ '_id': '5555', @@ -128,7 +129,7 @@ class WorkerStartupTest(AbstractFWorkerTest): """Test that startup is aborted when the worker can't register.""" from flamenco_worker.worker import detect_platform, UnableToRegisterError - from mock_responses import JsonResponse, CoroMock + from tests.mock_responses import JsonResponse, CoroMock self.worker.worker_id = None mock_gethostname.return_value = 'ws-unittest' @@ -167,7 +168,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): def test_fetch_task_happy(self): from unittest.mock import call - from mock_responses import JsonResponse, CoroMock + from tests.mock_responses import JsonResponse, CoroMock self.manager.post = CoroMock() # response when fetching a task @@ -199,11 +200,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.worker.schedule_fetch_task() self.manager.post.assert_not_called() - interesting_task = self.worker.fetch_task_task - self.loop.run_until_complete(self.worker.fetch_task_task) + interesting_task = self.worker.single_iteration_fut + self.loop.run_until_complete(self.worker.single_iteration_fut) # Another fetch-task task should have been scheduled. - self.assertNotEqual(self.worker.fetch_task_task, interesting_task) + self.assertNotEqual(self.worker.single_iteration_fut, interesting_task) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.tuqueue.queue.assert_has_calls([ @@ -223,7 +224,70 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): def test_stop_current_task(self): """Test that stopped tasks get status 'canceled'.""" - from mock_responses import JsonResponse, CoroMock + from tests.mock_responses import JsonResponse, CoroMock, EmptyResponse + + self.manager.post = CoroMock() + self.manager.post.coro.side_effect = [ + # response when fetching a task + JsonResponse({ + '_id': '58514d1e9837734f2e71b479', + 'job': '58514d1e9837734f2e71b477', + 'manager': '585a795698377345814d2f68', + 'project': '', + 'user': '580f8c66983773759afdb20e', + 'name': 'sleep-14-26', + 'status': 'processing', + 'priority': 50, + 'job_type': 'unittest', + 'task_type': 'sleep', + 'commands': [ + {'name': 'sleep', 'settings': {'time_in_seconds': 3}} + ] + }), + EmptyResponse(), # stopping (and thus returning) a task. 
+ EmptyResponse(), # signing off + ] + + self.worker.schedule_fetch_task() + + stop_called = False + + async def stop(): + nonlocal stop_called + stop_called = True + + await asyncio.sleep(0.2) + await self.worker.stop_current_task(self.worker.task_id) + + asyncio.ensure_future(stop(), loop=self.loop) + self.loop.run_until_complete(self.worker.single_iteration_fut) + + self.assertTrue(stop_called) + + self.manager.post.assert_has_calls([ + unittest.mock.call('/task', loop=self.asyncio_loop), + unittest.mock.call(f'/tasks/{self.worker.task_id}/return', loop=self.asyncio_loop), + ]) + self.tuqueue.queue.assert_any_call( + '/tasks/58514d1e9837734f2e71b479/update', + {'task_progress_percentage': 0, 'activity': '', + 'command_progress_percentage': 0, 'task_status': 'active', + 'current_command_idx': 0}, + ) + + # A bit clunky because we don't know which timestamp is included in the log line. + last_args, last_kwargs = self.tuqueue.queue.call_args + self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update') + self.assertEqual(last_kwargs, {}) + self.assertIn('log', last_args[1]) + self.assertIn( + 'Worker 1234 (ws-unittest) stopped running this task, no longer ' + 'allowed to run by Manager', last_args[1]['log']) + + self.assertEqual(self.tuqueue.queue.call_count, 2) + + def test_stop_current_task_mismatch(self): + from tests.mock_responses import JsonResponse, CoroMock self.manager.post = CoroMock() # response when fetching a task @@ -252,10 +316,10 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): stop_called = True await asyncio.sleep(0.2) - await self.worker.stop_current_task() + await self.worker.stop_current_task('other-task-id') asyncio.ensure_future(stop(), loop=self.loop) - self.loop.run_until_complete(self.worker.fetch_task_task) + self.loop.run_until_complete(self.worker.single_iteration_fut) self.assertTrue(stop_called) @@ -267,13 +331,12 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): 'current_command_idx': 0}, ) - # A bit clunky because we don't know which timestamp is included in the log line. + # The task shouldn't be stopped, because the wrong task ID was requested to stop. 
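+        # Instead, the final queued update should report the task as completed normally.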
last_args, last_kwargs = self.tuqueue.queue.call_args self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update') self.assertEqual(last_kwargs, {}) - self.assertIn('log', last_args[1]) - self.assertTrue(last_args[1]['log'].endswith( - 'Worker 1234 stopped running this task, no longer allowed to run by Manager')) + self.assertIn('activity', last_args[1]) + self.assertEqual(last_args[1]['activity'], 'Task completed') self.assertEqual(self.tuqueue.queue.call_count, 2) @@ -395,7 +458,7 @@ class WorkerShutdownTest(AbstractWorkerTest): from flamenco_worker.worker import FlamencoWorker from flamenco_worker.runner import TaskRunner from flamenco_worker.upstream_update_queue import TaskUpdateQueue - from mock_responses import CoroMock + from tests.mock_responses import CoroMock self.asyncio_loop = construct_asyncio_loop() self.asyncio_loop.set_debug(True) @@ -439,7 +502,7 @@ class WorkerSleepingTest(AbstractFWorkerTest): self.worker.loop = self.loop def test_stop_current_task_go_sleep(self): - from mock_responses import JsonResponse, CoroMock + from tests.mock_responses import JsonResponse, CoroMock self.manager.post = CoroMock() # response when fetching a task @@ -449,8 +512,8 @@ class WorkerSleepingTest(AbstractFWorkerTest): self.worker.schedule_fetch_task() with self.assertRaises(concurrent.futures.CancelledError): - self.loop.run_until_complete(self.worker.fetch_task_task) + self.loop.run_until_complete(self.worker.single_iteration_fut) - self.assertIsNotNone(self.worker.sleeping_task) - self.assertFalse(self.worker.sleeping_task.done()) - self.assertTrue(self.worker.fetch_task_task.done()) + self.assertIsNotNone(self.worker.sleeping_fut) + self.assertFalse(self.worker.sleeping_fut.done()) + self.assertTrue(self.worker.single_iteration_fut.done())