Coverage for oc_meta / run / find / hasnext_anomalies.py: 0%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import argparse 

8import os 

9from collections import Counter 

10from datetime import datetime, timezone 

11import multiprocessing 

12from typing import Dict, List, Optional, Tuple 

13 

14import orjson 

15import yaml 

16from rich_argparse import RichHelpFormatter 

17 

18from oc_meta.lib.console import create_progress 

19from oc_meta.lib.file_manager import collect_zip_files 

20from oc_meta.run.meta.generate_csv import find_file, load_json_from_file 

21 

# Maps pro: role URIs to the short role labels used in the report output.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}
# Predicate linking an agent role (AR) to the next AR in its ordered chain.
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
# Predicate linking a bibliographic resource (BR) to its agent roles.
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
# Predicate giving the role type (author/editor/publisher) of an AR.
WITH_ROLE = "http://purl.org/spar/pro/withRole"
# Predicate linking an AR to the responsible agent (RA) that holds the role.
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"

# Per-process settings (rdf_dir, dir_split_number, items_per_file), populated
# by _init_worker in every pool worker before any task function runs.
_worker_config: Optional[Tuple[str, int, int]] = None

33 

34 

35def _init_worker(rdf_dir: str, dir_split_number: int, items_per_file: int) -> None: 

36 global _worker_config 

37 _worker_config = (rdf_dir, dir_split_number, items_per_file) 

38 

39 

40def _ar_summary(ar_uri: str, info: dict) -> dict: 

41 return { 

42 "ar": ar_uri, 

43 "ra": info["ra"], 

44 "has_next": info["has_next"], 

45 } 

46 

47 

def load_ar_data(
    ar_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[dict]:
    """Load role type, responsible agent and hasNext targets for one AR.

    Locates the zip file that holds *ar_uri* in the split RDF layout,
    then scans its JSON-LD graphs for the matching entity.

    Returns a dict with keys ``role_type``, ``ra`` and ``has_next``, or
    None when the file or the entity cannot be found.
    """
    ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
    if not ar_file:
        return None
    for graph in load_json_from_file(ar_file):
        for entity in graph.get("@graph", []):
            if entity["@id"] != ar_uri:
                continue
            role_uri = entity[WITH_ROLE][0]["@id"] if WITH_ROLE in entity else ""
            ra_uri = entity[IS_HELD_BY][0]["@id"] if IS_HELD_BY in entity else None
            has_next = (
                [item["@id"] for item in entity[HAS_NEXT]]
                if HAS_NEXT in entity
                else []
            )
            return {
                "role_type": ROLE_MAP.get(role_uri, "unknown"),
                "ra": ra_uri,
                "has_next": has_next,
            }
    return None

77 

78 

def detect_cycles(ar_data: Dict[str, dict], ar_uris_in_group: set) -> List[List[str]]:
    """Find hasNext cycles among the ARs of one BR/role group.

    Builds an adjacency map restricted to in-group targets (self-loops
    are excluded here; they are reported separately), then runs an
    iterative DFS with an explicit stack.  Each stack frame is
    ``(node, next_idx)``: ``next_idx`` is -1 on first entry, otherwise
    the index of the next neighbour still to explore.

    Returns a list of cycles, each given as the AR URIs along the cycle.
    """
    adjacency: Dict[str, List[str]] = {}
    for uri, info in ar_data.items():
        in_group_targets = [
            target
            for target in info["has_next"]
            if target in ar_uris_in_group and target != uri
        ]
        if in_group_targets:
            adjacency[uri] = in_group_targets

    finished: set = set()  # nodes whose whole subtree has been explored
    found: List[List[str]] = []

    for origin in ar_uris_in_group:
        if origin in finished:
            continue

        trail: List[str] = []  # current DFS path, in visit order
        on_trail: set = set()  # fast membership test for `trail`
        stack: List[Tuple[str, int]] = [(origin, -1)]

        while stack:
            current, next_idx = stack[-1]

            if next_idx == -1:
                # First time this frame is seen.
                if current in on_trail:
                    # Back-edge: the trail from the first occurrence of
                    # `current` onward forms a cycle.
                    first = trail.index(current)
                    found.append(list(trail[first:]))
                    stack.pop()
                    continue
                if current in finished:
                    stack.pop()
                    continue
                trail.append(current)
                on_trail.add(current)
                stack[-1] = (current, 0)
                continue

            successors = adjacency.get(current, [])
            if next_idx < len(successors):
                stack[-1] = (current, next_idx + 1)
                stack.append((successors[next_idx], -1))
            else:
                # All successors explored: retire this node for good.
                trail.pop()
                on_trail.discard(current)
                finished.add(current)
                stack.pop()

    return found

127 

128 

def _self_loop_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Report every AR whose hasNext points back to itself."""
    return [
        {
            "anomaly_type": "self_loop",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [_ar_summary(ar_uri, info)],
            "details": f"AR {ar_uri.split('/')[-1]} hasNext points to itself",
        }
        for ar_uri, info in ar_data.items()
        if ar_uri in info["has_next"]
    ]


def _multiple_has_next_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Report every AR with more than one hasNext target (chains must be linear)."""
    return [
        {
            "anomaly_type": "multiple_has_next",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [_ar_summary(ar_uri, info)],
            "details": (
                f"AR {ar_uri.split('/')[-1]} has"
                f" {len(info['has_next'])} hasNext targets"
            ),
        }
        for ar_uri, info in ar_data.items()
        if len(info["has_next"]) > 1
    ]


def _dangling_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict], ar_uris_in_group: set
) -> List[dict]:
    """Report hasNext targets that fall outside this BR/role group."""
    anomalies: List[dict] = []
    for ar_uri, info in ar_data.items():
        for target in info["has_next"]:
            if target not in ar_uris_in_group:
                anomalies.append({
                    "anomaly_type": "dangling_has_next",
                    "br": br_uri,
                    "role_type": role_type,
                    "ars_involved": [_ar_summary(ar_uri, info)],
                    "details": (
                        f"AR {ar_uri.split('/')[-1]} hasNext points to"
                        f" {target.split('/')[-1]} which is not in this"
                        " BR/role group"
                    ),
                })
    return anomalies


