"""Job management view callables."""
from __future__ import annotations
from sqlalchemy import desc, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from webob.compat import cgi_FieldStorage as FieldStorage
from pyramid.response import Response
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden
from pyramid.view import view_config
from chrysalio.lib.log import log_info
from chrysalio.lib.form import get_action, Form
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import PAGE_SIZES, Paging
from chrysalio.lib.utils import make_id, age, rst2html
from chrysalio.lib.tabset import Tabset
from chrysalio.lib.attachment import attachment_url, attachment_update
from chrysalio.helpers.literal import Literal
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.views import BaseView
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from chrysalio.models.populate import db2web, web2db
from ..relaxng import RELAXNG_CIOSERVICE
from ..models.dbjob import JOB_LOG_STATUS_LABELS, JOB_ACCESSES, DBJob, DBJobLog
from ..models.dbjob import DBJobArea, DBJobValue, DBJobUser, DBJobGroup
from ..models.populate import xml2db
from ..lib.service import Service
from ..lib.i18n import _
# Value of the ``Refresh`` HTTP header appended to the response of the job
# log page (see ``JobView.log``); kept as a string because it is used
# verbatim as a header value.
LOG_REFRESH = '300'
# =============================================================================
[docs]
class JobView(BaseView):
    """View callables to manage jobs.

    Gathers the job index, log, view and create/edit pages together with
    the private helpers they share.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    """
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='job_index',
    renderer='cioservice:Templates/job_index.pt',
    permission='job-view')
@view_config(route_name='job_index', renderer='json', xhr=True)
def index(self) -> dict | Response:
    """Display the filterable, paginated list of jobs.

    An XHR call only imports jobs (when the user holds the ``job-create``
    permission) and answers with an empty dictionary.  Otherwise the
    pending action is executed, an export response is returned when
    available, and the template variables are built.

    :rtype: dict or :class:`pyramid.response.Response`
    """
    request = self._request
    i_creator = request.has_permission('job-create')

    # Ajax
    if request.is_xhr:
        if i_creator:
            self._import_jobs()
        return {}

    # Action
    action, items = self._index_action(i_creator)
    if action[:4] == 'exp!':
        response = self._jobs2response(items)
        if response:
            return response

    # Filter: the access criterion is only offered to job creators
    paging_id = 'jobs'
    fields = [
        ('i18n_label', _('Label'), False, ''),
        ('job_id', _('Identifier'), False, '')]
    if i_creator:
        fields.append((
            'access', _('Access'), False,
            [('', ' ')] + list(JOB_ACCESSES.items())))
    removed = None
    if action[:4] == 'crm!' and action[4:]:
        removed = action[4:]
    pfilter = Filter(request, paging_id, tuple(fields), remove=removed)
    # The filter of the log page is reset each time the index is shown
    Filter(request, 'joblog', ()).clear()

    # Paging
    defaults = Paging.params(request, paging_id, '+job_id')
    dbquery = pfilter.sql(request.dbsession.query(DBJob), 'srv_jobs')
    if not i_creator:
        dbquery = dbquery.filter(DBJob.job_id.in_(self._my_job_ids()))
    sort = defaults['sort']  # '+column' or '-column'
    column = getattr(DBJob, sort[1:])
    if sort[0] == '-':
        column = desc(column)
    dbquery = dbquery.order_by(column)
    job_paging = Paging(request, paging_id, dbquery, defaults)
    job_paging.set_current_ids('job_id')

    # Form & completed action
    form = Form(request, defaults=defaults)
    form.forget('filter_value')
    if action and action[3] == '!':
        action = ''

    # Breadcrumbs & documentation
    request.breadcrumbs(_('Jobs'), 1)
    request.documentation = '/job/index'

    return {  # yapf: disable
        'action': action,
        'items': items,
        'form': form,
        'pfilter': pfilter,
        'paging': job_paging,
        'PAGE_SIZES': PAGE_SIZES,
        'i_creator': i_creator,
        'age': age,
        'i_editor': request.has_permission('job-edit'),
        'has_attachments': bool(
            request.registry.settings.get('attachments')),
        'attachment_url': attachment_url,
        'download_max_size': request.registry['settings'][
            'download-max-size']
    }
