"""Module containing the functionality for management of superevents.

* Triggers from low-latency pipelines are processed serially.
* A dedicated **superevent** queue is used for this purpose.
* The primary logic for responding to low-latency triggers is contained in
  the :meth:`process` function.
"""
from itertools import filterfalse, groupby
from celery.utils.log import get_task_logger
from .. import app
from . import gracedb, igwn_alert
log = get_task_logger(__name__)
REQUIRED_LABELS_BY_GROUP_SEARCH = {
'cbc': {
'allsky': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
'earlywarning': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
'mdc': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
'bbh': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'}, # required by cwb-bbh # noqa: E501
'ssm': {'EMBRIGHT_READY', 'SKYMAP_READY'}
},
'burst': {
'allsky': {'SKYMAP_READY'}
}
}
"""These labels should be present on an event to consider it to
be complete.
"""
FROZEN_LABEL = 'LOW_SIGNIF_LOCKED'
"""This label indicates that the superevent manager should make no further
changes to the preferred event."""
SIGNIFICANT_LABEL = 'SIGNIF_LOCKED'
"""This label indicates that the superevent is elevated to significant"""
EARLY_WARNING_LABEL = 'EARLY_WARNING'
"""This label indicates that the superevent contains a significant
early warning event."""
EARLY_WARNING_SEARCH_NAME = 'EarlyWarning'
"""Search name for Early Warning searches. Only significant events
result in consideration by the superevent manager."""
READY_LABEL = 'EM_READY'
"""This label indicates that a preferred event has been assigned and it
has all data products required to make it ready for annotations."""
VT_SEARCH_NAME = 'VTInjection'
"""Search name for events uploaded as a part of estimating the spacetime
sensitive volume. Events from this search do not form superevents, and
are not annotated."""
@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    'cbc_sgnl',
                    'cbc_aframe',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle(payload):
    """Respond to IGWN alert topics from low-latency search pipelines and
    delegate to :meth:`process` for superevent management.

    Only ``label_added`` alerts are acted upon. The event is skipped if it
    involves KAGRA data, belongs to the VT-injection search, has a
    ``reserved`` processing status, has no FAR (missing or null), or has a
    FAR above the ``superevent_far_threshold`` configuration value.
    Otherwise :meth:`process` is scheduled when the added label is
    ``RAVEN_ALERT``, or when it is one of the labels required for the
    event's group and search (:data:`REQUIRED_LABELS_BY_GROUP_SEARCH`) and
    the event is now complete. Alerts for group/search combinations that
    have no required labels listed are ignored.

    Parameters
    ----------
    payload : dict
        IGWN alert payload
    """
    if payload['alert_type'] != 'label_added':
        return

    event = payload['object']
    gid = event['graceid']

    # Ignore inclusion of events involving KAGRA; revert when policy is changed
    if "K1" in get_instruments(event):
        log.info('Skipping %s because it involves KAGRA data', gid)
        return

    # Ignore inclusion of events from VT search
    if event['search'] == VT_SEARCH_NAME:
        log.info('Skipping %s event %s', VT_SEARCH_NAME, gid)
        return

    if event['processing_status'] == 'reserved':
        log.info('Skipping %s because processing_status is reserved', gid)
        return

    # Treat a null FAR like a missing one: comparing None against the
    # numeric threshold below would raise TypeError.
    far = event.get('far')
    if far is None:
        log.info('Skipping %s because it lacks a FAR', gid)
        return
    if far > app.conf['superevent_far_threshold']:
        log.info("Skipping processing of %s because of high FAR", gid)
        return

    # FIXME: reinstate priority once the effect is well understood.
    # priority = 1
    label = payload['data']['name']
    group = event['group'].lower()
    search = event['search'].lower()
    pipeline = event['pipeline'].lower()

    # Special case of cWB-BBH, which requires the same labels as CBC
    if search == 'bbh' and pipeline == 'cwb':
        group = 'cbc'

    # Group/search combinations that are not listed have no required
    # labels, so their label_added alerts are ignored instead of raising
    # KeyError inside the alert handler.
    required_labels = REQUIRED_LABELS_BY_GROUP_SEARCH.get(group, {}).get(
        search, set())

    if label == 'RAVEN_ALERT':
        log.info('Label %s added to %s', label, gid)
    elif label not in required_labels:
        return
    elif not is_complete(event):
        log.info("Ignoring since %s has %s labels. "
                 "It is not complete", gid, event['labels'])
        return

    process.si(payload).apply_async()
