Commit 99d892c8 authored by Jose Enrique.Ruiz
Browse files

Fix parsing of mixed info coming from parallel jobs

parent 5951dc0f
......@@ -81,17 +81,17 @@ def parse_lines_log(filter_step, run_number):
def parse_lines_run(filter_step, prov_lines, out):
"""Process provenance info to reduce session at run/process wise scope."""
i = 0
size = 0
container = {}
working_lines = []
r0filepath_str = ""
dl1filepath_str = ""
dl2filepath_str = ""
id_activity_run = ""
end_time_line = ""
for line in prov_lines:
# get info
i += 1
remove = False
endTime = line.get("endTime", "")
session_id = line.get("session_id", "")
......@@ -102,101 +102,105 @@ def parse_lines_run(filter_step, prov_lines, out):
parameters = line.get("parameters", "")
name = line.get("name", "")
content_type = line.get("contentType", "")
used_id = line.get("used_id", "")
# filter grain
session_tag = line.get("session_tag", "0:0")
tag_activity, tag_run = session_tag.split(":")
if tag_activity != filter_step and not session_id:
continue
# remove subruns info
if name == "R0SubrunDataset" or used_role == "Observation subrun":
if filepath:
r0filepath_str = filepath
remove = True
if (
name == "DL1SubrunDataset"
or generated_role == "DL1 subrun dataset"
or used_role == "DL1 subrun dataset"
):
if filepath:
dl1filepath_str = filepath
remove = True
if name == "DL2SubrunDataset" or generated_role == "DL2 subrun dataset":
if filepath:
dl2filepath_str = filepath
if name == "DL1SubrunDataset":
dl1filepath_str = filepath
elif name == "DL2SubrunDataset":
dl2filepath_str = filepath
elif name == "R0SubrunDataset":
r0filepath_str = filepath
if "Subrun" in name or "subrun" in used_role or "subrun" in generated_role:
remove = True
if parameters and "ObservationSubRun" in parameters:
del line["parameters"]["ObservationSubRun"]
# group subruns activities into a single run-wise activity
if name == filter_step:
size += 1
if not id_activity_run:
id_activity_run = get_activity_id()
if size > 1:
# remove sub-runs activities and info
if name == filter_step and not id_activity_run:
id_activity_run = get_activity_id()
if name in container or used_id in container:
remove = True
if parameters and "parameters" in container:
remove = True
if name:
container[name] = True
if used_id:
container[used_id] = True
if parameters:
container["parameters"] = True
if endTime:
remove = True
end_time_line = line
size += 1
# replace with new run-wise activity_id
if activity_id:
line["activity_id"] = id_activity_run
# filter grain
session_tag = line.get("session_tag", "0:0")
tag_activity, tag_run = session_tag.split(":")
if tag_activity != filter_step:
remove = True
# copy used files not subruns not RFs
if filepath and content_type != "application/x-spss-sav" and not remove:
copy_used_file(filepath, out)
# always keep first line / session start
if session_id:
remove = False
# append collection run used and generated at endtime line of last activity
if endTime and i == len(prov_lines) and size:
if r0filepath_str and filter_step == "r0_to_dl1":
entity_id = get_file_hash(r0filepath_str, buffer="path")
r0filepath_str = r0filepath_str.replace(PurePath(r0filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "R0Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": r0filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "R0 Collection"})
working_lines.append(used)
if dl1filepath_str:
entity_id = get_file_hash(dl1filepath_str, buffer="path")
dl1filepath_str = dl1filepath_str.replace(PurePath(dl1filepath_str).name, "")
dl1 = {"entity_id": entity_id}
dl1.update({"name": "DL1Collection"})
dl1.update({"type": "SetCollection"})
dl1.update({"size": size})
dl1.update({"filepath": dl1filepath_str})
working_lines.append(dl1)
if dl1filepath_str and filter_step == "r0_to_dl1":
generated = {"activity_id": id_activity_run}
generated.update({"generated_id": entity_id})
generated.update({"generated_role": "DL1 Collection"})
working_lines.append(generated)
if dl1filepath_str and filter_step == "dl1_to_dl2":
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "DL1 Collection"})
working_lines.append(used)
if dl2filepath_str and filter_step == "dl1_to_dl2":
entity_id = get_file_hash(dl2filepath_str, buffer="path")
dl2filepath_str = dl2filepath_str.replace(PurePath(dl2filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "DL2Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": dl2filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"generated_id": entity_id})
used.update({"generated_role": "DL2 Collection"})
working_lines.append(used)
if not remove:
working_lines.append(line)
# append collection run used and generated at endtime line of last activity
if end_time_line:
working_lines.append(end_time_line)
if r0filepath_str and filter_step == "r0_to_dl1":
entity_id = get_file_hash(r0filepath_str, buffer="path")
r0filepath_str = r0filepath_str.replace(PurePath(r0filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "R0Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": r0filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "R0 Collection"})
working_lines.append(used)
if dl1filepath_str:
dl1filepath_str = dl1filepath_str.replace(PurePath(dl1filepath_str).name, "")
entity_id = get_file_hash(dl1filepath_str, buffer="path")
dl1 = {"entity_id": entity_id}
dl1.update({"name": "DL1Collection"})
dl1.update({"type": "SetCollection"})
dl1.update({"size": size})
dl1.update({"filepath": dl1filepath_str})
working_lines.append(dl1)
if dl1filepath_str and filter_step == "r0_to_dl1":
generated = {"activity_id": id_activity_run}
generated.update({"generated_id": entity_id})
generated.update({"generated_role": "DL1 Collection"})
working_lines.append(generated)
if dl1filepath_str and filter_step == "dl1_to_dl2":
used = {"activity_id": id_activity_run}
used.update({"used_id": entity_id})
used.update({"used_role": "DL1 Collection"})
working_lines.append(used)
if dl2filepath_str and filter_step == "dl1_to_dl2":
entity_id = get_file_hash(dl2filepath_str, buffer="path")
dl2filepath_str = dl2filepath_str.replace(PurePath(dl2filepath_str).name, "")
used = {"entity_id": entity_id}
used.update({"name": "DL2Collection"})
used.update({"type": "SetCollection"})
used.update({"size": size})
used.update({"filepath": dl2filepath_str})
working_lines.append(used)
used = {"activity_id": id_activity_run}
used.update({"generated_id": entity_id})
used.update({"generated_role": "DL2 Collection"})
working_lines.append(used)
# remove start session line
if len(working_lines) == 1:
else:
working_lines = []
return working_lines
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment