"""Module containing the functionality for creation and management of
superevents.
* There is serial processing of triggers from low latency
pipelines.
* Dedicated **superevent** queue for this purpose.
* Primary logic to respond to low latency triggers contained
in :meth:`handle` function.
"""
from celery.utils.log import get_task_logger
from ligo.gracedb.exceptions import HTTPError
from ligo.segments import segment, segmentlist
from ..import app
from . import gracedb, lvalert
log = get_task_logger(__name__)
@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 'burst_olib',
                 'burst_cwb',
                 'test_gstlal',
                 'test_pycbc',
                 'test_mbtaonline',
                 queue='superevent',
                 shared=False)
def handle(payload):
    """LVAlert handler for superevent manager.

    Receives payloads from test and production nodes and serially
    processes them to create new superevents or add events to
    existing ones.

    Parameters
    ----------
    payload : dict
        LVAlert message. Only ``alert_type == 'new'`` messages that
        carry a FAR below the configured threshold are processed.
    """
    alert_type = payload['alert_type']
    if alert_type != 'new':
        log.info('Not new type alert, passing...')
        return

    gid = payload['uid']

    try:
        far = payload['object']['far']
    except KeyError:
        log.info(
            'Skipping %s because LVAlert message does not provide FAR', gid)
        return
    else:
        if far > app.conf['superevent_far_threshold']:
            log.info("Skipping processing of %s because of low FAR", gid)
            return

    event_info = _get_event_info(payload)

    # MDC and Test events get their own superevent categories so they
    # never mix with production superevents.
    if event_info['search'] == 'MDC':
        category = 'mdc'
    elif event_info['group'] == 'Test':
        category = 'test'
    else:
        category = 'production'

    # Query for superevents in a configurable window around the event time.
    superevents = gracedb.get_superevents('category: {} {} .. {}'.format(
        category,
        event_info['gpstime'] - app.conf['superevent_query_d_t_start'],
        event_info['gpstime'] + app.conf['superevent_query_d_t_end']))

    for superevent in superevents:
        if gid in superevent['gw_events']:
            sid = superevent['superevent_id']
            break  # Found matching superevent
    else:
        sid = None  # No matching superevent

    d_t_start, d_t_end = _get_dts(event_info)

    if sid is None:
        log.debug('Entered 1st if')
        event_segment = _Event(event_info['gpstime'],
                               event_info['gpstime'] - d_t_start,
                               event_info['gpstime'] + d_t_end,
                               event_info['graceid'],
                               event_info['group'],
                               event_info['pipeline'],
                               event_info['search'],
                               event_dict=payload)

        superevent = _partially_intersects(superevents, event_segment)

        if not superevent:
            log.info('New event %s with no superevent in GraceDb, '
                     'creating new superevent', gid)
            gracedb.create_superevent(event_info['graceid'],
                                      event_info['gpstime'],
                                      d_t_start,
                                      d_t_end,
                                      category)
            return

        log.info('Event %s in window of %s. Adding event to superevent',
                 gid, superevent.superevent_id)
        gracedb.add_event_to_superevent(superevent.superevent_id,
                                        event_segment.gid)

        # Extend the time window of the superevent if the new event's
        # segment is not completely contained in it.
        new_superevent = superevent | event_segment
        if new_superevent != superevent:
            log.info("%s not completely contained in %s, "
                     "extending superevent window",
                     event_segment.gid, superevent.superevent_id)
            new_t_start, new_t_end = new_superevent[0], new_superevent[1]
        else:
            log.info("%s is completely contained in %s",
                     event_segment.gid, superevent.superevent_id)
            new_t_start = new_t_end = None

        # FIXME handle the 400 properly when arises
        try:
            _update_superevent(superevent.superevent_id,
                               superevent.preferred_event,
                               event_info,
                               t_start=new_t_start,
                               t_end=new_t_end)
        except HTTPError as err:
            if err.status == 400 and err.reason == "Bad Request":
                log.exception("Server returned bad request")
            else:
                # Re-raise with the original traceback intact.
                raise
    else:
        # The event is already part of a superevent even though the
        # alert claims to be 'new'; flag this inconsistency loudly.
        log.critical('Superevent %s exists for alert_type new for %s',
                     sid, gid)
