Coverage for oc_meta / run / find / merged_entities.py: 0%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1import argparse 

2import csv 

3import json 

4import os 

5import zipfile 

6from collections import defaultdict 

7from concurrent.futures import ProcessPoolExecutor, as_completed 

8 

9import yaml 

10from rich.console import Console 

11from rich_argparse import RichHelpFormatter 

12from tqdm import tqdm 

13 

# Shared Rich console for all status output in this script.
console = Console()

# PROV-O predicate URIs used as JSON-LD keys in the se.json provenance graphs.
# prov:wasDerivedFrom — links a snapshot to the prior snapshot(s) it came from.
PROV_DERIVED_FROM = "http://www.w3.org/ns/prov#wasDerivedFrom"
# prov:specializationOf — read in process_prov_file as the snapshot's own entity URI.
PROV_SPECIALIZATION_OF = "http://www.w3.org/ns/prov#specializationOf"

18 

19 

def extract_entity_from_snapshot(snapshot_uri: str) -> str:
    """Return the entity URI obtained by cutting *snapshot_uri* at its ``/prov/`` suffix.

    If the marker is absent, the URI is returned unchanged.
    """
    entity_uri, _, _ = snapshot_uri.partition("/prov/")
    return entity_uri

22 

23 

def find_prov_files(rdf_dir: str, entity_type: str) -> list[str]:
    """Collect the path of every ``se.zip`` provenance archive under *rdf_dir*/*entity_type*.

    The directory tree is walked recursively; a missing directory simply
    yields an empty list.
    """
    base_dir = os.path.join(rdf_dir, entity_type)
    return [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(base_dir)
        for filename in filenames
        if filename == "se.zip"
    ]

34 

35 

def process_prov_file(prov_file: str) -> list[tuple[str, str]]:
    """Extract (surviving_entity, merged_entity) pairs from one ``se.zip`` archive.

    A merge is detected when a snapshot was derived from two or more prior
    snapshots; each derivation that points at a different entity than the
    snapshot's own (its ``prov:specializationOf`` target) contributes one
    pair. Unreadable or malformed archives yield an empty list.
    """
    try:
        with zipfile.ZipFile(prov_file, "r") as archive:
            with archive.open("se.json") as handle:
                graphs = json.load(handle)
    except (zipfile.BadZipFile, json.JSONDecodeError, KeyError):
        # Best-effort: skip corrupt archives rather than aborting the scan.
        return []

    pairs: list[tuple[str, str]] = []
    for graph in graphs:
        for snapshot in graph.get("@graph", []):
            sources = snapshot.get(PROV_DERIVED_FROM, [])
            specializations = snapshot.get(PROV_SPECIALIZATION_OF, [])
            # A single derivation is a plain update, not a merge; a snapshot
            # without a specialization target cannot be attributed.
            if len(sources) < 2 or not specializations:
                continue
            survivor = specializations[0]["@id"]
            for source in sources:
                candidate = extract_entity_from_snapshot(source["@id"])
                if candidate != survivor:
                    pairs.append((survivor, candidate))
    return pairs

66 

67 

def build_merge_graph(
    merge_results: list[tuple[str, str]],
) -> dict[str, str]:
    """Map each merged entity to its directly recorded surviving entity.

    When a merged entity appears in several pairs, the last occurrence wins,
    mirroring plain dict assignment order.
    """
    return {merged: surviving for surviving, merged in merge_results}

77 

78 

def find_final_surviving(entity: str, merged_to_surviving: dict[str, str]) -> str:
    """Follow merge links from *entity* to the entity that ultimately survived.

    Traversal is cycle-safe: it stops (returning the last new entity reached)
    as soon as an already-visited entity would be revisited.
    """
    seen = {entity}
    current = entity
    while True:
        successor = merged_to_surviving.get(current)
        if successor is None or successor in seen:
            return current
        seen.add(successor)
        current = successor

91 

92 

def group_by_final_surviving(
    merged_to_surviving: dict[str, str],
) -> dict[str, list[str]]:
    """Group every merged entity under the entity at the end of its merge chain."""
    groups: dict[str, list[str]] = {}
    for merged_entity in merged_to_surviving:
        final = find_final_surviving(merged_entity, merged_to_surviving)
        groups.setdefault(final, []).append(merged_entity)
    return groups

103 

104 

def main():
    """CLI entry point: scan provenance archives, reconstruct merge chains, write CSV.

    Reads the meta configuration YAML for the RDF output directory, fans the
    per-archive scan out over a process pool, collapses the derivation pairs
    into merge chains, and writes one CSV row per final surviving entity.
    """
    parser = argparse.ArgumentParser(
        description="Find all merged entities and reconstruct merge chains from provenance files",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument("-c", "--config", required=True, help="Path to meta configuration YAML file")
    parser.add_argument("-o", "--output", required=True, help="Output CSV file path")
    parser.add_argument(
        "--entity-type",
        choices=["br", "ra", "id", "ar", "re"],
        required=True,
        help="Entity type to search",
    )
    parser.add_argument("--workers", type=int, default=4, help="Number of parallel workers")
    args = parser.parse_args()

    with open(args.config) as cfg_file:
        settings = yaml.safe_load(cfg_file)

    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")

    console.print(f"Scanning for provenance files in: {rdf_dir}/{args.entity_type}")
    prov_files = find_prov_files(rdf_dir, args.entity_type)
    console.print(f"Found {len(prov_files)} provenance files")

    # CPU-bound JSON parsing: fan out across processes, gather as they finish.
    merge_pairs: list[tuple[str, str]] = []
    with ProcessPoolExecutor(max_workers=args.workers) as pool:
        pending = {pool.submit(process_prov_file, path): path for path in prov_files}
        for done in tqdm(as_completed(pending), total=len(pending), desc="Processing files"):
            merge_pairs.extend(done.result())

    console.print(f"Found {len(merge_pairs)} merge derivations")

    merged_to_surviving = build_merge_graph(merge_pairs)
    console.print(f"Found {len(merged_to_surviving)} merged entities")

    final_to_merged = group_by_final_surviving(merged_to_surviving)
    console.print(f"Found {len(final_to_merged)} surviving entities")

    with open(args.output, "w", newline="") as out_file:
        writer = csv.DictWriter(out_file, fieldnames=["surviving_entity", "merged_entities"])
        writer.writeheader()
        for survivor, merged_list in final_to_merged.items():
            writer.writerow({
                "surviving_entity": survivor,
                "merged_entities": "; ".join(merged_list),
            })

    console.print(f"Output written to: {args.output}")


if __name__ == "__main__":
    main()