Source code for gwcelery.tasks.lalinference

"""Source Parameter Estimation with LALInference."""
from distutils.spawn import find_executable
from distutils.dir_util import mkpath
import glob
import itertools
import json
import os
import shutil
import subprocess
import tempfile
import urllib.parse

from celery import group
from gwdatafind import find_urls

from .. import app
from ..jinja import env
from . import condor
from . import gracedb
from . import skymaps


ini_name = 'online_pe.ini'

executables = {'datafind': 'gw_data_find',
               'mergeNSscript': 'lalinference_nest2pos',
               'mergeMCMCscript': 'cbcBayesMCMC2pos',
               'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
               'resultspage': 'cbcBayesPostProc',
               'segfind': 'ligolw_segment_query',
               'ligolw_print': 'ligolw_print',
               'coherencetest': 'lalinference_coherence_test',
               'lalinferencenest': 'lalinference_nest',
               'lalinferencemcmc': 'lalinference_mcmc',
               'lalinferencebambi': 'lalinference_bambi',
               'lalinferencedatadump': 'lalinference_datadump',
               'ligo-skymap-from-samples': 'ligo-skymap-from-samples',
               'ligo-skymap-plot': 'ligo-skymap-plot',
               'processareas': 'process_areas',
               'computeroqweights': 'lalinference_compute_roq_weights',
               'mpiwrapper': 'lalinference_mpi_wrapper',
               'gracedb': 'gracedb',
               'ppanalysis': 'cbcBayesPPAnalysis',
               'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral'}
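
# Resolution sketch (illustrative, not part of the module logic): prepare_ini
# below maps each logical name in this dictionary to an absolute path with
# distutils' find_executable, which searches $PATH and returns None when a
# tool is missing:
#
#     from distutils.spawn import find_executable
#     find_executable('gracedb')  # e.g. '/usr/bin/gracedb', or None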


def _data_exists(end, frametype_dict):
    """Check whether data at end time can be found with gwdatafind and return
    true it it is found.
    """
    return min(
        len(
            find_urls(ifo[0], frametype_dict[ifo], end, end + 1)
        ) for ifo in frametype_dict.keys()
    ) > 0
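
# A minimal sketch of the check performed above for a single detector. The
# GPS time and frame type are hypothetical, and a reachable datafind server
# (host configured via the environment) is assumed:
#
#     from gwdatafind import find_urls
#     urls = find_urls('H', 'H1_HOFT_C00', 1187008882, 1187008883)
#     data_found = len(urls) > 0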


class NotEnoughData(Exception):
    """Raised if the found data is not enough due to the latency of data
    transfer.
    """


@app.task(bind=True, autoretry_for=(NotEnoughData, ), default_retry_delay=1,
          max_retries=86400, retry_backoff=True, shared=False)
def query_data(self, trigtime):
    """Continue to query for data until it is found with gwdatafind, and
    return the frame types of the data. If data is not found in 86400
    seconds (= 1 day), raise NotEnoughData.
    """
    end = trigtime + 2
    if _data_exists(end, app.conf['low_latency_frame_types']):
        return app.conf['low_latency_frame_types']
    elif _data_exists(end, app.conf['high_latency_frame_types']):
        return app.conf['high_latency_frame_types']
    else:
        raise NotEnoughData


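# Usage sketch for query_data (the trigger time is hypothetical). Because
# the task is decorated with autoretry_for=(NotEnoughData,), calling it
# asynchronously keeps retrying with exponential backoff until data appear
# or max_retries is exhausted:
#
#     result = query_data.delay(1187008882.4)
#     frametype_dict = result.get()  # e.g. {'H1': <frame type>, 'L1': ...}

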
@app.task(ignore_result=True, shared=False)
def upload_no_frame_files(request, exc, traceback, superevent_id):
    """Upload a notification when no frame files are found.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by query_data
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDb ID of a target superevent

    """
    if isinstance(exc, NotEnoughData):
        gracedb.upload.delay(
            filecontents=None, filename=None,
            graceid=superevent_id,
            message='Frame files have not been found.',
            tags='pe'
        )


@app.task(shared=False)
def prepare_ini(frametype_dict, event, superevent_id=None):
    """Determine appropriate PE settings for the target event and return the
    content of the ini file.
    """
    # Get the template of the .ini file
    ini_template = env.get_template('online_pe.jinja2')

    # Fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    ini_settings = {
        'service_url': gracedb.client._service_url,
        'types': frametype_dict,
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(app.conf['pe_results_path'],
                               event['graceid']),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'q': min([sngl['mass2'] / sngl['mass1']
                  for sngl in singleinspiraltable]),
    }
    return ini_template.render(ini_settings)


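# Illustrative sketch of the template-rendering step in isolation, using a
# toy inline template instead of the packaged online_pe.jinja2:
#
#     from jinja2 import Template
#     template = Template('q = {{ q }}\nwebdir = {{ webdir }}')
#     print(template.render({'q': 0.8, 'webdir': '/path/to/results'}))
#     # q = 0.8
#     # webdir = /path/to/results

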
def pre_pe_tasks(event, superevent_id):
    """Return the canvas of tasks executed before parameter estimation
    starts.
    """
    return query_data.s(event['gpstime']).on_error(
        upload_no_frame_files.s(superevent_id)
    ) | prepare_ini.s(event, superevent_id)


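# The canvas above chains two tasks: the frame-type dictionary returned by
# query_data is fed into prepare_ini as its first argument, and
# upload_no_frame_files runs only if query_data fails. A hypothetical
# invocation, given a GraceDb event dictionary `event`:
#
#     canvas = pre_pe_tasks(event, 'S190901ap')
#     ini_contents = canvas.delay().get()

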
@app.task(shared=False)
def dag_prepare(coinc_contents, ini_contents, rundir, superevent_id):
    """Create a Condor DAG to run LALInference on a given event.

    Parameters
    ----------
    coinc_contents : bytes
        The byte contents of ``coinc.xml``
    ini_contents : str
        The content of online_pe.ini
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDb ID of a target superevent

    Returns
    -------
    submit_file : str
        The path to the .sub file

    """
    # write coinc.xml down in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc_contents)

    # write the .ini file down in the run directory
    path_to_ini = os.path.join(rundir, ini_name)
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)

    # run lalinference_pipe
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=superevent_id,
        message='starting LALInference online parameter estimation',
        tags='pe'
    )
    try:
        subprocess.run(['lalinference_pipe', '--run-path', rundir,
                        '--coinc', path_to_coinc, path_to_ini],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                       check=True)
        subprocess.run(['condor_submit_dag', '-no_submit',
                        os.path.join(rundir, 'multidag.dag')],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                       check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
                   b'\n\nstdout:\n' + e.stdout + \
                   b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG', tags='pe'
        )
        shutil.rmtree(rundir)
        raise

    return os.path.join(rundir, 'multidag.dag.condor.sub')


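# Standalone sketch of the error-capture pattern used above: with
# check=True, subprocess.run raises CalledProcessError carrying the child's
# stdout and stderr, which can then be stitched into a single log:
#
#     try:
#         subprocess.run(['false'], stdout=subprocess.PIPE,
#                        stderr=subprocess.PIPE, check=True)
#     except subprocess.CalledProcessError as e:
#         log = b'stdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

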
def _find_paths_from_name(directory, name):
    """Return the paths of files or directories with the given name under
    the specified directory.

    Parameters
    ----------
    directory : string
        Name of the directory under which the target file or directory is
        searched for.
    name : string
        Name of the target files or directories

    Returns
    -------
    paths : generator
        Paths to the target files or directories

    """
    return glob.iglob(os.path.join(directory, '**', name), recursive=True)


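# Behavior sketch for the recursive glob: with recursive=True, the '**'
# component matches at any depth, including zero directories, so for a
# hypothetical tree containing 'run/x.log' and 'run/sub/x.log':
#
#     sorted(_find_paths_from_name('run', '*.log'))
#     # -> ['run/sub/x.log', 'run/x.log']

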
@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback, superevent_id, rundir):
    """Upload a notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDb ID of a target superevent
    rundir : str
        The run directory for PE

    """
    if isinstance(exc, condor.JobAborted):
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message='Job was aborted.', tags='pe'
        )
    elif isinstance(exc, condor.JobFailed):
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message='Job failed.', tags='pe'
        )
    # Get paths to the .log, .err, and .out files
    paths_to_log = _find_paths_from_name(rundir, '*.log')
    paths_to_err = _find_paths_from_name(rundir, '*.err')
    paths_to_out = _find_paths_from_name(rundir, '*.out')
    # Upload the .log, .err, and .out files
    for path in itertools.chain(paths_to_log, paths_to_err, paths_to_out):
        with open(path, 'rb') as f:
            contents = f.read()
        if contents:
            # Append a .log suffix to the file names so that users can
            # directly read the contents instead of downloading them when
            # they click the file names
            gracedb.upload.delay(
                filecontents=contents,
                filename=os.path.basename(path) + '.log',
                graceid=superevent_id,
                message='Here is a log file for PE.',
                tags='pe'
            )


@app.task(ignore_result=True, shared=False)
def _upload_url(pe_results_path, graceid):
    """Upload the URL of a page containing all of the plots."""
    path_to_posplots, = _find_paths_from_name(
        pe_results_path, 'posplots.html'
    )
    baseurl = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(path_to_posplots, app.conf['pe_results_path'])
    )
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=graceid,
        message=('LALInference online parameter estimation finished. '
                 '<a href={}>results</a>').format(baseurl),
        tags='pe'
    )


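# URL-construction sketch with made-up paths: os.path.relpath strips the
# results root, and urljoin appends the remainder to the public results URL
# (assuming the base URL ends with a slash):
#
#     urllib.parse.urljoin(
#         'https://example.org/pe/',
#         os.path.relpath('/home/pe/public_html/G123/posplots.html',
#                         '/home/pe/public_html')
#     )
#     # -> 'https://example.org/pe/G123/posplots.html'

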
@app.task(ignore_result=True, shared=False)
def _get_result_contents(pe_results_path, filename):
    """Return the contents of a PE results file by reading it from the local
    filesystem.
    """
    path, = _find_paths_from_name(pe_results_path, filename)
    with open(path, 'rb') as f:
        contents = f.read()
    return contents


def _upload_result(pe_results_path, filename, graceid, message, tag):
    """Return a canvas to get the contents of a PE result file and upload it
    to GraceDb.
    """
    return _get_result_contents.si(pe_results_path, filename) | \
        gracedb.upload.s(filename, graceid, message, tag)


def _upload_skymap(pe_results_path, graceid):
    return _get_result_contents.si(pe_results_path, 'LALInference.fits') | \
        group(
            skymaps.annotate_fits('LALInference.fits', graceid,
                                  ['pe', 'sky_loc']),
            gracedb.upload.s('LALInference.fits', graceid,
                             'LALInference FITS sky map', ['pe', 'sky_loc'])
        )


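# Canvas note: .si() creates an immutable signature that ignores the parent
# task's result, while .s() receives it as the first positional argument, so
# the file contents returned by _get_result_contents flow into
# gracedb.upload (and into every member of the group). Hypothetical use:
#
#     _upload_result('/path/to/results', 'extrinsic.png', 'S190901ap',
#                    'Corner plot for extrinsic parameters', 'pe').delay()

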
@app.task(ignore_result=True, shared=False)
def clean_up(rundir):
    """Clean up a run directory.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists

    """
    shutil.rmtree(rundir)


def dag_finished(rundir, preferred_event_id, superevent_id):
    """Upload PE results and clean up the run directory.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    preferred_event_id : str
        The GraceDb ID of a target preferred event
    superevent_id : str
        The GraceDb ID of a target superevent

    Returns
    -------
    tasks : canvas
        The workflow for uploading PE results

    """
    # get the path to the PE results
    pe_results_path = \
        os.path.join(app.conf['pe_results_path'], preferred_event_id)

    # FIXME: _upload_url.si has to be out of the group for
    # gracedb.create_label.si to run
    return \
        _upload_url.si(pe_results_path, superevent_id) | \
        group(
            _upload_skymap(pe_results_path, superevent_id),
            _upload_result(
                pe_results_path, 'extrinsic.png', superevent_id,
                'Corner plot for extrinsic parameters', 'pe'
            ),
            _upload_result(
                pe_results_path, 'intrinsic.png', superevent_id,
                'Corner plot for intrinsic parameters', 'pe'
            ),
            _upload_result(
                pe_results_path, 'sourceFrame.png', superevent_id,
                'Corner plot for source-frame parameters', 'pe'
            )
        ) | gracedb.create_label.si('PE_READY', superevent_id) | \
        clean_up.si(rundir)


@app.task(ignore_result=True, shared=False)
def start_pe(ini_contents, preferred_event_id, superevent_id):
    """Run LALInference on a given event.

    Parameters
    ----------
    ini_contents : str
        The content of online_pe.ini
    preferred_event_id : str
        The GraceDb ID of a target preferred event
    superevent_id : str
        The GraceDb ID of a target superevent

    """
    # make a run directory
    lalinference_dir = os.path.expanduser('~/.cache/lalinference')
    mkpath(lalinference_dir)
    rundir = tempfile.mkdtemp(dir=lalinference_dir)

    (
        gracedb.download.s('coinc.xml', preferred_event_id)
        |
        dag_prepare.s(ini_contents, rundir, superevent_id)
        |
        condor.submit.s().on_error(
            job_error_notification.s(superevent_id, rundir)
        )
        |
        dag_finished(rundir, preferred_event_id, superevent_id)
    ).delay()
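
# End-to-end usage sketch tying the pieces together (hypothetical GraceDb
# IDs; `event` is an event dictionary fetched from GraceDb). pre_pe_tasks
# produces the ini contents, which start_pe receives as its first argument:
#
#     (
#         pre_pe_tasks(event, 'S190901ap')
#         |
#         start_pe.s(event['graceid'], 'S190901ap')
#     ).delay()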