Source code for publiforge.scripts.pfpopulate

#!/usr/bin/env python
"""Populate database and storages."""

from logging import getLogger
from argparse import ArgumentParser
from os import listdir
from os.path import exists, join, abspath, dirname
from ConfigParser import ConfigParser
from locale import getdefaultlocale
from shutil import rmtree
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from sqlalchemy import engine_from_config
from sqlalchemy.exc import OperationalError

from pyramid.paster import get_appsettings

from ..lib.i18n import _, localizer
from ..lib.utils import normalize_name
from ..lib.config import config_get, config_get_namespace
from ..lib.log import setup_logging
from ..lib.xml import import_configuration
from ..lib.handler import HandlerManager
from ..models import ID_LEN, DBSession, Base
from ..models.users import PAGE_SIZE, User, UserPerm, UserIP
from ..models.storages import Storage


LOG = getLogger(__name__)
DEFAULT_SETTINGS_NAME = 'PubliForge'


# =============================================================================
[docs]def main(): """Main function.""" # Parse arguments parser = ArgumentParser(description='Populate database and storages.') parser.add_argument( 'conf_uri', help='URI of configuration (e.g. production.ini#foo)') parser.add_argument( 'files', nargs='*', help='optional XML configuration files to use') parser.add_argument( '--drop-tables', dest='drop_tables', help='drop existing tables', action='store_true') parser.add_argument( '--no-pull', dest='no_pull', help='do not synchronize storages', action='store_true') parser.add_argument( '--reset-index', dest='reset_index', help='delete storage indexes', action='store_true') parser.add_argument( '--no-index', dest='no_index', help='do not index storages', action='store_true') parser.add_argument( '--log-level', dest='log_level', help='log level', default='INFO', choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')) args = parser.parse_args() if not exists(args.conf_uri.partition('#')[0]): parser.print_usage() return setup_logging(args.log_level) # Populate database and storage Populate(args, args.conf_uri).all(args.files)
# =============================================================================
[docs]class Populate(object): """Class to populate database and storages.""" # ------------------------------------------------------------------------- def __init__(self, args, conf_uri): """Constructor method.""" self._args = args self._settings = get_appsettings(conf_uri) if len(self._settings) < 3: try: self._settings = get_appsettings( conf_uri, DEFAULT_SETTINGS_NAME) except LookupError: self._settings = None return conf_file = abspath(args.conf_uri.partition('#')[0]) self._config = ConfigParser({'here': dirname(conf_file)}) self._config.read(conf_file) if not self._initialize_sql(): self._settings = None return # -------------------------------------------------------------------------
[docs] def all(self, files=tuple()): """Check settings, initialize database and create storages. :param files: (list, optional) List of files on command-line. :return: (boolean) """ if self._settings is None: return False # Check general settings if not self._settings.get('uid'): LOG.critical(self._translate(_( 'Must define an unique identifier for this instance.'))) return False # Agent only if self._settings.get('storage.root') is None: return True # Clean up path = self._settings.get('opener.cache') if path and exists(path): rmtree(path) path = self._settings.get('temporary_dir') if path and exists(path): rmtree(path) # Populate self._populate_admin() self._populate_from_xml(files) if DBSession.query(UserPerm)\ .filter_by(scope='all', level='admin').first() is None: LOG.error(self._translate(_('You must define an administrator.'))) DBSession.remove() return False self._update_storages() DBSession.remove() return True
# ------------------------------------------------------------------------- def _initialize_sql(self): """Database initialization. :return: (boolean) ``True`` if it succeeds. """ # Initialize SqlAlchemy session try: dbengine = engine_from_config(self._settings, 'sqlalchemy.') except KeyError: LOG.critical(self._translate(_('Database is not defined.'))) return False DBSession.configure(bind=dbengine) Base.metadata.bind = dbengine # Possibly, drop any existing tables if self._args.drop_tables or \ self._settings.get('drop_tables') == 'true' or \ config_get(self._config, 'Populate', 'drop_tables') == 'true': LOG.info(self._translate(_('###### Dropping existing tables'))) try: Base.metadata.drop_all() except OperationalError as error: LOG.error(error.args[0]) return False # Create the tables if they don't already exist try: Base.metadata.create_all(dbengine) except OperationalError as error: LOG.error(error.args[0]) return False return True # ------------------------------------------------------------------------- def _populate_admin(self): """Populate database with an administrator.""" if not self._config.has_option('Populate', 'admin.login'): return LOG.info(self._translate(_('###### Adding administrator'))) login = normalize_name(self._get('admin.login'))[0:ID_LEN] ips = self._get('admin.ips', '') record = { 'status': 'active', 'password': self._get('admin.password'), 'name': self._get('admin.name'), 'email': self._get('admin.email'), 'lang': self._get( 'admin.language', self._settings.get('pyramid.default_locale_name', 'en')), 'restrict_ip': bool(ips), 'home': self._get('admin.home', 'site'), 'page_size': int(self._get('admin.page_size', PAGE_SIZE))} if not record['password'] or not record['name']\ or not record['email']: exit('*** Incorrect administrator definition.') user = DBSession.query(User.login).filter_by(login=login).first() if user is not None: return user = User(self._settings, login, **record) DBSession.add(user) DBSession.commit() user.perms.append(UserPerm(user.user_id, 'all', 'admin')) for my_ip in ips.split(): user.ips.append(UserIP(user.user_id, my_ip)) DBSession.commit() # ------------------------------------------------------------------------- def _populate_from_xml(self, extra_files): """Populate database with XML content. :param list extra_files: List of files on command-line. """ # List of files to load files = config_get_namespace(self._config, 'Populate', 'file') files = [files[k] for k in sorted(files.keys())] + list(extra_files) if not files: return # Load files LOG.info(self._translate(_('###### Loading configurations'))) done = set() for filename in files: filename = abspath(filename) if filename not in done: LOG.info(filename) errors = import_configuration( self._settings, filename, error_if_exists=False) for error in errors: LOG.error(self._translate(error)) done.add(filename) # ------------------------------------------------------------------------- def _update_storages(self): """Update all storages.""" # pylint: disable = logging-format-interpolation cache_manager = CacheManager( **parse_cache_config_options(self._settings)) handler_manager = HandlerManager(self._settings, cache_manager) if self._args.reset_index and 'storage.index' in self._settings \ and exists(self._settings['storage.index']): LOG.info(self._translate(_('###### Reseting indexes'))) handler_manager.delete_index(False) LOG.info(self._translate(_('###### Updating storages'))) for storage in DBSession.query(Storage): storage_dir = join( self._settings['storage.root'], storage.storage_id) handler = handler_manager.get_handler(storage.storage_id, storage) if handler is None: LOG.error(self._translate( _('${e}: unkwown engine', {'e': storage.vcs_engine}))) elif not exists(storage_dir) or not listdir(storage_dir): LOG.info('{0:.<32}'.format(storage.storage_id)) error = handler.clone() if error: LOG.error(error) elif not self._args.no_index: handler_manager.index(storage.storage_id) elif not hasattr(self._args, 'no_pull') \ or not self._args.no_pull: LOG.info('{0:.<32}'.format(storage.storage_id)) handler.synchronize(None, True) if not self._args.no_index: handler_manager.index(storage.storage_id) # ------------------------------------------------------------------------- def _get(self, option, default=None): """Retrieve a value from section [Populate]. :param option: (string) Option name. :param default: (string, optional) Default value :return: (string) Read value or default value. """ return config_get(self._config, 'Populate', option, default) # ------------------------------------------------------------------------- @classmethod def _translate(cls, text): """Return ``text`` translated. :param text: (string) Text to translate. """ return localizer(getdefaultlocale()[0]).translate(text)
# ============================================================================= if __name__ == '__main__': main()