"""Base class for services."""
from __future__ import annotations
from os.path import join, exists
from time import time
from tempfile import gettempdir
from re import compile as re_compile
from threading import Event
import colander
from pyramid.request import Request
from transaction import manager
from chrysalio.models import VALUE_LEN, get_tm_dbsession
from chrysalio.lib.utils import convert_value
from chrysalio.lib.form import Form
from chrysalio.helpers.builder import Builder
from chrysalio.scripts import ScriptRegistry
from .i18n import _, translate
from .build import Build
from ..routes import ROUTE_BUILD_VIEW
from ..models.dbjob import JOB_LOG_STATUS_LABELS, DBJob, DBJobLog
# Menu entry for the jobs. ``Service.register`` inserts it into the
# registry ``'menu'`` list in server mode.
# NOTE(review): the tuple looks like (icon path, label, permission, route,
# sub-entries) -- confirm against the Chrysalio menu documentation.
MENU_JOBS = (
'{theme}/cioservice/images/menu_job.png', _('Jobs'), 'job-view',
'job_index', None)
# Mode entry for the jobs: the ID ``'job'`` paired with a tuple laid out like
# ``MENU_JOBS``. It is not used by the code visible in this file.
MODE_JOBS = (
'job', (
'{theme}/cioservice/images/menu_job.png', _('Jobs'), 'mode-job',
'job_index', None))
# Pattern of a location reference: an optional leading ``/``, a name made of
# letters, digits, ``_`` or ``-``, a colon, then an optional path.
LOCATION_REGEX = '/?[a-zA-Z0-9_-]+:[a-zA-Z0-9_/.-]*'
# =============================================================================
[docs]
class Service():
"""Base class to manage a service.

A concrete service overrides the class attributes below and the ``_run``
method.

:type registry: chrysalio.scripts.ScriptRegistry
:param registry:
    Application registry.
"""
# ID of the service; ``__init__`` replaces ``None`` with the module name of
# the class.
uid: str | None = None
# Label of the service.
label = _('Base service')
# Groups of variables, returned as is by ``variables_groups()``.
# NOTE(review): these two dictionaries are class-level objects, so they are
# shared by every subclass that does not redefine them.
_variables_groups: dict = {}
# Description of the variables, returned as is by ``variables()``.
_variables: dict = {}
# Value returned by ``need_files()``.
_need_files = False
# Value returned by ``need_write_permission()``.
_need_write_permission = False
# Optional regular expression ``select_files()`` applies to each file path.
_file_regex = None
# Message returned by ``select_files_message()``.
_select_files_message = _('Select the appropriate files!')
# -------------------------------------------------------------------------
def __init__(self, registry: ScriptRegistry):
    """Initialize the service from the application registry.

    :type registry: chrysalio.scripts.ScriptRegistry
    :param registry:
        Application registry.
    """
    # A service without an explicit ID is identified by its module name.
    if self.uid is None:
        self.uid = self.__class__.__module__
    self._registry = registry

    # Locks go in the directory chosen by the cioservice module when it is
    # loaded, otherwise in a ``Chrysalio`` directory under the system
    # temporary directory.
    if 'modules' in registry and 'cioservice' in registry['modules']:
        self._lock_dir = registry['modules']['cioservice'].lock_dir
    else:
        self._lock_dir = join(gettempdir(), 'Chrysalio')
# -------------------------------------------------------------------------
[docs]
@classmethod
def register(cls, environment, service_class):
    """Instantiate ``service_class`` and store it in the registry.

    The instance is stored under ``registry['services'][service.uid]``.

    :type environment: :class:`pyramid.config.Configurator` or
        :class:`dict`
    :param environment:
        Object used to do configuration declaration within the application
        or a ScriptRegistry to simulate the application registry.
    :param service_class:
        Service class.
    """
    # Server mode: the environment is a configurator which carries the
    # registry. Populate/backup/execute mode: the environment is the
    # registry itself (ScriptRegistry).
    server_mode = hasattr(environment, 'registry')
    registry = environment.registry if server_mode else environment

    if 'services' not in registry:
        registry['services'] = {}
    service = service_class(registry)
    registry['services'][service.uid] = service

    # Only the server has a menu; add the job entry once.
    if server_mode and 'menu' in registry \
            and MENU_JOBS not in registry['menu']:
        registry['menu'].insert(-1, MENU_JOBS)