def _start_node_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict], ar_uris_in_group: set
) -> List[dict]:
    """Report groups with zero start nodes (fully circular) or several (fragments).

    A start node is an AR that no in-group hasNext points to; a valid
    linear chain with more than one AR has exactly one.
    """
    referenced_ars = set()
    for info in ar_data.values():
        for target in info["has_next"]:
            if target in ar_uris_in_group:
                referenced_ars.add(target)

    start_nodes = [ar for ar in ar_uris_in_group if ar not in referenced_ars]

    # Single-AR groups trivially have one start node; nothing to check.
    if len(ar_data) <= 1:
        return []

    if len(start_nodes) == 0:
        return [{
            "anomaly_type": "no_start_node",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [
                _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in ar_data
            ],
            "details": (
                f"All {len(ar_data)} ARs are targets of hasNext"
                " (fully circular)"
            ),
        }]
    if len(start_nodes) > 1:
        return [{
            "anomaly_type": "multiple_start_nodes",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [
                _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in start_nodes
            ],
            "details": (
                f"{len(start_nodes)} ARs have no incoming hasNext"
                " (disconnected fragments)"
            ),
        }]
    return []


def _cycle_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict], ar_uris_in_group: set
) -> List[dict]:
    """Report every hasNext cycle found by detect_cycles."""
    anomalies: List[dict] = []
    for cycle in detect_cycles(ar_data, ar_uris_in_group):
        cycle_ids = [uri.split("/")[-1] for uri in cycle]
        anomalies.append({
            "anomaly_type": "cycle",
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": [
                _ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in cycle
            ],
            "details": (
                f"{len(cycle)}-node cycle:"
                f" {' -> '.join(cycle_ids)} -> {cycle_ids[0]}"
            ),
        })
    return anomalies


