Source code for gwcelery.tasks.superevents

"""Module containing the functionality for management of superevents.

*   There is serial processing of triggers from low latency pipelines.
*   Dedicated **superevent** queue for this purpose.
*   Primary logic to respond to low latency triggers contained in
    :meth:`process` function.
"""
from itertools import filterfalse, groupby

from celery.utils.log import get_task_logger

from .. import app
from . import gracedb, igwn_alert

# Module-level task logger obtained from celery, named after this module.
log = get_task_logger(__name__)

# Nested lookup table: lowercased event group -> lowercased search -> set of
# label names. The handler and the completeness check in this module index
# it as REQUIRED_LABELS_BY_GROUP_SEARCH[group][search]; a group or search
# without an entry raises KeyError at those call sites.
REQUIRED_LABELS_BY_GROUP_SEARCH = {
    'cbc': {
        'allsky': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
        'earlywarning': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
        'mdc': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
        'bbh': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},  # required by cwb-bbh # noqa: E501
        'ssm': {'EMBRIGHT_READY', 'SKYMAP_READY'}
    },
    'burst': {
        'allsky': {'SKYMAP_READY'}
    }
}
"""These labels should be present on an event to consider it to
be complete.
"""

# Checked against the superevent's current labels before the preferred event
# is replaced; applied to the superevent once a complete event passes the
# less-significant publishability check.
FROZEN_LABEL = 'LOW_SIGNIF_LOCKED'
"""This label indicates that the superevent manager should make no further
changes to the preferred event."""

# Applied to the superevent when a complete event passes the significant
# publishability check and does not itself carry EARLY_WARNING_LABEL.
SIGNIFICANT_LABEL = 'SIGNIF_LOCKED'
"""This label indicates that the superevent is elevated to significant"""

# Looked up in the event's own labels; when present on a complete event that
# passes the significant check, the same label is applied to the superevent
# instead of SIGNIFICANT_LABEL.
EARLY_WARNING_LABEL = 'EARLY_WARNING'
"""This label indicates that the superevent contains a significant
early warning event."""

# NOTE(review): not referenced by any code visible in this section of the
# file; presumably used elsewhere -- confirm before removing.
EARLY_WARNING_SEARCH_NAME = 'EarlyWarning'
"""Search name for Early Warning searches. Only significant events
result in consideration by the superevent manager."""

# Applied to the superevent whenever the incoming event is complete.
READY_LABEL = 'EM_READY'
"""This label indicates that a preferred event has been assigned and it
has all data products required to make it ready for annotations."""

# Compared (case-sensitively, before any lowercasing) against the search
# name in the alert payload; matching events are skipped by the handler.
VT_SEARCH_NAME = 'VTInjection'
"""Search name for events uploaded as a part of estimating the spacetime
sensitive volume. Events from this search do not form superevents, and
are not annotated."""


@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    'cbc_sgnl',
                    'cbc_aframe',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle(payload):
    """Respond to IGWN alert topics from low-latency search pipelines and
    delegate to :meth:`process` for superevent management.

    Only ``label_added`` alerts are considered. An alert is dropped, with a
    log message in most cases, when any of the following holds:

    * the event involves the ``K1`` instrument,
    * the event's search is :data:`VT_SEARCH_NAME`,
    * the event's ``processing_status`` is ``'reserved'``,
    * the event has no ``far`` key, or its FAR exceeds the
      ``superevent_far_threshold`` configuration value,
    * the added label is neither ``RAVEN_ALERT`` nor one of the labels
      required for the event's group and search,
    * the added label is a required label but the event is not yet
      complete according to :meth:`is_complete`.

    Otherwise :meth:`process` is scheduled with the unmodified payload.

    Parameters
    ----------
    payload : dict
        IGWN alert payload. The keys ``alert_type``, ``object`` and (for
        ``label_added`` alerts that pass the filters) ``data`` are read.

    Raises
    ------
    KeyError
        If the event's group/search combination has no entry in
        :data:`REQUIRED_LABELS_BY_GROUP_SEARCH` and the added label is not
        ``RAVEN_ALERT``.
    """
    alert_type = payload['alert_type']
    event = payload['object']
    gid = event['graceid']
    alert_search = event['search']

    if alert_type != 'label_added':
        return

    ifos = get_instruments(event)
    # Ignore inclusion of events involving KAGRA; revert when policy is
    # changed
    if "K1" in ifos:
        log.info('Skipping %s because it involves KAGRA data', gid)
        return

    # Ignore inclusion of events from VT search
    if alert_search == VT_SEARCH_NAME:
        # Lazy %-style arguments, consistent with the other log calls here;
        # the rendered message is unchanged.
        log.info('Skipping %s event %s', VT_SEARCH_NAME, gid)
        return

    if event['processing_status'] == 'reserved':
        log.info('Skipping %s because processing_status is reserved', gid)
        return

    try:
        far = event['far']
    except KeyError:
        log.info('Skipping %s because it lacks a FAR', gid)
        return

    if far > app.conf['superevent_far_threshold']:
        log.info("Skipping processing of %s because of high FAR", gid)
        return

    # FIXME: reinstate priority once the effect is well understood.
    # priority = 1
    label = payload['data']['name']
    group = event['group'].lower()
    search = alert_search.lower()
    pipeline = event['pipeline'].lower()

    # Special case of cWB-BBH that requires the same labels as CBC
    if search == 'bbh' and pipeline == 'cwb':
        group = 'cbc'

    if label == 'RAVEN_ALERT':
        # A RAVEN_ALERT label always triggers processing, regardless of
        # completeness.
        log.info('Label %s added to %s', label, gid)
    elif label not in REQUIRED_LABELS_BY_GROUP_SEARCH[group][search]:
        return
    elif not is_complete(event):
        log.info("Ignoring since %s has %s labels. "
                 "It is not complete", gid, event['labels'])
        return

    # Immutable signature: the task receives only the payload.
    process.si(payload).apply_async()