@gracedb.task(queue='superevent', shared=False, time_limit=600)
@gracedb.catch_retryable_http_errors
def process(payload):
    """
    Respond to `payload` and update the preferred event of an existing
    superevent if necessary.

    After the preferred event has been (re)considered, labels are applied
    to the superevent:

    * If the event passes the significant publishability criteria, the
      ``ADVREQ`` label is requested. If the event is also complete, the
      superevent receives ``EARLY_WARNING`` when the event carries that
      label, and ``SIGNIF_LOCKED`` otherwise.
    * If the event passes the less-significant publishability criteria and
      is complete, the superevent receives ``LOW_SIGNIF_LOCKED``.

    Parameters
    ----------
    payload : dict
        IGWN alert payload
    """
    event = payload['object']
    superevent_id = event['superevent']

    _update_superevent(superevent_id, event)

    if should_publish(event, significant=True):
        gracedb.create_label.delay('ADVREQ', superevent_id)
        if is_complete(event):
            if EARLY_WARNING_LABEL in event['labels']:
                lock_label = EARLY_WARNING_LABEL
            else:
                lock_label = SIGNIFICANT_LABEL
            gracedb.create_label(lock_label, superevent_id)

    if should_publish(event, significant=False) and is_complete(event):
        gracedb.create_label(FROZEN_LABEL, superevent_id)
def get_snr(event):
    """Get the SNR from the LVAlert packet.

    The SNR lives in a different table of ``extra_attributes`` depending on
    the group and pipeline:

    * CBC events: ``CoincInspiral['snr']`` (aframe does not report an SNR);
    * cWB events: ``MultiBurst['snr']``;
    * MLy events: ``MLyBurst['SNR']``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    snr : float
        The SNR.

    Raises
    ------
    NotImplementedError
        For aframe CBC events, or for non-CBC pipelines other than cWB and
        MLy.
    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()

    if group == 'cbc':
        if pipeline == 'aframe':
            raise NotImplementedError("SNR is not reported by aframe")
        table, column = 'CoincInspiral', 'snr'
    elif pipeline == 'cwb':
        table, column = 'MultiBurst', 'snr'
    elif pipeline == 'mly':
        table, column = 'MLyBurst', 'SNR'
    else:
        raise NotImplementedError('SNR attribute not found')

    return event['extra_attributes'][table][column]
def get_instruments(event):
    """Get the instruments that contributed data to an event.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the event.
        If the event has a ``SingleInspiral`` table in its
        ``extra_attributes``, the detectors listed there are used.
        Otherwise the instruments are taken from the comma-separated
        ``instruments`` field of the G-event; surrounding whitespace is
        stripped and empty entries are ignored, so an empty field yields an
        empty set.
    """
    if (('extra_attributes' in event) and
            ('SingleInspiral' in event['extra_attributes'])):
        attribs = event['extra_attributes']['SingleInspiral']
        ifos = {single['ifo'] for single in attribs}
    else:
        # ''.split(',') == [''], so filter out empty entries rather than
        # reporting a bogus '' instrument.
        ifos = {ifo for ifo in map(str.strip, event['instruments'].split(','))
                if ifo}
    return ifos
def get_instruments_in_ranking_statistic(event):
    """Get the instruments that contribute to the false alarm rate.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the ranking statistic for
        the event.

    Notes
    -----
    The number of instruments that contributed *data* to an event is given by
    the ``instruments`` key of the GraceDB event JSON structure. However, some
    pipelines (e.g. gstlal) distinguish between the instruments that
    contributed *data* and those that were considered in the *ranking* of the
    candidate. When a ``SingleInspiral`` table is present, only the detectors
    whose ``chisq`` is not null are counted; all pipelines are expected to
    follow this convention.

    Unmodeled searches do not provide a ``SingleInspiral`` table. In that
    case the comma-separated ``instruments`` field of the event is used,
    with surrounding whitespace stripped and empty entries ignored.
    """
    if (('extra_attributes' in event) and
            ('SingleInspiral' in event['extra_attributes'])):
        attribs = event['extra_attributes']['SingleInspiral']
        ifos = {single['ifo'] for single in attribs
                if single.get('chisq') is not None}
    else:
        # ''.split(',') == [''], so filter out empty entries rather than
        # reporting a bogus '' instrument.
        ifos = {ifo for ifo in map(str.strip, event['instruments'].split(','))
                if ifo}
    return ifos
@app.task(shared=False)
def select_pipeline_preferred_event(events):
    """Select the preferred event of every pipeline from a list of G events.

    Events whose group is ``External`` are discarded. The remaining events
    are grouped by their ``pipeline`` field, and within each group
    :meth:`select_preferred_event` (which ranks with :meth:`keyfunc`) picks
    the winner.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        Mapping from pipeline name to that pipeline's preferred event
        dictionary.
    """
    def pipeline_of(event):
        return event['pipeline']

    # groupby only merges adjacent items, so sort by the same key first.
    internal_events = sorted(
        (event for event in events if event['group'] != "External"),
        key=pipeline_of,
    )
    return {
        pipeline: select_preferred_event(members)
        for pipeline, members in groupby(internal_events, key=pipeline_of)
    }
@app.task(shared=False)
def select_preferred_event(events):
    """Select the preferred event out of a list of G events, typically the
    contents of a superevent, based on :meth:`keyfunc`.

    Events whose group is ``External`` are not eligible.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        The eligible event with the highest :meth:`keyfunc` key; on ties the
        first such event in ``events`` is returned.

    Raises
    ------
    ValueError
        If ``events`` contains no non-External event.
    """
    candidates = [event for event in events if event['group'] != "External"]
    return max(candidates, key=keyfunc)
def is_complete(event):
    """
    Determine whether a G event is complete, i.e. whether it carries every
    data-product label listed for its group and search in
    :data:`REQUIRED_LABELS_BY_GROUP_SEARCH`.

    Group, pipeline and search are compared in lower case. cWB events from
    the ``BBH`` search are checked against the CBC requirements.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    bool
        :obj:`True` if all required labels are present.

    Raises
    ------
    KeyError
        If the event's group/search combination is not listed in
        :data:`REQUIRED_LABELS_BY_GROUP_SEARCH`.
    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()
    present = set(event['labels'])

    # Special case: burst-cwb-bbh is treated as a CBC event
    lookup_group = 'cbc' if (pipeline == 'cwb' and search == 'bbh') else group

    return REQUIRED_LABELS_BY_GROUP_SEARCH[lookup_group][search] <= present
