Skip to content
Snippets Groups Projects
Commit 1d4bc757 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Added Python 3.5 Flamenco Worker.

It can register with the Flamenco Manager, but nothing
else. It does save its secret & worker ID in a config
file, though.
parent df4ee7ff
No related branches found
No related tags found
No related merge requests found
Showing
with 378 additions and 0 deletions
...@@ -48,3 +48,5 @@ docs/site/ ...@@ -48,3 +48,5 @@ docs/site/
/packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager.yaml /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager.yaml
/packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-install-*.sh /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-install-*.sh
/packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-*.docker.tgz /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-*.docker.tgz
/packages/flamenco-worker-python/flamenco_worker.egg-info/
- activity: latest thing the worker is doing
- short description
- current command name
- percentage completed of
- task
- command
- contained in task in Flamenco Server MongoDB
- log: full log of the whole task
- complete output of every command
- multiple chunks of gzipped data
- own collection in Flamenco Server MongoDB
[flamenco-worker]
manager_url = http://localhost:8083/
job_types = sleep blender_render_simple
"""Extra functionality for attrs."""
import logging
import attr
def log(name):
"""Returns a logger attr.ib
:param name: name to pass to logging.getLogger()
:rtype: attr.ib
"""
return attr.ib(default=logging.getLogger(name),
repr=False,
hash=False,
cmp=False)
"""Commandline interface entry points."""
import argparse
import collections
import configparser
import logging
import logging.config
import os
DEFAULT_CONFIG = {
'flamenco-worker': collections.OrderedDict([
('manager_url', 'http://flamenco-manager/'),
('job_types', 'sleep blender_render_simple'),
('worker_id', ''),
('worker_secret', ''),
])
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config',
help='Load this configuration file instead of the default files.')
parser.add_argument('-q', '--quiet', action='store_true',
help='Log less (only WARNING and more severe).')
parser.add_argument('-v', '--verbose', action='store_true',
help='Log more (DEBUG and more severe).')
args = parser.parse_args()
# Set up logging
if args.quiet:
level = 'WARNING'
elif args.verbose:
level = 'DEBUG'
else:
level = 'INFO'
logging.config.dictConfig({
'version': 1,
'formatters': {
'default': {'format': '%(asctime)-15s %(levelname)8s %(name)s %(message)s'}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stderr',
}
},
'loggers': {
'flamenco_worker': {'level': level},
},
'root': {
'level': 'WARNING',
'handlers': [
'console',
],
}
})
log = logging.getLogger(__name__)
log.debug('Starting')
# Load configuration
confparser = configparser.ConfigParser()
confparser.read_dict(DEFAULT_CONFIG)
if args.config:
log.info('Loading configuration from %s', args.config)
confparser.read(args.config, encoding='utf8')
else:
from . import config as config_module
config_files = [config_module.GLOBAL_CONFIG_FILE,
config_module.HOME_CONFIG_FILE]
log.info('Loading configuration from %s', ', '.join(config_files))
confparser.read(config_files, encoding='utf8')
from .config import CONFIG_SECTION
if args.verbose:
import sys
log.info('Effective configuration:')
to_show = configparser.ConfigParser()
to_show.read_dict(confparser)
if to_show.get(CONFIG_SECTION, 'worker_secret'):
to_show.set(CONFIG_SECTION, 'worker_secret', '-hidden-')
to_show.write(sys.stderr)
from . import worker, upstream
fmanager = upstream.FlamencoManager(
manager_url=confparser.get(CONFIG_SECTION, 'manager_url'),
)
fworker = worker.FlamencoWorker(
manager=fmanager,
job_types=confparser.get(CONFIG_SECTION, 'job_types').split(),
worker_id=confparser.get(CONFIG_SECTION, 'worker_id'),
worker_secret=confparser.get(CONFIG_SECTION, 'worker_secret'),
)
try:
fworker.startup()
fworker.mainloop()
except:
log.exception('Uncaught exception!')
log.warning('Shutting down')
if __name__ == '__main__':
main()
"""Writes configuration to a config file in the home directory."""
import configparser
import os.path
import logging
HOME_CONFIG_FILE = os.path.expanduser('~/.flamenco-worker.cfg')
GLOBAL_CONFIG_FILE = 'flamenco-worker.cfg'
CONFIG_SECTION = 'flamenco-worker'
log = logging.getLogger(__name__)
def merge_with_home_config(new_conf: dict):
"""Updates the home configuration file with the given config dict."""
confparser = configparser.ConfigParser()
confparser.read_dict({CONFIG_SECTION: {}})
confparser.read(HOME_CONFIG_FILE, encoding='utf8')
for key, value in new_conf.items():
confparser.set(CONFIG_SECTION, key, value)
tmpname = HOME_CONFIG_FILE + '~'
log.debug('Writing configuration file to %s', tmpname)
with open(tmpname, mode='wt', encoding='utf8') as outfile:
confparser.write(outfile)
log.debug('Moving configuration file to %s', HOME_CONFIG_FILE)
os.replace(tmpname, HOME_CONFIG_FILE)
log.info('Updated configuration file %s', HOME_CONFIG_FILE)
import attr
import requests
from . import attrs_extra
@attr.s
class FlamencoManager:
manager_url = attr.ib(validator=attr.validators.instance_of(str))
session = attr.ib(default=None, init=False)
_log = attrs_extra.log('%s.FlamencoManager' % __name__)
def get(self, *args, **kwargs):
return self.client_request('GET', *args, **kwargs)
def post(self, *args, **kwargs):
return self.client_request('POST', *args, **kwargs)
def put(self, *args, **kwargs):
return self.client_request('PUT', *args, **kwargs)
def delete(self, *args, **kwargs):
return self.client_request('DELETE', *args, **kwargs)
def patch(self, *args, **kwargs):
return self.client_request('PATCH', *args, **kwargs)
def client_request(self, method, url,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=None,
timeout=None,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
json=None,
*,
expected_status=200) -> requests.Response:
"""Performs a HTTP request to the server.
Creates and re-uses the HTTP session, to have efficient communication.
"""
import urllib.parse
if not self.session:
self.session = requests.session()
self._log.debug('Creating new HTTP session')
abs_url = urllib.parse.urljoin(self.manager_url, url)
self._log.debug('%s %s', method, abs_url)
resp = self.session.request(
method, abs_url,
params=params,
data=data,
headers=headers,
cookies=cookies,
files=files,
auth=auth,
timeout=timeout,
allow_redirects=allow_redirects,
proxies=proxies,
hooks=hooks,
stream=stream,
verify=verify,
cert=cert,
json=json)
return resp
import attr
from . import attrs_extra
from . import upstream
@attr.s
class FlamencoWorker:
manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
job_types = attr.ib(validator=attr.validators.instance_of(list))
worker_id = attr.ib(validator=attr.validators.instance_of(str))
worker_secret = attr.ib(validator=attr.validators.instance_of(str))
_log = attrs_extra.log('%s.FlamencoWorker' % __name__)
def startup(self):
self._log.info('Starting up')
if not self.worker_id or not self.worker_secret:
self.register_at_manager()
def register_at_manager(self):
self._log.info('Registering at manager')
self.worker_secret = generate_secret()
platform = detect_platform()
resp = self.manager.post(
'/register-worker', json={
'secret': self.worker_secret,
'platform': platform,
'supported_job_types': self.job_types,
})
result = resp.json()
self._log.info('Response: %s', result)
self.worker_id = result['_id']
self.write_registration_info()
def write_registration_info(self):
"""Writes the current worker ID and secret to the home dir."""
from . import config
config.merge_with_home_config({
'worker_id': self.worker_id,
'worker_secret': self.worker_secret,
})
def mainloop(self):
self._log.info('Entering main loop')
def generate_secret() -> str:
"""Generates a 64-character secret key."""
import random
import string
randomizer = random.SystemRandom()
tokens = string.ascii_letters + string.digits
secret = ''.join(randomizer.choice(tokens) for _ in range(64))
return secret
def detect_platform() -> str:
"""Detects the platform, returning 'linux', 'windows' or 'darwin'.
Raises an exception when the current platform cannot be detected
as one of those three.
"""
import platform
plat = platform.system().lower()
if not plat:
raise EnvironmentError('Unable to determine platform.')
if plat in {'linux', 'windows', 'darwin'}:
return plat
raise EnvironmentError('Unable to determine platform; unknown platform %r', plat)
-r requirements-test.txt
ipython
-r requirements.txt
# Primary dependencies
pytest==3.0.5
pytest-cov==2.4.0
# Secondary dependencies
coverage==4.2
py==1.4.32
attrs==16.3.0
requests==2.12.4
[tool:pytest]
addopts = -v --cov flamenco_worker --cov-report term-missing --ignore node_modules -x
[pep8]
max-line-length = 100
#!/usr/bin/env python
import setuptools
if __name__ == '__main__':
setuptools.setup(
name='flamenco-worker',
version='1.0-alpha',
description='Flamenco Worker implementation',
author='Sybren A. Stüvel',
author_email='sybren@blender.studio',
packages=setuptools.find_packages(),
license='GPL',
classifiers=[
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3.5',
],
install_requires=[
'attrs >=16.3.0',
'requests>=2.12.4',
],
entry_points={'console_scripts': [
'flamenco-worker = flamenco_worker.cli:main',
]},
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment