"""Source Parameter Estimation with LALInference."""
from distutils.spawn import find_executable
from distutils.dir_util import mkpath
import glob
import itertools
import json
import os
import shutil
import subprocess
import tempfile
import urllib
from celery import group
from gwdatafind import find_urls
from ligo.gracedb.exceptions import HTTPError
import numpy as np
from .. import app
from ..jinja import env
from .core import ordered_group
from . import condor
from . import gracedb
ini_name = 'online_pe.ini'
executables = {'datafind': 'gw_data_find',
'mergeNSscript': 'lalinference_nest2pos',
'mergeMCMCscript': 'cbcBayesMCMC2pos',
'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
'resultspage': 'cbcBayesPostProc',
'segfind': 'ligolw_segment_query',
'ligolw_print': 'ligolw_print',
'coherencetest': 'lalinference_coherence_test',
'lalinferencenest': 'lalinference_nest',
'lalinferencemcmc': 'lalinference_mcmc',
'lalinferencebambi': 'lalinference_bambi',
'lalinferencedatadump': 'lalinference_datadump',
'ligo-skymap-from-samples': 'ligo-skymap-from-samples',
'ligo-skymap-plot': 'ligo-skymap-plot',
'processareas': 'process_areas',
'computeroqweights': 'lalinference_compute_roq_weights',
'mpiwrapper': 'lalinference_mpi_wrapper',
'gracedb': 'gracedb',
'ppanalysis': 'cbcBayesPPAnalysis',
'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral'}
def _data_exists(end, frametype_dict):
"""Check whether data at end time can be found with gwdatafind and return
true it it is found.
"""
return min(
len(
find_urls(ifo[0], frametype_dict[ifo], end, end + 1)
) for ifo in frametype_dict.keys()
) > 0
[docs]class NotEnoughData(Exception):
"""Raised if found data is not enough due to the latency of data
transfer
"""
[docs]@app.task(bind=True, autoretry_for=(NotEnoughData, ), default_retry_delay=1,
max_retries=86400, retry_backoff=True, shared=False)
def query_data(self, trigtime):
"""Continues to query data until it is found with gwdatafind and return
frametypes for the data. If data is not found in 86400 seconds = 1 day,
raise NotEnoughData.
"""
end = trigtime + 2
if _data_exists(end, app.conf['low_latency_frame_types']):
return app.conf['low_latency_frame_types']
elif _data_exists(end, app.conf['high_latency_frame_types']):
return app.conf['high_latency_frame_types']
else:
raise NotEnoughData
[docs]@app.task(ignore_result=True, shared=False)
def upload_no_frame_files(request, exc, traceback, superevent_id):
"""Upload notification when no frame files are found.
Parameters
----------
request : Context (placeholder)
Task request variables
exc : Exception
Exception rased by condor.submit
traceback : str (placeholder)
Traceback message from a task
superevent_id : str
The GraceDB ID of a target superevent
"""
if isinstance(exc, NotEnoughData):
gracedb.upload.delay(
filecontents=None, filename=None,
graceid=superevent_id,
message='Frame files have not been found.',
tags='pe'
)
def _find_appropriate_cal_env(trigtime, dir_name):
"""Return the path to the calibration uncertainties estimated at the time
before and closest to the trigger time. If there are no calibration
uncertainties estimated before the trigger time, return the oldest one. The
gpstimes at which the calibration uncertainties were estimated and the
names of the files containing the uncertaintes are saved in
[HLV]_CalEnvs.txt.
Parameters
----------
trigtime : float
The trigger time of a target event
dir_name : str
The path to the directory where files containing calibration
uncertainties exist
Return
------
path : str
The path to the calibration uncertainties appropriate for a target
event
"""
filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
calibration_index = np.atleast_1d(
np.recfromtxt(filename, names=['gpstime', 'filename'])
)
gpstimes = calibration_index['gpstime']
candidate_gpstimes = gpstimes < trigtime
if np.any(candidate_gpstimes):
idx = np.argmax(gpstimes * candidate_gpstimes)
appropriate_cal = calibration_index['filename'][idx]
else:
appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
[docs]@app.task(shared=False)
def prepare_ini(frametype_dict, event, superevent_id=None):
"""Determine an appropriate PE settings for the target event and return ini
file content
"""
# Get template of .ini file
ini_template = env.get_template('online_pe.jinja2')
# fill out the ini template and return the resultant content
singleinspiraltable = event['extra_attributes']['SingleInspiral']
trigtime = event['gpstime']
ini_settings = {
'service_url': gracedb.client._service_url,
'types': frametype_dict,
'channels': app.conf['strain_channel_names'],
'state_vector_channels': app.conf['state_vector_channel_names'],
'webdir': os.path.join(app.conf['pe_results_path'], event['graceid']),
'paths': [{'name': name, 'path': find_executable(executable)}
for name, executable in executables.items()],
'h1_calibration': _find_appropriate_cal_env(
trigtime,
'/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
),
'l1_calibration': _find_appropriate_cal_env(
trigtime,
'/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
),
'v1_calibration': _find_appropriate_cal_env(
trigtime,
'/home/cbc/pe/O3/calibrationenvelopes/Virgo'
),
'q': min([sngl['mass2'] / sngl['mass1']
for sngl in singleinspiraltable]),
}
ini_rota = ini_template.render(ini_settings)
ini_settings.update({'use_of_ini': 'online'})
ini_online = ini_template.render(ini_settings)
# upload ini file to GraceDB
if superevent_id is not None:
gracedb.upload.delay(
ini_rota, filename=ini_name, graceid=superevent_id,
message='Automatically generated LALInference configuration file'
' for this event.',
tags='pe')
return ini_online
[docs]def pre_pe_tasks(event, superevent_id):
"""Return canvas of tasks executed before parameter estimation starts"""
return query_data.s(event['gpstime']).on_error(
upload_no_frame_files.s(superevent_id)
) | prepare_ini.s(event, superevent_id)
[docs]@app.task(shared=False)
def dag_prepare(
coinc_psd, ini_contents, rundir, superevent_id
):
"""Create a Condor DAG to run LALInference on a given event.
Parameters
----------
coinc_psd : tuple
The tuple of the byte contents of ``coinc.xml`` and ``psd.xml.gz``
ini_contents : str
The content of online_pe.ini
rundir : str
The path to a run directory where the DAG file exits
superevent_id : str
The GraceDB ID of a target superevent
Returns
-------
submit_file : str
The path to the .sub file
"""
coinc_contents, psd_contents = coinc_psd
# write down coicn.xml in the run directory
path_to_coinc = os.path.join(rundir, 'coinc.xml')
with open(path_to_coinc, 'wb') as f:
f.write(coinc_contents)
# write down psd.xml.gz
if psd_contents is not None:
path_to_psd = os.path.join(rundir, 'psd.xml.gz')
with open(path_to_psd, 'wb') as f:
f.write(psd_contents)
psd_arg = ['--psd', path_to_psd]
else:
psd_arg = []
# write down .ini file in the run directory.
path_to_ini = rundir + '/' + ini_name
with open(path_to_ini, 'w') as f:
f.write(ini_contents)
# run lalinference_pipe
try:
lalinference_arg = ['lalinference_pipe', '--run-path', rundir,
'--coinc', path_to_coinc, path_to_ini] + psd_arg
subprocess.run(lalinference_arg, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=True)
subprocess.run(['condor_submit_dag', '-no_submit',
rundir + '/multidag.dag'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=True)
except subprocess.CalledProcessError as e:
contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
gracedb.upload.delay(
filecontents=contents, filename='pe_dag.log',
graceid=superevent_id,
message='Failed to prepare DAG', tags='pe'
)
shutil.rmtree(rundir)
raise
finally:
# Remove the ini file so that people do not accidentally use this ini
# file and hence online-PE-only nodes.
os.remove(path_to_ini)
return rundir + '/multidag.dag.condor.sub'
def _find_paths_from_name(directory, name):
"""Return the paths of files or directories with given name under the
specfied directory
Parameters
----------
directory : string
Name of directory under which the target file or directory is searched
for.
name : string
Name of target files or directories
Returns
-------
paths : generator
Paths to the target files or directories
"""
return glob.iglob(os.path.join(directory, '**', name), recursive=True)
[docs]@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback, superevent_id, rundir):
"""Upload notification when condor.submit terminates unexpectedly.
Parameters
----------
request : Context (placeholder)
Task request variables
exc : Exception
Exception rased by condor.submit
traceback : str (placeholder)
Traceback message from a task
superevent_id : str
The GraceDB ID of a target superevent
rundir : str
The run directory for PE
"""
if isinstance(exc, condor.JobAborted):
gracedb.upload.delay(
filecontents=None, filename=None, graceid=superevent_id,
message='Job was aborted.', tags='pe'
)
elif isinstance(exc, condor.JobFailed):
gracedb.upload.delay(
filecontents=None, filename=None, graceid=superevent_id,
message='Job failed.', tags='pe'
)
# Get paths to .log files, .err files, .out files
paths_to_log = _find_paths_from_name(rundir, '*.log')
paths_to_err = _find_paths_from_name(rundir, '*.err')
paths_to_out = _find_paths_from_name(rundir, '*.out')
# Upload .log and .err files
for path in itertools.chain(paths_to_log, paths_to_err, paths_to_out):
with open(path, 'rb') as f:
contents = f.read()
if contents:
# put .log suffix in log file names so that users can directly
# read the contents instead of downloading them when they click
# file names
gracedb.upload.delay(
filecontents=contents,
filename=os.path.basename(path) + '.log',
graceid=superevent_id,
message='Here is a log file for PE.',
tags='pe'
)
[docs]@app.task(ignore_result=True, shared=False)
def _upload_url(pe_results_path, graceid):
"""Upload url of a page containing all of the plots."""
path_to_posplots, = _find_paths_from_name(pe_results_path, 'posplots.html')
baseurl = urllib.parse.urljoin(
app.conf['pe_results_url'],
os.path.relpath(
path_to_posplots,
app.conf['pe_results_path']
)
)
gracedb.upload.delay(
filecontents=None, filename=None, graceid=graceid,
message=('LALInference online parameter estimation finished.'
'<a href={}>results</a>').format(baseurl),
tags='pe'
)
[docs]@app.task(ignore_result=True, shared=False)
def _get_result_contents(pe_results_path, filename):
"""Return the contents of a PE results file by reading it from the local
filesystem.
"""
path, = _find_paths_from_name(pe_results_path, filename)
with open(path, 'rb') as f:
contents = f.read()
return contents
def _upload_result(pe_results_path, filename, graceid, message, tag,
uploaded_filename=None):
"""Return a canvas to get the contents of a PE result file and upload it to
GraceDB.
"""
if uploaded_filename is None:
uploaded_filename = filename
return _get_result_contents.si(pe_results_path, filename) | \
gracedb.upload.s(uploaded_filename, graceid, message, tag)
[docs]@app.task(ignore_result=True, shared=False)
def clean_up(rundir):
"""Clean up a run directory.
Parameters
----------
rundir : str
The path to a run directory where the DAG file exits
"""
shutil.rmtree(rundir)
[docs]@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, preferred_event_id, superevent_id):
"""Upload PE results and clean up run directory
Parameters
----------
rundir : str
The path to a run directory where the DAG file exits
preferred_event_id : str
The GraceDB ID of a target preferred event
superevent_id : str
The GraceDB ID of a target superevent
Returns
-------
tasks : canvas
The work-flow for uploading PE results
"""
# get path to pe results
pe_results_path = \
os.path.join(app.conf['pe_results_path'], preferred_event_id)
# FIXME: _upload_url.si has to be out of group for gracedb.create_label.si
# to run
(
_upload_url.si(pe_results_path, superevent_id) |
group(
_upload_result(
rundir, 'posterior*.hdf5', superevent_id,
'LALInference posterior samples', 'pe',
'LALInference.posterior_samples.hdf5'
),
_upload_result(
pe_results_path, 'extrinsic.png', superevent_id,
'Corner plot for extrinsic parameters', 'pe',
'LALInference.extrinsic.png'
),
_upload_result(
pe_results_path, 'sourceFrame.png', superevent_id,
'Corner plot for source frame parameters', 'pe',
'LALInference.intrinsic.png'
)
) | gracedb.create_label.si('PE_READY', superevent_id) |
clean_up.si(rundir)
).delay()
[docs]@gracedb.task(shared=False)
def _download_psd(gid):
"""Download ``psd.xml.gz`` and return its content. If that file does not
exist, return None.
"""
try:
return gracedb.download("psd.xml.gz", gid)
except HTTPError:
return None
[docs]@app.task(ignore_result=True, shared=False)
def start_pe(ini_contents, preferred_event_id, superevent_id):
"""Run LALInference on a given event.
Parameters
----------
ini_contents : str
The content of online_pe.ini
preferred_event_id : str
The GraceDB ID of a target preferred event
superevent_id : str
The GraceDB ID of a target superevent
"""
gracedb.upload.delay(
filecontents=None, filename=None, graceid=superevent_id,
message=('starting LALInference online parameter estimation '
'for {}').format(preferred_event_id),
tags='pe'
)
# make a run directory
lalinference_dir = os.path.expanduser('~/.cache/lalinference')
mkpath(lalinference_dir)
rundir = tempfile.mkdtemp(dir=lalinference_dir,
prefix='{}_'.format(superevent_id))
# give permissions to read the files under the run directory so that PE
# ROTA people can check the status of parameter estimation.
os.chmod(rundir, 0o755)
(
ordered_group(
gracedb.download.s('coinc.xml', preferred_event_id),
_download_psd.s(preferred_event_id)
)
|
dag_prepare.s(ini_contents, rundir, superevent_id)
|
condor.submit.s().on_error(
job_error_notification.s(superevent_id, rundir)
)
|
dag_finished.si(rundir, preferred_event_id, superevent_id)
).delay()