"""Source Parameter Estimation with LALInference and Bilby."""
from distutils.spawn import find_executable
from distutils.dir_util import mkpath
import glob
import itertools
import json
import os
import shutil
import subprocess
import tempfile
import urllib
from celery import group
from gwdatafind import find_urls
import numpy as np
from requests.exceptions import HTTPError
from .. import app
from ..jinja import env
from .core import ordered_group
from . import condor
from . import gracedb
# Name under which the generated LALInference configuration is written to the
# run directory and uploaded to GraceDB.
ini_name = 'online_lalinference_pe.ini'

# Mapping from the name of a path entry in the ini template to the name of the
# executable that is looked up on PATH when the template is rendered.
executables = {
    # data discovery and segment queries
    'datafind': 'gw_data_find',
    # merging of sampler outputs
    'mergeNSscript': 'lalinference_nest2pos',
    'mergeMCMCscript': 'cbcBayesMCMC2pos',
    'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
    # post-processing
    'resultspage': 'cbcBayesPostProc',
    'segfind': 'ligolw_segment_query',
    'ligolw_print': 'ligolw_print',
    'coherencetest': 'lalinference_coherence_test',
    # samplers
    'lalinferencenest': 'lalinference_nest',
    'lalinferencemcmc': 'lalinference_mcmc',
    'lalinferencebambi': 'lalinference_bambi',
    'lalinferencedatadump': 'lalinference_datadump',
    # NOTE(review): these two entries point at the executable named ``true``;
    # presumably this turns the sky-map steps into no-ops -- confirm.
    'ligo-skymap-from-samples': 'true',
    'ligo-skymap-plot': 'true',
    'processareas': 'process_areas',
    'computeroqweights': 'lalinference_compute_roq_weights',
    'mpiwrapper': 'lalinference_mpi_wrapper',
    'gracedb': 'gracedb',
    'ppanalysis': 'cbcBayesPPAnalysis',
    'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
    # BayesWave
    'bayeswave': 'BayesWave',
    'bayeswavepost': 'BayesWavePost',
}
def _data_exists(end, frametype_dict):
"""Check whether data at end time can be found with gwdatafind and return
true it it is found.
"""
return min(
len(
find_urls(ifo[0], frametype_dict[ifo], end, end + 1)
) for ifo in frametype_dict.keys()
) > 0
class NotEnoughData(Exception):
    """Raised when the frame data found are not enough, which can happen
    because of the latency of data transfer.
    """
@app.task(bind=True, autoretry_for=(NotEnoughData, ), default_retry_delay=1,
          max_retries=86400, retry_backoff=True, shared=False)
def query_data(self, trigtime):
    """Look for frame data around the trigger time and return the frame
    types under which they were found.

    The low-latency frame types are tried first, then the high-latency ones.
    If neither set is available, NotEnoughData is raised; the task decorator
    retries automatically on that exception, up to ``max_retries`` times.

    NOTE(review): the previous docstring equated the retry budget with
    86400 seconds = 1 day. With ``retry_backoff=True`` the wall-clock limit
    may differ -- confirm against the Celery retry settings.

    Parameters
    ----------
    trigtime : float
        The trigger time of a target event

    Returns
    -------
    frametype_dict : dict
        The frame types read from the application configuration for which
        data were found

    Raises
    ------
    NotEnoughData
        If data are found with neither set of frame types
    """
    # Data are required at trigtime + 2.
    end = trigtime + 2
    for conf_key in ('low_latency_frame_types', 'high_latency_frame_types'):
        if _data_exists(end, app.conf[conf_key]):
            return app.conf[conf_key]
    raise NotEnoughData
@app.task(ignore_result=True, shared=False)
def upload_no_frame_files(request, exc, traceback, superevent_id):
    """Post a GraceDB log message when no frame files were found.

    Nothing is uploaded unless ``exc`` is a NotEnoughData instance.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by the failed task
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    """
    if not isinstance(exc, NotEnoughData):
        return
    gracedb.upload.delay(
        filecontents=None, filename=None,
        graceid=superevent_id,
        message='Frame files have not been found.',
        tags='pe'
    )