def should_publish(event, significant=True):
    """Determine whether an event should be published as a public alert.

    Both of the following conditions must hold for a public alert:

    * The event's ``offline`` flag is not set and the event does not carry
      the ``INJ`` label.
    * The event's false alarm rate, weighted by the group- and
      search-specific trials factor from
      :obj:`~gwcelery.conf.preliminary_alert_trials_factor`
      (:obj:`~gwcelery.conf.significant_alert_trials_factor`), is less than
      or equal to :obj:`~gwcelery.conf.preliminary_alert_far_threshold`
      (:obj:`~gwcelery.conf.significant_alert_far_threshold`), or the event
      carries the ``RAVEN_ALERT`` label.

    An event that passes the significant FAR threshold also passes when
    ``significant=False``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)
    significant : bool
        Flag to use significant
        (:obj:`~gwcelery.conf.significant_alert_far_threshold`),
        or less-significant
        (:obj:`~gwcelery.conf.preliminary_alert_far_threshold`)
        FAR threshold.

    Returns
    -------
    should_publish : bool
        :obj:`True` if the event meets the criteria for a public alert or
        :obj:`False` if it does not.
    """
    criteria = _should_publish(event, significant=significant)
    return all(criteria)
def _should_publish(event, significant=False):
    """Evaluate the publishability criteria used by :meth:`should_publish`.

    The individual criteria are returned as a tuple rather than combined, so
    that :meth:`keyfunc` can also use them for ranking.

    Parameters
    ----------
    event : dict
        Event dictionary.
    significant : bool
        If true, compare against the significant FAR threshold and trials
        factor; otherwise use the preliminary (less-significant) ones. An
        event passing the significant threshold always passes the FAR
        criterion for ``significant=False`` too.

    Returns
    -------
    tuple of bool
        ``(offline flag unset and no INJ label,
        weighted FAR <= threshold or RAVEN_ALERT label present)``.
        Events whose group/search has no configured significant FAR
        threshold fail the FAR comparison.
    """
    group = event['group'].lower()
    # A missing or null search is treated as an unlisted search.
    search = (event.get('search') or '').lower()

    configured_searches = \
        app.conf['significant_alert_far_threshold'].get(group, {})
    if search in configured_searches:
        low_signif_far_threshold = \
            app.conf['preliminary_alert_far_threshold'][group][search]
        low_signif_trials_factor = \
            app.conf['preliminary_alert_trials_factor'][group][search]
        signif_far_threshold = \
            app.conf['significant_alert_far_threshold'][group][search]
        signif_trials_factor = \
            app.conf['significant_alert_trials_factor'][group][search]

        low_signif_far = low_signif_trials_factor * event['far']
        signif_far = signif_trials_factor * event['far']
    else:
        # Fallback in case an event is uploaded to an unlisted group/search.
        # All four names must be bound here because the comparisons below
        # read the thresholds as well as the weighted FARs. An infinite FAR
        # against a zero threshold makes the FAR criterion fail, so such
        # events only pass it through a RAVEN coincidence.
        # NOTE(review): confirm this conservative policy for unlisted
        # searches is the intended one.
        low_signif_far = signif_far = float('inf')
        low_signif_far_threshold = signif_far_threshold = 0.0

    raven_coincidence = ('RAVEN_ALERT' in event['labels'])

    # Ensure that anything that returns True for significant=True also returns
    # True for significant=False. For example, a significant EarlyWarning event
    # should return True for both significant=True and significant=False.
    if significant or signif_far < signif_far_threshold:
        far = signif_far
        far_threshold = signif_far_threshold
    else:
        far = low_signif_far
        far_threshold = low_signif_far_threshold

    return (not event['offline'] and 'INJ' not in event['labels'],
            far <= far_threshold or raven_coincidence)
