diff --git a/.gitignore b/.gitignore index 625fb509eaaa73cee8d14cb13864e5a53ae6044a..4df48ff25d33539548c2dd058fdd306055b11685 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,5 @@ docs/site/ /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager.yaml /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-install-*.sh /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-*.docker.tgz + +/packages/flamenco-worker-python/flamenco_worker.egg-info/ diff --git a/packages/flamenco-worker-python/README.md b/packages/flamenco-worker-python/README.md new file mode 100644 index 0000000000000000000000000000000000000000..c8519a7e140b480a1e3c1527c44dd26037fc376f --- /dev/null +++ b/packages/flamenco-worker-python/README.md @@ -0,0 +1,13 @@ + +- activity: latest thing the worker is doing + - short description + - current command name + - percentage completed of + - task + - command + - contained in task in Flamenco Server MongoDB + +- log: full log of the whole task + - complete output of every command + - multiple chunks of gzipped data + - own collection in Flamenco Server MongoDB diff --git a/packages/flamenco-worker-python/flamenco-worker.cfg b/packages/flamenco-worker-python/flamenco-worker.cfg new file mode 100644 index 0000000000000000000000000000000000000000..2948535875f15593bb0831618551c3ecbcbf4f38 --- /dev/null +++ b/packages/flamenco-worker-python/flamenco-worker.cfg @@ -0,0 +1,3 @@ +[flamenco-worker] +manager_url = http://localhost:8083/ +job_types = sleep blender_render_simple diff --git a/packages/flamenco-worker-python/flamenco_worker/__init__.py b/packages/flamenco-worker-python/flamenco_worker/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/packages/flamenco-worker-python/flamenco_worker/attrs_extra.py b/packages/flamenco-worker-python/flamenco_worker/attrs_extra.py new file mode 100644 index 0000000000000000000000000000000000000000..cec464af16564860bff0c2dbd19654ec2f49f2a8 --- /dev/null +++ b/packages/flamenco-worker-python/flamenco_worker/attrs_extra.py @@ -0,0 +1,17 @@ +"""Extra functionality for attrs.""" + +import logging + +import attr + + +def log(name): + """Returns a logger attr.ib + + :param name: name to pass to logging.getLogger() + :rtype: attr.ib + """ + return attr.ib(default=logging.getLogger(name), + repr=False, + hash=False, + cmp=False) diff --git a/packages/flamenco-worker-python/flamenco_worker/cli.py b/packages/flamenco-worker-python/flamenco_worker/cli.py new file mode 100644 index 0000000000000000000000000000000000000000..631716b4ae0e2b158a151797a84c21b1bbc43f21 --- /dev/null +++ b/packages/flamenco-worker-python/flamenco_worker/cli.py @@ -0,0 +1,108 @@ +"""Commandline interface entry points.""" + +import argparse +import collections +import configparser +import logging +import logging.config +import os + +DEFAULT_CONFIG = { + 'flamenco-worker': collections.OrderedDict([ + ('manager_url', 'http://flamenco-manager/'), + ('job_types', 'sleep blender_render_simple'), + ('worker_id', ''), + ('worker_secret', ''), + ]) +} + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('-c', '--config', + help='Load this configuration file instead of the default files.') + parser.add_argument('-q', '--quiet', action='store_true', + help='Log less (only WARNING and more severe).') + parser.add_argument('-v', '--verbose', action='store_true', + help='Log more (DEBUG and more severe).') + args = parser.parse_args() + + # Set up logging + if args.quiet: + level = 'WARNING' + elif args.verbose: + level = 'DEBUG' + else: + level = 'INFO' + logging.config.dictConfig({ + 'version': 1, + 'formatters': { + 'default': {'format': '%(asctime)-15s %(levelname)8s %(name)s %(message)s'} + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'default', + 'stream': 'ext://sys.stderr', + } + }, + 'loggers': { + 'flamenco_worker': {'level': level}, + }, + 'root': { + 'level': 'WARNING', + 'handlers': [ + 'console', + ], + } + }) + + log = logging.getLogger(__name__) + log.debug('Starting') + + # Load configuration + confparser = configparser.ConfigParser() + confparser.read_dict(DEFAULT_CONFIG) + + if args.config: + log.info('Loading configuration from %s', args.config) + confparser.read(args.config, encoding='utf8') + else: + from . import config as config_module + config_files = [config_module.GLOBAL_CONFIG_FILE, + config_module.HOME_CONFIG_FILE] + log.info('Loading configuration from %s', ', '.join(config_files)) + confparser.read(config_files, encoding='utf8') + + from .config import CONFIG_SECTION + if args.verbose: + import sys + log.info('Effective configuration:') + to_show = configparser.ConfigParser() + to_show.read_dict(confparser) + if to_show.get(CONFIG_SECTION, 'worker_secret'): + to_show.set(CONFIG_SECTION, 'worker_secret', '-hidden-') + to_show.write(sys.stderr) + + from . import worker, upstream + + fmanager = upstream.FlamencoManager( + manager_url=confparser.get(CONFIG_SECTION, 'manager_url'), + ) + + fworker = worker.FlamencoWorker( + manager=fmanager, + job_types=confparser.get(CONFIG_SECTION, 'job_types').split(), + worker_id=confparser.get(CONFIG_SECTION, 'worker_id'), + worker_secret=confparser.get(CONFIG_SECTION, 'worker_secret'), + ) + try: + fworker.startup() + fworker.mainloop() + except: + log.exception('Uncaught exception!') + log.warning('Shutting down') + + +if __name__ == '__main__': + main() diff --git a/packages/flamenco-worker-python/flamenco_worker/config.py b/packages/flamenco-worker-python/flamenco_worker/config.py new file mode 100644 index 0000000000000000000000000000000000000000..d5b698892024ce0939117925e1bf778a55d84344 --- /dev/null +++ b/packages/flamenco-worker-python/flamenco_worker/config.py @@ -0,0 +1,32 @@ +"""Writes configuration to a config file in the home directory.""" + +import configparser +import os.path +import logging + +HOME_CONFIG_FILE = os.path.expanduser('~/.flamenco-worker.cfg') +GLOBAL_CONFIG_FILE = 'flamenco-worker.cfg' +CONFIG_SECTION = 'flamenco-worker' + +log = logging.getLogger(__name__) + + +def merge_with_home_config(new_conf: dict): + """Updates the home configuration file with the given config dict.""" + + confparser = configparser.ConfigParser() + confparser.read_dict({CONFIG_SECTION: {}}) + confparser.read(HOME_CONFIG_FILE, encoding='utf8') + + for key, value in new_conf.items(): + confparser.set(CONFIG_SECTION, key, value) + + tmpname = HOME_CONFIG_FILE + '~' + log.debug('Writing configuration file to %s', tmpname) + with open(tmpname, mode='wt', encoding='utf8') as outfile: + confparser.write(outfile) + + log.debug('Moving configuration file to %s', HOME_CONFIG_FILE) + os.replace(tmpname, HOME_CONFIG_FILE) + + log.info('Updated configuration file %s', HOME_CONFIG_FILE) diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream.py b/packages/flamenco-worker-python/flamenco_worker/upstream.py new file mode 100644 index 0000000000000000000000000000000000000000..488d7cde14e85186a4f98d75b2c76c52aa9c7245 --- /dev/null +++ b/packages/flamenco-worker-python/flamenco_worker/upstream.py @@ -0,0 +1,76 @@ +import attr +import requests + +from . import attrs_extra + + +@attr.s +class FlamencoManager: + manager_url = attr.ib(validator=attr.validators.instance_of(str)) + session = attr.ib(default=None, init=False) + _log = attrs_extra.log('%s.FlamencoManager' % __name__) + + def get(self, *args, **kwargs): + return self.client_request('GET', *args, **kwargs) + + def post(self, *args, **kwargs): + return self.client_request('POST', *args, **kwargs) + + def put(self, *args, **kwargs): + return self.client_request('PUT', *args, **kwargs) + + def delete(self, *args, **kwargs): + return self.client_request('DELETE', *args, **kwargs) + + def patch(self, *args, **kwargs): + return self.client_request('PATCH', *args, **kwargs) + + def client_request(self, method, url, + params=None, + data=None, + headers=None, + cookies=None, + files=None, + auth=None, + timeout=None, + allow_redirects=True, + proxies=None, + hooks=None, + stream=None, + verify=None, + cert=None, + json=None, + *, + expected_status=200) -> requests.Response: + """Performs a HTTP request to the server. + + Creates and re-uses the HTTP session, to have efficient communication. + """ + + import urllib.parse + + if not self.session: + self.session = requests.session() + self._log.debug('Creating new HTTP session') + + abs_url = urllib.parse.urljoin(self.manager_url, url) + self._log.debug('%s %s', method, abs_url) + + resp = self.session.request( + method, abs_url, + params=params, + data=data, + headers=headers, + cookies=cookies, + files=files, + auth=auth, + timeout=timeout, + allow_redirects=allow_redirects, + proxies=proxies, + hooks=hooks, + stream=stream, + verify=verify, + cert=cert, + json=json) + + return resp diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py new file mode 100644 index 0000000000000000000000000000000000000000..8a803d2587f62a21e7c9daca32271c987600567a --- /dev/null +++ b/packages/flamenco-worker-python/flamenco_worker/worker.py @@ -0,0 +1,83 @@ +import attr + +from . import attrs_extra +from . import upstream + + +@attr.s +class FlamencoWorker: + manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager)) + job_types = attr.ib(validator=attr.validators.instance_of(list)) + worker_id = attr.ib(validator=attr.validators.instance_of(str)) + worker_secret = attr.ib(validator=attr.validators.instance_of(str)) + + _log = attrs_extra.log('%s.FlamencoWorker' % __name__) + + def startup(self): + self._log.info('Starting up') + + if not self.worker_id or not self.worker_secret: + self.register_at_manager() + + def register_at_manager(self): + self._log.info('Registering at manager') + + self.worker_secret = generate_secret() + platform = detect_platform() + resp = self.manager.post( + '/register-worker', json={ + 'secret': self.worker_secret, + 'platform': platform, + 'supported_job_types': self.job_types, + }) + + result = resp.json() + self._log.info('Response: %s', result) + self.worker_id = result['_id'] + + self.write_registration_info() + + def write_registration_info(self): + """Writes the current worker ID and secret to the home dir.""" + + from . import config + + config.merge_with_home_config({ + 'worker_id': self.worker_id, + 'worker_secret': self.worker_secret, + }) + + def mainloop(self): + self._log.info('Entering main loop') + + +def generate_secret() -> str: + """Generates a 64-character secret key.""" + + import random + import string + + randomizer = random.SystemRandom() + tokens = string.ascii_letters + string.digits + secret = ''.join(randomizer.choice(tokens) for _ in range(64)) + + return secret + + +def detect_platform() -> str: + """Detects the platform, returning 'linux', 'windows' or 'darwin'. + + Raises an exception when the current platform cannot be detected + as one of those three. + """ + + import platform + + plat = platform.system().lower() + if not plat: + raise EnvironmentError('Unable to determine platform.') + + if plat in {'linux', 'windows', 'darwin'}: + return plat + + raise EnvironmentError('Unable to determine platform; unknown platform %r', plat) diff --git a/packages/flamenco-worker-python/requirements-dev.txt b/packages/flamenco-worker-python/requirements-dev.txt new file mode 100644 index 0000000000000000000000000000000000000000..f122cc5e13fe465da12d6a3e4440814493b65b9f --- /dev/null +++ b/packages/flamenco-worker-python/requirements-dev.txt @@ -0,0 +1,2 @@ +-r requirements-test.txt +ipython diff --git a/packages/flamenco-worker-python/requirements-test.txt b/packages/flamenco-worker-python/requirements-test.txt new file mode 100644 index 0000000000000000000000000000000000000000..98ed669fe6479ac2986a0e3d3ee8ba4acbeaf18b --- /dev/null +++ b/packages/flamenco-worker-python/requirements-test.txt @@ -0,0 +1,9 @@ +-r requirements.txt + +# Primary dependencies +pytest==3.0.5 +pytest-cov==2.4.0 + +# Secondary dependencies +coverage==4.2 +py==1.4.32 diff --git a/packages/flamenco-worker-python/requirements.txt b/packages/flamenco-worker-python/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..846e4ce00e6506db510dd08358f16d9062196870 --- /dev/null +++ b/packages/flamenco-worker-python/requirements.txt @@ -0,0 +1,2 @@ +attrs==16.3.0 +requests==2.12.4 diff --git a/packages/flamenco-worker-python/setup.cfg b/packages/flamenco-worker-python/setup.cfg new file mode 100644 index 0000000000000000000000000000000000000000..f241d1877e00755246a7047980f8feba043e8116 --- /dev/null +++ b/packages/flamenco-worker-python/setup.cfg @@ -0,0 +1,5 @@ +[tool:pytest] +addopts = -v --cov flamenco_worker --cov-report term-missing --ignore node_modules -x + +[pep8] +max-line-length = 100 diff --git a/packages/flamenco-worker-python/setup.py b/packages/flamenco-worker-python/setup.py new file mode 100755 index 0000000000000000000000000000000000000000..f9eb0a04692ed999091358071562e33304611d39 --- /dev/null +++ b/packages/flamenco-worker-python/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +import setuptools + +if __name__ == '__main__': + setuptools.setup( + name='flamenco-worker', + version='1.0-alpha', + description='Flamenco Worker implementation', + author='Sybren A. Stüvel', + author_email='sybren@blender.studio', + packages=setuptools.find_packages(), + license='GPL', + classifiers=[ + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3.5', + ], + install_requires=[ + 'attrs >=16.3.0', + 'requests>=2.12.4', + ], + entry_points={'console_scripts': [ + 'flamenco-worker = flamenco_worker.cli:main', + ]}, + )