@gracedb.task(queue='superevent', shared=False, time_limit=600)
@gracedb.catch_retryable_http_errors
def process(payload):
    """Update an existing superevent in response to an IGWN alert.

    The preferred event is re-evaluated first. Labels are then applied to
    the superevent according to the publishability and completeness of the
    event carried by the alert:

    * ``ADVREQ`` if the event passes the significant publishability check;
      if it is also complete, either :data:`EARLY_WARNING_LABEL` (when the
      event itself carries that label) or :data:`SIGNIFICANT_LABEL`.
    * :data:`FROZEN_LABEL` if the event passes the less-significant
      publishability check and is complete.

    Parameters
    ----------
    payload : dict
        IGWN alert payload. ``payload['object']`` is the event dictionary
        and must contain the ``superevent`` key.
    """
    event = payload['object']
    superevent_id = event['superevent']

    _update_superevent(superevent_id, event)

    if should_publish(event, significant=True):
        gracedb.create_label.delay('ADVREQ', superevent_id)
        if is_complete(event):
            # Early-warning events keep their own marker instead of the
            # generic significance lock.
            significance_label = (
                EARLY_WARNING_LABEL
                if EARLY_WARNING_LABEL in event['labels']
                else SIGNIFICANT_LABEL
            )
            gracedb.create_label(significance_label, superevent_id)

    if should_publish(event, significant=False) and is_complete(event):
        gracedb.create_label(FROZEN_LABEL, superevent_id)


