"""Console script to execute jobs."""
from sys import exit as sys_exit
from os.path import abspath, expanduser
from logging import getLogger
from locale import getlocale
from json import loads
from datetime import datetime, timedelta
from transaction import manager
from sqlalchemy import desc
from chrysalio.lib.utils import make_digest, convert_value
from chrysalio.lib.config import settings_get_list
from chrysalio.lib.log import log_activity_setup
from chrysalio.scripts import Script
from chrysalio.models import get_tm_dbsession
from chrysalio.models.dbsettings import DBSettings
from ..lib.i18n import _, translate
from ..lib.build_manager import BuildManager
from ..models.dbjob import DBJob, DBJobArea
LOG = getLogger()
# =============================================================================
[docs]def main(args=None):
"""Main function."""
args = Execute.arguments(Execute.argument_parser(), args, 'a')
if args is not None:
sys_exit(Execute(args).run())
# =============================================================================
[docs]class Execute(Script):
"""Class to execute jobs.
:type args: argparse.Namespace
:param args:
Command line arguments.
:param list includes: (optional)
List of hard coding `includes`.
:type dbsession_factory: sqlalchemy.orm.session.sessionmaker
:param dbsession_factory: (optional)
Function to create session.
"""
# -------------------------------------------------------------------------
def __init__(self, args, includes=None, dbsession_factory=None):
"""Constructor method."""
super(Execute, self).__init__(
args, False, includes, dbsession_factory=dbsession_factory)
if self.registry is None:
return
# Check services
if not self.registry.get('services'):
LOG.critical(self._translate(_('No service is available.')))
self.registry = None
return
# Setup logging
self.registry['log_activity'] = log_activity_setup(
self._config,
self.registry.settings['__file__'] if self._args.cron else None)
if not self._args.cron:
self.registry['no_db_log'] = True
# -------------------------------------------------------------------------
[docs] @classmethod
def argument_parser(cls, description='Execute jobs.'):
"""Create an argument parser object to parse command line arguments.
:param str description: (optional)
Description of the script populating the database.
:rtype: argparse.ArgumentParser
"""
parser = super(Execute, cls).argument_parser(description=description)
parser.add_argument(
'files', nargs='*', help='optional list of files to process')
parser.add_argument('--job', help='the only job to execute')
parser.add_argument('--resources', nargs='*', help='needed resources')
parser.add_argument('--output', help='output directory')
parser.add_argument(
'--cron', dest='cron', help='periodically execution',
action='store_true')
parser.add_argument(
'--stop', dest='stop', help='stop periodically execution',
action='store_true')
parser.add_argument(
'--start', dest='start', help='start periodically execution',
action='store_true')
parser.add_argument(
'--no-delay', dest='no_delay',
help='execute periodical job immediatly', action='store_true')
parser.add_argument(
'--list-jobs', dest='list_jobs', help='list of available jobs',
action='store_true')
return parser
# -------------------------------------------------------------------------
[docs] def run(self):
"""Execute jobs.
:rtype: int
:return:
Exit code.
"""
if self.registry is None:
return 1
# List jobs
if self._args.list_jobs:
self._list_jobs()
return 0
# Stop or start jobs
if self._args.stop or self._args.start:
self._stop_start_jobs()
return 0
# Clean up log
with manager:
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
if not DBSettings.exists(dbsession):
LOG.warning(self._translate(_('Database is empty.')))
return 0
BuildManager.clean_logs(dbsession)
with manager:
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
# Activate modules
for module_id in self.registry['modules']:
self.registry['modules'][module_id].activate(
self.registry, dbsession)
# Browse jobs
dbjobs = dbsession.query(DBJob).order_by(desc('priority')) \
if not self._args.job \
else dbsession.query(DBJob).filter_by(job_id=self._args.job)
for dbjob in dbjobs:
# Check if the job is stopped
if self._args.cron and dbjob.stopped:
continue
# Check if the job is for cioexecute
if not self._args.job:
area_ids = dbsession.query(DBJobArea.area_id).filter_by(
job_id=dbjob.job_id).all()
if area_ids and ('cioexecute',) not in area_ids:
continue
# Check if it is time to launch the job
not_before = self._not_before(dbjob)
if not_before is not None and not_before > datetime.now():
continue
# Run
service = self.registry['services'].get(dbjob.service)
if service is None:
LOG.error(self._translate(_(
'Service "${s}" is not available.',
{'s': dbjob.service})))
self._run_job(dbjob.job_id, service, dbjob.context, dbjob.ttl)
return 0
# -------------------------------------------------------------------------
def _run_job(self, job_id, service, context, ttl):
"""Execute a job.
:param str job_id:
Job ID.
:type service: .services.Service
:param service:
Service to execute.
:param str context:
Context name.
:param int ttl:
Time To Live of the execution.
"""
# Check authorization
if service is None and not self._args.cron:
return
if service is not None and not service.authorized(
caller='cioexecute', job={
'job_id': job_id, 'service_id': service.uid,
'context': context, 'ttl': ttl}): # pragma: nocover
LOG.warning(self._translate(_(
'[${j}] This job is not authorized.', {'j': job_id})))
return
# Set parameters
params = {
'job_id': job_id, 'context': context, 'ttl': ttl,
'lang': self._args.lang}
with manager:
# Stop
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
dbjob = dbsession.query(DBJob).filter_by(job_id=job_id).first()
if self._args.cron:
dbjob.last_trigger = datetime.now()
if service is None:
dbjob.stopped = True
return
# Get values of variables
variables = service.variables(context)
params['values'] = {
k: variables[k]['default'] for k in variables
if 'default' in variables[k]}
for dbvalue in dbjob.values:
if dbvalue.variable in variables:
params['values'][dbvalue.variable] = convert_value(
variables[dbvalue.variable]['type'], dbvalue.value)
# Get settings
params['settings'] = {}
if self._config.has_section('Job:{0}'.format(job_id)):
params['settings'] = dict(
self._config.items('Job:{0}'.format(job_id)))
params['settings']['cron'] = self._args.cron
# Set files
params['files'] = [abspath(k) for k in self._args.files] \
or settings_get_list(params['settings'], 'files')
service.select_files(params)
# Set resources
params['resources'] = [abspath(k) for k in self._args.resources or '']\
+ settings_get_list(params['settings'], 'resources')
# Set output
if service.need_write_permission(context):
params['output'] = self._args.output \
or params['settings'].get('output')
params['output'] = abspath(expanduser(params['output'])) \
if params['output'] else None
# Execute the job
build_id = '{0}-{1}'.format(
job_id, make_digest(','.join(sorted(params['files']))))
result = service.run(service.make_build(build_id, params))
# Set last trigger and execution time
if self._args.cron:
with manager:
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
dbjob = dbsession.query(DBJob).filter_by(job_id=job_id).first()
if dbjob is not None:
dbjob.last_trigger = datetime.now()
if 'no_execution' not in result:
dbjob.last_execution = dbjob.last_trigger
# -------------------------------------------------------------------------
def _list_jobs(self):
"""List current jobs."""
LOG.info('=' * 80)
LOG.info('{0:^80}'.format(self._translate(_('Job list'))))
LOG.info('=' * 80)
with manager:
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
for dbjob in dbsession.query(DBJob).order_by(desc('priority')):
state = self._translate(_('inactive:')) \
if dbjob.stopped else self._translate(_('active :'))
if not dbjob.stopped and dbjob.areas \
and 'cioexecute' not in [k.area_id for k in dbjob.areas]:
state = self._translate(_('(active):'))
LOG.info('%s (%s) %s', state, dbjob.job_id, self._job_label(
dbjob))
# -------------------------------------------------------------------------
def _stop_start_jobs(self):
"""Stop/start jobs."""
with manager:
dbsession = get_tm_dbsession(
self.registry['dbsession_factory'], manager)
for dbjob in dbsession.query(DBJob).order_by(desc('priority')):
if self._args.job and dbjob.job_id != self._args.job:
continue
dbjob.stopped = self._args.stop
state = self._translate(_('inactive:')) \
if dbjob.stopped else self._translate(_('active :'))
if not dbjob.stopped and dbjob.areas \
and 'cioexecute' not in [k.area_id for k in dbjob.areas]:
state = self._translate(_('(active):'))
LOG.info(
'%s (%s) %s', state, dbjob.job_id, self._job_label(dbjob))
# -------------------------------------------------------------------------
def _not_before(self, dbjob):
"""Estimate the date of next trigger.
:type dbjob: .models.dbjob.DBJob
:param dbjob:
SQLAlchemy Job object for the current job.
:rtype: :class:`datetime.datetime` or ``None``
"""
if self._args.cron and not dbjob.every:
return datetime.now() + timedelta(hours=1)
if not self._args.cron or self._args.no_delay:
return None
# Check the period
not_before = dbjob.last_trigger \
if dbjob.last_trigger is not None \
else datetime(2001, 1, 1)
if dbjob.every_unit == 'day':
not_before = datetime(*not_before.timetuple()[0:3]) + \
timedelta(days=dbjob.every, minutes=dbjob.every_at)
elif dbjob.every_unit == 'hour':
not_before = datetime(*not_before.timetuple()[0:4]) + \
timedelta(hours=dbjob.every, minutes=dbjob.every_at)
else:
not_before = datetime(*not_before.timetuple()[0:5]) + \
timedelta(minutes=dbjob.every)
return not_before
# -------------------------------------------------------------------------
def _job_label(self, dbjob):
"""Return the job label in the most approriate language.
:type dbjob: :class:`.models.dbjob.DBJob`
:param dbjob:
Current SqlAlchemy object.
:rtype: str
"""
lang = self._args.lang or getlocale()[0]
labels = loads(dbjob.i18n_label)
return labels.get(
lang, labels.get(lang.partition('_')[0], labels.get(
'en', dbjob.job_id)))
# -------------------------------------------------------------------------
def _translate(self, text):
"""Return ``text`` translated.
:param str text:
Text to translate.
:rtype: str
"""
return translate(text, self._args.lang)