Commit 7367207d authored by Lab's avatar Lab
Browse files

Latest update

parent a661f222
...@@ -99,7 +99,7 @@ CLOSERDIR = %(DIR)s/lst-osa/Closer ...@@ -99,7 +99,7 @@ CLOSERDIR = %(DIR)s/lst-osa/Closer
ENDOFRAWTRANSFERDIR = %(DIR)s/Reports/Compress ENDOFRAWTRANSFERDIR = %(DIR)s/Reports/Compress
CALIBRATIONCONFIGCARD = onsite_camera_calibration_param.json CALIBRATIONCONFIGCARD = onsite_camera_calibration_param.json
CONFIGCARD = CONFIGCARD =
VERSION = v01 VERSION = v00
[LST2] [LST2]
DIR = /fefs/aswg/lst-osa DIR = /fefs/aswg/lst-osa
......
...@@ -9,18 +9,18 @@ def writeworkflow(sequence_list): ...@@ -9,18 +9,18 @@ def writeworkflow(sequence_list):
tag = gettag() tag = gettag()
from iofile import writetofile from iofile import writetofile
from os.path import join, exists from os.path import join, exists
from config import cfg from osa.configs.config import cfg
from utils import magicdate_to_number from osa.utils.utils import lstdate_to_number
replaced = None replaced = None
dot_basename = "{0}_{1}_{2}{3}".\ dot_basename = "{0}_{1}_{2}{3}".\
format(cfg.get('OSA', 'WORKFLOWPREFIX'),\ format(cfg.get('LSTOSA', 'WORKFLOWPREFIX'),\
magicdate_to_number(options.date),\ lstdate_to_number(options.date),\
options.tel_id, cfg.get('OSA', 'GRAPHSUFFIX')) options.tel_id, cfg.get('LSTOSA', 'GRAPHSUFFIX'))
dot_path = join(options.directory, dot_basename) dot_path = join(options.directory, dot_basename)
# We could think of using the pydot interface as well, but this is relatively simple anyway # We could think of using the pydot interface as well, but this is relatively simple anyway
content = "strict digraph {\n" content = "strict digraph {\n"
content += "label=\"OSA workflow for " + options.tel_id + " on " + options.date + "\";" content += "label=\"LSTOSA workflow for " + options.tel_id + " on " + options.date + "\";"
content += "labelloc=t;\n" content += "labelloc=t;\n"
content += "rankdir=LR;\n" content += "rankdir=LR;\n"
content += "node [shape=box];\n" content += "node [shape=box];\n"
...@@ -49,7 +49,7 @@ def writeworkflow(sequence_list): ...@@ -49,7 +49,7 @@ def writeworkflow(sequence_list):
if not options.simulate: if not options.simulate:
replaced = writetofile(dot_path, content) replaced = writetofile(dot_path, content)
verbose(tag, "Workflow updated? {0} in {1}".format(replaced, dot_path)) verbose(tag, "Workflow updated? {0} in {1}".format(replaced, dot_path))
svg_path = dot_path.rsplit('.', 1)[0] + cfg.get('OSA', 'SVGSUFFIX') svg_path = dot_path.rsplit('.', 1)[0] + cfg.get('LSTOSA', 'SVGSUFFIX')
if replaced or not exists(svg_path): if replaced or not exists(svg_path):
verbose(tag, "Updating workflow file: {0}".format(dot_path)) verbose(tag, "Updating workflow file: {0}".format(dot_path))
convert_dot_into_svg(dot_path, svg_path) convert_dot_into_svg(dot_path, svg_path)
...@@ -62,10 +62,10 @@ def convert_dot_into_svg(dotfile, svgfile): ...@@ -62,10 +62,10 @@ def convert_dot_into_svg(dotfile, svgfile):
tag = gettag() tag = gettag()
# Pretty clear what it does, isn't it? # Pretty clear what it does, isn't it?
import subprocess import subprocess
from config import cfg from osa.configs.config import cfg
command = cfg.get('OSA', 'GRAPH') command = cfg.get('LSTOSA', 'GRAPH')
svgflag = '-' + cfg.get('OSA', 'SVGSUFFIX').replace('.', 'T') svgflag = '-' + cfg.get('LSTOSA', 'SVGSUFFIX').replace('.', 'T')
try: try:
commandoutput = subprocess.check_output(['which', command]) commandoutput = subprocess.check_output(['which', command])
except subprocess.CalledProcessError as Error: except subprocess.CalledProcessError as Error:
......
...@@ -343,8 +343,9 @@ def createjobtemplate(s): ...@@ -343,8 +343,9 @@ def createjobtemplate(s):
content = "#!/bin/env python\n" content = "#!/bin/env python\n"
# SLURM assignments # SLURM assignments
content += "#SBATCH -p compute\n" content += "#SBATCH -p compute\n"
content += "#SBATCH --tasks=60\n" content += "#SBATCH --tasks=2\n"
content += "#SBATCH --nodes=2\n" if s.type == DATA:
content += "#SBATCH --array=1-{0}\n".format(len(s.subrun_list))
content += "#SBATCH --cpus-per-task=1\n" content += "#SBATCH --cpus-per-task=1\n"
content += "#SBATCH --mem-per-cpu=1600\n" content += "#SBATCH --mem-per-cpu=1600\n"
content += "#SBATCH -t 0-48:00\n" content += "#SBATCH -t 0-48:00\n"
...@@ -377,75 +378,88 @@ def createjobtemplate(s): ...@@ -377,75 +378,88 @@ def createjobtemplate(s):
# submitjobs # submitjobs
# #
############################################################################## ##############################################################################
def submitjobs(sequence_list, queue_list, veto_list): #def submitjobs(sequence_list, queue_list, veto_list):
def submitjobs(sequence_list):
tag = gettag() tag = gettag()
import subprocess import subprocess
from os.path import join from os.path import join
from osa.configs import config from osa.configs import config
job_list = [] job_list = []
command = 'qsub' command = 'sbatch'
for s in sequence_list: for s in sequence_list:
commandargs = [command, s.script] # List of two elements commandargs = [command, s.script] # List of two elements
commandargs.append('-W') # commandargs.append('-W')
commandargs.append('umask=0022') # commandargs.append('umask=0022')
""" Introduce the job dependencies """ # """ Introduce the job dependencies """
if len(s.parent_list) != 0: # if len(s.parent_list) != 0:
commandargs.append('-W') # commandargs.append('-W')
depend_string = 'depend=' # depend_string = 'depend='
if s.type == 'DATA': # if s.type == 'DATA':
depend_string += 'afterok' # depend_string += 'afterok'
elif s.type == 'STEREO': # elif s.type == 'STEREO':
depend_string += 'afterany' # depend_string += 'afterany'
for pseq in s.parent_list: # for pseq in s.parent_list:
if pseq.jobid > 0: # if pseq.jobid > 0:
depend_string += ":{0}".format(pseq.jobid) # depend_string += ":{0}".format(pseq.jobid)
commandargs.append(depend_string) # commandargs.append(depend_string)
""" Skip vetoed """ # """ Skip vetoed """
if s.action == 'Veto': # if s.action == 'Veto':
verbose(tag, "job {0} has been vetoed".format(s.jobname)) # verbose(tag, "job {0} has been vetoed".format(s.jobname))
elif s.action == 'Closed': # elif s.action == 'Closed':
verbose(tag, "job {0} is already closed".format(s.jobname)) # verbose(tag, "job {0} is already closed".format(s.jobname))
elif s.action == 'Check' and s.state != 'C': # elif s.action == 'Check' and s.state != 'C':
verbose(tag, "job {0} checked to be dispatched but not completed yet".format(s.jobname)) # verbose(tag, "job {0} checked to be dispatched but not completed yet".format(s.jobname))
if s.state == 'H' or s.state == 'R': # if s.state == 'H' or s.state == 'R':
# Reset values # # Reset values
s.exit = None # s.exit = None
if s.state == 'H': # if s.state == 'H':
s.jobhost = None # s.jobhost = None
s.cputime = None # s.cputime = None
s.walltime = None # s.walltime = None
elif s.action == 'Check' and s.state == 'C' and s.exit == 0: # elif s.action == 'Check' and s.state == 'C' and s.exit == 0:
verbose(tag, "job {0} checked to be successful".format(s.jobname)) # verbose(tag, "job {0} checked to be successful".format(s.jobname))
else: # else:
if options.simulate == True: # if options.simulate == True:
commandargs.insert(0, 'echo') # commandargs.insert(0, 'echo')
s.action = 'Simulate' # s.action = 'Simulate'
# This jobid is negative showing it belongs to a simulated environment (not real jobid) # # This jobid is negative showing it belongs to a simulated environment (not real jobid)
s.jobid = -1 - s.seq # s.jobid = -1 - s.seq
else: # else:
s.action = 'Submit' # s.action = 'Submit'
# Reset the values to avoid misleading info from previous jobs # # Reset the values to avoid misleading info from previous jobs
s.jobhost = None # s.jobhost = None
s.state = 'Q' # s.state = 'Q'
s.cputime = None # s.cputime = None
s.walltime = None # s.walltime = None
s.exit = None # s.exit = None
try: # try:
stdout = subprocess.check_output(commandargs) # stdout = subprocess.check_output(commandargs)
except subprocess.CalledProcessError as Error: # except subprocess.CalledProcessError as Error:
# error(tag, Error, 2)
# except OSError (ValueError, NameError):
# error(tag, "Command {0}, {1}".format(stringify(commandargs), NameError), ValueError)
# else:
# if options.simulate == False:
# try:
# s.jobid = int(stdout.split('.', 1)[0])
# except ValueError as e:
# warning(tag, "Wrong parsing of jobid {0} not being an integer, {1}".format(stdout.split('.', 1)[0], e))
# job_list.append(s.jobid)
# verbose(tag, "{0} {1}".format(s.action, stringify(commandargs)))
print("Launching scripts {0} ".format(str(s.script)))
try:
verbose(tag,"Launching scripts {0} ".format(str(s.script)))
stdout = subprocess.check_output(commandargs)
except subprocess.CalledProcessError as Error:
error(tag, Error, 2) error(tag, Error, 2)
except OSError (ValueError, NameError): except OSError (ValueError, NameError):
error(tag, "Command {0}, {1}".format(stringify(commandargs), NameError), ValueError) error(tag, "Command {0}, {1}".format(stringify(commandargs), NameError), ValueError)
else:
if options.simulate == False: print(commandargs)
try: job_list.append(s.script)
s.jobid = int(stdout.split('.', 1)[0])
except ValueError as e:
warning(tag, "Wrong parsing of jobid {0} not being an integer, {1}".format(stdout.split('.', 1)[0], e))
job_list.append(s.jobid)
verbose(tag, "{0} {1}".format(s.action, stringify(commandargs)))
return job_list return job_list
############################################################################## ##############################################################################
# #
# getqueuejoblist # getqueuejoblist
# #
...@@ -459,7 +473,7 @@ def getqueuejoblist(sequence_list): ...@@ -459,7 +473,7 @@ def getqueuejoblist(sequence_list):
command = config.cfg.get('ENV', 'SBATCHBIN') command = config.cfg.get('ENV', 'SBATCHBIN')
commandargs = [command] commandargs = [command]
queue_list = [] queue_list = []
print("DEBUG",command) print("DEBUG",commandargs)
try: try:
xmloutput = subprocess.check_output(commandargs) xmloutput = subprocess.check_output(commandargs)
except subprocess.CalledProcessError as Error: except subprocess.CalledProcessError as Error:
...@@ -474,6 +488,7 @@ def getqueuejoblist(sequence_list): ...@@ -474,6 +488,7 @@ def getqueuejoblist(sequence_list):
document = xml.dom.minidom.parseString(xmloutput) document = xml.dom.minidom.parseString(xmloutput)
queue_list = xmlhandle.xmlhandleData(document) queue_list = xmlhandle.xmlhandleData(document)
setqueuevalues(queue_list, sequence_list) setqueuevalues(queue_list, sequence_list)
print("DEBUG",command)
return queue_list return queue_list
############################################################################## ##############################################################################
# #
......
...@@ -427,9 +427,9 @@ def set_default_date_if_needed(): ...@@ -427,9 +427,9 @@ def set_default_date_if_needed():
if is_defined(options.date): if is_defined(options.date):
return options.date return options.date
else: else:
from utils import getcurrentdate2 from .utils import getcurrentdate2
from config import cfg from osa.configs.config import cfg
return getcurrentdate2(cfg.get('MAGIC', 'DATESEPARATOR')) return getcurrentdate2(cfg.get('LST', 'DATESEPARATOR'))
############################################################################## ##############################################################################
# #
# set_default_directory_if_needed # set_default_directory_if_needed
......
...@@ -108,13 +108,15 @@ def single_process(telescope, process_mode): ...@@ -108,13 +108,15 @@ def single_process(telescope, process_mode):
# dot.writeworkflow(sequence_list) # dot.writeworkflow(sequence_list)
# Adds the scripts # Adds the scripts
job.preparejobs(sequence_list, subrun_list) job.preparejobs(sequence_list, subrun_list)
queue_list = job.getqueuejoblist(sequence_list)
# queue_list = job.getqueuejoblist(sequence_list)
# veto_list = veto.getvetolist(sequence_list) # veto_list = veto.getvetolist(sequence_list)
# closed_list = veto.getclosedlist(sequence_list) # closed_list = veto.getclosedlist(sequence_list)
# updatelstchainstatus(sequence_list) # updatelstchainstatus(sequence_list)
# updatesequencedb(sequence_list) # updatesequencedb(sequence_list)
# actually, submitjobs does not need the queue_list nor veto_list # actually, submitjobs does not need the queue_list nor veto_list
# job_list = job.submitjobs(sequence_list, queue_list, veto_list) # job_list = job.submitjobs(sequence_list, queue_list, veto_list)
job_list = job.submitjobs(sequence_list)
# combine_muon(job_list) # combine_muon(job_list)
# # Report # # Report
# if is_report_needed: # if is_report_needed:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment