Source code for cioservice.services.zip

"""`Zip` service."""

from __future__ import annotations
from os import makedirs, walk
from os.path import basename, dirname, splitext, join, relpath, isdir
from collections import OrderedDict
from zipfile import ZIP_DEFLATED, ZipFile

from pyramid.config import Configurator

from ..lib.service import Service
from ..lib.build import Build
from ..lib.i18n import _


# =============================================================================
def includeme(configurator: Configurator):
    """Pyramid inclusion hook registering the `zip` service.

    :type  configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    service_class = ServiceZip
    Service.register(configurator, service_class)
# =============================================================================
class ServiceZip(Service):
    """Class to manage `zip` service.

    The service compresses the files and directories selected for a build
    into a single deflated ZIP archive written in the build output directory.
    """

    label = _('Zip service')
    # Only one user variable: the name of the archive to create.
    _variables = OrderedDict((  # yapf: disable
        ('zip_name', {
            'type': 'string', 'label': _('ZIP file name'), 'visible': True}),
    ))
    _need_files = True
    _need_write_permission = True
    _select_files_message = _('Select files to compress!')

    # -------------------------------------------------------------------------
    def _run(self, build: Build):
        """Execute the service on the build ``build``.

        See: :meth:`.lib.service.Service._run`

        The archive name comes from the ``zip_name`` value of the build or,
        when exactly one file is selected, from the name of that file without
        its extension. Only the last path component of the name is kept and
        the ``.zip`` extension is appended when missing.

        Each entry is stored under its path relative to the output directory.
        The archive being written is never added to itself.

        :type  build: .lib.build.Build
        :param build:
            Current build; errors and information messages are reported
            through it.
        """
        # Output directory
        if not build.output:
            build.error(_('The output directory is not defined!'))
            self.write_traces(build)
            return

        # ZIP file name
        # The name is reduced to its last path component *before* checking
        # that it is not empty: a value such as "dir/" would otherwise pass
        # the check and end up as an archive called ".zip.zip".
        zip_name = basename(build.values.get('zip_name') or '')
        if not zip_name and len(build.files) == 1:
            zip_name = splitext(basename(build.files[0]))[0]
        if not zip_name:
            build.error(_('You must define a name for the ZIP file!'))
            self.write_traces(build)
            return
        if splitext(zip_name)[1] != '.zip':
            zip_name = f'{zip_name}.zip'

        # When the only selected file is the output itself, the archive is
        # created next to it rather than inside it.
        if len(build.files) == 1 and build.output == build.files[0]:
            build.output = dirname(build.output)
        makedirs(build.output, exist_ok=True)

        # ZIP creation
        zip_path = join(build.output, zip_name)
        with ZipFile(zip_path, 'w', ZIP_DEFLATED) as zip_file:
            for filename in build.files:
                build.current['input_file'] = filename
                if isdir(filename):
                    # ``ignored_`` and not ``_``: the latter is the
                    # translation function.
                    for root, ignored_, files in walk(filename):
                        for name in files:
                            path = join(root, name)
                            arcname = relpath(path, build.output)
                            # Never add the archive being written to itself.
                            if arcname != zip_name:
                                zip_file.write(path, arcname)
                else:
                    arcname = relpath(filename, build.output)
                    # Never add the archive being written to itself.
                    if arcname != zip_name:
                        zip_file.write(filename, arcname)
                build.progress_file(increase=1)
                build.progress_save()
                if build.aborted():
                    break
        build.current['input_file'] = None

        if not build.aborted():
            build.info(_('${z} has been created.', {'z': zip_name}))
        build.output_info()
        build.output_unrefreshed()
        build.aborted_message()
        self.write_traces(build)