Coverage for oc_meta/run/find/merged_entities.py: 0% (100 statements)
import argparse
import csv
import json
import os
import zipfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed

import yaml
from rich.console import Console
from rich_argparse import RichHelpFormatter
from tqdm import tqdm

console = Console()

PROV_DERIVED_FROM = "http://www.w3.org/ns/prov#wasDerivedFrom"
PROV_SPECIALIZATION_OF = "http://www.w3.org/ns/prov#specializationOf"
def extract_entity_from_snapshot(snapshot_uri: str) -> str:
    """Strip the provenance suffix from a snapshot URI to get the entity URI."""
    return snapshot_uri.split("/prov/")[0]
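# Illustrative call with a hypothetical OpenCitations Meta snapshot URI;
# the only assumption the function itself makes is the "/prov/" separator:
#   extract_entity_from_snapshot("https://w3id.org/oc/meta/br/0601/prov/se/2")
#   returns "https://w3id.org/oc/meta/br/0601"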
def find_prov_files(rdf_dir: str, entity_type: str) -> list[str]:
    """Recursively collect every se.zip provenance archive for an entity type."""
    entity_dir = os.path.join(rdf_dir, entity_type)
    prov_files = []

    for root, _, files in os.walk(entity_dir):
        for f in files:
            if f == "se.zip":
                prov_files.append(os.path.join(root, f))

    return prov_files
def process_prov_file(prov_file: str) -> list[tuple[str, str]]:
    """Extract (surviving_entity, merged_entity) pairs from one se.zip archive.

    A snapshot carrying more than one prov:wasDerivedFrom value records a
    merge: the entity named by prov:specializationOf survived, and every
    other source entity was merged into it.
    """
    results = []

    try:
        with zipfile.ZipFile(prov_file, "r") as zf:
            with zf.open("se.json") as f:
                data = json.load(f)
    except (zipfile.BadZipFile, json.JSONDecodeError, KeyError):
        # Skip corrupt archives and archives without an se.json member.
        return results

    for graph in data:
        for entity in graph.get("@graph", []):
            derived_from = entity.get(PROV_DERIVED_FROM, [])
            if len(derived_from) < 2:
                # A single derivation is an ordinary update, not a merge.
                continue

            specialization = entity.get(PROV_SPECIALIZATION_OF, [])
            if not specialization:
                continue

            surviving_entity = specialization[0]["@id"]

            for derived in derived_from:
                derived_snapshot = derived["@id"]
                derived_entity = extract_entity_from_snapshot(derived_snapshot)

                # The snapshot also derives from the survivor's own previous
                # snapshot; record only the entities that were merged away.
                if derived_entity != surviving_entity:
                    results.append((surviving_entity, derived_entity))

    return results
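# Sketch of the JSON-LD shape the loop above assumes; the identifiers are
# hypothetical and shown only to illustrate the traversal:
#
# [
#   {"@graph": [
#     {"@id": "https://w3id.org/oc/meta/br/0601/prov/se/3",
#      "http://www.w3.org/ns/prov#specializationOf": [
#        {"@id": "https://w3id.org/oc/meta/br/0601"}],
#      "http://www.w3.org/ns/prov#wasDerivedFrom": [
#        {"@id": "https://w3id.org/oc/meta/br/0601/prov/se/2"},
#        {"@id": "https://w3id.org/oc/meta/br/0602/prov/se/1"}]}
#   ]}
# ]
#
# Here br/0601 is the survivor and br/0602 was merged into it, so the pair
# ("https://w3id.org/oc/meta/br/0601", "https://w3id.org/oc/meta/br/0602")
# is appended to results; the self-derivation from br/0601's own previous
# snapshot is filtered out.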
def build_merge_graph(
    merge_results: list[tuple[str, str]],
) -> dict[str, str]:
    """Map each merged entity to the entity it was merged into."""
    merged_to_surviving: dict[str, str] = {}

    for surviving, merged in merge_results:
        # If the same entity appears in several merge records, the last
        # pair seen wins.
        merged_to_surviving[merged] = surviving

    return merged_to_surviving
def find_final_surviving(entity: str, merged_to_surviving: dict[str, str]) -> str:
    """Follow a merge chain to its end, guarding against cycles in the data."""
    current = entity
    visited = {entity}

    while current in merged_to_surviving:
        next_entity = merged_to_surviving[current]
        if next_entity in visited:
            # Cyclic merge records: stop rather than loop forever.
            break
        visited.add(next_entity)
        current = next_entity

    return current
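# Worked example with hypothetical identifiers: given the chain
# {"A": "B", "B": "C"}, find_final_surviving("A", chain) walks A -> B -> C
# and returns "C", so transitively merged entities resolve to one survivor.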
def group_by_final_surviving(
    merged_to_surviving: dict[str, str],
) -> dict[str, list[str]]:
    """Group every merged entity under the final survivor of its chain."""
    final_to_merged: dict[str, list[str]] = defaultdict(list)

    for merged_entity in merged_to_surviving:
        final = find_final_surviving(merged_entity, merged_to_surviving)
        final_to_merged[final].append(merged_entity)

    return dict(final_to_merged)
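# Continuing the hypothetical chain {"A": "B", "B": "C", "D": "C"},
# group_by_final_surviving returns {"C": ["A", "B", "D"]}: every entity
# that was ever merged away is listed under the entity that finally survived.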
def main():
    parser = argparse.ArgumentParser(
        description="Find all merged entities and reconstruct merge chains from provenance files",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Path to meta configuration YAML file"
    )
    parser.add_argument(
        "-o", "--output", required=True, help="Output CSV file path"
    )
    parser.add_argument(
        "--entity-type",
        choices=["br", "ra", "id", "ar", "re"],
        required=True,
        help="Entity type to search",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers",
    )
    args = parser.parse_args()

    with open(args.config) as f:
        config = yaml.safe_load(f)

    rdf_dir = os.path.join(config["output_rdf_dir"], "rdf")

    console.print(f"Scanning for provenance files in: {rdf_dir}/{args.entity_type}")
    prov_files = find_prov_files(rdf_dir, args.entity_type)
    console.print(f"Found {len(prov_files)} provenance files")

    all_results: list[tuple[str, str]] = []

    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        futures = {executor.submit(process_prov_file, f): f for f in prov_files}

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing files"):
            results = future.result()
            all_results.extend(results)

    console.print(f"Found {len(all_results)} merge derivations")

    merged_to_surviving = build_merge_graph(all_results)
    console.print(f"Found {len(merged_to_surviving)} merged entities")

    final_to_merged = group_by_final_surviving(merged_to_surviving)
    console.print(f"Found {len(final_to_merged)} surviving entities")

    with open(args.output, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["surviving_entity", "merged_entities"])
        writer.writeheader()
        for surviving, merged_list in final_to_merged.items():
            writer.writerow({
                "surviving_entity": surviving,
                "merged_entities": "; ".join(merged_list),
            })

    console.print(f"Output written to: {args.output}")


if __name__ == "__main__":
    main()
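# Example invocation (the module path is inferred from the file location
# oc_meta/run/find/merged_entities.py and assumes the package is importable;
# the config and output paths are hypothetical):
#   python -m oc_meta.run.find.merged_entities \
#       -c meta_config.yaml -o merged_entities.csv --entity-type br --workers 8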