"""Tasks that comprise the alert orchestrator, which is responsible for the
vetting and annotation workflow to produce preliminary, initial, and update
alerts for gravitational-wave event candidates."""
import json
import re
from urllib.error import URLError
from celery import chain, group
from ligo.gracedb.rest import HTTPError
from ..import app
from . import bayestar
from . import circulars
from .core import identity, ordered_group
from . import detchar
from . import em_bright
from . import gcn
from . import gracedb
from . import lalinference
from . import lvalert
from . import skymaps
from . import p_astro_gstlal, p_astro_other
@lvalert.handler('superevent',
                 'mdc_superevent',
                 shared=False)
def handle_superevent(alert):
    """Schedule annotations for superevents in response to LVAlert messages.

    The action taken depends on ``alert['alert_type']``:

    ``new``
        After waiting for the time given by the
        :obj:`~gwcelery.conf.orchestrator_timeout` configuration variable
        for the choice of preferred event to settle down, look up the
        preferred event, perform data quality checks with
        :meth:`gwcelery.tasks.detchar.check_vectors`, and call
        :meth:`~gwcelery.tasks.orchestrator.preliminary_alert` to send a
        preliminary GCN notice. In parallel, call
        :meth:`~gwcelery.tasks.orchestrator.parameter_estimation`.

    ``event_added``
        If the superevent carries the ``DQV`` label, run
        :meth:`gwcelery.tasks.detchar.check_vectors` again for the newly
        preferred event and then update the superevent if ``DQOK`` has been
        applied.

    ``label_added``
        ``ADVOK`` triggers an initial alert and ``ADVNO`` triggers a
        retraction alert. Other labels are ignored.

    Parameters
    ----------
    alert : dict
        The LVAlert message payload.
    """
    superevent_id = alert['uid']
    alert_type = alert['alert_type']

    if alert_type == 'new':
        payload = alert['object']
        start = payload['t_start']
        end = payload['t_end']

        # Delay the lookup so that the preferred event has time to settle.
        fetch_preferred_event = _get_preferred_event.si(superevent_id).set(
            countdown=app.conf['orchestrator_timeout']
        )
        # Data quality vetting feeds directly into the preliminary alert.
        vet_then_alert = (
            detchar.check_vectors.s(superevent_id, start, end)
            |
            preliminary_alert.s(superevent_id)
        )
        workflow = (
            fetch_preferred_event
            |
            gracedb.get_event.s()
            |
            group(
                vet_then_alert,
                parameter_estimation.s(superevent_id)
            )
        )
        workflow.apply_async()

    # check DQV label on superevent, run check_vectors if required
    elif alert_type == 'event_added':
        data = alert['data']
        new_event_id = data['preferred_event']
        start = data['t_start']
        end = data['t_end']

        if 'DQV' in gracedb.get_labels(superevent_id):
            # NOTE(review): check_vectors receives an event ID as its first
            # argument here, whereas the 'new' branch feeds it the event
            # dictionary returned by get_event -- confirm that both forms
            # are accepted.
            recheck = detchar.check_vectors.s(
                new_event_id, superevent_id, start, end)
            (
                recheck
                |
                _update_if_dqok.si(superevent_id, new_event_id)
            ).apply_async()

    elif alert_type == 'label_added':
        # Map advocate sign-off labels to the alert they trigger.
        responders = {'ADVOK': initial_alert, 'ADVNO': retraction_alert}
        respond = responders.get(alert['data']['name'])
        if respond is not None:
            respond(superevent_id)
