"""
Provenance post-processing script for the OSA pipeline
"""

import copy
import shutil
from pathlib import Path, PurePath

import yaml

from osa.utils import cliopts, standardhandle
from provenance.capture import get_activity_id, get_file_hash
from provenance.io import *
from provenance.utils import get_log_config

provconfig = yaml.safe_load(get_log_config())
LOG_FILENAME = provconfig["handlers"]["provHandler"]["filename"]
PROV_PREFIX = provconfig["PREFIX"]
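# Each provenance log line is expected to contain the PROV_PREFIX separator twice,
# i.e. to split into three parts, the last one being a YAML dict with keys such as
# session_id, session_tag ("activity:run"), activity_id, filepath, ...
# (see parse_lines_log and parse_lines_run below).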


def copy_used_file(src, out):
    """Copy a file used in the process into the output folder."""

    # check src file exists
    if not Path(src).is_file():
        standardhandle.warning(tag, f"{src} file cannot be accessed")

    hash_src = get_file_hash(src, buffer="content")
    filename = PurePath(src).name
    destpath = Path(out) / filename
    hash_out = ""

    # if a file with the same name already exists, hash it and derive a new name
    if destpath.exists():
        hash_out = get_file_hash(str(destpath), buffer="content")
        filename = filename + "_"
        destpath = Path(out) / filename

    # copy the file only if its content differs from the existing one
    if hash_src != hash_out:
        try:
            shutil.copyfile(src, str(destpath))
            standardhandle.output(tag, f"copying {destpath}")
        except Exception as ex:
            standardhandle.warning(tag, f"could not copy {src} file into {str(destpath)}")
            standardhandle.warning(tag, f"{ex}")


def parse_lines_log(filter_step, run_number):
    """Filter the log file content to produce a run/process-wise session log."""
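    # A line is kept when its session_tag ("activity:run") matches the requested
    # run and, if a filter_step is given, the requested activity; session-start
    # lines are kept once and parallel sessions are dropped.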
    filtered = []
    with open(LOG_FILENAME, "r") as f:
        for line in f.readlines():
            ll = line.split(PROV_PREFIX)
            if len(ll) != 3:
                standardhandle.warning(tag, f"format {PROV_PREFIX} mismatch in log file {LOG_FILENAME}\n{line}")
                continue
            prov_str = ll.pop()
            prov_dict = yaml.safe_load(prov_str)
            keep = False
            session_tag = prov_dict.get("session_tag", "0:0")
            session_id = prov_dict.get("session_id", False)
            tag_activity, tag_run = session_tag.split(":")
            # filter by run
            if tag_run == run_number:
                keep = True
            # filter by activity
            if filter_step not in ["", tag_activity]:
                keep = False
            # always keep first session start
            if session_id and tag_run == run_number:
                keep = True
            # remove parallel sessions
            if session_id and len(filtered):
                keep = False
            if keep:
                filtered.append(line)
    return filtered


def parse_lines_run(filter_step, prov_lines, out):
    """Process provenance info to reduce the session to run/process-wise scope."""
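    # Collapse the subrun-wise session into a single run-wise process: keep one
    # entry per entity/activity (tracked in `container`), assign a fresh run-wise
    # activity_id, and remember the R0/DL1/DL2 subrun paths so that they can be
    # summarised as collections after the last activity ends.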
    size = 0
    container = {}
    working_lines = []
    r0filepath_str = ""
    dl1filepath_str = ""
    dl2filepath_str = ""
    id_activity_run = ""
    end_time_line = ""
    for line in prov_lines:

        # get info
        remove = False
        endTime = line.get("endTime", "")
        session_id = line.get("session_id", "")
        activity_id = line.get("activity_id", "")
        filepath = line.get("filepath", "")
        used_role = line.get("used_role", "")
        generated_role = line.get("generated_role", "")
        parameters = line.get("parameters", "")
        name = line.get("name", "")
        content_type = line.get("contentType", "")
        used_id = line.get("used_id", "")

        # filter grain
        session_tag = line.get("session_tag", "0:0")
        tag_activity, tag_run = session_tag.split(":")
        if tag_activity != filter_step and not session_id:
            continue

        # remove subruns info
        if name == "DL1SubrunDataset":
            dl1filepath_str = filepath
        elif name == "DL2SubrunDataset":
            dl2filepath_str = filepath
        elif name == "R0SubrunDataset":
            r0filepath_str = filepath
        if "Subrun" in name or "subrun" in used_role or "subrun" in generated_role:
            remove = True
        if parameters and "ObservationSubRun" in parameters:
            del line["parameters"]["ObservationSubRun"]

        # remove sub-runs activities and info
        if name == filter_step and not id_activity_run:
            id_activity_run = get_activity_id()
        if name in container or used_id in container:
            remove = True
        if parameters and "parameters" in container:
            remove = True
        if name:
            container[name] = True
        if used_id:
            container[used_id] = True
        if parameters:
            container["parameters"] = True
        if endTime:
            remove = True
            end_time_line = line
            size += 1

        # replace with new run-wise activity_id
        if activity_id:
            line["activity_id"] = id_activity_run

        # copy used files not subruns not RFs
        if filepath and content_type != "application/x-spss-sav" and not remove:
            copy_used_file(filepath, out)

        if not remove:
            working_lines.append(line)

    # append the run-level collections used and generated at the endTime line of the last activity
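    # (each collection entity stores the run-level directory as filepath and
    # `size`, the number of endTime lines seen, one per subrun-wise activity)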
    if end_time_line:
        working_lines.append(end_time_line)
        if r0filepath_str and filter_step == "r0_to_dl1":
            entity_id = get_file_hash(r0filepath_str, buffer="path")
            r0filepath_str = r0filepath_str.replace(PurePath(r0filepath_str).name, "")
            r0 = {"entity_id": entity_id}
            r0.update({"name": "R0Collection"})
            r0.update({"type": "SetCollection"})
            r0.update({"size": size})
            r0.update({"filepath": r0filepath_str})
            working_lines.append(r0)
            used = {"activity_id": id_activity_run}
            used.update({"used_id": entity_id})
            used.update({"used_role": "R0 Collection"})
            working_lines.append(used)
        if dl1filepath_str:
            dl1filepath_str = dl1filepath_str.replace(PurePath(dl1filepath_str).name, "")
            entity_id = get_file_hash(dl1filepath_str, buffer="path")
            dl1 = {"entity_id": entity_id}
            dl1.update({"name": "DL1Collection"})
            dl1.update({"type": "SetCollection"})
            dl1.update({"size": size})
            dl1.update({"filepath": dl1filepath_str})
            working_lines.append(dl1)
        if dl1filepath_str and filter_step == "r0_to_dl1":
            generated = {"activity_id": id_activity_run}
            generated.update({"generated_id": entity_id})
            generated.update({"generated_role": "DL1 Collection"})
            working_lines.append(generated)
        if dl1filepath_str and filter_step == "dl1_to_dl2":
            used = {"activity_id": id_activity_run}
            used.update({"used_id": entity_id})
            used.update({"used_role": "DL1 Collection"})
            working_lines.append(used)
        if dl2filepath_str and filter_step == "dl1_to_dl2":
            entity_id = get_file_hash(dl2filepath_str, buffer="path")
            dl2filepath_str = dl2filepath_str.replace(PurePath(dl2filepath_str).name, "")
            dl2 = {"entity_id": entity_id}
            dl2.update({"name": "DL2Collection"})
            dl2.update({"type": "SetCollection"})
            dl2.update({"size": size})
            dl2.update({"filepath": dl2filepath_str})
            working_lines.append(dl2)
            generated = {"activity_id": id_activity_run}
            generated.update({"generated_id": entity_id})
            generated.update({"generated_role": "DL2 Collection"})
            working_lines.append(generated)
    # remove start session line
    else:
        working_lines = []

    return working_lines


def produce_provenance():
    """Create run-wise provenance products as JSON logs and graphs according to granularity."""

    # create prov products for each granularity level
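    # (for each level a filtered session log, a JSON file and a PDF graph are
    # written into the log/ folder of the corresponding GRANULARITY directory)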
    r0_to_dl1_processed_lines = []
    dl1_to_dl2_processed_lines = []
    for grain, fold in GRANULARITY.items():

        processed_lines = []

        # derive destination folder
        step_path = Path(fold) / options.datefolder / options.subfolder

        # check destination folder exists
        if not step_path.exists():
            standardhandle.error(tag, f"Path {step_path} does not exist", 2)

        # make folder log/ if it does not exist
        outpath = step_path / "log"
        outpath.mkdir(exist_ok=True)

        # define paths for prov products
        log_path = outpath / f"{grain}_{base_filename}.log"
        json_filepath = outpath / f"{grain}_{base_filename}.json"
        graph_filepath = outpath / f"{grain}_{base_filename}.pdf"

        # process temp log file
        if grain != "r0_to_dl2":
            processed_lines = parse_lines_run(grain, read_prov(filename=session_log_filename), str(outpath))
        if grain == "r0_to_dl1":
            r0_to_dl1_processed_lines = copy.deepcopy(processed_lines)
        if grain == "dl1_to_dl2":
            dl1_to_dl2_processed_lines = copy.deepcopy(processed_lines)
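        # r0_to_dl2 products are not parsed on their own: they are assembled from
        # the two partial lists, skipping the first line of the dl1_to_dl2 list so
        # that the session start kept in the r0_to_dl1 lines is not duplicated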
        if grain == "r0_to_dl2" and r0_to_dl1_processed_lines and dl1_to_dl2_processed_lines:
            processed_lines = r0_to_dl1_processed_lines + dl1_to_dl2_processed_lines[1:]

        if processed_lines:
            # make filtered session log file
            with open(log_path, "w") as f:
                for line in processed_lines:
                    f.write(f"{line}\n")
            standardhandle.output(tag, f"creating {log_path}")
            provdoc = provlist2provdoc(processed_lines)
            # make json
            try:
                provdoc2json(provdoc, str(json_filepath))
                standardhandle.output(tag, f"creating {json_filepath}")
            except Exception as ex:
                standardhandle.error(tag, f"problem while creating json: {ex}", 2)
            # make graph
            try:
                provdoc2graph(provdoc, str(graph_filepath), "pdf")
                standardhandle.output(tag, f"creating {graph_filepath}")
            except Exception as ex:
                standardhandle.error(tag, f"problem while creating graph: {ex}", 2)


if __name__ == "__main__":
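    # Workflow: read the global provenance log, filter it for the requested
    # run/process into a temporary session log file, produce the run-wise
    # provenance products for each granularity level, then clean up.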

    # provprocess.py
    # 02006
    # v0.4.3_v00
    # -c cfg/sequencer.cfg
    # -f r0_to_dl1
    # -q
    options, tag = cliopts.provprocessparsing()

    from osa.configs.config import cfg

    pathRO = cfg.get("LST1", "RAWDIR")
    pathDL1 = cfg.get("LST1", "ANALYSISDIR")
    pathDL2 = cfg.get("LST1", "DL2DIR")
    GRANULARITY = {"r0_to_dl1": pathDL1, "dl1_to_dl2": pathDL2, "r0_to_dl2": pathDL2}
    if options.filter:
        GRANULARITY = {options.filter: GRANULARITY[options.filter]}

    # check LOG_FILENAME exists
    if not Path(LOG_FILENAME).exists():
        standardhandle.error(tag, f"file {LOG_FILENAME} does not exist", 2)

    # check LOG_FILENAME is not empty
    if not Path(LOG_FILENAME).stat().st_size:
        standardhandle.warning(tag, f"file {LOG_FILENAME} is empty")
        exit()

    # build base_filename
    base_filename = f"{options.run}_prov"
    session_log_filename = f"{base_filename}.log"

    # parse LOG_FILENAME content for a specific run / process
    parsed_content = parse_lines_log(options.filter, options.run)

    # create temporary session log file
    with open(session_log_filename, "w") as f:
        for line in parsed_content:
            f.write(line)

    try:
        # create run-wise JSON logs and graphs for each granularity level
        produce_provenance()
    finally:
        # remove temporary session log file
        remove_session_log_file = Path(session_log_filename)
        remove_session_log_file.unlink()

    # remove LOG_FILENAME
    if options.quit:
        remove_log_file = Path(LOG_FILENAME)
        remove_log_file.unlink()