Source code for chrysalio.scripts

"""Console scripts."""

from sys import exit as sys_exit
from logging import getLogger
from os.path import exists, abspath, dirname
from argparse import ArgumentParser
from collections import OrderedDict
from configparser import ConfigParser
from importlib import import_module

from sqlalchemy.exc import OperationalError
from plaster.exceptions import InvalidURI

from pyramid.paster import get_appsettings

from ..security import PRINCIPALS
from ..lib.i18n import _, translate
from ..lib.config import config_get, settings_get_list
from ..lib.log import setup_logging
from ..lib.utils import tounicode
from ..modules import Module
from ..models import DB_METADATA
from ..models import get_dbengine, get_dbsession_factory


LOG = getLogger(__name__)


# =============================================================================
[docs]class ScriptRegistry(dict): """Class to simulate Application Registry.""" def __init__(self, settings): """Constructor method.""" super().__init__() self.settings = settings
# =============================================================================
[docs]class Script(object): """Base class for a script which reads the INI file and interacts with the database. :type args: argparse.Namespace :param args: Command line arguments. :param bool create_all: If ``True``, it creates all the tables, otherwise, it checks if the table ``users`` exists with at least one user. :param list includes: (optional) List of hard coding `includes`. :type dbsession_factory: sqlalchemy.orm.session.sessionmaker :param dbsession_factory: (optional) Function to create session. """ # ------------------------------------------------------------------------- def __init__( self, args, create_all, includes=None, dbsession_factory=None): """Constructor method.""" self._args = args self.registry = None try: settings = get_appsettings(args.conf_uri, options=args.options) except (LookupError, IOError, InvalidURI) as error: LOG.critical(error) return config_file = abspath(args.conf_uri.partition('#')[0]) self._config = ConfigParser({'here': dirname(config_file)}) self._config.read(tounicode(config_file), encoding='utf8') self.registry = ScriptRegistry(settings) self.registry.settings['__file__'] = config_file self.registry['principals'] = list(PRINCIPALS) self.registry['modules'] = OrderedDict() self._load_includes(includes) if not self._initialize_database(create_all, dbsession_factory): self.registry = None return # -------------------------------------------------------------------------
[docs] @classmethod def argument_parser(cls, description): """Create an argument parser object to parse command line arguments. :param str description: Description of the script. :rtype: argparse.ArgumentParser """ parser = ArgumentParser(description=description) parser.add_argument( 'conf_uri', help='URI of configuration (e.g. production.ini#foo)') parser.add_argument( '--options', dest='options', help='optional configuration options') parser.add_argument('--lang', dest='lang', help='user language') parser.add_argument( '--log-level', dest='log_level', help='log level', default='INFO', choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')) parser.add_argument('--log-file', dest='log_file', help='log file') return parser
# -------------------------------------------------------------------------
[docs] @classmethod def arguments(cls, parser, args=None, filemode='w'): """Retrieve arguments from parser, check them and setup logging. :type parser: argparse.ArgumentParser :param parser: Configuration parser object. :param list args: (optional) Command line arguments (for testing). :param str filemode: File mode for logging (``'w'`` or ``'a'``). :rtype: argparse.Namespace """ args = parser.parse_args(args) if not exists(args.conf_uri.partition('#')[0]): parser.print_usage() return None setup_logging(args.log_level, args.log_file, filemode=filemode) return args
# ------------------------------------------------------------------------- def _load_includes(self, includes=None): """Load needed modules with an `includeme()` function and fill ``self.registry`` dictionary. :param list includes: (optional) List of hard coding `includes`. """ # Read include list if includes is None: includes = [] includes += [k for k in settings_get_list( self.registry.settings, 'chrysalio.includes') if k] # Load modules for include in includes: try: module = import_module(include) except ImportError as error: sys_exit(self._translate( _('*** Unable to load module "${m}" [${e}]', { 'm': include, 'e': error}))) try: module.includeme(self.registry) except AttributeError as error: sys_exit(self._translate(_( '*** Module "${m}" error: ${e}', {'m': include, 'e': error}))) # Check conflicts implementations, err = Module.check_conflicts( includes, self.registry['modules']) if err is not None: sys_exit(self._translate(err)) # Check dependencies for module_id in self.registry['modules']: err = self.registry['modules'][module_id].check_dependencies( implementations) if err is not None: sys_exit(self._translate(err)) # ------------------------------------------------------------------------- def _initialize_database(self, create_all, dbsession_factory=None): """Database initialization. :param bool create_all: If ``True``, it creates all the tables, otherwise, it checks if the table ``users`` exists with at least one user. :type dbsession_factory: sqlalchemy.orm.session.sessionmaker :param dbsession_factory: (optional) Function to create session. :rtype: bool """ # Get database session factory self.registry['dbsession_factory'] = dbsession_factory if self.registry['dbsession_factory'] is None: try: dbengine = get_dbengine(self.registry.settings) except KeyError: LOG.critical(self._translate(_('Database is not defined.'))) return False DB_METADATA.bind = dbengine self.registry['dbsession_factory'] = get_dbsession_factory( dbengine) # Possibly, drop any existing tables if hasattr(self._args, 'drop_tables') and ( self._args.drop_tables or config_get(self._config, 'Populate', 'drop_tables') == 'true'): LOG.info(self._translate(_('====== Dropping existing tables'))) tables = set(DB_METADATA.tables) try: DB_METADATA.reflect() DB_METADATA.drop_all() except UnicodeEncodeError: # pragma: nocover LOG.error(self._translate(_('Unknown database!'))) return False except OperationalError as error: # pragma: nocover LOG.error(error.args[0]) return False for table in set(DB_METADATA.tables) - tables: # pragma: nocover DB_METADATA.remove(DB_METADATA.tables[table]) if create_all: # Create the tables if they don't already exist try: DB_METADATA.create_all() except OperationalError as error: # pragma: nocover LOG.error(error.args[0]) return False return True # ------------------------------------------------------------------------- def _translate(self, text): """Return ``text`` translated. :param str text: Text to translate. :rtype: str """ return translate(text, self._args.lang)