Commit b6dd25f4 authored by Daniel Morcuende's avatar Daniel Morcuende
Browse files

Merge branch 'simulate' into 'master'

Simulate data processing

See merge request contrera/lstosa!78
parents cecfe9d7 01754e7e
......@@ -81,4 +81,4 @@ target
.mypy_cache
# Ignore NightSummary directory
./NightSummary
NightSummary/
......@@ -23,7 +23,8 @@ def datasequence(args):
historysuffix = cfg.get('LSTOSA', 'HISTORYSUFFIX')
sequenceprebuild = join(options.directory, f'sequence_{options.tel_id}_{run_str}')
historyfile = sequenceprebuild + historysuffix
level, rc = historylevel(historyfile, 'DATA')
level, rc = (3, 0) if options.simulate else historylevel(historyfile, 'DATA')
verbose(tag, f"Going to level {level}")
if level == 3:
......@@ -82,6 +83,9 @@ def r0_to_dl1(
historyfile
"""
if options.simulate:
return 0
import subprocess
from os.path import join, basename
from osa.configs.config import cfg
......@@ -142,6 +146,9 @@ def dl1_to_dl2(run_str, historyfile):
historyfile
"""
if options.simulate:
return 0
import subprocess
from os.path import join, basename
from osa.configs.config import cfg
......
......@@ -288,7 +288,7 @@ def guesscorrectinputcard(s):
return(options.configfile)
def createjobtemplate(s):
def createjobtemplate(s, get_content=False):
"""This file contains instruction to be submitted to torque"""
tag = gettag()
......@@ -416,6 +416,9 @@ def createjobtemplate(s):
if not options.simulate:
iofile.writetofile(s.script, content)
if get_content:
return content
# def submitjobs(sequence_list, queue_list, veto_list):
def submitjobs(sequence_list):
......
"""
Simulate executions of data processing pipeline and produce provenance
"""
import logging
import multiprocessing as mp
import subprocess
from pathlib import Path
import yaml
from datamodel import SequenceData
from osa.jobs.job import createjobtemplate
from osa.nightsummary import extract
from osa.nightsummary.nightsummary import readnightsummary
from osa.utils import cliopts, options
from osa.utils.utils import lstdate_to_number
from provenance.utils import get_log_config
CONFIG_FLAGS = {"Go": True, "TearDL1": False, "TearDL2": False}
provconfig = yaml.safe_load(get_log_config())
LOG_FILENAME = provconfig["handlers"]["provHandler"]["filename"]
def do_setup():
    """Prepare the analysis folder structure and set the run/tear-down flags.

    Checks that the DL1/DL2 night folders exist and that the provenance
    log can be written, records in ``CONFIG_FLAGS`` which prod subfolders
    this run creates (so ``tear_down`` can remove them), and finally
    creates the prod subfolders.
    """
    from osa.configs.config import cfg

    dl1_base = Path(cfg.get("LST1", "ANALYSISDIR")) / options.directory
    dl2_base = Path(cfg.get("LST1", "DL2DIR")) / options.directory
    dl1_prod = dl1_base / options.prod_id
    dl2_prod = dl2_base / options.prod_id

    # Bail out early when a nightly base folder is missing.
    for base in (dl1_base, dl2_base):
        if not base.exists():
            CONFIG_FLAGS["Go"] = False
            logging.info(f"Folder {base} does not exist.")
            return

    # Refuse to clobber an existing provenance log unless appending.
    if Path(LOG_FILENAME).exists() and not options.append:
        CONFIG_FLAGS["Go"] = False
        logging.info(f"File {LOG_FILENAME} already exists.")
        logging.info(f"You must rename/remove {LOG_FILENAME} to produce a clean provenance.")
        logging.info(f"You can also set --append flag to append captured provenance.")
        return

    # Remember the prod folders created by this run (Path means "remove on
    # tear_down"; False means "leave in place").
    CONFIG_FLAGS["TearDL1"] = dl1_prod if not (dl1_prod.exists() or options.provenance) else False
    CONFIG_FLAGS["TearDL2"] = dl2_prod if not (dl2_prod.exists() or options.provenance) else False

    if options.provenance and not options.force:
        for prod in (dl1_prod, dl2_prod):
            if prod.exists():
                CONFIG_FLAGS["Go"] = False
                logging.info(f"Folder {prod} already exist.")
        if not CONFIG_FLAGS["Go"]:
            logging.info(f"You must enforce provenance files overwrite with --force flag.")
            return

    dl1_prod.mkdir(exist_ok=True)
    dl2_prod.mkdir(exist_ok=True)
def tear_down():
    """Tear down created temporal folders.

    ``do_setup`` stores a ``Path`` in the TearDL1/TearDL2 flags when it
    created that prod folder itself; anything else means "do not touch".
    """
    for flag in ("TearDL1", "TearDL2"):
        target = CONFIG_FLAGS[flag]
        if isinstance(target, Path):
            target.rmdir()
def parse_template(template, idx):
    """Turn a batch-job template into a CLI argument list for subrun *idx*.

    Lines after the ``subprocess.call`` statement are collected, stripped
    of quotes and commas, the subrun placeholder is substituted with the
    zero-padded index, and stdout/stderr/srun plumbing lines are dropped.
    A ``-s`` (simulate) flag is injected ahead of ``--prod_id``.
    """
    args = []
    capturing = False
    for line in template.splitlines():
        if capturing:
            line = line.replace("'", "").replace(",", "")
            line = line.replace(r"{0}.format(str(subruns).zfill(4))", str(idx).zfill(4))
            # Skip batch plumbing that is meaningless for a simulated call.
            if any(token in line for token in ("--stdout=", "--stderr", "srun")):
                continue
            if "--prod_id" in line:
                args.append("-s")
            args.append(line.strip())
        if line.startswith("subprocess.call"):
            capturing = True
    # Drop the trailing line (the closing bracket of the call).
    args.pop()
    return args
def simulate_subrun_processing(args):
    """Run one simulated subrun-processing call.

    ``args[17]`` is expected to hold the ``run.subrun`` identifier built
    by ``parse_template``; the full argument list is executed as-is.
    """
    run_str, subrun_idx = args[17].split(".")
    logging.info(f"Simulating process call for run {run_str} subrun {subrun_idx}")
    subprocess.run(args)
def simulate_processing():
    """Simulate daily processing and capture provenance.

    Forces simulation mode on the pipeline options, extracts the sequence
    list from the night summary, and simulates every DATA subrun call in
    parallel.  When ``options.provenance`` is set, provprocess.py is run
    once per processed run afterwards.
    """
    # "P" + simulate=True makes downstream calls dry-run instead of
    # submitting real jobs.
    options.mode = "P"
    options.simulate = True
    night_content = readnightsummary()
    logging.info(f"Night summary file content\n{night_content}")
    sub_run_list = extract.extractsubruns(night_content)
    run_list = extract.extractruns(sub_run_list)
    sequence_list = extract.extractsequences(run_list)

    # skip drs4 and calibration
    for s in sequence_list:
        processed = False
        if not isinstance(s, SequenceData):
            continue
        for sl in s.subrun_list:
            if sl.runobj.type != "DATA":
                continue
            # One worker pool per run; each job template is parsed into a
            # CLI argument list, one per subrun index.
            # NOTE(review): sl.subrun presumably holds the subrun count
            # for this run — confirm against the extract module.
            with mp.Pool() as pool:
                args_ds = [
                    parse_template(createjobtemplate(s, get_content=True), subrun_idx)
                    for subrun_idx in range(sl.subrun)
                ]
                processed = pool.map(simulate_subrun_processing, args_ds)

        # produce prov if overwrite prov arg
        if processed and options.provenance:
            args_pp = [
                "python",
                "provprocess.py",
                "-c",
                options.configfile,
                s.run_str,
                options.directory,
                options.prod_id,
            ]
            logging.info(f"Processing provenance for run {s.run_str}")
            subprocess.run(args_pp)
if __name__ == "__main__":
    # Script entry point: parse CLI options, set up folders, simulate the
    # night's processing, and clean up any folders we created.
    # Renamed from `format` to avoid shadowing the builtin.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    options, tag = cliopts.simprocparsing()
    options.directory = lstdate_to_number(options.date)

    logging.info("Running simulate processing")
    do_setup()
    if CONFIG_FLAGS["Go"]:
        simulate_processing()
    tear_down()
......@@ -218,6 +218,8 @@ def datasequencecliparsing(command):
help = "file for standard error")
parser.add_option("--stdout", action = "store", type = "string", dest = "stdout",
help = "file for standard output")
parser.add_option("-s", "--simulate", action = "store_true", dest = "simulate", default = False,
help = "do not submit sequences as jobs")
parser.add_option("--prod_id", action = "store", type = str, dest = "prod_id",
help="Set the prod_id variable which defines data directories")
......@@ -233,6 +235,7 @@ def datasequencecliparsing(command):
options.verbose = opts.verbose
options.warning = opts.warning
options.compressed = opts.compressed
options.simulate = opts.simulate
options.prod_id = opts.prod_id
# The standardhandle has to be declared here, since verbose and warnings are options from the cli
......@@ -524,6 +527,46 @@ def provprocessparsing():
return options, tag
##############################################################################
#
# simprocparsing
#
##############################################################################
def simprocparsing():
    """Parse the command line of the simulate-processing script.

    Expects three positional arguments (date folder, prod subfolder,
    telescope ID) and returns the global options object plus the tag.
    """
    tag = standardhandle.gettag()
    message = (
        "Usage: %prog [-c CONFIGFILE] [-p] [--force] [--append] <YYYY_MM_DD> <vX.X.X_vXX> <TEL_ID>\n"
        "Run script from OSA root folder.\n\n"
        "Arguments:\n"
        "<YYYY_MM_DD> date analysis folder name for derived datasets\n"
        "<vX.X.X_vXX> software version and prod subfolder name\n"
        "<TEL_ID> telescope ID (i.e. LST1, ST,..)\n"
    )
    parser = OptionParser(usage=message)
    parser.add_option("-c", "--config", action="store", dest="configfile",
                      default="cfg/sequencer.cfg",
                      help="use specific config file [default cfg/sequencer.cfg]")
    parser.add_option("-p", action="store_true", dest="provenance",
                      help="produce provenance files")
    parser.add_option("--force", action="store_true", dest="force",
                      help="force overwrite provenance files")
    parser.add_option("--append", action="store_true", dest="append",
                      help="append provenance capture to existing prov.log file")

    # Parse and validate the command line.
    parsed, positional = parser.parse_args()
    if len(positional) != 3:
        standardhandle.error(tag, "incorrect number of arguments, type -h for help", 2)

    # Propagate the parsed values into the global options namespace.
    options.date, options.prod_id, options.tel_id = positional
    options.configfile = parsed.configfile
    options.provenance = parsed.provenance
    options.force = parsed.force
    options.append = parsed.append
    return options, tag
