"""Management of parallelized builds."""
from __future__ import annotations
from time import time
from threading import Thread, Event, enumerate as thread_enumerate
from pyramid.registry import Registry
from ..models.dbjob import JOB_DEFAULT_TTL, DBJob, DBJobLog
from .i18n import _, translate
# Cache namespace under which the build environments are stored.
CACHE_NS_BUILDS = 'ciosrv-builds'
# Cache namespace under which the spool (IDs of builds waiting for a
# launch) is stored.
CACHE_NS_SPOOL = 'ciosrv-spool'
# Default maximum number of builds running at the same time.
DEFAULT_CONCURRENCY = 3
# Timeout, in seconds, when waiting for a stopped build thread to finish.
TIMEOUT_STOP = 10
# Grace period, in seconds, after a build deadline before the build is
# asked to stop (twice this delay before it is considered completed).
DEADLINE_DELTA = 30
# Expiration, in seconds, of the cache entries (environments and spool).
ENVIRONMENT_TTL = 2 * 24 * 60 * 60  # 2 days
# =============================================================================
class BuildManager:
    """This class manages the launch of parallelized builds.

    :param int concurrency: (default=DEFAULT_CONCURRENCY)
        Maximum number of builds running at the same time.
    :param registry:
        Application registry or registry like.

    The build environment is a dictionary with the following keys:

    * ``'job'``: job dictionary
    * ``'params'``: dictionary of parameters for the build
    * ``'status'``: current status (``'pending'``, ``'in_spool'``,
      ``'running'``, ``'stopping'`` or ``'completed'``)
    * ``'launch'``: launch time
    * ``'duration'``: duration of the processing
    * ``'progress'``: current progress (see :meth:`set_progress`)
    * ``'result'``: (optional) result of the processing
    """

    # -------------------------------------------------------------------------
    def __init__(
            self,
            concurrency: int = DEFAULT_CONCURRENCY,
            registry: Registry | None = None):
        """Constructor method."""
        self.concurrency: int = concurrency
        self._registry: Registry | None = registry
        self._callbacks: dict = {}

    # -------------------------------------------------------------------------
    def set_registry(self, registry: Registry):
        """Set the internal registry.

        :param registry:
            Application registry or registry like.
        """
        self._registry = registry

    # -------------------------------------------------------------------------
    def add_callback(self, callback_id: str, callback_function):
        """Add a callback function.

        :param str callback_id:
            Callback ID.
        :param callback_function:
            Callable invoked by :meth:`completed` with the arguments
            ``(registry, build_env, result)``.
        """
        self._callbacks[callback_id] = callback_function

    # -------------------------------------------------------------------------
    def remove_callback(self, callback_id: str):
        """Remove a callback function, if it is registered.

        :param str callback_id:
            Callback ID.
        """
        if callback_id in self._callbacks:
            del self._callbacks[callback_id]

    # -------------------------------------------------------------------------
    def build_env(
            self, build_id: str, check_stopping: bool = False) -> dict | None:
        """Return the build environment for build `build_id`.

        :param str build_id:
            Build ID.
        :param bool check_stopping: (default=False)
            If ``True``, check if the build is stopping correctly and update
            its status when its thread is missing, dead or past its deadline.
        :rtype: :class:`dict` or ``None``
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return None
        environment = self._registry['cache_global'].get(
            build_id, CACHE_NS_BUILDS, expire=ENVIRONMENT_TTL)
        if not check_stopping or not environment:
            return environment

        thread = self._thread(build_id)

        # No local thread for this build: once its TTL plus a grace period
        # is over, consider it completed. An already completed build is left
        # untouched, otherwise a zero duration (build faster than one second)
        # would be overwritten by the time elapsed since its launch.
        if thread is None:
            if environment['status'] != 'completed' \
                    and environment['launch'] \
                    and environment['launch'] + environment['job']['ttl'] \
                    < time() - 2 * DEADLINE_DELTA:
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            return environment

        # Thread finished: close a build still flagged as active
        if not thread.is_alive():
            if environment['status'] in ('running', 'stopping'):
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            return environment

        # Stop requested but not yet propagated to the thread
        if environment['status'] == 'stopping' \
                and not thread.stopping.is_set():
            thread.stop()
            return environment

        # Deadline exceeded: ask the thread to stop, then, after a second
        # grace period, consider the build completed
        if environment['status'] in ('running', 'stopping'):
            if thread.deadline < time() - 2 * DEADLINE_DELTA:
                thread.stop()
                self._build_env_set_status(environment, 'completed')
                self._build_env_save(build_id, environment)
            elif thread.deadline < time() - DEADLINE_DELTA:
                thread.stop()
                self._build_env_set_status(environment, 'stopping')
                self._build_env_save(build_id, environment)
        return environment

    # -------------------------------------------------------------------------
    def register(self, build_id: str, job: dict, params: dict) -> str | None:
        """Register a new build.

        If the build is already running or stopping, it is left untouched.

        :param str build_id:
            Build ID.
        :param dict job:
            Job to launch.
        :param dict params:
            Parameters of the build.
        :return:
            Error message or ``None``.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return _('Build manager is not activated.')
        build_env = self.build_env(build_id, True)
        if build_env is not None \
                and build_env['status'] in ('running', 'stopping'):
            self._schedule()
            return None
        build_env = {
            'job': job,
            'params': params,
            'launch': 0,
            'duration': 0,
            'progress': (0, '', 0, '')
        }
        self._build_env_set_status(build_env, 'pending')
        self._build_env_save(build_id, build_env)
        self._schedule()
        return None

    # -------------------------------------------------------------------------
    def run(self, build_id: str, build_env: dict | None = None) -> str | None:
        """Add a build to the spool and try to launch it.

        :param str build_id:
            Build ID.
        :param dict build_env: (optional)
            The environment of the current build.
        :return:
            Error message or ``None``.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return _('Build manager is not activated.')
        if build_env is None:
            build_env = self.build_env(build_id, True)
        if build_env is None:
            return _('This build does not exist.')
        if build_env['status'] == 'in_spool':
            return _('Pending build.')
        if build_env['status'] in ('running', 'stopping'):
            return _('Build in progress.')
        self._build_env_set_status(build_env, 'in_spool')
        self._build_env_save(build_id, build_env)
        spool = self._spool_load()
        spool.append(build_id)
        self._schedule(spool)
        return None

    # -------------------------------------------------------------------------
    def set_progress(self, build_id: str, progress: list):
        """Save the current progress value in the build environment.

        :param str build_id:
            Build ID.
        :param list progress:
            A list such as ``[file_percent, file_name, step_percent,
            step_trace]``.
        """
        build_env = self.build_env(build_id)
        if build_env is not None:
            build_env['progress'] = progress
            self._build_env_save(build_id, build_env)

    # -------------------------------------------------------------------------
    def stop(
            self, build_id: str, build_env: dict | None = None) -> dict | None:
        """Stop a build and return its environment.

        :param str build_id:
            ID of the build to stop.
        :param dict build_env: (optional)
            The environment of the current build.
        :rtype: :class:`dict` or ``None``
        """
        if build_env is None:
            build_env = self.build_env(build_id)

        # Unknown
        if build_env is None or \
                self._registry is None or 'cache_global' not in self._registry:
            return None

        if build_env['status'] == 'in_spool':
            self._build_env_set_status(build_env, 'pending')
            self._build_env_save(build_id, build_env)
        elif build_env['status'] == 'running':
            self._build_env_set_status(build_env, 'stopping')
            self._build_env_save(build_id, build_env)
            thread = self._thread(build_id)
            if thread is not None:
                thread.join(TIMEOUT_STOP)
                # The thread may have completed while we were waiting: its
                # callback then saved a fresher environment in the cache.
                build_env = self.build_env(build_id) or build_env
        elif build_env['status'] == 'stopping' and build_env['launch'] and \
                build_env['launch'] + build_env['job']['ttl'] \
                < time() - 2 * DEADLINE_DELTA:
            self._build_env_set_status(build_env, 'completed')
            self._build_env_save(build_id, build_env)

        # Launch other builds
        self._schedule()
        return build_env

    # -------------------------------------------------------------------------
    def completed(self, build_id: str, callback_id: str, result: dict):
        """Call back for a completed build.

        The build is marked as completed and the spool is rescheduled even if
        the registered callback raises; the exception is then propagated.

        :param str build_id:
            ID of the completed build.
        :param str callback_id:
            ID of the callback function.
        :param dict result:
            Result of the processing. See: :class:`.lib.build.Build`.
        """
        if self._registry is None or 'cache_global' not in self._registry:
            return
        build_env = self.build_env(build_id)
        if build_env is None:
            build_env = {
                'job': {
                    'service_id': None,
                    'context': None,
                    'ttl': JOB_DEFAULT_TTL
                },
                'params': {},
                'launch': 0,
                'duration': 0,
                'progress': (0, '', 0, '')
            }
            self._warning(_('Recovery of a lost thread.'))

        try:
            if callback_id in self._callbacks:
                build_env['progress'] = (
                    100,
                    translate(_('Finalization'),
                              build_env['params'].get('lang')), 100, '')
                self._build_env_save(build_id, build_env)
                self._callbacks[callback_id](
                    self._registry, build_env, result)
        finally:
            # Always release the build, otherwise it would stay "running"
            # and the spooled builds would not be launched
            build_env['progress'] = (0, '', 0, '')
            self._build_env_set_status(build_env, 'completed', result)
            self._build_env_save(build_id, build_env)
            self._schedule()

    # -------------------------------------------------------------------------
    @classmethod
    def clean_logs(cls, dbsession):
        """Clean up old logs.

        For each job, the logs older than its ``ttl_log`` are deleted.

        :type dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        """
        now = time()
        for dbjob in dbsession.query(DBJob):
            dbsession.query(DBJobLog).filter_by(job_id=dbjob.job_id).filter(
                DBJobLog.timestamp + dbjob.ttl_log < now).delete()

    # -------------------------------------------------------------------------
    def set_launch_time(self, build_id: str, build_env: dict):
        """Set the launch time and reset the duration.

        :param str build_id:
            Build ID.
        :param dict build_env:
            Build environment to update and save.
        """
        build_env['launch'] = time()
        build_env['duration'] = 0
        self._build_env_save(build_id, build_env)

    # -------------------------------------------------------------------------
    def attachments_path(self) -> str | None:
        """Return an absolute path to the attachments directory or ``None``.

        :rtype: :class:`str` or ``None``
        """
        return self._registry.settings.get('attachments') \
            if self._registry is not None else None

    # -------------------------------------------------------------------------
    def dbsession_factory(self):
        """Retrieve a DB session maker function.

        :rtype: :func:`sqlalchemy.orm.session.sessionmaker` or ``None``
        """
        return self._registry.get('dbsession_factory') \
            if self._registry is not None else None

    # -------------------------------------------------------------------------
    def _schedule(self, spool: list | None = None):
        """Manage build launches.

        :param list spool: (optional)
            List of IDs of build waiting for an execution.
        """
        running: dict[str, ServiceThread] = {
            thread.name[6:]: thread  # type: ignore
            for thread in thread_enumerate()
            if thread.name.startswith('build:')
            and not thread.completed and thread.is_alive()  # type: ignore
        }

        # Stop too long builds (they no longer count as running)
        now = time() - DEADLINE_DELTA
        for build_id in tuple(running):
            if running[build_id].deadline < now:
                running[build_id].stop()
                del running[build_id]

        # Launch new builds
        if spool is None:
            spool = self._spool_load()
        while spool and len(running) < self.concurrency:
            build_id = spool.pop(0)
            build_env = self.build_env(build_id)
            if build_env is None or build_env['status'] != 'in_spool':
                continue
            service = self._registry['services'].get(  # type: ignore
                build_env['job']['service_id'])
            if service is None:  # pragma: nocover
                self._error(
                    _(
                        'Unknown service: ${s}',
                        {'s': build_env['job']['service_id']}))
                self._build_env_set_status(build_env, 'pending')
                self._build_env_save(build_id, build_env)
                continue
            thread = ServiceThread(
                service, build_id, build_env['params'], self.completed)
            thread.deadline = time() + build_env['job']['ttl']
            try:
                thread.start()
            except RuntimeError as error:  # pragma: nocover
                self._error(str(error))
                # The build has already left the spool: do not leave it
                # stranded in 'in_spool', which would block any new run
                self._build_env_set_status(build_env, 'pending')
                self._build_env_save(build_id, build_env)
                continue
            running[build_id] = thread
            self._build_env_set_status(build_env, 'running')
            self._build_env_save(build_id, build_env)

        # Save spool
        self._spool_save(spool)

    # -------------------------------------------------------------------------
    @classmethod
    def _build_env_set_status(
            cls, build_env: dict, status: str, result: dict | None = None):
        """Update the build environment according to the status.

        :param dict build_env:
            Build environment to update in place.
        :param str status:
            New status.
        :param dict result: (optional)
            Result of the processing. See: :class:`.lib.build.Build`.
        """
        build_env['status'] = status
        if status in ('in_spool', 'running'):
            build_env['launch'] = time()
            build_env['duration'] = 0
        if status == 'completed':
            # A zero duration means "not computed yet"
            if build_env['launch'] and not build_env['duration']:
                build_env['duration'] = int(time() - build_env['launch'])
            build_env['progress'] = (0, '', 0, '')
        if result is not None:
            build_env['result'] = result
        elif 'result' in build_env and status not in ('stopping', 'completed'):
            del build_env['result']

    # -------------------------------------------------------------------------
    def _build_env_save(self, build_id: str, build_env: dict):
        """Save build environment into the cache.

        :param str build_id:
            Build ID.
        :param dict build_env:
            Build environment.
        """
        self._registry['cache_global'].set(  # type: ignore
            build_id, build_env, CACHE_NS_BUILDS, expire=ENVIRONMENT_TTL)

    # -------------------------------------------------------------------------
    def _spool_load(self) -> list:
        """Load spool.

        :rtype: list
        """
        return self._registry['cache_global'].get(  # type: ignore
            'spool', CACHE_NS_SPOOL, expire=ENVIRONMENT_TTL) or []

    # -------------------------------------------------------------------------
    def _spool_save(self, spool: list):
        """Save spool.

        :param list spool:
            List of IDs of build waiting for an execution.
        """
        self._registry['cache_global'].set(  # type: ignore
            'spool', spool, CACHE_NS_SPOOL, expire=ENVIRONMENT_TTL)

    # -------------------------------------------------------------------------
    @classmethod
    def _thread(cls, build_id: str):
        """Return the thread corresponding to the build ``build_id``.

        :param str build_id:
            ID of the build.
        :rtype: :class:`threading.Thread` or ``None``
        """
        for thread in thread_enumerate():
            if thread.name[:6] == 'build:' and thread.name[6:] == build_id:
                return thread
        return None

    # -------------------------------------------------------------------------
    def _warning(self, text: str):
        """Log a warning message.

        :param str text:
            Warning.
        """
        if self._registry is not None \
                and self._registry.get('log_activity') is not None:
            self._registry['log_activity'].warning(text)

    # -------------------------------------------------------------------------
    def _error(self, text: str):
        """Log an error message.

        :param str text:
            Error.
        """
        if self._registry is not None \
                and self._registry.get('log_activity') is not None:
            self._registry['log_activity'].error(text)
# =============================================================================
class ServiceThread(Thread):
    """A thread to execute a service.

    The thread is named ``build:<build_id>``, which is how the build manager
    finds it back.

    :type service: .lib.service.Service
    :param service:
        Service to execute.
    :param str build_id:
        ID of the build.
    :param dict params:
        Dictionary defining the parameters of the build.
    :param callback:
        Call back function to call at the end of the processing with the
        arguments ``(build_uid, build_callback_id, result)``.

    See :meth:`.lib.service.Service.run` for details on ``params``.
    """

    # -------------------------------------------------------------------------
    def __init__(self, service, build_id, params, callback):
        """Constructor method."""
        super().__init__(name=f'build:{build_id}')
        # Time after which the build manager considers the build too long
        # (set by the manager before starting the thread)
        self.deadline = 0
        # Event passed to the build so that it can be asked to stop
        self.stopping = Event()
        # True once the service has returned, before the callback is called
        self.completed = False
        self._service = service
        self._params = params
        self._callback = callback
        self._build = None

    # -------------------------------------------------------------------------
    def run(self):
        """Prepare the service for execution and launch it."""
        self.completed = False
        self._build = self._service.make_build(
            self.name[6:], self._params, self.stopping)
        result = self._service.run(self._build)
        self.completed = True
        self._callback(self._build.uid, self._build.callback, result)

    # -------------------------------------------------------------------------
    def stop(self):  # pragma: nocover
        """Ask the execution to stop (without waiting for it)."""
        self.stopping.set()

    # -------------------------------------------------------------------------
    def join(self, timeout: float | None = None):
        """Ask the execution to stop and wait until the thread terminates.

        :param float timeout: (optional)
            A timeout for stopping in seconds.
        """
        self.stopping.set()
        super().join(timeout)