Source code for cioservice.services.monitor_ftp

"""An example of a service using FTP."""

from collections import OrderedDict

from chrysalio.lib.utils import tounicode
from chrysalio.lib.ftp import Ftp
from ..lib.service import Service
from ..lib.i18n import _


# =============================================================================
[docs]def includeme(configurator): """Function to include `monitor_ftp` service. :type configurator: pyramid.config.Configurator :param configurator: Object used to do configuration declaration within the application. """ Service.register(configurator, ServiceMonitorFtp)
# =============================================================================
[docs]class ServiceMonitorFtp(Service): """Class to manage FTP monitoring service.""" label = _('FTP Monitoring') _variables = OrderedDict(( ('ftp_host', { 'type': 'string', 'label': _('FTP, Host'), 'required': True}), ('ftp_port', {'type': 'integer', 'label': _('FTP, Port')}), ('ftp_user', { 'type': 'string', 'label': _('FTP, User'), 'required': True}), ('ftp_password', {'type': 'string', 'label': _('FTP, Password')}), ('ftp_tls', {'type': 'boolean', 'label': _('FTP, TLS')}), ('ftp_pasv', {'type': 'boolean', 'label': _('FTP, Passive mode')}), ('ftp_path', {'type': 'string', 'label': _('FTP, Path')}))) # ------------------------------------------------------------------------- def _run(self, build): """Execute the service on the build ``build``. See: :meth:`.lib.service.Service._run` """ ftp = Ftp(build.error) if not ftp.connect(build.values): self.write_traces(build) return dirs, files = ftp.list_directory() if build.aborted(): build.error(_('Build aborted')) self.write_traces(build) return for name in dirs: build.info(tounicode(name)) for name in files: build.info(tounicode(name)) ftp.quit() self.write_traces(build)