Source code for gwcelery.tasks.external_skymaps

"""Create and upload external sky maps."""
from astropy import units as u
from astropy.coordinates import ICRS, SkyCoord
from astropy_healpix import HEALPix, pixel_resolution_to_nside
from celery import group
#  import astropy.utils.data
import numpy as np
from ligo.skymap.io import fits
from ligo.skymap.tool import ligo_skymap_combine
import gcn
import healpy as hp
import lxml.etree
import re
import ssl
import urllib

from ..import app
from . import gracedb
from . import skymaps
from ..util.cmdline import handling_system_exit
from ..util.tempfile import NamedTemporaryFile
from ..import _version


[docs]def create_combined_skymap(se_id, ext_id): """Creates and uploads the combined LVC-Fermi skymap. This also uploads the external trigger skymap to the external trigger GraceDB page. """ se_skymap_filename = get_skymap_filename(se_id) ext_skymap_filename = get_skymap_filename(ext_id) new_skymap_filename = re.findall(r'(.*).fits.gz', se_skymap_filename)[0] # FIXME: put download functions in canvas se_skymap = gracedb.download(se_skymap_filename, se_id) ext_skymap = gracedb.download(ext_skymap_filename, ext_id) message = 'Combined LVC-external sky map using {0} and {1}'.format( se_skymap_filename, ext_skymap_filename) message_png = ( 'Mollweide projection of <a href="/api/events/{graceid}/files/' '{filename}">{filename}</a>').format( graceid=se_id, filename=new_skymap_filename + '-ext.fits.gz') ( combine_skymaps.si(se_skymap, ext_skymap) | group( gracedb.upload.s(new_skymap_filename + '-ext.fits.gz', se_id, message, ['sky_loc', 'public']), skymaps.plot_allsky.s() | gracedb.upload.s(new_skymap_filename + '-ext.png', se_id, message_png, ['sky_loc', 'ext_coinc', 'public']) ) ).delay()
[docs]@app.task(autoretry_for=(ValueError,), retry_backoff=10, retry_backoff_max=600) def get_skymap_filename(graceid): """Get the skymap fits filename. If not available, will try again 10 seconds later, then 20, then 40, etc. until up to 10 minutes after initial attempt. """ gracedb_log = gracedb.get_log(graceid) if 'S' in graceid: for message in reversed(gracedb_log): filename = message['filename'] if filename.endswith('.fits.gz'): return filename else: for message in reversed(gracedb_log): filename = message['filename'] if (filename.endswith('.fits') or filename.endswith('.fit') or filename.endswith('.fits.gz')): return filename raise ValueError('No skymap available for {0} yet.'.format(graceid))
[docs]@app.task(shared=False) def combine_skymaps(skymap1filebytes, skymap2filebytes): """This task combines the two input skymaps, in this case the external trigger skymap and the LVC skymap and writes to a temporary output file. It then returns the contents of the file as a byte array. """ with NamedTemporaryFile(mode='rb', suffix='.fits.gz') as combinedskymap, \ NamedTemporaryFile(content=skymap1filebytes) as skymap1file, \ NamedTemporaryFile(content=skymap2filebytes) as skymap2file, \ handling_system_exit(): ligo_skymap_combine.main([skymap1file.name, skymap2file.name, combinedskymap.name]) return combinedskymap.read()
[docs]@app.task(shared=False) def external_trigger(graceid): """Returns the associated external trigger GraceDB ID.""" em_events = gracedb.get_superevent(graceid)['em_events'] if len(em_events): for exttrig in em_events: if gracedb.get_event(exttrig)['search'] == 'GRB': return exttrig raise ValueError('No associated GRB EM event(s) for {0}.'.format(graceid))
[docs]@app.task(shared=False) def external_trigger_heasarc(external_id): """Returns the HEASARC fits file link.""" gracedb_log = gracedb.get_log(external_id) for message in gracedb_log: if 'Original Data' in message['comment']: filename = message['filename'] xmlfile = gracedb.download(urllib.parse.quote(filename), external_id) root = lxml.etree.fromstring(xmlfile) heasarc_url = root.find('./What/Param[@name="LightCurve_URL"]' ).attrib['value'] return re.sub(r'quicklook(.*)', 'current/', heasarc_url) raise ValueError('Not able to retrieve HEASARC link for {0}.'.format( external_id))
[docs]@app.task(autoretry_for=(urllib.error.HTTPError,), retry_backoff=10, retry_backoff_max=600) def get_external_skymap(link, search): """Download the Fermi sky map fits file and return the contents as a byte array. If GRB, will construct a HEASARC url, while if SubGRB, will use the link directly. If not available, will try again 10 seconds later, then 20, then 40, etc. until up to 10 minutes after initial attempt. """ if search == 'GRB': # if Fermi GRB, determine final HEASARC link trigger_id = re.sub(r'.*\/(\D+?)(\d+)(\D+)\/.*', r'\2', link) skymap_name = 'glg_healpix_all_bn{0}_v00.fit'.format(trigger_id) skymap_link = link + skymap_name elif search == 'SubGRB': skymap_link = link # FIXME: Under Anaconda on the LIGO Caltech computing cluster, Python # (and curl, for that matter) fail to negotiate TLSv1.2 with # heasarc.gsfc.nasa.gov context = ssl.create_default_context() context.options |= ssl.OP_NO_TLSv1_3 # return astropy.utils.data.get_file_contents( # (skymap_link), encoding='binary', cache=False) return urllib.request.urlopen(skymap_link, context=context).read()
[docs]@app.task(autoretry_for=(urllib.error.HTTPError, urllib.error.URLError, ValueError,), retry_backoff=10, retry_backoff_max=1200) def get_upload_external_skymap(graceid, search, skymap_link=None): """If a Fermi sky map is not uploaded yet, tries to download one and upload to external event. If sky map is not available, passes so that this can be re-run the next time an update GCN notice is received. If GRB, will construct a HEASARC url, while if SubGRB, will use the link directly. """ try: filename = get_skymap_filename(graceid) if 'glg_healpix_all_bn_v00.fit' in filename: return except ValueError: pass if search == 'GRB': external_skymap_canvas = ( external_trigger_heasarc.si(graceid) | get_external_skymap.s(search) ) elif search == 'SubGRB': external_skymap_canvas = get_external_skymap.si(skymap_link, search) skymap_filename = 'glg_healpix_all_bn_v00' message = ( 'Mollweide projection of <a href="/api/events/{graceid}/files/' '{filename}">{filename}</a>').format( graceid=graceid, filename=skymap_filename + '.fits') ( external_skymap_canvas | group( gracedb.upload.s( skymap_filename + '.fits', graceid, 'Official sky map from Fermi analysis.', ['sky_loc']), skymaps.plot_allsky.s() | gracedb.upload.s(skymap_filename + '.png', graceid, message, ['sky_loc']) ) | gracedb.create_label.si('EXT_SKYMAP_READY', graceid) ).delay()
[docs]def create_external_skymap(ra, dec, error, pipeline, notice_type=111): """Create a sky map, either a gaussian or a single pixel sky map, given an RA, dec, and error radius. If from Fermi, convolves the sky map with both a core and tail Gaussian and then sums these to account for systematic effects as measured in :doi:`10.1088/0067-0049/216/2/32` If from Swift, converts the error radius from that containing 90% of the credible region to ~68% (see description of Swift error here:`https://gcn.gsfc.nasa.gov/swift.html#tc7`) Parameters ---------- ra : float right ascension in deg dec: float declination in deg error: float error radius in deg Returns ------- skymap : numpy array sky map array """ max_nside = 2048 if error: # Correct 90% containment to 1-sigma for Swift if pipeline == 'Swift': error /= np.sqrt(-2 * np.log1p(-.9)) error_radius = error * u.deg nside = pixel_resolution_to_nside(error_radius, round='up') else: nside = np.inf if nside >= max_nside: nside = max_nside # Find the one pixel the event can localized to hpx = HEALPix(nside, 'ring', frame=ICRS()) skymap = np.zeros(hpx.npix) ind = hpx.lonlat_to_healpix(ra * u.deg, dec * u.deg) skymap[ind] = 1. else: # If larger error, create gaussian sky map hpx = HEALPix(nside, 'ring', frame=ICRS()) ipix = np.arange(hpx.npix) # Evaluate Gaussian. center = SkyCoord(ra * u.deg, dec * u.deg) distance = hpx.healpix_to_skycoord(ipix).separation(center) skymap = np.exp(-0.5 * np.square(distance / error_radius).to_value( u.dimensionless_unscaled)) skymap /= skymap.sum() if pipeline == 'Fermi': # Correct for Fermi systematics based on recommendations from GBM team # Convolve with both a narrow core and wide tail Gaussian with error # radius determined by the scales respectively, each comprising a # fraction determined by the weights respectively if notice_type == gcn.NoticeType.FERMI_GBM_FLT_POS: # Flight notice # Values from first row of Table 7 weights = [0.897, 0.103] scales = [7.52, 55.6] elif notice_type == gcn.NoticeType.FERMI_GBM_GND_POS: # Ground notice # Values from first row of Table 3 weights = [0.804, 0.196] scales = [3.72, 13.7] elif notice_type == gcn.NoticeType.FERMI_GBM_FIN_POS: # Final notice # Values from second row of Table 3 weights = [0.900, 0.100] scales = [3.71, 14.3] else: raise AssertionError( 'Need to provide a supported Fermi notice type') skymap = sum( weight * hp.sphtfunc.smoothing(skymap, sigma=np.radians(scale)) for weight, scale in zip(weights, scales)) return skymap
[docs]def write_to_fits(skymap, event, notice_type, notice_date): """Write external sky map fits file, populating the header with relevant info. Parameters ---------- skymap : numpy array sky map array event : dict Dictionary of Swift external event Returns ------- skymap fits : bytes array bytes array of sky map """ notice_type_dict = { '53': 'INTEGRAL_WAKEUP', '54': 'INTEGRAL_REFINED', '55': 'INTEGRAL_OFFLINE', '60': 'SWIFT_BAT_GRB_ALERT', '61': 'SWIFT_BAT_GRB_POSITION', '105': 'AGILE_MCAL_ALERT', '110': 'FERMI_GBM_ALERT', '111': 'FERMI_GBM_FLT_POS', '112': 'FERMI_GBM_GND_POS', '115': 'FERMI_GBM_FINAL_POS', '131': 'FERMI_GBM_SUBTHRESHOLD'} if notice_type is None: msgtype = event['pipeline'] + '_LVK_TARGETED_SEARCH' else: msgtype = notice_type_dict[str(notice_type)] gcn_id = event['extra_attributes']['GRB']['trigger_id'] with NamedTemporaryFile(suffix='.fits.gz') as f: fits.write_sky_map(f.name, skymap, objid=gcn_id, url=event['links']['self'], instruments=event['pipeline'], gps_time=event['gpstime'], msgtype=msgtype, msgdate=notice_date, creator='gwcelery', origin='LIGO-VIRGO-KAGRA', vcs_version=_version.get_versions()['version'], history='file only for internal use') with open(f.name, 'rb') as file: return file.read()
[docs]@app.task(shared=False) def create_upload_external_skymap(event, notice_type, notice_date): """Create and upload external sky map using RA, dec, and error radius information. Parameters ---------- event : dict Dictionary of Swift external event """ graceid = event['graceid'] skymap_filename = event['pipeline'].lower() + '_skymap.fits.gz' ra = event['extra_attributes']['GRB']['ra'] dec = event['extra_attributes']['GRB']['dec'] error = event['extra_attributes']['GRB']['error_radius'] pipeline = event['pipeline'] skymap = create_external_skymap(ra, dec, error, pipeline, notice_type) skymap_data = write_to_fits(skymap, event, notice_type, notice_date) message = ( 'Mollweide projection of <a href="/api/events/{graceid}/files/' '{filename}">{filename}</a>').format( graceid=graceid, filename=skymap_filename) ( gracedb.upload.si(skymap_data, skymap_filename, graceid, 'Sky map created from GCN RA, dec, and error.', ['sky_loc']) | skymaps.plot_allsky.si(skymap_data, ra=ra, dec=dec) | gracedb.upload.s(event['pipeline'].lower() + '_skymap.png', graceid, message, ['sky_loc']) | gracedb.create_label.si('EXT_SKYMAP_READY', graceid) ).delay()