# -------------------------------------------------------------------------
[docs]
def need_dialog(self, context: str) -> bool:
    """Tell whether a dialog box must be shown before launching.

    A dialog is needed as soon as one variable of the context is flagged
    ``'visible'``.

    :param str context:
        Context name (processor ID...).
    :rtype: bool
    """
    return any(
        variable.get('visible')
        for variable in self.variables(context).values())
# -------------------------------------------------------------------------
[docs]
@classmethod
def need_files(cls, context: str) -> bool:
    """Tell whether the service works on input files.

    The base implementation ignores ``context`` and returns the class
    attribute ``_need_files``.

    :param str context:
        Context name (processor ID...).
    :rtype: bool
    """
    # pylint: disable = unused-argument
    return cls._need_files
# -------------------------------------------------------------------------
[docs]
def need_write_permission(self, context: str) -> bool:
    """Tell whether the service must be allowed to write output files.

    The base implementation ignores ``context`` and returns the attribute
    ``_need_write_permission``.

    :param str context:
        Context name (processor ID...).
    :rtype: bool
    """
    # pylint: disable = unused-argument
    return self._need_write_permission
# -------------------------------------------------------------------------
[docs]
def select_files(self, params: dict):
    """Keep in ``params['files']`` only the candidate files, sorted.

    A candidate is a path which exists and, when the service defines
    ``_file_regex``, which matches this regular expression. ``params`` is
    left untouched when it has no ``'files'`` key.

    :param dict params:
        Dictionary defining the parameters of the build.
    """
    if 'files' not in params:
        return

    if self._file_regex is None:
        accept = exists
    else:
        pattern = re_compile(self._file_regex)

        def accept(path):
            return exists(path) and pattern.search(path) is not None

    params['files'] = sorted(filter(accept, params['files']))
# -------------------------------------------------------------------------
[docs]
def select_files_message(self, params: dict) -> str:
    """Return the message asking the user to select files.

    The base implementation ignores ``params`` and returns the attribute
    ``_select_files_message``.

    :param dict params:
        Dictionary defining the parameters of the build.
    :rtype: pyramid.i18n.TranslationString
    """
    # pylint: disable = unused-argument
    return self._select_files_message
# -------------------------------------------------------------------------
[docs]
def authorized(self, caller: str, **kwargs) -> bool:
    """Check if the service is authorized.

    The caller ``'cioexecute'`` is always authorized. Otherwise, a
    restricted job is refused when the user is neither one of its users
    nor a member of one of its groups. Finally, the service is authorized
    if the caller is a file writer or if the service does not need to
    write.

    :param str caller:
        Name of the caller: 'cioexecute', 'browse_view'...
    :param dict kwargs:
        Keyworded arguments to decide if the service is authorized. It can
        contain the following keys: 'job', 'warehouse', 'file_writer',
        'meta_writer'. The keys 'user_id' and 'groups' are also read for a
        restricted job.
    :rtype: bool
    """
    # pylint: disable = too-many-boolean-expressions
    if caller == 'cioexecute':
        return True

    # An explicit ``job=None`` is handled like a missing job.
    job = kwargs.get('job') or {}

    # The restriction only applies when both sides provide users and
    # groups. ``isdisjoint`` accepts any iterable, so groups may be given
    # as sets or as lists.
    if job.get('access') == 'restricted' \
            and 'users' in job and 'user_id' in kwargs \
            and 'groups' in job and 'groups' in kwargs \
            and kwargs['user_id'] not in job['users'] \
            and set(kwargs['groups']).isdisjoint(job['groups']):
        return False

    # Coerce to a real boolean: ``file_writer`` may be any truthy object.
    return bool(kwargs.get('file_writer')) \
        or not self.need_write_permission(job.get('context'))