# -------------------------------------------------------------------------
[docs]
@view_config(route_name='job_index_filter', renderer='json', xhr=True)
def index_filter(self) -> dict:
    """Provide the autocompletion data of a job filter field.

    :rtype: dict
    """
    suggestions = Filter.sql_autocomplete(self._request, DBJob)
    return suggestions
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='job_log_all',
    renderer='cioservice:Templates/job_log.pt',
    permission='job-view')
@view_config(
    route_name='job_log',
    renderer='cioservice:Templates/job_log.pt',
    permission='job-view')
def log(self) -> dict | HTTPFound:
    """Display the log of every authorized job or of a single job.

    The user is redirected to the job index when no job is authorized
    and :class:`~pyramid.httpexceptions.HTTPForbidden` is raised when
    the job selected by the route is not one of the authorized jobs.

    :rtype: dict or :class:`pyramid.httpexceptions.HTTPFound`
    """
    request = self._request

    # Authorization
    i_creator = request.has_permission('job-create')
    i_editor = request.has_permission('job-edit')
    jobs = self._my_jobs(i_creator)
    if not jobs:
        return HTTPFound(request.route_path('job_index'))
    dbjob = self._get_job()
    single = dbjob is not None
    if single and dbjob.job_id not in jobs:
        raise HTTPForbidden

    # Filter
    action = get_action(request)[0]
    paging_id = 'joblog'
    removed = None
    if action[:4] == 'crm!' and action[4:]:
        removed = action[4:]
    fields = (
        ('job_id', _('Job'), False, [('', ' ')] + list(jobs.items())),
        ('domain', _('Domain'), False, ''),
        # The last status label is left out of the proposed choices
        ('status', _('Status'), False,
         [('', ' ')] + list(JOB_LOG_STATUS_LABELS.items())[:-1]))
    pfilter = Filter(request, paging_id, fields, remove=removed)
    if single:
        pfilter.append_condition('job_id', '=', dbjob.job_id)

    # Paging
    defaults = Paging.params(
        request, paging_id, '-timestamp', default_display='list')
    dbquery = request.dbsession.query(DBJobLog)
    if not (i_creator or single):
        dbquery = dbquery.filter(DBJobLog.job_id.in_(jobs.keys()))
    dbquery = pfilter.sql(dbquery, 'srv_jobs_logs')
    sort = defaults['sort']  # '+column' or '-column'
    column = getattr(DBJobLog, sort[1:])
    if sort[0] == '-':
        column = desc(column)
    dbquery = dbquery.order_by(column)
    log_paging = Paging(request, paging_id, dbquery, defaults)

    # Action: for each entry yielded by the paging, every log entry
    # sharing its job ID is deleted, then an empty paging is shown
    if action[:4] == 'del!' and i_editor:
        for entry in log_paging:
            request.dbsession.query(DBJobLog).filter_by(
                job_id=entry.job_id).delete()
        log_paging = Paging(request, paging_id, (), defaults)

    # Columns: detect whether at least one entry has a caller / a domain
    has_caller = False
    has_domain = False
    for entry in log_paging:
        has_caller |= bool(entry.caller)
        has_domain |= bool(entry.domain)
        if has_caller and has_domain:
            break

    # Form & completed action
    form = Form(request, defaults=defaults)
    form.forget('filter_value')
    if action and action[3] == '!':
        action = ''

    # Breadcrumbs & documentation
    if request.breadcrumbs.current_route_name() == 'job_log':
        request.breadcrumbs.pop()
    if single:
        title = _('Log of "${l}"', {'l': dbjob.label(request)})
    else:
        title = _('Logs')
    request.breadcrumbs(title)
    request.documentation = '/job/log'

    # Refresh
    request.response.headerlist.append(('Refresh', LOG_REFRESH))

    return {  # yapf: disable
        'action': action,
        'form': form,
        'pfilter': pfilter,
        'paging': log_paging,
        'PAGE_SIZES': PAGE_SIZES,
        'job_id': dbjob.job_id if single else None,
        'has_caller': has_caller,
        'has_domain': has_domain,
        'i_editor': i_editor,
        'age': age,
        'rst2html': rst2html,
        'literal': Literal
    }
