    """Queues task updates to Flamenco Manager.
    
    Task updates are pickled and stored in a SQLite database. Pickling allows
    for efficient conversion of Python objects into a binary data blob.
    """
    
    import asyncio
    import pickle
    import sqlite3
    import typing
    
    import attr
    
    from . import attrs_extra, upstream
    
    BACKOFF_TIME = 5  # seconds
    SHUTDOWN_RECHECK_TIME = 0.5  # seconds
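

    # A minimal sketch of the persistence format described in the module
    # docstring: a payload is serialised with pickle.dumps() into a bytes
    # object that fits SQLite's BLOB column, and restored with pickle.loads()
    # when the queue is flushed. This helper is illustrative only (the payload
    # keys are made up) and is not used by the queue itself.
    def _pickle_roundtrip_example():
        payload = {'task_status': 'active', 'activity': 'rendering'}
        blob = pickle.dumps(payload)  # bytes, storable in a BLOB column
        assert pickle.loads(blob) == payload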
    
    
    @attr.s
    class TaskUpdateQueue:
        manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
        shutdown_future = attr.ib(
            validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
        db_fname = attr.ib(validator=attr.validators.instance_of(str))
    
        backoff_time = attr.ib(default=BACKOFF_TIME)
        shutdown_recheck_time = attr.ib(default=SHUTDOWN_RECHECK_TIME)
    
        _stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
        _db = attr.ib(default=None, init=False)
        _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
        _log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
    
        def _connect_db(self):
            self._log.info('Connecting to database %s', self.db_fname)
            self._db = sqlite3.connect(self.db_fname, isolation_level=None)
    
            # We don't need to create a primary key; we use the implicit 'rowid' column.
            self._db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')
    
            # Start with a more-or-less compact database.
            self._db.execute('VACUUM')
    
            # With that out of the way, we can use the default SQLite behaviour again.
            self._db.isolation_level = ''
    
        def _disconnect_db(self):
            self._log.info('Disconnecting from database %s', self.db_fname)
            self._db.close()
            self._db = None
    
        def queue(self, url, payload):
            """Push some payload onto the queue."""
    
            if self._db is None:
                self._connect_db()
    
            # Store the pickled payload in the SQLite database.
            pickled = pickle.dumps(payload)
    
            self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)',
                             (url, pickled))
            self._db.commit()
    
            # Notify the work loop that stuff has been queued.
            self._stuff_queued.set()
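
        # Illustrative use of queue() (the URL and payload keys below are
        # assumptions; the actual endpoints are defined by the Manager's API):
        #
        #     tuqueue.queue('/tasks/1234/update', {'task_status': 'active'})
        #
        # queue() returns as soon as the row is committed to SQLite; delivery
        # to the Manager happens asynchronously in the work() loop.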
    
        async def work(self, *, loop=None):
            """Loop that pushes queued payloads to the Flamenco Manager.
    
            Keeps running until shutdown_future.done() returns True.
            """
    
            # Always start by inspecting the persisted queue, so act as if something
            # was just queued.
            self._stuff_queued.set()
    
            while not self.shutdown_future.done():
                try:
                    await asyncio.wait_for(self._stuff_queued.wait(),
                                           self.shutdown_recheck_time,
                                           loop=loop)
                except asyncio.TimeoutError:
                    # This is normal; it just means that nothing was queued within
                    # SHUTDOWN_RECHECK_TIME seconds.
                    continue
                except asyncio.CancelledError:
                    # We're being shut down.
                    break
    
                self._log.debug('Inspecting queued task updates.')
                await self.flush_and_catch(loop=loop)
            self._log.warning('Stopping work loop')
    
        def _queue(self) -> typing.Iterable[typing.Tuple[int, str, object]]:
            """Yields (rowid, url, unpickled payload) tuples from the database."""
    
            if self._db is None:
                self._connect_db()
                assert self._db is not None
    
            result = self._db.execute('''
                SELECT rowid, url, payload
                FROM fworker_queue
                ORDER BY rowid ASC
            ''')
            for row in result:
                rowid = row[0]
                url = row[1]
                payload = pickle.loads(row[2])
                yield rowid, url, payload
    
        def queue_size(self) -> int:
            """Return the number of items queued."""
            if self._db is None:
                self._connect_db()
                assert self._db is not None
    
            result = self._db.execute('SELECT count(*) FROM fworker_queue')
            count = next(result)[0]
            return count
    
        def _unqueue(self, rowid: int):
            """Removes a queued payload from the database."""
    
            # TODO Sybren: every once in a while, run 'vacuum' on the database.
            self._db.execute('DELETE FROM fworker_queue WHERE rowid=?', (rowid,))
            self._db.commit()
    
        async def flush(self, *, loop: asyncio.AbstractEventLoop) -> bool:
            """Tries to flush the queue to the Manager.
    
            Returns True iff the queue was already empty before this flush started.
            """
    
            async with self._queue_lock:
                queue_is_empty = True
                queue_size_before = self.queue_size()
                handled = 0
                for rowid, url, payload in self._queue():
                    queue_is_empty = False
    
                    queue_size = self.queue_size()
                    self._log.info('Pushing task update to Manager, queue size is %d', queue_size)
                    resp = await self.manager.post(url, json=payload, loop=loop)
                    if resp.status_code in {404, 409}:
                        # 404: Task doesn't exist (any more).
                        # 409: The task was assigned to another worker, so we're not allowed to
                        #      push updates for it. We have to un-queue this update, as it will
                        #      never be accepted.
                        self._log.warning('Discarding update, Manager says %s', resp.text)
                        # TODO(sybren): delete all queued updates to the same URL?
                    else:
                        resp.raise_for_status()
                        self._log.debug('Manager accepted pushed update.')
                    self._unqueue(rowid)
    
                    handled += 1
                    if handled > 1000:
                        self._log.info('Taking a break from queue flushing after %d items', handled)
                        break
    
                if queue_is_empty:
                    # Only clear the flag once the queue has really been cleared.
                    self._stuff_queued.clear()
    
                queue_size_after = self.queue_size()
                if queue_size_after > 0:
                    if queue_size_after >= queue_size_before:
                        self._log.warning(
                            'Queue size increased from %d to %d, after having flushed %d items',
                            queue_size_before, queue_size_after, handled)
                    else:
                        self._log.info(
                            'Queue size decreased from %d to %d, after having flushed %d items',
                            queue_size_before, queue_size_after, handled)
    
                self._log.debug('Vacuuming database')
                self._db.execute('VACUUM')
    
                return queue_is_empty
    
        async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop):
            """Flushes the queue, and just reports errors, doesn't wait nor retry."""
    
            import requests
    
            self._log.info('flush_and_report: trying one last push to get updates to Manager')
    
            try:
                await self.flush(loop=loop)
            except requests.ConnectionError:
                self._log.warning('flush_and_report: Unable to connect to Manager, '
                                  'some items are still queued.')
            except requests.HTTPError as ex:
                self._log.warning('flush_and_report: Manager did not accept our updates (%s),'
                                  ' some items are still queued.', ex)
            except Exception:
                self._log.exception('flush_and_report: Unexpected exception, '
                                    'some items are still queued.')
    
        async def flush_and_catch(self, *, loop: asyncio.AbstractEventLoop):
            """Flushes the queue, reports errors and waits before returning for another try."""
    
            import requests
    
            try:
                await self.flush(loop=loop)
            except requests.ConnectionError:
                self._log.warning('Unable to connect to Manager, will retry later.')
                await asyncio.sleep(self.backoff_time)
            except requests.HTTPError as ex:
                self._log.warning('Manager did not accept our updates (%s), will retry later.',
                                  ex)
                await asyncio.sleep(self.backoff_time)
            except Exception:
                self._log.exception('Unexpected exception in work loop. '
                                    'Backing off and retrying later.')
                await asyncio.sleep(self.backoff_time)
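

    if __name__ == '__main__':
        # Minimal usage sketch, for illustration only. Because of the relative
        # imports above, it must be run as a module from the parent package
        # (python -m <package>.upstream_update_queue). Constructing
        # FlamencoManager without arguments is an assumption; the real class
        # is likely to require configuration, such as the Manager's URL.
        def _demo():
            loop = asyncio.get_event_loop()
            shutdown_future = loop.create_future()
            tuqueue = TaskUpdateQueue(
                manager=upstream.FlamencoManager(),  # hypothetical construction
                shutdown_future=shutdown_future,
                db_fname='flamenco-worker-demo.db',
            )

            # Queueing only writes to the local SQLite database; delivery to
            # the Manager happens in the work() loop below.
            tuqueue.queue('/tasks/1234/update', {'task_status': 'active'})

            # Let the work loop run briefly, then signal shutdown.
            loop.call_later(2, shutdown_future.set_result, None)
            loop.run_until_complete(tuqueue.work(loop=loop))

        _demo()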