# -------------------------------------------------------------------------
[docs]
def make_build(
self,
build_id: str,
params: dict,
stopping: Event | None = None) -> Build:
"""Create a build object, lock it and check its input.

When the build cannot be locked, or when the service needs files and the
build has none, ``build.result['no_execution']`` is set so that ``run()``
returns immediately.

:param str build_id:
    ID of the build.
:param dict params:
    Dictionary defining the parameters of the build. It has the
    keys ``'job_id'``, ``'context'``, ``'lang'``, ``'ttl'``,
    ``'settings'``, ``'values'``, ``'files'``, ``'resources'`` and
    ``'output'``. It possibly has the key ``'dbsession'``. The key
    ``'locations'`` is added when the cioservice module is loaded.
:type stopping: threading.Event
:param stopping: (optional)
    Flag to stop the processing.
:rtype: .lib.build.Build
"""
# NOTE(review): the indentation of this block is lost in this copy. In
# particular, confirm whether ``self.write_traces(build)`` belongs to the
# ``cron`` test or follows it.
# Retrieve build manager
build_manager = None
# The build manager and the locations come from the cioservice module.
if 'modules' in self._registry \
and 'cioservice' in self._registry['modules']:
build_manager = self._registry['modules'][
'cioservice'].build_manager
params['locations'] = self._registry['modules'][
'cioservice'].locations
# Create the build and lock it
build = Build(
self._lock_dir, build_manager, build_id, params, stopping)
# A build which cannot be locked is not executed; the warning is not
# emitted when the ``'cron'`` setting is set.
if not build.lock():
if not build.settings.get('cron'):
build.warning(
translate(_('Job already in progress.'), build.lang))
self.write_traces(build)
build.result['no_execution'] = True
return build
# Check input
# Without any file for a service which needs some, the lock taken above is
# released and the build is not executed.
if self.need_files(build.context) and not build.files:
if not build.settings.get('cron'):
build.warning(translate(_('Nothing to do.'), build.lang))
self.write_traces(build)
build.result['no_execution'] = True
build.unlock()
return build
return build
# -------------------------------------------------------------------------
[docs]
def run(self, build: Build) -> dict:
    """Prepare the service for execution and launch it.

    Nothing is executed when ``build.result`` has the key
    ``'no_execution'``. Otherwise, the "running" message is erased and the
    build is unlocked even if the execution raises an exception; the
    exception is then propagated to the caller.

    :type build: .lib.build.Build
    :param build:
        Current build object.
    :rtype: dict
    :return:
        A dictionary of results.
    """
    if 'no_execution' in build.result:
        return build.result

    try:
        # Process
        self.write_running(build)
        build.progress_file(0, len(build.files))
        build.progress_step(0, 1)
        build.progress_save()

        self._run(build)

        build.progress_step(1, 1)
        build.progress_file(len(build.files), len(build.files))
        build.progress_save()
    finally:
        # Never leave a stale "running" message or a stale lock behind.
        # The nested ``finally`` releases the lock even if erasing the
        # message fails.
        try:
            self.erase_running(build)
        finally:
            build.unlock()

    return build.result
# -------------------------------------------------------------------------
def _run(self, build: Build):
    """Execute the service on the build ``build``.

    The base implementation only calls ``build.output_unrefreshed()`` and
    ``build.aborted_message()``, then writes the traces. It is called by
    ``run()``.

    :type build: .lib.build.Build
    :param build:
        Current build object.
    """
    build.output_unrefreshed()
    build.aborted_message()
    self.write_traces(build)