# -------------------------------------------------------------------------
[docs]
@view_config(route_name='job_log_all_filter', renderer='json', xhr=True)
@view_config(route_name='job_log_filter', renderer='json', xhr=True)
def log_filter(self) -> dict:
    """Provide the autocompletion data of a job log filter field.

    When the route carries a job ID, the lookup is restricted to the log
    of that job.

    :rtype: dict
    """
    where = None
    job_id = self._request.matchdict.get('job_id')
    if job_id:
        where = {'job_id': job_id}
    return Filter.sql_autocomplete(self._request, DBJobLog, where=where)
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='job_view',
    renderer='cioservice:Templates/job_view.pt',
    permission='job-view')
def view(self) -> dict | Response:
    """Show the configuration of a job.

    :rtype: dict or :class:`pyramid.response.Response`
    :raise pyramid.httpexceptions.HTTPForbidden:
        If the route does not select a job.
    """
    # NOTE(review): unlike ``log``, no check against the jobs authorized
    # for the current user is made here -- confirm this is intended.
    request = self._request
    dbjob = self._get_job()
    if dbjob is None:
        raise HTTPForbidden

    # Picture and icon are resolved only when attachments are configured
    attachments = request.registry.settings.get('attachments')
    if attachments:
        picture = attachment_url(
            request, dbjob.attachments_dir, dbjob.attachments_key,
            dbjob.picture) \
            or '{0}/images/job.svg'.format(theme_static_prefix(request))
        icon = attachment_url(
            request, dbjob.attachments_dir, dbjob.attachments_key,
            dbjob.icon)
    else:
        picture = icon = attachments

    # Action
    action = get_action(request)[0]
    if action == 'exp!':
        response = self._jobs2response((dbjob.job_id, ))
        if response:
            return response

    # User paging
    user_paging, defaults, user_filter = self._user_paging(action, dbjob)

    # Form
    form = Form(request, defaults=defaults)
    form.forget('filter_value')

    # Breadcrumbs & documentation
    request.breadcrumbs(
        dbjob.label(request),
        replace=request.route_path('job_edit', job_id=dbjob.job_id))
    request.documentation = '/admin-job/view'

    tabset = Tabset(request, 'tabJob', dbjob.settings_tabs(request))
    navigator = Paging.navigator(
        request, 'jobs', dbjob.job_id,
        request.route_path('job_view', job_id='_ID_'))
    return {  # yapf: disable
        'form': form,
        'dbjob': dbjob,
        'user_filter': user_filter,
        'user_paging': user_paging,
        'tabset': tabset,
        'navigator': navigator,
        'picture': picture,
        'icon': icon,
        'i_editor': request.has_permission('job-edit')
    }
# -------------------------------------------------------------------------
[docs]
@view_config(
    route_name='job_create',
    renderer='cioservice:Templates/job_edit.pt',
    permission='job-create')
@view_config(
    route_name='job_edit',
    renderer='cioservice:Templates/job_edit.pt',
    permission='job-edit')
@view_config(
    route_name='job_edit',
    renderer='json',
    xhr=True,
    permission='job-edit')
