# Source code for cioservice.lib.build

"""Build management."""

from os import makedirs, remove
from os.path import exists, join, getmtime, basename, relpath
from time import time
from collections import OrderedDict

from chrysalio.lib.utils import deltatime_label
from .i18n import _, translate

TTL = 300


# =============================================================================
class Build(object):
    """This class manages a build.

    :param str lock_dir:
        Absolute path to a directory which stores the lock files.
    :type  manager: .lib.build_manager
    :param manager:
        Manager for this build.
    :param str build_id:
        Build ID.
    :param dict params:
        Dictionary defining the parameters of the build.
    :type  stopping: threading.Event
    :param stopping:
        Flag to stop the processing.

    A build has the following attributes:

    * ``'job_id'``: job ID
    * ``'uid'``: build ID
    * ``'context'``: name of the context (processor ID...)
    * ``'lang'``: preferred language for messages
    * ``'ttl'``: Time To Live for an execution
    * ``'deadline'``: deadline for the run
    * ``'settings'``: settings for this build
    * ``'values'``: values for the variables
    * ``'files'``: list of files to process
    * ``'resources'``: list of resources
    * ``'locations'``: dictionary of locations of templates or warehouses
    * ``'caller'``: a dictionary with login, name and email of the caller
    * ``'callback'``: ID of the callback function
    * ``'dbsession'``: possible current SQLAlchemy session
    * ``'output'``: absolute path to the possible output directory
    * ``'stopping'``: :class:`threading.Event` as a flag to stop the
      processing
    * ``'current'``: a dictionary describing the file under processing
    * ``'result'``: dictionary containing the result

    The ``build.current`` dictionary has the following keys:

    * ``'input_file'``: absolute path to the original input file
    * ``'fup'``: absolute path to the File Under Processing

    The ``build.result`` dictionary starts empty and may contain the
    following keys:

    * ``'no_execution'``: ``True`` if the execution is impossible
    * ``'infos'``: information messages
    * ``'warnings'``: warning messages
    * ``'errors'``: error messages
    * ``'traces'``: log, as a list of ``(status, text)`` tuples
    * ``'files'``: list of absolute paths to successful files
    * ``'output'``: if success, an absolute or relative path to the result
      directory

    It may also have:

    * ``'output_url'``: a possible link to the result directory
    * ``'to_refresh'``: a list of relative paths to refresh
    """
    # pylint: disable = too-many-instance-attributes

    # -------------------------------------------------------------------------
    def __init__(self, lock_dir, manager, build_id, params, stopping):
        """Constructor method."""
        self._lock_dir = lock_dir
        self._manager = manager
        # [file_current, file_total, step_current, step_total]
        self._progress = [0, 1, 0, 1]

        self.uid = build_id
        self.job_id = params.get('job_id', '')
        self.context = params.get('context')
        self.lang = params.get('lang')
        self.ttl = int(params.get('ttl', TTL))
        self.deadline = time() + self.ttl
        self.settings = params.get('settings', {})
        self.values = params.get('values', {})

        # Lists and tuples of files are de-duplicated (order preserved); any
        # other value is stored untouched.
        files = params.get('files', ())
        if isinstance(files, (list, tuple)):
            files = list(OrderedDict.fromkeys(files))
        self.files = files

        self.resources = tuple(
            OrderedDict.fromkeys(params.get('resources', ())))
        self.locations = params.get('locations', {})
        self.caller = params.get('caller', {})
        self.callback = params.get('callback')
        self.dbsession = params.get('dbsession')
        self.output = params.get('output')
        self.stopping = stopping
        self.current = {'input_file': None, 'fup': None}
        self.result = {}

    # -------------------------------------------------------------------------
    def __repr__(self):
        """Return a string containing a printable representation of a build."""
        return '<Build {job_id}-{build_id}, values={values}>'.format(
            job_id=self.job_id, build_id=self.uid, values=self.values)

    # -------------------------------------------------------------------------
    def lock(self):
        """Lock the build.

        The lock is a file named after the build ID in the lock directory. A
        lock file whose modification time is older than the TTL is considered
        stale and is taken over.

        :rtype: bool
        :return:
            ``True`` if the lock has been acquired, ``False`` if a fresh lock
            already exists.
        """
        lock_file = join(self._lock_dir, self.uid)

        # Read the modification time directly: a lock file which vanishes
        # between a separate existence test and this call must not raise.
        try:
            locked = getmtime(lock_file) + self.ttl > time()
        except OSError:
            locked = False
        if locked:
            return False

        # exist_ok avoids failing if the directory is created concurrently.
        makedirs(self._lock_dir, exist_ok=True)
        with open(lock_file, 'w', encoding='utf8'):
            pass
        self.deadline = time() + self.ttl
        return True

    # -------------------------------------------------------------------------
    def unlock(self):
        """Unlock the build by removing its lock file, if any."""
        try:
            remove(join(self._lock_dir, self.uid))
        except FileNotFoundError:
            # Already unlocked: nothing to do.
            pass

    # -------------------------------------------------------------------------
    def relock(self):
        """Refresh the lock of the build and push back its deadline."""
        lock_file = join(self._lock_dir, self.uid)
        with open(lock_file, 'w', encoding='utf8'):
            pass
        self.deadline = time() + self.ttl

    # -------------------------------------------------------------------------
    def aborted(self):
        """Check if the process has been aborted.

        The build is aborted when its deadline is over or when the
        ``stopping`` flag is set.

        :rtype: bool
        """
        return time() > self.deadline or (
            self.stopping is not None and self.stopping.is_set())

    # -------------------------------------------------------------------------
    def output_info(self, output=None):
        """Add 'output' key in the build result and add an information with
        location of the result, possibly as a link.

        :param str output: (optional)
            Forced output. If ``None``, ``self.output`` is used. Nothing is
            done when the resulting output is empty.
        """
        if output is None:
            output = self.output
        if not output:
            return

        home_path = self.settings.get('output.home.path')
        home_url = self.settings.get('output.home.url')

        # Output path in result, relative to the output home if any
        self.result['output'] = \
            relpath(output, home_path) if home_path else output

        # Output URL in result, as a reStructuredText link
        if home_path and home_url:
            path = self.result['output']
            self.result['output_url'] = \
                '`{root}/{path} <{url}{path}>`_'.format(
                    root=basename(home_path),
                    path=path if path != '.' else '',
                    url=home_url)

        # Duration since the launch, if the manager knows it
        build_env = self._manager.build_env(self.uid) \
            if self._manager is not None else None
        duration = int(time() - build_env['launch']) \
            if build_env and build_env.get('launch') else 0

        # Message
        location = self.result.get('output_url', self.result['output'])
        if duration:
            message = _('Result in "${o}" in ${d}.', {
                'o': location,
                'd': deltatime_label(duration, lang=self.lang)})
        else:
            message = _('Result in "${o}".', {'o': location})
        self.info(translate(message, self.lang))

    # -------------------------------------------------------------------------
    def output_unrefreshed(self):
        """Try to remove the record of last refresh for the current output."""
        refreshed = self.settings.get('output.refreshed')
        if not self.output or not refreshed:
            return
        # Best effort: a missing or unremovable record is silently ignored.
        try:
            remove(refreshed)
        except OSError:
            pass

    # -------------------------------------------------------------------------
    def progress(self):
        """Return the progress of the build.

        :rtype: tuple
        :return:
            A tuple such as ``(file_percent, file_name, step_percent,
            step_trace)``.
        """
        # A missing or empty list of traces gives an empty step trace.
        traces = self.result.get('traces')
        last_trace = traces[-1][1] if traces else ''
        return (
            int(round(self._progress[0] * 100 / self._progress[1])),
            basename(
                self.current.get('fup') or self.current.get('input_file')
                or ''),
            int(round(self._progress[2] * 100 / self._progress[3])),
            translate(last_trace, self.lang))

    # -------------------------------------------------------------------------
    def progress_file(self, current=None, total=None, increase=None):
        """Set the file progress.

        :param int current: (optional)
            New index of the current file.
        :param int total: (optional)
            New total number of files. A false value (``None``, 0) is
            ignored so the total never becomes zero.
        :param int increase: (optional)
            Value to add to the index of the current file.
        """
        if current is not None:
            self._progress[0] = current
        if total:
            self._progress[1] = total
        if increase:
            self._progress[0] += increase

    # -------------------------------------------------------------------------
    def progress_step(self, current=None, total=None, increase=None):
        """Set the step progress.

        :param int current: (optional)
            New index of the current step.
        :param int total: (optional)
            New total number of steps. A false value (``None``, 0) is
            ignored so the total never becomes zero.
        :param int increase: (optional)
            Value to add to the index of the current step.
        """
        if current is not None:
            self._progress[2] = current
        if total:
            self._progress[3] = total
        if increase:
            self._progress[2] += increase

    # -------------------------------------------------------------------------
    def progress_save(self):
        """Save the current progress in the build environment."""
        if self._manager is not None:
            self._manager.set_progress(self.uid, self.progress())

    # -------------------------------------------------------------------------
    def dbsession_factory(self):
        """Retrieve a DB session maker function if the current
        ``self.dbsession`` is ``None``.

        :rtype: :func:`sqlalchemy.orm.session.sessionmaker` or ``None``
        """
        if self._manager is None or self.dbsession is not None:
            return None
        return self._manager.dbsession_factory()

    # -------------------------------------------------------------------------
    def attachments_path(self):
        """Return an absolute path to the attachments directory or ``None``.

        :rtype: :class:`str` or ``None``
        """
        if self._manager is None:
            return None
        return self._manager.attachments_path()

    # -------------------------------------------------------------------------
    def aborted_message(self):
        """Add an error message if the build has been aborted.

        A timeout takes precedence over a stop requested by the user.
        """
        if time() > self.deadline:
            self.error(translate(_('Timeout!'), self.lang))
        elif self.stopping is not None and self.stopping.is_set():
            self.error(translate(_('Stopped by the user!'), self.lang))

    # -------------------------------------------------------------------------
    def clear_messages(self):
        """Clear information, warning and error messages.

        Traces are left untouched.
        """
        for message_type in ('infos', 'warnings', 'errors'):
            self.result.pop(message_type, None)

    # -------------------------------------------------------------------------
    def info(self, text):
        """Add an information message in the result.

        :param str text:
            Information.
        """
        self.result.setdefault('infos', []).append(text)
        self.trace(text, 'I')

    # -------------------------------------------------------------------------
    def warning(self, text):
        """Add a warning message in the result.

        :param str text:
            Warning.
        """
        self.result.setdefault('warnings', []).append(text)
        self.trace(text, 'W')

    # -------------------------------------------------------------------------
    def error(self, text):
        """Add an error message in the result.

        :param str text:
            Error.
        """
        self.result.setdefault('errors', []).append(text)
        self.trace(text, 'E')

    # -------------------------------------------------------------------------
    def trace(self, text, status='T'):
        """Add a trace message in the result.

        :param str text:
            Trace.
        :param str status: (default='T')
            Type of trace: 'T'=normal, 'E'=error, 'W'=warning, 'I'=info.

        Each trace is a tuple such as ``(status, text)``.
        """
        self.result.setdefault('traces', []).append((status, text))

    # -------------------------------------------------------------------------
    def clear_traces(self):
        """Clear traces."""
        self.result.pop('traces', None)