Source code for cioservice.lib.service

"""Base class for services."""

from __future__ import annotations
from os.path import join, exists
from time import time
from tempfile import gettempdir
from re import compile as re_compile
from threading import Event

import colander
from pyramid.request import Request
from transaction import manager

from chrysalio.models import VALUE_LEN, get_tm_dbsession
from chrysalio.lib.utils import convert_value
from chrysalio.lib.form import Form
from chrysalio.helpers.builder import Builder
from chrysalio.scripts import ScriptRegistry
from .i18n import _, translate
from .build import Build
from ..routes import ROUTE_BUILD_VIEW
from ..models.dbjob import JOB_LOG_STATUS_LABELS, DBJob, DBJobLog

MENU_JOBS = (
    '{theme}/cioservice/images/menu_job.png', _('Jobs'), 'job-view',
    'job_index', None)
MODE_JOBS = (
    'job', (
        '{theme}/cioservice/images/menu_job.png', _('Jobs'), 'mode-job',
        'job_index', None))
LOCATION_REGEX = '/?[a-zA-Z0-9_-]+:[a-zA-Z0-9_/.-]*'


# =============================================================================
[docs] class Service(): """Base class to manage a service. :type registry: chrysalio.scripts.ScriptRegistry :param registry: Application registry. """ uid: str | None = None label = _('Base service') _variables_groups: dict = {} _variables: dict = {} _need_files = False _need_write_permission = False _file_regex = None _select_files_message = _('Select the appropriate files!') # ------------------------------------------------------------------------- def __init__(self, registry: ScriptRegistry): """Constructor method.""" if self.uid is None: self.uid = self.__class__.__module__ self._registry = registry self._lock_dir = registry['modules']['cioservice'].lock_dir \ if 'modules' in registry and 'cioservice' in registry['modules'] \ else join(gettempdir(), 'Chrysalio') # -------------------------------------------------------------------------
[docs] @classmethod def register(cls, environment, service_class): """Method to register the service. :type environment: :class:`pyramid.config.Configurator` or :class:`dict` :param environment: Object used to do configuration declaration within the application or a ScriptRegistry to simulate the application registry. :param service_class: Service class. """ # Server mode (environment == configurator) if hasattr(environment, 'registry'): if 'services' not in environment.registry: environment.registry['services'] = {} service = service_class(environment.registry) environment.registry['services'][service.uid] = service if 'menu' in environment.registry: if MENU_JOBS not in environment.registry['menu']: environment.registry['menu'].insert(-1, MENU_JOBS) # Populate/backup/execute mode (environment == ScriptRegistry) else: if 'services' not in environment: environment['services'] = {} service = service_class(environment) environment['services'][service.uid] = service
# -------------------------------------------------------------------------
[docs] def need_dialog(self, context: str) -> bool: """Return ``True`` if this service needs a dialog box before launching. :param str context: Context name (processor ID...). :rtype: bool """ variables = self.variables(context) return bool([True for k in variables.values() if k.get('visible')])
# -------------------------------------------------------------------------
[docs] @classmethod def need_files(cls, context: str) -> bool: """Return ``True`` if this service needs input files. :param str context: Context name (processor ID...). :rtype: bool """ # pylint: disable = unused-argument return cls._need_files
# -------------------------------------------------------------------------
[docs] def need_write_permission(self, context: str) -> bool: """Return ``True`` if this service needs permission to write output files. :param str context: Context name (processor ID...). :rtype: bool """ # pylint: disable = unused-argument return self._need_write_permission
# -------------------------------------------------------------------------
[docs] def select_files(self, params: dict): """Select in the build parameters dictionary files which are candidate for the service. :param dict params: Dictionary defining the parameters of the build. """ if 'files' in params: if self._file_regex is None: params['files'] = sorted( [k for k in params['files'] if exists(k)]) else: regex = re_compile(self._file_regex) params['files'] = sorted( [ k for k in params['files'] if exists(k) and regex.search(k) is not None ])
# -------------------------------------------------------------------------
[docs] def select_files_message(self, params: dict) -> str: """Return a message specifying files to be selected. :param dict params: Dictionary defining the parameters of the build. :rtype: pyramid.i18n.TranslationString """ # pylint: disable = unused-argument return self._select_files_message
# -------------------------------------------------------------------------
[docs] def authorized(self, caller: str, **kwargs) -> bool: """Check if the service is authorized . :param str caller: Name of the caller: 'cioexecute', 'browse_view'... :param dict kwargs: Keyworded arguments to decide if the service is authorized. It can contain the following keys: 'job', 'warehouse', 'file_writer', 'meta_writer'. :rtype: bool """ # pylint: disable = too-many-boolean-expressions if caller == 'cioexecute': return True job = kwargs.get('job', {}) if job.get('access') == 'restricted' \ and 'users' in job and 'user_id' in kwargs \ and 'groups' in job and 'groups' in kwargs \ and kwargs['user_id'] not in job['users'] \ and not kwargs['groups'] & job['groups']: return False return kwargs.get('file_writer') or not self.need_write_permission( job.get('context'))
# -------------------------------------------------------------------------
[docs] def make_build( self, build_id: str, params: dict, stopping: Event | None = None) -> Build: """Create a build object. :param str build_id: ID of the build. :param dict params: Dictionary defining the parameters of the build. It has the keys ``'job_id'``, ``'context'``, ``'lang'``, ``'ttl'``, ``settings``, ``'values'``, ``'files'``, ``'resources'`` and ``'output'``. It possibly has the key ``'dbsession'``. :type stopping: threading.Event :param stopping: (optional) Flag to stop the processing. :rtype: .lib.build """ # Retrieve build manager build_manager = None if 'modules' in self._registry \ and 'cioservice' in self._registry['modules']: build_manager = self._registry['modules'][ 'cioservice'].build_manager params['locations'] = self._registry['modules'][ 'cioservice'].locations # Create the build and lock it build = Build( self._lock_dir, build_manager, build_id, params, stopping) if not build.lock(): if not build.settings.get('cron'): build.warning( translate(_('Job already in progress.'), build.lang)) self.write_traces(build) build.result['no_execution'] = True return build # Check input if self.need_files(build.context) and not build.files: if not build.settings.get('cron'): build.warning(translate(_('Nothing to do.'), build.lang)) self.write_traces(build) build.result['no_execution'] = True build.unlock() return build return build
# -------------------------------------------------------------------------
[docs] def run(self, build: Build) -> dict: """Prepare the service for execution and launch it. :type build: .lib.build.Build :param build: Current build object. :rtype: dict :return: A dictionary of results. """ if 'no_execution' in build.result: return build.result # Process self.write_running(build) build.progress_file(0, len(build.files)) build.progress_step(0, 1) build.progress_save() self._run(build) build.progress_step(1, 1) build.progress_file(len(build.files), len(build.files)) build.progress_save() self.erase_running(build) build.unlock() return build.result
# ------------------------------------------------------------------------- def _run(self, build: Build): """Execute the service on the build ``build``. :type build: .lib.build.Build :param build: Current build object. """ build.output_unrefreshed() build.aborted_message() self.write_traces(build) # -------------------------------------------------------------------------
[docs] def write_traces(self, build: Build, domain: str | None = None): """Write trace messages into the database and/or a log file. :type build: .lib.build.Build :param build: Current build object :param str domain: (optional) Domain of the log. """ if not build.result.get('traces'): return # In a stream if self._registry.get('log_activity'): messages: dict | None = {'error': [], 'warning': [], 'info': []} caller = build.caller.get('login') for trace in build.result['traces']: message = '{caller}({job_id}) {domain}{text}'.format( caller='[{0}] '.format(caller) if caller else '', job_id=build.job_id, domain='<{0}> '.format(domain) if domain else '', text=self._translate(build, trace[1])) if trace[0] == 'E' and messages is not None: messages['error'].append(self._translate(build, trace[1])) self._registry['log_activity'].error(message) elif trace[0] == 'W' and messages is not None: messages['warning'].append( self._translate(build, trace[1])) self._registry['log_activity'].warning(message) elif trace[0] == 'I' and messages is not None: messages['info'].append(self._translate(build, trace[1])) self._registry['log_activity'].info(message) else: messages = None # In a database if not self._registry.get('no_db_log'): if build.dbsession is not None: self._traces2db(build.dbsession, build, domain, messages) elif self._registry.get('dbsession_factory'): with manager: dbsession = get_tm_dbsession( self._registry['dbsession_factory'], manager) self._traces2db(dbsession, build, domain, messages) build.clear_traces()
# -------------------------------------------------------------------------
[docs] def write_running(self, build: Build): """Write a "running" message into the database. :type build: .lib.build.Build :param build: Current build object """ if self._registry.get('no_db_log'): return with manager: dbsession = get_tm_dbsession( self._registry['dbsession_factory'], manager) \ if build.dbsession is None else build.dbsession dbsession.add( DBJobLog( job_id=build.job_id, timestamp=time(), caller=build.caller.get('login', ''), build_id=build.uid, status='running', text=self._translate(build, _( # yapf: disable 'Job in progress: `${j} <${r}>`_', { 'j': build.uid, 'r': ROUTE_BUILD_VIEW.format(build_id=build.uid) }))))
# -------------------------------------------------------------------------
[docs] def erase_running(self, build: Build): """Erase the "running" message in the database. :type build: .lib.build.Build :param build: Current build object """ if self._registry.get('no_db_log'): return with manager: dbsession = get_tm_dbsession( self._registry['dbsession_factory'], manager) \ if build.dbsession is None else build.dbsession dbsession.query(DBJobLog).filter_by( job_id=build.job_id, build_id=build.uid, status='running').delete()
# ------------------------------------------------------------------------- def _traces2db( self, dbsession, build: Build, domain: str | None, messages: dict | None = None): """Write trace messages into the database. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type build: .lib.build.Build :param build: Current build object :param str domain: Domain of the log. :param dict messages: (optional) A dictionary with keys ``'error'``, ``'warning'`` and ``'info'``. """ # Possibly, fill the message dictionary if messages is None: messages = {'error': [], 'warning': [], 'info': []} for trace in build.result['traces']: if trace[0] == 'E': messages['error'].append(self._translate(build, trace[1])) elif trace[0] == 'W': messages['warning'].append( self._translate(build, trace[1])) else: messages['info'].append(self._translate(build, trace[1])) # Errors if messages.get('error'): dbsession.add( DBJobLog( job_id=build.job_id, timestamp=time(), caller=build.caller.get('login', ''), build_id=build.uid, status='error', text='\n'.join(messages['error']), domain=domain)) return # Write messages in the database for status in JOB_LOG_STATUS_LABELS: if messages.get(status): dbsession.add( DBJobLog( job_id=build.job_id, timestamp=time(), caller=build.caller.get('login', ''), build_id=build.uid, status=status, text='\n'.join(messages[status]), domain=domain)) # -------------------------------------------------------------------------
[docs] def variables( self, context: str | None, request: Request | None = None) -> dict: """Return an ordered dictionary of variables. :param str context: Context name (processor ID...). :type request: pyramid.request.Request :param request: (optional) Current request. :rtype: dict Each variable has a name (the key in the ordered dictionary and a describing dictionary with the following keys: * ``'group'``: home group * ``'type'`` (required): a type among ``'string'``, ``'boolean'``, ``'integer'``, ``'decimal'``, ``'regex'`` and ``'list'`` * ``'label'`` (required) * ``'hint'``: a hint to set the variable * ``'required'``: a boolean to specify if the variable must have a value * ``'visible'``: the variable is visible before launching * ``'default'``: default value * ``'regex'``: a string required for regex type only * ``'options'``: a dictionary required for list type only """ # pylint: disable = unused-argument return self._variables
# -------------------------------------------------------------------------
[docs] def variables_groups(self, context: str, request: Request) -> dict: """Return a dictionary of groups of variables. :param str context: Context name (processor ID...). :type request: pyramid.request.Request :param request: Current request. :rtype: dict """ # pylint: disable = unused-argument return self._variables_groups
# -------------------------------------------------------------------------
[docs] def values_tabview( self, request: Request, context: str, form: Form, values: dict) -> str: """Generate the value tab. :type request: pyramid.request.Request :param request: Current request. :param str context: Context name (processor ID...). :type form: .lib.form.Form :param form: Current form object. :param dict values: Values of the variables. :rtype: str """ html = '' _translate = request.localizer.translate variables = self.variables(context, request) groups = self.variables_groups(context, request) current_group = None for name, variable in variables.items(): if name not in values and not variable.get('default'): continue if variable.get('group') != current_group: html += Builder().h3( _translate(groups.get(variable.get('group'), ' '))) current_group = variable.get('group', ' ') value = values[name] if name in values else variable['default'] if variable['type'] == 'boolean': if convert_value('boolean', value): html += form.grid_item( _translate(variable['label']), _translate(_('yes')), clear=True) elif variable['type'] == 'list': html += form.grid_item( _translate(variable['label']), _translate(variable.get('options', {}).get(value, value)), clear=True) else: html += form.grid_item( _translate(variable['label']), value, clear=True) return html
# -------------------------------------------------------------------------
[docs] def values_schema( self, schema, defaults: dict, dbjob: DBJob, visible_only: bool = False, relax: bool = False): """Update a Colander schema with default values for form with values. :type schema: colander.SchemaNode :param schema: Colander schema to update. :param dict defaults: Dictionary of default values for a form. :type dbjob: .models.dbjob.DBJob :param dbjob: SQLAlchemy object DBJob. :param bool visible_only: (default=False) If ``True``, list only visible variables. :param bool relax: (default=False) If ``True``, ``required`` is ignored. """ # pylint: disable = too-many-branches variables = self.variables( str(dbjob.context) if dbjob and dbjob.context else None) for name, variable in variables.items(): if visible_only and not variable.get('visible'): continue missing = colander.required if variable.get( 'required') and not relax else None missing = '' \ if missing is None and 'default' in variable else missing # Schema if variable['type'] == 'boolean': schema.add( colander.SchemaNode( colander.Boolean(), name=f'val:{name}', missing=False)) elif variable['type'] == 'integer': schema.add( colander.SchemaNode( colander.Integer(), name=f'val:{name}', missing=missing)) elif variable['type'] == 'decimal': schema.add( colander.SchemaNode( colander.Float(), name=f'val:{name}', missing=missing)) elif variable['type'] == 'regex': regex = '^{0}$'.format(variable.get('regex', '.*')) schema.add( colander.SchemaNode( colander.String(), name=f'val:{name}', validator=colander.All( colander.Regex(regex), colander.Length(max=VALUE_LEN)), missing=missing)) elif variable['type'] == 'list': schema.add( colander.SchemaNode( colander.String(), name=f'val:{name}', validator=colander.OneOf( variable.get('options', {}).keys()), missing=missing)) else: schema.add( colander.SchemaNode( colander.String(), name=f'val:{name}', validator=colander.Length(max=VALUE_LEN), missing=missing)) # Defaults if 'default' in variable: defaults[f'val:{name}'] = variable['default'] if dbjob is None: return for dbvalue in dbjob.values: if dbvalue.variable not in variables: continue if variables[dbvalue.variable]['type'] == 'boolean': if convert_value('boolean', dbvalue.value): defaults[f'val:{dbvalue.variable}'] = True else: defaults[f'val:{dbvalue.variable}'] = convert_value( variables[dbvalue.variable]['type'], dbvalue.value)
# -------------------------------------------------------------------------
[docs] def values_tabedit( self, request: Request, context: str, form: Form, visible_only: bool = False) -> str: """Generate the value tab for edition. :type request: pyramid.request.Request :param request: Current request. :param str context: Context name (processor ID...). :type form: .lib.form.Form :param form: Current form object. :pram bool visible_only: (default=False) If ``True``, list only visible variables. :rtype: chrysalio.helpers.literal.Literal """ html = '' _translate = request.localizer.translate variables = self.variables(context, request) groups = self.variables_groups(context, request) current_group = None for name, variable in variables.items(): visible = not visible_only or variable.get('visible') class_ = 'cioFormItem' if visible else 'cioFormItem cioHidden' if visible and variable.get('group') != current_group: html += Builder().h3( _translate(groups.get(variable.get('group'), ' '))) current_group = variable.get('group', ' ') if variable['type'] == 'boolean': html += form.grid_custom_checkbox( 'val:{0}'.format(name), _translate(variable['label']), required=variable.get('required'), hint=_translate(variable.get('hint', '')), title=name, clear=True, class_=class_) elif variable['type'] == 'list': options = variable.get('options', {}) options = [(k, _translate(options[k])) for k in options] html += form.grid_select( 'val:{0}'.format(name), _translate(variable['label']), [('', ' ')] + options if not variable.get('required') else options, required=variable.get('required'), hint=_translate(variable.get('hint', '')), title=name, clear=True, class_=class_) else: html += form.grid_text( 'val:{0}'.format(name), _translate(variable['label']), maxlength=VALUE_LEN, required=variable.get('required'), hint=_translate(variable.get('hint', '')), title=name, clear=True, class_=class_) return html
# ------------------------------------------------------------------------- @classmethod def _translate(cls, build: Build, text: str) -> str: """Return ``text`` translated. :type build: .lib.build.Build :param build: Current build object :param str text: Text to translate. :rtype: str """ return translate(text, build.lang)