def edit(self) -> dict | HTTPFound:
    """Create or edit a job.

    An XHR call stores the uploaded picture and/or icon of the selected
    job and answers with an empty dictionary.  A regular call handles
    the ``pct!`` (picture), ``icn!`` (icon) and ``sav!`` (save) actions;
    a successful save redirects to the job view.

    :rtype: dict or :class:`pyramid.httpexceptions.HTTPFound`
    """
    request = self._request
    dbjob = self._get_job()

    # Ajax
    if request.is_xhr:
        if dbjob is not None:
            for field in ('picture', 'icon'):
                if isinstance(request.POST.get(field), FieldStorage):
                    self._job_attachment_update(dbjob, field)
        return {}

    # User paging and groups
    action = get_action(request)[0]
    user_paging, defaults, user_filter = self._user_paging(action)
    groups = {
        k.group_id: (k.label(request), k.description(request))
        for k in request.dbsession.query(DBGroup)
    }

    # Form
    form = Form(
        request,
        *DBJob.settings_schema(request, defaults, groups, dbjob, True),
        obj=dbjob,
        force_defaults=dbjob is not None)
    form.forget('filter_value')

    # Action
    if action == 'pct!' and dbjob is not None:
        self._job_attachment_update(dbjob, 'picture')
    elif action == 'icn!' and dbjob is not None:
        self._job_attachment_update(dbjob, 'icon')
    elif action == 'sav!' and form.validate():
        # Keep ``dbjob`` untouched: when saving an existing job fails,
        # the page must be rendered again as an edition of that job and
        # not as a creation.
        saved = self._save(dbjob, groups, form.values)
        if saved is not None:
            creation = 'job_id' not in request.matchdict
            if creation:
                request.breadcrumbs.pop()
            log_info(
                request, 'job_create' if creation else 'job_edit',
                saved.job_id)
            return HTTPFound(
                request.route_path('job_view', job_id=saved.job_id))
    if form.has_error():
        request.session.flash(_('Correct errors.'), 'alert')

    # Picture and icon (falsy when creating or without attachments)
    attachments = request.registry.settings.get('attachments')
    default_image = '{0}/images/job.svg'.format(
        theme_static_prefix(request))
    picture = dbjob and attachments and (
        attachment_url(
            request, DBJob.attachments_dir, dbjob.attachments_key,
            dbjob.picture) or default_image)
    icon = dbjob and attachments and (
        attachment_url(
            request, DBJob.attachments_dir, dbjob.attachments_key,
            dbjob.icon) or default_image)

    # Breadcrumbs & documentation
    if not dbjob:
        request.breadcrumbs(_('Job Creation'))
    else:
        request.breadcrumbs(
            _('Job Edition'),
            replace=request.route_path('job_view', job_id=dbjob.job_id))
    request.documentation = '/admin-job/edit'

    return {  # yapf: disable
        'form': form,
        'dbjob': dbjob or DBJob,
        'action': action,
        'job_label': dbjob and dbjob.label(request),
        'user_filter': user_filter,
        'user_paging': user_paging,
        'picture': picture,
        'icon': icon,
        'groups': groups,
        'tabset': Tabset(
            request, 'tabJob', DBJob.settings_tabs(request))
    }

# -------------------------------------------------------------------------
def _job_attachment_update(self, dbjob: DBJob, field: str):
    """Replace the picture or the icon of a job with an uploaded file.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        Job whose attachment is replaced.
    :param str field:
        Name of both the POST field holding the upload and the job
        attribute to update: ``'picture'`` or ``'icon'``.
    """
    dbjob.attachments_key, filename = attachment_update(
        self._request,
        dbjob.attachments_dir,
        dbjob.attachments_key,
        self._request.POST[field],
        replace=getattr(dbjob, field),
        prefix=str(dbjob.job_id)[:12])
    setattr(dbjob, field, filename)
    log_info(
        self._request, 'job_update_{0}'.format(field), dbjob.job_id)
# -------------------------------------------------------------------------
def _get_job(self) -> DBJob | None:
    """Fetch the job selected by the ``job_id`` part of the route.

    :rtype: :class:`.models.dbjob.DBJob` or ``None``
    :return:
        The SQLAlchemy job, or ``None`` when the route has no job ID.
    :raise pyramid.httpexceptions.HTTPNotFound:
        If the route has a job ID but no job matches it.
    """
    job_id = self._request.matchdict.get('job_id')
    if job_id is not None:
        dbjob = self._request.dbsession.query(DBJob).filter_by(
            job_id=job_id).first()
        if dbjob is not None:
            return dbjob
        raise HTTPNotFound()
    return None
# -------------------------------------------------------------------------
def _delete_jobs(self, job_ids: list):
    """Remove jobs from the database and log the removal.

    :param list job_ids:
        List of job IDs to delete.
    """
    dbsession = self._request.dbsession
    removed = []
    for dbjob in dbsession.query(DBJob).filter(DBJob.job_id.in_(job_ids)):
        removed += [dbjob.job_id]
        dbsession.delete(dbjob)
    if not removed:
        return
    # Only the jobs actually found are reported
    log_info(self._request, 'job_delete', ' '.join(removed))
