Commit 81bb067b authored by Lab's avatar Lab
Browse files

re-structure new

parent 4f61fce0
5088301 1 PEDESTAL 2020-01-12 21:06:32 Algol 16450 24.4 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 2 PEDESTAL 2020-01-12 21:07:09 Algol 16450 24.2 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 3 PEDESTAL 2020-01-12 21:07:48 Algol 16450 24.1 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 4 PEDESTAL 2020-01-12 21:08:29 Algol 16450 23.9 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 5 PEDESTAL 2020-01-12 21:09:07 Algol 16450 23.8 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 6 PEDESTAL 2020-01-12 21:09:46 Algol 16450 23.6 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088301 7 PEDESTAL 2020-01-12 21:09:51 Algol 1300 23.6 03:08:10 40:57:20 DEFAULT No_Test stdHV No_Filter
5088302 1 PEDESTAL 2020-01-12 21:09:59 S30218+35-W0.40+000 2000 8.1 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088303 1 CALIBRATION 2020-01-12 21:10:07 S30218+35-W0.40+000 2000 8.1 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 1 DATA 2020-01-12 21:10:47 S30218+35-W0.40+000 16450 8.0 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 2 DATA 2020-01-12 21:11:27 S30218+35-W0.40+000 16450 8.0 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 3 DATA 2020-01-12 21:12:06 S30218+35-W0.40+000 16450 7.9 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 4 DATA 2020-01-12 21:12:47 S30218+35-W0.40+000 16450 7.9 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 5 DATA 2020-01-12 21:13:29 S30218+35-W0.40+000 16450 7.8 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 6 DATA 2020-01-12 21:14:08 S30218+35-W0.40+000 16450 7.8 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 7 DATA 2020-01-12 21:14:50 S30218+35-W0.40+000 16450 7.7 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 8 DATA 2020-01-12 21:15:31 S30218+35-W0.40+000 16450 7.7 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 9 DATA 2020-01-12 21:16:12 S30218+35-W0.40+000 16450 7.7 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 10 DATA 2020-01-12 21:16:53 S30218+35-W0.40+000 16450 7.6 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 11 DATA 2020-01-12 21:17:34 S30218+35-W0.40+000 16450 7.6 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 12 DATA 2020-01-12 21:18:15 S30218+35-W0.40+000 16450 7.6 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 13 DATA 2020-01-12 21:18:57 S30218+35-W0.40+000 16450 7.6 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 14 DATA 2020-01-12 21:19:36 S30218+35-W0.40+000 16450 7.5 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 15 DATA 2020-01-12 21:20:17 S30218+35-W0.40+000 16450 7.5 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 16 DATA 2020-01-12 21:21:11 S30218+35-W0.40+000 16450 7.5 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 17 DATA 2020-01-12 21:21:50 S30218+35-W0.40+000 16450 7.5 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
5088304 18 DATA 2020-01-12 21:22:33 S30218+35-W0.40+000 16450 7.4 02:23:03 35:56:14 DEFAULT No_Test stdHV No_Filter
from utils.standardhandle import error, verbose, gettag, warning
from utils import options
##############################################################################
#
# readconf
#
##############################################################################
def readconf(file):
tag = gettag()
from os.path import exists
conf = None
try:
# Python 2.7
import ConfigParser
except ImportError as Error:
warning(tag, "Increasing to python 3 ConfigParser")
import configparser
conf = configparser.SafeConfigParser(allow_no_value=True)
else:
conf = ConfigParser.SafeConfigParser(allow_no_value=True)
try:
conf.read(file)
except ConfigParser.Error as NameError:
error(tag, NameError, 3)
verbose(tag, "sections of the config file are {0}".format(conf.sections()))
return conf
##############################################################################
#
# read_properties
#
##############################################################################
def read_properties(file):
tag = gettag()
""" To be used when config file has no header, creating a DUMMY header"""
import tempfile
from os import unlink
fname = None
with tempfile.NamedTemporaryFile(delete=False) as tf:
tf.write('[DUMMY]\n')
fname = tf.name
with open(file) as f:
tf.write(f.read())
tf.seek(0)
conf = readconf(tf.name)
unlink(tf.name)
return conf
##############################################################################
#
# cfg
#
##############################################################################
cfg = readconf(options.configfile)
#!/bin/env python
#SBATCH -p compute
#SBATCH --tasks=60
#SBATCH --nodes=2
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=1600
#SBATCH -t 0-48:00
#SBATCH -o ./log/slurm.%j.%N.out
#SBATCH -e ./log/slurm.%j.%N.err
import sys
import subprocess
#subprocess.call(["print("Hellow world")"])
print("Hellow world")
filename = sys.argv[1]
outdir = sys.argv[2]
calibrationfile = sys.argv[3]
pedetalfile = sys.argv[4]
drivefile = sys.argv[5]
print(filename)
print(outdir)
print(calibrationfile)
print(pedetalfile)
print(drivefile)
This diff is collapsed.
#!/usr/bin/env python2.7
from subprocess import call, check_output
import xml.dom.minidom
import xmlhandle
import sys
""" Read the argument """
if len(sys.argv) != 2 or (sys.argv[1] != 'suspend' and sys.argv[1] != 'resume' and sys.argv[1] != 'kill'):
sys.stderr.write("Error. Usage osa_jobs.py <action>\n where action: suspend, resume, kill\n")
sys.exit(1)
argument = sys.argv[1]
""" Check the jobs in queue """
commandargs = ['qstat', '-x']
xml_output = check_output(commandargs)
""" We have to parse the xml """
document = xml.dom.minidom.parseString(xml_output)
queue_list = xmlhandle.xmlhandleData(document)
"""
The queue list contains element like
{'name': u'STDIN', 'jobhost': u'fcana2/0', 'cputime': u'00:00:00', 'jobid': 9412, 'state': u'C', 'exit': 0, 'walltime': u'00:00:17'}
"""
for job in queue_list:
if job['state']:
if job['state'] != 'C':
if (argument == 'suspend' and job['state'] != 'S') or (argument == 'resume' and job['state'] == 'S'):
print "{0}ing jobid {1}, {2}...".format(argument, job['jobid'], job['name']),
commandargs = ['qsig', '-s', argument, str(job['jobid'])]
rc = call(commandargs)
if rc == 0:
print "Success!"
else:
print "Failed!"
else:
print "Warning, no state for jobid {0}".format(job['jobid'])
from standardhandle import output, warning, verbose, error, gettag
import options, cliopts
import config
##############################################################################
#
# register_files
#
##############################################################################
def register_files(type, run_str, inputdir, prefix, pattern, suffix, outputdir):
tag = gettag()
from os.path import join, basename, exists, getmtime, getsize
from filecmp import cmp
from glob import glob
from re import search
from socket import gethostname
from datetime import datetime
from utils import get_md5sum_and_copy, magicdate_to_iso
from mysql import select_db, update_or_insert_and_select_id_db
from config import cfg
file_list = glob(join(inputdir, "{0}*{1}*{2}*{3}".format(prefix, run_str, pattern, suffix)))
verbose(tag, "File list is {0}".format(file_list))
hostname = gethostname()
# The default subrun index for most of the files
run = int(run_str.lstrip('0'))
subrun = 1
for inputf in file_list:
""" Next is the way of searching 3 digits after a dot in filenames """
subrunsearch = search('(?<=\.)\d\d\d_', basename(inputf))
if subrunsearch:
""" And strip the zeroes after getting the first 3 character """
subrun = subrunsearch.group(0)[0:3].lstrip('0')
outputf = join(outputdir, basename(inputf))
if exists(outputf) and cmp(inputf, outputf):
# Do nothing than acknowledging
verbose(tag, "Destination file {0} exists and it is identical to input".format(outputf))
else:
# There is no output file or it is different
verbose(tag, "Copying file {0}".format(outputf))
md5sum = get_md5sum_and_copy(inputf, outputf)
verbose(tag, "Resulting md5sum={0}".format(md5sum))
mtime = datetime.fromtimestamp(getmtime(outputf))
size = getsize(outputf)
# DB config parameters
s = cfg.get('MYSQL', 'SERVER')
u = cfg.get('MYSQL', 'USER')
d = cfg.get('MYSQL', 'DATABASE')
daqtable = cfg.get('MYSQL', 'DAQTABLE')
storagetable = cfg.get('MYSQL', 'STORAGETABLE')
sequencetable = cfg.get('MYSQL', 'SEQUENCETABLE')
# Get the other values of parent files querying the database
selections = ['ID', 'DAQ_ID', 'REPORT_ID']
conditions = {'DAQ_ID': "(SELECT ID FROM {0} WHERE\
RUN={1} AND SUBRUN={2} AND TELESCOPE='{3}')"\
.format(daqtable, run, subrun, options.tel_id)}
id_parent, daq_id, report_id = [None, None, None]
querymatrix = select_db(s, u, d, storagetable, selections,\
conditions)
# It could be that they don't exists
if len(querymatrix) != 0:
verbose(tag, "Resulting query={0}".format(querymatrix[0]))
id_parent, daq_id, report_id = querymatrix[0]
verbose(tag, "id_parent={0}, daq_id={1}, report_id={2}".\
format(id_parent, daq_id, report_id))
# Get also de sequence id
selections = ['ID']
conditions = {'TELESCOPE': options.tel_id, 'RUN': run}
querymatrix = select_db(s, u, d, sequencetable, selections,\
conditions)
sequence_id = None
if len(querymatrix) != 0:
sequence_id, = querymatrix[0]
verbose(tag, "sequence_id={0}".format(sequence_id))
# Finally we update or insert the file
night = magicdate_to_iso(options.date)
assignments = {'ID_PARENT': id_parent, 'DAQ_ID': daq_id,\
'REPORT_ID': report_id, 'SEQUENCE_ID':sequence_id,\
'NIGHT': night, 'TELESCOPE': options.tel_id,\
'HOSTNAME': hostname, 'FILE_TYPE': type, 'SIZE': size,\
'M_TIME': mtime, 'MD5SUM': md5sum}
conditions = {'FILE_PATH': outputf}
update_or_insert_and_select_id_db(s, u, d, storagetable,\
assignments, conditions)
##############################################################################
#
# register_run_concept_files()
#
##############################################################################
def register_run_concept_files(run_string, concept):
tag = gettag()
from os.path import join
from config import cfg
from utils import magicdate_to_dir
inputdir = options.directory
middledir = magicdate_to_dir(options.date)
outputdir = join(cfg.get(options.tel_id, concept + 'DIR'), middledir)
type = cfg.get('OSA', concept + 'TYPE')
prefix = cfg.get('OSA', concept + 'PREFIX')
pattern = cfg.get('OSA', concept + 'PATTERN')
suffix = cfg.get('OSA', concept + 'SUFFIX')
verbose(tag, "Registering {0} file for {2}*{1}*{3}*{4}"\
.format(type, run_string, prefix, pattern, suffix ))
register_files(type, run_string, inputdir, prefix, pattern, suffix, outputdir)
#!/usr/bin/env python2.7
##############################################################################
#
# sequencer.py
# Date: 12th January 2020
# Authors
# L. Saha <lab.saha@gmail.com>, D. Morcuende <morcuende@gae.ucm.es>
# A. Baquero <>, I. Aguado<>
# J. L. Contrera <>
# Last modified on:
# Credits: This script is written and modified following scripts from MAGIC OSA. Hence, a big portion
# of the credits goes to the authors of MAGIC OSA.
##############################################################################
#from utils import standardhandle
#from .. import utils
#import sys
#sys.path.append("..")
from osa.utils.standardhandle import output, verbose, warning, error, stringify, gettag
from osa.utils import options, cliopts
#report import start
# Only these functions will be considered when building the docs
__all__ = ["sequencer", "single_process", "stereo_process"]
##############################################################################
#
# sequencer
#
# This is the main script to be called in crontab
##############################################################################
#def sequencer():
def main():
"""Runs the sequencer
"""
tag = gettag()
from osa.reports.report import start
from osa.configs import config
process_mode = None
single_array = ['LST1', 'LST2']
print('TEl ID',options.tel_id)
start(tag)
if options.tel_id in single_array:
process_mode = 'single'
single_process(options.tel_id, process_mode)
else:
if options.tel_id == 'all':
process_mode = 'complete'
elif options.tel_id == 'ST':
process_mode = 'stereo'
sequence_LST1 = single_process('LST1', process_mode)
sequence_LST2 = single_process('LST2', process_mode)
sequence_ST = stereo_process('ST', sequence_M1, sequence_M2)
##############################################################################
#
# single_process
#
##############################################################################
def single_process(telescope, process_mode):
"""Runs the single process
Parameters
----------
telescope : str
Options: 'LST1', 'LST2' or 'ST'
process_mode : str
Options: 'single', 'stereo' or 'complete'
Returns
-------
sequence_list :
"""
tag = gettag()
""" This function processes everything for a single telescope """
from osa.nightsummary import extract
from osa.jobs import job
from osa.veto import veto
import dot
from osa.nightsummary.nightsummary import readnightsummary
from osa.reports.report import rule
from osa.autocloser.closer import is_day_closed
sequence_list = []
options.tel_id = telescope
options.directory = cliopts.set_default_directory_if_needed()
print("DIR",options.directory)
# options.directory = "./"
simulate_save = options.simulate
is_report_needed = True
if process_mode == 'single':
if is_day_closed():
output(tag, "Day {0} for {1} already closed".\
format(options.date, options.tel_id))
return sequence_list
else:
if process_mode == 'stereo':
""" Only simulation for single array required """
options.nightsum = True
options.simulate = True
is_report_needed = False
""" Building the sequences """
night = readnightsummary() # night corresponds to f.read()
subrun_list = extract.extractsubruns(night)
run_list = extract.extractruns(subrun_list)
# Modifies run_list by adding the seq and parent info into runs
sequence_list = extract.extractsequences(run_list)
# Workflow and Submission
# dot.writeworkflow(sequence_list)
# Adds the scripts
job.preparejobs(sequence_list, subrun_list)
queue_list = job.getqueuejoblist(sequence_list)
# veto_list = veto.getvetolist(sequence_list)
# closed_list = veto.getclosedlist(sequence_list)
# updatelstchainstatus(sequence_list)
# updatesequencedb(sequence_list)
# actually, submitjobs does not need the queue_list nor veto_list
# job_list = job.submitjobs(sequence_list, queue_list, veto_list)
# combine_muon(job_list)
# # Report
# if is_report_needed:
# insert_if_new_activity_db(sequence_list)
# updatesequencedb(sequence_list)
# rule()
# reportsequences(sequence_list)
#
# # Cleaning
# options.directory = None
# options.simulate = simulate_save
return sequence_list
##############################################################################
#
# stereo_process
#
##############################################################################
#def stereo_process(telescope, s1_list, s2_list):
# tag = gettag()
# import extract
# import dot
# import job
# import veto
# from report import start, rule
#
# options.tel_id = telescope
# options.directory = cliopts.set_default_directory_if_needed()
#
# # Building the sequences
# sequence_list = extract.extractsequencesstereo(s1_list, s2_list)
# # Workflow and Submission
# dot.writeworkflow(sequence_list)
# # Adds the scripts
# job.preparestereojobs(sequence_list)
# #job.preparedailyjobs(dailysrc_list)
# queue_list = job.getqueuejoblist(sequence_list)
# veto_list = veto.getvetolist(sequence_list)
# closed_list = veto.getclosedlist(sequence_list)
# updatelstchainstatus(sequence_list)
# # actually, submitjobs does not need the queue_list nor veto_list
# job_list = job.submitjobs(sequence_list, queue_list, veto_list)
# # Finalizing report
# insert_if_new_activity_db(sequence_list)
# updatesequencedb(sequence_list)
# rule()
# reportsequences(sequence_list)
# # Cleaning
# options.directory = None
# return sequence_list
##############################################################################
#
# updatelstchainstatus
#
##############################################################################
def updatelstchainstatus(seq_list):
tag = gettag()
from decimal import Decimal
for s in seq_list:
if s.type == 'CALIBRATION':
s.scalibstatus = int(Decimal( getlstchainforsequence(s, 'Scalib') * 100 ) / s.subruns)
elif s.type == 'DATA':
s.dl1status = int(Decimal( getlstchainforsequence(s, 'R0-DL1') * 100 ) / s.subruns)
s.dl2status = int(Decimal( getlstchainforsequence(s, 'DL1-DL2') * 100 ) / s.subruns)
elif s.type == 'STEREO':
s.dl2status = int(Decimal( getlstchainforsequence(s, 'DL1-DL2') * 100 ) )
s.dl3status = int(Decimal( getlstchainforsequence(s, 'DL2-DL3') * 100 ) )
##############################################################################
#
# getlstchainforsequence
#
##############################################################################
def getlstchainforsequence(s, program):
tag = gettag()
from os.path import join
from glob import glob
from config import cfg
prefix = cfg.get('LSTOSA', program + 'PREFIX')
pattern = cfg.get('LSTOSA', program + 'PATTERN')
suffix = cfg.get('LSTOSA', program + 'SUFFIX')
files = glob(join(options.directory, "{0}*{1}*{2}*{3}".format(prefix, s.run, pattern, suffix)))
numberoffiles = len(files)
verbose(tag, "Found {0} {1}ed for sequence name {2}".format(numberoffiles, program, s.jobname))
return numberoffiles
##############################################################################
#
# reportsequences
#
##############################################################################
def reportsequences(seqlist):
tag = gettag()
import config
matrix = []
header = ['Tel', 'Seq', 'Parent', 'Type', 'Run', 'Subruns', 'Source', 'Wobble', 'Action', 'Tries', 'JobID', 'State', 'Host', 'CPU_time', 'Walltime', 'Exit']
if options.tel_id == 'M1' or options.tel_id == 'M2':
header.append('_Y_%')
header.append('_D_%')
header.append('_I_%')
elif options.tel_id == 'ST':
header.append('_S_%')
header.append('_Q_%')
header.append('ODI%')
matrix.append(header)
for s in seqlist:
row_list = [s.telescope, s.seq, s.parent, s.type, s.run, s.subruns, s.source, s.wobble, s.action, s.tries, s.jobid, s.state, s.jobhost, s.cputime, s.walltime, s.exit]
if s.type == 'CALIBRATION':
row_list.append(None)
row_list.append(None)
row_list.append(None)
elif s.type == 'DATA':
row_list.append(s.sorcererstatus)
row_list.append(s.merppstatus)
row_list.append(s.starstatus)
elif s.type == 'STEREO':
row_list.append(s.superstarstatus)
row_list.append(s.melibeastatus)
row_list.append(s.odiestatus)
matrix.append(row_list)
padding = int(config.cfg.get('OUTPUT', 'PADDING')) # space chars inserted between columnns
prettyoutputmatrix(matrix, padding)
##############################################################################
#
# insert_if_new_activity_db
#
##############################################################################
def insert_if_new_activity_db(sequence_list):
tag = gettag()
import config
from datetime import datetime
from mysql import update_db, insert_db, select_db
server = config.cfg.get('MYSQL', 'SERVER')
user = config.cfg.get('MYSQL', 'USER')
database = config.cfg.get('MYSQL', 'DATABASE')
table = config.cfg.get('MYSQL', 'SUMMARYTABLE')
if len(sequence_list) != 0:
""" Declare the beginning of OSA activity """
start = datetime.now()
selections = ['ID']
conditions = {'NIGHT': options.date, 'TELESCOPE': options.tel_id,\
'ACTIVITY': 'LSTOSA'}
matrix = select_db(server, user, database, table, selections, conditions)
id = None
if matrix:
id = matrix[0][0]
if id and int(id) > 0:
""" Activity already started """
else:
""" Insert it into the database """
assignments = conditions
assignments['IS_FINISHED'] = 0
assignments['START'] = start
assignments['END'] = None
conditions = {}
insert_db(server, user, database, table, assignments, conditions)
##############################################################################
#
# updatesequencedb
#
##############################################################################
def updatesequencedb(seqlist):
tag = gettag()
import config
from mysql import update_db, insert_db, select_db
server = config.cfg.get('MYSQL', 'SERVER')
user = config.cfg.get('MYSQL', 'USER')
database = config.cfg.get('MYSQL', 'DATABASE')
table = config.cfg.get('MYSQL', 'SEQUENCETABLE')
for s in seqlist:
""" Fine tuning """
hostname = None
id_processor = None
if s.jobhost != None:
hostname, id_processor = s.jobhost.split('/')
""" Select ID if exists """
selections = ['ID']
conditions = {'TELESCOPE': s.telescope, 'NIGHT': s.night, 'ID_NIGHTLY':s.seq}
matrix = select_db(server, user, database, table, selections, conditions)
id = None
if matrix:
id = matrix[0][0]
verbose(tag, "To this sequence corresponds an entry in the {0} with ID {1}".format(table, id))
assignments = {\
'TELESCOPE': s.telescope,\
'NIGHT': s.night,\
'ID_NIGHTLY': s.seq,\
'TYPE': s.type,\
'RUN': s.run,\
'SUBRUNS': s.subruns,\
'SOURCEWOBBLE': s.sourcewobble,\
'ACTION': s.action,\
'TRIES': s.tries,\
'JOBID': s.jobid,\
'STATE': s.state,\
'HOSTNAME': hostname,\
'ID_PROCESSOR': id_processor,\
'CPU_TIME': s.cputime,\
'WALL_TIME': s.walltime,\
'EXIT_STATUS': s.exit,\
}
if s.type == 'CALIBRATION':
assignments.update({'PROGRESS_SCALIB': s.scalibstatus})
elif s.type == 'DATA':
assignments.update({\
'PROGRESS_SORCERER': s.sorcererstatus,\
'PROGRESS_SSIGNAL': s.ssignalstatus,\
'PROGRESS_MERPP': s.merppstatus,\
'PROGRESS_STAR': s.starstatus,\
'PROGRESS_STARHISTOGRAM': s.starhistogramstatus,\
})
elif s.type == 'STEREO':
assignments.update({\
'PROGRESS_SUPERSTAR': s.superstarstatus,\
'PROGRESS_SUPERSTARHISTOGRAM': s.superstarhistogramstatus,\
'PROGRESS_MELIBEA': s.melibeastatus,\
'PROGRESS_MELIBEAHISTOGRAM': s.melibeahistogramstatus,\
})
# TODO: Add this (Mireia)