Source code for publiforge.views.indexer

# $Id$
# pylint: disable = C0322
"""Indexer view callables."""

from cPickle import loads, dumps
from colander import Mapping, SchemaNode, String, Integer
from colander import All, Length, Regex, OneOf
from sqlalchemy import desc

from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden

from ..lib.i18n import _
from ..lib.log import log_activity
from ..lib.config import settings_get_list
from ..lib.utils import has_permission, make_id, normalize_spaces
from ..lib.xml import upload_configuration, export_configuration
from ..lib.viewutils import get_action
from ..lib.form import Form
from ..lib.widget import TabSet
from ..lib.paging import PAGE_SIZES, Paging
from ..models import ID_LEN, VALUE_LEN, PATTERN_LEN, DBSession, close_dbsession
from ..models.indexers import INDEX_VALUE_TYPES, DISPLAYS, EXTRACTOR_TYPES
from ..models.indexers import Indexer, IndexerExtractor, IndexerValue


INDEXER_SETTINGS_TABS = (
    _('Description'), _('Extractors'), _('Closed list of values'))


# =============================================================================
[docs]class IndexerView(object): """Class to manage indexers.""" # ------------------------------------------------------------------------- def __init__(self, request): """Constructor method.""" request.add_finished_callback(close_dbsession) self._request = request # -------------------------------------------------------------------------
[docs] @view_config( route_name='indexer_admin', renderer='../Templates/idx_admin.pt', permission='idx.update') def admin(self): """List indexers for administration purpose.""" action, items = get_action(self._request) if action == 'imp!': upload_configuration(self._request, 'idx_manager', 'indexer') if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() log_activity(self._request, 'indexer_import') action = '' elif action[0:4] == 'del!': self._delete_indexers(items) action = '' elif action[0:4] == 'exp!': # pylint: disable = redefined-variable-type action = self._export_indexers(items) if action: return action paging, defaults = self._paging_indexers() lang = self._request.session['lang'] default_lang = self._request.registry.settings.get( 'pyramid.default_locale_name', 'en') labels = dict([ (k.indexer_id, k.label(lang, default_lang)) for k in paging]) form = Form(self._request, defaults=defaults) depth = (self._request.breadcrumbs.current_path() == self._request.route_path('site_admin') and 3) or 2 self._request.breadcrumbs.add(_('Indexing administration'), depth) return { 'form': form, 'paging': paging, 'action': action, 'labels': labels, 'i_editor': has_permission(self._request, 'idx_editor'), 'i_manager': has_permission(self._request, 'idx_manager'), 'INDEX_VALUE_TYPES': INDEX_VALUE_TYPES, 'PAGE_SIZES': PAGE_SIZES}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='indexer_view', renderer='../Templates/idx_view.pt', permission='idx.read') def view(self): """Show indexer settings.""" indexer_id = self._request.matchdict.get('indexer_id') action = get_action(self._request)[0] if action == 'exp!': return self._export_indexers((indexer_id,)) indexer = DBSession.query(Indexer).filter_by( indexer_id=indexer_id).first() if indexer is None: raise HTTPNotFound() labels = loads(str(indexer.labels)) label = indexer.label( self._request.session['lang'], self._request.registry.settings.get( 'pyramid.default_locale_name', 'en')) tab_set = TabSet(self._request, INDEXER_SETTINGS_TABS) self._request.breadcrumbs.add( _('Indexer settings'), replace=self._request.route_path( 'indexer_edit', indexer_id=indexer.indexer_id)) return { 'tab_set': tab_set, 'indexer': indexer, 'labels': labels, 'label': label, 'INDEX_VALUE_TYPES': INDEX_VALUE_TYPES, 'EXTRACTOR_TYPES': EXTRACTOR_TYPES, 'i_editor': has_permission(self._request, 'idx_editor')}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='indexer_create', renderer='../Templates/idx_edit.pt', permission='idx.create') def create(self): """Create an indexer.""" default_lang = self._request.registry.settings.get( 'pyramid.default_locale_name', 'en') labels = {default_lang: None} if self._request.session['lang'] != default_lang: labels[self._request.session['lang']] = None form, tab_set = self._settings_form(default_lang, labels) if form.validate(): dbindexer = self._create(labels, form.values) if dbindexer is not None: if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() self._request.breadcrumbs.pop() log_activity( self._request, 'indexer_create', dbindexer.indexer_id) return HTTPFound(self._request.route_path( 'indexer_edit', indexer_id=dbindexer.indexer_id)) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') self._request.breadcrumbs.add(_('Indexer creation')) return { 'form': form, 'tab_set': tab_set, 'indexer': None, 'labels': labels, 'default_lang': default_lang, 'INDEX_VALUE_TYPES': INDEX_VALUE_TYPES, 'DISPLAYS': DISPLAYS}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='indexer_edit', renderer='../Templates/idx_edit.pt', permission='idx.update') def edit(self): """Edit indexer settings.""" # Action indexer_id = self._request.matchdict.get('indexer_id') action = get_action(self._request)[0] if action[0:4] == 'del!': DBSession.query(IndexerExtractor).filter_by( indexer_id=indexer_id, extractor_id=int(action[4:]))\ .delete() DBSession.commit() if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() elif action[0:4] == 'dvl!': DBSession.query(IndexerValue).filter_by( indexer_id=indexer_id, value_id=int(action[4:])).delete() DBSession.commit() if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() # Environment dbindexer = DBSession.query(Indexer).filter_by( indexer_id=indexer_id).first() if dbindexer is None: raise HTTPNotFound() default_lang = self._request.registry.settings.get( 'pyramid.default_locale_name', 'en') labels = loads(str(dbindexer.labels)) for lang in settings_get_list( self._request.registry.settings, 'languages'): if lang not in labels: labels[lang] = None label = dbindexer.label(self._request.session['lang'], default_lang) form, tab_set = self._settings_form(default_lang, labels, dbindexer) # Save view_path = self._request.route_path( 'indexer_view', indexer_id=dbindexer.indexer_id) if action == 'sav!' and form.validate(dbindexer) \ and self._save(labels, dbindexer, form.values): if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() log_activity( self._request, 'indexer_edit', dbindexer.indexer_id) return HTTPFound(view_path) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') # Breadcrumbs self._request.breadcrumbs.add(_('Indexer settings'), replace=view_path) return { 'form': form, 'action': action, 'tab_set': tab_set, 'indexer': dbindexer, 'labels': labels, 'label': label, 'default_lang': default_lang, 'INDEX_VALUE_TYPES': INDEX_VALUE_TYPES, 'DISPLAYS': DISPLAYS, 'EXTRACTOR_TYPES': EXTRACTOR_TYPES}
# ------------------------------------------------------------------------- def _paging_indexers(self): """Return a :class:`~.widget.Paging` object filled with indexers. :return: (tuple) A tuple such as ``(paging, filters)`` where ``paging`` is a :class:`~.widget.Paging` object and ``filters`` a dictionary of filters. """ # Parameters params = Paging.params(self._request, 'indexers', '+indexer_id') # Query query = DBSession.query(Indexer) if 'f_id' in params: query = query.filter( Indexer.indexer_id.like('%%%s%%' % params['f_id'].lower())) # Order by oby = getattr(Indexer, params['sort'][1:]) query = query.order_by(desc(oby) if params['sort'][0] == '-' else oby) return Paging(self._request, 'indexers', query), params # ------------------------------------------------------------------------- def _delete_indexers(self, indexer_ids): """Delete indexers. :param list user_ids: List of indexers to delete. """ if not has_permission(self._request, 'idx_manager'): raise HTTPForbidden() # Delete deleted = [] for dbindexer in DBSession.query(Indexer).filter( Indexer.indexer_id.in_(indexer_ids)): deleted.append(dbindexer.indexer_id) DBSession.delete(dbindexer) if not deleted: return DBSession.commit() if 'search' in self._request.session: del self._request.session['search'] self._request.registry['handler'].delete_index() log_activity(self._request, 'indexer_delete', u' '.join(deleted)) # ------------------------------------------------------------------------- def _export_indexers(self, indexer_ids): """Export indexers. :param list user_ids: List of user IDs to export. :rtype: :class:`pyramid.response.Response` """ elements = [] exported = [] for dbindexer in DBSession.query(Indexer)\ .filter(Indexer.indexer_id.in_(indexer_ids))\ .order_by('indexer_id'): exported.append(dbindexer.indexer_id) elements.append(dbindexer.xml()) name = '%s_indexers.pfidx' % self._request.registry.settings.get( 'skin.name', 'publiforge') log_activity( self._request, 'indexer_export', u' '.join(exported)) return export_configuration(elements, name) # ------------------------------------------------------------------------- def _settings_form(self, default_lang, labels, indexer=None): """Return a indexer settings form. :param str default_lang: Default language. :param dict labels: Label in several languages. :type indexer: :class:`~..models.indexers.Indexer` :param indexer: (optional) Current indexer object. :rtype: tuple :return: A tuple such as ``(form, tab_set)``. """ # Schema defaults = {} schema = SchemaNode(Mapping()) if indexer is None: schema.add(SchemaNode( String(), name='indexer_id', validator=All( Regex(r'^[a-z_][a-z0-9-_]+$'), Length(max=ID_LEN)))) for lang in labels: if lang == default_lang: schema.add(SchemaNode(String(), name='label_%s' % lang)) else: schema.add(SchemaNode( String(), name='label_%s' % lang, missing=None)) if labels[lang]: defaults['label_%s' % lang] = labels[lang].decode('utf8') schema.add(SchemaNode( String(), name='value_type', validator=OneOf(INDEX_VALUE_TYPES.keys()))) schema.add(SchemaNode( String(), name='display', validator=OneOf(DISPLAYS.keys()), missing=None)) schema.add(SchemaNode(Integer(), name='result_column', missing=None)) # Extractors if indexer is not None: for extractor in indexer.extractors: schema.add(SchemaNode( String(), name='ex_%d_files' % extractor.extractor_id, validator=Length(max=PATTERN_LEN))) schema.add(SchemaNode( String(), name='ex_%d_type' % extractor.extractor_id, validator=OneOf(EXTRACTOR_TYPES.keys()))) schema.add(SchemaNode( String(), name='ex_%d_param' % extractor.extractor_id, validator=Length(max=VALUE_LEN))) schema.add(SchemaNode( Integer(), name='ex_%d_limit' % extractor.extractor_id, missing=None)) defaults['ex_%d_files' % extractor.extractor_id] = \ extractor.indexed_files defaults['ex_%d_type' % extractor.extractor_id] = \ extractor.extractor_type defaults['ex_%d_param' % extractor.extractor_id] = \ extractor.parameter defaults['ex_%d_limit' % extractor.extractor_id] = \ extractor.limit if self._request.params.get('ex_type'): schema.add(SchemaNode( String(), name='ex_files', validator=Length(max=PATTERN_LEN))) schema.add(SchemaNode( String(), name='ex_type', validator=OneOf(EXTRACTOR_TYPES.keys()))) schema.add(SchemaNode( String(), name='ex_param', validator=Length(max=VALUE_LEN))) schema.add(SchemaNode( Integer(), name='ex_limit', missing=None)) # Values if indexer is not None and \ indexer.value_type in ('select', 'filetype'): for value in indexer.values: schema.add(SchemaNode( String(), name='val_%d_label' % value.value_id, validator=Length(max=VALUE_LEN))) schema.add(SchemaNode( String(), name='val_%d_value' % value.value_id, validator=Length(max=VALUE_LEN), missing=None)) defaults['val_%d_label' % value.value_id] = value.label defaults['val_%d_value' % value.value_id] = value.value if self._request.params.get('val_label'): schema.add(SchemaNode( String(), name='val_label', validator=Length(max=VALUE_LEN))) schema.add(SchemaNode( String(), name='val_value', validator=Length(max=VALUE_LEN), missing=None)) return ( Form(self._request, schema=schema, defaults=defaults, obj=indexer), TabSet(self._request, INDEXER_SETTINGS_TABS)) # ------------------------------------------------------------------------- def _create(self, labels, values): """Create an indexer. :param labels: (dictionary) Label in several languages. :param values: (dictionary) Form values. :return: (:class:`~..models.indexers.Indexer` instance or ``None``) """ # Check existence indexer_id = make_id(values['indexer_id'], 'token', ID_LEN) indexer = DBSession.query(Indexer).filter_by( indexer_id=indexer_id).first() if indexer is not None: self._request.session.flash( _('This indexer already exists.'), 'alert') return # Create indexer indexer = Indexer( indexer_id, self._label_dict(labels, values), values['value_type'], values['display'], values['result_column']) DBSession.add(indexer) DBSession.commit() return indexer # ------------------------------------------------------------------------- def _save(self, labels, indexer, values): """Save an indexer. :param labels: (dictionary) Label in several languages. :param indexer: (:class:`~..models.indexers.Indexer` instance) Indexer to save. :param values: (dictionary) Form values. :return: (boolean) """ indexer.labels = dumps(self._label_dict(labels, values)) if values['value_type'] in ('filename', 'path'): DBSession.commit() return True # Extractors for extractor in indexer.extractors: extractor.indexed_files = \ values['ex_%d_files' % extractor.extractor_id] extractor.extractor_type = \ values['ex_%d_type' % extractor.extractor_id] extractor.parameter = \ values['ex_%d_param' % extractor.extractor_id] extractor.limit = \ values['ex_%d_limit' % extractor.extractor_id] if values.get('ex_type'): indexer.extractors.append(IndexerExtractor( values['ex_files'], values['ex_type'], values['ex_param'], values['ex_limit'])) # Values for value in indexer.values: value.label = \ normalize_spaces(values['val_%d_label' % value.value_id]) value.value = values['val_%d_value' % value.value_id] and \ normalize_spaces(values['val_%d_value' % value.value_id]) \ or value.label if values.get('val_label'): indexer.values.append(IndexerValue( values['val_label'], values['val_value'])) DBSession.commit() return True # ------------------------------------------------------------------------- @classmethod def _label_dict(cls, langs, values): """Return a label dictionary with language as key. :param langs: (list) Languages to select. :param values: (dictionary) Form values. :return: (dictionay) """ labels = {} for lang in langs: if values.get('label_%s' % lang): labels[lang] = values['label_%s' % lang].encode('utf8') return labels