Skip to content
Snippets Groups Projects
Commit 8d2a7d24 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: HTTP calls to Master are performed asynchronously & in a diff thread

parent 777d8353
Branches
No related tags found
No related merge requests found
......@@ -90,7 +90,7 @@ def main():
asyncio.ensure_future(tuqueue.work(loop=loop))
try:
fworker.startup()
loop.run_until_complete(fworker.startup())
fworker.mainloop()
except worker.UnableToRegisterError:
# The worker will have logged something, we'll just shut down cleanly.
......
import attr
import concurrent.futures
import requests
from . import attrs_extra
......@@ -9,51 +10,55 @@ HTTP_TIMEOUT = 3 # in seconds
@attr.s
class FlamencoManager:
# TODO Sybren: make all functions async
manager_url = attr.ib(validator=attr.validators.instance_of(str))
session = attr.ib(default=None, init=False)
auth = attr.ib(default=None, init=False) # tuple (worker_id, worker_secret)
# Executor for HTTP requests, so that they can run in separate threads.
_executor = attr.ib(default=attr.Factory(concurrent.futures.ThreadPoolExecutor),
init=False)
_log = attrs_extra.log('%s.FlamencoManager' % __name__)
def get(self, *args, **kwargs) -> requests.Response:
return self.client_request('GET', *args, **kwargs)
def post(self, *args, **kwargs) -> requests.Response:
return self.client_request('POST', *args, **kwargs)
def put(self, *args, **kwargs) -> requests.Response:
return self.client_request('PUT', *args, **kwargs)
def delete(self, *args, **kwargs) -> requests.Response:
return self.client_request('DELETE', *args, **kwargs)
def patch(self, *args, **kwargs) -> requests.Response:
return self.client_request('PATCH', *args, **kwargs)
def client_request(self, method, url,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=...,
timeout=HTTP_TIMEOUT,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
json=None) -> requests.Response:
async def get(self, *args, loop, **kwargs) -> requests.Response:
return await self.client_request('GET', *args, loop=loop, **kwargs)
async def post(self, *args, loop, **kwargs) -> requests.Response:
return await self.client_request('POST', *args, loop=loop, **kwargs)
async def put(self, *args, loop, **kwargs) -> requests.Response:
return await self.client_request('PUT', *args, loop=loop, **kwargs)
async def delete(self, *args, loop, **kwargs) -> requests.Response:
return await self.client_request('DELETE', *args, loop=loop, **kwargs)
async def patch(self, *args, loop, **kwargs) -> requests.Response:
return await self.client_request('PATCH', *args, loop=loop, **kwargs)
async def client_request(self, method, url, *,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=...,
timeout=HTTP_TIMEOUT,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
json=None,
loop) -> requests.Response:
"""Performs a HTTP request to the server.
Creates and re-uses the HTTP session, to have efficient communication.
if 'auth=...' (the default), self.auth is used. If 'auth=None', no authentication is used.
if 'auth=...' (the async default), self.auth is used. If 'auth=None', no authentication is used.
"""
import urllib.parse
from functools import partial
if not self.session:
from requests.adapters import HTTPAdapter
......@@ -65,21 +70,23 @@ class FlamencoManager:
abs_url = urllib.parse.urljoin(self.manager_url, url)
self._log.debug('%s %s JSON: %s', method, abs_url, json)
resp = self.session.request(
method, abs_url,
params=params,
data=data,
headers=headers,
cookies=cookies,
files=files,
auth=self.auth if auth is ... else auth,
timeout=timeout,
allow_redirects=allow_redirects,
proxies=proxies,
hooks=hooks,
stream=stream,
verify=verify,
cert=cert,
json=json)
http_req = partial(self.session.request,
method, abs_url,
params=params,
data=data,
headers=headers,
cookies=cookies,
files=files,
auth=self.auth if auth is ... else auth,
timeout=timeout,
allow_redirects=allow_redirects,
proxies=proxies,
hooks=hooks,
stream=stream,
verify=verify,
cert=cert,
json=json)
resp = await loop.run_in_executor(self._executor, http_req)
return resp
......@@ -85,7 +85,7 @@ class TaskUpdateQueue:
queue_is_empty = False
self._log.info('Pushing task update to Manager')
resp = self.manager.post(url, json=payload)
resp = await self.manager.post(url, json=payload, loop=loop)
resp.raise_for_status()
self._log.debug('Master accepted pushed update.')
self._unqueue(rowid)
......
......@@ -65,18 +65,18 @@ class FlamencoWorker:
_log = attrs_extra.log('%s.FlamencoWorker' % __name__)
def startup(self):
async def startup(self):
self._log.info('Starting up')
if not self.worker_id or not self.worker_secret:
self.register_at_manager()
await self.register_at_manager()
# Once we know our ID and secret, update the manager object so that we
# don't have to pass our authentication info each and every call.
self.manager.auth = (self.worker_id, self.worker_secret)
self.schedule_fetch_task()
def register_at_manager(self):
async def register_at_manager(self):
import requests
self._log.info('Registering at manager')
......@@ -85,7 +85,7 @@ class FlamencoWorker:
platform = detect_platform()
try:
resp = self.manager.post(
resp = await self.manager.post(
'/register-worker',
json={
'secret': self.worker_secret,
......@@ -93,6 +93,7 @@ class FlamencoWorker:
'supported_job_types': self.job_types,
},
auth=None, # explicitly do not use authentication
loop=self.loop,
)
except requests.ConnectionError:
self._log.error('Unable to register at manager, aborting.')
......@@ -173,7 +174,7 @@ class FlamencoWorker:
# TODO: use exponential backoff instead of retrying every fixed N seconds.
self._log.info('Fetching task')
try:
resp = self.manager.post('/task')
resp = await self.manager.post('/task', loop=self.loop)
except requests.exceptions.RequestException as ex:
self._log.warning('Error fetching new task, will retry in %i seconds: %s',
FETCH_TASK_FAILED_RETRY_DELAY, ex)
......
......@@ -35,3 +35,24 @@ class EmptyResponse:
def raise_for_status(self):
pass
def CoroMock(return_value=...):
"""Corountine mocking object.
For an example, see test_coro_mock.py.
Source: http://stackoverflow.com/a/32505333/875379
"""
import asyncio
from unittest.mock import Mock
coro = Mock(name="CoroutineResult")
corofunc = Mock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro))
corofunc.coro = coro
if return_value is not ...:
corofunc.coro.return_value = return_value
return corofunc
"""Unit test for our CoroMock implementation."""
import asyncio
import unittest
class CoroMockTest(unittest.TestCase):
def test_setting_return_value(self):
from mock_responses import CoroMock
cm = CoroMock()
cm.coro.return_value = '123'
result = asyncio.get_event_loop().run_until_complete(cm(3, 4))
cm.assert_called_once_with(3, 4)
self.assertEqual('123', result)
......@@ -13,12 +13,13 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
def setUp(self):
from flamenco_worker.upstream import FlamencoManager
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
from mock_responses import CoroMock
self.asyncio_loop = asyncio.get_event_loop()
self.shutdown_future = self.asyncio_loop.create_future()
self.manager = Mock(spec=FlamencoManager)
self.manager.post = Mock()
self.manager.post = CoroMock()
self.tmpdir = tempfile.TemporaryDirectory()
self.tuqueue = TaskUpdateQueue(
......@@ -47,11 +48,13 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
tries = 0
received_payload = None
received_url = None
received_loop = None
def push_callback(url, *, json):
async def push_callback(url, *, json, loop):
nonlocal tries
nonlocal received_url
nonlocal received_payload
nonlocal received_loop
tries += 1
if tries < 3:
......@@ -66,6 +69,8 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
# since the work loop is designed to keep running, even when exceptions are thrown.
received_url = url
received_payload = copy.deepcopy(json)
received_loop = loop
return EmptyResponse()
self.manager.post.side_effect = push_callback
......@@ -84,6 +89,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
# Check the payload.
self.assertEqual(received_url, '/push/here')
self.assertEqual(received_payload, payload)
self.assertEqual(received_loop, self.asyncio_loop)
def test_queue_persistence(self):
"""Check that updates are pushed, even when the process is stopped & restarted."""
......@@ -110,16 +116,19 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
received_payload = None
received_url = None
received_loop = None
def push_callback(url, *, json):
async def push_callback(url, *, json, loop):
nonlocal received_url
nonlocal received_payload
nonlocal received_loop
# Shut down after handling this push.
self.shutdown_future.cancel()
received_url = url
received_payload = copy.deepcopy(json)
received_loop = loop
return EmptyResponse()
self.manager.post.side_effect = push_callback
......@@ -135,3 +144,4 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
# Check the payload
self.assertEqual(received_url, '/push/there')
self.assertEqual(received_payload, payload)
self.assertEqual(received_loop, self.asyncio_loop)
......@@ -44,24 +44,30 @@ class AbstractWorkerTest(unittest.TestCase):
class WorkerStartupTest(AbstractWorkerTest):
def test_startup_already_registered(self):
self.worker.startup()
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_already_registered(self, mock_merge_with_home_config):
self.asyncio_loop.run_until_complete(self.worker.startup())
mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret
self.manager.post.assert_not_called()
self.tuqueue.queue.assert_not_called()
def test_startup_registration(self):
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_registration(self, mock_merge_with_home_config):
from flamenco_worker.worker import detect_platform
from mock_responses import JsonResponse
from mock_responses import JsonResponse, CoroMock
self.worker.worker_id = None
self.manager.post = Mock(return_value=JsonResponse({
self.manager.post = CoroMock(return_value=JsonResponse({
'_id': '5555',
}))
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
with unittest.mock.patch('flamenco_worker.config.merge_with_home_config'):
self.worker.startup()
self.asyncio_loop.run_until_complete(self.worker.startup())
mock_merge_with_home_config.assert_called_once_with(
{'worker_id': '5555',
'worker_secret': self.worker.worker_secret}
)
assert isinstance(self.manager.post, unittest.mock.Mock)
self.manager.post.assert_called_once_with(
......@@ -72,23 +78,27 @@ class WorkerStartupTest(AbstractWorkerTest):
'secret': self.worker.worker_secret,
},
auth=None,
loop=self.asyncio_loop,
)
def test_startup_registration_unhappy(self):
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_registration_unhappy(self, mock_merge_with_home_config):
"""Test that startup is aborted when the worker can't register."""
from flamenco_worker.worker import detect_platform
from mock_responses import JsonResponse
from mock_responses import JsonResponse, CoroMock
self.worker.worker_id = None
self.manager.post = unittest.mock.Mock(return_value=JsonResponse({
self.manager.post = CoroMock(return_value=JsonResponse({
'_id': '5555',
}, status_code=500))
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
with unittest.mock.patch('flamenco_worker.config.merge_with_home_config'):
self.assertRaises(requests.HTTPError, self.worker.startup)
self.assertRaises(requests.HTTPError,
self.asyncio_loop.run_until_complete,
self.worker.startup())
mock_merge_with_home_config.assert_not_called()
assert isinstance(self.manager.post, unittest.mock.Mock)
self.manager.post.assert_called_once_with(
......@@ -99,6 +109,7 @@ class WorkerStartupTest(AbstractWorkerTest):
'secret': self.worker.worker_secret,
},
auth=None,
loop=self.asyncio_loop,
)
......@@ -119,11 +130,11 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
def test_fetch_task_happy(self):
from unittest.mock import call
from mock_responses import JsonResponse, EmptyResponse
from mock_responses import JsonResponse, CoroMock
self.manager.post = Mock()
self.manager.post = CoroMock()
# response when fetching a task
self.manager.post.return_value = JsonResponse({
self.manager.post.coro.return_value = JsonResponse({
'_id': '58514d1e9837734f2e71b479',
'job': '58514d1e9837734f2e71b477',
'manager': '585a795698377345814d2f68',
......@@ -156,12 +167,13 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
# Another fetch-task task should have been scheduled.
self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
self.manager.post.assert_called_once_with('/task')
self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.tuqueue.queue.assert_has_calls([
call('/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'active',
'current_command_idx': 0}),
'current_command_idx': 0}
),
call('/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'completed',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment