# Source code for publiforge.lib.processor.leprisme

"""LePrisme module transforms files into another files or values."""

import re
from os import walk, listdir, makedirs, remove, rmdir
from os.path import join, exists, splitext, dirname, basename, isdir, relpath
from os.path import normpath, samefile, commonprefix
from shutil import copy, rmtree
from ConfigParser import ConfigParser
from zipfile import is_zipfile
from imp import load_source
from lxml import etree

from ...i18n import _
from ...config import config_get, config_get_list
from ...utils import copy_content, unzip, camel_case, make_id
from ...xml import load_xml
from .. import load_relaxngs
from .publiset import Publiset
from .transform import Transform


REMOVE_PATTERN = r'(~|\.tmp)(\.\w{1,4})?$'


# =============================================================================
class Processor(object):
    """Main class for LePrisme processor.

    A processor reads ``Processor/leprisme.ini`` in the build directory,
    lists the files of the pack that must be processed and runs the
    configured transformation steps on each of them. Results are written
    under the ``Output`` directory of the build.
    """

    # -------------------------------------------------------------------------
    def __init__(self, build):
        """Constructor method.

        :param build: (:class:`~.lib.build.agent.AgentBuild`)
            Main Build object.

        If the configuration file or the transformation steps are missing,
        the build is stopped and the constructor returns early.
        """
        # Attributes
        self.build = build
        self.output = join(self.build.path, 'Output')
        self.percents = [1, 90]

        # Defaults, so that every attribute exists even when the constructor
        # returns early on a configuration error.
        self._config = None
        self.relaxngs = None
        self._scripts = {}
        self._transform = None

        # Configuration
        name = join(build.path, 'Processor', 'leprisme.ini')
        if not exists(name):
            build.stopped(_('File "leprisme.ini" is missing.'))
            return
        # ``fid`` and ``ocffile`` default to their own ``str.format``
        # placeholder so that they survive the INI interpolation.
        self._config = ConfigParser({
            'here': dirname(name), 'fid': '{fid}', 'ocffile': '{ocffile}'})
        self._config.optionxform = str  # keep option names case sensitive
        self._config.read(name)

        # Transformation steps
        steps = self._read_steps()
        if not steps:
            build.stopped(_('Transformation steps are missing.'))
            return

        # Relax NG, scripts and transformation
        self.relaxngs = load_relaxngs(self.build, self._config)
        self._scripts = self._load_scripts()
        self._transform = Transform(self, steps)

    # -------------------------------------------------------------------------
    def start(self):
        """Start the processor.

        Each file returned by :meth:`_file_list` is processed in turn and
        receives an equal share of the first 90 percents of the progress.
        The loop ends as soon as the build is stopped. Finalization is then
        executed.
        """
        if self.build.stopped():
            return

        # Process each file
        files = self._file_list()
        if not files:
            self.build.stopped(_('nothing to do!'), 'a_error')
            return
        total = len(files)
        for count, name in enumerate(files):
            # Floor division keeps the bounds integral.
            self._process(
                name, 90 * count // total, 90 * (count + 1) // total)
            if self.build.stopped():
                break

        # Finalization
        if not self.build.processing['variables'].get('subdir'):
            self.output = join(self.build.path, 'Output')
        self.finalize()

    # -------------------------------------------------------------------------
    def config(self, section, option, default=None):
        """Retrieve a value from a configuration object.

        :param section: (string)
            Section name.
        :param option: (string)
            Option name.
        :param default: (string, optional)
            Default value
        :return: (string)
            Read value or default value.
        """
        return config_get(self._config, section, option, default)

    # -------------------------------------------------------------------------
    def config_list(self, section, option, default=None):
        """Retrieve a list of values from a configuration object.

        :param section: (string)
            Section name.
        :param option: (string)
            Option name.
        :param default: (list, optional)
            Default values.
        :return: (list)
        """
        return config_get_list(self._config, section, option, default)

    # -------------------------------------------------------------------------
    def make_id(self, fullname):
        """Compute a file ID according to processor configuration.

        :param fullname: (string)
            Absolute path to file to process.
        :return: (string or ``None``)
            File ID, or ``None`` if the configured script does not exist.

        Option ``make_id`` of section ``[Output]`` is either one of the
        built-in modes (``standard``, ``token``, ``xmlid``, ``class``) or the
        path to a Python file providing a function
        ``make_id(fullname, build, config)``.
        """
        mode = self.config('Output', 'make_id', 'token') or 'token'
        if mode in ('standard', 'token', 'xmlid', 'class'):
            return make_id(splitext(basename(fullname))[0], mode)

        # Any other value is the path to a custom script.
        if not exists(mode):
            self.build.stopped(
                _('Unknown file "${n}".', {'n': basename(mode)}))
            return None
        module = load_source(splitext(basename(mode))[0], mode)
        return module.make_id(fullname, self.build, self._config)

    # -------------------------------------------------------------------------
    def finalize(self):
        """Finalization.

        Temporary files are removed from the current output directory, unless
        processing variable ``keeptmp`` is set, then the finalization script
        is called if there is one.
        """
        # Remove temporary files
        if not self.build.processing['variables'].get('keeptmp'):
            self._remove_temporary_files(self.output)

        # Run finalization script
        if 'finalization' in self._scripts:
            self._scripts['finalization'](self)

    # -------------------------------------------------------------------------
    def _read_steps(self):
        """Detect in configuration file the names of the transformation steps.

        :return: (tuple)
            A tuple of suffix for ``[Transformation]`` section.

        A section with an option ``inactive`` is skipped when the processing
        variable named by this option is true, or, if the name is prefixed
        with ``!``, when this variable is false.
        """
        variables = self.build.processing['variables']
        steps = []
        for section in self._config.sections():
            if not section.startswith('Transformation'):
                continue
            if self._config.has_option(section, 'inactive'):
                inactive = self._config.get(section, 'inactive')
                # ``startswith`` is safe on an empty value whereas
                # ``inactive[0]`` raises an IndexError.
                negated = inactive.startswith('!')
                variable = variables.get(inactive.replace('!', ''))
                if (negated and not variable) or (not negated and variable):
                    continue
            # Keep what follows the 14 characters of "Transformation".
            steps.append(section[14:])
        return tuple(steps)

    # -------------------------------------------------------------------------
    def _load_scripts(self):
        """Load initialization and finalization script files.

        :return: (dictionary)
            A dictionary of script main functions. Keys are
            ``'initialization'`` and ``'finalization'``.

        The build is stopped if a configured script does not exist.
        """
        scripts = {}
        for section in ('Initialization', 'Finalization'):
            # Find file
            filename = self.config(section, 'script')
            if not filename:
                continue
            if not exists(filename):
                self.build.stopped(_('Unknown file "${n}".', {'n': filename}))
                continue

            # Load module
            module = load_source(splitext(basename(filename))[0], filename)
            scripts[section.lower()] = module.main
        return scripts

    # -------------------------------------------------------------------------
    def _update_output(self, filename, fid):
        """Compute output directory for file ``filename``.

        :param filename: (string)
            Relative path to the original file to transform.
        :param fid: (string)
            File ID.

        The result is stored in ``self.output``. If processing variable
        ``subdir`` is set, its placeholders ``%(fid)s``, ``%(path)s`` and
        ``%(parent)s`` are replaced and the result is appended to the
        ``Output`` directory.
        """
        subdir = self.build.processing['variables'].get('subdir')
        if not subdir:
            self.output = join(self.build.path, 'Output')
            return

        # ``commonprefix`` works character by character: fall back on the
        # containing directory when the prefix is not itself a directory.
        # NOTE(review): ``prefix`` looks relative to ``build.data_path`` but
        # ``isdir`` resolves it against the current working directory --
        # confirm this is intended.
        prefix = commonprefix(tuple(self.build.pack['files']) + (filename,))
        if not isdir(prefix):
            prefix = dirname(prefix)
        path = relpath(filename, prefix)
        parent = basename(dirname(filename)) \
            if dirname(dirname(filename)) else ''
        self.output = join(
            self.build.path, 'Output',
            subdir.replace('%(fid)s', camel_case(fid))
            .replace('%(path)s', dirname(path))
            .replace('%(parent)s', parent))

    # -------------------------------------------------------------------------
    def _file_list(self):
        """List files in pack according to settings.

        :return: (list)
            File list. Paths are relative to ``build.data_path``.

        Options ``file_regex`` and ``is_dir`` of section ``[Input]`` select
        the entries to keep. Directories of the pack are explored, all the
        way down if the pack is ``recursive``.
        """
        regex = self._config.has_option('Input', 'file_regex') \
            and re.compile(self.config('Input', 'file_regex'))
        input_is_dir = self._config.has_option('Input', 'is_dir') \
            and self.config('Input', 'is_dir') == 'true'

        files = []
        for base in self.build.pack['files']:
            fullname = normpath(join(self.build.data_path, base))
            if not exists(fullname):
                self.build.stopped(_('Unknown file "${n}".', {'n': base}))
                continue

            # The pack entry itself
            if (not regex or regex.search(base)) \
               and isdir(fullname) == input_is_dir:
                files.append(base)
            if not isdir(fullname):
                if base not in files:
                    self.build.log(
                        _('"${n}" ignored', {'n': base}), step='a_build')
                continue

            # Content of a directory
            if self.build.pack.get('recursive'):
                for path, dirs, filenames in walk(fullname):
                    names = dirs if input_is_dir else filenames
                    for name in names:
                        if not regex or regex.search(name):
                            files.append(
                                self._relative_name(join(path, name)))
            else:
                for name in listdir(fullname):
                    if isdir(join(fullname, name)) == input_is_dir \
                       and (not regex or regex.search(name)):
                        files.append(
                            self._relative_name(join(fullname, name)))
        return files

    # -------------------------------------------------------------------------
    def _relative_name(self, fullname):
        """Return ``fullname`` as a path relative to ``build.data_path``.

        :param fullname: (string)
            Absolute path to a file or a directory.
        :return: (unicode)
            Relative path. A byte string is decoded as UTF-8.
        """
        name = relpath(fullname, self.build.data_path)
        return unicode(name.decode('utf8')) \
            if isinstance(name, str) else name

    # -------------------------------------------------------------------------
    def _initialize(self, fid):
        """Initialization.

        :param fid: (string)
            File ID.

        The output directory is created and cleaned up, the directories of
        option ``directories`` are created, the templates are copied and the
        initialization script, if any, is called.

        ``self.build.processing['templates']`` and
        ``self.build.pack['templates']`` are lists of tuples such as
        ``(<input_file>, <output_path>)``.
        """
        # Check
        if not self.output.startswith(self.build.path):
            self.build.stopped(_('file outside build directory'))
            return

        # Clean up
        # pylint: disable = no-member
        if not exists(self.output):
            makedirs(self.output)
        fmt = self.config('Output', 'format')
        if fmt:
            # Remove a previous result file, never a directory.
            target = join(self.output, fmt.format(fid=fid))
            if exists(target) and not isdir(target):
                remove(target)
        if not self.build.processing['variables'].get('keeptmp'):
            # With option ``unzip``, the directory of the unzipped input
            # (``<fid>~``) must survive the clean up.
            self._remove_temporary_files(
                self.output,
                self._config.has_option('Input', 'unzip') and '%s~' % fid)

        # Create directories
        for name in self.config_list('Initialization', 'directories'):
            name = name.format(fid=fid)
            if name and not exists(join(self.output, name)):
                makedirs(join(self.output, name))

        # Copy templates
        if not self._copy_templates(fid):
            return

        # Run initialization script
        if 'initialization' in self._scripts:
            self._scripts['initialization'](self)

    # -------------------------------------------------------------------------
    def _process(self, filename, percent_in, percent_out, file_elt=None):
        """Process one XML file.

        :param str filename:
            Relative path of the file to process.
        :param int percent_in:
            Percent of progress by entering the processing.
        :param int percent_out:
            Percent of progress by leaving the processing.
        :type  file_elt: lxml.etree.Element
        :param file_elt: (optional)
            <file> XML element for the current file.

        A directory, or any input when option ``no_load`` is true, is given
        to the transformation as a path. Other inputs are loaded, after an
        optional unzip. A publiset is expanded: each file of a selection is
        processed recursively before the selection or the composition itself
        is transformed.
        """
        # Load path
        fullname = normpath(join(self.build.data_path, filename))
        fid = self.make_id(fullname)
        if self.build.stopped():
            return
        self.percents = [max(percent_in, 1), percent_out]
        self.build.log(
            u'---------- %s (%s)' % (filename, fid), step='a_build',
            percent=self.percents[0])
        self._update_output(filename, fid)

        # Load folder
        if isdir(fullname) or \
           (self._config.has_option('Input', 'no_load') and
                self.config('Input', 'no_load') == 'true'):
            self._initialize(fid)
            self._transform.start(filename, fid, filename)
            return

        # Unzip file
        unzip_dir = join(self.output, '%s~' % fid)
        if self.config('Input', 'unzip') and is_zipfile(fullname):
            self.percents[0] = min(self.percents[0] + 1, self.percents[1])
            self.build.log(
                _('${f}: uncompressing', {'f': fid}),
                percent=self.percents[0])
            unzip(fullname, unzip_dir)
            # The file to load is now inside the unzipped directory.
            fullname = join(unzip_dir, self.config('Input', 'unzip'))

        # Load file content
        self.percents[0] = min(self.percents[0] + 1, self.percents[1])
        self.build.log(
            _('${f}: loading file content', {'f': fid}),
            percent=self.percents[0])
        data = self._file_content(fullname, filename)
        if data is None:
            if self._config.has_option('Input', 'unzip') \
               and isdir(unzip_dir):
                rmtree(unzip_dir)
            return

        # Non Publiset file
        # pylint: disable = E1103
        if isinstance(data, basestring) or data.getroot().tag != 'publiset' \
           or self.config('Input', 'no_composition') == 'true':
            self._initialize(fid)
            self._add_pi(data, file_elt)
            self._transform.start(filename, fid, data)
            return

        # Publiset selection
        publiset = Publiset(self, dirname(fullname))
        set_root = data.find('composition') \
            if self.config('Input', 'as_selection') == 'true' and \
            data.find('composition') is not None else data.find('selection')
        if set_root is not None:
            fid = set_root.get('id') or fid
            for elt in set_root.xpath('.//file'):
                name = relpath(
                    publiset.fullname(elt), self.build.data_path)
                # A publiset must not process itself.
                if name != filename:
                    self._process(name, percent_in, percent_out, elt)
                    if self.build.stopped():
                        return
            self.build.log(
                u'%s ............' % fid, step='a_build',
                percent=self.percents[0])
            # The recursive calls changed ``self.output``.
            self._update_output(filename, fid)
            self._initialize(fid)
            if data.find('composition') is not None and \
               self.config('Input', 'no_composition') != 'true':
                self._transform.start(
                    filename, fid, publiset.compose(filename, set_root))
            else:
                self._transform.start(
                    filename, fid, publiset.create(set_root))
            return

        # Publiset composition
        set_root = data.find('composition')
        if set_root is None:
            # Neither <selection> nor <composition>: nothing to transform.
            self.build.log(
                _('"${n}" ignored', {'n': filename}), step='a_build')
            return
        fid = set_root.get('id') or fid
        self.percents[0] = min(self.percents[0] + 1, self.percents[1])
        self.build.log(_('${f}: document composition', {'f': fid}))
        self._update_output(filename, fid)
        self._initialize(fid)
        self._transform.start(
            filename, fid, publiset.compose(filename, set_root))

    # -------------------------------------------------------------------------
    def _file_content(self, fullname, filename):
        """Load file content.

        :param fullname: (string)
            Absolute path to file to load.
        :param filename: (string)
            Relative path for messages.
        :return: (string or :class:`lxml.etree.ElementTree` instance or
            ``None``)

        ``None`` is returned when the file is unknown, when it cannot be
        loaded as XML or when its content does not match option
        ``content_regex`` of section ``[Input]``.
        """
        # Content regex
        regex = self.config('Input', 'content_regex')

        # XML file
        if splitext(fullname)[1].lower() == '.xml':
            relaxngs = self.relaxngs \
                if self.config('Input', 'validate') == 'true' else None
            data = load_xml(fullname, relaxngs)
            if not isinstance(data, basestring) and \
               (not regex or re.search(regex, etree.tostring(data))):
                return data
            if isinstance(data, basestring):
                # Here ``data`` is treated as an error message. It is
                # dropped when the raw content does not match the content
                # regex, so a file which is not concerned does not stop the
                # build.
                # NOTE(review): the error is reported whether or not a
                # content regex is defined -- confirm this nesting against
                # the upstream source.
                if regex and exists(fullname):
                    with open(fullname, 'r') as hdl:
                        data = re.search(regex, hdl.read()) and data
                self.build.stopped(data, 'a_error')
            self.build.log(
                _('"${n}" ignored', {'n': filename}), step='a_build')
        # Other
        elif exists(fullname):
            with open(fullname, 'r') as hdl:
                data = hdl.read()
            if not regex or re.search(regex, data):
                return data
            self.build.log(
                _('"${n}" ignored', {'n': filename}), step='a_build')
        else:
            self.build.stopped(_('Unknown file "${n}".', {'n': filename}))

        return None

    # -------------------------------------------------------------------------
    def _add_pi(self, data, file_elt):
        """Add processing instructions according to attributes dictionary.

        :type  data: class:`str` or :class:`lxml.etree.ElementTree`
        :param data:
            Name of file to transform or its content as a string or a tree.
        :type  file_elt: lxml.etree.Element
        :param file_elt: (optional)
            <file> XML element for the current file.

        Nothing is done if there is no ``file_elt``, if it has no attribute
        or if ``data`` is not a tree.
        """
        if file_elt is None or not file_elt.attrib or \
           isinstance(data, basestring):
            return

        # Find the way to select the right node (XPath or Xsl)
        select_elt = file_elt
        while select_elt is not None and select_elt.get('xpath') is None:
            select_elt = select_elt.getparent()

        # ...root
        if select_elt is None:
            # NOTE(review): an element is given where a list is documented,
            # so the loop of ``_add_pi_on`` presumably visits the children
            # of the root -- confirm this is intended.
            self._add_pi_on(file_elt, data.getroot())
        # ...XPath
        elif select_elt.get('xpath'):
            try:
                elts = data.xpath(select_elt.get('xpath'))
            except etree.XPathEvalError:
                return
            self._add_pi_on(file_elt, elts)

    # -------------------------------------------------------------------------
    @classmethod
    def _add_pi_on(cls, file_elt, on_elts):
        """Add processing instruction to ``on_elt`` according to
        ``file_elts``.

        :type  file_elt: lxml.etree.Element
        :param file_elt:
            <file> XML element for the current file.
        :param list on_elts:
            List of XML element selected to receive the PI.

        One processing instruction per attribute of ``file_elt`` is inserted
        as first child of each element.
        """
        for elt in on_elts:
            for attr in file_elt.attrib:
                elt.insert(0, etree.ProcessingInstruction(
                    attr, file_elt.attrib[attr]))

    # -------------------------------------------------------------------------
    def _copy_templates(self, fid):
        """Copy processor, processing and pack templates into ``Output``
        directory.

        :param fid: (string, optional)
            File ID.
        :return: (boolean)
            ``False`` if a template is missing, in which case the build is
            stopped.
        """
        # Copy template files from INI files
        for name in self.config_list('Initialization', 'templates'):
            template = join(self.build.path, 'Processor', 'Templates', name)
            if not exists(template):
                self.build.stopped(
                    _('Template "${t}" does not exist', {'t': name}))
                return False
            path = self.config('template:%s' % name, 'path', '').format(
                fid=fid or '')
            copy_content(
                template, join(self.output, path),
                self._excluded_list(name), True)

        # Copy template files from processing and pack templates
        for name, path in self.build.processing['templates'] \
                + self.build.pack['templates']:
            template = join(self.build.data_path, name)
            if not exists(template):
                self.build.stopped(
                    _('Template "${t}" does not exist', {'t': name}))
                return False
            # Prefix "unzip:" asks for a ZIP template to be unzipped in place
            # of being copied.
            do_unzip = path.startswith('unzip:')
            path = join(self.output, path[6:] if do_unzip else path)
            if isdir(template):
                copy_content(template, path, force=True)
            elif do_unzip and is_zipfile(template):
                unzip(template, path)
            else:
                if not exists(dirname(path)):
                    makedirs(dirname(path))
                copy(template, path)

        return True

    # -------------------------------------------------------------------------
    def _excluded_list(self, template):
        """Return excluded file list.

        :param template: (string)
            Template name.
        :return: (list)

        In section ``[template:<template>]``, option ``exclude`` always
        applies. Option ``exclude[<var>]`` applies when processing variable
        ``<var>`` is true and option ``exclude[!<var>]`` when it is false.
        """
        exclude = []
        section = 'template:%s' % template
        if not self._config.has_section(section):
            return exclude

        variables = self.build.processing['variables']
        for option in self._config.options(section):
            if option == 'exclude':
                exclude += self.config_list(section, option)
            elif option.startswith('exclude['):
                # Variable name between "exclude[" and "]". ``startswith``
                # is safe on an empty name whereas ``var_name[0]`` raises an
                # IndexError.
                var_name = option[8:-1]
                negated = var_name.startswith('!')
                value = variables.get(var_name[1:] if negated else var_name)
                if (not negated and value) or (negated and not value):
                    exclude += self.config_list(section, option)
        return exclude

    # -------------------------------------------------------------------------
    def _remove_temporary_files(self, output, keep_dir=None):
        """Remove temporary files.

        :param output: (string)
            Absolute path to output directory.
        :param keep_dir: (string, optional)
            Name of directory to keep.

        Files and directories whose name matches option ``remove_regex`` of
        section ``[Finalization]`` are removed, as well as empty directories.
        ``output`` itself is removed if it ends up empty and is not the main
        ``Output`` directory.
        """
        regex = re.compile(self.config(
            'Finalization', 'remove_regex', REMOVE_PATTERN))

        # Bottom-up, so that a directory emptied by the clean up of its
        # content is removed in turn.
        for path, dirs, files in walk(output, topdown=False):
            for name in dirs:
                if name != keep_dir and \
                   (regex.search(name) or not listdir(join(path, name))):
                    rmtree(join(path, name))
            for name in files:
                if regex.search(name):
                    remove(join(path, name))

        if exists(output) \
           and not samefile(output, join(self.build.path, 'Output')) \
           and not listdir(output):
            rmdir(output)