def _find_appropriate_cal_env(trigtime, dir_name):
"""Return the path to the calibration uncertainties estimated at the time
before and closest to the trigger time. If there are no calibration
uncertainties estimated before the trigger time, return the oldest one. The
gpstimes at which the calibration uncertainties were estimated and the
names of the files containing the uncertaintes are saved in
[HLV]_CalEnvs.txt.
Parameters
----------
trigtime : float
The trigger time of a target event
dir_name : str
The path to the directory where files containing calibration
uncertainties exist
Return
------
path : str
The path to the calibration uncertainties appropriate for a target
event
"""
filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
calibration_index = np.atleast_1d(
np.recfromtxt(filename, names=['gpstime', 'filename'])
)
gpstimes = calibration_index['gpstime']
candidate_gpstimes = gpstimes < trigtime
if np.any(candidate_gpstimes):
idx = np.argmax(gpstimes * candidate_gpstimes)
appropriate_cal = calibration_index['filename'][idx]
else:
appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
@app.task(shared=False)
def prepare_ini(frametype_dict, event, superevent_id=None):
    """Determine appropriate PE settings for the target event and return ini
    file content for the LALInference pipeline.

    The template is rendered twice: once without ``use_of_ini`` (this version
    is uploaded to GraceDB when ``superevent_id`` is given) and once with
    ``use_of_ini`` set to ``'online'`` (this version is returned).

    Parameters
    ----------
    frametype_dict : dict
        Frame types, passed to the template as ``types``
    event : dict
        Event record; ``event['gpstime']``, ``event['graceid']`` and
        ``event['extra_attributes']['SingleInspiral']`` are read
    superevent_id : str, optional
        The GraceDB ID of a target superevent. If None, nothing is uploaded.

    Returns
    -------
    ini_online : str
        The rendered ini file content for the online analysis
    """
    # Get template of .ini file
    ini_template = env.get_template('online_pe.jinja2')

    # fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    ini_settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': frametype_dict,
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], event['graceid'], 'lalinference'
        ),
        # shutil.which replaces distutils.spawn.find_executable, because
        # distutils is no longer part of the standard library as of Python
        # 3.12. Like find_executable, it returns None for a missing program.
        'paths': [{'name': name, 'path': shutil.which(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo'
        ),
        # smallest chirp mass and smallest mass ratio over the single-detector
        # triggers
        'mc': min(sngl['mchirp'] for sngl in singleinspiraltable),
        'q': min(sngl['mass2'] / sngl['mass1']
                 for sngl in singleinspiraltable),
        'mpirun': shutil.which('mpirun')
    }
    # The uploaded version is rendered before 'use_of_ini' is set.
    ini_rota = ini_template.render(ini_settings)
    ini_settings['use_of_ini'] = 'online'
    ini_online = ini_template.render(ini_settings)

    # upload LALInference ini file to GraceDB
    if superevent_id is not None:
        gracedb.upload.delay(
            ini_rota, filename=ini_name, graceid=superevent_id,
            message=('Automatically generated LALInference configuration file'
                     ' for this event.'),
            tags='pe')

    return ini_online
def pre_pe_tasks(event, superevent_id):
    """Return canvas of tasks executed before parameter estimation starts.

    The canvas first runs ``query_data`` for the event's GPS time, with
    ``upload_no_frame_files`` attached as its error callback, and pipes the
    result into ``prepare_ini``.
    """
    find_frames = query_data.s(event['gpstime']).on_error(
        upload_no_frame_files.s(superevent_id)
    )
    make_ini = prepare_ini.s(event, superevent_id)
    return find_frames | make_ini
@app.task(shared=False)
def _setup_dag_for_lalinference(coinc_psd, ini_contents,
                                rundir, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    The inputs are written into the run directory and ``lalinference_pipe``
    is run on them. If that command fails, its arguments and output are
    uploaded to GraceDB, the run directory is removed and the error is
    re-raised.

    Parameters
    ----------
    coinc_psd : tuple of byte contents
        Tuple of the byte contents of ``coinc.xml`` and ``psd.xml.gz``.
        The second element may be None, in which case no PSD is passed.
    ini_contents : str
        The content of online_lalinference_pe.ini
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Raises
    ------
    subprocess.CalledProcessError
        If ``lalinference_pipe`` exits with a non-zero status
    """
    coinc_contents, psd_contents = coinc_psd

    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as coinc_file:
        coinc_file.write(coinc_contents)

    # write down psd.xml.gz, if there is one
    psd_arg = []
    if psd_contents is not None:
        path_to_psd = os.path.join(rundir, 'psd.xml.gz')
        with open(path_to_psd, 'wb') as psd_file:
            psd_file.write(psd_contents)
        psd_arg = ['--psd', path_to_psd]

    # write down .ini file in the run directory.
    path_to_ini = os.path.join(rundir, ini_name)
    with open(path_to_ini, 'w') as ini_file:
        ini_file.write(ini_contents)

    command = ['lalinference_pipe', '--run-path', rundir,
               '--coinc', path_to_coinc, path_to_ini] + psd_arg
    try:
        subprocess.run(command, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b''.join([
            b'args:\n', json.dumps(e.cmd).encode('utf-8'),
            b'\n\nstdout:\n', e.stdout,
            b'\n\nstderr:\n', e.stderr,
        ])
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        shutil.rmtree(rundir)
        raise

    # Remove the ini file so that people do not accidentally use this ini
    # file and hence online-PE-only nodes.
    os.remove(path_to_ini)

    return os.path.join(rundir, 'multidag.dag')
@app.task(shared=False)
def _setup_dag_for_bilby(event, rundir, preferred_event_id, superevent_id):
    """Create DAG for a bilby run and return the path to DAG.

    The event is written to ``event.json`` in the run directory and
    ``bilby_pipe_gracedb`` is run on it. If that command fails, its arguments
    and output are uploaded to GraceDB, the run directory is removed and the
    error is re-raised. On success the generated bilby configuration file is
    uploaded to GraceDB.

    Parameters
    ----------
    event : json contents
        The json contents retrieved from gracedb.get_event()
    rundir : str
        The path to a run directory where the DAG file exists
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Raises
    ------
    subprocess.CalledProcessError
        If ``bilby_pipe_gracedb`` exits with a non-zero status
    ValueError
        If the run directory does not contain exactly one file matching
        ``submit/dag*.submit`` after a successful setup
    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, 'bilby'
    )

    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json, '--online-pe',
                 '--convert-to-flat-in-component-mass']

    # Extra options are appended for every GraceDB host other than
    # gracedb.ligo.org.
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        setup_arg += ['--channel-dict', 'o2replay',
                      '--sampler-kwargs', 'FastTest']

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.cmd).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for bilby', tags='pe'
        )
        shutil.rmtree(rundir)
        raise
    else:
        # Uploads bilby ini file to GraceDB
        group(upload_results_tasks(
            rundir, 'bilby_config.ini', superevent_id,
            'Automatically generated Bilby configuration file',
            'pe', 'online_bilby_pe.ini')).delay()

    # Exactly one DAG submit file is expected; the single-target unpacking
    # fails with ValueError otherwise.
    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))

    return path_to_dag
