Skip to content
Snippets Groups Projects
Commit 337e77b6 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Sign on at Manager when starting Worker

This passes the current hostname and list of supported task types to the
manager.
parent 95b4f137
No related branches found
No related tags found
No related merge requests found
...@@ -104,51 +104,89 @@ class FlamencoWorker: ...@@ -104,51 +104,89 @@ class FlamencoWorker:
return None return None
return self.task_id return self.task_id
async def startup(self, *, may_retry_register=True): async def startup(self, *, may_retry_loop=True):
self._log.info('Starting up') self._log.info('Starting up')
if not self.worker_id or not self.worker_secret: do_register = not self.worker_id or not self.worker_secret
await self.register_at_manager(may_retry_loop=may_retry_register) if do_register:
await self.register_at_manager(may_retry_loop=may_retry_loop)
# Once we know our ID and secret, update the manager object so that we # Once we know our ID and secret, update the manager object so that we
# don't have to pass our authentication info each and every call. # don't have to pass our authentication info each and every call.
self.manager.auth = (self.worker_id, self.worker_secret) self.manager.auth = (self.worker_id, self.worker_secret)
# We only need to sign on if we didn't just register. However, this
# can only happen after setting self.manager.auth.
if not do_register:
await self.signon(may_retry_loop=may_retry_loop)
self.schedule_fetch_task() self.schedule_fetch_task()
async def register_at_manager(self, *, may_retry_loop: bool): @staticmethod
import requests def hostname() -> str:
import socket import socket
return socket.gethostname()
self._log.info('Registering at manager') async def _keep_posting_to_manager(self, url: str, json: dict, *, use_auth=True,
may_retry_loop: bool):
import requests
self.worker_secret = generate_secret() post_kwargs = {
platform = detect_platform() 'json': json,
hostname = socket.gethostname() 'loop': self.loop,
}
if not use_auth:
post_kwargs['auth'] = None
while True: while True:
try: try:
resp = await self.manager.post( resp = await self.manager.post(url, **post_kwargs)
'/register-worker',
json={
'secret': self.worker_secret,
'platform': platform,
'supported_task_types': self.task_types,
'nickname': hostname,
},
auth=None, # explicitly do not use authentication
loop=self.loop,
)
resp.raise_for_status() resp.raise_for_status()
except requests.RequestException as ex: except requests.RequestException as ex:
if not may_retry_loop: if not may_retry_loop:
self._log.error('Unable to register at manager: %s', ex) self._log.error('Unable to POST to manager %s: %s', url, ex)
raise UnableToRegisterError() raise UnableToRegisterError()
self._log.warning('Unable to register at manager, retrying in %i seconds: %s', self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s',
REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex) url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex)
await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY) await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY)
else: else:
break return resp
async def signon(self, *, may_retry_loop: bool):
"""Signs on at the manager.
Only needed when we didn't just register.
"""
self._log.info('Signing on at manager.')
await self._keep_posting_to_manager(
'/sign-on',
json={
'supported_task_types': self.task_types,
'nickname': self.hostname(),
},
may_retry_loop=may_retry_loop,
)
self._log.info('Manager accepted sign-on.')
async def register_at_manager(self, *, may_retry_loop: bool):
self._log.info('Registering at manager')
self.worker_secret = generate_secret()
platform = detect_platform()
resp = await self._keep_posting_to_manager(
'/register-worker',
json={
'secret': self.worker_secret,
'platform': platform,
'supported_task_types': self.task_types,
'nickname': self.hostname(),
},
use_auth=False, # explicitly do not use authentication
may_retry_loop=may_retry_loop,
)
result = resp.json() result = resp.json()
self._log.info('Response: %s', result) self._log.info('Response: %s', result)
......
...@@ -56,11 +56,25 @@ class AbstractFWorkerTest(AbstractWorkerTest): ...@@ -56,11 +56,25 @@ class AbstractFWorkerTest(AbstractWorkerTest):
class WorkerStartupTest(AbstractFWorkerTest): class WorkerStartupTest(AbstractFWorkerTest):
# Mock merge_with_home_config() so that it doesn't overwrite actual config. # Mock merge_with_home_config() so that it doesn't overwrite actual config.
@unittest.mock.patch('socket.gethostname')
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_already_registered(self, mock_merge_with_home_config): def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname):
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False))
from mock_responses import EmptyResponse, CoroMock
mock_gethostname.return_value = 'ws-unittest'
self.manager.post = CoroMock(return_value=EmptyResponse())
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False))
mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret
self.manager.post.assert_not_called() self.manager.post.assert_called_once_with(
'/sign-on',
json={
'supported_task_types': ['sleep', 'unittest'],
'nickname': 'ws-unittest',
},
loop=self.asyncio_loop,
)
self.tuqueue.queue.assert_not_called() self.tuqueue.queue.assert_not_called()
@unittest.mock.patch('socket.gethostname') @unittest.mock.patch('socket.gethostname')
...@@ -76,7 +90,7 @@ class WorkerStartupTest(AbstractFWorkerTest): ...@@ -76,7 +90,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
'_id': '5555', '_id': '5555',
})) }))
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False)) self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False))
mock_merge_with_home_config.assert_called_once_with( mock_merge_with_home_config.assert_called_once_with(
{'worker_id': '5555', {'worker_id': '5555',
'worker_secret': self.worker.worker_secret} 'worker_secret': self.worker.worker_secret}
...@@ -113,7 +127,7 @@ class WorkerStartupTest(AbstractFWorkerTest): ...@@ -113,7 +127,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
# Mock merge_with_home_config() so that it doesn't overwrite actual config. # Mock merge_with_home_config() so that it doesn't overwrite actual config.
self.assertRaises(UnableToRegisterError, self.assertRaises(UnableToRegisterError,
self.asyncio_loop.run_until_complete, self.asyncio_loop.run_until_complete,
self.worker.startup(may_retry_register=False)) self.worker.startup(may_retry_loop=False))
mock_merge_with_home_config.assert_not_called() mock_merge_with_home_config.assert_not_called()
assert isinstance(self.manager.post, unittest.mock.Mock) assert isinstance(self.manager.post, unittest.mock.Mock)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment