# Source code for cioservice.lib.build_manager

"""Management of parallelized builds."""

from __future__ import annotations
from time import time
from threading import Thread, Event, enumerate as thread_enumerate

from pyramid.registry import Registry

from ..models.dbjob import JOB_DEFAULT_TTL, DBJob, DBJobLog
from .i18n import _, translate

# Cache namespace under which the build environments are stored.
CACHE_NS_BUILDS = 'ciosrv-builds'
# Cache namespace under which the spool (IDs of waiting builds) is stored.
CACHE_NS_SPOOL = 'ciosrv-spool'
# Default maximum number of builds running at the same time.
DEFAULT_CONCURRENCY = 3
# Timeout, in seconds, passed to the thread join when a build is stopped.
TIMEOUT_STOP = 10
# Grace period, in seconds, granted past a build deadline before the build is
# asked to stop; past twice this period the build is marked as completed.
DEADLINE_DELTA = 30
# Expiration, in seconds, of the build environments and spool in the cache.
ENVIRONMENT_TTL = 172800  # 2 days


# =============================================================================
class BuildManager:
    """This class manages the launch of parallelized builds.

    :param int concurrency: (default=DEFAULT_CONCURRENCY)
        Maximum number of builds running at the same time.
    :param registry:
        Application registry or registry like.

    The build environment is a dictionary with the following keys:

    * ``'job'``: job dictionary
    * ``'params'``: dictionary of parameters for the build
    * ``'status'``: current status (``'pending'``, ``'in_spool'``,
      ``'running'``, ``'stopping'`` or ``'completed'``)
    * ``'launch'``: launch time
    * ``'duration'``: duration of the processing
    * ``'progress'``: a tuple such as ``(file_percent, file_name,
      step_percent, step_trace)``
    * ``'result'``: (optional) result of the processing
    """

    # -------------------------------------------------------------------------
    def __init__(
            self,
            concurrency: int = DEFAULT_CONCURRENCY,
            registry: Registry | None = None):
        """Constructor method."""
        self.concurrency: int = concurrency
        self._registry: Registry | None = registry
        # Callback functions indexed by callback ID.
        self._callbacks: dict = {}

    # -------------------------------------------------------------------------
    def set_registry(self, registry: Registry):
        """Set the internal registry.

        :param registry:
            Application registry or registry like.
        """
        self._registry = registry

    # -------------------------------------------------------------------------
    def add_callback(self, callback_id: str, callback_function):
        """Add a callback function.

        The callback is called by :meth:`completed` with the registry, the
        build environment and the result of the processing.

        :param str callback_id:
            Callback ID.
        :param callback_function:
            Callback function.
        """
        self._callbacks[callback_id] = callback_function

    # -------------------------------------------------------------------------
    def remove_callback(self, callback_id: str):
        """Remove a callback function.

        Unknown IDs are silently ignored.

        :param str callback_id:
            Callback ID.
        """
        self._callbacks.pop(callback_id, None)

    # -------------------------------------------------------------------------
    def build_env(
            self,
            build_id: str,
            check_stopping: bool = False) -> dict | None:
        """Return the build environment for build `build_id`.

        :param str build_id:
            Build ID.
        :param bool check_stopping: (default=False)
            if ``True``, check if the build is stopping correctly.
        :rtype: :class:`dict` or ``None``
        :return:
            The environment read from the cache or ``None`` if the manager
            has no usable registry.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return None
        environment = self._registry['cache_global'].get(
            build_id, CACHE_NS_BUILDS, expire=ENVIRONMENT_TTL)
        if not check_stopping or not environment:
            return environment

        # No thread for this build: close it once its TTL is largely exceeded
        thread = self._thread(build_id)
        if thread is None:
            if environment['launch'] and \
               environment['launch'] + environment['job']['ttl'] \
               < time() - 2 * DEADLINE_DELTA:
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            return environment

        # Dead thread: the build cannot be running or stopping any more
        if not thread.is_alive():
            if environment['status'] in ('running', 'stopping'):
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            return environment

        # Stop requested in the environment but not yet sent to the thread
        if environment['status'] == 'stopping' \
           and not thread.stopping.is_set():
            thread.stop()
            return environment

        # Living thread which has exceeded its deadline
        if environment['status'] in ('running', 'stopping'):
            if thread.deadline < time() - 2 * DEADLINE_DELTA:
                thread.stop()
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            elif thread.deadline < time() - DEADLINE_DELTA:
                thread.stop()
                self._build_env_set_status(environment, 'stopping')
                self._build_env_save(build_id, environment)

        return environment

    # -------------------------------------------------------------------------
    def register(self, build_id: str, job: dict, params: dict) -> str | None:
        """Register a new build.

        If a build with the same ID is already running or stopping, its
        environment is left untouched.

        :param str build_id:
            Build ID.
        :param dict job:
            Job to launch.
        :param dict params:
            Parameters of the build.
        :return:
            Error message or ``None``.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return _('Build manager is not activated.')

        build_env = self.build_env(build_id, True)
        if build_env is not None \
           and build_env['status'] in ('running', 'stopping'):
            self._schedule()
            return None

        build_env = {
            'job': job,
            'params': params,
            'launch': 0,
            'duration': 0,
            'progress': (0, '', 0, '')
        }
        self._build_env_set_status(build_env, 'pending')
        self._build_env_save(build_id, build_env)
        self._schedule()
        return None

    # -------------------------------------------------------------------------
    def run(self, build_id: str, build_env: dict | None = None) -> str | None:
        """Add a build to the spool and try to launch it.

        :param str build_id:
            Build ID.
        :param dict build_env: (optional)
            The environment of the current build.
        :return:
            Error message or ``None``.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return _('Build manager is not activated.')

        if build_env is None:
            build_env = self.build_env(build_id, True)
        if build_env is None:
            return _('This build does not exist.')
        if build_env['status'] == 'in_spool':
            return _('Pending build.')
        if build_env['status'] in ('running', 'stopping'):
            return _('Build in progress.')

        self._build_env_set_status(build_env, 'in_spool')
        self._build_env_save(build_id, build_env)
        spool = self._spool_load()
        spool.append(build_id)
        self._schedule(spool)
        return None

    # -------------------------------------------------------------------------
    def set_progress(self, build_id: str, progress: list):
        """Save the current progress value in the build environment.

        Nothing is done if the build environment cannot be retrieved.

        :param str build_id:
            Build ID.
        :param list progress:
            A sequence such as ``[file_percent, file_name, step_percent,
            step_trace]``.
        """
        build_env = self.build_env(build_id)
        if build_env is not None:
            build_env['progress'] = progress
            self._build_env_save(build_id, build_env)

    # -------------------------------------------------------------------------
    def stop(
            self,
            build_id: str,
            build_env: dict | None = None) -> dict | None:
        """Stop a build and return its environment.

        A build in the spool goes back to ``'pending'``, a running build is
        marked ``'stopping'`` and its thread is joined for at most
        ``TIMEOUT_STOP`` seconds, and a build which has been stopping for too
        long is marked ``'completed'``.

        :param str build_id:
            ID of the build to stop.
        :param dict build_env: (optional)
            The environment of the current build.
        :rtype: :class:`dict` or ``None``
        """
        if build_env is None:
            build_env = self.build_env(build_id)

        # Unknown
        if build_env is None or \
           self._registry is None or 'cache_global' not in self._registry:
            return None

        if build_env['status'] == 'in_spool':
            self._build_env_set_status(build_env, 'pending')
            self._build_env_save(build_id, build_env)
        elif build_env['status'] == 'running':
            self._build_env_set_status(build_env, 'stopping')
            self._build_env_save(build_id, build_env)
            thread = self._thread(build_id)
            if thread is not None:
                thread.join(TIMEOUT_STOP)
        elif build_env['status'] == 'stopping' and build_env['launch'] and \
                build_env['launch'] + build_env['job']['ttl'] \
                < time() - 2 * DEADLINE_DELTA:
            self._build_env_set_status(build_env, 'completed')
            self._build_env_save(build_id, build_env)

        # Launch other builds
        self._schedule()
        return build_env

    # -------------------------------------------------------------------------
    def completed(self, build_id: str, callback_id: str, result: dict):
        """Call back for a completed build.

        :param str build_id:
            ID of the completed build.
        :param str callback_id:
            ID of the callback function.
        :param dict result:
            Result of the processing. See: :class:`.lib.build.Build`.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return

        build_env = self.build_env(build_id)
        if build_env is None:
            # The environment is no longer in the cache: rebuild a minimal
            # one so that the result can still be recorded.
            build_env = {
                'job': {
                    'service_id': None,
                    'context': None,
                    'ttl': JOB_DEFAULT_TTL
                },
                'params': {},
                'launch': 0,
                'duration': 0,
                'progress': (0, '', 0, '')
            }
            self._warning(_('Recovery of a lost thread.'))

        if callback_id in self._callbacks:
            # Publish a "finalization" progress while the callback is running
            build_env['progress'] = (
                100,
                translate(_('Finalization'), build_env['params'].get('lang')),
                100, '')
            self._build_env_save(build_id, build_env)
            self._callbacks[callback_id](self._registry, build_env, result)
            build_env['progress'] = (0, '', 0, '')

        self._build_env_set_status(build_env, 'completed', result)
        self._build_env_save(build_id, build_env)
        self._schedule()

    # -------------------------------------------------------------------------
    @classmethod
    def clean_logs(cls, dbsession):
        """Clean up old logs.

        For each job, the log entries whose timestamp plus the job
        ``ttl_log`` is in the past are deleted.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        """
        now = time()
        for dbjob in dbsession.query(DBJob):
            dbsession.query(DBJobLog).filter_by(job_id=dbjob.job_id).filter(
                DBJobLog.timestamp + dbjob.ttl_log < now).delete()

    # -------------------------------------------------------------------------
    def set_launch_time(self, build_id: str, build_env: dict):
        """Set the launch time and reset the duration.

        :param str build_id:
            Build ID.
        :param dict build_env:
            Build environment to update and save.
        """
        build_env['launch'] = time()
        build_env['duration'] = 0
        self._build_env_save(build_id, build_env)

    # -------------------------------------------------------------------------
    def attachments_path(self) -> str | None:
        """Return an absolute path to the attachments directory or ``None``.

        The value is read from the ``'attachments'`` setting of the registry.

        :rtype: :class:`str` or ``None``
        """
        if self._registry is None:
            return None
        return self._registry.settings.get('attachments')

    # -------------------------------------------------------------------------
    def dbsession_factory(self):
        """Retrieve a DB session maker function.

        :rtype: :func:`sqlalchemy.orm.session.sessionmaker` or ``None``
        """
        if self._registry is None:
            return None
        return self._registry.get('dbsession_factory')

    # -------------------------------------------------------------------------
    def _schedule(self, spool: list | None = None):
        """Manage build launches.

        Builds which have exceeded their deadline are asked to stop, then
        builds of the spool are launched as long as the concurrency limit is
        not reached. The remaining spool is saved in the cache.

        :param list spool: (optional)
            List of IDs of build waiting for an execution.
        """
        running: dict[str, ServiceThread] = {
            k.name[6:]: k  # type: ignore
            for k in thread_enumerate()
            if k.name[:6] == 'build:'
            and not k.completed and k.is_alive()  # type: ignore
        }

        # Stop too long builds
        now = time() - DEADLINE_DELTA
        for build_id in tuple(running):
            if running[build_id].deadline < now:
                running[build_id].stop()
                del running[build_id]

        # Launch new builds
        if spool is None:
            spool = self._spool_load()
        while spool and len(running) < self.concurrency:
            build_id = spool.pop(0)
            build_env = self.build_env(build_id)
            if build_env is None or build_env['status'] != 'in_spool':
                continue

            service = self._registry['services'].get(  # type: ignore
                build_env['job']['service_id'])
            if service is None:  # pragma: nocover
                self._error(
                    _(
                        'Unknown service: ${s}',
                        {'s': build_env['job']['service_id']}))
                self._build_env_set_status(build_env, 'pending')
                self._build_env_save(build_id, build_env)
                continue

            thread = ServiceThread(
                service, build_id, build_env['params'], self.completed)
            thread.deadline = time() + build_env['job']['ttl']

            # Save the 'running' status *before* starting the thread:
            # otherwise a build which finishes quickly could see its
            # 'completed' status overwritten by a late 'running' one.
            self._build_env_set_status(build_env, 'running')
            self._build_env_save(build_id, build_env)
            try:
                thread.start()
            except RuntimeError as error:  # pragma: nocover
                self._error(str(error))
                # The build has left the spool: do not leave it 'in_spool'
                # or 'running', otherwise it could never be launched again.
                self._build_env_set_status(build_env, 'pending')
                self._build_env_save(build_id, build_env)
                continue
            running[build_id] = thread

        # Save spool
        self._spool_save(spool)

    # -------------------------------------------------------------------------
    @classmethod
    def _build_env_set_status(
            cls,
            build_env: dict,
            status: str,
            result: dict | None = None):
        """Update the build environment according to the status.

        :param dict build_env:
            Build environment to update in place.
        :param str status:
            New status.
        :param dict result: (optional)
            Result of the processing. See: :class:`.lib.build.Build`.
        """
        build_env['status'] = status

        # A (re)launch resets the clock
        if status in ('in_spool', 'running'):
            build_env['launch'] = time()
            build_env['duration'] = 0

        if status == 'completed':
            if build_env['launch'] and not build_env['duration']:
                build_env['duration'] = int(time() - build_env['launch'])
            build_env['progress'] = (0, '', 0, '')

        # A previous result is only kept while stopping or once completed
        if result is not None:
            build_env['result'] = result
        elif 'result' in build_env and status not in ('stopping', 'completed'):
            del build_env['result']

    # -------------------------------------------------------------------------
    def _build_env_save(self, build_id: str, build_env: dict):
        """Save build environment into the cache.

        :param str build_id:
            Build ID.
        :param dict build_env:
            Build environment.
        """
        self._registry['cache_global'].set(  # type: ignore
            build_id, build_env, CACHE_NS_BUILDS, expire=ENVIRONMENT_TTL)

    # -------------------------------------------------------------------------
    def _spool_load(self) -> list:
        """Load spool.

        :rtype: list
        :return:
            The spool read from the cache or an empty list.
        """
        return self._registry['cache_global'].get(  # type: ignore
            'spool', CACHE_NS_SPOOL, expire=ENVIRONMENT_TTL) or []

    # -------------------------------------------------------------------------
    def _spool_save(self, spool: list):
        """Save spool.

        :param list spool:
            List of IDs of build waiting for an execution.
        """
        self._registry['cache_global'].set(  # type: ignore
            'spool', spool, CACHE_NS_SPOOL, expire=ENVIRONMENT_TTL)

    # -------------------------------------------------------------------------
    @classmethod
    def _thread(cls, build_id: str):
        """Return the thread corresponding to the build ``build_id``.

        :param str build_id:
            ID of the build.
        :rtype: :class:`threading.Thread` or ``None``
        """
        for thread in thread_enumerate():
            if thread.name[:6] == 'build:' and thread.name[6:] == build_id:
                return thread
        return None

    # -------------------------------------------------------------------------
    def _warning(self, text: str):
        """Log a warning message.

        Nothing is logged if the registry has no ``'log_activity'`` entry.

        :param str text:
            Warning.
        """
        if self._registry is not None \
           and self._registry.get('log_activity') is not None:
            self._registry['log_activity'].warning(text)

    # -------------------------------------------------------------------------
    def _error(self, text: str):
        """Log an error message.

        Nothing is logged if the registry has no ``'log_activity'`` entry.

        :param str text:
            Error.
        """
        if self._registry is not None \
           and self._registry.get('log_activity') is not None:
            self._registry['log_activity'].error(text)