def _get_event_info(payload):
"""Helper function to fetch required event info (from GraceDb)
at once and reduce polling
"""
# pull basic info
alert_type = payload.get('alert_type')
payload = payload.get('object', payload)
event_info = dict(
graceid=payload['graceid'],
gpstime=payload['gpstime'],
far=payload['far'],
instruments=payload['instruments'],
group=payload['group'],
pipeline=payload['pipeline'],
search=payload.get('search'),
alert_type=alert_type)
# pull pipeline based extra attributes
if payload['group'].lower() == 'cbc':
event_info['snr'] = \
payload['extra_attributes']['CoincInspiral']['snr']
if payload['pipeline'].lower() == 'cwb':
extra_attributes = ['duration', 'start_time', 'snr']
event_info.update(
{attr:
payload['extra_attributes']['MultiBurst'][attr]
for attr in extra_attributes})
elif payload['pipeline'].lower() == 'olib':
extra_attributes = ['quality_mean', 'frequency_mean']
event_info.update(
{attr:
payload['extra_attributes']['LalInferenceBurst'][attr]
for attr in extra_attributes})
# oLIB snr key has a different name, call it snr
event_info['snr'] = \
payload[
'extra_attributes']['LalInferenceBurst']['omicron_snr_network']
return event_info
def _get_dts(event_info):
"""
Returns the d_t_start and d_t_end values based on CBC/Burst
type alerts
"""
pipeline = event_info['pipeline'].lower()
if pipeline == 'cwb':
d_t_start = d_t_end = event_info['duration']
elif pipeline == 'olib':
d_t_start = d_t_end = (event_info['quality_mean'] /
event_info['frequency_mean'])
else:
d_t_start = app.conf['superevent_d_t_start'].get(
pipeline, app.conf['superevent_default_d_t_start'])
d_t_end = app.conf['superevent_d_t_end'].get(
pipeline, app.conf['superevent_default_d_t_end'])
return d_t_start, d_t_end
def _keyfunc(event_info):
group = event_info['group'].lower()
num_ifos = len(event_info['instruments'].split(","))
ifo_rank = (num_ifos <= 1)
try:
group_rank = ['cbc', 'burst'].index(group)
except ValueError:
group_rank = float('inf')
# return the index of group and negative snr in spirit
# of rank being lower for higher SNR for CBC
if group == 'cbc':
return ifo_rank, group_rank, -1.0*event_info['snr']
else:
return ifo_rank, group_rank, event_info['far']
def _update_superevent(superevent_id, preferred_event, new_event_dict,
                       t_start, t_end):
    """Update the preferred event and/or the time window of a superevent.

    Events with multiple detectors take precedence over single-detector
    events, then CBC events take precedence over Burst events, and any
    remaining tie is broken by SNR (CBC) or FAR (Burst). A
    single-detector event is never promoted over an existing
    multi-detector preferred event.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    preferred_event : str
        preferred event id of the superevent
    new_event_dict : dict
        event info of the new trigger as a dictionary
    t_start : float
        start time of `superevent_id`, None for no change
    t_end : float
        end time of `superevent_id`, None for no change
    """
    preferred_event_dict = _get_event_info(gracedb.get_event(preferred_event))

    # Collect only the fields that actually change.
    kwargs = {key: value
              for key, value in (('t_start', t_start), ('t_end', t_end))
              if value is not None}
    if _keyfunc(new_event_dict) < _keyfunc(preferred_event_dict):
        kwargs['preferred_event'] = new_event_dict['graceid']

    if kwargs:
        gracedb.update_superevent(superevent_id, **kwargs)
def _superevent_segment_list(superevents):
    """Convert superevent dictionaries into a segmentlist.

    Each superevent becomes one segment spanning its start and end
    times.

    Parameters
    ----------
    superevents : list
        list of superevent dictionaries, usually fetched by
        :meth:`GraceDb.superevents()`.

    Returns
    -------
    superevent_list : segmentlist
        superevents as a segmentlist object
    """
    segments = [
        _SuperEvent(s.get('t_start'),
                    s.get('t_end'),
                    s.get('t_0'),
                    s.get('superevent_id'),
                    s.get('preferred_event'),
                    s)
        for s in superevents
    ]
    return segmentlist(segments)
def _partially_intersects(superevents, event_segment):
    """Return the first superevent that intersects ``event_segment``.

    Similar to :meth:`segmentlist.find` except it also matches partial
    overlaps. If more than one superevent intersects, the first
    occurrence wins.

    Parameters
    ----------
    superevents : list
        list pulled down using the gracedb client
        :method:`superevents`
    event_segment : segment
        segment object whose index is wanted

    Returns
    -------
    match_segment : segment
        segment in `self` which intersects. `None` if not found
    """
    # Build segments with start/end times, then scan for an overlap.
    candidates = _superevent_segment_list(superevents)
    return next(
        (candidate for candidate in candidates
         if candidate.intersects(event_segment)),
        None)
class _Event(segment):
    """An event implemented as an extension of :class:`segment`.

    The segment endpoints are ``(t_start, t_end)``; additional event
    metadata (graceid, group, pipeline, search, and the raw payload)
    is attached as attributes.
    """

    def __new__(cls, t0, t_start, t_end, *args, **kwargs):
        # segment subclasses tuple, which is immutable, so the
        # interval endpoints must be supplied in __new__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t0, t_start, t_end, gid, group=None, pipeline=None,
                 search=None, event_dict=None):
        self.t0 = t0
        self.gid = gid
        self.group = group
        self.pipeline = pipeline
        self.search = search
        # Avoid a shared mutable default argument: give each instance
        # its own dictionary unless one was supplied.
        self.event_dict = {} if event_dict is None else event_dict
class _SuperEvent(segment):
    """A superevent implemented as an extension of :class:`segment`.

    The segment endpoints are ``(t_start, t_end)``; the superevent id,
    preferred event, and the raw superevent dictionary are attached as
    attributes.
    """

    def __new__(cls, t_start, t_end, *args, **kwargs):
        # segment subclasses tuple, which is immutable, so the
        # interval endpoints must be supplied in __new__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t_start, t_end, t_0, sid,
                 preferred_event=None, event_dict=None):
        self.t_start = t_start
        self.t_end = t_end
        self.t_0 = t_0
        self.superevent_id = sid
        self.preferred_event = preferred_event
        # Avoid a shared mutable default argument: give each instance
        # its own dictionary unless one was supplied.
        self.event_dict = {} if event_dict is None else event_dict