@app.task(shared=False)
def _condor_no_submit(path_to_dag):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file.

    Parameters
    ----------
    path_to_dag : str
        The path to the DAG file

    Returns
    -------
    path_to_sub : str
        ``path_to_dag`` with ``.condor.sub`` appended

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_submit_dag`` exits with a non-zero status
    """
    command = ['condor_submit_dag', '-no_submit', path_to_dag]
    subprocess.run(command, capture_output=True, check=True)
    return f'{path_to_dag}.condor.sub'
@app.task(shared=False)
def dag_prepare_task(rundir, superevent_id, preferred_event_id, pe_pipeline,
                     ini_contents=None):
    """Return a canvas of tasks to prepare DAG.

    For either pipeline the canvas ends with ``_condor_no_submit``, which
    receives the DAG path produced by the pipeline-specific setup task.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    pe_pipeline : str
        The parameter estimation pipeline used
        Either 'lalinference' OR 'bilby'
    ini_contents : str
        The content of online_lalinference_pe.ini
        Required if pe_pipeline == 'lalinference'

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    Raises
    ------
    NotImplementedError
        If ``pe_pipeline`` is neither 'lalinference' nor 'bilby'
    """
    if pe_pipeline == 'lalinference':
        # Download coinc.xml and the PSD, then build the DAG from them.
        downloads = ordered_group(
            gracedb.download.si('coinc.xml', preferred_event_id),
            _download_psd.si(preferred_event_id)
        )
        setup = downloads | _setup_dag_for_lalinference.s(
            ini_contents, rundir, superevent_id
        )
    elif pe_pipeline == 'bilby':
        # Fetch the event record, then build the DAG from it.
        fetch_event = gracedb.get_event.si(preferred_event_id)
        setup = fetch_event | _setup_dag_for_bilby.s(
            rundir, preferred_event_id, superevent_id
        )
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')
    return setup | _condor_no_submit.s()
def _find_paths_from_name(directory, name):
"""Return the paths of files or directories with given name under the
specfied directory
Parameters
----------
directory : string
Name of directory under which the target file or directory is searched
for.
name : string
Name of target files or directories
Returns
-------
paths : generator
Paths to the target files or directories
"""
return glob.iglob(os.path.join(directory, '**', name), recursive=True)
@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, pe_pipeline):
    """Upload notification when condor.submit terminates unexpectedly.

    A short GraceDB log message is posted if ``exc`` is ``condor.JobAborted``
    or ``condor.JobFailed``. All non-empty ``*.log``, ``*.err`` and ``*.out``
    files found under the run directory are then uploaded.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby
    """
    if isinstance(exc, condor.JobAborted):
        notice = 'The {} condor job was aborted.'
    elif isinstance(exc, condor.JobFailed):
        notice = 'The {} condor job failed.'
    else:
        notice = None
    if notice is not None:
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message=notice.format(pe_pipeline)
        )

    # NOTE(review): the log upload below runs for every exception type, not
    # only for JobAborted/JobFailed -- confirm this is the intended nesting.
    # Get paths to .log files, .err files, .out files, in that order
    log_paths = itertools.chain.from_iterable(
        _find_paths_from_name(rundir, pattern)
        for pattern in ('*.log', '*.err', '*.out')
    )
    # Upload the non-empty ones
    for path in log_paths:
        with open(path, 'rb') as f:
            contents = f.read()
        if not contents:
            continue
        # put .log suffix in log file names so that users can directly
        # read the contents instead of downloading them when they click
        # file names
        gracedb.upload.delay(
            filecontents=contents,
            filename=os.path.basename(path) + '.log',
            graceid=superevent_id,
            message='A log file for {} condor job.'.format(pe_pipeline),
            tags='pe'
        )
@app.task(ignore_result=True, shared=False)
def _upload_url(pe_results_path, graceid, pe_pipeline):
    """Upload url of a page containing all of the plots.

    Parameters
    ----------
    pe_results_path : str
        Directory under which the summary page is searched for
    graceid : str
        GraceDB ID to which the log message is posted
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    Raises
    ------
    NotImplementedError
        If ``pe_pipeline`` is neither 'lalinference' nor 'bilby'
    ValueError
        If there is not exactly one summary page under ``pe_results_path``
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urljoin

    # Name of the summary page written by each pipeline.
    page_name = {
        'lalinference': 'posplots.html',
        'bilby': 'home.html',
    }.get(pe_pipeline)
    if page_name is None:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    # Exactly one page is expected; the single-target unpacking fails with
    # ValueError otherwise.
    path_to_posplots, = _find_paths_from_name(pe_results_path, page_name)

    # The page path relative to the results root is resolved against the
    # configured results URL.
    baseurl = urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            path_to_posplots,
            app.conf['pe_results_path']
        )
    )
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=graceid,
        message=('Online {} parameter estimation finished.'
                 '<a href={}>results</a>').format(pe_pipeline, baseurl),
        tags='pe'
    )
