Coverage for oc_meta / run / find / hasnext_anomalies.py: 0%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1from __future__ import annotations 

2 

3import argparse 

4import json 

5import os 

6from collections import Counter 

7from datetime import datetime, timezone 

8from multiprocessing import Pool 

9from typing import Dict, List, Optional, Tuple 

10 

11import yaml 

12from rich.progress import (BarColumn, MofNCompleteColumn, Progress, 

13 SpinnerColumn, TextColumn, TimeElapsedColumn, 

14 TimeRemainingColumn) 

15from rich_argparse import RichHelpFormatter 

16 

17from oc_meta.run.meta.generate_csv import find_file, load_json_from_file 

18 

# Map from pro: role URIs to the short role labels used in anomaly reports.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}
# Predicate URIs used when walking the JSON-LD graphs.
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"  # AR -> next AR in the chain
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"  # BR -> AR
WITH_ROLE = "http://purl.org/spar/pro/withRole"  # AR -> role URI
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"  # AR -> responsible agent

# Per-process worker configuration (rdf_dir, dir_split_number,
# items_per_file); populated by _init_worker in each pool worker.
_worker_config: Optional[Tuple[str, int, int]] = None

30 

31 

def _init_worker(rdf_dir: str, dir_split_number: int, items_per_file: int) -> None:
    """Pool initializer: stash per-worker settings in a module global.

    multiprocessing.Pool calls this once in each worker process so that
    _detect_anomalies_in_file can read the configuration without having
    to receive it with every task.
    """
    global _worker_config
    _worker_config = (rdf_dir, dir_split_number, items_per_file)

35 

36 

def _ar_summary(ar_uri: str, info: dict) -> dict:
    """Build the compact AR description embedded in anomaly reports."""
    summary = {"ar": ar_uri}
    summary["ra"] = info["ra"]
    summary["has_next"] = info["has_next"]
    return summary

43 

44 

