Source code for gwcelery.tasks.ligo_fermi_skymaps
"""Create and upload LVC-Fermi sky maps."""
import re
import urllib
from celery import group
from ligo.skymap.tool import ligo_skymap_combine
import astropy.utils.data
import lxml.etree
from . import gracedb
from ..import app
from ..util.cmdline import handling_system_exit
from ..util.tempfile import NamedTemporaryFile
[docs]def create_combined_skymap(graceid):
"""Creates and uploads the combined LVC-Fermi skymap. This also
uploads the external trigger skymap to the external trigger GraceDB
page."""
preferred_skymap = get_preferred_skymap(graceid)
message = 'Combined LVC-Fermi sky map using {0}.'.format(preferred_skymap)
new_skymap = re.findall(r'(.*).fits', preferred_skymap)[0]+'-gbm.fits.gz'
external_trigger_id = external_trigger(graceid)
return (external_trigger_heasarc.s(external_trigger_id) |
get_external_skymap.s() |
group(
combine_skymaps.s(gracedb.download(preferred_skymap,
graceid)) |
gracedb.upload.s(
new_skymap, graceid, message, ['sky_loc', 'public']),
gracedb.upload.s('glg_healpix_all_bn_v00.fit',
external_trigger_id,
'Sky map from HEASARC.',
['sky_loc', 'public']))
)
[docs]@app.task(autoretry_for=(ValueError,), retry_backoff=10,
retry_backoff_max=600)
def get_preferred_skymap(graceid):
"""Get the LVC skymap fits filename.
If not available, will try again 10 seconds later, then 20,
then 40, etc. until up to 10 minutes after initial attempt."""
gracedb_log = gracedb.get_log(graceid)
for message in reversed(gracedb_log):
comment = message['comment']
filename = message['filename']
if (filename.endswith('.fits.gz') or filename.endswith('.fits')) and \
'copied' in comment:
return filename
raise ValueError('No skymap available for {0} yet.'.format(graceid))
[docs]@app.task(shared=False)
def combine_skymaps(skymap1filebytes, skymap2filebytes):
"""This task combines the two input skymaps, in this case the external
trigger skymap and the LVC skymap and writes to a temporary
output file. It then returns the contents of the file as a byte array."""
with NamedTemporaryFile(mode='rb', suffix='.fits.gz') as combinedskymap, \
NamedTemporaryFile(content=skymap1filebytes) as skymap1file, \
NamedTemporaryFile(content=skymap2filebytes) as skymap2file, \
handling_system_exit():
ligo_skymap_combine.main([skymap1file.name,
skymap2file.name, combinedskymap.name])
return combinedskymap.read()
[docs]@app.task(shared=False)
def external_trigger(graceid):
"""Returns the associated external trigger GraceDB ID."""
em_events = gracedb.get_superevent(graceid)['em_events']
if len(em_events):
for exttrig in em_events:
if gracedb.get_event(exttrig)['search'] == 'GRB':
return exttrig
raise ValueError('No associated GRB EM event(s) for {0}.'.format(graceid))
[docs]@app.task(shared=False)
def external_trigger_heasarc(external_id):
"""Returns the HEASARC fits file link"""
gracedb_log = gracedb.get_log(external_id)
for message in gracedb_log:
if 'Original Data' in message['comment']:
filename = message['filename']
xmlfile = gracedb.download(urllib.parse.quote(filename),
external_id)
root = lxml.etree.fromstring(xmlfile)
heasarc_url = root.find('./What/Param[@name="LightCurve_URL"]'
).attrib['value']
return re.sub(r'quicklook(.*)', 'current/', heasarc_url)
raise ValueError('Not able to retrieve HEASARC link for {0}.'.format(
external_id))
[docs]@app.task(autoretry_for=(urllib.error.HTTPError,), retry_backoff=10,
retry_backoff_max=600)
def get_external_skymap(heasarc_link):
"""Download the Fermi sky map fits file and return the contents as a
byte array.
If not available, will try again 10 seconds later, then 20,
then 40, etc. until up to 10 minutes after initial attempt."""
trigger_id = re.sub(r'.*\/(\D+?)(\d+)(\D+)\/.*', r'\2', heasarc_link)
skymap_name = 'glg_healpix_all_bn{0}_v00.fit'.format(trigger_id)
skymap_link = heasarc_link + skymap_name
return astropy.utils.data.get_file_contents(
(skymap_link), encoding='binary', cache=False)