"""An example of a service using FTP."""
from collections import OrderedDict
from chrysalio.lib.utils import tounicode
from chrysalio.lib.ftp import Ftp
from ..lib.service import Service
from ..lib.i18n import _
# =============================================================================
def includeme(configurator):
    """Register the `monitor_ftp` service with the application.

    :type  configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    service_class = ServiceMonitorFtp
    Service.register(configurator, service_class)
# =============================================================================
class ServiceMonitorFtp(Service):
    """Class to manage FTP monitoring service.

    The service connects to an FTP server, lists one directory and reports
    every directory and file name it finds as an information message of the
    build.
    """

    label = _('FTP Monitoring')
    # Variables exposed by this service, in display order.
    # NOTE(review): presumably these end up in ``build.values``, which is
    # what ``_run`` hands to ``Ftp.connect`` -- confirm against ``Service``.
    _variables = OrderedDict((
        ('ftp_host', {
            'type': 'string', 'label': _('FTP, Host'), 'required': True}),
        ('ftp_port', {'type': 'integer', 'label': _('FTP, Port')}),
        ('ftp_user', {
            'type': 'string', 'label': _('FTP, User'), 'required': True}),
        ('ftp_password', {'type': 'string', 'label': _('FTP, Password')}),
        ('ftp_tls', {'type': 'boolean', 'label': _('FTP, TLS')}),
        ('ftp_pasv', {'type': 'boolean', 'label': _('FTP, Passive mode')}),
        ('ftp_path', {'type': 'string', 'label': _('FTP, Path')})))

    # -------------------------------------------------------------------------
    def _run(self, build):
        """Execute the service on the build ``build``.

        See: :meth:`.lib.service.Service._run`

        :param build:
            Current build. This method uses its ``values``, ``error``,
            ``info`` and ``aborted`` members.

        Traces are written on every path that returns normally: connection
        failure, aborted build and successful listing.
        """
        ftp = Ftp(build.error)
        if not ftp.connect(build.values):
            # Nothing to close here: the connection was not established.
            self.write_traces(build)
            return

        try:
            dirs, files = ftp.list_directory()
            if build.aborted():
                build.error(_('Build aborted'))
            else:
                for name in dirs:
                    build.info(tounicode(name))
                for name in files:
                    build.info(tounicode(name))
        finally:
            # Always release the FTP session, including when the build was
            # aborted or when listing/logging raised; the abort path used to
            # return with the connection still open.
            ftp.quit()

        self.write_traces(build)