Source code for cioservice.views.job

"""Job management view callables."""

from __future__ import annotations

from sqlalchemy import desc, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from webob.compat import cgi_FieldStorage as FieldStorage

from pyramid.response import Response
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden
from pyramid.view import view_config

from chrysalio.lib.log import log_info
from chrysalio.lib.form import get_action, Form
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import PAGE_SIZES, Paging
from chrysalio.lib.utils import make_id, age, rst2html
from chrysalio.lib.tabset import Tabset
from chrysalio.lib.attachment import attachment_url, attachment_update
from chrysalio.helpers.literal import Literal
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.views import BaseView
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from chrysalio.models.populate import db2web, web2db
from ..relaxng import RELAXNG_CIOSERVICE
from ..models.dbjob import JOB_LOG_STATUS_LABELS, JOB_ACCESSES, DBJob, DBJobLog
from ..models.dbjob import DBJobArea, DBJobValue, DBJobUser, DBJobGroup
from ..models.populate import xml2db
from ..lib.service import Service
from ..lib.i18n import _

LOG_REFRESH = '300'


# =============================================================================
[docs] class JobView(BaseView): """Class to manage jobs. :type request: pyramid.request.Request :param request: Current request. """ # -------------------------------------------------------------------------
[docs] @view_config( route_name='job_index', renderer='cioservice:Templates/job_index.pt', permission='job-view') @view_config(route_name='job_index', renderer='json', xhr=True) def index(self) -> dict | Response: """List all jobs.""" # Ajax i_creator = self._request.has_permission('job-create') if self._request.is_xhr: if i_creator: self._import_jobs() return {} # Action action, items = self._index_action(i_creator) if action[:4] == 'exp!': response = self._jobs2response(items) if response: return response # Filter paging_id = 'jobs' if i_creator: pfilter = Filter( self._request, paging_id, ( ('i18n_label', _('Label'), False, ''), ('job_id', _('Identifier'), False, ''), ( 'access', _('Access'), False, [('', ' ')] + list(JOB_ACCESSES.items()))), remove=action[:4] == 'crm!' and action[4:] or None) else: pfilter = Filter( self._request, paging_id, ( ('i18n_label', _('Label'), False, ''), ('job_id', _('Identifier'), False, '')), remove=action[:4] == 'crm!' and action[4:] or None) Filter(self._request, 'joblog', ()).clear() # Paging defaults = Paging.params(self._request, paging_id, '+job_id') dbquery = pfilter.sql(self._request.dbsession.query(DBJob), 'srv_jobs') if not i_creator: dbquery = dbquery.filter(DBJob.job_id.in_(self._my_job_ids())) oby = getattr(DBJob, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) job_paging = Paging(self._request, paging_id, dbquery, defaults) job_paging.set_current_ids('job_id') # Form & completed action form = Form(self._request, defaults=defaults) form.forget('filter_value') if action and action[3] == '!': action = '' # Breadcrumbs & documentation self._request.breadcrumbs(_('Jobs'), 1) self._request.documentation = '/job/index' return { # yapf: disable 'action': action, 'items': items, 'form': form, 'pfilter': pfilter, 'paging': job_paging, 'PAGE_SIZES': PAGE_SIZES, 'i_creator': i_creator, 'age': age, 'i_editor': self._request.has_permission('job-edit'), 'has_attachments': bool( self._request.registry.settings.get('attachments')), 'attachment_url': attachment_url, 'download_max_size': self._request.registry['settings'][ 'download-max-size'] }
# -------------------------------------------------------------------------
[docs] @view_config(route_name='job_index_filter', renderer='json', xhr=True) def index_filter(self) -> dict: """Return a dictionary to autocomplete a filter field.""" return Filter.sql_autocomplete(self._request, DBJob)
# -------------------------------------------------------------------------
[docs] @view_config( route_name='job_log_all', renderer='cioservice:Templates/job_log.pt', permission='job-view') @view_config( route_name='job_log', renderer='cioservice:Templates/job_log.pt', permission='job-view') def log(self) -> dict | HTTPFound: """Display the log of all job or just one of a job.""" # Authorization i_creator = self._request.has_permission('job-create') i_editor = self._request.has_permission('job-edit') jobs = self._my_jobs(i_creator) if not jobs: return HTTPFound(self._request.route_path('job_index')) dbjob = self._get_job() if dbjob is not None and dbjob.job_id not in jobs: raise HTTPForbidden # Filter action = get_action(self._request)[0] paging_id = 'joblog' pfilter = Filter( self._request, paging_id, ( ('job_id', _('Job'), False, [('', ' ')] + list(jobs.items())), ('domain', _('Domain'), False, ''), ( 'status', _('Status'), False, [('', ' ')] + list(JOB_LOG_STATUS_LABELS.items())[:-1])), remove=action[:4] == 'crm!' and action[4:] or None) if dbjob is not None: pfilter.append_condition('job_id', '=', dbjob.job_id) # Paging defaults = Paging.params( self._request, paging_id, '-timestamp', default_display='list') dbquery = self._request.dbsession.query(DBJobLog) if not i_creator and dbjob is None: dbquery = dbquery.filter(DBJobLog.job_id.in_(jobs.keys())) dbquery = pfilter.sql(dbquery, 'srv_jobs_logs') oby = getattr(DBJobLog, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) log_paging = Paging(self._request, paging_id, dbquery, defaults) # Action if action[:4] == 'del!' and i_editor: for dblog in log_paging: self._request.dbsession.query(DBJobLog).filter_by( job_id=dblog.job_id).delete() log_paging = Paging(self._request, paging_id, (), defaults) # Columns has_columns = [False, False] for dblog in log_paging: has_columns[0] |= bool(dblog.caller) has_columns[1] |= bool(dblog.domain) if has_columns[0] and has_columns[1]: break # Form & completed action form = Form(self._request, defaults=defaults) form.forget('filter_value') if action and action[3] == '!': action = '' # Breadcrumbs & documentation if self._request.breadcrumbs.current_route_name() == 'job_log': self._request.breadcrumbs.pop() self._request.breadcrumbs( _('Log of "${l}"', {'l': dbjob.label(self._request)} ) if dbjob is not None else _('Logs')) self._request.documentation = '/job/log' # Refresh self._request.response.headerlist.append(('Refresh', LOG_REFRESH)) return { # yapf: disable 'action': action, 'form': form, 'pfilter': pfilter, 'paging': log_paging, 'PAGE_SIZES': PAGE_SIZES, 'job_id': dbjob.job_id if dbjob is not None else None, 'has_caller': has_columns[0], 'has_domain': has_columns[1], 'i_editor': i_editor, 'age': age, 'rst2html': rst2html, 'literal': Literal }
# -------------------------------------------------------------------------
[docs] @view_config(route_name='job_log_all_filter', renderer='json', xhr=True) @view_config(route_name='job_log_filter', renderer='json', xhr=True) def log_filter(self) -> dict: """Return a dictionary to autocomplete a filter field.""" job_id = self._request.matchdict.get('job_id') return Filter.sql_autocomplete( self._request, DBJobLog, where={'job_id': job_id} if job_id else None)
# -------------------------------------------------------------------------
[docs] @view_config( route_name='job_view', renderer='cioservice:Templates/job_view.pt', permission='job-view') def view(self) -> dict | Response: """Show job configuration.""" dbjob = self._get_job() if dbjob is None: raise HTTPForbidden picture = self._request.registry.settings.get('attachments') and ( attachment_url( self._request, dbjob.attachments_dir, dbjob.attachments_key, dbjob.picture) or '{0}/images/job.svg'.format(theme_static_prefix(self._request))) icon = self._request.registry.settings.get('attachments') and ( attachment_url( self._request, dbjob.attachments_dir, dbjob.attachments_key, dbjob.icon)) # Action action = get_action(self._request)[0] if action == 'exp!': response = self._jobs2response((dbjob.job_id, )) if response: return response # User paging user_paging, defaults, user_filter = self._user_paging(action, dbjob) # Form form = Form(self._request, defaults=defaults) form.forget('filter_value') # Breadcrumbs & documentation self._request.breadcrumbs( dbjob.label(self._request), replace=self._request.route_path('job_edit', job_id=dbjob.job_id)) self._request.documentation = '/admin-job/view' return { # yapf: disable 'form': form, 'dbjob': dbjob, 'user_filter': user_filter, 'user_paging': user_paging, 'tabset': Tabset( self._request, 'tabJob', dbjob.settings_tabs(self._request)), 'navigator': Paging.navigator( self._request, 'jobs', dbjob.job_id, self._request.route_path('job_view', job_id='_ID_')), 'picture': picture, 'icon': icon, 'i_editor': self._request.has_permission('job-edit') }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='job_create', renderer='cioservice:Templates/job_edit.pt', permission='job-create') @view_config( route_name='job_edit', renderer='cioservice:Templates/job_edit.pt', permission='job-edit') @view_config( route_name='job_edit', renderer='json', xhr=True, permission='job-edit') def edit(self) -> dict: """Create or edit a job.""" # Ajax dbjob = self._get_job() if self._request.is_xhr: if isinstance(self._request.POST.get('picture'), FieldStorage) \ and dbjob is not None: dbjob.attachments_key, dbjob.picture = attachment_update( self._request, dbjob.attachments_dir, dbjob.attachments_key, self._request.POST['picture'], replace=dbjob.picture, prefix=str(dbjob.job_id)[:12]) log_info(self._request, 'job_update_picture', dbjob.job_id) if isinstance(self._request.POST.get('icon'), FieldStorage) \ and dbjob is not None: dbjob.attachments_key, dbjob.icon = attachment_update( self._request, dbjob.attachments_dir, dbjob.attachments_key, self._request.POST['icon'], replace=dbjob.icon, prefix=str(dbjob.job_id)[:12]) log_info(self._request, 'job_update_icon', dbjob.job_id) return {} # User paging and groups action = get_action(self._request)[0] user_paging, defaults, user_filter = self._user_paging(action) groups = { k.group_id: (k.label(self._request), k.description(self._request)) for k in self._request.dbsession.query(DBGroup) } # Form form = Form( self._request, *DBJob.settings_schema( self._request, defaults, groups, dbjob, True), obj=dbjob, force_defaults=dbjob is not None) form.forget('filter_value') # Action if action == 'pct!' and dbjob is not None: dbjob.attachments_key, dbjob.picture = attachment_update( self._request, dbjob.attachments_dir, dbjob.attachments_key, self._request.POST['picture'], replace=dbjob.picture, prefix=str(dbjob.job_id)[:12]) log_info(self._request, 'job_update_picture', dbjob.job_id) elif action == 'icn!' and dbjob is not None: dbjob.attachments_key, dbjob.icon = attachment_update( self._request, dbjob.attachments_dir, dbjob.attachments_key, self._request.POST['icon'], replace=dbjob.icon, prefix=str(dbjob.job_id)[:12]) log_info(self._request, 'job_update_icon', dbjob.job_id) elif action == 'sav!' and form.validate(): dbjob = self._save(dbjob, groups, form.values) if dbjob is not None: if 'job_id' not in self._request.matchdict: self._request.breadcrumbs.pop() log_info( self._request, 'job_id' in self._request.matchdict and 'job_edit' or 'job_create', dbjob.job_id) return HTTPFound( self._request.route_path('job_view', job_id=dbjob.job_id)) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') # Picture picture = \ dbjob and self._request.registry.settings.get('attachments') and ( attachment_url( self._request, DBJob.attachments_dir, dbjob.attachments_key, dbjob.picture) or '{0}/images/job.svg'.format( theme_static_prefix(self._request))) icon = \ dbjob and self._request.registry.settings.get('attachments') and ( attachment_url( self._request, DBJob.attachments_dir, dbjob.attachments_key, dbjob.icon) or '{0}/images/job.svg'.format( theme_static_prefix(self._request))) # Breadcrumbs & documentation if not dbjob: self._request.breadcrumbs(_('Job Creation')) else: self._request.breadcrumbs( _('Job Edition'), replace=self._request.route_path( 'job_view', job_id=dbjob.job_id)) self._request.documentation = '/admin-job/edit' return { # yapf: disable 'form': form, 'dbjob': dbjob or DBJob, 'action': action, 'job_label': dbjob and dbjob.label(self._request), 'user_filter': user_filter, 'user_paging': user_paging, 'picture': picture, 'icon': icon, 'groups': groups, 'tabset': Tabset( self._request, 'tabJob', DBJob.settings_tabs(self._request)) }
# ------------------------------------------------------------------------- def _get_job(self) -> DBJob | None: """Return the SqlAlchemy object of the selected job or raise an HTTPNotFound exception. :rtype: :class:`.models.dbjob.DBJob` or ``None`` """ job_id = self._request.matchdict.get('job_id') if job_id is None: return None dbjob = self._request.dbsession.query(DBJob).filter_by( job_id=job_id).first() if dbjob is None: raise HTTPNotFound() return dbjob # ------------------------------------------------------------------------- def _delete_jobs(self, job_ids: list): """Delete jobs. :param list job_ids: List of job IDs to delete. """ deleted = [] for dbjob in self._request.dbsession.query(DBJob).filter( DBJob.job_id.in_(job_ids)): deleted.append(dbjob.job_id) self._request.dbsession.delete(dbjob) if deleted: log_info(self._request, 'job_delete', ' '.join(deleted)) # ------------------------------------------------------------------------- def _import_jobs(self): """Import jobs.""" # Get current IDs job_ids = {k[0] for k in self._request.dbsession.query(DBJob.job_id)} # Update database web2db( self._request, xml2db, 'job', relaxngs={ # yapf: disable '{{{0}}}{1}'.format( RELAXNG_CIOSERVICE['namespace'], RELAXNG_CIOSERVICE['root']): RELAXNG_CIOSERVICE['file'] }) # Get new IDs job_ids = { k[0] for k in self._request.dbsession.query(DBJob.job_id) } - job_ids if job_ids: log_info(self._request, 'job_import', ' '.join(job_ids)) # ------------------------------------------------------------------------- def _jobs2response(self, job_ids): """Export jobs as an XML file embedded in a Pyramid response. :param list job_ids: List of job IDs to export. :rtype: :class:`pyramid.response.Response` or ``''`` """ dbitems = tuple( self._request.dbsession.query(DBJob).filter( DBJob.job_id.in_(job_ids)).order_by('job_id')) if not dbitems: return '' filename = '{0}.{1}.xml'.format( len(dbitems) == 1 and dbitems[0].job_id or make_id(self._request.registry['settings']['title'], 'token'), DBJob.suffix) log_info( self._request, 'job_export', ' '.join([k.job_id for k in dbitems])) return db2web(self._request, dbitems, filename, RELAXNG_CIOSERVICE) # ------------------------------------------------------------------------- def _index_action(self, i_creator: bool) -> tuple[str, tuple]: """Execute actions for index view. :param bool i_creator: ``True`` if the user can create a job. :rtype: tuple :return: A tuple such as ``(action, items)``. """ action, items = get_action(self._request) if action[0:4] in ('act!', 'stp!'): for job_id in items: dbjob = self._request.dbsession.query(DBJob).filter_by( job_id=job_id).first() if dbjob is not None: dbjob.stopped = action[0:4] == 'stp!' dbjob.locked = False elif action[:4] == 'del!' and i_creator: self._delete_jobs(items) elif action == 'imp!' and i_creator: self._import_jobs() return action, items # ------------------------------------------------------------------------- def _my_job_ids(self) -> set: """Return a list of authorized job IDs. :rtype: set """ user_id = self._request.session['user']['user_id'] job_ids = { k[0] for k in self._request.dbsession.query(DBJob.job_id).outerjoin( DBJobUser).filter( or_(DBJobUser.user_id == user_id, DBJob.access == 'free')) } job_ids |= { k[0] for k in self._request.dbsession.query(DBJobGroup.job_id).filter( DBJobGroup.group_id.in_( self._request.session['user']['groups'])) } return job_ids # ------------------------------------------------------------------------- def _my_jobs(self, i_creator: bool) -> dict: """Return a dictionary of authorized jobs. :param bool i_creator: ``True`` if the user can create a job. :rtype: dict """ if i_creator: return { k.job_id: k.label(self._request) for k in self._request.dbsession.query(DBJob) } user_id = self._request.session['user']['user_id'] jobs = { k.job_id: k.label(self._request) for k in self._request.dbsession.query(DBJob).outerjoin(DBJobUser). filter(or_(DBJobUser.user_id == user_id, DBJob.access == 'free')) } jobs.update( { k.job_id: k.label(self._request) for k in self._request.dbsession.query(DBJob).outerjoin( DBJobGroup).filter( DBJobGroup.group_id.in_( self._request.session['user']['groups'])) }) return jobs # ------------------------------------------------------------------------- def _save( self, dbjob: DBJob | None, groups: dict, values: dict) -> DBJob | None: """Save a job. :type dbjob: .models.dbjob.DBJob :param dbjob: Job to save. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict values: Form values. :rtype: :class:`~.models.dbjob.DBJob` instance or ``None`` """ creation = dbjob is None dbjob = dbjob or DBJob() # Update job record = { k: values[k] for k in values if k[:4] not in ('area', 'val:', 'usr:', 'grp:') } if not creation: record['job_id'] = dbjob.job_id record['service'] = dbjob.service error = dbjob.record_format(record) if error: # pragma: nocover self._request.session.flash(error, 'alert') return None service = self._request.registry['services'].get(record['service']) \ if 'services' in self._request.registry else None if service is None: self._request.session.flash( _( 'Service \"${s}\" is not available.', {'s': record['service']}), 'alert') return None for field in record: if getattr(dbjob, field) != record[field]: setattr(dbjob, field, record[field]) # Save if creation: try: self._request.dbsession.add(dbjob) self._request.dbsession.flush() except (IntegrityError, FlushError): self._request.session.flash( _('This job already exists.'), 'alert') return None # Update areas self._area_update(dbjob, values) # Update values self._values_update(dbjob, values, service) # Update users self._users_update(dbjob) # Update groups self._groups_update(dbjob, groups, values) # Clear cache if 'modules' in self._request.registry \ and 'ciowarehouse' in self._request.registry['modules']: self._request.registry['modules']['ciowarehouse'].warehouse_forget( self._request) # pragma: nocover if 'modules' in self._request.registry \ and 'ciowarehouse2' in self._request.registry['modules']: self._request.registry['modules'][ 'ciowarehouse2'].warehouse_forget( self._request) # pragma: nocover return dbjob # ------------------------------------------------------------------------- def _area_update(self, dbjob: DBJob, values: dict): """Update the list of application areas. :type dbjob: .models.dbwarhouse.DBJob :param dbjob: SQLAlchemy object for the current job. :param dict values: Form values. """ for dbarea in self._request.dbsession.query(DBJobArea).filter_by( job_id=dbjob.job_id): self._request.dbsession.delete(dbarea) if values.get('area:all'): return area_ids = ['cioexecute'] for module in self._request.registry.get('modules', {}).values(): area_ids += module.areas.keys() for area_id in area_ids: checked = values.get('area:{0}'.format(area_id)) if checked: dbjob.areas.append(DBJobArea(area_id=area_id)) # ------------------------------------------------------------------------- def _values_update(self, dbjob: DBJob, values: dict, service: Service): """Update the list of values. :type dbjob: .models.dbwarhouse.DBJob :param dbjob: SQLAlchemy object for the current job. :param dict values: Form values. :type service: :param service: .lib.service.Service Current service. """ for dbvalue in self._request.dbsession.query(DBJobValue).filter_by( job_id=dbjob.job_id): self._request.dbsession.delete(dbvalue) context = str(dbjob.context) if dbjob and dbjob.context else None for name in service.variables(context): value = values.get(f'val:{name}') if value is not None: dbjob.values.append(DBJobValue(variable=name, value=value)) # ------------------------------------------------------------------------- def _users_update(self, dbjob: DBJob): """Update the list of users. :type dbjob: .models.dbwarhouse.DBJob :param dbjob: SQLAlchemy object for the current job. """ is_set = {} for value in self._request.POST: if value[:4] == 'usr:': is_set[value[4:]] = True elif value[:4] == 'shw:' and value[4:] not in is_set: is_set[value[4:]] = False for user_id, user_set in is_set.items(): dbjob_user = self._request.dbsession.query(DBJobUser).filter_by( job_id=dbjob.job_id, user_id=int(user_id)).first() if dbjob_user is not None and not user_set: self._request.dbsession.delete(dbjob_user) elif dbjob_user is None and user_set: self._request.dbsession.add( DBJobUser(job_id=dbjob.job_id, user_id=int(user_id))) # ------------------------------------------------------------------------- def _groups_update(self, dbjob: DBJob, groups: dict, values: dict): """Update the list of groups. :type dbjob: .models.dbwarhouse.DBJob :param dbjob: SQLAlchemy object for the current job. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict values: Form values. """ job_groups = {k.group_id: k for k in dbjob.groups} for group_id in groups: value = values['grp:{0}'.format(group_id)] if value and group_id not in job_groups: self._request.dbsession.add( DBJobGroup(job_id=dbjob.job_id, group_id=group_id)) elif not value and group_id in job_groups: self._request.dbsession.delete( self._request.dbsession.query(DBJobGroup).filter_by( job_id=dbjob.job_id, group_id=group_id).first()) # ------------------------------------------------------------------------- def _user_paging(self, action: str, dbjob: DBJob | None = None): """Return a paging object for users. :param str action: Current action. :type dbjob: .models.dbwarhouse.DBJob :param dbjob: (optional) If not ``None``, users are only users of the job. :rtype: tuple :return: A tuple such as ``(user_paging, defaults, user_filter)``. """ # Filter paging_id = 'job_users' ufilter = Filter( self._request, paging_id, ( # yapf: disable ('login', _('Login'), False, None), ('last_name', _('Last name'), False, None), ('email', _('Email'), False, None), ('status', _('Status'), False, [('', ' ')] + list(DBUser.status_labels.items()))), remove=action[:4] == 'crm!' and action[4:] or None) # Database query defaults = Paging.params( self._request, paging_id, '+last_name', default_display='list') dbquery = ufilter.sql( self._request.dbsession.query( DBUser.user_id, DBUser.login, DBUser.first_name, DBUser.last_name, DBUser.honorific, DBUser.email, DBUser.email_hidden, DBUser.status, DBUser.last_login, DBUser.attachments_key, DBUser.picture), 'users') if dbjob is not None: dbquery = dbquery.filter( DBUser.user_id.in_([k.user_id for k in dbjob.users])) oby = getattr(DBUser, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) return Paging(self._request, paging_id, dbquery, defaults), \ dict(defaults), ufilter