Source code for publiforge.views.storage

# pylint: disable = C0322
"""Storage view callables."""

from os.path import join, exists, isdir, dirname, basename, normpath
from shutil import copy2
from webhelpers2.html import HTML, literal
from sqlalchemy import select, desc
from colander import Mapping, SchemaNode, String, Integer
from colander import All, Length, Regex, OneOf

from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound, HTTPNotFound, HTTPForbidden

from .selection import Selection
from ..lib.i18n import _
from ..lib.log import log_activity
from ..lib.config import settings_get_list
from ..lib.utils import MIME_TYPES, NORMALIZE_FILE_MODE, has_permission
from ..lib.utils import age, normalize_spaces, copy_content
from ..lib.xml import upload_configuration, export_configuration
from ..lib.viewutils import vcs_user, save_vcs_user, paging_users
from ..lib.viewutils import paging_groups, get_action, file_download
from ..lib.viewutils import dbquery_storages, current_storage, get_selection
from ..lib.packutils import create_pack
from ..lib.form import Form
from ..lib.tabset import TabSet
from ..lib.paging import PAGE_SIZES, Paging, sortable_column
from ..models import ID_LEN, LABEL_LEN, DESCRIPTION_LEN, PATH_LEN
from ..models import DBSession, close_dbsession
from ..models.users import User, UserFile
from ..models.groups import GROUP_USER, Group
from ..models.storages import STORAGE_ACCESS, VCS_ENGINES, STORAGE_PERMS
from ..models.storages import Storage, StorageOpener, StorageUser, StorageGroup


STORAGE_SETTINGS_TABS = (
    _('Description'), _('Openers'), _('Authorized members'),
    _('Authorized groups'))
REFRESH = {
    900: _('every 15 minutes'), 1800: _('every 30 minutes'),
    3600: _('every hour'), 7200: _('every 2 hours'),
    14400: _('every 4 hours'), 28800: _('every 8 hours'),
    86400: _('every day')}
INDEXED = {
    r'\.(txt|ini|rst|xml)$': _('Texts'),
    r'\.(txt|ini|rst|xml|png|jpg|jpeg|tif|tiff|gif|svg)$': _('Texts, images'),
    r'\.xml$': _('XML files'),
    r'\.(xml|png)$': _('XML files, PNG images'),
    r'\.(xml|png|jpg|jpeg|tif|tiff|gif|svg)$': _('XML files, images'),
    r'\.(png|jpg|jpeg|tif|tiff|gif|svg)$': _('Images'),
    r'\.(wav|ogg)$': _('Audios'),
    r'\.(mp4|avi)$': _('Videos'),
    r'\.(png|jpg|jpeg|tif|tiff|gif|svg|wav|ogg|mpg|mp4|avi)$':
        _('Images, audios, videos'),
    r'\.(txt|ini|rst|xml|png|jpg|jpeg|tif|tiff|gif|svg|wav|ogg|mpg|mp4|avi)$':
        _('Texts, images, audios, videos')}