def get_snr(event):
    """Get the SNR of an event.

    Different groups and pipelines store the SNR in different fields of
    ``extra_attributes``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    snr : float
        The SNR.

    Raises
    ------
    NotImplementedError
        For CBC events from the aframe pipeline, and for non-CBC events
        from a pipeline other than cwb or mly.
    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()

    if group == 'cbc':
        if pipeline == 'aframe':
            raise NotImplementedError("SNR is not reported by aframe")
        return event['extra_attributes']['CoincInspiral']['snr']

    # Non-CBC events: (table, column) holding the SNR for each pipeline.
    burst_fields = {
        'cwb': ('MultiBurst', 'snr'),
        'mly': ('MLyBurst', 'SNR'),
    }
    if pipeline not in burst_fields:
        raise NotImplementedError('SNR attribute not found')
    table, column = burst_fields[pipeline]
    return event['extra_attributes'][table][column]


def get_instruments(event):
    """Get the instruments that contributed data to an event.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the event.

    Notes
    -----
    When the event has a ``SingleInspiral`` entry under
    ``extra_attributes``, the ``ifo`` of every row is used. Otherwise the
    comma-separated ``instruments`` field of the event is split.
    """
    has_singles = ('extra_attributes' in event
                   and 'SingleInspiral' in event['extra_attributes'])
    if not has_singles:
        return set(event['instruments'].split(','))

    rows = event['extra_attributes']['SingleInspiral']
    return {row['ifo'] for row in rows}


def get_instruments_in_ranking_statistic(event):
    """Get the instruments that contribute to the false alarm rate.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the ranking statistic
        for the event.

    Notes
    -----
    The instruments that contributed *data* to an event are given by the
    ``instruments`` key of the GraceDB event JSON structure. However, some
    pipelines (e.g. gstlal) distinguish between instruments that
    contributed *data* and those considered in the *ranking* of the
    candidate. Here a ``SingleInspiral`` row counts towards the ranking
    only when its ``chisq`` value is present and not ``None``.

    Events without a ``SingleInspiral`` table (e.g. from unmodeled
    searches) fall back to the event's ``instruments`` field.
    """
    has_singles = ('extra_attributes' in event
                   and 'SingleInspiral' in event['extra_attributes'])
    if not has_singles:
        return set(event['instruments'].split(','))

    rows = event['extra_attributes']['SingleInspiral']
    return {row['ifo'] for row in rows if row.get('chisq') is not None}


@app.task(shared=False)
def select_pipeline_preferred_event(events):
    """Select the preferred event of each pipeline.

    Events whose group is ``"External"`` are discarded. The remaining G
    events are grouped by their ``pipeline`` value and
    :meth:`select_preferred_event` picks one event per group.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        Mapping from pipeline name to the event dictionary selected for
        that pipeline.
    """
    def pipeline_of(event):
        return event['pipeline']

    gw_events = [event for event in events if event['group'] != "External"]

    # groupby only merges adjacent items, hence the sort on the same key.
    by_pipeline = groupby(sorted(gw_events, key=pipeline_of),
                          key=pipeline_of)
    return {
        pipeline: select_preferred_event(members)
        for pipeline, members in by_pipeline
    }


@app.task(shared=False)
def select_preferred_event(events):
    """Select the preferred event out of a list of G events, typically
    the contents of a superevent.

    Events whose group is ``"External"`` are ignored; among the rest, the
    one with the largest :meth:`keyfunc` value is returned.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        The selected event dictionary.

    Raises
    ------
    ValueError
        If no non-External event is present.
    """
    candidates = (event for event in events if event['group'] != "External")
    return max(candidates, key=keyfunc)


def is_complete(event):
    """Determine whether a G event has all of its data products.

    An event is complete when every label listed for its group and search
    in :data:`REQUIRED_LABELS_BY_GROUP_SEARCH` is among its labels. Events
    from the cwb pipeline with search ``bbh`` are checked against the CBC
    requirements.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    bool
        :obj:`True` if all required labels are present.

    Raises
    ------
    KeyError
        If the group/search combination has no entry in
        :data:`REQUIRED_LABELS_BY_GROUP_SEARCH`.
    """
    group, pipeline, search = (
        event[field].lower() for field in ('group', 'pipeline', 'search'))
    present = set(event['labels'])

    # Special case: burst-cwb-bbh is treated as a CBC event
    if (pipeline, search) == ('cwb', 'bbh'):
        group = 'cbc'

    return REQUIRED_LABELS_BY_GROUP_SEARCH[group][search] <= present


def should_publish(event, significant=True):
    """Determine whether an event should be published as a public alert.

    All of the following conditions must be true for a public alert:

    *   The event's ``offline`` flag is not set and the event does not
        carry the ``INJ`` label.

    *   The event's false alarm rate, weighted by the group-specific
        trials factor as specified by the
        :obj:`~gwcelery.conf.preliminary_alert_trials_factor`
        (:obj:`~gwcelery.conf.significant_alert_trials_factor`)
        configuration setting, is less than or equal to
        :obj:`~gwcelery.conf.preliminary_alert_far_threshold`
        (:obj:`~gwcelery.conf.significant_alert_far_threshold`), or the
        event carries the ``RAVEN_ALERT`` label.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)
    significant : bool
        Flag to use significant
        (:obj:`~gwcelery.conf.significant_alert_far_threshold`),
        or less-significant
        (:obj:`~gwcelery.conf.preliminary_alert_far_threshold`)
        FAR threshold.

    Returns
    -------
    should_publish : bool
        :obj:`True` if the event meets the criteria for a public alert or
        :obj:`False` if it does not.
    """
    criteria = _should_publish(event, significant=significant)
    return all(criteria)


def _should_publish(event, significant=False):
    """Evaluate the individual publishability criteria of an event.

    :meth:`should_publish` reduces the result with :func:`all`; the
    separate values are also used as ranking keys for the preferred event.

    Parameters
    ----------
    event : dict
        Event dictionary. The keys ``group``, ``labels`` and ``offline``
        are read; ``far`` is read when the event's search is listed in the
        configuration, and ``search`` is optional.
    significant : bool
        Use the significant (:obj:`True`) or less-significant
        (:obj:`False`) FAR threshold and trials factor. An event that
        passes the significant threshold is judged against it in both
        cases.

    Returns
    -------
    tuple
        ``(is_vetted, passes_far)``: whether the event is neither offline
        nor labelled ``INJ``, and whether its trials-weighted FAR is within
        the threshold or it carries the ``RAVEN_ALERT`` label.
    """
    group = event['group'].lower()
    search = event.get('search', '').lower()
    if search in app.conf['significant_alert_far_threshold'][group]:
        low_signif_far_threshold = \
            app.conf['preliminary_alert_far_threshold'][group][search]
        low_signif_trials_factor = \
            app.conf['preliminary_alert_trials_factor'][group][search]
        signif_far_threshold = \
            app.conf['significant_alert_far_threshold'][group][search]
        signif_trials_factor = \
            app.conf['significant_alert_trials_factor'][group][search]

        low_signif_far = low_signif_trials_factor * event['far']
        signif_far = signif_trials_factor * event['far']
    else:
        # Fallback in case an event is uploaded to an unlisted search.
        # Both thresholds must be bound here, otherwise the comparisons
        # below raise UnboundLocalError. Pairing an infinite FAR with a
        # threshold of minus infinity makes every FAR comparison fail, so
        # such an event is never publishable on FAR alone.
        low_signif_far_threshold = -1 * float('inf')
        low_signif_far = float('inf')
        signif_far_threshold = -1 * float('inf')
        signif_far = float('inf')

    raven_coincidence = ('RAVEN_ALERT' in event['labels'])

    # Ensure that anything that returns True for significant=True also
    # returns True for significant=False. For example, a significant
    # EarlyWarning event should return True for both significant=True and
    # significant=False.
    if significant or signif_far < signif_far_threshold:
        far = signif_far
        far_threshold = signif_far_threshold
    else:
        far = low_signif_far
        far_threshold = low_signif_far_threshold

    return (not event['offline'] and 'INJ' not in event['labels'],
            far <= far_threshold or raven_coincidence)


def keyfunc(event):
    """Key function for selection of the preferred event.

    Return a value suitable for identifying the preferred event. Given
    events ``a`` and ``b``, ``a`` is preferred over ``b`` if
    ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    key : tuple
        The comparison key.

    Notes
    -----
    Tuples are compared lexicographically in Python: they are compared
    element-wise until an unequal pair of elements is found. The elements
    are, in order of precedence:

    1. event completeness,
    2. the publishability criteria for significant events,
    3. the publishability criteria for less-significant events,
    4. the configured preference rank of the group and search,
    5. the number of detectors (CBC only),
    6. the SNR for CBC events, or the negated FAR otherwise.

    See https://rtd.igwn.org/projects/gwcelery/en/latest/gwcelery.tasks.superevents.html#selection-of-the-preferred-event  # noqa: E501
    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # Special case: aframe allsky events are ranked like CWB-BBH ones.
    # This does not affect the completeness criteria for aframe, which
    # are still determined like those of its cbc counterparts.
    # FIXME: rework the code to avoid changing the group internally.
    if (pipeline, search) == ('aframe', 'allsky'):
        group = 'burst'

    try:
        search_ranks = app.conf['superevent_candidate_preference'][group]
    except KeyError:
        group_rank = -1
    else:
        group_rank = search_ranks.get(search, -1)

    if group != 'cbc':
        # The number of detectors is irrelevant for burst events.
        n_ifos = -1
        # Smaller FAR -> higher IFAR -> more significant.
        # -FAR is used rather than IFAR=1/FAR so that FAR=0 has a rank.
        snr_or_far_ranking = -event['far']
    else:
        n_ifos = len(get_instruments(event))
        snr_or_far_ranking = get_snr(event)

    completeness = (is_complete(event),)
    significant_criteria = _should_publish(event, significant=True)
    less_significant_criteria = _should_publish(event, significant=False)
    tie_breakers = (group_rank, n_ifos, snr_or_far_ranking)
    return (completeness
            + significant_criteria
            + less_significant_criteria
            + tie_breakers)