# =============================================================================
class ServiceThread(Thread):
    """Thread dedicated to the execution of one build by a service.

    The thread is named ``build:<build_id>``.

    :type  service: .lib.service.Service
    :param service:
        Service to execute.
    :param str build_id:
        ID of the build.
    :param dict params:
        Dictionary defining the parameters of the build.
    :param callback:
        Function called at the end of the processing with the build UID, the
        build callback ID and the result.

    See :meth:`.lib.service.Service.run` for details on ``params``.
    """

    # Prefix of the thread name; the build ID follows it.
    _NAME_PREFIX = 'build:'

    # -------------------------------------------------------------------------
    def __init__(self, service, build_id, params, callback):
        """Constructor method."""
        super().__init__(name=f'build:{build_id}')
        self._service = service
        self._params = params
        self._callback = callback
        self._build = None
        # Public state
        self.completed = False
        self.stopping = Event()
        self.deadline = 0

    # -------------------------------------------------------------------------
    def run(self):
        """Create the build, execute the service and notify the callback."""
        self.completed = False
        build_id = self.name[len(self._NAME_PREFIX):]
        build = self._service.make_build(
            build_id, self._params, self.stopping)
        self._build = build
        outcome = self._service.run(build)
        # Flag the completion before notifying
        self.completed = True
        self._callback(build.uid, build.callback, outcome)

    # -------------------------------------------------------------------------
    def stop(self):  # pragma: nocover
        """Ask the execution to stop by setting the ``stopping`` event."""
        self.stopping.set()

    # -------------------------------------------------------------------------
    def join(self, timeout: float | None = None):
        """Request a stop, then wait until the thread terminates.

        :param float timeout: (optional)
            A timeout for stopping in seconds.
        """
        self.stopping.set()
        Thread.join(self, timeout)