# -------------------------------------------------------------------------
[docs]
def write_traces(self, build: Build, domain: str | None = None):
"""Write trace messages into the database and/or a log file.

Each trace of ``build.result['traces']`` is read as a pair: a level letter
(``'E'``, ``'W'`` or ``'I'``) and a text which is translated into the
language of the build.

:type build: .lib.build.Build
:param build:
    Current build object
:param str domain: (optional)
    Domain of the log.
"""
# NOTE(review): the indentation of this block is lost in this copy. Confirm
# whether the ``else: messages = None`` below closes the level test of the
# loop or the ``log_activity`` test: in the first case ``messages`` is not
# defined when there is no ``log_activity`` logger.
# Nothing to write
if not build.result.get('traces'):
return
# In a stream
if self._registry.get('log_activity'):
messages: dict | None = {'error': [], 'warning': [], 'info': []}
caller = build.caller.get('login')
for trace in build.result['traces']:
# Log line: optional ``[login]``, job ID, optional ``<domain>``, text
message = '{caller}({job_id}) {domain}{text}'.format(
caller='[{0}] '.format(caller) if caller else '',
job_id=build.job_id,
domain='<{0}> '.format(domain) if domain else '',
text=self._translate(build, trace[1]))
# The translated text is also collected per level for the database
if trace[0] == 'E' and messages is not None:
messages['error'].append(self._translate(build, trace[1]))
self._registry['log_activity'].error(message)
elif trace[0] == 'W' and messages is not None:
messages['warning'].append(
self._translate(build, trace[1]))
self._registry['log_activity'].warning(message)
elif trace[0] == 'I' and messages is not None:
messages['info'].append(self._translate(build, trace[1]))
self._registry['log_activity'].info(message)
else:
messages = None
# In a database
# The session of the build is used when there is one, otherwise a session
# is created from the registry factory inside a transaction.
if not self._registry.get('no_db_log'):
if build.dbsession is not None:
self._traces2db(build.dbsession, build, domain, messages)
elif self._registry.get('dbsession_factory'):
with manager:
dbsession = get_tm_dbsession(
self._registry['dbsession_factory'], manager)
self._traces2db(dbsession, build, domain, messages)
# NOTE(review): presumably executed whatever the destinations -- confirm.
build.clear_traces()
# -------------------------------------------------------------------------
[docs]
def write_running(self, build: Build):
    """Write a "running" message into the database.

    Nothing is written when the registry has ``'no_db_log'`` set, or when
    there is neither a session on the build nor a ``'dbsession_factory'``
    in the registry (same rule as ``write_traces()``).

    :type build: .lib.build.Build
    :param build:
        Current build object
    """
    if self._registry.get('no_db_log'):
        return
    # Without any way to reach the database, skip the message instead of
    # failing on a missing registry key.
    if build.dbsession is None \
            and not self._registry.get('dbsession_factory'):
        return

    with manager:
        dbsession = get_tm_dbsession(
            self._registry['dbsession_factory'], manager) \
            if build.dbsession is None else build.dbsession
        dbsession.add(
            DBJobLog(
                job_id=build.job_id,
                timestamp=time(),
                caller=build.caller.get('login', ''),
                build_id=build.uid,
                status='running',
                text=self._translate(build, _(  # yapf: disable
                    'Job in progress: `${j} <${r}>`_', {
                        'j': build.uid,
                        'r': ROUTE_BUILD_VIEW.format(build_id=build.uid)
                    }))))
# -------------------------------------------------------------------------
[docs]
def erase_running(self, build: Build):
    """Erase the "running" message in the database.

    Nothing is erased when the registry has ``'no_db_log'`` set, or when
    there is neither a session on the build nor a ``'dbsession_factory'``
    in the registry (same rule as ``write_traces()``).

    :type build: .lib.build.Build
    :param build:
        Current build object
    """
    if self._registry.get('no_db_log'):
        return
    # Without any way to reach the database, there is nothing to erase;
    # do not fail on a missing registry key.
    if build.dbsession is None \
            and not self._registry.get('dbsession_factory'):
        return

    with manager:
        dbsession = get_tm_dbsession(
            self._registry['dbsession_factory'], manager) \
            if build.dbsession is None else build.dbsession
        dbsession.query(DBJobLog).filter_by(
            job_id=build.job_id, build_id=build.uid,
            status='running').delete()
# -------------------------------------------------------------------------
def _traces2db(
        self,
        dbsession,
        build: Build,
        domain: str | None,
        messages: dict | None = None):
    """Store the trace messages of a build as job log records.

    When there is at least one error, a single ``'error'`` record is
    written. Otherwise, one record is written for each status of
    ``JOB_LOG_STATUS_LABELS`` which has messages.

    :type dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type build: .lib.build.Build
    :param build:
        Current build object
    :param str domain:
        Domain of the log.
    :param dict messages: (optional)
        A dictionary with keys ``'error'``, ``'warning'`` and ``'info'``.
        If ``None``, it is computed from ``build.result['traces']``.
    """
    # Sort the translated traces by level; any level other than 'E' or
    # 'W' counts as information.
    if messages is None:
        messages = {'error': [], 'warning': [], 'info': []}
        levels = {'E': 'error', 'W': 'warning'}
        for trace in build.result['traces']:
            messages[levels.get(trace[0], 'info')].append(
                self._translate(build, trace[1]))

    def record(status):
        """Create the log record gathering the messages of ``status``."""
        return DBJobLog(
            job_id=build.job_id,
            timestamp=time(),
            caller=build.caller.get('login', ''),
            build_id=build.uid,
            status=status,
            text='\n'.join(messages[status]),
            domain=domain)

    # Errors hide the other messages
    if messages.get('error'):
        dbsession.add(record('error'))
        return

    # One record per status with messages
    for status in JOB_LOG_STATUS_LABELS:
        if messages.get(status):
            dbsession.add(record(status))
