Source code for cioservice

"""CioService, a module for Chrysalio to manage services."""

from __future__ import annotations
from sys import exit as sys_exit
from logging import getLogger
from os.path import dirname, join, exists, normpath
from tempfile import gettempdir
from shutil import rmtree
from argparse import Namespace

from sqlalchemy.orm.session import Session

from pyramid.config import Configurator
from pyramid.registry import Registry

from chrysalio.initialize import Initialize
from chrysalio.lib.config import settings_get_list
from chrysalio.includes.modules.models import DBModule
from chrysalio.modules import Module
from chrysalio.scripts import ScriptRegistry
from chrysalio.lib.navigation import NavEntry
from .relaxng import RELAXNG_CIOSERVICE
from .lib.i18n import _, translate
from .lib.build_manager import DEFAULT_CONCURRENCY, BuildManager
from .models.populate import xml2db as _xml2db, db2xml as _db2xml
from .menu import MENU_CIOSERVICE

LOG = getLogger(__name__)


# =============================================================================
def includeme(configurator: Configurator | ScriptRegistry):
    """Function to include module.

    The module class is always registered. The remaining steps (permissions,
    routes, translations, static views and view scanning) are only performed
    when ``configurator`` is a Pyramid ``Configurator``; for any other object
    (e.g. a ``ScriptRegistry``) the function returns right after the
    registration.

    :type  configurator: pyramid.config.Configurator or
        chrysalio.scripts.ScriptRegistry
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    # Registration
    Module.register(configurator, ModuleCioService)
    if not isinstance(configurator, Configurator):
        return

    # Permissions
    configurator.include('cioservice.security')

    # Routes
    configurator.include('cioservice.routes')

    # Translations (declared once: registering the same directory a second
    # time would only duplicate it in the translation directory list)
    configurator.add_translation_dirs(join(dirname(__file__), 'Locale'))

    # Views
    static_dir = join(dirname(__file__), 'Static')
    Initialize(configurator).add_static_views(
        __package__, (
            ('images', join(static_dir, 'Images')),
            ('css', join(static_dir, 'Css')),
            ('js', join(static_dir, 'Js'))))
    configurator.scan('cioservice.views')
# ============================================================================= # =============================================================================
class ModuleCioService(Module):
    """Class for CioService module.

    :param str config_ini:
        Absolute path to the configuration file (e.g. development.ini).

    This module has the following attributes:

    * ``lock_dir``: absolute path to the directory containing locks for build
    * ``build_manager``: instance of common
      :class:`~.lib.build_manager.BuildManager`
    * ``locations``: a dictionary of root directories for service locations
    """

    name = _('Service')
    implements = ('service', )
    dependencies = ()
    relaxng = RELAXNG_CIOSERVICE
    xml2db = (_xml2db, )
    db2xml = (_db2xml, )
    _DBModule = DBModule

    # -------------------------------------------------------------------------
    def __init__(self, config_ini: str):
        """Constructor method.

        Read the module settings found through ``config_ini`` and set up the
        lock directory, the build manager, the service locations and the
        navigation entry.

        :param str config_ini:
            Absolute path to the configuration file.
        """
        super().__init__(config_ini)
        settings = self._settings(config_ini)

        # Lock directory: when "locks" is missing or empty, fall back to a
        # "Chrysalio" directory inside the system temporary directory
        lock_root = settings.get('locks')
        if not lock_root:
            lock_root = join(gettempdir(), 'Chrysalio')
        self.lock_dir = normpath(lock_root)

        # Build manager
        concurrency = int(
            settings.get('build.concurrency', DEFAULT_CONCURRENCY))
        self.build_manager = BuildManager(concurrency)

        # Locations: every item must be written as ID:PATH, otherwise the
        # process exits with an error message
        entries = settings_get_list(settings, 'locations')
        self.locations = {}
        for entry in entries:
            location_id, colon, root = entry.partition(':')
            if not colon:
                sys_exit(translate(_(
                    '*** ${n}: "locations" must be a list of ID:PATH.',
                    {'n': self.uid})))
            self.locations[location_id] = normpath(root)

        # Navigation
        self._nav_entry = NavJobs()

    # -------------------------------------------------------------------------
    def populate(
            self, args: Namespace, registry: Registry, dbsession: Session):
        """Method called by populate script to complete the operation.

        The locks directory of the ``cioservice`` module found in the
        registry is removed when a removal of locks or builds is requested,
        either as a raw flag in ``args.extra`` (``--remove-locks``,
        ``--remove-builds``) or as a true ``remove_locks`` / ``remove_builds``
        attribute of ``args``. Otherwise nothing is done.

        See: :meth:`chrysalio.modules.Module.populate`
        """
        removal_requested = (
            '--remove-locks' in args.extra
            or getattr(args, 'remove_locks', False)
            or '--remove-builds' in args.extra
            or getattr(args, 'remove_builds', False))
        if not removal_requested:
            return

        # The lock directory is read from the module stored in the registry
        if 'modules' not in registry \
                or 'cioservice' not in registry['modules']:
            return
        lock_dir = registry['modules']['cioservice'].lock_dir
        if lock_dir is None or not exists(lock_dir):
            return

        LOG.info(translate(_('====== Removal of the locks directory')))
        rmtree(lock_dir)

    # -------------------------------------------------------------------------
    def activate(self, registry: Registry, dbsession: Session):
        """Method to activate the module.

        Insert the navigation entry and the deprecated menu at index 1 of
        their respective registry lists when they are not already there,
        then hand the registry over to the build manager.

        See: :meth:`chrysalio.modules.Module.activate`
        """
        # Navigation
        if 'navigation' in registry and 'main' in registry['navigation']:
            main_navigation = registry['navigation']['main']
            if self._nav_entry not in main_navigation:
                main_navigation.insert(1, self._nav_entry)

        # Menu (DEPRECATED)
        if 'menu' in registry:
            menu = registry['menu']
            if MENU_CIOSERVICE not in menu:
                menu.insert(1, MENU_CIOSERVICE)

        # Build manager
        self.build_manager.set_registry(registry)

    # -------------------------------------------------------------------------
    def deactivate(self, registry: Registry, dbsession: Session):
        """Method to deactivate the module.

        Remove the navigation entry and the deprecated menu from their
        respective registry lists when they are present.

        See: :meth:`chrysalio.modules.Module.deactivate`
        """
        # Navigation
        if 'navigation' in registry and 'main' in registry['navigation']:
            main_navigation = registry['navigation']['main']
            if self._nav_entry in main_navigation:
                main_navigation.remove(self._nav_entry)

        # Menu (DEPRECATED)
        if 'menu' in registry:
            menu = registry['menu']
            if MENU_CIOSERVICE in menu:
                menu.remove(MENU_CIOSERVICE)