Source code for gwcelery.tasks.orchestrator

"""Tasks that comprise the alert orchestrator, which responsible for the
vetting and annotation workflow to produce preliminary, initial, and update
alerts for gravitational-wave event candidates."""
import json
import re
from socket import gaierror
from urllib.error import URLError

from celery import chain, group
from ligo.gracedb.rest import HTTPError

from .. import app
from . import bayestar
from . import circulars
from .core import identity, ordered_group, ordered_group_first
from . import detchar
from . import em_bright
from . import gcn
from . import gracedb
from . import lalinference
from . import lvalert
from . import p_astro
from . import skymaps
from . import superevents


@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 'burst_olib',
                 'burst_cwb',
                 shared=False)
def handle_selected_as_preferred(alert):
    """Apply the ``EM_Selected`` label to a superevent once its preferred
    event has been selected, and request advocate sign-off (``ADVREQ``) if
    the candidate is publishable."""
    # FIXME: DQV & INJ labels not incorporated into labeling ADVREQ,
    # Remove after !495 is merged
    if alert['alert_type'] == 'selected_as_preferred':
        superevent_id = alert['object']['superevent']
        gracedb.create_label.delay('EM_Selected', superevent_id)
        if superevents.should_publish(alert['object']):
            gracedb.create_label.delay('ADVREQ', superevent_id)

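# Illustrative sketch (not an exhaustive payload): the fields this handler
# reads from a ``selected_as_preferred`` LVAlert message. The superevent ID is
# hypothetical; real payloads carry many more keys, and
# ``superevents.should_publish`` inspects additional fields of
# ``alert['object']``.
#
#     alert = {
#         'alert_type': 'selected_as_preferred',
#         'object': {'superevent': 'S123456a'},
#     }
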

@lvalert.handler('superevent',
                 'mdc_superevent',
                 shared=False)
def handle_superevent(alert):
    """Schedule annotations for new superevents.

    After waiting for a time specified by the
    :obj:`~gwcelery.conf.orchestrator_timeout` configuration variable for the
    choice of preferred event to settle down, this task performs data quality
    checks with :meth:`gwcelery.tasks.detchar.check_vectors` and calls
    :meth:`~gwcelery.tasks.orchestrator.preliminary_alert` to send a
    preliminary GCN notice.
    """
    superevent_id = alert['uid']

    # launch parameter estimation for newly created superevents
    if alert['alert_type'] == 'new':
        (
            _get_preferred_event.si(superevent_id).set(
                countdown=app.conf['pe_timeout']
            )
            |
            ordered_group(
                _get_lowest_far.si(superevent_id),
                gracedb.get_event.s()
            )
            |
            parameter_estimation.s(superevent_id)
        ).apply_async()
    elif alert['alert_type'] == 'label_added':
        label_name = alert['data']['name']

        # launch preliminary alert on EM_Selected
        if label_name == 'EM_Selected':
            (
                _get_preferred_event.si(superevent_id).set(
                    countdown=app.conf['orchestrator_timeout']
                )
                |
                gracedb.get_event.s()
                |
                detchar.check_vectors.s(
                    superevent_id,
                    alert['object']['t_start'],
                    alert['object']['t_end']
                )
                |
                preliminary_alert.s(superevent_id)
            ).apply_async()
        # launch initial/retraction alert on ADVOK/ADVNO
        elif label_name == 'ADVOK':
            initial_alert((None, None, None), superevent_id)
        elif label_name == 'ADVNO':
            retraction_alert(superevent_id)
    # check DQV label on superevent, run check_vectors if required
    elif alert['alert_type'] == 'event_added':
        new_event_id = alert['data']['preferred_event']
        start = alert['data']['t_start']
        end = alert['data']['t_end']
        if 'DQV' in gracedb.get_labels(superevent_id):
            (
                gracedb.get_event.s(new_event_id)
                |
                detchar.check_vectors.s(superevent_id, start, end)
                |
                _update_if_dqok.si(superevent_id, new_event_id)
            ).apply_async()

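# Illustrative sketch of the shape of a ``label_added`` LVAlert payload as
# read by the handler above. The superevent ID and GPS times are hypothetical
# and real messages contain additional keys:
#
#     alert = {
#         'uid': 'S123456a',
#         'alert_type': 'label_added',
#         'data': {'name': 'EM_Selected'},
#         'object': {'t_start': 1234567890.0, 't_end': 1234567892.0},
#     }
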

@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    Notes
    -----
    This LVAlert message handler is triggered by new CBC events. The table
    below lists which files are created as a result, and which tasks generate
    them.

    ============================== ================================================
    File                           Task
    ============================== ================================================
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.classifier`
    ``p_astro.json``               :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ================================================
    """  # noqa: E501
    graceid = alert['uid']
    priority = 0 if superevents.should_publish(alert['object']) else 1

    # em_bright and p_astro calculation
    if alert['alert_type'] == 'new':
        pipeline = alert['object']['pipeline'].lower()
        instruments = superevents.get_instruments_in_ranking_statistic(
            alert['object'])
        extra_attributes = alert['object']['extra_attributes']
        snr = superevents.get_snr(alert['object'])
        far = alert['object']['far']
        mass1 = extra_attributes['SingleInspiral'][0]['mass1']
        mass2 = extra_attributes['SingleInspiral'][0]['mass2']
        chi1 = extra_attributes['SingleInspiral'][0]['spin1z']
        chi2 = extra_attributes['SingleInspiral'][0]['spin2z']

        em_bright_task = em_bright.classifier_gstlal if pipeline == 'gstlal' \
            else em_bright.classifier_other
        (
            em_bright_task.si((mass1, mass2, chi1, chi2, snr), graceid)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        ).apply_async(priority=priority)

        # p_astro calculation for pipelines other than gstlal
        # (and for gstlal MDC events)
        if pipeline != 'gstlal' or alert['object']['search'] == 'MDC':
            (
                p_astro.compute_p_astro.s(snr,
                                          far,
                                          mass1,
                                          mass2,
                                          pipeline,
                                          instruments)
                |
                gracedb.upload.s(
                    'p_astro.json', graceid,
                    'p_astro computation complete', ['p_astro', 'public']
                )
                |
                gracedb.create_label.si('PASTRO_READY', graceid)
            ).apply_async(priority=priority)

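# A minimal sketch of how the intrinsic parameters are pulled out of the
# event record above. The numbers are made up and only the keys the handler
# reads are shown:
#
#     >>> extra_attributes = {'SingleInspiral': [
#     ...     {'mass1': 1.4, 'mass2': 1.3, 'spin1z': 0.0, 'spin2z': 0.0}]}
#     >>> extra_attributes['SingleInspiral'][0]['mass1']
#     1.4
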

@lvalert.handler('superevent',
                 'mdc_superevent',
                 shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution and flat-resolution FITS files and sky maps
    from an uploaded HDF5 file containing posterior samples.
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    prefix, _ = filename.rsplit('.posterior_samples.')

    # FIXME: It is assumed that posterior samples always come from
    # lalinference. After bilby or rift is integrated, this has to be fixed.
    (
        gracedb.download.si(filename, superevent_id)
        |
        skymaps.skymap_from_samples.s()
        |
        group(
            skymaps.annotate_fits.s(
                '{}.fits.gz'.format(prefix), superevent_id,
                ['pe', 'sky_loc']
            ),

            gracedb.upload.s(
                '{}.multiorder.fits'.format(prefix), superevent_id,
                'Multiresolution fits file generated from {}'.format(
                    filename),
                ['pe', 'sky_loc']
            ),

            skymaps.flatten.s('{}.fits.gz'.format(prefix))
            |
            gracedb.upload.s(
                '{}.fits.gz'.format(prefix), superevent_id,
                'Flat-resolution fits file created from {}'.format(filename),
                ['pe', 'sky_loc']
            )
        )
    ).delay()

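# A minimal sketch of the prefix handling above (the uploaded filename is
# hypothetical): everything before ``.posterior_samples.`` is reused to name
# the generated sky map files.
#
#     >>> filename = 'LALInference.posterior_samples.hdf5'
#     >>> prefix, _ = filename.rsplit('.posterior_samples.')
#     >>> prefix
#     'LALInference'
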

@app.task(autoretry_for=(gaierror, HTTPError, URLError, TimeoutError),
          default_retry_delay=20.0, retry_backoff=True,
          retry_kwargs=dict(max_retries=500), shared=False)
def _download(*args, **kwargs):
    """Download a file from GraceDB.

    This works just like :func:`gwcelery.tasks.gracedb.download`, except that
    it is retried for both :class:`TimeoutError` and
    :class:`~urllib.error.URLError`. In particular, it will be retried for
    404 (not found) errors."""
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.download._orig_run(*args, **kwargs)

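# Usage sketch (the GraceDB event ID is hypothetical): because a 404 raises
# :class:`ligo.gracedb.rest.HTTPError`, which is listed in ``autoretry_for``,
# a call like the one below keeps retrying with backoff until the file
# appears or ``max_retries`` is exhausted.
#
#     _download.delay('psd.xml.gz', 'G123456')
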

@app.task(shared=False, ignore_result=True)
def _update_if_dqok(superevent_id, event_id):
    """Update the preferred event of `superevent_id` to `event_id` if the
    `DQOK` label has been applied."""
    if 'DQOK' in gracedb.get_labels(superevent_id):
        gracedb.update_superevent(superevent_id, preferred_event=event_id)
        gracedb.create_log(
            "DQOK applied based on new event %s" % (event_id), superevent_id)


@gracedb.task(shared=False)
def _get_preferred_event(superevent_id):
    """Determine preferred event for a superevent by querying GraceDB.

    This works just like :func:`gwcelery.tasks.gracedb.get_superevent`, except
    that it returns only the preferred event, and not the entire GraceDB JSON
    response."""
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.get_superevent._orig_run(superevent_id)['preferred_event']


@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record from an EM bright JSON file.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.classifier` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro`, or the contents of
        ``p_astro.json`` uploaded by gstlal; or None.
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    kwargs = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into kwargs.
        for text in classification:
            if text is not None:
                kwargs.update(json.loads(text))

        # FIXME: These keys differ between em_bright.json
        # and the GraceDB REST API.
        try:
            kwargs['ProbHasNS'] = kwargs.pop('HasNS')
        except KeyError:
            pass

        try:
            kwargs['ProbHasRemnant'] = kwargs.pop('HasRemnant')
        except KeyError:
            pass

    skymap_filename = kwargs.get('skymap_filename')
    if skymap_filename is not None:
        skymap_type = re.sub(r'\.fits(\..+)?$', '', skymap_filename)
        kwargs.setdefault('skymap_type', skymap_type)

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **kwargs)

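# A minimal sketch of the key renaming and sky map type derivation performed
# above (the probability and filename are made up):
#
#     >>> kwargs = {'HasNS': 0.95, 'skymap_filename': 'bayestar.fits.gz'}
#     >>> kwargs['ProbHasNS'] = kwargs.pop('HasNS')
#     >>> re.sub(r'\.fits(\..+)?$', '', kwargs['skymap_filename'])
#     'bayestar'
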

@gracedb.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Create `label` for `graceid` and return `filename` unchanged, so that
    the filename can be passed along the canvas."""
    gracedb.create_label(label, graceid)
    return filename


@app.task(ignore_result=True, shared=False)
def preliminary_alert(event, superevent_id):
    """Produce a preliminary alert by copying any sky maps.

    This consists of the following steps:

    1.   Copy any sky maps and source classification from the preferred event
         to the superevent.
    2.   Create standard annotations for sky maps including all-sky plots by
         calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
    3.   Create a preliminary VOEvent.
    4.   Send the VOEvent to GCN.
    5.   Apply the GCN_PRELIM_SENT label to the superevent.
    6.   Create and upload a GCN Circular draft.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']

    if event['group'] == 'CBC':
        skymap_filename = 'bayestar.multiorder.fits'
    elif event['pipeline'] == 'CWB':
        skymap_filename = 'cWB.fits.gz'
    elif event['pipeline'] == 'oLIB':
        skymap_filename = 'oLIB.fits.gz'
    else:
        skymap_filename = None

    original_skymap_filename = skymap_filename
    if skymap_filename is not None:
        if skymap_filename.endswith('.multiorder.fits'):
            skymap_filename = skymap_filename.replace(
                '.multiorder.fits', '.fits')
        if skymap_filename.endswith('.fits'):
            skymap_filename += '.gz'

    # Determine if the event should be made public.
    is_publishable = (superevents.should_publish(event) and
                      {'DQV', 'INJ'}.isdisjoint(event['labels']))

    canvas = chain()
    if event['group'] == 'CBC':
        canvas |= (
            ordered_group(
                gracedb.download.si('coinc.xml', preferred_event_id),
                _download.si('psd.xml.gz', preferred_event_id)
            )
            |
            bayestar.localize.s(preferred_event_id)
            |
            gracedb.upload.s(
                'bayestar.multiorder.fits', preferred_event_id,
                'sky localization complete', ['sky_loc', 'public']
            )
            |
            gracedb.create_label.si('SKYMAP_READY', preferred_event_id)
        )

    canvas |= ordered_group(
        (
            _download.si(original_skymap_filename, preferred_event_id)
            |
            ordered_group_first(
                skymaps.flatten.s(skymap_filename)
                |
                gracedb.upload.s(
                    skymap_filename, superevent_id,
                    message='Flattened from multiresolution file {}'.format(
                        original_skymap_filename),
                    tags=['sky_loc', 'public']
                )
                |
                _create_label_and_return_filename.s(
                    'SKYMAP_READY', superevent_id
                ),

                gracedb.upload.s(
                    original_skymap_filename, superevent_id,
                    message='Localization copied from {}'.format(
                        preferred_event_id),
                    tags=['sky_loc', 'public']
                ),

                skymaps.annotate_fits.s(
                    skymap_filename, superevent_id, ['sky_loc', 'public']
                )
            )
        ) if skymap_filename is not None else identity.s(None),

        (
            _download.si('em_bright.json', preferred_event_id)
            |
            gracedb.upload.s(
                'em_bright.json', superevent_id,
                message='Source properties copied from {}'.format(
                    preferred_event_id),
                tags=['em_bright', 'public']
            )
            |
            _create_label_and_return_filename.s(
                'EMBRIGHT_READY', superevent_id
            )
        ) if event['group'] == 'CBC' else identity.s(None),

        (
            _download.si('p_astro.json', preferred_event_id)
            |
            gracedb.upload.s(
                'p_astro.json', superevent_id,
                message='Source classification copied from {}'.format(
                    preferred_event_id),
                tags=['p_astro', 'public']
            )
            |
            _create_label_and_return_filename.s(
                'PASTRO_READY', superevent_id
            )
        ) if event['group'] == 'CBC' else identity.s(None)
    )

    # Send GCN notice and upload GCN circular draft for online events.
    if is_publishable:
        canvas |= preliminary_initial_update_alert.s(
            superevent_id, 'preliminary')

    canvas.apply_async(priority=priority)

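# A minimal sketch of the sky map filename handling above: the multi-order
# file keeps its original name, while the flattened copy uploaded to the
# superevent gets a ``.fits.gz`` name.
#
#     >>> name = 'bayestar.multiorder.fits'
#     >>> name = name.replace('.multiorder.fits', '.fits')
#     >>> name + '.gz'
#     'bayestar.fits.gz'
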

@gracedb.task(shared=False)
def _get_lowest_far(superevent_id):
    """Obtain the lowest FAR of the events contained in the target
    superevent."""
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return min(
        gracedb.get_event._orig_run(gid)['far'] for gid in
        gracedb.get_superevent._orig_run(superevent_id)["gw_events"])


@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id):
    """Tasks for parameter estimation follow-up with LALInference.

    This consists of the following steps:

    1.   Prepare and upload an ini file which is suitable for the target
         event.
    2.   Start parameter estimation if the FAR is smaller than the PE
         threshold.
    """
    far, event = far_event
    preferred_event_id = event['graceid']
    # FIXME: it would be better to also start parameter estimation for
    # 'burst' events.
    if event['group'] == 'CBC' and event['search'] != 'MDC':
        canvas = lalinference.pre_pe_tasks(event, superevent_id)
        if far <= app.conf['pe_threshold']:
            canvas |= lalinference.start_pe.s(preferred_event_id,
                                              superevent_id)
        else:
            canvas |= gracedb.upload.si(
                filecontents=None, filename=None,
                graceid=superevent_id,
                message='FAR is larger than the PE threshold, '
                        '{} Hz. Parameter Estimation will not '
                        'start.'.format(app.conf['pe_threshold']),
                tags='pe'
            )
        canvas.apply_async()

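# The ``far_event`` argument is the result of the ``ordered_group`` built in
# ``handle_superevent``: the lowest FAR among the superevent's events,
# followed by the preferred event's GraceDB record. An abbreviated sketch
# with hypothetical values:
#
#     far_event = (1.2e-10,
#                  {'graceid': 'G123456', 'group': 'CBC', 'search': 'AllSky'})
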

@gracedb.task(ignore_result=True, shared=False)
def preliminary_initial_update_alert(filenames, superevent_id, alert_type):
    """Create and send a preliminary, initial, or update GCN notice.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.
    alert_type : {'preliminary', 'initial', 'update'}
        The alert type.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.
    """
    skymap_filename, em_bright_filename, p_astro_filename = filenames
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = (p_astro_filename is None)
    if skymap_needed or em_bright_needed or p_astro_needed:
        for message in gracedb.get_log(superevent_id):
            t = message['tag_names']
            f = message['filename']
            v = message['file_version']
            fv = '{},{}'.format(f, v)
            if not f:
                continue
            if skymap_needed \
                    and {'sky_loc', 'public'}.issubset(t) \
                    and f.endswith('.fits.gz'):
                skymap_filename = fv
            if em_bright_needed \
                    and 'em_bright' in t \
                    and f.endswith('.json'):
                em_bright_filename = fv
            if p_astro_needed \
                    and 'p_astro' in t \
                    and f.endswith('.json'):
                p_astro_filename = fv

    send_canvas = (
        gracedb.download.s(superevent_id)
        |
        gcn.send.s()
    )
    if alert_type == 'preliminary':
        send_canvas |= gracedb.create_label.si(
            'GCN_PRELIM_SENT', superevent_id)

    (
        group(
            gracedb.expose.s(superevent_id),
            *(
                gracedb.create_tag.s(filename, 'public', superevent_id)
                for filename in [skymap_filename,
                                 em_bright_filename,
                                 p_astro_filename]
                if filename is not None
            )
        )
        |
        group(
            gracedb.download.si(em_bright_filename, superevent_id),
            gracedb.download.si(p_astro_filename, superevent_id),
        )
        |
        _create_voevent.s(
            superevent_id, alert_type,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            vetted=True
        )
        |
        group(
            send_canvas,

            circulars.create_initial_circular.si(superevent_id)
            |
            gracedb.upload.s(
                '{}-circular.txt'.format(alert_type),
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=['em_follow']
            ),

            gracedb.create_tag.s('public', superevent_id)
        )
    ).apply_async()

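# A minimal sketch of the ``filename,version`` reference assembled in the log
# scan above (filename and version are hypothetical); this combined form is
# how a specific version of an uploaded GraceDB file is referred to:
#
#     >>> '{},{}'.format('bayestar.fits.gz', 0)
#     'bayestar.fits.gz,0'
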

@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, superevent_id):
    """Produce an initial alert.

    This does nothing more than call
    :meth:`~gwcelery.tasks.orchestrator.preliminary_initial_update_alert` with
    ``alert_type='initial'``.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.
    """
    preliminary_initial_update_alert(filenames, superevent_id, 'initial')


@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    This does nothing more than call
    :meth:`~gwcelery.tasks.orchestrator.preliminary_initial_update_alert` with
    ``alert_type='update'``.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.
    """
    preliminary_initial_update_alert(filenames, superevent_id, 'update')


@app.task(ignore_result=True, shared=False)
def retraction_alert(superevent_id):
    """Produce a retraction alert.

    This is currently just a stub and does nothing more than create and send
    a VOEvent."""
    (
        gracedb.expose.si(superevent_id)
        |
        _create_voevent.si(
            None, superevent_id, 'retraction',
            internal=False,
            open_alert=True,
            vetted=True
        )
        |
        group(
            gracedb.download.s(superevent_id)
            |
            gcn.send.s(),

            circulars.create_retraction_circular.si(superevent_id)
            |
            gracedb.upload.s(
                'retraction-circular.txt',
                superevent_id,
                'Template for retraction GCN Circular',
                tags=['em_follow']
            ),

            gracedb.create_tag.s('public', superevent_id)
        )
    ).apply_async()