@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    For ``new`` alerts, the source properties (``em_bright.json``) are
    computed from the masses, spins, and SNR of the event, and, for pipelines
    other than gstlal (or for MDC events), the source classification
    (``p_astro.json``) is computed as well. For ``log`` alerts, further
    annotations are launched depending on which file was uploaded.

    Parameters
    ----------
    alert : dict
        The LVAlert message payload.

    Notes
    -----
    This LVAlert message handler is triggered by updates that include the files
    ``psd.xml.gz`` and ``ranking_data.xml.gz``. The table below lists which
    files are created as a result, and which tasks generate them.

    ============================== =====================================================
    File                           Task
    ============================== =====================================================
    ``bayestar.fits``              :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.classifier`
    ``p_astro.json``               :meth:`gwcelery.tasks.p_astro_gstlal.compute_p_astro`
    ============================== =====================================================
    """  # noqa: E501
    graceid = alert['uid']
    alert_type = alert['alert_type']

    # em_bright and p_astro calculation
    if alert_type == 'new':
        event = alert['object']
        pipeline = event['pipeline'].lower()
        extra_attributes = event['extra_attributes']
        snr = extra_attributes['CoincInspiral']['snr']
        far = event['far']
        first_single = extra_attributes['SingleInspiral'][0]
        mass1 = first_single['mass1']
        mass2 = first_single['mass2']
        # FIXME: GraceDb's JSON representations of LIGO-LW tables strips out
        # fields whose values are zero. We did not notice this before because
        # it's uncommon for most fields *except* spin to be zero.
        # See https://git.ligo.org/emfollow/gwcelery/issues/85.
        chi1 = first_single.get('spin1z', 0)
        chi2 = first_single.get('spin2z', 0)

        # em_bright task based on pipeline
        if pipeline == 'gstlal':
            em_bright_task = em_bright.classifier_gstlal
        else:
            em_bright_task = em_bright.classifier_other

        em_bright_canvas = (
            em_bright_task.si((mass1, mass2, chi1, chi2, snr), graceid)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        )
        em_bright_canvas.delay()

        # p_astro calculation for other pipelines
        if pipeline != 'gstlal' or event['search'] == 'MDC':
            p_astro_canvas = (
                p_astro_other.compute_p_astro.s(snr, far, mass1, mass2)
                |
                gracedb.upload.s(
                    'p_astro.json', graceid,
                    'p_astro computation complete', ['p_astro', 'public']
                )
                |
                gracedb.create_label.si('PASTRO_READY', graceid)
            )
            p_astro_canvas.delay()

    # Everything below reacts only to file uploads.
    if alert_type != 'log':
        return

    filename = alert['data']['filename']

    if filename == 'psd.xml.gz':
        inputs = ordered_group(
            gracedb.download.s('coinc.xml', graceid),
            gracedb.download.s('psd.xml.gz', graceid)
        )
        localization_canvas = (
            inputs
            |
            bayestar.localize.s(graceid)
            |
            gracedb.upload.s(
                'bayestar.fits', graceid,
                'sky localization complete', ['sky_loc', 'public']
            )
            |
            gracedb.create_label.si('SKYMAP_READY', graceid)
        )
        localization_canvas.delay()
    elif filename == 'ranking_data.xml.gz':
        inputs = ordered_group(
            gracedb.download.si('coinc.xml', graceid),
            gracedb.download.si('ranking_data.xml.gz', graceid)
        )
        classification_canvas = (
            inputs
            |
            p_astro_gstlal.compute_p_astro.s()
            |
            gracedb.upload.s(
                'p_astro.json', graceid,
                'p_astro computation complete', ['p_astro', 'public']
            )
            |
            gracedb.create_label.si('PASTRO_READY', graceid)
        )
        classification_canvas.delay()
@app.task(autoretry_for=(HTTPError, URLError, TimeoutError),
          default_retry_delay=20.0, retry_backoff=True,
          retry_kwargs=dict(max_retries=500), shared=False)
def _download(*args, **kwargs):
    """Download a file from GraceDb, retrying persistently on failure.

    This is a thin wrapper around :func:`gwcelery.tasks.gracedb.download`
    that forwards all of its arguments unchanged. Unlike the wrapped task, it
    is also retried for :class:`TimeoutError` and
    :class:`~urllib.error.URLError`. In particular, it will be retried for
    404 (not found) errors, which lets it wait for a file that has not been
    uploaded yet.

    Parameters
    ----------
    \\*args
        Positional arguments passed to
        :func:`gwcelery.tasks.gracedb.download`.
    \\*\\*kwargs
        Keyword arguments passed to
        :func:`gwcelery.tasks.gracedb.download`.

    Returns
    -------
    The return value of :func:`gwcelery.tasks.gracedb.download`.
    """
    contents = gracedb.download(*args, **kwargs)
    return contents
@app.task(shared=False, ignore_result=True)
def _update_if_dqok(superevent_id, event_id):
    """Make `event_id` the preferred event of `superevent_id`, but only if
    the `DQOK` label has been applied to the superevent.

    When the update happens, a log message recording it is also written to
    the superevent.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.
    event_id : str
        The ID of the event that should become the preferred event.
    """
    # Nothing to do unless data quality has been signed off.
    if 'DQOK' not in gracedb.get_labels(superevent_id):
        return

    gracedb.update_superevent(superevent_id, preferred_event=event_id)
    message = "DQOK applied based on new event %s" % (event_id)
    gracedb.create_log(message, superevent_id)
@gracedb.task(shared=False)
def _get_preferred_event(superevent_id):
    """Look up the preferred event of a superevent in GraceDb.

    This works just like :func:`gwcelery.tasks.gracedb.get_superevent`, except
    that only the ``preferred_event`` entry of the GraceDb JSON response is
    returned instead of the whole response.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.

    Returns
    -------
    The value of the ``preferred_event`` key of the superevent record.
    """
    superevent = gracedb.get_superevent(superevent_id)
    return superevent['preferred_event']
@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record, folding in source classification JSON.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.classifier` and
        :meth:`gwcelery.tasks.p_astro_gstlal.compute_p_astro` respectively; or
        None
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`. If ``skymap_filename``
        is given and ``skymap_type`` is not, then ``skymap_type`` is derived
        from the file name by stripping the ``.fits`` extension and anything
        after it.

    Returns
    -------
    str
        The filename of the newly created VOEvent.
    """
    # Work on a copy so that the caller's keyword arguments are untouched.
    options = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into the options.
        for text in classification:
            options.update(json.loads(text))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDb REST API.
    renames = (('HasNS', 'ProbHasNS'), ('HasRemnant', 'ProbHasRemnant'))
    for json_key, api_key in renames:
        if json_key in options:
            options[api_key] = options.pop(json_key)

    skymap_filename = options.get('skymap_filename')
    if skymap_filename is not None:
        options.setdefault(
            'skymap_type', re.sub(r'\.fits(\..+)?$', '', skymap_filename))

    return gracedb.create_voevent(*args, **options)
def _get_number_of_instruments(gracedb_id):
    """Count the gravitational-wave instruments that contributed to the
    ranking statistic of a coincident event.

    Parameters
    ----------
    gracedb_id : str
        The GraceDB ID.

    Returns
    -------
    int
        The number of instruments that contributed to the ranking statistic for
        the event.

    Notes
    -----
    The number of instruments that contributed *data* to an event is given by
    the ``instruments`` key of the GraceDB event JSON structure. However, some
    pipelines (e.g. gstlal) have a distinction between which instruments
    contributed *data* and which were considered in the *ranking* of the
    candidate. For such pipelines, we infer which instruments contributed to
    the ranking by counting only the SingleInspiral records for which the chi
    squared field is non-empty.

    Events without SingleInspiral records fall back to counting the
    comma-separated entries of the ``instruments`` key.
    """
    event = gracedb.get_event(gracedb_id)
    extra_attributes = event['extra_attributes']

    if 'SingleInspiral' not in extra_attributes:
        instruments = event.get('instruments', '')
        return len(instruments.split(','))

    return sum(
        1 for single in extra_attributes['SingleInspiral']
        if single.get('chisq') is not None)
@app.task(ignore_result=True, shared=False)
def preliminary_alert(event, superevent_id):
    """Produce a preliminary alert by copying any sky maps.

    This consists of the following steps:

    1.  Copy any sky maps and source classification from the preferred event
        to the superevent.
    2.  Create standard annotations for sky maps including all-sky plots by
        calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
    3.  Create a preliminary VOEvent.
    4.  Send the VOEvent to GCN.
    5.  Apply the GCN_PRELIM_SENT label to the superevent.
    6.  Create and upload a GCN Circular draft.

    Steps 3 through 6 are carried out only if the event passes the
    publication criteria evaluated below.

    Parameters
    ----------
    event : dict
        The preferred event. The keys ``graceid``, ``group``, ``pipeline``,
        ``offline``, and ``far`` are read.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    Events that are neither CBC nor from the CWB or oLIB pipelines have no
    known sky map file name. For these, the sky map steps are skipped
    entirely.
    """
    preferred_event_id = event['graceid']

    if event['group'] == 'CBC':
        skymap_filename = 'bayestar.fits'
    elif event['pipeline'] == 'CWB':
        skymap_filename = 'cWB.fits.gz'
    elif event['pipeline'] == 'oLIB':
        skymap_filename = 'oLIB.fits.gz'
    else:
        skymap_filename = None

    # The flattened copy of an uncompressed sky map is stored gzipped.
    # Guard against None: there may be no sky map at all for this event, in
    # which case calling .endswith() would raise AttributeError.
    original_skymap_filename = skymap_filename
    if skymap_filename is not None and skymap_filename.endswith('.fits'):
        skymap_filename += '.gz'

    # Determine if the event should be made public.
    group_name = event['group'].lower()
    trials_factor = app.conf['preliminary_alert_trials_factor'][group_name]
    far_threshold = app.conf['preliminary_alert_far_threshold'][group_name]
    should_publish = (
        not event['offline']
        and trials_factor * event['far'] <= far_threshold
        and {'DQV', 'INJ'}.isdisjoint(gracedb.get_labels(superevent_id))
        # FIXME: For now, we do not issue public alerts for single-detector
        # events. Remove this after we have gotten some more experience with
        # single-instrument events.
        and _get_number_of_instruments(preferred_event_id) != 1
        # FIXME: For now, disable automated alerts except for cWB and gstlal.
        # Remove this after oLIB, MBTA, PyCBC, and SPIIR have completed review.
        and event['pipeline'].lower() in {'cwb', 'gstlal'})

    if should_publish:
        canvas = gracedb.expose.s(superevent_id)
    else:
        canvas = chain()

    # If there is a sky map, then copy it to the superevent and create plots.
    if skymap_filename is not None:
        canvas |= (
            _download.si(original_skymap_filename, preferred_event_id)
            |
            group(
                gracedb.upload.s(
                    original_skymap_filename,
                    superevent_id,
                    message='Localization copied from {}'.format(
                        preferred_event_id),
                    tags=['sky_loc', 'public']
                )
                |
                _download.si(original_skymap_filename, superevent_id)
                |
                skymaps.flatten.s(skymap_filename)
                |
                gracedb.upload.s(
                    skymap_filename,
                    superevent_id,
                    message='Flattened from multiresolution file {}'.format(
                        original_skymap_filename),
                    tags=['sky_loc', 'public']
                )
                |
                gracedb.create_label.si('SKYMAP_READY', superevent_id),
            )
        )

        # The sky map annotations are launched immediately as a separate
        # workflow rather than being appended to the main canvas.
        (
            _download.si(original_skymap_filename, superevent_id)
            |
            skymaps.annotate_fits(
                original_skymap_filename,
                superevent_id,
                ['sky_loc', 'public']
            )
        ).apply_async()

    # If this is a CBC event, then copy the EM bright classification.
    if event['group'] == 'CBC':
        canvas |= group(
            _download.si('em_bright.json', preferred_event_id)
            |
            gracedb.upload.s(
                'em_bright.json',
                superevent_id,
                message='Source properties copied from {}'.format(
                    preferred_event_id),
                tags=['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', superevent_id)
            |
            _download.si('em_bright.json', superevent_id),

            _download.si('p_astro.json', preferred_event_id)
            |
            gracedb.upload.s(
                'p_astro.json',
                superevent_id,
                message='Source classification copied from {}'.format(
                    preferred_event_id),
                tags=['p_astro', 'public']
            )
            |
            gracedb.create_label.si('PASTRO_READY', superevent_id)
            |
            _download.si('p_astro.json', superevent_id)
        ) | identity.s()  # FIXME: necessary to pass result to next task?
    else:
        # No classification to merge: hand None to _create_voevent.
        canvas |= identity.si(None)

    # Send GCN notice and upload GCN circular draft for online events.
    if should_publish:
        # apply ADVREQ, compose preliminary GCN and send
        canvas |= (
            _create_voevent.s(
                superevent_id, 'preliminary',
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True
            )
            |
            group(
                gracedb.create_label.si('ADVREQ', superevent_id),

                gracedb.download.s(superevent_id)
                |
                gcn.send.s()
                |
                gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id),

                gracedb.create_tag.s('public', superevent_id),

                circulars.create_initial_circular.si(superevent_id)
                |
                gracedb.upload.s(
                    'preliminary-circular.txt',
                    superevent_id,
                    'Template for preliminary GCN Circular',
                    tags=['em_follow']
                )
            )
        )

    canvas.apply_async()
@app.task(ignore_result=True, shared=False)
def parameter_estimation(event, superevent_id):
    """Launch the LALInference parameter estimation followup.

    This consists of the following steps:

    1.  Upload an ini file which is suitable for the target event.
    2.  Start Parameter Estimation if FAR is smaller than the PE threshold;
        otherwise, log a message saying that it will not start.

    Nothing is done for events outside of the CBC group or for MDC events.

    Parameters
    ----------
    event : dict
        The preferred event. The keys ``graceid``, ``group``, ``search``, and
        ``far`` are read.
    superevent_id : str
        The superevent ID.
    """
    preferred_event_id = event['graceid']

    # FIXME: it will be better to start parameter estimation for 'burst'
    # events.
    if event['group'] != 'CBC' or event['search'] == 'MDC':
        return

    workflow = lalinference.pre_pe_tasks(event, superevent_id)
    followup = gracedb.upload.s(
        filename=lalinference.ini_name,
        graceid=superevent_id,
        message='Automatically generated LALInference '
                'configuration file for this event.',
        tags='pe'
    )

    pe_threshold = app.conf['pe_threshold']
    if event['far'] <= pe_threshold:
        # Significant enough: upload the ini file and start PE side by side.
        followup = group(
            followup,
            lalinference.start_pe.s(preferred_event_id, superevent_id)
        )
    else:
        # Not significant enough: upload the ini file, then explain why
        # parameter estimation is not being run.
        followup |= gracedb.upload.si(
            filecontents=None, filename=None,
            graceid=superevent_id,
            message='FAR is larger than the PE threshold, '
                    '{} Hz. Parameter Estimation will not '
                    'start.'.format(app.conf['pe_threshold']),
            tags='pe'
        )

    workflow |= followup
    workflow.apply_async()
@app.task(ignore_result=True, shared=False)
def initial_or_update_alert(superevent_id, alert_type, skymap_filename=None,
                            em_bright_filename=None,
                            p_astro_filename=None):
    """
    Create and send initial or update GCN notice.

    Any file name that is not supplied is looked up in the GraceDb log of the
    superevent. The last matching log entry wins.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.
    alert_type : {'initial', 'update'}
        The alert type.
    skymap_filename : str, optional
        The sky map to send.
        If None, then most recent public sky map is used.
    em_bright_filename : str, optional
        The source classification file to use.
        If None, then most recent one is used.
    p_astro_filename : str, optional
        The p_astro file to use.
        If None, then most recent one is used.
    """
    # Decide up front which file names have to be discovered, so that values
    # found while scanning the log do not switch the search off.
    find_skymap = skymap_filename is None
    find_em_bright = em_bright_filename is None
    find_p_astro = p_astro_filename is None

    if find_skymap or find_em_bright or find_p_astro:
        for entry in gracedb.get_log(superevent_id):
            tags = entry['tag_names']
            name = entry['filename']
            if not name:
                # Log entry without an attached file.
                continue
            if (find_skymap
                    and {'sky_loc', 'public'}.issubset(tags)
                    and name.endswith(('.fits', '.fits.gz'))):
                skymap_filename = name
            if find_em_bright and 'em_bright' in tags \
                    and name.endswith('.json'):
                em_bright_filename = name
            if find_p_astro and 'p_astro' in tags \
                    and name.endswith('.json'):
                p_astro_filename = name

    classification_downloads = ordered_group(
        gracedb.download.si(em_bright_filename, superevent_id),
        gracedb.download.si(p_astro_filename, superevent_id)
    )

    voevent = _create_voevent.s(
        superevent_id,
        alert_type,
        skymap_filename=skymap_filename,
        internal=False,
        open_alert=True,
        vetted=True
    )

    send_notice = gracedb.download.s(superevent_id) | gcn.send.s()

    draft_circular = (
        circulars.create_initial_circular.si(superevent_id)
        |
        gracedb.upload.s(
            '{}-circular.txt'.format(alert_type),
            superevent_id,
            'Template for {} GCN Circular'.format(alert_type),
            tags=['em_follow']
        )
    )

    distribute = group(
        send_notice,
        draft_circular,
        gracedb.create_tag.s('public', superevent_id)
    )

    (
        gracedb.expose.s(superevent_id)
        |
        classification_downloads
        |
        # FIXME: random single task group here is needed for last group to run
        # Check Celery issues on GitHub about this
        group(
            voevent
            |
            distribute
        )
    ).apply_async()
@app.task(ignore_result=True, shared=False)
def initial_alert(superevent_id, skymap_filename=None,
                  em_bright_filename=None, p_astro_filename=None):
    """Produce an initial alert.

    This is a convenience wrapper that forwards its arguments to
    :meth:`~gwcelery.tasks.orchestrator.initial_or_update_alert` with
    ``alert_type='initial'``.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.
    skymap_filename : str, optional
        The sky map to send.
        If None, then most recent public sky map is used.
    em_bright_filename : str, optional
        The source classification file to use.
        If None, then most recent one is used.
    p_astro_filename : str, optional
        The p_astro file to use.
        If None, then most recent one is used.
    """
    alert_type = 'initial'
    initial_or_update_alert(
        superevent_id, alert_type, skymap_filename,
        em_bright_filename, p_astro_filename)
@app.task(ignore_result=True, shared=False)
def update_alert(superevent_id, skymap_filename=None,
                 em_bright_filename=None, p_astro_filename=None):
    """Produce an update alert.

    This is a convenience wrapper that forwards its arguments to
    :meth:`~gwcelery.tasks.orchestrator.initial_or_update_alert` with
    ``alert_type='update'``.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.
    skymap_filename : str, optional
        The sky map to send.
        If None, then most recent public sky map is used.
    em_bright_filename : str, optional
        The source classification file to use.
        If None, then most recent one is used.
    p_astro_filename : str, optional
        The p_astro file to use.
        If None, then most recent one is used.
    """
    alert_type = 'update'
    initial_or_update_alert(
        superevent_id, alert_type, skymap_filename,
        em_bright_filename, p_astro_filename)
@app.task(ignore_result=True, shared=False)
def retraction_alert(superevent_id):
    """Produce a retraction alert.

    This is currently just a stub: it exposes the superevent, creates a
    retraction VOEvent, and then sends the VOEvent, uploads a retraction GCN
    Circular draft, and tags the VOEvent as public.

    Parameters
    ----------
    superevent_id : str
        The superevent ID.
    """
    # A retraction carries no source classification, hence the leading None.
    voevent = _create_voevent.si(
        None, superevent_id, 'retraction',
        internal=False,
        open_alert=True,
        vetted=True
    )

    send_notice = gracedb.download.s(superevent_id) | gcn.send.s()

    draft_circular = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )

    (
        gracedb.expose.s(superevent_id)
        |
        voevent
        |
        group(
            send_notice,
            draft_circular,
            gracedb.create_tag.s('public', superevent_id)
        )
    ).apply_async()