##############################################################################
#
# set_default_date_if_needed
......
......@@ -43,18 +43,20 @@ def getnightdirectory():
tag = gettag()
from os.path import join, exists
from osa.configs.config import cfg
from lstchain.version import get_version
verbose(tag, f"Getting analysis path for tel_id {options.tel_id}")
nightdir = lstdate_to_dir(options.date)
options.lstchain_version = 'v' + get_version()
options.prod_id = options.lstchain_version + '_' + cfg.get('LST1', 'VERSION')
if not options.prod_id:
from lstchain.version import get_version
options.lstchain_version = 'v' + get_version()
options.prod_id = options.lstchain_version + '_' + cfg.get('LST1', 'VERSION')
directory = join(
cfg.get(options.tel_id, 'ANALYSISDIR'),
nightdir, options.prod_id
)
if not exists(directory):
if options.nightsum and options.tel_id != 'ST':
error(tag, f"Night directory {directory} does not exists!", 2)
......
......@@ -205,7 +205,7 @@ def get_file_hash(str_path, buffer=get_hash_buffer(), method=get_hash_method()):
return hash_func.hexdigest()
def get_entity_id(value, item, buffer="content"):
def get_entity_id(value, item):
"""Helper function that makes the id of an entity, depending on its type."""
try:
......@@ -224,7 +224,9 @@ def get_entity_id(value, item, buffer="content"):
# filename = Path(value) / index
# return get_file_hash(filename)
if entity_type == "File":
return get_file_hash(value, buffer=buffer)
# osa specific hash path
# async calls does not allow for hash content
return get_file_hash(value, buffer="path")
try:
entity_id = abs(hash(value) + hash(str(value)))
......@@ -276,7 +278,7 @@ def get_nested_value(nested, branch):
return val
def get_item_properties(nested, item, buffer="content"):
def get_item_properties(nested, item):
"""Helper function that returns properties of an entity or member."""
try:
......@@ -312,7 +314,7 @@ def get_item_properties(nested, item, buffer="content"):
except AttributeError as ex:
logger.warning(f"{ex} for {value}")
if value and "id" not in properties:
properties["id"] = get_entity_id(value, item, buffer=buffer)
properties["id"] = get_entity_id(value, item)
if "File" in entity_type:
properties["filepath"] = value
if properties["id"] != value:
......@@ -408,7 +410,7 @@ def get_derivation_records(class_instance, activity):
}
records.append(log_record)
traced_entities[var] = (new_id, item)
logger.warning(f"Derivation detected by {activity} for {var}. ID: {new_id}")
logger.warning(f"Derivation detected in {activity} for {var}. ID: {new_id}")
return records
......@@ -467,7 +469,7 @@ def log_generation(class_instance, activity, activity_id):
generation_list = definition["activities"][activity]["generation"] or []
for item in generation_list:
props = get_item_properties(class_instance, item, buffer="path")
props = get_item_properties(class_instance, item)
if "id" in props:
entity_id = props.pop("id")
# record generation
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment