Commit e7724e18 authored by Daniel Morcuende

Merge branch 'fix-prov' into 'master'

Fix provenance serialization in parallel jobs

See merge request contrera/lstosa!77
parents 6ecbf233 99d892c8
@@ -510,7 +510,7 @@ def provprocessparsing():
# Checking arguments
if len(args) != 3:
standardhandle.error(tag, "incorrect number of arguments, type -h for help", 2)
if opts.filter != "r0_to_dl1" and opts.filter != "dl1_to_dl2" and opts.filter != "":
if opts.filter not in ["r0_to_dl1", "dl1_to_dl2", ""]:
standardhandle.error(tag, "incorrect value for --filter argument, type -h for help", 2)
# Set global variables
......
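Editor's note: the hunk above replaces chained "!=" comparisons with a single membership test. A minimal sketch of the equivalent check (the constant, helper name, and exception are illustrative, not part of lstosa):

    VALID_FILTERS = ("r0_to_dl1", "dl1_to_dl2", "")

    def validate_filter(value):
        # membership test instead of chained "!=" comparisons
        if value not in VALID_FILTERS:
            raise ValueError(f"incorrect value for --filter argument: {value!r}")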
@@ -183,13 +183,13 @@ def get_hash_buffer():
def get_file_hash(str_path, buffer=get_hash_buffer(), method=get_hash_method()):
"""Helper function that returns hash of the content of a file."""
file_hash = ""
full_path = Path(str_path)
if full_path.is_file():
hash_func = getattr(hashlib, method)()
if buffer == "content":
block_size = 65536
hash_func = getattr(hashlib, method)()
if buffer == "content":
if full_path.is_file():
with open(full_path, "rb") as f:
block_size = 65536
buf = f.read(block_size)
while len(buf) > 0:
hash_func.update(buf)
@@ -197,16 +197,15 @@ def get_file_hash(str_path, buffer=get_hash_buffer(), method=get_hash_method()):
file_hash = hash_func.hexdigest()
logger.debug(f"File entity {str_path} has {method} hash {file_hash}")
return file_hash
elif "path":
hash_func.update(str(full_path).encode())
hash_path = hash_func.hexdigest()
return hash_path
else:
logger.warning(f"File entity {str_path} not found")
return str_path
else:
logger.warning(f"File entity {str_path} not found")
return str_path
if not file_hash:
hash_func.update(str(full_path).encode())
return hash_func.hexdigest()
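Editor's note: after this refactor, get_file_hash builds the hash object once, hashes file contents only when buffer is "content" and the file exists, warns and returns the path unchanged when the file is missing, and hashes the path string itself for buffer="path". A self-contained sketch of that behavior, assuming an md5 default where the original reads the hash method from configuration:

    import hashlib
    import logging
    from pathlib import Path

    logger = logging.getLogger(__name__)

    def file_hash_sketch(str_path, buffer="content", method="md5"):
        # buffer="content": hash the file bytes if the file exists,
        # otherwise warn and return the path unchanged.
        # buffer="path": hash the path string itself (no filesystem access).
        hash_func = getattr(hashlib, method)()
        full_path = Path(str_path)
        if buffer == "content":
            if full_path.is_file():
                with open(full_path, "rb") as f:
                    for block in iter(lambda: f.read(65536), b""):
                        hash_func.update(block)
                return hash_func.hexdigest()
            logger.warning(f"File entity {str_path} not found")
            return str_path
        hash_func.update(str(full_path).encode())
        return hash_func.hexdigest()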
def get_entity_id(value, item):
def get_entity_id(value, item, buffer="content"):
"""Helper function that makes the id of an entity, depending on its type."""
try:
@@ -217,14 +216,15 @@ def get_entity_id(value, item):
entity_name = ""
entity_type = ""
if entity_type == "FileCollection":
filename = value
index = definition["entities"][entity_name].get("index", "")
if Path(value).is_dir() and index:
filename = Path(value) / index
return get_file_hash(filename)
# gammapy specific
# if entity_type == "FileCollection":
# filename = value
# index = definition["entities"][entity_name].get("index", "")
# if Path(filename).is_dir() and index:
# filename = Path(value) / index
# return get_file_hash(filename)
if entity_type == "File":
return get_file_hash(value)
return get_file_hash(value, buffer=buffer)
try:
entity_id = abs(hash(value) + hash(str(value)))
@@ -276,7 +276,7 @@ def get_nested_value(nested, branch):
return val
def get_item_properties(nested, item):
def get_item_properties(nested, item, buffer="content"):
"""Helper function that returns properties of an entity or member."""
try:
@@ -312,7 +312,7 @@ def get_item_properties(nested, item):
except AttributeError as ex:
logger.warning(f"{ex} for {value}")
if value and "id" not in properties:
properties["id"] = get_entity_id(value, item)
properties["id"] = get_entity_id(value, item, buffer=buffer)
if "File" in entity_type:
properties["filepath"] = value
if properties["id"] != value:
@@ -467,7 +467,7 @@ def log_generation(class_instance, activity, activity_id):
generation_list = definition["activities"][activity]["generation"] or []
for item in generation_list:
props = get_item_properties(class_instance, item)
props = get_item_properties(class_instance, item, buffer="path")
if "id" in props:
entity_id = props.pop("id")
# record generation
@@ -613,7 +613,6 @@ def get_system_provenance():
# version=get_info_version(),
# dependencies=get_info_dependencies(),
# envvars=get_info_envvar(),
# gammapy specific
executable=sys.executable,
platform=dict(
architecture_bits=bits,
......
@@ -18,7 +18,7 @@ provconfig = yaml.safe_load(get_log_config())
PROV_PREFIX = provconfig["PREFIX"]
DEFAULT_NS = "id" # "logprov"
__all__ = ["provlist2provdoc", "provdoc2graph", "read_prov"]
__all__ = ["provlist2provdoc", "provdoc2graph", "provdoc2json", "read_prov"]
def provlist2provdoc(provlist):
@@ -201,6 +201,12 @@ def provdoc2graph(provdoc, filename, fmt):
f.write(content)
def provdoc2json(provdoc, filename):
json_stream = provdoc.serialize(indent=4)
with open(filename, "w") as f:
f.write(json_stream)
def read_prov(filename="prov.log", start=None, end=None):
"""Read a list of provenance dictionaries from the logfile."""
......
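Editor's note: the new provdoc2json helper leans on prov's ProvDocument.serialize, which returns a PROV-JSON string by default when no destination is given. A minimal round-trip sketch with an illustrative document (the namespace and entity below are made up for the example):

    from prov.model import ProvDocument

    doc = ProvDocument()
    doc.add_namespace("id", "http://example.org/")   # illustrative namespace
    doc.entity("id:dl1_file")                        # illustrative entity

    json_stream = doc.serialize(indent=4)            # PROV-JSON string by default
    with open("prov.json", "w") as f:
        f.write(json_stream)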
@@ -50,7 +50,7 @@ def select_config(tmp_path):
def make_args_r0_to_dl1():
args = (
return (
"calibration.Run2006.0000.hdf5",
"drs4_pedestal.Run2005.0000.fits",
"time_calibration",
@@ -62,16 +62,14 @@ def make_args_r0_to_dl1():
"02006.0002",
"/fefs/aswg/data/real/DL1/20200218/v0.4.3_v00/sequence_LST1_02006.0000.txt",
)
return args
def make_args_dl1_to_dl2():
args = (
return (
"02006.0002",
"/fefs/aswg/data/real/DL1/20200218/v0.4.3_v00/sequence_LST1_02006.0000.txt",
)
return args
def test_trace_r0_to_dl2(tmp_path):
@@ -88,7 +86,7 @@ def test_trace_r0_to_dl2(tmp_path):
# make json
json_filepath = tmp_path / "prov.json"
provdoc = provlist2provdoc(read_prov(filename="prov.log"))
provdoc.serialize(str(json_filepath), indent=4)
provdoc2json(provdoc, str(json_filepath))
# make graph
png_filepath = tmp_path / "prov.pdf"
......
@@ -59,33 +59,39 @@ def parse_lines_log(filter_step, run_number):
prov_dict = yaml.safe_load(prov_str)
keep = False
session_tag = prov_dict.get("session_tag", "0:0")
session_id = prov_dict.get("session_id", False)
tag_activity, tag_run = session_tag.split(":")
# filter by run
if tag_run == run_number:
keep = True
if filter_step != "" and filter_step != tag_activity:
# filter by activity
if filter_step not in ["", tag_activity]:
keep = False
# always keep first line / session start
if keep or not filtered:
# always keep first session start
if session_id and tag_run == run_number:
keep = True
# remove parallel sessions
if session_id and len(filtered):
keep = False
if keep:
filtered.append(line)
return filtered
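Editor's note: the filtering above keeps lines whose session tag matches the requested run and activity, always keeps the first session-start line for that run, and drops session starts coming from parallel jobs once lines have already been collected. A condensed sketch of that per-line decision (field names follow the log lines in the diff; the helper itself is illustrative):

    def keep_line(prov_dict, filter_step, run_number, lines_already_kept):
        session_tag = prov_dict.get("session_tag", "0:0")
        session_id = prov_dict.get("session_id", False)
        tag_activity, tag_run = session_tag.split(":")

        # filter by run
        keep = tag_run == run_number
        # filter by activity
        if filter_step not in ["", tag_activity]:
            keep = False
        # always keep the first session start for the requested run
        if session_id and tag_run == run_number:
            keep = True
        # drop session starts coming from parallel sessions
        if session_id and lines_already_kept:
            keep = False
        return keep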
# TODO: add used DL1DL2Collection, generated DL2Collection
#
def parse_lines_run(filter_step, prov_lines, out):
"""Process provenance info to reduce session at run/process wise scope."""
i = 0
size = 0
container = {}
working_lines = []
r0filepath_str = ""
dl1filepath_str = ""
dl2filepath_str = ""
id_activity_run = ""
end_time_line = ""
for line in prov_lines:
# get info
i += 1
remove = False
endTime = line.get("endTime", "")
session_id = line.get("session_id", "")
@@ -96,101 +102,105 @@ def parse_lines_run(filter_step, prov_lines, out):
parameters = line.get("parameters", "")
name = line.get("name", "")
content_type = line.get("contentType", "")
used_id = line.get("used_id", "")
# filter grain
session_tag = line.get("session_tag", "0:0")
tag_activity, tag_run = session_tag.split(":")
if tag_activity != filter_step and not session_id:
continue
# remove subruns info
if name == "R0SubrunDataset" or used_role == "Observation subrun":
if filepath:
r0filepath_str = filepath
remove = True
if (
name == "DL1SubrunDataset"
or generated_role == "DL1 subrun dataset"
or used_role == "DL1 subrun dataset"
):
if filepath:
dl1filepath_str = filepath
remove = True
if name == "DL2SubrunDataset" or generated_role == "DL2 subrun dataset":
if filepath:
dl2filepath_str = filepath
if name == "DL1SubrunDataset":
dl1filepath_str = filepath
elif name == "DL2SubrunDataset":
dl2filepath_str = filepath
elif name == "R0SubrunDataset":
r0filepath_str = filepath
if "Subrun" in name or "subrun" in used_role or "subrun" in generated_role:
remove = True
if parameters and "ObservationSubRun" in parameters:
del line["parameters"]["ObservationSubRun"]
# group subruns activities into a single run-wise activity
if name == filter_step:
size += 1
if not id_activity_run:
id_activity_run = get_activity_id()
if size > 1:
# remove sub-runs activities and info
if name == filter_step and not id_activity_run:
id_activity_run = get_activity_id()
if name in container or used_id in container:
remove = True
if parameters and "parameters" in container:
remove = True
if name:
container[name] = True
if used_id:
container[used_id] = True
if parameters:
container["parameters"] = True
if endTime:
remove = True
end_time_line = line
size += 1
# replace with new run-wise activity_id
if activity_id:
line["activity_id"] = id_activity_run
# filter grain
session_tag = line.get("session_tag", "0:0")
tag_activity, tag_run = session_tag.split(":")
if tag_activity != filter_step:
remove = True
# copy used files not subruns not RFs
if filepath and content_type != "application/x-spss-sav" and not remove:
copy_used_file(filepath, out)
# always keep first line / session start
if session_id:
remove = False
# append collection run used and generated at endtime line of last activity
if endTime and i == len(prov_lines) and size:
if r0filepath_str and filter_step == "r0_to_dl1":
entity_id = get_file_hash(r0filepath_str, buffer="path")
r0filepath_str = r0filepath_str.replace(PurePath(r0filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "R0Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": r0filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "R0 Collection"})
working_lines.append(used)
if dl1filepath_str:
entity_id = get_file_hash(dl1filepath_str, buffer="path")
dl1filepath_str = dl1filepath_str.replace(PurePath(dl1filepath_str).name, "")
dl1 = {"entity_id": entity_id}
dl1.update({"name": "DL1Collection"})
dl1.update({"type": "SetCollection"})
dl1.update({"size": size})
dl1.update({"filepath": dl1filepath_str})
working_lines.append(dl1)
if dl1filepath_str and filter_step == "r0_to_dl1":
generated = {"activity_id": id_activity_run}
generated.update({"generated_id": entity_id})
generated.update({"generated_role": "DL1 Collection"})
working_lines.append(generated)
if dl1filepath_str and filter_step == "dl1_to_dl2":
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "DL1 Collection"})
working_lines.append(used)
if dl2filepath_str and filter_step == "dl1_to_dl2":
entity_id = get_file_hash(dl2filepath_str, buffer="path")
dl2filepath_str = dl2filepath_str.replace(PurePath(dl2filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "DL2Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": dl2filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"generated_id": entity_id})
used.update({"generated_role": "DL2 Collection"})
working_lines.append(used)
if not remove:
working_lines.append(line)
# append collection run used and generated at endtime line of last activity
if end_time_line:
working_lines.append(end_time_line)
if r0filepath_str and filter_step == "r0_to_dl1":
entity_id = get_file_hash(r0filepath_str, buffer="path")
r0filepath_str = r0filepath_str.replace(PurePath(r0filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "R0Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": r0filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "R0 Collection"})
working_lines.append(used)
if dl1filepath_str:
dl1filepath_str = dl1filepath_str.replace(PurePath(dl1filepath_str).name, "")
entity_id = get_file_hash(dl1filepath_str, buffer="path")
dl1 = {"entity_id": entity_id}
dl1.update({"name": "DL1Collection"})
dl1.update({"type": "SetCollection"})
dl1.update({"size": size})
dl1.update({"filepath": dl1filepath_str})
working_lines.append(dl1)
if dl1filepath_str and filter_step == "r0_to_dl1":
generated = {"activity_id": id_activity_run}
generated.update({"generated_id": entity_id})
generated.update({"generated_role": "DL1 Collection"})
working_lines.append(generated)
if dl1filepath_str and filter_step == "dl1_to_dl2":
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "DL1 Collection"})
working_lines.append(used)
if dl2filepath_str and filter_step == "dl1_to_dl2":
entity_id = get_file_hash(dl2filepath_str, buffer="path")
dl2filepath_str = dl2filepath_str.replace(PurePath(dl2filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "DL2Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": dl2filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"generated_id": entity_id})
used.update({"generated_role": "DL2 Collection"})
working_lines.append(used)
# remove start session line
if len(working_lines) == 1:
else:
working_lines = []
return working_lines
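Editor's note: the run-wise reduction above mints a single activity id for the first activity matching filter_step, drops duplicate subrun-level records, re-tags the surviving lines with that id, and finally appends run-level collection entities built from the stored R0/DL1/DL2 paths. A condensed sketch of the de-duplication and re-tagging step only (the container bookkeeping mirrors the diff; the helper name is illustrative):

    def regroup_subrun_lines(prov_lines, id_activity_run):
        """Sketch: collapse per-subrun records into one run-wise activity."""
        container = {}
        working_lines = []
        for line in prov_lines:
            name = line.get("name", "")
            used_id = line.get("used_id", "")
            # drop records already seen from a previous subrun
            if name in container or used_id in container:
                continue
            if name:
                container[name] = True
            if used_id:
                container[used_id] = True
            # re-tag the surviving record with the run-wise activity id
            if line.get("activity_id"):
                line["activity_id"] = id_activity_run
            working_lines.append(line)
        return working_lines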
@@ -232,13 +242,15 @@ def produce_provenance():
processed_lines = r0_to_dl1_processed_lines + dl1_to_dl2_processed_lines[1:]
if processed_lines:
# copy session log file to its log folder
shutil.copyfile(session_log_filename, log_path)
# make filtered session log file
with open(log_path, "w") as f:
for line in processed_lines:
f.write(f"{line}\n")
standardhandle.output(tag, f"creating {log_path}")
provdoc = provlist2provdoc(processed_lines)
# make json
try:
provdoc = provlist2provdoc(processed_lines)
provdoc.serialize(str(json_filepath), indent=4)
provdoc2json(provdoc, str(json_filepath))
standardhandle.output(tag, f"creating {json_filepath}")
except Exception as ex:
standardhandle.error(tag, f"problem while creating json: {ex}", 2)
......