Source code for gwcelery.lvalert.bootsteps

from celery import bootsteps
from celery.utils.log import get_logger

from . import client
from .signals import lvalert_received

__all__ = ('Receiver',)

log = get_logger(__name__)


class LVAlertBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers.

    Only include this bootstep in workers that are started with the
    ``--lvalert`` command line option."""

    def __init__(self, consumer, lvalert=False, **kwargs):
        self.enabled = bool(lvalert)

    def start(self, consumer):
        log.info('Starting %s', self.name)

    def stop(self, consumer):
        log.info('Stopping %s', self.name)


def _send_lvalert_received(node, payload):
    """Shim to send Celery signal."""
    lvalert_received.send(None, node=node, payload=payload)


[docs]class Receiver(LVAlertBootStep): """Run the global LVAlert receiver in background thread.""" name = 'LVAlert client'
[docs] def create(self, consumer): super().create(consumer) self._client = client.LVAlertClient( server=consumer.app.conf['lvalert_host'], nodes=consumer.app.conf['lvalert_nodes'])
[docs] def start(self, consumer): super().start(consumer) self._client.connect() self._client.process() self._client.listen(_send_lvalert_received)
[docs] def stop(self, consumer): super().stop(consumer) self._client.disconnect()
[docs] def info(self, consumer): return {'lvalert-nodes': self._client.get_subscriptions()}