# =============================================================================
[docs]class StorageView(object): """Class to manage storages.""" # ------------------------------------------------------------------------- def __init__(self, request): """Constructor method.""" request.add_finished_callback(close_dbsession) self._request = request self._vcs_engines = dict([ (k, VCS_ENGINES[k]) for k in settings_get_list(request.registry.settings, 'storage.vcs')]) # -------------------------------------------------------------------------
[docs] @view_config( route_name='storage_admin', renderer='../Templates/stg_admin.pt', permission='stg.update') def admin(self): """List storages for administration purpose.""" action, items = get_action(self._request) if action == 'imp!': upload_configuration(self._request, 'stg_manager', 'storage') log_activity(self._request, 'storage_import') action = '' del self._request.session['menu'] elif action[0:4] == 'del!': if not has_permission(self._request, 'stg_manager'): raise HTTPForbidden() storage_root = self._request.registry.settings['storage.root'] for storage_id in items: Storage.delete(storage_root, storage_id) self._request.registry['handler'].remove_handler(storage_id) log_activity(self._request, 'storage_delete', storage_id) action = '' self._request.registry['handler'].delete_index() del self._request.session['menu'] elif action[0:4] == 'exp!': return self._export_storages(items) paging, defaults = self._paging_storages() form = Form(self._request, defaults=defaults) groups = [(k.group_id, k.label) for k in DBSession.query(Group).join(StorageGroup)] depth = 3 if self._request.breadcrumbs.current_path() == \ self._request.route_path('site_admin') else 2 self._request.breadcrumbs.add(_('Storage administration'), depth) return { 'form': form, 'paging': paging, 'action': action, 'groups': groups, 'STORAGE_ACCESS': STORAGE_ACCESS, 'PAGE_SIZES': PAGE_SIZES, 'vcs_engines': self._vcs_engines, 'i_editor': has_permission(self._request, 'stg_editor'), 'i_manager': has_permission(self._request, 'stg_manager')}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='storage_index', renderer='../Templates/stg_index.pt', permission='stg.read') @view_config(route_name='storage_index', renderer='json', xhr=True) def index(self): """List authorized storages.""" # Action user_id = self._request.session['user_id'] action, items = get_action(self._request) if action[0:4] == 'exp!': return self._export_storages(items) elif action[0:3] == 'men': stg_user = DBSession.query(StorageUser).filter_by( storage_id=action[4:], user_id=user_id).first() if stg_user is None: stg_user = StorageUser(action[4:], user_id) DBSession.add(stg_user) stg_user.in_menu = action[3] == '+' DBSession.commit() del self._request.session['menu'] # Environment paging, defaults = self._paging_storages(user_id) refresh = int(self._request.registry.settings.get('refresh.long', 5)) # Ajax if self._request.is_xhr: action, prgrss = self._request.registry['handler'].progress( [k.storage_id for k in paging]) return { 'working': action, 'refresh': refresh, 'status': dict([(k, prgrss[k][0]) for k in prgrss])} # Changes changes = {} for storage in paging: handler = self._request.registry['handler']\ .get_handler(storage.storage_id, storage) changes[handler.uid] = list(handler.vcs.last_change()) changes[handler.uid][0] = ( age(changes[handler.uid][0]), changes[handler.uid][0].isoformat(' ').partition('.')[0]) force = (action == 'syn!%s' % storage.storage_id) or \ (action == 'syn!#' and storage.storage_id in items) if handler.synchronize(self._request, force): handler.cache.clear() in_menus = [k[0] for k in DBSession.query(StorageUser.storage_id) .filter_by(user_id=self._request.session['user_id']) .filter_by(in_menu=True).all()] # Work in progress action, prgrss = \ self._request.registry['handler'].progress(changes.keys()) if action and 'ajax' not in self._request.params: self._request.response.headerlist.append(('Refresh', str(refresh))) self._request.breadcrumbs.add( _('All storages'), int(self._request.GET.get('crumbs', 2))) return { 'vcs_engines': self._vcs_engines, 'paging': paging, 'form': Form(self._request, defaults=defaults), 'changes': changes, 'in_menus': in_menus, 'progress': prgrss, 'working': action, 'refresh': refresh, 'PAGE_SIZES': PAGE_SIZES}
# -------------------------------------------------------------------------
[docs] @view_config(route_name='storage_view', renderer='../Templates/stg_view.pt', permission='stg.read') def view(self): """Show storage settings with its users and groups.""" # Environment storage = current_storage(self._request, only_dict=False)[0] tab_set = TabSet(self._request, STORAGE_SETTINGS_TABS) members = DBSession.query( User.user_id, User.login, User.name, StorageUser.perm)\ .join(StorageUser)\ .filter(StorageUser.storage_id == storage.storage_id)\ .order_by(User.name) member_groups = DBSession.query( Group.group_id, Group.label, StorageGroup.perm)\ .filter(StorageGroup.storage_id == storage.storage_id)\ .filter(Group.group_id == StorageGroup.group_id)\ .order_by(Group.label).all() i_editor = has_permission(self._request, 'stg_editor') \ or self._request.session['storage']['perm'] == 'editor' openers = [ k.opener_id for k in sorted(storage.openers, key=lambda k: k.sort)] openers = self._request.registry['opener'].descriptions( self._request, openers) indexed = INDEXED.get(storage.indexed_files, storage.indexed_files) # Action action = get_action(self._request)[0] if action == 'exp!': return self._export_storages(( self._request.matchdict.get('storage_id'),)) elif action == 'rec!': self._vcs_recover(storage) # Breadcrumbs self._request.breadcrumbs.add( _('Storage settings'), replace=self._request.route_path( 'storage_edit', storage_id=storage.storage_id)) return { 'tab_set': tab_set, 'STORAGE_ACCESS': STORAGE_ACCESS, 'REFRESH': REFRESH, 'PERMS': STORAGE_PERMS, 'PROJECT_ENTRIES': None, 'vcs_engines': self._vcs_engines, 'storage': storage, 'indexed': indexed, 'NORMALIZE_FILE_MODE': NORMALIZE_FILE_MODE, 'openers': openers, 'members': members, 'member_groups': member_groups, 'i_editor': i_editor}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='storage_create', renderer='../Templates/stg_edit.pt', permission='stg.create') def create(self): """Create a storage.""" form, tab_set = self._settings_form() if form.validate(): dbstorage = self._create(form.values) if dbstorage is not None: self._request.breadcrumbs.pop() log_activity( self._request, 'storage_create', dbstorage.storage_id) return HTTPFound(self._request.route_path( 'storage_edit', storage_id=dbstorage.storage_id)) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') self._request.breadcrumbs.add(_('Storage creation')) return { 'form': form, 'tab_set': tab_set, 'STORAGE_ACCESS': STORAGE_ACCESS, 'REFRESH': REFRESH, 'PERMS': STORAGE_PERMS, 'vcs_engines': self._vcs_engines, 'INDEXED': INDEXED, 'NORMALIZE_FILE_MODE': NORMALIZE_FILE_MODE, 'PAGE_SIZES': PAGE_SIZES, 'storage': None, 'allopeners': None, 'members': None, 'group_members': None, 'paging': None, 'groups': None}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='storage_edit', renderer='../Templates/stg_edit.pt') def edit(self): """Edit storage settings.""" # Authorization dbstorage = current_storage(self._request, only_dict=False)[0] if not has_permission(self._request, 'stg_editor') \ and self._request.session['storage']['perm'] != 'editor': raise HTTPForbidden() # Action action, allopeners, paging, groups = self._edit_action(dbstorage) # Environment form, tab_set = self._settings_form(dbstorage) members = DBSession.query( User.user_id, User.login, User.name, StorageUser.perm)\ .filter(StorageUser.storage_id == dbstorage.storage_id)\ .filter(User.user_id == StorageUser.user_id)\ .order_by(User.name).all() member_groups = DBSession.query( Group.group_id, Group.label, StorageGroup.perm)\ .filter(StorageGroup.storage_id == dbstorage.storage_id)\ .filter(Group.group_id == StorageGroup.group_id)\ .order_by(Group.label).all() openers = [ k.opener_id for k in sorted(dbstorage.openers, key=lambda k: k.sort)] openers = self._request.registry['opener'].descriptions( self._request, openers) # Save view_path = self._request.route_path( 'storage_view', storage_id=dbstorage.storage_id) if action == 'sav!' and form.validate(dbstorage): dbstorage.set_vcs_password( self._request.registry.settings, form.values.get('password')) for user in dbstorage.users: user.perm = form.values['usr_%d' % user.user_id] for group in dbstorage.groups: group.perm = form.values['grp_%s' % group.group_id] DBSession.commit() self._request.registry['handler'].remove_handler( dbstorage.storage_id) self._request.registry['handler'].delete_index() del self._request.session['menu'] log_activity(self._request, 'storage_edit', dbstorage.storage_id) return HTTPFound(view_path) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') # Breadcrumbs self._request.breadcrumbs.add( _('Storage settings'), replace=view_path) return { 'form': form, 'action': action, 'tab_set': tab_set, 'STORAGE_ACCESS': STORAGE_ACCESS, 'REFRESH': REFRESH, 'PERMS': STORAGE_PERMS, 'PROJECT_ENTRIES': None, 'vcs_engines': self._vcs_engines, 'storage': dbstorage, 'INDEXED': INDEXED, 'NORMALIZE_FILE_MODE': NORMALIZE_FILE_MODE, 'PAGE_SIZES': PAGE_SIZES, 'openers': openers, 'allopeners': allopeners, 'members': members, 'member_groups': member_groups, 'paging': paging, 'groups': groups}
# -------------------------------------------------------------------------
[docs] @view_config( route_name='storage_root', renderer='../Templates/stg_browse.pt', permission='stg.read') @view_config( route_name='storage_browse', renderer='../Templates/stg_browse.pt', permission='stg.read') @view_config(route_name='storage_root', renderer='json', xhr=True) @view_config(route_name='storage_browse', renderer='json', xhr=True) def browse(self): """Browse a storage.""" # Environment storage, handler = current_storage(self._request) storage_id = storage['storage_id'] path = self._request.matchdict.get('path', []) path, real_path, html_path = self._html_path(storage_id, path) # Form, action & working status form, action, items, working = \ self._browse_action(storage, handler, path) if self._request.is_xhr: handler.cache.clear() return None if action[0:4] == 'dnl!': return file_download(self._request, real_path, items) elif action[0:4] == 'sch!': return HTTPFound(self._request.route_path('file_search')) elif action[0:4] == 'npk!' and items and items[0]: return HTTPFound(self._request.route_path( 'pack_edit', project_id=items[0], pack_id=items[1], _anchor='tab0')) # Work in progress refresh = self._request.registry.settings.get('refresh.short', '2') working, progress = self._request\ .registry['handler'].progress((storage_id,), working) if working: self._request.response.headerlist.append(('Refresh', refresh)) if storage_id in progress and progress[storage_id][0] == 'error': self._request.session.flash( str(progress[storage_id][1]).decode('utf8'), 'alert') # Directory content dirs, files = handler.dir_infos( self._request, path, self._request.params.get('sort', '+name'), working, storage) if working: handler.cache.clear() self._request.breadcrumbs.add(_('Storage browsing'), root_chunks=3) return { 'form': form, 'action': action, 'working': working, 'overview': self._request.session['overviews'][storage_id], 'sortable_column': sortable_column, 'storage': storage, 'path': path, 'html_path': html_path, 'dirs': dirs, 'files': files, 'MIME_TYPES': MIME_TYPES, 'vcs_engines': self._vcs_engines, 'literal': literal}
# ------------------------------------------------------------------------- def _paging_storages(self, user_id=None): """Return a :class:`~..lib.widget.Paging` object filled with storages. :param user_id: (integer, optional) Select only storages of user ``user_id``. :return: (tuple) A tuple such as ``(paging, filters)`` where ``paging`` is a :class:`~..lib.widget.Paging` object and ``filters`` a dictionary of filters. """ # Parameters paging_id = 'storages!' if user_id is None else 'storages' params = Paging.params(self._request, paging_id, '+label') if user_id is not None and 'f_access' in params: del params['f_access'] # Query dbquery = DBSession.query(Storage) if user_id is not None: dbquery = dbquery_storages(dbquery, user_id) elif 'f_login' in params: dbquery = dbquery.join(StorageUser).join(User)\ .filter(User.login == params['f_login']) if 'f_id' in params: dbquery = dbquery.filter(Storage.storage_id.ilike( '%%%s%%' % params['f_id'])) if 'f_group' in params: dbquery = dbquery.join(StorageGroup).filter( StorageGroup.group_id == params['f_group']) if 'f_label' in params: dbquery = dbquery.filter( Storage.label.ilike('%%%s%%' % params['f_label'])) if 'f_access' in params and params['f_access'] != '*': dbquery = dbquery.filter(Storage.access == params['f_access']) elif 'f_access' not in params: dbquery = dbquery.filter(Storage.access != 'closed') if 'f_engine' in params: dbquery = dbquery.filter(Storage.vcs_engine == params['f_engine']) # Order by oby = getattr(Storage, params['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if params['sort'][0] == '-' else oby) return Paging(self._request, paging_id, dbquery), params # ------------------------------------------------------------------------- def _export_storages(self, storage_ids): """Export storages. :param storage_ids: (list) List of storage IDs to export. :return: (:class:`pyramid.response.Response` instance) """ i_editor = has_permission(self._request, 'stg_editor') user_id = self._request.session['user_id'] groups = set([k.group_id for k in DBSession.execute( select([GROUP_USER], GROUP_USER.c.user_id == user_id))]) elements = [] exported = [] for dbstorage in DBSession.query(Storage)\ .filter(Storage.storage_id.in_(storage_ids))\ .order_by('label'): if i_editor or dbstorage.access == 'open' or \ user_id in [k.user_id for k in dbstorage.users] or \ (groups and groups & set( [k.group_id for k in dbstorage.groups])): exported.append(dbstorage.storage_id) elements.append(dbstorage.xml()) name = '%s_storages.pfstg' % self._request.registry.settings.get( 'skin.name', 'publiforge') log_activity( self._request, 'storage_export', u' '.join(exported)) return export_configuration(elements, name) # ------------------------------------------------------------------------- def _settings_form(self, storage=None): """Return a storage settings form. :param storage: (:class:`~..models.storages.Storage` instance, optional) Current storage object. :return: (tuple) A tuple such as ``(form, tab_set)``. """ vcs_engine = storage.vcs_engine \ if storage else self._request.params.get('vcs_engine') # Schema schema = SchemaNode(Mapping()) schema.add(SchemaNode( String(), name='access', validator=OneOf(STORAGE_ACCESS.keys()))) if storage is None: schema.add(SchemaNode( String(), name='storage_id', validator=All(Regex(r'^[\w_][\w_.-]+$'), Length(max=ID_LEN)))) schema.add(SchemaNode( String(), name='label', validator=Length(min=2, max=LABEL_LEN))) schema.add(SchemaNode( String(), name='description', validator=Length(max=DESCRIPTION_LEN), missing='')) if storage is None: schema.add(SchemaNode( String(), name='vcs_engine', validator=OneOf(self._vcs_engines.keys()))) if vcs_engine is None or vcs_engine not in ('none', 'local'): schema.add(SchemaNode( String(), name='vcs_url', validator=Length(max=PATH_LEN))) schema.add(SchemaNode( String(), name='vcs_user', validator=Length(max=ID_LEN), missing=None)) schema.add(SchemaNode(String(), name='password', missing=None)) if vcs_engine is None or vcs_engine in ('none', 'local'): schema.add(SchemaNode( String(), name='public_url', validator=Length(max=PATH_LEN), missing=None)) schema.add(SchemaNode(Integer(), name='refresh', missing=3600)) schema.add(SchemaNode(String(), name='indexed_files', missing=None)) schema.add(SchemaNode(String(), name='normalize_mode', missing=None)) if storage is not None: for user in storage.users: schema.add(SchemaNode( String(), name='usr_%d' % user.user_id, validator=OneOf(STORAGE_PERMS.keys()), missing=None)) for group in storage.groups: schema.add(SchemaNode( String(), name='grp_%s' % group.group_id, validator=OneOf(STORAGE_PERMS.keys()))) defaults = {'access': 'open', 'vcs_engine': 'local', 'refresh': 3600} if 'paging' in self._request.session: if 'groups' in self._request.session['paging'][1]: defaults.update(self._request.session['paging'][1]['groups']) if 'users' in self._request.session['paging'][1]: defaults.update(self._request.session['paging'][1]['users']) return ( Form(self._request, schema=schema, defaults=defaults, obj=storage), TabSet(self._request, STORAGE_SETTINGS_TABS)) # ------------------------------------------------------------------------- def _create(self, values): """Create a storage. :param values: (dictionary) Form values. :return: (:class:`~..models.storages.Storage` instance or ``None``) """ # Check existence storage_id = values['storage_id'].strip() storage = DBSession.query(Storage).filter_by( storage_id=storage_id).first() if storage is None: label = normalize_spaces(values['label'], LABEL_LEN) storage = DBSession.query(Storage).filter_by( label=label).first() if storage is not None: self._request.session.flash( _('This storage already exists.'), 'alert') return None # Create storage record = dict( [(k, values[k]) for k in values if hasattr(Storage, k)]) storage = Storage(self._request.registry.settings, **record) storage.set_vcs_password( self._request.registry.settings, values.get('password')) DBSession.add(storage) DBSession.commit() # Clone handler = self._request.registry['handler'].get_handler( storage.storage_id, storage) handler.clone(self._request) return storage # ------------------------------------------------------------------------- @classmethod def _save(cls, storage, values): """Save a storage settings. :param storage: (:class:`~..models.storages.Storage` instance) Storage to update. :param values: (dictionary) Form values. :return: (boolean) ``True`` if succeeds. """ # Update permissions for user in storage.users: user.perm = values['perm_%d' % user.user_id] DBSession.commit() return True # ------------------------------------------------------------------------- def _edit_action(self, storage): """Process actions for edition. :param storage: (:class:`~..models.storages.Storage` instance) Storage object. :return: (tuple) A tuple such as ``(action, allopeners, paging, groups)``. """ paging = groups = allopeners = None action, items = get_action(self._request) if action == 'aop?': allopeners = self._request.registry['opener'].descriptions( self._request) elif action[0:4] == 'aop!': self._add_openers(storage, items) elif action[0:3] in ('mup', 'dwn'): self._swap_openers(action[0:3], action[4:], storage) DBSession.commit() elif action[0:4] == 'rop!': DBSession.query(StorageOpener).filter_by( storage_id=storage.storage_id, opener_id=action[4:]).delete() DBSession.commit() if 'storage' in self._request.session: del self._request.session['storage'] elif action == 'aur?' or \ (not action and self._request.GET.get('paging_id') == 'users'): groups = DBSession.query(Group.group_id, Group.label).all() paging = paging_users(self._request)[0] elif action[0:4] == 'aur!': self._add_members(storage, items) elif action[0:4] == 'rur!': DBSession.query(StorageUser).filter_by( storage_id=storage.storage_id, user_id=int(action[4:]))\ .delete() DBSession.commit() del self._request.session['menu'] elif action == 'agp?' or ( not action and self._request.GET.get('paging_id') == 'groups'): paging = paging_groups(self._request)[0] elif action[0:4] == 'agp!': self._add_member_groups(storage, items) elif action[0:4] == 'rgp!': DBSession.query(StorageUser).filter_by( storage_id=storage.storage_id, perm=None).delete() DBSession.query(StorageGroup).filter_by( storage_id=storage.storage_id, group_id=action[4:]).delete() DBSession.commit() del self._request.session['menu'] return action, allopeners, paging, groups # ------------------------------------------------------------------------- def _add_openers(self, storage, new_ids): """Add selected openers to the storage. :param new_ids: (list): List of openers to add. :param storage: (:class:`~..models.storages.Storage` instance) Current storage object. """ if not has_permission(self._request, 'stg_editor') \ and self._request.session['storage']['perm'] != 'editor': return openers = [k for k in sorted(storage.openers, key=lambda k: k.sort)] opener_ids = [k.opener_id for k in openers] for opener_id in sorted(new_ids, reverse=True): if opener_id not in opener_ids: openers.insert(0, StorageOpener(storage.storage_id, opener_id)) DBSession.add(openers[0]) for num, opener in enumerate(openers): opener.sort = num + 1 DBSession.commit() if 'storage' in self._request.session: del self._request.session['storage'] # ------------------------------------------------------------------------- def _add_members(self, storage, new_ids): """Add selected users to the storage. :param new_ids: (list): List of users to add. :param storage: (:class:`~..models.storages.Storage` instance) Current storage object. """ if not has_permission(self._request, 'stg_editor') \ and self._request.session['storage']['perm'] != 'editor': return user_ids = [k.user_id for k in storage.users] for user_id in new_ids: user_id = int(user_id) if user_id not in user_ids: storage.users.append(StorageUser( storage.storage_id, user_id)) DBSession.commit() del self._request.session['menu'] # ------------------------------------------------------------------------- def _add_member_groups(self, storage, new_ids): """Add selected groups to the storage. :param new_ids: (list): List of groups to add. :param storage_id: (:class:`~..models.storages.Storage` instance) Storage object. """ if not has_permission(self._request, 'stg_editor') \ and self._request.session['storage']['perm'] != 'editor': return group_ids = [k.group_id for k in storage.groups] for group_id in new_ids: if group_id not in group_ids: storage.groups.append(StorageGroup( storage.storage_id, group_id)) DBSession.commit() del self._request.session['menu'] # ------------------------------------------------------------------------- def _html_path(self, storage_id, path): """Return relative path in storage, real path and path in one string with ``<a>`` tags. :param storage_id: (string) Storage ID. :param path: (list) Splitted relative path inside the storage. :return: (tuple) A tuple such as ``(path, real_path, html_path)`` or a :class:`pyramid.httpexceptions.`HTTPNotFound` exception.. """ # Path exists? real_path = join(self._request.registry.settings['storage.root'], storage_id, *path) if not isdir(real_path) and path: real_path = dirname(real_path) path = path[0:-1] if not isdir(real_path): raise HTTPNotFound(comment=_('This directory does not exist!')) # Root path if not path: return '', real_path, HTML.span(storage_id) # Other path html = HTML.a(storage_id, href=self._request.route_path( 'storage_root', storage_id=storage_id)) for index in range(len(path) - 1): html += ' / ' + HTML.a(path[index], href=self._request.route_path( 'storage_browse', storage_id=storage_id, path='/'.join(path[0:index + 1])).decode('utf8')) html += ' / ' + HTML.span(path[-1]) return join('.', *path), real_path, html # ------------------------------------------------------------------------- def _browse_action(self, storage, handler, path): """Return form, current action and working status for browse view. :param storage: (dictionary) Storage dictionary. :param handler: (:class:`~..lib.handler.Handler` instance) Storage Control System. :param path: Relative path in storage. :return: (tuple) A tuple such as ``(form, action, items, working)``. """ # pylint: disable=too-many-branches working = False action, items = get_action(self._request) form = self._vcs_user_form(storage, action) # Authorization if action[0:4] in ('upl!', 'new!', 'dir!', 'ren!', 'del!', 'get!'): if storage['perm'] != 'writer': raise HTTPForbidden( comment=_("You can't modify this storage!")) if not self._request.is_xhr and not form.validate(): action = '%s?%s' % (action[0:3], action[4:]) return form, action, items, working # Action if action[0:4] == 'upl!': upload_files = [ self._request.params.get(k) for k in self._request.params if k.startswith('upload_file')] translate = self._request.localizer.translate working = handler.launch( self._request, handler.upload, (vcs_user(self._request, storage), path, upload_files, items and items[0] or None, form.values.get('message') or translate(_('Uploading from disk')))) elif action[0:4] == 'ren!': translate = self._request.localizer.translate working = handler.launch( self._request, handler.rename, (vcs_user(self._request, storage), path, action[4:], form.values['new_name'], form.values.get('message') or translate(_('Renaming of "${o}"', {'o': action[4:]})))) for user_file in DBSession.query(UserFile).filter_by(path=normpath( join(storage['storage_id'], path, action[4:]))): user_file.path = join( storage['storage_id'], path, form.values['new_name']) DBSession.commit() Selection(self._request).invalidate() action = '' elif action[0:4] == 'del!': translate = self._request.localizer.translate working = handler.launch( self._request, handler.remove, (vcs_user(self._request, storage), path, items, form.values.get('message') or translate(_('Deletion')))) DBSession.query(UserFile).filter(UserFile.path.in_( [normpath(join(storage['storage_id'], path, k)) for k in items])).delete('fetch') DBSession.commit() Selection(self._request).invalidate() elif action == 'ovw!': self._request.session['overviews'][storage['storage_id']] = \ not self._request.session['overviews'][storage['storage_id']] elif action == 'sch!' and 'search' in self._request.session: del self._request.session['search'] elif action in ('get!', 'new?', 'new!', 'dir!'): working |= self._browse_action_create( storage, handler, path, action, form) elif action[0:4] in ('sel!', 'npk!'): items = self._browse_action_container( action, items, normpath(join(storage['storage_id'], path)), form) # Synchronize if not working: working = handler.synchronize(self._request, action == 'syn!') action = '' if working else action return form, action, items, working # ------------------------------------------------------------------------- def _browse_action_create(self, storage, handler, path, action, form): """Manage actions for file or directory creation. :param storage: (dictionary) Storage dictionary. :param handler: (:class:`~..lib.handler.Handler` instance) Storage Control System. :param path: Relative path in storage. :param action: (string) Action. :param form: (:class:`~..lib.form.Form` instance) Browse form. :return: (boolean) ``True`` if work in progress. """ working = False if action == 'get!': root = self._request.registry.settings['storage.root'] for seed in get_selection(self._request): seed = normpath(seed) if len(seed.split('/')) < 2: continue seed = join(root, seed) name = join(root, storage['storage_id'], path, basename(seed)) if seed.startswith(root) and name != seed: if isdir(seed): copy_content(seed, name) elif exists(seed): copy2(seed, name) translate = self._request.localizer.translate working = handler.launch( self._request, handler.add, (vcs_user(self._request, storage), path, translate(_('Copy from selection')))) elif action == 'new?' \ and storage['seeds'] is not None and not storage['seeds']: storage['seeds'] = self._request.registry['opener'].seed_list( self._request, storage['openers']) elif action == 'new!': seed = self._request.registry['opener'].seed_fullpath( self._request, form.values['seed']) translate = self._request.localizer.translate working = handler.launch( self._request, handler.create, (vcs_user(self._request, storage), seed, path, form.values['new_name'], form.values.get('message') or translate(_('Creation')))) elif action == 'dir!': translate = self._request.localizer.translate working = handler.launch( self._request, handler.mkdir, (vcs_user(self._request, storage), path, form.values['directory'], form.values.get('message') or translate(_('Creation')))) return working # ------------------------------------------------------------------------- def _browse_action_container(self, action, items, path, form): """Manage container (selection or pack) actions. :param action: (string) Action. :param items: (list) List of selected items. :param path: (string) Relative path from storage root. :param form: (:class:`~..lib.form.Form` instance) Browse form. :return: (tuple) A tuple such as ``(project_id, pack_id)`` or ``items`` """ if action[0:4] == 'sel!': Selection(self._request).add([join(path, k) for k in items]) form.forget('#') elif action[0:4] == 'npk!': if 'project' in self._request.session \ and self._request.session['project']['perm'] \ in ('leader', 'packmaker'): return create_pack(self._request, items, path) form.forget('#') return [] return items # ------------------------------------------------------------------------- def _vcs_user_form(self, storage, action): """Create a VCS user form and, possibly, update ``StorageUser`` table setting up ``vcs_user`` and ``vcs_password``. :param storage: (dictionary) Storage dictionary. :param action: (string) Current action :return: (:class:`~.lib.form.Form` instance) """ schema = SchemaNode(Mapping()) schema.add(SchemaNode( String(), name='vcs_user', validator=Length(max=ID_LEN), missing=None)) schema.add(SchemaNode(String(), name='vcs_password', missing=None)) if 'seed' in self._request.params: schema.add(SchemaNode(String(), name='seed')) if 'message' in self._request.params: schema.add(SchemaNode( String(), name='message', validator=Length(max=DESCRIPTION_LEN), missing=None)) if 'directory' in self._request.params: schema.add(SchemaNode( String(), name='directory', validator=Length(max=PATH_LEN))) elif action[0:4] == 'ren?' or 'new_name' in self._request.params: schema.add(SchemaNode( String(), name='new_name', validator=Length(max=PATH_LEN))) form = Form(self._request, schema=schema) if action[0:4] == 'ren?': form.values['new_name'] = action[4:] return form if not form.validate() or 'vcs_change' not in self._request.params: return form save_vcs_user(self._request, storage) return form # ------------------------------------------------------------------------- def _vcs_recover(self, storage): """Run a `recover` action on the current storage. :param storage: (:class:`~..lib.models.Storage` instance) Storage dictionary. """ handler = self._request.registry['handler']\ .get_handler(storage.storage_id, storage) handler.launch(self._request, handler.recover) self._request.session.flash(_('Recover has been launched.')) # ------------------------------------------------------------------------- def _swap_openers(self, direction, opener_id, storage): """Swap two openers. :param direction: ('mup' or 'dwn') Direction of the move. :param opener_id: (string) ID of the opener to move. :param storage: (:class:`~..models.storage.Storage` instance) Current storage object. """ # Something to do? opener_ids = [ k.opener_id for k in sorted(storage.openers, key=lambda k: k.sort)] direction = 1 if direction == 'dwn' else -1 index = opener_ids.index(opener_id) if (direction == -1 and index == 0) \ or (direction == 1 and index + 1 == len(opener_ids)): return # Swap in database item1 = [k for k in storage.openers if k.opener_id == opener_id][0] item2 = opener_ids[index + direction] item2 = [k for k in storage.openers if k.opener_id == item2][0] item1.sort, item2.sort = item2.sort, item1.sort if 'storage' in self._request.session: del self._request.session['storage']