def keyfunc(event):
    """Key function for selection of the preferred event.

    Return a value suitable for identifying the preferred event. Given events
    ``a`` and ``b``, ``a`` is preferred over ``b`` if
    ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    key : tuple
        The comparison key. Its elements, in decreasing order of precedence,
        are:

        1. event completeness (:meth:`is_complete`);
        2. the publishability criteria for significant events;
        3. the publishability criteria for less-significant events;
        4. the group/search rank from ``superevent_candidate_preference``
           (``-1`` if not configured);
        5. the number of instruments for CBC events (``-1`` otherwise);
        6. the SNR for CBC events, or ``-FAR`` otherwise.

    Notes
    -----
    Tuples are compared lexicographically in Python: they are compared
    element-wise until an unequal pair of elements is found.
    """
    # See https://rtd.igwn.org/projects/gwcelery/en/latest/gwcelery.tasks.superevents.html#selection-of-the-preferred-event # noqa: E501
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # aframe all-sky events are ranked on the same footing as cWB-BBH, i.e.
    # as Burst. This affects only the ranking: is_complete() reads the group
    # from the event itself, so aframe keeps its CBC completeness criteria.
    # FIXME: rework the code to avoid changing the group internally.
    if pipeline == 'aframe' and search == 'allsky':
        ranking_group = 'burst'
    else:
        ranking_group = group

    try:
        group_rank = app.conf['superevent_candidate_preference'][
            ranking_group].get(search, -1)
    except KeyError:
        group_rank = -1

    if ranking_group != 'cbc':
        # The number of detectors does not matter for burst events.
        # Smaller FAR -> higher IFAR -> more significant. -FAR is used
        # instead of IFAR=1/FAR so that the rank for FAR=0 is defined.
        n_ifos = -1
        snr_or_far_ranking = -event['far']
    else:
        n_ifos = len(get_instruments(event))
        snr_or_far_ranking = get_snr(event)

    completeness = is_complete(event)
    significant_criteria = _should_publish(event, significant=True)
    less_significant_criteria = _should_publish(event, significant=False)
    return (
        completeness,
        *significant_criteria,
        *less_significant_criteria,
        group_rank,
        n_ifos,
        snr_or_far_ranking,
    )
def _update_superevent(superevent_id, new_event_dict):
    """Update the preferred event of a superevent if necessary.

    The superevent is re-fetched from GraceDB. If it carries the
    ``LOW_SIGNIF_LOCKED`` label, nothing is changed. Otherwise the new event
    becomes the preferred event when :meth:`keyfunc` ranks it strictly
    higher than the current preferred event (completeness first, then
    publishability, group/search preference, number of instruments for
    CBC, and finally SNR for CBC or FAR otherwise). If the newly promoted
    event is complete, the ``EM_READY`` label is requested asynchronously.

    Parameters
    ----------
    superevent_id : str
        the superevent_id
    new_event_dict : dict
        event info of the new trigger as a dictionary
    """
    # Labels and preferred event in the IGWN alert may not be the latest, so
    # read them from GraceDB.
    superevent = gracedb.get_superevent(superevent_id)
    current_labels = superevent['labels']
    current_preferred = superevent['preferred_event_data']

    if FROZEN_LABEL in current_labels:
        # The preferred event is locked; make no further changes.
        return

    if not keyfunc(new_event_dict) > keyfunc(current_preferred):
        return

    gracedb.update_superevent(
        superevent_id,
        preferred_event=new_event_dict['graceid']
    )
    # Completeness takes first precedence in choosing the preferred event,
    # so a complete preferred event is a necessary and sufficient condition
    # for marking the superevent as ready.
    if is_complete(new_event_dict):
        gracedb.create_label.delay(READY_LABEL, superevent_id)