"""Build management."""
from os import makedirs, remove
from os.path import exists, join, getmtime, basename, relpath
from time import time
from collections import OrderedDict
from chrysalio.lib.utils import deltatime_label
from .i18n import _, translate
TTL = 300
# =============================================================================
[docs]class Build(object):
"""This class manages a build.
:param str lock_dir:
Absolute path to a directory which stores the lock files.
:type manager: .lib.build_manager
:param manager:
Manager for this build.
:param str build_id:
Build ID.
:param dict params:
Dictionary defining the parameters of the build.
:type stopping: threading.Event
:param stopping:
Flag to stop the processing.
A build has the following keys attributes:
* ``'job_id'``: job ID
* ``'uid'``: build ID
* ``'context'``: Name of the context (processor ID...)
* ``'lang'``: prefered language for messages
* ``'ttl'``: Time To Live for an execution
* ``'deadline'``: deadline for the run
* ``'settings'``: settings for this build
* ``'values'``: values for the variables
* ``'files'``: list of files to process
* ``'resources'``: list of resources
* ``'locations'``: dictionary of locations of templates or warehouses
* ``'caller'``: a dictionary with login, name and email of the caller
* ``'callback'``: ID of the callback function
* ``'dbsession'``: possible current SQLAlchemy session
* ``'output'``: absolute path to the possible output directory
* ``'stopping'``: :class:`threading.Event` as a Flag to stop the processing
* ``'current'``: a dictionary describing the file under processing
* ``'result'``: dictionary containing the result
The ``build.current`` dictionary have the following keys:
* ``'input_file'``: absolute path to the original input file
* ``'fup'``: absolute path to the File Under Processing
The ``build.result`` dictionary has, at least, following keys:
* ``'no_execution'``: ``True`` if the execution is impossible
* ``'infos'``: information messages
* ``'warnings'``: warning messages
* ``'errors'``: errors messages
* ``'traces'``: log
* ``'files'``: list of absolute paths to successful files
* ``'output'``: if success, an absolute or relative path to the result
directory
It may have:
* ``'output_url'``: a possibly URL to the result directory
* ``'to_refresh'``: a list of relative path to refresh
"""
# pylint: disable = too-many-instance-attributes
# -------------------------------------------------------------------------
def __init__(self, lock_dir, manager, build_id, params, stopping):
"""Constructor method."""
self._lock_dir = lock_dir
self._manager = manager
self._progress = [0, 1, 0, 1]
self.uid = build_id
self.job_id = params.get('job_id', '')
self.context = params.get('context')
self.lang = params.get('lang')
self.ttl = int(params.get('ttl', TTL))
self.deadline = time() + self.ttl
self.settings = params.get('settings', {})
self.values = params.get('values', {})
self.files = list(OrderedDict.fromkeys(params.get('files', ()))) \
if isinstance(params.get('files'), (list, tuple)) \
else params.get('files', ())
self.resources = tuple(OrderedDict.fromkeys(
params.get('resources', ())))
self.locations = params.get('locations', {})
self.caller = params.get('caller', {})
self.callback = params.get('callback')
self.dbsession = params.get('dbsession')
self.output = params.get('output')
self.stopping = stopping
self.current = {'input_file': None, 'fup': None}
self.result = {}
# -------------------------------------------------------------------------
def __repr__(self):
"""Return a string containing a printable representation of a build."""
return '<Build {job_id}-{build_id}, values={values}>'.format(
job_id=self.job_id, build_id=self.uid, values=self.values)
# -------------------------------------------------------------------------
[docs] def lock(self):
"""Lock the build.
:rtype: bool
"""
lock_file = join(self._lock_dir, self.uid)
if exists(lock_file) and getmtime(lock_file) + self.ttl > time():
return False
if not exists(self._lock_dir):
makedirs(self._lock_dir)
with open(lock_file, 'w', encoding='utf8'):
pass
self.deadline = time() + self.ttl
return True
# -------------------------------------------------------------------------
[docs] def unlock(self):
"""Unlock the build."""
lock_file = join(self._lock_dir, self.uid)
if exists(lock_file):
remove(lock_file)
# -------------------------------------------------------------------------
[docs] def relock(self):
"""Refresh the lock of the build."""
lock_file = join(self._lock_dir, self.uid)
with open(lock_file, 'w', encoding='utf8'):
pass
self.deadline = time() + self.ttl
# -------------------------------------------------------------------------
[docs] def aborted(self):
"""Check if the process has been aborted.
:rtype: bool
"""
return time() > self.deadline or (
self.stopping is not None and self.stopping.is_set())
# -------------------------------------------------------------------------
[docs] def output_info(self, output=None):
"""Add 'output' key in the build result and add an information with
location of the result, possibly as a link.
:param str output: (optional)
Forced ouput.
"""
if output is None:
output = self.output
if not output:
return
# Output path in result
self.result['output'] = \
output if not self.settings.get('output.home.path') \
else relpath(output, self.settings['output.home.path'])
# Output URL in result
if self.settings.get('output.home.path') \
and self.settings.get('output.home.url'):
self.result['output_url'] = \
'`{root}/{path} <{url}{path}>`_'.format(
root=basename(self.settings['output.home.path']),
path=self.result['output']
if self.result['output'] != '.' else '',
url=self.settings['output.home.url'])
# Message
if self._manager is not None:
build_env = self._manager.build_env(self.uid)
duration = int(time() - build_env['launch']) \
if build_env and build_env.get('launch') else 0
else:
duration = 0
self.info(translate(
_('Result in "${o}" in ${d}.', {
'o': self.result.get('output_url', self.result['output']),
'd': deltatime_label(duration, lang=self.lang)})
if duration else
_('Result in "${o}".', {
'o': self.result.get('output_url', self.result['output'])}),
self.lang))
# -------------------------------------------------------------------------
[docs] def output_unrefreshed(self):
"""Try to remove the record of last refresh for the current output."""
if not self.output or not self.settings.get('output.refreshed') \
or not exists(self.settings['output.refreshed']):
return
try:
remove(self.settings['output.refreshed'])
except OSError: # pragma: nocover
pass
# -------------------------------------------------------------------------
[docs] def progress(self):
"""Return the progress of the build .
:rtype: tuple
:return:
A tuple such as ``(file_percent, file_name, step_percent,
step_trace)``.
"""
return (
int(round(self._progress[0] * 100 / self._progress[1], 0)),
basename(self.current.get('fup') or
self.current.get('input_file') or ''),
int(round(self._progress[2] * 100 / self._progress[3], 0)),
translate(self.result['traces'][-1][1]
if 'traces' in self.result else '', self.lang))
# -------------------------------------------------------------------------
[docs] def progress_file(self, current=None, total=None, increase=None):
"""Set the file progress."""
if current is not None:
self._progress[0] = current
if total:
self._progress[1] = total
if increase:
self._progress[0] += increase
# -------------------------------------------------------------------------
[docs] def progress_step(self, current=None, total=None, increase=None):
"""Set the step progress."""
if current is not None:
self._progress[2] = current
if total:
self._progress[3] = total
if increase:
self._progress[2] += increase
# -------------------------------------------------------------------------
[docs] def progress_save(self):
"""Save the current progress in the build environment."""
if self._manager is not None:
self._manager.set_progress(self.uid, self.progress())
# -------------------------------------------------------------------------
[docs] def dbsession_factory(self):
"""Retrieve a DB session maker function if the current
``self.dbsession`` is ``None``.
:rtype: :func:`sqlalchemy.orm.session.sessionmaker` or ``None``
"""
return self._manager.dbsession_factory() \
if self._manager is not None and self.dbsession is None else None
# -------------------------------------------------------------------------
[docs] def attachments_path(self):
"""Return a absolute path to the attachments directory or ``None``.
:rtype: class:`str` or ``None``
"""
return self._manager.attachments_path() \
if self._manager is not None else None
# -------------------------------------------------------------------------
[docs] def aborted_message(self):
"""Check if an aborted message is needed."""
if time() > self.deadline:
self.error(translate(_('Timeout!'), self.lang))
elif self.stopping is not None and self.stopping.isSet():
self.error(translate(_('Stopped by the user!'), self.lang))
# -------------------------------------------------------------------------
[docs] def clear_messages(self):
"""Clear information, warning and error messages."""
for message_type in ('infos', 'warnings', 'errors'):
if message_type in self.result:
del self.result[message_type]
# -------------------------------------------------------------------------
[docs] def info(self, text):
"""Add an information message in the result.
:param str text:
Information.
"""
if 'infos' not in self.result:
self.result['infos'] = []
self.result['infos'].append(text)
self.trace(text, 'I')
# -------------------------------------------------------------------------
[docs] def warning(self, text):
"""Add a warning message in the result.
:param str text:
Warning.
"""
if 'warnings' not in self.result:
self.result['warnings'] = []
self.result['warnings'].append(text)
self.trace(text, 'W')
# -------------------------------------------------------------------------
[docs] def error(self, text):
"""Add an error message in the result.
:param str text:
Error.
"""
if 'errors' not in self.result:
self.result['errors'] = []
self.result['errors'].append(text)
self.trace(text, 'E')
# -------------------------------------------------------------------------
[docs] def trace(self, text, status='T'):
"""Add a trace message in the result.
:param str text:
Trace.
:param str status: (default='T')
Type of trace: 'T'=normal, 'E'=error, 'W'=warning, 'I'=info.
Each trace is a tuple such as ``(status, text)``.
"""
if 'traces' not in self.result:
self.result['traces'] = []
self.result['traces'].append((status, text))
# -------------------------------------------------------------------------
[docs] def clear_traces(self):
"""Clear traces."""
if 'traces' in self.result:
del self.result['traces']