# Source code for gwcelery.tasks.first2years_external

"""Create mock external events to be in coincidence
   with MDC superevents."""

import random
import re
from pathlib import Path

import numpy as np
from astropy.time import Time
from lxml import etree

from .. import app
from ..tests import data
from ..util import read_json
from . import external_triggers, igwn_alert, raven

# Lookup table: external pipeline name -> identifier placed in the IVORN.
IVORN_PIPELINE_DICT = dict(
    Fermi='Fermi',
    Swift='SWIFT',
    SVOM='fsc',
    SNEWS='SNEWS',
)
"""Mapping from each external pipeline name to the name used for that
pipeline inside the IVORN."""


def create_upload_external_event(gpstime, pipeline, ext_search):
    """Build a randomized mock external event and pass it to its handler.

    A Kafka-style alert dictionary is produced for CHIME and IceCube
    events and for any event in the SubGRBTargeted search; every other
    combination is produced by editing a VOEvent XML template.

    Parameters
    ----------
    gpstime : float
        Event's GPS time
    pipeline : str
        External trigger pipeline name
    ext_search : str
        Search field for external event

    Returns
    -------
    event : dict or serialized XML
        For the Kafka path, the alert dictionary that was handed to
        ``external_triggers.handle_kafka_alert``. Otherwise the XML GCN
        notice as serialized by ``etree.tostring`` with UTF-8 encoding,
        after it was handed to the GRB or SNEWS GCN handler.

    Raises
    ------
    ValueError
        If the Kafka path is selected for a pipeline other than Fermi,
        Swift, CHIME or IceCube.

    """
    iso_time = str(Time(gpstime, format='gps', scale='utc').isot) + 'Z'
    trig_id = str(int(gpstime))

    # Random sky position: right ascension is uniform, declination is
    # drawn from a grid weighted by cos(dec).
    ra = random.uniform(0, 360)
    thetas = np.arange(-np.pi / 2, np.pi / 2, .01)
    cos_thetas = np.cos(thetas)
    dec = random.choices(np.rad2deg(thetas),
                         weights=cos_thetas / sum(cos_thetas))[0]

    # Localization error (and GRB flag) depend on the pipeline
    if pipeline == 'Fermi':
        error, is_grb = random.uniform(1, 30), True
    elif pipeline in {'Swift', 'SVOM', 'CHIME'}:
        error, is_grb = random.uniform(0.05, .2), True
    else:
        error, is_grb = .05, False

    # If SubGRBTargeted (or CHIME/IceCube) modify alert packet from Kafka
    use_kafka = pipeline in {'CHIME', 'IceCube'} or \
        ext_search == 'SubGRBTargeted'
    if use_kafka:
        if pipeline == 'Fermi':
            # Template based on:
            # https://github.com/joshuarwood/gcn-schema/tree/main/gcn/notices/
            # fermi/gbm
            alert = read_json(data, 'kafka_alert_fermi_targeted.json')
            # Remove sky map file to create our own sky map
            alert.pop('healpix_file')
            alert['dec_uncertainty'] = [error]
        elif pipeline == 'Swift':
            # Template based on:
            # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/
            # swift/bat/guano
            alert = read_json(data, 'kafka_alert_swift_targeted.json')
            alert['ra_uncertainty'] = [error]
        elif pipeline == 'CHIME':
            # Template based on:
            # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/
            # chime
            alert = read_json(data, 'kafka_alert_chime_detection.json')
            alert['ra_dec_error'] = [error]
        elif pipeline == 'IceCube':
            # Template based on:
            # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/
            # icecube
            alert = read_json(data, 'kafka_alert_icecube_bronze.json')
            alert['ra_dec_error'] = error
        else:
            raise ValueError(
                'Only use SubGRBTargeted Fermi or Swift, or CHIME or '
                'IceCube events')

        alert['trigger_time'] = iso_time
        alert['alert_datetime'] = iso_time
        alert['id'] = [trig_id]
        alert['ra'] = ra
        alert['dec'] = dec

        # Generate FAR from max threshold and trials factors
        if ext_search == 'SubGRBTargeted':
            max_far = \
                app.conf['raven_targeted_far_thresholds']['GRB'][pipeline]
            alert['far'] = max_far * random.uniform(0, 1)

        external_triggers.handle_kafka_alert(alert)
        # Return the alert for testing until GraceDB natively ingests
        # kafka alerts
        return alert

    # Otherwise modify respective VOEvent template
    subthresh_tag = '_subthresh' if ext_search == 'SubGRB' else ''
    template_name = '../tests/data/{0}{1}_{2}gcn.xml'.format(
        pipeline.lower(), subthresh_tag, 'grb_' if is_grb else '')
    fname = str(Path(__file__).parent / template_name)
    root = etree.parse(fname)

    # Change ivorn to indicate if this is an MDC event or O3 replay event
    ivorn = 'ivo://lvk.internal/{0}#{1}{2}_event_{3}'.format(
        IVORN_PIPELINE_DICT[pipeline],
        subthresh_tag,
        'MDC-test' if ext_search == 'MDC' else 'O3-replay',
        iso_time)
    root.xpath('.')[0].attrib['ivorn'] = ivorn.encode()

    # Change times to chosen time
    coords_path = ("./WhereWhen/ObsDataLocation/"
                   "ObservationLocation/AstroCoords/")
    root.find("./Who/Date").text = iso_time.encode()
    root.find(coords_path + "Time/TimeInstant/ISOTime").text = \
        iso_time.encode()

    # The trigger identifier lives in a different element per notice type
    if ext_search == 'SubGRB':
        id_path = "./What/Param[@name='Trans_Num']"
    elif pipeline == 'SVOM':
        id_path = ("./What/Group[@name='Svom_Identifiers']"
                   "/Param[@name='Burst_Id']")
    else:
        id_path = "./What/Param[@name='TrigID']"
    root.find(id_path).attrib['value'] = trig_id.encode()

    if is_grb:
        # Give random sky position
        position_path = coords_path + "Position2D/"
        root.find(position_path + "Value2/C1").text = str(ra).encode()
        root.find(position_path + "Value2/C2").text = str(dec).encode()
        if pipeline != 'Swift':
            root.find(position_path + "Error2Radius").text = \
                str(error).encode()

    event = etree.tostring(root, xml_declaration=True, encoding="UTF-8",
                           pretty_print=True)

    # Upload as from GCN
    if is_grb:
        external_triggers.handle_grb_gcn(event)
    else:
        external_triggers.handle_snews_gcn(event)
    return event