# -------------------------------------------------------------------------
def _import_jobs(self):
    """Import jobs from the request and log the IDs that were added."""
    request = self._request

    # IDs known before the import
    before = {row[0] for row in request.dbsession.query(DBJob.job_id)}

    # Update database
    root_tag = '{{{0}}}{1}'.format(
        RELAXNG_CIOSERVICE['namespace'], RELAXNG_CIOSERVICE['root'])
    web2db(
        request,
        xml2db,
        'job',
        relaxngs={root_tag: RELAXNG_CIOSERVICE['file']})

    # IDs which appeared during the import
    after = {row[0] for row in request.dbsession.query(DBJob.job_id)}
    imported = after.difference(before)
    if imported:
        log_info(request, 'job_import', ' '.join(imported))
# -------------------------------------------------------------------------
def _jobs2response(self, job_ids):
    """Export jobs as an XML file embedded in a Pyramid response.

    The file is named after the job ID when a single job is exported,
    otherwise after an ID made from the ``title`` setting.

    :param list job_ids:
        List of job IDs to export.
    :rtype: :class:`pyramid.response.Response` or ``''``
    :return:
        The result of ``db2web`` or ``''`` if no job matches.
    """
    request = self._request
    dbquery = request.dbsession.query(DBJob).filter(
        DBJob.job_id.in_(job_ids)).order_by('job_id')
    dbitems = tuple(dbquery)
    if not dbitems:
        return ''

    stem = dbitems[0].job_id if len(dbitems) == 1 else ''
    if not stem:
        stem = make_id(request.registry['settings']['title'], 'token')
    filename = '{0}.{1}.xml'.format(stem, DBJob.suffix)

    log_info(
        request, 'job_export', ' '.join(k.job_id for k in dbitems))
    return db2web(request, dbitems, filename, RELAXNG_CIOSERVICE)
# -------------------------------------------------------------------------
def _index_action(self, i_creator: bool) -> tuple[str, tuple]:
    """Execute actions for index view.

    ``act!`` and ``stp!`` respectively activate and stop the selected
    jobs and unlock them; they are applied whatever the value of
    ``i_creator``.  ``del!`` (deletion) and ``imp!`` (import) are only
    executed for a user who can create a job.

    :param bool i_creator:
        ``True`` if the user can create a job.
    :rtype: tuple
    :return:
        A tuple such as ``(action, items)``.
    """
    action, items = get_action(self._request)
    verb = action[:4]
    if verb in ('act!', 'stp!'):
        # One query for the whole selection instead of one per job
        if items:
            stopped = verb == 'stp!'
            dbjobs = self._request.dbsession.query(DBJob).filter(
                DBJob.job_id.in_(items)).all()
            for dbjob in dbjobs:
                dbjob.stopped = stopped
                dbjob.locked = False
    elif verb == 'del!' and i_creator:
        self._delete_jobs(items)
    elif action == 'imp!' and i_creator:
        self._import_jobs()
    return action, items
# -------------------------------------------------------------------------
def _my_job_ids(self) -> set:
    """Return the set of job IDs the current user is authorized to use.

    A job is retained when the user is one of its users, when its access
    is ``'free'`` or when one of the groups of the user is attached to
    it.

    :rtype: set
    """
    user = self._request.session['user']
    dbsession = self._request.dbsession

    # Jobs of the user and free access jobs
    direct = dbsession.query(DBJob.job_id).outerjoin(DBJobUser).filter(
        or_(DBJobUser.user_id == user['user_id'], DBJob.access == 'free'))
    # Jobs of the groups of the user
    by_group = dbsession.query(DBJobGroup.job_id).filter(
        DBJobGroup.group_id.in_(user['groups']))

    return {row[0] for row in direct} | {row[0] for row in by_group}
# -------------------------------------------------------------------------
def _my_jobs(self, i_creator: bool) -> dict:
    """Return the authorized jobs as ``{job_id: label}``.

    A user who can create a job gets every job.  Otherwise, the jobs of
    the user, the ``'free'`` access jobs and the jobs of the groups of
    the user are returned.

    :param bool i_creator:
        ``True`` if the user can create a job.
    :rtype: dict
    """
    request = self._request
    dbsession = request.dbsession
    if i_creator:
        return {
            dbjob.job_id: dbjob.label(request)
            for dbjob in dbsession.query(DBJob)
        }

    user = request.session['user']
    jobs = {}
    # Jobs of the user and free access jobs
    for dbjob in dbsession.query(DBJob).outerjoin(DBJobUser).filter(
            or_(DBJobUser.user_id == user['user_id'],
                DBJob.access == 'free')):
        jobs[dbjob.job_id] = dbjob.label(request)
    # Jobs of the groups of the user
    for dbjob in dbsession.query(DBJob).outerjoin(DBJobGroup).filter(
            DBJobGroup.group_id.in_(user['groups'])):
        jobs[dbjob.job_id] = dbjob.label(request)
    return jobs
