"""`Zip` service."""
from __future__ import annotations
from os import makedirs, walk
from os.path import basename, dirname, splitext, join, relpath, isdir
from collections import OrderedDict
from zipfile import ZIP_DEFLATED, ZipFile
from pyramid.config import Configurator
from ..lib.service import Service
from ..lib.build import Build
from ..lib.i18n import _
# =============================================================================
def includeme(configurator: Configurator):
    """Include the `zip` service in the application.

    The function simply registers :class:`ServiceZip` through
    ``Service.register``.

    :type  configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    Service.register(configurator, ServiceZip)
# =============================================================================
class ServiceZip(Service):
    """Class to manage `zip` service.

    The service writes the files and directories listed in ``build.files``
    into a single deflated ZIP archive created in ``build.output``.
    """

    label = _('Zip service')
    # Single user variable: the name of the archive to create.
    _variables = OrderedDict((  # yapf: disable
        ('zip_name', {
            'type': 'string', 'label': _('ZIP file name'), 'visible': True}),
    ))
    # NOTE(review): the three attributes below are presumably read by the
    # ``Service`` base class (not visible here) -- confirm their exact effect.
    _need_files = True
    _need_write_permission = True
    _select_files_message = _('Select files to compress!')

    # -------------------------------------------------------------------------
    def _run(self, build: Build):
        """Execute the service on the build ``build``.

        The archive name comes from the ``zip_name`` build value or, when
        exactly one file is selected, from the name of that file without its
        extension. Entries are stored with a path relative to the output
        directory.

        Every failure is reported through ``build.error()`` and the traces
        are written before returning.

        See: :meth:`.lib.service.Service._run`
        """
        # Output directory
        if not build.output:
            build.error(_('The output directory is not defined!'))
            self.write_traces(build)
            return

        # ZIP file name: explicit value first, otherwise the stem of the
        # single selected file.
        zip_name = build.values.get('zip_name')
        if not zip_name and len(build.files) == 1:
            zip_name = splitext(basename(build.files[0]))[0]
        # Only the last path component is kept so the archive always lands
        # directly in the output directory. A name ending with a separator
        # has an empty last component and is treated as undefined instead of
        # silently producing a file called ".zip".
        zip_name = basename(zip_name) if zip_name else ''
        if not zip_name:
            build.error(_('You must define a name for the ZIP file!'))
            self.write_traces(build)
            return
        if splitext(zip_name)[1] != '.zip':
            zip_name = f'{zip_name}.zip'

        # When the output is the selected file itself, work in its directory
        if len(build.files) == 1 and build.output == build.files[0]:
            build.output = dirname(build.output)

        # Opening the archive in 'w' mode truncates it: refuse to run when
        # the archive would overwrite one of the files to compress.
        if any(not isdir(filename)
               and relpath(filename, build.output) == zip_name
               for filename in build.files):
            build.error(
                _('The ZIP file cannot be one of the files to compress!'))
            self.write_traces(build)
            return

        makedirs(build.output, exist_ok=True)

        # ZIP creation
        zip_path = join(build.output, zip_name)
        with ZipFile(zip_path, 'w', ZIP_DEFLATED) as zip_file:
            for filename in build.files:
                build.current['input_file'] = filename
                if isdir(filename):
                    for root, _subdirs, names in walk(filename):
                        for name in names:
                            path = join(root, name)
                            arcname = relpath(path, build.output)
                            # Never add the archive being written to itself
                            if arcname == zip_name:
                                continue
                            zip_file.write(path, arcname)
                else:
                    zip_file.write(filename, relpath(filename, build.output))
                build.progress_file(increase=1)
                build.progress_save()
                if build.aborted():
                    break
        build.current['input_file'] = None

        # Report (the archive is closed at this point)
        if not build.aborted():
            build.info(_('${z} has been created.', {'z': zip_name}))
        build.output_info()
        build.output_unrefreshed()
        build.aborted_message()
        self.write_traces(build)