Commit ff2e2ea0 authored by Lab Saha's avatar Lab Saha
Browse files

Merge branch 'master' into 'master'

Update

See merge request contrera/lstosa!8
parents e1c213a9 c03ab637
# Configuration file for the On-Site Analysis (OSA) for the MAGIC observatory
#
# The squared enclosed tags are sections needed by different modules,
# so python does not override variables with the same name in different sections
# it just make life easier for the programming
#
# The %()s variables are wildcards for string substitutions
#
[ENV]
SBATCHBIN = srun -N 1 -n 1
#SBATCHBIN =
PYTHONBIN = /usr/bin
[LSTOSA]
#HOMEDIR = /fefs/aswg
HOMEDIR = /home/lab.saha
LSTCHAINDIR = /home/lab.saha
CONFIGFILE = %(HOMEDIR)s/
PYTHONDIR = %(HOMEDIR)s/lst-osa
#CALIBDIR = %(HOMEDIR)s/data/real/calibration/v000
#PEDESTALDIR =%(CALIBDIR)s
#DRIVEDIR = /fefs/home/lapp/DrivePositioning
#R0-DL1 = lstchain_data_r0_to_dl
R0-DL1 = hello.py
#BINDIR = %(HOMEDIR)s/bin
# Temporal storage in working node local disk
#SCRATCHDIR = /scratch
# Applications
CALIBRATION = calibration
#COMBINEMUON = combinemuons
GRAPH = dot
# File type pattern for DB
# Prefixes
SCALIBPREFIX = calibration.Run
SSIGNALPREFIX = drs4_pedestal.Run
#DRIVEPREFIX =
R0-DL1PREFIX = DL1
DL1-DL2PREFIX = DL2
ENDOFACTIVITYPREFIX = NightFinished
NIGHTSUMMARYPREFIX = NightSummary
WORKFLOWPREFIX = Workflow
MUONPREFIX = Muon
INCIDENCESPREFIX = Incidences
# HDF5 file patterns
SCALIBPATTERN =
SSIGNALPATTERN =
R0-DL1PATTERN = DL1
DL1-DL2PATTERN = DL2
# Suffixes
HDF5SUFFIX = .hdf5
FITSSUFFIX = .fits
SCALIBSUFFIX = %(HDF5SUFFIX)s
SSIGNALSUFFIX = %(FITSSUFFIX)s
PEDESTALSUFFIX = %(FITSSUFFIX)s
DRIVESUFFIX = .txt
R0-DL1SUFFIX = %(HDF5SUFFIX)s
DL1-DL2SUFFIX = %(FITSSUFFIX)s
# Other file suffixes
RAWSUFFIX = .raw
REPORTSUFFIX = .rep
COMPRESSEDSUFFIX = .fz
GRAPHSUFFIX = .dot
SCRIPTSUFFIX = .py
TEXTSUFFIX = .txt
SVGSUFFIX = .svg
HISTORYSUFFIX = .history
VETOSUFFIX = .veto
CLOSEDSUFFIX = .closed
# Night summary specifics
NIGHTSUMMARYSCRIPT = %(PYTHONDIR)s/create_nightsummary.py
# MAXTRYFAILED is the amount of tries each littlesequence job is allowed to run before being vetoed
MAXTRYFAILED = 2
[PROGRAM]
PEDESTAL = onsite_create_drs4_pedestal_file
CALIBRATION = onsite_create_calibration_file
[LST]
DATESEPARATOR = _
# NIGHTOFFSET sets the amount of hours after midnight for the default OSA date to be the current date, 4 = 04:00:00 UTC, -4 = 20:00:00 UTC day before
NIGHTOFFSET = -3
[LST1]
#DIR = /fefs/aswg/lst-osa
DIR = /home/lab.saha/lst-osa
RAWDIR = %(DIR)s/R0
REPORTDIR = %(DIR)s/OSA/CC
#DRIVEDIR = /fefs/home/lapp/DrivePositioning
DRIVEDIR = %(DIR)s/DrivePositioning
CALIBDIR = %(DIR)s/calibration
PEDESTALDIR = %(DIR)s/calibration
ANALYSISDIR = %(DIR)s/data/real/dl1
CLOSERDIR = %(DIR)s/lst-osa/Closer
ENDOFRAWTRANSFERDIR = %(DIR)s/Reports/Compress
CALIBRATIONCONFIGCARD = onsite_camera_calibration_param.json
CONFIGCARD =
[LST2]
DIR = /fefs/aswg/lst-osa
RAWDIR = %(DIR)s/R0
REPORTDIR = %(DIR)s/OSA/CC
CALIBDIR = %(DIR)s/calibration
PEDESTALDIR = %(DIR)s/calibration
ANALYSISDIR = %(DIR)s/data/real/dl1
CLOSERDIR = %(DIR)s/lst-osa/Closer
ENDOFRAWTRANSFERDIR = %(DIR)s/Reports/Compress
CONFIGCARD =
[ST]
DIR = %(HOMEDIR)s/data/real
RAWDIR = %(DIR)s/R0
REPORTDIR = %(DIR)s/OSA/CC
CALIBDIR = %(DIR)s/calibration
PEDESTALDIR = %(DIR)s/calibration
ANALYSISDIR = %(DIR)s/data/real/dl1
CLOSERDIR = %(DIR)s/lst_osa/Closer
ENDOFRAWTRANSFERDIR = %(DIR)s/Reports/Compress
CONFIGCARD =
#
[NONFATALRCS]
R0-DL1 = 0,2
DL1-DL2 = 0,2
[MYSQL]
SERVER = lstconatiner
USER = analysis
DATABASE = LST
DAQTABLE = DAQ
SEQUENCETABLE = SEQUENCE
ANALYSISTABLE = ANALYSIS
SUMMARYTABLE = SUMMARY
STORAGETABLE = STORAGE
NIGHTTIMES = NIGHTTIMES
[REMOTE]
STATISTICSHOST = polaris.gae.ucm.es
STATISTICSSSHPORT = 24
STATISTICSUSER = tomcat
STATISTICSDIR = /polaris2/data/tomcat/statistics-web
STATISTICSSUFFIX = .osa-finished
[OUTPUT]
# REPORTWIDTH is the width in characters of the heading frame for the output
REPORTWIDTH = 154
# Number of charachters padding the columns
PADDING = 2
[ALARM]
ONSITEEMAIL = gae-lst-onsite@gae.ucm.es
ONLINEEMAIL = gae-lst-onsite@gae.ucm.es
from utils.standardhandle import error, verbose, gettag, warning
from utils import options
##############################################################################
#
# readconf
#
##############################################################################
def readconf(file):
tag = gettag()
from os.path import exists
conf = None
try:
# Python 2.7
import ConfigParser
except ImportError as Error:
warning(tag, "Increasing to python 3 ConfigParser")
import configparser
conf = configparser.SafeConfigParser(allow_no_value=True)
else:
conf = ConfigParser.SafeConfigParser(allow_no_value=True)
try:
conf.read(file)
except ConfigParser.Error as NameError:
error(tag, NameError, 3)
verbose(tag, "sections of the config file are {0}".format(conf.sections()))
return conf
##############################################################################
#
# read_properties
#
##############################################################################
def read_properties(file):
tag = gettag()
""" To be used when config file has no header, creating a DUMMY header"""
import tempfile
from os import unlink
fname = None
with tempfile.NamedTemporaryFile(delete=False) as tf:
tf.write('[DUMMY]\n')
fname = tf.name
with open(file) as f:
tf.write(f.read())
tf.seek(0)
conf = readconf(tf.name)
unlink(tf.name)
return conf
##############################################################################
#
# cfg
#
##############################################################################
cfg = readconf(options.configfile)
from . import jobs
from . import nightsummary
from . import reports
from . import sequencers
from . import rawcopy
from . import utils
from . import autocloser
from . import configs
from . import veto
#!/usr/bin/env python2.7
##############################################################################
#
# closer.py
#
#
##############################################################################
from osa.utils.standardhandle import output, verbose, warning, error, stringify, gettag
__all__ = ["closer", "is_day_closed", "use_night_summary",
"is_raw_data_available", "is_sequencer_successful", "notify_sequencer_errors",
"ask_for_closing", "notify_neither_data_nor_reason_given", "post_process",
"post_process_files","set_closed_in_db", "set_closed_in_analysis_db",
"set_closed_in_summary_db", "set_closed_with_file", "ape", "is_finished_check",
"synchronize_remote", "setclosedfilename"]
##############################################################################
#
# closer
#
##############################################################################
def closer():
tag = gettag()
import datetime
import options, cliopts
from report import start
from nightsummary import readnightsummary
from utils import is_defined
""" Initiating report. """
start(tag)
""" Starting the algorithm. """
if is_day_closed():
# Exit
error(tag, "Night {0} already closed for {1}"\
.format(options.date, options.tel_id), 1)
else:
# Proceed
if options.seqtoclose != None:
output(tag, "Closing sequence {0}".format(options.seqtoclose))
sequencer_tuple = []
if options.reason != None:
# No data
warning(tag, "No data found")
sequencer_tuple = [False, []]
if is_defined(options.reason):
# Good user, proceed automatically
pass
else:
# Notify and Ask for closing and a reason
notify_neither_data_nor_reason_given()
ask_for_closing()
ask_for_reason()
# Proceed with a reason
elif is_raw_data_available() or use_night_summary():
# Proceed normally
verbose(tag, "Checking sequencer_tuple {0}".format(sequencer_tuple))
night_summary_output = readnightsummary()
sequencer_tuple = is_finished_check(night_summary_output)
if is_sequencer_successful(sequencer_tuple):
# Close automatically
pass
else:
# Notify and ask for closing
notify_sequencer_errors()
ask_for_closing()
else:
error(tag, "Never thought about this possibility,\
please check the code", 9)
post_process(sequencer_tuple)
##############################################################################
#
# is_day_closed
#
##############################################################################
def is_day_closed():
tag = gettag()
""" Get the name and Check for the existence of the Closer flag file. """
from os.path import exists
from osa.utils.utils import getlockfile
answer = False
flag_file = getlockfile()
if exists(flag_file):
answer = True
return answer
##############################################################################
#
#
#
##############################################################################
def use_night_summary():
tag = gettag()
""" Check for the usage of night summary option and file existance. """
from os.path import exists
from nightsummary import getnightsummaryfile
answer = False
if options.nightsum:
night_file = getnightsummaryfile()
if exists(night_file):
answer = True
else:
output(tag, "Night Summary expected but it does not exists.")
output(tag, "Please check it or use the -r option\
to give a reason.")
error(tag, "Night Summary missing and no reason option", 2)
return answer
##############################################################################
#
#
#
##############################################################################
def is_raw_data_available():
tag = gettag()
""" For the moment we are happy to get the rawdir and check existance.
This means the raw directory could be empty! """
from os.path import isdir
from raw import get_check_rawdir
answer = False
if options.tel_id != 'ST':
raw_dir = get_check_rawdir()
if isdir(raw_dir):
answer = True
else:
answer = True
return answer
##############################################################################
#
#
#
##############################################################################
def is_sequencer_successful(seq_tuple):
tag = gettag()
""" A strange way to check it.
TODO: implement a more reliable non-intrusive than is_finished_check.
"""
answer = seq_tuple[0]
return answer
##############################################################################
#
#
#
##############################################################################
def notify_sequencer_errors():
tag = gettag()
""" A good notification helps the user on deciding to close it or not. """
output(tag, "Sequencer did not complete or finish unsuccesfully")
pass
##############################################################################
#
#
#
##############################################################################
def ask_for_closing():
tag = gettag()
""" A True (Y/y) closes, while False(N/n) answer stops the program. """
import sys
answer = False
if options.noninteractive:
""" In not-interactive mode, we assume the answer is yes. """
pass
else:
answer_check = False
while not answer_check:
try:
question = 'Close that day? (y/n): '
if options.simulate:
question += '[SIMULATE ongoing] '
answer_user = raw_input(question)
except KeyboardInterrupt:
print ('')
warning(tag, "Program quitted by user. No answer")
sys.exit(1)
except EOFError as ErrorValue:
error(tag, "End of file not expected", ErrorValue)
else:
answer_check = True
if answer_user == 'n' or answer_user == 'N':
# The user does not want to close
output(tag, "Day {0} for {1} will remain open".\
format(options.date, options.tel_id))
sys.exit(0)
elif answer_user == 'y' or answer_user == 'Y':
continue
else:
warning(tag, "Answer not understood, please type y or n")
answer_check = False
##############################################################################
#
# notify_neither_data_nor_reason_given
#
##############################################################################
def notify_neither_data_nor_reason_given():
tag = gettag()
""" Message informing the user of the situation """
output(tag, "There is no data and you did not enter any reason for that")
##############################################################################
#
# post_process
#
##############################################################################
def post_process(seq_tuple):
tag = gettag()
""" Set of last instructions. """
from report import finished_assignments, finished_text
seq_list = seq_tuple[1]
analysis_dict = finished_assignments(seq_list)
analysis_text = finished_text(analysis_dict)
post_process_files(seq_list)
set_closed_in_db(analysis_dict)
if options.seqtoclose == None:
is_closed = set_closed_with_file(analysis_text)
return is_closed
return False
##############################################################################
#
# post_process_files
#
##############################################################################
def post_process_files(seq_list):
tag = gettag()
""" The hard job of executing the last tasks for files. """
from os.path import join, basename, exists, islink
from os import rename, unlink
from filecmp import cmp
from glob import glob
from re import search
from config import cfg
from veto import createclosed
from utils import magicdate_to_dir, make_directory
from register import register_run_concept_files
concept_set = []
if options.tel_id == 'LST1' or options.tel_id == 'LST2':
concept_set = ['SCALIB', 'SSIGNAL', 'SORCERER', 'MERPP',\
'STARHISTOGRAM', 'STAR']
elif options.tel_id == 'ST':
concept_set = ['SUPERSTAR', 'SUPERSTARHISTOGRAM', 'MELIBEA', 'MELIBEAHISTOGRAM']
middle_dir = magicdate_to_dir(options.date)
root_files = glob(join(options.directory,\
'*{0}'.format(cfg.get('OSA', 'ROOTSUFFIX'))))
root_set = set(root_files)
pattern = None
for concept in concept_set:
output(tag, "Processing {0} files, {1} files left".format(concept, len(root_set)))
if cfg.get('OSA', concept + 'PREFIX'):
pattern = cfg.get('OSA', concept + 'PREFIX')
else:
pattern = cfg.get('OSA', concept + 'PATTERN')
dir = join(cfg.get(options.tel_id, concept + 'DIR'), middle_dir)
delete_set = set()
verbose(tag, "Checking if {0} files need to be moved to {1}".format(concept, dir))
for r in root_set:
r_basename = basename(r)
pattern_found = search(pattern, r_basename)
verbose(tag, "Was pattern {0} found in {1} ?: {2}"\
.format(pattern, r_basename, pattern_found))
if options.seqtoclose != None:
seqtoclose_found = search(options.seqtoclose, r_basename)
verbose(tag, "Was pattern {0} found in {1} ?: {2}"\
.format(options.seqtoclose, r_basename, seqtoclose_found))
if seqtoclose_found == None:
pattern_found = None
if pattern_found != None:
new_dst = join(dir, r_basename)
if not options.simulate:
make_directory(dir)
if exists(new_dst):
if islink(r):
# Delete because the link has been correctly copied
verbose(tag, "Original file {0} is just a link".format(r))
if options.seqtoclose == None:
verbose(tag, "Deleting {0}".format(r))
unlink(r)
elif cmp(r, new_dst):
# Delete
verbose(tag, "Destination file exists and it is equal to {0}".format(r))
if options.seqtoclose == None:
verbose(tag, "Deleting {0}".format(r))
unlink(r)
else:
warning(tag, "Original file {0} is not a link or is different than destination {1}".format(r, new_dst))
else:
verbose(tag, "Destination file {0} does not exists".format(new_dst))
for s in seq_list:
verbose(tag, "Looking for {0}".format(s))
run_str_found = search(s.run_str, r_basename)
if run_str_found != None:
# Register and delete
verbose(tag, "Registering file {0}".format(run_str_found))
register_run_concept_files(s.run_str, concept)
if options.seqtoclose == None:
unlink(r)
setclosedfilename(s)
createclosed(s.closed)
break
delete_set.add(r)
root_set -= delete_set
##############################################################################
#
# set_closed_in_db
#
##############################################################################
def set_closed_in_db(ana_dict):
tag = gettag()
""" Prepare the calls for the different tables. """
import config
servername = config.cfg.get('MYSQL', 'SERVER')
username = config.cfg.get('MYSQL', 'USER')
database = config.cfg.get('MYSQL', 'DATABASE')
if options.seqtoclose == None:
set_closed_in_analysis_db(servername, username, database, ana_dict)
# the next line triggers the transfer to PIC, if the day and telescope is
# closed in the summary database it will look into the storage database to
# get the the list of files.
set_closed_in_summary_db(servername, username, database, ana_dict)
##############################################################################
#
#
#
##############################################################################
def set_closed_in_analysis_db(servername, username, database, ana_dict):
tag = gettag()
""" Insert the analysis key=value into the database. """
from os.path import exists, join
from config import cfg, read_properties
from mysql import insert_ignore_db
table = cfg.get('MYSQL', 'ANALYSISTABLE')
incidences_file = join(options.directory,\
cfg.get('OSA', 'INCIDENCESPREFIX') + cfg.get('OSA', 'TEXTSUFFIX'))
assignments = dict()
assignments.update(ana_dict)
if exists(incidences_file):
""" Add the incidences file """
incidences_cfg = read_properties(incidences_file)
assignments['COMMENTS'] = incidences_cfg.get('DUMMY', 'COMMENTS')
del assignments['RAW_GB']
del assignments['FILES_RAW']
del assignments['END']
conditions = {}
insert_ignore_db(servername, username, database, table, assignments,\
conditions)
##############################################################################
#
#
#
##############################################################################
def set_closed_in_summary_db(servername, username, database, ana_dict):
tag = gettag()
""" Insert the analysis key=value into the database. """
import config
from mysql import update_or_insert_and_select_id_db
table = config.cfg.get('MYSQL', 'SUMMARYTABLE')
conditions = {'ACTIVITY': 'OSA'}
for i in<