def _offset_time(gpstime, group, pipeline, ext_search, se_search):
    """Offset the given GPS time by a random amount within the time
    window returned by :func:`raven._time_window` for this search.

    Parameters
    ----------
    gpstime : float
        Event's GPS time
    group : str
        Burst or CBC
    pipeline : str
        Pipeline field for external event
    ext_search : str
        Search field for external event
    se_search : str
        Search field for superevent; wrapped in a single-element list
        before being passed to :func:`raven._time_window`

    Returns
    -------
    gpstime_adjusted : float
        Event's original GPS time plus a random number drawn uniformly
        from the determined search window

    """
    tl, th = raven._time_window('S1', group, [pipeline],
                                [ext_search], [se_search])
    return gpstime + random.uniform(tl, th)


def _is_joint_mdc(graceid, se_search):
    """Determine whether to upload an external events using user-defined
    frequency of MDC or AllSky superevents.

    Looks at the ending letters of a superevent (e.g. 'ac' from
    'MS190124ac'), converts to a number, and checks if divisible by a
    number given in the configuration file.

    For example, if the configuration number
    :obj:`~gwcelery.conf.joint_mdc_freq` is 10,
    this means joint events with superevents ending with 'j', 't', 'ad',
    etc.

    Parameters
    ----------
    graceid : str
        GraceDB ID of superevent
    se_search : str
        Search field for preferred event in superevent; ``'MDC'`` selects
        the ``joint_mdc_freq`` setting, anything else selects
        ``joint_O3_replay_freq``

    Returns
    -------
    is_joint_mdc : bool
        Returns True if the GraceDB ID matches pre-determined frequency
        rates

    """
    end_string = re.split(r'\d+', graceid)[-1].lower()
    # Read the suffix as a base-26 number with digits a=1 ... z=26,
    # accumulating left to right (an empty suffix yields 0).
    val = 0
    for letter in end_string:
        val = val * 26 + (ord(letter) - 96)
    freq_key = 'joint_mdc_freq' if se_search == 'MDC' \
        else 'joint_O3_replay_freq'
    return val % int(app.conf[freq_key]) == 0


