Commit 4b1d8a25 authored by Lab's avatar Lab
Browse files

sequencer

parent 81cfd5cb
#!/usr/bin/env python2.7
##############################################################################
#
# datasequence.py
# Date: 12th January 2020
# Authors: L. Saha (lab.saha@gmail.com), D. Morcuende, I. Aguado
# A. Baquero, J. L. Contrera
# Last changes made on:
# Credits: This script is written and modified following scripts from MAGIC OSA. Hence, a big portion
# of the credits goes to the authors of MAGIC OSA.
##############################################################################
from standardhandle import output, verbose, warning, error, stringify, gettag
__all__ = ["datasequence", "r0_to_dl1"]
##############################################################################
#
# datasequence
#
#################################
def datasequence(args):
tag = gettag()
import subprocess
from os.path import join
from job import historylevel
from config import cfg
calibrationfile = args[0]
pedestalfile = args[1]
drivefile = args[2]
run_str = args[3]
textsuffix = cfg.get('LSTOSA', 'TEXTSUFFIX')
historysuffix = cfg.get('LSTOSA', 'HISTORYSUFFIX')
sequenceprebuild = join(options.directory,
'sequence_{0}_{1}'.format(options.tel_id, run_str))
sequencefile = sequenceprebuild + textsuffix
historyfile = sequenceprebuild + historysuffix
print("HistoryFile:",historyfile)
print("OPTION Directory:",options.directory)
print("sequence file:",sequencefile)
level, rc = historylevel(historyfile, 'DATA')
print("LEVEL:",level,rc)
verbose(tag, "Going to level {0}".format(level))
if level == 3:
rc = r0_to_dl1(calibrationfile, pedestalfile, drivefile, run_str, sequencefile, historyfile)
if level == 0:
verbose(tag, "Job for sequence {0} finished without fatal errors"
.format(run_str))
return rc
#=========================================================================================
#
# R0-to-DL1
#
#=========================================================================================
def r0_to_dl1(calibrationfile, pedestalfile, drivefile, run_str, sequencefile, historyfile):
import sys
import os
import subprocess
from os.path import join, dirname, basename
from glob import glob
from config import cfg
import raw
from register import register_run_concept_files
from job import historylevel
import report
configfile = cfg.get('LSTOSA','CONFIGFILE')
pythondir = cfg.get('LSTOSA', 'PYTHONDIR')
lstchaincommand = cfg.get('LSTOSA', 'R0-DL1')
python = os.path.join(cfg.get('ENV', 'PYTHONBIN'), 'python')
fullcommand = join(pythondir, lstchaincommand)
datafile = join(cfg.get('LST1','RAWDIR'),
'LST-1.1.Run{0}.'+'*'+'{1}{2}'.format(run_str,cfg.get('LSTOSA','FITSSUFFIX'),cfg.get('LSTOSA','COMPRESSEDSUFFIX')))
commandargs = [python,fullcommand]
commandargs.append('-f=' + datafile)
commandargs.append('-o=' + options.directory)
commandargs.append('--pedestal_drs4 =' + pedestalfile)
commandargs.append('--calibration_coeff=' + calibrationfile)
commandargs.append('--config=' + configfile )
commandargs.append('--pointing' )
commandargs.append('--drive=' + drivefile)
print("fullcommand",python,commandargs)
try:
verbose(tag, "Executing \"{0}\"".format(stringify(commandargs)))
rc = subprocess.call(commandargs)
except subprocess.CalledProcessError as Error:
error(tag, "{0}".format(Error), rc)
except OSError as (ValueError, NameError):
error(tag, "Command \"{0}\" failed, {1}"\
.format(stringify(commandargs), NameError), ValueError)
else:
report.history(run_str, basename(fullcommand),\
basename(calibrationfile), basename(pedestalfile), rc, historyfile)
return rc
##############################################################################
#
# MAIN
#
##############################################################################
if __name__ == '__main__':
tag = gettag
import sys
import options, cliopts
# Set the options through cli parsing
args = cliopts.datasequencecliparsing(sys.argv[0])
# Run the routine
rc = datasequence(args)
sys.exit(rc)
#!/usr/bin/env python2.7
##############################################################################
#
# sequencer.py
# Date: 12th January 2020
# Authors
# L. Saha <lab.saha@gmail.com>, D. Morcuende <morcuende@gae.ucm.es>
# A. Baquero <>, I. Aguado<>
# J. L. Contrera <>
# Last modified on:
# Credits: This script is written and modified following scripts from MAGIC OSA. Hence, a big portion
# of the credits goes to the authors of MAGIC OSA.
##############################################################################
from standardhandle import output, verbose, warning, error, stringify, gettag
import options, cliopts
# Only these functions will be considered when building the docs
__all__ = ["sequencer", "single_process", "stereo_process"]
##############################################################################
#
# sequencer
#
# This is the main script to be called in crontab
##############################################################################
def sequencer():
"""Runs the sequencer
"""
tag = gettag()
from report import start
process_mode = None
single_array = ['LST1', 'LST2']
print('TEl ID',options.tel_id)
start(tag)
if options.tel_id in single_array:
process_mode = 'single'
single_process(options.tel_id, process_mode)
else:
if options.tel_id == 'all':
process_mode = 'complete'
elif options.tel_id == 'ST':
process_mode = 'stereo'
sequence_LST1 = single_process('LST1', process_mode)
sequence_LST2 = single_process('LST2', process_mode)
sequence_ST = stereo_process('ST', sequence_M1, sequence_M2)
##############################################################################
#
# single_process
#
##############################################################################
def single_process(telescope, process_mode):
"""Runs the single process
Parameters
----------
telescope : str
Options: 'LST1', 'LST2' or 'ST'
process_mode : str
Options: 'single', 'stereo' or 'complete'
Returns
-------
sequence_list :
"""
tag = gettag()
""" This function processes everything for a single telescope """
import extract
import job
import veto
import dot
from nightsummary import readnightsummary
from report import rule
from closer import is_day_closed
sequence_list = []
options.tel_id = telescope
options.directory = cliopts.set_default_directory_if_needed()
print("DIR",options.directory)
# options.directory = "./"
simulate_save = options.simulate
is_report_needed = True
if process_mode == 'single':
if is_day_closed():
output(tag, "Day {0} for {1} already closed".\
format(options.date, options.tel_id))
return sequence_list
else:
if process_mode == 'stereo':
""" Only simulation for single array required """
options.nightsum = True
options.simulate = True
is_report_needed = False
""" Building the sequences """
night = readnightsummary() # night corresponds to f.read()
subrun_list = extract.extractsubruns(night)
run_list = extract.extractruns(subrun_list)
# Modifies run_list by adding the seq and parent info into runs
sequence_list = extract.extractsequences(run_list)
# Workflow and Submission
# dot.writeworkflow(sequence_list)
# Adds the scripts
job.preparejobs(sequence_list, subrun_list)
queue_list = job.getqueuejoblist(sequence_list)
# veto_list = veto.getvetolist(sequence_list)
# closed_list = veto.getclosedlist(sequence_list)
# updatelstchainstatus(sequence_list)
# updatesequencedb(sequence_list)
# actually, submitjobs does not need the queue_list nor veto_list
# job_list = job.submitjobs(sequence_list, queue_list, veto_list)
# combine_muon(job_list)
# # Report
# if is_report_needed:
# insert_if_new_activity_db(sequence_list)
# updatesequencedb(sequence_list)
# rule()
# reportsequences(sequence_list)
#
# # Cleaning
# options.directory = None
# options.simulate = simulate_save
return sequence_list
##############################################################################
#
# stereo_process
#
##############################################################################
#def stereo_process(telescope, s1_list, s2_list):
# tag = gettag()
# import extract
# import dot
# import job
# import veto
# from report import start, rule
#
# options.tel_id = telescope
# options.directory = cliopts.set_default_directory_if_needed()
#
# # Building the sequences
# sequence_list = extract.extractsequencesstereo(s1_list, s2_list)
# # Workflow and Submission
# dot.writeworkflow(sequence_list)
# # Adds the scripts
# job.preparestereojobs(sequence_list)
# #job.preparedailyjobs(dailysrc_list)
# queue_list = job.getqueuejoblist(sequence_list)
# veto_list = veto.getvetolist(sequence_list)
# closed_list = veto.getclosedlist(sequence_list)
# updatelstchainstatus(sequence_list)
# # actually, submitjobs does not need the queue_list nor veto_list
# job_list = job.submitjobs(sequence_list, queue_list, veto_list)
# # Finalizing report
# insert_if_new_activity_db(sequence_list)
# updatesequencedb(sequence_list)
# rule()
# reportsequences(sequence_list)
# # Cleaning
# options.directory = None
# return sequence_list
##############################################################################
#
# updatelstchainstatus
#
##############################################################################
def updatelstchainstatus(seq_list):
tag = gettag()
from decimal import Decimal
for s in seq_list:
if s.type == 'CALIBRATION':
s.scalibstatus = int(Decimal( getlstchainforsequence(s, 'Scalib') * 100 ) / s.subruns)
elif s.type == 'DATA':
s.dl1status = int(Decimal( getlstchainforsequence(s, 'R0-DL1') * 100 ) / s.subruns)
s.dl2status = int(Decimal( getlstchainforsequence(s, 'DL1-DL2') * 100 ) / s.subruns)
elif s.type == 'STEREO':
s.dl2status = int(Decimal( getlstchainforsequence(s, 'DL1-DL2') * 100 ) )
s.dl3status = int(Decimal( getlstchainforsequence(s, 'DL2-DL3') * 100 ) )
##############################################################################
#
# getlstchainforsequence
#
##############################################################################
def getlstchainforsequence(s, program):
tag = gettag()
from os.path import join
from glob import glob
from config import cfg
prefix = cfg.get('LSTOSA', program + 'PREFIX')
pattern = cfg.get('LSTOSA', program + 'PATTERN')
suffix = cfg.get('LSTOSA', program + 'SUFFIX')
files = glob(join(options.directory, "{0}*{1}*{2}*{3}".format(prefix, s.run, pattern, suffix)))
numberoffiles = len(files)
verbose(tag, "Found {0} {1}ed for sequence name {2}".format(numberoffiles, program, s.jobname))
return numberoffiles
##############################################################################
#
# reportsequences
#
##############################################################################
def reportsequences(seqlist):
tag = gettag()
import config
matrix = []
header = ['Tel', 'Seq', 'Parent', 'Type', 'Run', 'Subruns', 'Source', 'Wobble', 'Action', 'Tries', 'JobID', 'State', 'Host', 'CPU_time', 'Walltime', 'Exit']
if options.tel_id == 'M1' or options.tel_id == 'M2':
header.append('_Y_%')
header.append('_D_%')
header.append('_I_%')
elif options.tel_id == 'ST':
header.append('_S_%')
header.append('_Q_%')
header.append('ODI%')
matrix.append(header)
for s in seqlist:
row_list = [s.telescope, s.seq, s.parent, s.type, s.run, s.subruns, s.source, s.wobble, s.action, s.tries, s.jobid, s.state, s.jobhost, s.cputime, s.walltime, s.exit]
if s.type == 'CALIBRATION':
row_list.append(None)
row_list.append(None)
row_list.append(None)
elif s.type == 'DATA':
row_list.append(s.sorcererstatus)
row_list.append(s.merppstatus)
row_list.append(s.starstatus)
elif s.type == 'STEREO':
row_list.append(s.superstarstatus)
row_list.append(s.melibeastatus)
row_list.append(s.odiestatus)
matrix.append(row_list)
padding = int(config.cfg.get('OUTPUT', 'PADDING')) # space chars inserted between columnns
prettyoutputmatrix(matrix, padding)
##############################################################################
#
# insert_if_new_activity_db
#
##############################################################################
def insert_if_new_activity_db(sequence_list):
tag = gettag()
import config
from datetime import datetime
from mysql import update_db, insert_db, select_db
server = config.cfg.get('MYSQL', 'SERVER')
user = config.cfg.get('MYSQL', 'USER')
database = config.cfg.get('MYSQL', 'DATABASE')
table = config.cfg.get('MYSQL', 'SUMMARYTABLE')
if len(sequence_list) != 0:
""" Declare the beginning of OSA activity """
start = datetime.now()
selections = ['ID']
conditions = {'NIGHT': options.date, 'TELESCOPE': options.tel_id,\
'ACTIVITY': 'LSTOSA'}
matrix = select_db(server, user, database, table, selections, conditions)
id = None
if matrix:
id = matrix[0][0]
if id and int(id) > 0:
""" Activity already started """
else:
""" Insert it into the database """
assignments = conditions
assignments['IS_FINISHED'] = 0
assignments['START'] = start
assignments['END'] = None
conditions = {}
insert_db(server, user, database, table, assignments, conditions)
##############################################################################
#
# updatesequencedb
#
##############################################################################
def updatesequencedb(seqlist):
tag = gettag()
import config
from mysql import update_db, insert_db, select_db
server = config.cfg.get('MYSQL', 'SERVER')
user = config.cfg.get('MYSQL', 'USER')
database = config.cfg.get('MYSQL', 'DATABASE')
table = config.cfg.get('MYSQL', 'SEQUENCETABLE')
for s in seqlist:
""" Fine tuning """
hostname = None
id_processor = None
if s.jobhost != None:
hostname, id_processor = s.jobhost.split('/')
""" Select ID if exists """
selections = ['ID']
conditions = {'TELESCOPE': s.telescope, 'NIGHT': s.night, 'ID_NIGHTLY':s.seq}
matrix = select_db(server, user, database, table, selections, conditions)
id = None
if matrix:
id = matrix[0][0]
verbose(tag, "To this sequence corresponds an entry in the {0} with ID {1}".format(table, id))
assignments = {\
'TELESCOPE': s.telescope,\
'NIGHT': s.night,\
'ID_NIGHTLY': s.seq,\
'TYPE': s.type,\
'RUN': s.run,\
'SUBRUNS': s.subruns,\
'SOURCEWOBBLE': s.sourcewobble,\
'ACTION': s.action,\
'TRIES': s.tries,\
'JOBID': s.jobid,\
'STATE': s.state,\
'HOSTNAME': hostname,\
'ID_PROCESSOR': id_processor,\
'CPU_TIME': s.cputime,\
'WALL_TIME': s.walltime,\
'EXIT_STATUS': s.exit,\
}
if s.type == 'CALIBRATION':
assignments.update({'PROGRESS_SCALIB': s.scalibstatus})
elif s.type == 'DATA':
assignments.update({\
'PROGRESS_SORCERER': s.sorcererstatus,\
'PROGRESS_SSIGNAL': s.ssignalstatus,\
'PROGRESS_MERPP': s.merppstatus,\
'PROGRESS_STAR': s.starstatus,\
'PROGRESS_STARHISTOGRAM': s.starhistogramstatus,\
})
elif s.type == 'STEREO':
assignments.update({\
'PROGRESS_SUPERSTAR': s.superstarstatus,\
'PROGRESS_SUPERSTARHISTOGRAM': s.superstarhistogramstatus,\
'PROGRESS_MELIBEA': s.melibeastatus,\
'PROGRESS_MELIBEAHISTOGRAM': s.melibeahistogramstatus,\
})
# TODO: Add this (Mireia)
#'PROGRESS_ODIE': s.odiestatus,\
if s.parent != None:
assignments['ID_NIGHTLY_PARENTS'] = '{0},'.format(s.parent)
if not id:
conditions = {}
insert_db(server, user, database, table, assignments, conditions)
else:
conditions = {'ID':id}
update_db(server, user, database, table, assignments, conditions)
##############################################################################
#
# combinemuon
#
##############################################################################
def combine_muon(job_list):
tag = gettag()
import os.path
import subprocess
import config
from utils import magicdate_to_number
muon_script = get_muon_script()
update_muon_script(muon_script)
commandargs = ['qsub']
if job_list != 0:
commandargs.append('-W')
depend_list = 'depend=afterany'
for j in job_list:
if j != None and j>0:
depend_list += ":{0}".format(j)
commandargs.append(depend_list)
else:
warning(tag, "Empty job list")
return 0;
commandargs.append(muon_script)
if not options.simulate:
try:
outputstring = subprocess.check_output(commandargs)
#except OSError as (ValueError, NameError):
except OSError as NameError:
warning(tag, "Could not execute script {0}, {1}".format(stringify(commandargs), NameError))
return ValueError
else:
job_id = outputstring.rstrip()
verbose(tag, "Muon script {0} submitted with job_id {1}".format(muon_script, job_id))
else:
verbose(tag, "SIMULATE {0}".format(stringify(commandargs)))
##############################################################################
#
# updatemuoncontent
#
##############################################################################
def update_muon_script(muon_script):
tag = gettag()
import iofile
from os.path import join
from glob import glob
import config
from utils import magicdate_to_number
bindir = config.cfg.get('LSTOSA', 'BINDIR')
lstchaindir = config.cfg.get('LSTOSA', 'LSTCHAINDIR')
command = join(bindir, config.cfg.get('LSTOSA', 'COMBINEMUON'))
# Beware we want to change this in the future
filename = get_muon_file()
commandargs = [command, filename]
jobname = "{0}_{1}_{2}".format(config.cfg.get('LSTOSA', 'MUONPREFIX'), magicdate_to_number(options.date), options.tel_id)
content = "#!/bin/bash\n"
# PBS-like assignments, beware that being comented with # is still parsed by the qsub routine and assigned
content += "#PBS -S /bin/bash\n"
content += "#PBS -N {0}\n".format(jobname)
content += "#PBS -d {0}\n".format(options.directory)
content += "#PBS -e {0}_$PBS_JOBID.err\n".format(jobname)
content += "#PBS -o {0}_$PBS_JOBID.out\n".format(jobname)
# Preparing the environment
content += "LSTCHAIN={0}\n".format(LSTDIR)
content += "{0}\n".format(stringify(commandargs))
if not options.simulate:
is_updated = iofile.writetofile(muon_script, content)
return is_updated
##############################################################################
#
# get_muon_script
#
##############################################################################
def get_muon_script():
tag = gettag()
from utils import build_magicbasename
from os.path import join
from config import cfg
prefix = cfg.get('LSTOSA', 'MUONPREFIX')
suffix = cfg.get('LSTOSA', 'SCRIPTSUFFIX')
basename = build_magicbasename(prefix, suffix)
file = join(options.directory, basename)
return file
##############################################################################
#
# get_muonfile
#
##############################################################################
def get_muon_file():
tag = gettag()
from os.path import join
from config import cfg
prefix = cfg.get('LSTOSA', 'MUONPREFIX')
suffix = cfg.get('LSTOSA', 'TEXTSUFFIX')
basename = build_magicbasename(prefix, suffix)
file = join(options.directory, basename)
return file
##############################################################################
#
# prettyoutputmatrix
#
##############################################################################
def prettyoutputmatrix(m, paddingspace):
tag = gettag()
maxfieldlength = []
for i in range(len(m)):
row = m[i]
for j in range(len(row)):
col = row[j]
l = len(str(col))
# verbose(tag, "Row {0}, Col {1}, Val {2} Len {3}".format(i, j, col, l))
if m.index(row) == 0:
maxfieldlength.append(l)
elif l > maxfieldlength[j]:
# Insert or update the first length
maxfieldlength[j] = l
for row in m:
stringrow = ''
for j in range(len(row)):
col = row[j]
lpadding = (maxfieldlength[j] - len(str(col))) * ' '
rpadding = paddingspace * ' '
if isinstance(col, (int, long)):
# We got an integer, right aligned
stringrow += "{0}{1}{2}".format(lpadding, col, rpadding)
else:
# Should be a string, left aligned
stringrow += "{0}{1}{2}".format(col, lpadding, rpadding)
output(tag, stringrow)
##############################################################################
#
# MAIN
#
##############################################################################
""" Sequencer called as a script does the full job """
if __name__ == '__main__':
tag = gettag()
import sys
import options, cliopts
# Set the options through parsing of the command line interface
cliopts.sequencercliparsing(sys.argv[0])
# Run the routine
sequencer()
Supports Markdown