# Source code for gwcelery.tools.nagios

"""A `Nagios plugin <https://nagios-plugins.org/doc/guidelines.html>`_
for monitoring GWCelery."""

from enum import IntEnum
from sys import exit
from traceback import format_exc, format_exception

from celery.bin.base import Command
from celery_eternal import EternalTask
import kombu.exceptions
import sleek_lvalert

# Make sure that all tasks are registered
from .. import tasks


class NagiosPluginStatus(IntEnum):
    """Integer status codes that a Nagios plugin reports.

    Because this is an :class:`~enum.IntEnum`, each member is itself an
    ``int`` and can be handed directly to :func:`sys.exit`.
    """

    # The check passed.
    OK = 0
    # The check found a non-critical problem.
    WARNING = 1
    # The check found a critical problem.
    CRITICAL = 2
    # The check itself could not be completed.
    UNKNOWN = 3
class NagiosCriticalError(Exception):
    """Signal a failed check that should be reported to Nagios as `CRITICAL`.

    The single positional argument is the short, human-readable summary;
    further diagnostic detail is attached by chaining a cause with
    ``raise ... from ...``.
    """
def get_active_queues(inspector):
    """Return the set of queue names reported by ``inspector``.

    ``inspector.active_queues()`` is expected to yield a mapping whose values
    are iterables of dicts carrying a ``'name'`` key. A falsy reply (such as
    ``None``) is treated as "no queues" and produces an empty set.
    """
    replies = inspector.active_queues() or {}
    names = set()
    for reported_queues in replies.values():
        names.update(queue['name'] for queue in reported_queues)
    return names
def get_active_tasks(inspector):
    """Return the set of task names reported as active by ``inspector``.

    ``inspector.active()`` is expected to yield a mapping whose values are
    iterables of dicts carrying a ``'name'`` key. A falsy reply (such as
    ``None``) is treated as "no active tasks" and produces an empty set,
    matching the behaviour of :func:`get_active_queues`.
    """
    # Guard against a ``None`` reply so that callers get an empty set rather
    # than an AttributeError. The loop variable is deliberately not called
    # ``tasks`` to avoid shadowing the module-level ``tasks`` import.
    replies = inspector.active() or {}
    return {task['name']
            for worker_tasks in replies.values()
            for task in worker_tasks}
def get_active_lvalert_nodes(app):
    """Return the set of LVAlert nodes the client is currently subscribed to.

    A client is created for the server named by ``app.conf['lvalert_host']``,
    connected, queried for its subscriptions, and disconnected again.
    """
    client = sleek_lvalert.LVAlertClient(server=app.conf['lvalert_host'])
    client.connect()
    try:
        # NOTE(review): presumably starts event processing without blocking
        # this thread -- confirm against the sleek_lvalert documentation.
        client.process(block=False)
        return set(client.get_subscriptions())
    finally:
        # Always tear the session down, even if the query above fails, so
        # that a failed check does not leave a connection behind.
        client.disconnect()
def get_expected_queues(app):
    """Return the set of queue names used by the tasks registered in ``app``.

    Every task in ``app.tasks`` contributes its ``queue`` attribute when it
    has one that is not ``None``. The name ``'celery'`` is always included,
    standing in for tasks that do not explicitly specify a queue.
    """
    # 'celery' covers all tasks that do not explicitly specify a queue.
    expected = {'celery'}
    for task in app.tasks.values():
        queue_name = getattr(task, 'queue', None)
        if queue_name is not None:
            expected.add(queue_name)
    return expected
def get_expected_tasks(app):
    """Return the names of all registered tasks that are ``EternalTask``s.

    The names are the keys of ``app.tasks`` whose value is an instance of
    :class:`celery_eternal.EternalTask`.
    """
    eternal_names = set()
    for name, task in app.tasks.items():
        if isinstance(task, EternalTask):
            eternal_names.add(name)
    return eternal_names
def get_expected_lvalert_nodes():
    """Return the set of LVAlert node names that have a registered handler.

    The names are the keys of ``tasks.lvalert.handler``.
    """
    registered_handlers = tasks.lvalert.handler
    node_names = registered_handlers.keys()
    return set(node_names)