def load_ar_data(
    ar_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[dict]:
    """Load role, responsible agent and hasNext targets for one AR.

    Locates the zip file holding *ar_uri* via find_file, then scans its
    JSON-LD graphs for the entity. Returns a dict with keys
    ``role_type`` (mapped through ROLE_MAP, "unknown" when absent),
    ``ra`` (agent URI or None) and ``has_next`` (list of AR URIs), or
    None when the file or the entity cannot be found.
    """
    ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
    if not ar_file:
        return None
    for graph in load_json_from_file(ar_file):
        for entity in graph.get("@graph", []):
            if entity["@id"] != ar_uri:
                continue
            # Each predicate is optional on the entity; fall back to a
            # neutral value when it is missing.
            role_uri = entity[WITH_ROLE][0]["@id"] if WITH_ROLE in entity else ""
            ra_uri = entity[IS_HELD_BY][0]["@id"] if IS_HELD_BY in entity else None
            next_uris = (
                [item["@id"] for item in entity[HAS_NEXT]]
                if HAS_NEXT in entity
                else []
            )
            return {
                "role_type": ROLE_MAP.get(role_uri, "unknown"),
                "ra": ra_uri,
                "has_next": next_uris,
            }
    return None

74 

75 

def detect_cycles(ar_data: Dict[str, dict], ar_uris_in_group: set) -> List[List[str]]:
    """Find hasNext cycles among the ARs of one BR/role group.

    Runs an iterative depth-first search over the hasNext edges that stay
    inside *ar_uris_in_group* (self-loops are filtered out here; they are
    reported separately as "self_loop" anomalies). Returns a list of
    cycles, each the list of AR URIs along the cycle starting from the
    first revisited node.
    """
    # Adjacency restricted to in-group, non-self edges.
    adj: Dict[str, List[str]] = {}
    for ar_uri, info in ar_data.items():
        targets = [
            t for t in info["has_next"] if t in ar_uris_in_group and t != ar_uri
        ]
        if targets:
            adj[ar_uri] = targets

    # Nodes whose entire subtree has been fully explored.
    globally_visited: set = set()
    cycles: List[List[str]] = []

    for start in ar_uris_in_group:
        if start in globally_visited:
            continue

        # `path` / `path_set` track the current DFS path (list for order,
        # set for O(1) membership when testing for a back-edge).
        path: List[str] = []
        path_set: set = set()
        # Stack entries are (node, next-neighbor-index); -1 marks a node
        # that has been pushed but not yet entered onto the path.
        stack: List[Tuple[str, int]] = [(start, -1)]

        while stack:
            node, ni = stack[-1]

            if ni == -1:
                if node in path_set:
                    # Back-edge: the current path revisits `node`, so the
                    # segment from its first occurrence is a cycle.
                    cycle_start = path.index(node)
                    cycles.append(list(path[cycle_start:]))
                    stack.pop()
                    continue
                if node in globally_visited:
                    # Already fully explored from an earlier start.
                    stack.pop()
                    continue
                # First entry: put the node on the path and switch its
                # stack entry to neighbor-expansion mode.
                path.append(node)
                path_set.add(node)
                stack[-1] = (node, 0)
                continue

            neighbors = adj.get(node, [])
            if ni < len(neighbors):
                # Advance this node's neighbor cursor, then descend.
                stack[-1] = (node, ni + 1)
                stack.append((neighbors[ni], -1))
            else:
                # All neighbors done: unwind from the path and mark the
                # node as exhausted.
                path.pop()
                path_set.discard(node)
                globally_visited.add(node)
                stack.pop()

    return cycles

124 

125 

def find_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Detect hasNext chain anomalies within one BR/role group of ARs.

    Parameters:
        br_uri: URI of the bibliographic resource the ARs belong to.
        role_type: role label shared by every AR in *ar_data*.
        ar_data: mapping AR URI -> {"role_type", "ra", "has_next"} as
            produced by load_ar_data.

    Returns a list of anomaly dicts; each carries "anomaly_type", "br",
    "role_type", "ars_involved" and a human-readable "details" string.
    """
    anomalies: List[dict] = []
    ar_uris_in_group = set(ar_data.keys())

    # 1) Self-loops: an AR whose hasNext points back to itself.
    for ar_uri, info in ar_data.items():
        if ar_uri in info["has_next"]:
            anomalies.append({
                "anomaly_type": "self_loop",
                "br": br_uri,
                "role_type": role_type,
                "ars_involved": [_ar_summary(ar_uri, info)],
                "details": f"AR {ar_uri.split('/')[-1]} hasNext points to itself",
            })

    # 2) Branching: an AR with more than one outgoing hasNext target.
    for ar_uri, info in ar_data.items():
        if len(info["has_next"]) > 1:
            anomalies.append({
                "anomaly_type": "multiple_has_next",
                "br": br_uri,
                "role_type": role_type,
                "ars_involved": [_ar_summary(ar_uri, info)],
                "details": (
                    f"AR {ar_uri.split('/')[-1]} has"
                    f" {len(info['has_next'])} hasNext targets"
                ),
            })

    # 3) Dangling edges: hasNext targets outside this BR/role group.
    for ar_uri, info in ar_data.items():
        for target in info["has_next"]:
            if target not in ar_uris_in_group:
                anomalies.append({
                    "anomaly_type": "dangling_has_next",
                    "br": br_uri,
                    "role_type": role_type,
                    "ars_involved": [_ar_summary(ar_uri, info)],
                    "details": (
                        f"AR {ar_uri.split('/')[-1]} hasNext points to"
                        f" {target.split('/')[-1]} which is not in this"
                        " BR/role group"
                    ),
                })

    # 4) Chain-shape checks: a well-formed chain has exactly one AR with
    # no incoming hasNext (the start of the chain).
    referenced_ars = set()
    for info in ar_data.values():
        for target in info["has_next"]:
            if target in ar_uris_in_group:
                referenced_ars.add(target)

    # Sorted so the report content is deterministic across runs; plain
    # set iteration order varies with string hash randomization.
    start_nodes = sorted(
        ar for ar in ar_uris_in_group if ar not in referenced_ars
    )

    if len(ar_data) > 1:
        if len(start_nodes) == 0:
            anomalies.append({
                "anomaly_type": "no_start_node",
                "br": br_uri,
                "role_type": role_type,
                "ars_involved": [
                    _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in ar_data
                ],
                "details": (
                    f"All {len(ar_data)} ARs are targets of hasNext"
                    " (fully circular)"
                ),
            })
        elif len(start_nodes) > 1:
            anomalies.append({
                "anomaly_type": "multiple_start_nodes",
                "br": br_uri,
                "role_type": role_type,
                "ars_involved": [
                    _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in start_nodes
                ],
                "details": (
                    f"{len(start_nodes)} ARs have no incoming hasNext"
                    " (disconnected fragments)"
                ),
            })

    # 5) Cycles of length >= 2 (self-loops already reported above).
    cycles = detect_cycles(ar_data, ar_uris_in_group)
    for cycle in cycles:
        cycle_ids = [uri.split("/")[-1] for uri in cycle]
        anomalies.append({
            "anomaly_type": "cycle",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [
                _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in cycle
            ],
            "details": (
                f"{len(cycle)}-node cycle:"
                f" {' -> '.join(cycle_ids)} -> {cycle_ids[0]}"
            ),
        })

    return anomalies

223 

224 

def _detect_anomalies_in_file(filepath: str) -> Tuple[str, int, List[dict]]:
    """Worker task: scan one BR file for hasNext anomalies.

    Returns (filepath, number of BR entities examined, anomalies found).
    Relies on _worker_config having been set by _init_worker.
    """
    assert _worker_config is not None
    rdf_dir, dir_split_number, items_per_file = _worker_config
    found: List[dict] = []
    brs_seen = 0
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            # Only BR entities that reference agent roles are relevant.
            if IS_DOC_CONTEXT_FOR not in entity:
                continue
            brs_seen += 1
            br_uri = entity["@id"]

            # Resolve every referenced AR; unresolvable ones are skipped.
            ar_data: Dict[str, dict] = {}
            for ar in entity[IS_DOC_CONTEXT_FOR]:
                info = load_ar_data(
                    ar["@id"], rdf_dir, dir_split_number, items_per_file
                )
                if info:
                    ar_data[ar["@id"]] = info

            # Group ARs by role: chains are per-role, so anomalies are
            # checked within each role group independently.
            role_groups: Dict[str, Dict[str, dict]] = {}
            for ar_uri, info in ar_data.items():
                role_groups.setdefault(info["role_type"], {})[ar_uri] = info

            for role_type, group in role_groups.items():
                found.extend(find_anomalies(br_uri, role_type, group))

    return (filepath, brs_seen, found)

258 

259 

def main() -> None:
    """CLI entry point: scan all BR files for hasNext anomalies.

    Reads the Meta YAML config to locate the RDF directory, fans the BR
    zip files out to a worker pool, aggregates the anomalies, and writes
    a JSON report to the path given with --output.
    """
    parser = argparse.ArgumentParser(
        description="Detect hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-o", "--output", required=True, help="Output JSON report path"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers (default: 4)",
    )
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as f:
        settings = yaml.safe_load(f)

    # Config supplies the base output dir plus the sharding parameters
    # needed by find_file to locate an entity's zip file.
    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split_number = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    br_dir = os.path.join(rdf_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: BR directory not found at {br_dir}")
        return

    # Collect BR data files, skipping provenance subdirectories.
    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(
            os.path.join(root, f) for f in files if f.endswith(".zip")
        )
    all_files = sorted(all_files)

    if not all_files:
        print("No BR zip files found")
        return

    print(
        f"Processing {len(all_files)} BR files with {args.workers} workers..."
    )

    total_brs = 0
    all_anomalies: List[dict] = []

    # Each worker process gets the shared settings via _init_worker;
    # imap_unordered streams results back as soon as files finish.
    with Pool(
        args.workers,
        _init_worker,
        (rdf_dir, dir_split_number, items_per_file),
    ) as pool:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task = progress.add_task(
                "Scanning for anomalies", total=len(all_files)
            )
            for filepath, br_count, anomalies in pool.imap_unordered(
                _detect_anomalies_in_file, all_files
            ):
                total_brs += br_count
                all_anomalies.extend(anomalies)
                progress.update(task, advance=1)

    # Per-type counts for the report summary.
    anomalies_by_type = dict(
        Counter(a["anomaly_type"] for a in all_anomalies)
    )

    report = {
        "config": os.path.abspath(args.config),
        "rdf_dir": rdf_dir,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_brs_analyzed": total_brs,
        "total_anomalies": len(all_anomalies),
        "anomalies_by_type": anomalies_by_type,
        "anomalies": all_anomalies,
    }

    output_dir = os.path.dirname(os.path.abspath(args.output))
    os.makedirs(output_dir, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"Analyzed {total_brs} BRs, found {len(all_anomalies)} anomalies")
    for atype, count in sorted(anomalies_by_type.items()):
        print(f"  {atype}: {count}")
    print(f"Report saved to {args.output}")

357 

358 

# Script entry point; keeps the module importable without side effects.
if __name__ == "__main__":
    main()