# -------------------------------------------------------------------------
def _save(
        self, dbjob: DBJob | None, groups: dict,
        values: dict) -> DBJob | None:
    """Create or update a job from the values of the form.

    Each failure (invalid record, unavailable service, already existing
    job) is reported with a flash message.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        Job to save or ``None`` to create a new one.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :param dict values:
        Form values.
    :rtype: :class:`~.models.dbjob.DBJob` instance or ``None``
    :return:
        The saved job or ``None`` if the job could not be saved.
    """
    request = self._request
    creation = dbjob is None
    if not dbjob:
        dbjob = DBJob()

    # Update job: areas, values, users and groups are handled apart
    apart = ('area', 'val:', 'usr:', 'grp:')
    record = {
        name: value
        for name, value in values.items() if name[:4] not in apart
    }
    if not creation:
        # Identifier and service of an existing job are preserved
        record['job_id'] = dbjob.job_id
        record['service'] = dbjob.service
    error = dbjob.record_format(record)
    if error:  # pragma: nocover
        request.session.flash(error, 'alert')
        return None

    service = None
    if 'services' in request.registry:
        service = request.registry['services'].get(record['service'])
    if service is None:
        request.session.flash(
            _(
                'Service \"${s}\" is not available.',
                {'s': record['service']}), 'alert')
        return None

    for field, value in record.items():
        if getattr(dbjob, field) != value:
            setattr(dbjob, field, value)

    # Save
    if creation:
        try:
            request.dbsession.add(dbjob)
            request.dbsession.flush()
        except (IntegrityError, FlushError):
            request.session.flash(_('This job already exists.'), 'alert')
            return None

    # Update areas, values, users and groups
    self._area_update(dbjob, values)
    self._values_update(dbjob, values, service)
    self._users_update(dbjob)
    self._groups_update(dbjob, groups, values)

    # Clear cache of the warehouse modules which are loaded
    for module_id in ('ciowarehouse', 'ciowarehouse2'):
        if 'modules' in request.registry \
                and module_id in request.registry['modules']:
            request.registry['modules'][module_id].warehouse_forget(
                request)  # pragma: nocover

    return dbjob
# -------------------------------------------------------------------------
def _area_update(self, dbjob: DBJob, values: dict):
    """Replace the application areas of a job by those of the form.

    Existing areas are always deleted.  Nothing is added when the
    ``area:all`` value is set.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        SQLAlchemy object for the current job.
    :param dict values:
        Form values.
    """
    dbsession = self._request.dbsession
    for dbarea in dbsession.query(DBJobArea).filter_by(
            job_id=dbjob.job_id):
        dbsession.delete(dbarea)
    if values.get('area:all'):
        return

    # Candidate areas: 'cioexecute' plus those of the loaded modules
    area_ids = ['cioexecute']
    for module in self._request.registry.get('modules', {}).values():
        area_ids.extend(module.areas.keys())

    for area_id in area_ids:
        if not values.get('area:{0}'.format(area_id)):
            continue
        dbjob.areas.append(DBJobArea(area_id=area_id))
# -------------------------------------------------------------------------
def _values_update(self, dbjob: DBJob, values: dict, service: Service):
    """Replace the values of a job by those of the form.

    Only the variables listed by the service for the context of the job
    are read from the form; those without a value are skipped.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        SQLAlchemy object for the current job.
    :param dict values:
        Form values.
    :type  service: .lib.service.Service
    :param service:
        Current service.
    """
    dbsession = self._request.dbsession
    for dbvalue in dbsession.query(DBJobValue).filter_by(
            job_id=dbjob.job_id):
        dbsession.delete(dbvalue)

    context = None
    if dbjob and dbjob.context:
        context = str(dbjob.context)

    for name in service.variables(context):
        value = values.get('val:{0}'.format(name))
        if value is None:
            continue
        dbjob.values.append(DBJobValue(variable=name, value=value))
