Source code for gwcelery.tasks.bayestar

"""Rapid sky localization with :mod:`BAYESTAR <ligo.skymap.bayestar>`."""
import io
import logging
import urllib.parse

from celery.exceptions import Ignore
from ligo.gracedb.logging import GraceDbLogHandler
from ligo.skymap import bayestar as _bayestar
from ligo.skymap.io import events
from ligo.skymap.io import fits

from .. import app
from . import gracedb

log = logging.getLogger('BAYESTAR')


[docs]@app.task(queue='openmp', shared=False) def localize(coinc_psd, graceid, filename='bayestar.fits.gz', disabled_detectors=None): """Generate a rapid sky localization using :mod:`BAYESTAR <ligo.skymap.bayestar>`. Parameters ---------- coinc_psd : tuple Tuple consisting of the byte contents of the input event's ``coinc.xml`` and ``psd.xml.gz`` files. graceid : str The GraceDB ID, used for FITS metadata and recording log messages to GraceDb. filename : str, optional The name of the FITS file. disabled_detectors : list, optional List of detectors to disable. Returns ------- bytes The byte contents of the finished FITS file. Notes ----- This task is adapted from the command-line tool :doc:`bayestar-localize-lvalert <ligo/skymap/tool/bayestar_localize_lvalert>`. It should execute in a special queue for computationally intensive, multithreaded, OpenMP tasks. """ handler = GraceDbLogHandler(gracedb.client, graceid) handler.setLevel(logging.INFO) log.addHandler(handler) # Determine the base URL for event pages. scheme, netloc, *_ = urllib.parse.urlparse(gracedb.client._service_url) base_url = urllib.parse.urlunparse((scheme, netloc, 'events', '', '', '')) try: # A little bit of Cylon humor log.info('by your command...') # Parse event coinc, psd = coinc_psd coinc = io.BytesIO(coinc) psd = io.BytesIO(psd) event_source = events.ligolw.open(coinc, psd_file=psd, coinc_def=None) if disabled_detectors: event_source = events.detector_disabled.open( event_source, disabled_detectors) event, = event_source.values() # Run BAYESTAR log.info("starting sky localization") skymap = _bayestar.localize(event) skymap.meta['objid'] = str(graceid) skymap.meta['url'] = '{}/{}'.format(base_url, graceid) log.info("sky localization complete") with io.BytesIO() as f: fits.write_sky_map(f, skymap, moc=True) return f.getvalue() except events.DetectorDisabledError: raise Ignore() except: # noqa # Produce log message for any otherwise uncaught exception log.exception("sky localization failed") raise finally: log.removeHandler(handler) del handler