# -------------------------------------------------------------------------
[docs]
def variables(
        self, context: str | None, request: Request | None = None) -> dict:
    """Return the variables of the service as an ordered dictionary.

    The base implementation ignores its arguments and returns the
    attribute ``_variables``.

    :param str context:
        Context name (processor ID...).
    :type request: pyramid.request.Request
    :param request: (optional)
        Current request.
    :rtype: dict

    The key of each entry is the name of the variable and its value is a
    dictionary describing the variable with the following keys:

    * ``'group'``: home group
    * ``'type'`` (required): a type among ``'string'``, ``'boolean'``,
      ``'integer'``, ``'decimal'``, ``'regex'`` and ``'list'``
    * ``'label'`` (required)
    * ``'hint'``: a hint to set the variable
    * ``'required'``: a boolean to specify if the variable must have a
      value
    * ``'visible'``: the variable is visible before launching
    * ``'default'``: default value
    * ``'regex'``: a string required for regex type only
    * ``'options'``: a dictionary required for list type only
    """
    # pylint: disable = unused-argument
    return self._variables
# -------------------------------------------------------------------------
[docs]
def variables_groups(self, context: str, request: Request) -> dict:
    """Return the groups of variables of the service.

    The base implementation ignores its arguments and returns the
    attribute ``_variables_groups``.

    :param str context:
        Context name (processor ID...).
    :type request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: dict
    """
    # pylint: disable = unused-argument
    return self._variables_groups
# -------------------------------------------------------------------------
[docs]
def values_tabview(
        self, request: Request, context: str, form: Form,
        values: dict) -> str:
    """Generate the value tab.

    A variable is listed when it has a value in ``values`` or a non empty
    default. A boolean variable is listed only when it is true. A heading
    is emitted each time the group of the listed variable changes.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :param str context:
        Context name (processor ID...).
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :param dict values:
        Values of the variables.
    :rtype: str
    """
    # The fragments are concatenated with ``+=`` on purpose: the type of
    # the result is the one produced by the form helpers.
    html = ''
    _translate = request.localizer.translate
    variables = self.variables(context, request)
    groups = self.variables_groups(context, request)
    current_group = None
    for name, variable in variables.items():
        if name not in values and not variable.get('default'):
            continue

        # Compare and remember the very same value, so that consecutive
        # variables without group share a single heading.
        group = variable.get('group')
        if group != current_group:
            html += Builder().h3(_translate(groups.get(group, ' ')))
            current_group = group

        value = values[name] if name in values else variable['default']
        if variable['type'] == 'boolean':
            if convert_value('boolean', value):
                html += form.grid_item(
                    _translate(variable['label']),
                    _translate(_('yes')),
                    clear=True)
        elif variable['type'] == 'list':
            # Show the label of the option rather than its key
            html += form.grid_item(
                _translate(variable['label']),
                _translate(variable.get('options', {}).get(value, value)),
                clear=True)
        else:
            html += form.grid_item(
                _translate(variable['label']), value, clear=True)
    return html