def upload_results_tasks(pe_results_path, filename, graceid, message, tag,
                         uploaded_filename=None):
    """Return tasks to get the contents of PE result files and upload them to
    GraceDB.

    Every matching file is read when this function is called; the returned
    signatures carry the file contents.

    Parameters
    ----------
    pe_results_path : string
        Directory under which the target file is located.
    filename : string
        Name of the target file
    graceid : string
        GraceDB ID
    message : string
        Message uploaded to GraceDB
    tag : str
        Name of tag to add the GraceDB log
    uploaded_filename : str
        Name of the uploaded file. If not supplied, it is the same as the
        original file name.

    Returns
    -------
    tasks : list of celery tasks
        One upload signature per matching file; empty if nothing matches
    """
    tasks = []
    for path in _find_paths_from_name(pe_results_path, filename):
        target_name = (os.path.basename(path) if uploaded_filename is None
                       else uploaded_filename)
        with open(path, 'rb') as f:
            payload = f.read()
        tasks.append(
            gracedb.upload.si(payload, target_name, graceid, message, tag)
        )
    return tasks
@app.task(ignore_result=True, shared=False)
def clean_up(rundir):
    """Remove a run directory together with everything in it.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    """
    shutil.rmtree(rundir)
@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, preferred_event_id, superevent_id, pe_pipeline):
    """Upload PE results and clean up run directory.

    The workflow is launched from inside this task: the URL of the results
    page is posted first, then the result files are uploaded, and finally
    the run directory is removed. For lalinference the ``PE_READY`` label is
    also applied to the superevent. Nothing is returned.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    Raises
    ------
    NotImplementedError
        If ``pe_pipeline`` is neither 'lalinference' nor 'bilby'
    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, pe_pipeline
    )

    # Each entry is (directory to search, file name pattern, GraceDB log
    # message, uploaded file name or None to keep the original name).
    if pe_pipeline == 'lalinference':
        uploads = [
            (rundir, 'glitch_median_PSD_forLI_*.dat',
             'Bayeswave PSD used for LALInference PE', None),
            (rundir, 'lalinference*.dag', 'LALInference DAG', None),
            (rundir, 'posterior*.hdf5',
             'LALInference posterior samples',
             'LALInference.posterior_samples.hdf5'),
            (pe_results_path, 'extrinsic.png',
             'LALInference corner plot for extrinsic parameters',
             'LALInference.extrinsic.png'),
            (pe_results_path, 'sourceFrame.png',
             'LALInference corner plot for source frame parameters',
             'LALInference.intrinsic.png')
        ]
    elif pe_pipeline == 'bilby':
        resultdir = os.path.join(rundir, 'result')
        uploads = [
            (resultdir, '*merge_result.json',
             'Bilby posterior samples',
             'Bilby.posterior_samples.json'),
            (resultdir, '*_extrinsic_corner.png',
             'Bilby corner plot for extrinsic parameters',
             'Bilby.extrinsic.png'),
            (resultdir, '*_intrinsic_corner.png',
             'Bilby corner plot for intrinsic parameters',
             'Bilby.intrinsic.png')
        ]
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    upload_tasks = []
    for search_dir, pattern, comment, upload_name in uploads:
        upload_tasks.extend(
            upload_results_tasks(
                search_dir, pattern, superevent_id, comment, 'pe', upload_name
            )
        )

    # FIXME: _upload_url.si has to be out of group for
    # gracedb.create_label.si to run
    workflow = (
        _upload_url.si(pe_results_path, superevent_id, pe_pipeline)
        |
        group(upload_tasks)
        |
        clean_up.si(rundir)
    )
    workflow.delay()

    if pe_pipeline == 'lalinference':
        gracedb.create_label.delay('PE_READY', superevent_id)
@gracedb.task(shared=False)
def _download_psd(gid):
    """Download ``psd.xml.gz`` and return its content. If the download fails
    with an HTTP error, return None.

    NOTE(review): every HTTPError is treated as "file does not exist", not
    only a missing-file response -- confirm that this is intended.

    Parameters
    ----------
    gid : str
        The GraceDB ID of the event the file is attached to
    """
    try:
        psd_contents = gracedb.download("psd.xml.gz", gid)
    except HTTPError:
        psd_contents = None
    return psd_contents
@app.task(ignore_result=True, shared=False)
def start_pe(ini_contents, preferred_event_id, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    A log message announcing the start is posted to GraceDB, a fresh run
    directory is created under ``~/.cache/<pe_pipeline>``, and the workflow
    (prepare DAG, submit to condor, upload results) is launched.

    Parameters
    ----------
    ini_contents : str
        The content of online_lalinference_pe.ini
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        lalinference OR bilby
    """
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=superevent_id,
        message=('Starting {} online parameter estimation '
                 'for {}').format(pe_pipeline, preferred_event_id),
        tags='pe'
    )

    # make a run directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    # os.makedirs replaces distutils.dir_util.mkpath: distutils is no longer
    # part of the standard library as of Python 3.12, and mkpath remembers
    # the directories it has created, so a directory removed while the worker
    # is running would not be created again.
    os.makedirs(pipeline_dir, exist_ok=True)
    rundir = tempfile.mkdtemp(
        dir=pipeline_dir, prefix='{}_'.format(superevent_id)
    )
    # give permissions to read the files under the run directory so that PE
    # ROTA people can check the status of parameter estimation.
    os.chmod(rundir, 0o755)

    canvas = (
        dag_prepare_task(
            rundir, superevent_id, preferred_event_id,
            pe_pipeline, ini_contents
        )
        |
        condor.submit.s().on_error(
            job_error_notification.s(superevent_id, rundir, pe_pipeline)
        )
        |
        dag_finished.si(
            rundir, preferred_event_id, superevent_id, pe_pipeline
        )
    )
    canvas.delay()