def find_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Run all hasNext-chain checks on one BR/role group of ARs.

    Args:
        br_uri: URI of the bibliographic resource being analysed.
        role_type: role label for this group ("author", "editor", ...).
        ar_data: mapping AR URI -> dict with "ra" and "has_next" keys,
            as produced by load_ar_data.

    Returns:
        A list of anomaly dicts, in check order: self-loops, multiple
        hasNext targets, dangling targets, start-node problems, cycles.
    """
    ar_uris_in_group = set(ar_data.keys())
    anomalies: List[dict] = []
    anomalies.extend(_self_loop_anomalies(br_uri, role_type, ar_data))
    anomalies.extend(_multiple_has_next_anomalies(br_uri, role_type, ar_data))
    anomalies.extend(
        _dangling_anomalies(br_uri, role_type, ar_data, ar_uris_in_group)
    )
    anomalies.extend(
        _start_node_anomalies(br_uri, role_type, ar_data, ar_uris_in_group)
    )
    anomalies.extend(
        _cycle_anomalies(br_uri, role_type, ar_data, ar_uris_in_group)
    )
    return anomalies

226 

227 

def _detect_anomalies_in_file(filepath: str) -> Tuple[str, int, List[dict]]:
    """Scan one BR zip file and collect hasNext anomalies for all its BRs.

    Runs inside a pool worker; _init_worker must have populated
    _worker_config in this process first.

    Args:
        filepath: path of the BR JSON-LD zip file to scan.

    Returns:
        A tuple ``(filepath, br_count, anomalies)`` where ``br_count`` is
        the number of BRs examined and ``anomalies`` the anomaly dicts
        produced by find_anomalies.

    Raises:
        RuntimeError: if the worker was never initialised.
    """
    # Explicit check instead of `assert`: asserts are stripped under -O,
    # which would turn a missing initializer into an opaque TypeError below.
    if _worker_config is None:
        raise RuntimeError("worker not initialised: _init_worker was not called")
    rdf_dir, dir_split_number, items_per_file = _worker_config
    anomalies: List[dict] = []
    br_count = 0
    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            if IS_DOC_CONTEXT_FOR not in entity:
                continue
            br_count += 1
            br_uri = entity["@id"]

            # Resolve each AR referenced by this BR; ARs whose file or
            # entity cannot be found (load_ar_data -> None) are skipped.
            ar_data: Dict[str, dict] = {}
            for ar_uri in (ar["@id"] for ar in entity[IS_DOC_CONTEXT_FOR]):
                info = load_ar_data(
                    ar_uri, rdf_dir, dir_split_number, items_per_file
                )
                if info:
                    ar_data[ar_uri] = info

            # hasNext chains are only meaningful within one role type, so
            # group the ARs by role before checking each chain.
            role_groups: Dict[str, Dict[str, dict]] = {}
            for ar_uri, info in ar_data.items():
                role_groups.setdefault(info["role_type"], {})[ar_uri] = info

            for role_type, group in role_groups.items():
                anomalies.extend(find_anomalies(br_uri, role_type, group))

    return (filepath, br_count, anomalies)

261 

262 

def main() -> None:
    """CLI entry point: scan every BR file in parallel and write a JSON report.

    Reads the Meta YAML config for the RDF layout, fans the BR zip files
    out to a process pool, aggregates the anomalies, and serialises a
    summary report with orjson.
    """
    parser = argparse.ArgumentParser(
        description="Detect hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-o", "--output", required=True, help="Output JSON report path"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers (default: 4)",
    )
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as config_file:
        settings = yaml.safe_load(config_file)

    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split_number = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]
    br_dir = os.path.join(rdf_dir, "br")

    if not os.path.exists(br_dir):
        print(f"Error: BR directory not found at {br_dir}")
        return

    br_files = collect_zip_files(br_dir, only_data=True)
    if not br_files:
        print("No BR zip files found")
        return

    print(
        f"Processing {len(br_files)} BR files with {args.workers} workers..."
    )

    total_brs = 0
    all_anomalies: List[dict] = []

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    init_args = (rdf_dir, dir_split_number, items_per_file)
    with ctx.Pool(args.workers, _init_worker, init_args) as pool:
        with create_progress() as progress:
            task = progress.add_task(
                "Scanning for anomalies", total=len(br_files)
            )
            results = pool.imap_unordered(_detect_anomalies_in_file, br_files)
            for _filepath, br_count, file_anomalies in results:
                total_brs += br_count
                all_anomalies.extend(file_anomalies)
                progress.update(task, advance=1)

    anomalies_by_type = dict(
        Counter(anomaly["anomaly_type"] for anomaly in all_anomalies)
    )

    report = {
        "config": os.path.abspath(args.config),
        "rdf_dir": rdf_dir,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_brs_analyzed": total_brs,
        "total_anomalies": len(all_anomalies),
        "anomalies_by_type": anomalies_by_type,
        "anomalies": all_anomalies,
    }

    output_dir = os.path.dirname(os.path.abspath(args.output))
    os.makedirs(output_dir, exist_ok=True)
    with open(args.output, "wb") as report_file:
        report_file.write(orjson.dumps(report, option=orjson.OPT_INDENT_2))

    print(f"Analyzed {total_brs} BRs, found {len(all_anomalies)} anomalies")
    for atype, count in sorted(anomalies_by_type.items()):
        print(f" {atype}: {count}")
    print(f"Report saved to {args.output}")


if __name__ == "__main__":
    main()