# -------------------------------------------------------------------------
def _users_update(self, dbjob: DBJob):
    """Update the list of users.

    The POST field names drive the update: ``usr:<user_id>`` means the
    user is selected and ``shw:<user_id>`` alone means the user is
    displayed but not selected.  Field names whose identifier is not an
    integer are ignored instead of raising :class:`ValueError`.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        SQLAlchemy object for the current job.
    """
    # Requested state of each displayed user
    is_set = {}
    for name in self._request.POST:
        prefix, raw_id = name[:4], name[4:]
        if prefix == 'usr:':
            is_set[raw_id] = True
        elif prefix == 'shw:':
            # Must not override a 'usr:' field seen earlier
            is_set.setdefault(raw_id, False)

    dbsession = self._request.dbsession
    for raw_id, user_set in is_set.items():
        try:
            user_id = int(raw_id)
        except ValueError:
            # Field names come from the client: skip malformed ones
            continue
        dbjob_user = dbsession.query(DBJobUser).filter_by(
            job_id=dbjob.job_id, user_id=user_id).first()
        if dbjob_user is not None and not user_set:
            dbsession.delete(dbjob_user)
        elif dbjob_user is None and user_set:
            dbsession.add(DBJobUser(job_id=dbjob.job_id, user_id=user_id))
# -------------------------------------------------------------------------
def _groups_update(self, dbjob: DBJob, groups: dict, values: dict):
    """Synchronize the groups of a job with the values of the form.

    :type  dbjob: .models.dbjob.DBJob
    :param dbjob:
        SQLAlchemy object for the current job.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :param dict values:
        Form values; a ``grp:<group_id>`` entry is read for each group.
    """
    dbsession = self._request.dbsession
    job_groups = {k.group_id: k for k in dbjob.groups}
    for group_id in groups:
        checked = values['grp:{0}'.format(group_id)]
        attached = group_id in job_groups
        if checked and not attached:
            dbsession.add(
                DBJobGroup(job_id=dbjob.job_id, group_id=group_id))
            continue
        if attached and not checked:
            dbjob_group = dbsession.query(DBJobGroup).filter_by(
                job_id=dbjob.job_id, group_id=group_id).first()
            dbsession.delete(dbjob_group)
# -------------------------------------------------------------------------
def _user_paging(self, action: str, dbjob: DBJob | None = None):
    """Build the filter and the paging used to list users.

    :param str action:
        Current action.
    :type  dbjob: .models.dbjob.DBJob
    :param dbjob: (optional)
        If not ``None``, users are only users of the job.
    :rtype: tuple
    :return:
        A tuple such as ``(user_paging, defaults, user_filter)``.
    """
    request = self._request
    paging_id = 'job_users'

    # Filter
    removed = action[4:] if action[:4] == 'crm!' and action[4:] else None
    fields = (  # yapf: disable
        ('login', _('Login'), False, None),
        ('last_name', _('Last name'), False, None),
        ('email', _('Email'), False, None),
        ('status', _('Status'), False,
         [('', ' ')] + list(DBUser.status_labels.items())))
    ufilter = Filter(request, paging_id, fields, remove=removed)

    # Database query
    defaults = Paging.params(
        request, paging_id, '+last_name', default_display='list')
    columns = (
        DBUser.user_id, DBUser.login, DBUser.first_name,
        DBUser.last_name, DBUser.honorific, DBUser.email,
        DBUser.email_hidden, DBUser.status, DBUser.last_login,
        DBUser.attachments_key, DBUser.picture)
    dbquery = ufilter.sql(request.dbsession.query(*columns), 'users')
    if dbjob is not None:
        member_ids = [k.user_id for k in dbjob.users]
        dbquery = dbquery.filter(DBUser.user_id.in_(member_ids))
    sort = defaults['sort']  # '+column' or '-column'
    column = getattr(DBUser, sort[1:])
    if sort[0] == '-':
        column = desc(column)
    dbquery = dbquery.order_by(column)

    user_paging = Paging(request, paging_id, dbquery, defaults)
    return user_paging, dict(defaults), ufilter