Source code for gwcelery.tasks.lvalert

"""LVAlert client."""
import json
import time

from celery_eternal import EternalTask
from celery.utils.log import get_task_logger
import sleek_lvalert

from ..import app
from .core import DispatchHandler
from . import gracedb

log = get_task_logger(__name__)


class _LVAlertDispatchHandler(DispatchHandler):

    def process_args(self, node, payload):
        # Determine GraceDB service URL
        alert = json.loads(payload)
        try:
            try:
                self_link = alert['object']['links']['self']
            except KeyError:
                self_link = alert['object']['self']
        except KeyError:
            log.exception('LVAlert message does not contain an API URL: %r',
                          alert)
            return None, None, None
        base, api, _ = self_link.partition('/api/')
        service = base + api

        if service != gracedb.client._service_url:
            log.warning('ignoring LVAlert message because it is intended for '
                        'GraceDb server %s, but we are set up for server %s',
                        service, gracedb.client._service_url)
            return None, None, None

        return super().process_args(node, alert)


handler = _LVAlertDispatchHandler()
r"""Function decorator to register a handler callback for specified LVAlert
message types. The decorated function is turned into a Celery task, which will
be automatically called whenever a matching LVAlert message is received.

Parameters
----------
\*keys
    List of LVAlert message types to accept
\*\*kwargs
    Additional keyword arguments for :meth:`celery.Celery.task`.

Examples
--------
Declare a new handler like this::

    @lvalert.handler('cbc_gstlal',
                     'cbc_spiir',
                     'cbc_pycbc',
                     'cbc_mbtaonline')
    def handle_cbc(alert_content):
        # do work here...
"""


[docs]@app.task(base=EternalTask, bind=True, shared=False) def listen(self): """Listen for LVAlert messages forever. LVAlert messages are dispatched asynchronously to tasks that have been registered with :meth:`gwcelery.tasks.lvalert.handler`.""" log.info('Starting client') client = sleek_lvalert.LVAlertClient(server=app.conf['lvalert_host']) client.connect() client.process(block=False) log.info('Updating subscriptions') current_subscriptions = set(client.get_subscriptions()) needed_subscriptions = set(handler.keys()) client.subscribe(*(needed_subscriptions - current_subscriptions)) log.info('Listening for pubsub messages') client.listen(handler.dispatch) while not self.is_aborted(): time.sleep(1) log.info('Disconnecting') client.abort() log.info('Exiting')