def _get_pipelines_from_searches(searches):
    """Returns a list of randomly selected pipelines given a list of
    searches. For example if a search in the given list is GRB, a
    corresponding pipeline will be chosen between Fermi, Swift, and SVOM.

    Parameters
    ----------
    searches : list
        List of external searches

    Returns
    -------
    pipelines : list
        Returns list of external pipelines, one per given search and in
        the same order

    Raises
    ------
    NotImplementedError
        If a search is not one of GRB, MDC, SubGRB, SubGRBTargeted, FRB
        or HEN

    """
    pipelines = []
    # Choose pipeline(s) based on search
    for search in searches:
        if search in {'GRB', 'MDC'}:
            pipelines.append(np.random.choice(['Fermi', 'Swift', 'SVOM'],
                                              p=[.5, .2, .3]))
        elif search == 'SubGRB':
            pipelines.append('Fermi')
        elif search == 'SubGRBTargeted':
            pipelines.append(np.random.choice(['Fermi', 'Swift'],
                                              p=[.5, .5]))
        elif search == 'FRB':
            pipelines.append('CHIME')
        elif search == 'HEN':
            pipelines.append('IceCube')
        else:
            raise NotImplementedError(
                f"Non-supported search provided: {search}")
    return pipelines
@igwn_alert.handler('mdc_superevent', 'superevent', shared=False)
def upload_external_event(alert, ext_search=None):
    """Upload a random GRB event(s) for a certain fraction of MDC
    or O3-replay superevents.

    Notes
    -----
    Every n superevents, upload a GRB candidate within the standard CBC-GRB or
    Burst-GRB search window, where the frequency n is determined by the
    configuration variable :obj:`~gwcelery.conf.joint_mdc_freq` or
    :obj:`~gwcelery.conf.joint_O3_replay_freq`.

    For both testing internal MDC events on all instances and for all
    superevents uploaded to gracedb-playground.

    Parameters
    ----------
    alert : dict
        IGWN alert packet
    ext_search : str
        Search field for external event, meant primarily for testing
        purposes. When omitted, MDC superevents get MDC external events
        and other superevents get randomly chosen searches.

    Returns
    -------
    events, pipelines : tuple or None
        Returns tuple of the list of external events created and the list of
        pipelines chosen for each event. Returns None when the alert is
        not of type ``'new'`` or the superevent is not selected for a
        joint event.

    Raises
    ------
    ValueError
        If the superevent is a joint MDC superevent and `ext_search` is
        given but is not ``'MDC'``.

    """
    if alert['alert_type'] != 'new':
        return
    se_search = alert['object']['preferred_event_data']['search']
    group = alert['object']['preferred_event_data']['group']
    is_gracedb_playground = app.conf['gracedb_host'] \
        == 'gracedb-playground.ligo.org'
    joint_mdc_alert = se_search == 'MDC' and _is_joint_mdc(alert['uid'],
                                                           'MDC')
    joint_o3replay_alert = se_search in {'AllSky', 'BBH'} and \
        _is_joint_mdc(alert['uid'], 'AllSky') and is_gracedb_playground and \
        group in {'CBC', 'Burst'}
    if not (joint_mdc_alert or joint_o3replay_alert):
        return

    # Potentially upload 1, 2, or 3 external events
    num = 1 + np.random.choice(np.arange(3), p=[.6, .3, .1])

    if joint_mdc_alert:
        # If joint MDC alert, make external event MDC
        if ext_search is not None and ext_search != 'MDC':
            raise ValueError('External search must be "MDC" if MDC superevent')
        # If None, then set to MDC:
        ext_search = ext_search or 'MDC'

    # If not given a ext_search, choose searches from acceptable list
    if ext_search is None:
        # Determine search for external event and then pipelines
        ext_searches = \
            np.random.choice(['GRB', 'SubGRB', 'SubGRBTargeted', 'FRB', 'HEN'],
                             p=[.2, .1, .3, .3, .1],
                             size=num)
    # If given a search, just populate fully with that
    else:
        ext_searches = np.full(num, ext_search)
    pipelines = _get_pipelines_from_searches(ext_searches)

    # The superevent time is the same for every mock event, so parse it
    # once rather than on each loop iteration.
    gpstime = float(alert['object']['t_0'])

    events = []
    for pipeline, ext_search_t in zip(pipelines, ext_searches):
        # Place the external event at a random offset inside the
        # coincidence window for this search
        new_time = _offset_time(gpstime, group, pipeline, ext_search_t,
                                se_search)

        # Create the mock external event and pass it to its handler
        ext_event = create_upload_external_event(
                        new_time,
                        pipeline,
                        ext_search_t)

        events.append(ext_event)

    return events, pipelines
@app.task(ignore_result=True, shared=False)
def upload_snews_event():
    """Create and upload a SNEWS-like MDC external event at the current
    time.

    Returns
    -------
    ext_event
        The XML GCN notice alert packet returned by
        :func:`create_upload_external_event` for the ``'SNEWS'`` pipeline
        and ``'MDC'`` search

    """
    # Express "now" as a GPS time value
    now_gps = Time(Time.now(), format='gps').value
    return create_upload_external_event(now_gps, 'SNEWS', 'MDC')