# -------------------------------------------------------------------------
[docs]
def values_schema(
        self,
        schema,
        defaults: dict,
        dbjob: DBJob,
        visible_only: bool = False,
        relax: bool = False):
    """Add to a Colander schema one node per variable and fill the
    defaults of the form.

    The node of the variable ``name`` is named ``val:name``. The defaults
    come from the description of the variables, then from the values
    stored in ``dbjob``.

    :type schema: colander.SchemaNode
    :param schema:
        Colander schema to update.
    :param dict defaults:
        Dictionary of default values for a form.
    :type dbjob: .models.dbjob.DBJob
    :param dbjob:
        SQLAlchemy object DBJob.
    :param bool visible_only: (default=False)
        If ``True``, list only visible variables.
    :param bool relax: (default=False)
        If ``True``, ``required`` is ignored.
    """
    # pylint: disable = too-many-branches
    context = str(dbjob.context) if dbjob and dbjob.context else None
    variables = self.variables(context)

    for name, variable in variables.items():
        if visible_only and not variable.get('visible'):
            continue

        # Value of the node when the field is missing in the form
        if variable.get('required') and not relax:
            missing = colander.required
        elif 'default' in variable:
            missing = ''
        else:
            missing = None

        # Schema
        kind = variable['type']
        node_args = {'name': f'val:{name}', 'missing': missing}
        if kind == 'boolean':
            node_type = colander.Boolean()
            node_args['missing'] = False
        elif kind == 'integer':
            node_type = colander.Integer()
        elif kind == 'decimal':
            node_type = colander.Float()
        else:
            # Every other type is a string with its own validator
            node_type = colander.String()
            if kind == 'regex':
                regex = '^{0}$'.format(variable.get('regex', '.*'))
                node_args['validator'] = colander.All(
                    colander.Regex(regex), colander.Length(max=VALUE_LEN))
            elif kind == 'list':
                node_args['validator'] = colander.OneOf(
                    variable.get('options', {}).keys())
            else:
                node_args['validator'] = colander.Length(max=VALUE_LEN)
        schema.add(colander.SchemaNode(node_type, **node_args))

        # Defaults
        if 'default' in variable:
            defaults[f'val:{name}'] = variable['default']

    if dbjob is None:
        return

    # The values stored for the job override the defaults. A stored
    # boolean is only taken into account when it is true.
    for dbvalue in dbjob.values:
        if dbvalue.variable not in variables:
            continue
        kind = variables[dbvalue.variable]['type']
        if kind != 'boolean':
            defaults[f'val:{dbvalue.variable}'] = convert_value(
                kind, dbvalue.value)
        elif convert_value('boolean', dbvalue.value):
            defaults[f'val:{dbvalue.variable}'] = True
# -------------------------------------------------------------------------
[docs]
def values_tabedit(
        self,
        request: Request,
        context: str,
        form: Form,
        visible_only: bool = False) -> str:
    """Generate the value tab for edition.

    Every variable gets a field named ``val:<name>``. With
    ``visible_only``, the fields of the variables which are not visible
    are still generated but with the CSS class ``cioHidden``. A heading is
    emitted each time the group of the displayed variable changes.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :param str context:
        Context name (processor ID...).
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :param bool visible_only: (default=False)
        If ``True``, list only visible variables.
    :rtype: chrysalio.helpers.literal.Literal
    """
    # The fragments are concatenated with ``+=`` on purpose: the type of
    # the result is the one produced by the form helpers.
    html = ''
    _translate = request.localizer.translate
    variables = self.variables(context, request)
    groups = self.variables_groups(context, request)
    current_group = None
    for name, variable in variables.items():
        visible = not visible_only or variable.get('visible')
        class_ = 'cioFormItem' if visible else 'cioFormItem cioHidden'

        # Compare and remember the very same value, so that consecutive
        # variables without group share a single heading.
        group = variable.get('group')
        if visible and group != current_group:
            html += Builder().h3(_translate(groups.get(group, ' ')))
            current_group = group

        if variable['type'] == 'boolean':
            html += form.grid_custom_checkbox(
                'val:{0}'.format(name),
                _translate(variable['label']),
                required=variable.get('required'),
                hint=_translate(variable.get('hint', '')),
                title=name,
                clear=True,
                class_=class_)
        elif variable['type'] == 'list':
            options = variable.get('options', {})
            options = [(k, _translate(options[k])) for k in options]
            # An optional list starts with an empty choice
            if not variable.get('required'):
                options = [('', ' ')] + options
            html += form.grid_select(
                'val:{0}'.format(name),
                _translate(variable['label']),
                options,
                required=variable.get('required'),
                hint=_translate(variable.get('hint', '')),
                title=name,
                clear=True,
                class_=class_)
        else:
            html += form.grid_text(
                'val:{0}'.format(name),
                _translate(variable['label']),
                maxlength=VALUE_LEN,
                required=variable.get('required'),
                hint=_translate(variable.get('hint', '')),
                title=name,
                clear=True,
                class_=class_)
    return html
# -------------------------------------------------------------------------
@classmethod
def _translate(cls, build: Build, text: str) -> str:
    """Translate ``text`` into the language of the build.

    :type build: .lib.build.Build
    :param build:
        Current build object; its ``lang`` attribute selects the language.
    :param str text:
        Text to translate.
    :rtype: str
    """
    return translate(text, build.lang)