Source code for gwcelery.tasks.superevents

"""Module containing the functionality for creation and management of
superevents.

*   There is serial processing of triggers from low latency pipelines.
*   Dedicated **superevent** queue for this purpose.
*   Primary logic to respond to low latency triggers contained in
    :meth:`process` function.
"""
from itertools import filterfalse

from celery.utils.log import get_task_logger
from ligo.segments import segment, segmentlist

from ..import app
from . import gracedb, lvalert

log = get_task_logger(__name__)

REQUIRED_LABELS_BY_GROUP = {
    'cbc': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
    'burst': {'SKYMAP_READY'}
}
"""These labels should be present on an event to consider it to
be complete.
"""

FROZEN_LABEL = 'EM_Selected'
"""This label indicates that the superevent manager should make no further
changes to the preferred event."""

READY_LABEL = 'EM_READY'
"""This label indicates that a preferred event has been assigned and it
has all data products required to make it ready for annotations."""


[docs]@lvalert.handler('cbc_gstlal', 'cbc_spiir', 'cbc_pycbc', 'cbc_mbtaonline', 'burst_olib', 'burst_cwb', shared=False) def handle(payload): """Respond to lvalert nodes from low-latency search pipelines and delegate to :meth:`process` for superevent management. """ alert_type = payload['alert_type'] gid = payload['object']['graceid'] try: far = payload['object']['far'] except KeyError: log.info('Skipping %s because it lacks a FAR', gid) return if far > app.conf['superevent_far_threshold']: log.info("Skipping processing of %s because of high FAR", gid) return priority = 1 if alert_type == 'label_added': priority = 0 label = payload['data']['name'] group = payload['object']['group'].lower() if label == 'RAVEN_ALERT': log.info('Label %s added to %s', label, gid) elif label not in REQUIRED_LABELS_BY_GROUP[group]: return elif not is_complete(payload['object']): log.info("Ignoring since %s has %s labels. " "It is not complete", gid, payload['object']['labels']) return elif alert_type != 'new': return process.si(payload).apply_async(priority=priority)
[docs]@gracedb.task(queue='superevent', shared=False) @gracedb.catch_retryable_http_errors def process(payload): """ Respond to `payload` and serially processes them to create new superevents, add events to existing ones. Parameters ---------- payload : dict LVAlert payload """ event_info = payload['object'] gid = event_info['graceid'] category = get_category(event_info) t_0, t_start, t_end = get_ts(event_info) if event_info.get('superevent'): sid = event_info['superevent'] log.info('Event %s already belongs to superevent %s', gid, sid) s = gracedb.get_superevent(sid) superevent = _SuperEvent(s['t_start'], s['t_end'], s['t_0'], s['superevent_id'], s['preferred_event'], s) _update_superevent(superevent, event_info, t_0=t_0, t_start=None, t_end=None) else: # not event_info.get('superevent') log.info('Event %s does not yet belong to a superevent', gid) superevents = gracedb.get_superevents('category: {} {} .. {}'.format( category, event_info['gpstime'] - app.conf['superevent_query_d_t_start'], event_info['gpstime'] + app.conf['superevent_query_d_t_end'])) for s in superevents: if gid in s['gw_events']: sid = s['superevent_id'] log.info('Event %s found assigned to superevent %s', gid, sid) if payload['alert_type'] == 'label_added': log.info('Label %s added to %s', payload['data']['name'], gid) elif payload['alert_type'] == 'new': log.info('New type LVAlert for %s with ' 'existing superevent %s. ' 'No action required', gid, sid) return superevent = _SuperEvent(s['t_start'], s['t_end'], s['t_0'], s['superevent_id'], s['preferred_event'], s) _update_superevent(superevent, event_info, t_0=t_0, t_start=None, t_end=None) break else: # s not in superevents event_segment = _Event(t_0, t_start, t_end, event_info['graceid'], event_info['group'], event_info['pipeline'], event_info.get('search'), event_dict=event_info) superevent = _partially_intersects(superevents, event_segment) if superevent: sid = superevent.superevent_id log.info('Event %s in window of %s. ' 'Adding event to superevent', gid, sid) gracedb.add_event_to_superevent(sid, event_segment.gid) # extend the time window of the superevent new_superevent = superevent | event_segment if new_superevent != superevent: log.info('%s not completely contained in %s, ' 'extending superevent window', event_segment.gid, sid) new_t_start, new_t_end = new_superevent else: # new_superevent == superevent log.info('%s is completely contained in %s', event_segment.gid, sid) new_t_start = new_t_end = None _update_superevent(superevent, event_info, t_0=t_0, t_start=new_t_start, t_end=new_t_end) else: # not superevent log.info('New event %s with no superevent in GraceDB, ' 'creating new superevent', gid) sid = gracedb.create_superevent(event_info['graceid'], t_0, t_start, t_end) if should_publish(event_info): gracedb.create_label.delay('ADVREQ', sid) if is_complete(event_info): if app.conf['preliminary_alert_timeout'] \ and 'EARLY_WARNING' not in event_info['labels']: gracedb.create_label.s(FROZEN_LABEL, sid).set( queue='superevent', countdown=app.conf['preliminary_alert_timeout'] ).delay() else: # fast path if no countdown gracedb.create_label(FROZEN_LABEL, sid)
[docs]def get_category(event): """Get the superevent category for an event. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- {'mdc', 'test', 'production'} """ if event.get('search') == 'MDC': return 'mdc' elif event['group'] == 'Test': return 'test' else: return 'production'
[docs]def get_ts(event): """Get time extent of an event, depending on pipeline-specific parameters. * For CWB, use the event's ``duration`` field. * For oLIB, use the ratio of the event's ``quality_mean`` and ``frequency_mean`` fields. * For all other pipelines, use the :obj:`~gwcelery.conf.superevent_d_t_start` and :obj:`~gwcelery.conf.superevent_d_t_end` configuration values. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- t_0: float Segment center time in GPS seconds. t_start : float Segment start time in GPS seconds. t_end : float Segment end time in GPS seconds. """ pipeline = event['pipeline'].lower() if pipeline == 'cwb': attribs = event['extra_attributes']['MultiBurst'] d_t_start = d_t_end = attribs['duration'] elif pipeline == 'olib': attribs = event['extra_attributes']['LalInferenceBurst'] d_t_start = d_t_end = (attribs['quality_mean'] / attribs['frequency_mean']) else: d_t_start = app.conf['superevent_d_t_start'].get( pipeline, app.conf['superevent_default_d_t_start']) d_t_end = app.conf['superevent_d_t_end'].get( pipeline, app.conf['superevent_default_d_t_end']) return (event['gpstime'], event['gpstime'] - d_t_start, event['gpstime'] + d_t_end)
[docs]def get_snr(event): """Get the SNR from the LVAlert packet. Different groups and pipelines store the SNR in different fields. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- snr : float The SNR. """ group = event['group'].lower() pipeline = event['pipeline'].lower() if group == 'cbc': attribs = event['extra_attributes']['CoincInspiral'] return attribs['snr'] elif pipeline == 'cwb': attribs = event['extra_attributes']['MultiBurst'] return attribs['snr'] elif pipeline == 'olib': attribs = event['extra_attributes']['LalInferenceBurst'] return attribs['omicron_snr_network'] else: raise NotImplementedError('SNR attribute not found')
[docs]def get_instruments(event): """Get the instruments that contributed data to an event. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- set The set of instruments that contributed to the event. """ attribs = event['extra_attributes']['SingleInspiral'] ifos = {single['ifo'] for single in attribs} return ifos
[docs]def get_instruments_in_ranking_statistic(event): """Get the instruments that contribute to the false alarm rate. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- set The set of instruments that contributed to the ranking statistic for the event. Notes ----- The number of instruments that contributed *data* to an event is given by the ``instruments`` key of the GraceDB event JSON structure. However, some pipelines (e.g. gstlal) have a distinction between which instruments contributed *data* and which were considered in the *ranking* of the candidate. For such pipelines, we infer which pipelines contributed to the ranking by counting only the SingleInspiral records for which the chi squared field is non-empty. For PyCBC Live in the O3 configuration, an empty chi^2 field does not mean that the detector did not contribute to the ranking; in fact, *all* detectors listed in the SingleInspiral table contribute to the significance even if the chi^2 is not computed for some of them. Hence PyCBC Live is handled as a special case. """ if event['pipeline'].lower() == 'pycbc': return set(event['instruments'].split(',')) else: attribs = event['extra_attributes']['SingleInspiral'] return {single['ifo'] for single in attribs if single.get('chisq') is not None}
[docs]@app.task(shared=False) def select_preferred_event(events): """Select the preferred event out of a list of G events, typically contents of a superevent, based on :meth:`keyfunc`. Parameters ---------- events : list list of event dictionaries """ # FIXME: Requires robust determination of an External event g_events = list( filterfalse(lambda x: x['graceid'].startswith('E'), events)) return max(g_events, key=keyfunc)
[docs]def is_complete(event): """ Determine if a G event is complete in the sense of the event has its data products complete i.e. has PASTRO_READY, SKYMAP_READY, EMBRIGHT_READY for CBC events and the SKYMAP_READY label for the Burst events. Test events are not processed by low-latency infrastructure and are always labeled complete. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). """ group = event['group'].lower() label_set = set(event['labels']) required_labels = REQUIRED_LABELS_BY_GROUP[group] return required_labels.issubset(label_set)
[docs]def should_publish(event): """Determine whether an event should be published as a public alert. All of the following conditions must be true for a public alert: * The event's ``offline`` flag is not set. * The event's false alarm rate, weighted by the group-specific trials factor as specified by the :obj:`~gwcelery.conf.preliminary_alert_trials_factor` configuration setting, is less than or equal to :obj:`~gwcelery.conf.preliminary_alert_far_threshold`. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- should_publish : bool :obj:`True` if the event meets the criteria for a public alert or :obj:`False` if it does not. """ return all(_should_publish(event))
def _should_publish(event): """Wrapper around :meth:`should_publish`. Returns the boolean returns of the publishability criteria as a tuple for later use. """ group = event['group'].lower() if 'EARLY_WARNING' in event['labels']: far_threshold = app.conf['early_warning_alert_far_threshold'] trials_factor = app.conf['early_warning_alert_trials_factor'] else: far_threshold = app.conf['preliminary_alert_far_threshold'][group] trials_factor = app.conf['preliminary_alert_trials_factor'][group] far = trials_factor * event['far'] raven_coincidence = ('RAVEN_ALERT' in event['labels']) return (not event['offline'] and 'INJ' not in event['labels'], far <= far_threshold or raven_coincidence)
[docs]def keyfunc(event): """Key function for selection of the preferred event. Return a value suitable for identifying the preferred event. Given events ``a`` and ``b``, ``a`` is preferred over ``b`` if ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred. Parameters ---------- event : dict Event dictionary (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_event`). Returns ------- key : tuple The comparison key. Notes ----- Tuples are compared lexicographically in Python: they are compared element-wise until an unequal pair of elements is found. """ group = event['group'].lower() try: group_rank = ['burst', 'cbc'].index(group) except ValueError: group_rank = -1 if group == 'cbc': group_rank = 1 n_ifos = len(get_instruments(event)) significance = get_snr(event) else: # We don't care about the number of detectors for burst events. n_ifos = -1 # Smaller FAR -> higher IFAR -> more significant. # Use -FAR instead of IFAR=1/FAR so that rank for FAR=0 is defined. significance = -event['far'] return (is_complete(event), *_should_publish(event), group_rank, n_ifos, significance)
def _update_superevent(superevent, new_event_dict, t_0, t_start, t_end): """Update preferred event and/or change time window. Events with multiple detectors take precedence over single-detector events, then CBC events take precedence over burst events, and any remaining tie is broken by SNR/FAR values for CBC/Burst. Single detector are not promoted to preferred event status, if existing preferred event is multi-detector Parameters ---------- superevent : object instance of :class:`_SuperEvent` new_event_dict : dict event info of the new trigger as a dictionary t_0 : float center time of `superevent_id`, None for no change t_start : float start time of `superevent_id`, None for no change t_end : float end time of `superevent_id`, None for no change """ superevent_id = superevent.superevent_id preferred_event = superevent.preferred_event kwargs = {} if t_start is not None: kwargs['t_start'] = t_start if t_end is not None: kwargs['t_end'] = t_end if FROZEN_LABEL not in superevent.event_dict['labels']: preferred_event_dict = gracedb.get_event(preferred_event) if keyfunc(new_event_dict) > keyfunc(preferred_event_dict): # update preferred event when EM_Selected is not applied kwargs['t_0'] = t_0 kwargs['preferred_event'] = new_event_dict['graceid'] if kwargs: gracedb.update_superevent(superevent_id, **kwargs) # completeness takes first precedence in deciding preferred event # necessary and suffiecient condition to superevent as ready if is_complete(new_event_dict): gracedb.create_label.delay(READY_LABEL, superevent_id) def _superevent_segment_list(superevents): """Ingests a list of superevent dictionaries, and returns a segmentlist with start and end times as the duration of each segment. Parameters ---------- superevents : list List of superevent dictionaries (e.g., the return value from :meth:`gwcelery.tasks.gracedb.get_superevents`). Returns ------- superevent_list : segmentlist superevents as a segmentlist object """ return segmentlist([_SuperEvent(s.get('t_start'), s.get('t_end'), s.get('t_0'), s.get('superevent_id'), s.get('preferred_event'), s) for s in superevents]) def _partially_intersects(superevents, event_segment): """Similar to :meth:`segmentlist.find` except it also returns the segment of `superevents` which partially intersects argument. If there are more than one intersections, first occurence is returned. Parameters ---------- superevents : list list pulled down using the gracedb client :method:`superevents` event_segment : segment segment object whose index is wanted Returns ------- match_segment : segment segment in `self` which intersects. `None` if not found """ # create a segmentlist using start and end times superevents = _superevent_segment_list(superevents) for s in superevents: if s.intersects(event_segment): return s return None class _Event(segment): """An event implemented as an extension of :class:`segment`.""" def __new__(cls, t0, t_start, t_end, *args, **kwargs): return super().__new__(cls, t_start, t_end) def __init__(self, t0, t_start, t_end, gid, group=None, pipeline=None, search=None, event_dict={}): self.t0 = t0 self.gid = gid self.group = group self.pipeline = pipeline self.search = search self.event_dict = event_dict class _SuperEvent(segment): """An superevent implemented as an extension of :class:`segment`.""" def __new__(cls, t_start, t_end, *args, **kwargs): return super().__new__(cls, t_start, t_end) def __init__(self, t_start, t_end, t_0, sid, preferred_event=None, event_dict={}): self.t_start = t_start self.t_end = t_end self.t_0 = t_0 self.superevent_id = sid self.preferred_event = preferred_event self.event_dict = event_dict