def _update_superevent(superevent_id, new_event_dict):
    """Update the preferred event of a superevent if necessary.

    The superevent is fetched afresh from GraceDB. Unless it carries
    :data:`FROZEN_LABEL`, the new event replaces the current preferred
    event when it ranks strictly higher according to :meth:`keyfunc`.
    Independently of that outcome, :data:`READY_LABEL` is requested for
    the superevent whenever the new event is complete.

    Parameters
    ----------
    superevent_id : str
        the superevent_id
    new_event_dict : dict
        event info of the new trigger as a dictionary
    """
    # The labels and preferred event carried by the IGWN alert may be out
    # of date, so query GraceDB for the current state.
    superevent = gracedb.get_superevent(superevent_id)
    current_labels = superevent['labels']
    current_preferred = superevent['preferred_event_data']

    # The preferred event may only change while LOW_SIGNIF_LOCKED is not
    # applied.
    unlocked = FROZEN_LABEL not in current_labels
    if unlocked and keyfunc(new_event_dict) > keyfunc(current_preferred):
        gracedb.update_superevent(
            superevent_id,
            preferred_event=new_event_dict['graceid']
        )

    # Completeness takes first precedence in deciding the preferred event;
    # it is the necessary and sufficient condition to mark the superevent
    # as ready.
    if is_complete(new_event_dict):
        gracedb.create_label.delay(READY_LABEL, superevent_id)