def check_status(app):
    """Verify that the application is healthy, raising on the first failure.

    The checks run in this order:

    1. The broker accepts a connection.
    2. Every expected queue (see :func:`get_expected_queues`) is active.
    3. Every expected task (see :func:`get_expected_tasks`) is active.
    4. The subscribed LVAlert nodes are exactly the expected ones.

    Returns ``None`` when every check passes.

    Raises
    ------
    NagiosCriticalError
        If any check fails. The exception's single argument is a short
        summary, and its ``__cause__`` carries the detail (the underlying
        broker error, or an :class:`AssertionError` listing the offending
        names in sorted order).
    """
    # The connection is only a reachability probe; the context manager
    # releases it as soon as the probe is done, whether or not it succeeded.
    with app.connection() as connection:
        try:
            connection.ensure_connection(max_retries=1)
        except kombu.exceptions.OperationalError as e:
            raise NagiosCriticalError('No connection to broker') from e

    inspector = app.control.inspect()

    active = get_active_queues(inspector)
    expected = get_expected_queues(app)
    missing = expected - active
    if missing:
        # Names are sorted so that the report is stable between runs.
        raise NagiosCriticalError('Not all expected queues are active') from \
            AssertionError('Missing queues: ' + ', '.join(sorted(missing)))

    active = get_active_tasks(inspector)
    expected = get_expected_tasks(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all expected tasks are active') from \
            AssertionError('Missing tasks: ' + ', '.join(sorted(missing)))

    active = get_active_lvalert_nodes(app)
    expected = get_expected_lvalert_nodes()
    missing = expected - active
    extra = active - expected
    if missing:
        raise NagiosCriticalError('Not all lvalert nodes are subscribed') \
            from AssertionError('Missing nodes: ' + ', '.join(sorted(missing)))
    if extra:
        raise NagiosCriticalError('Too many lvalert nodes are subscribed') \
            from AssertionError('Extra nodes: ' + ', '.join(sorted(extra)))
class NagiosCommand(Command):
    # No class docstring is declared here: ``NagiosCommand.__doc__`` is
    # assigned from the module docstring right after the class definition.

    def run(self, **kwargs):
        """Run :func:`check_status`, print a Nagios report, and exit.

        The first printed line is ``<STATUS>: <summary>``. When a check
        fails, a formatted traceback follows it. The process then exits with
        the numeric value of the :class:`NagiosPluginStatus`:

        * ``OK`` when :func:`check_status` returns normally,
        * ``CRITICAL`` when it raises :class:`NagiosCriticalError`,
        * ``UNKNOWN`` when it raises anything else.

        This method never returns; it always ends in :func:`sys.exit`.
        """
        try:
            check_status(self.app)
        except NagiosCriticalError as e:
            status = NagiosPluginStatus.CRITICAL
            # Normally there is exactly one argument (the summary). Fall back
            # to str() otherwise so that an oddly constructed error is still
            # reported as CRITICAL instead of crashing this handler.
            output = e.args[0] if len(e.args) == 1 else str(e)
            # Prefer the chained cause for the detail. If the error was
            # raised without ``from``, report the error itself.
            cause = e.__cause__ if e.__cause__ is not None else e
            detail = ''.join(
                format_exception(type(cause), cause, cause.__traceback__))
        except:  # noqa: E722
            # Intentionally catch everything: any other failure must still
            # produce a well-formed report with status UNKNOWN.
            status = NagiosPluginStatus.UNKNOWN
            output = 'Unexpected error'
            detail = format_exc()
        else:
            status = NagiosPluginStatus.OK
            output = 'Running normally'
            detail = None

        print('{}: {}'.format(status.name, output))
        if detail:
            print(detail)
        exit(status)
# Give the command class the same docstring as this module.
# NOTE(review): presumably so the command's help text shows the module
# description -- confirm against how celery's Command uses __doc__.
NagiosCommand.__doc__ = __doc__