Coverage for oc_meta / run / find / hasnext_anomalies.py: 0%
179 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import argparse
8import os
9from collections import Counter
10from datetime import datetime, timezone
11import multiprocessing
12from typing import Dict, List, Optional, Tuple
14import orjson
15import yaml
16from rich_argparse import RichHelpFormatter
18from oc_meta.lib.console import create_progress
19from oc_meta.lib.file_manager import collect_zip_files
20from oc_meta.run.meta.generate_csv import find_file, load_json_from_file
# Maps pro: role URIs to the short role labels used in anomaly reports;
# unmapped (or missing) roles are reported as "unknown" by load_ar_data.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}
# Predicate URIs read from the JSON-LD agent-role (AR) and bibliographic
# resource (BR) entities.
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
WITH_ROLE = "http://purl.org/spar/pro/withRole"
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"

# Per-process settings tuple (rdf_dir, dir_split_number, items_per_file),
# filled in by _init_worker inside each pool worker.
_worker_config: Optional[Tuple[str, int, int]] = None
def _init_worker(rdf_dir: str, dir_split_number: int, items_per_file: int) -> None:
    """Pool-worker initializer: stash the RDF lookup settings in a module global."""
    global _worker_config
    _worker_config = (rdf_dir, dir_split_number, items_per_file)
40def _ar_summary(ar_uri: str, info: dict) -> dict:
41 return {
42 "ar": ar_uri,
43 "ra": info["ra"],
44 "has_next": info["has_next"],
45 }
def load_ar_data(
    ar_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[dict]:
    """Locate and parse the JSON-LD record of one agent role (AR).

    Returns a dict with keys ``role_type`` (short label via ROLE_MAP,
    "unknown" when the role URI is absent or unmapped), ``ra`` (the
    responsible-agent URI, or None) and ``has_next`` (list of target AR
    URIs). Returns None when the AR's file or entity cannot be found.
    """
    ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
    if not ar_file:
        return None
    for graph in load_json_from_file(ar_file):
        for entity in graph.get("@graph", []):
            if entity["@id"] != ar_uri:
                continue
            role_type = "unknown"
            if WITH_ROLE in entity:
                role_type = ROLE_MAP.get(entity[WITH_ROLE][0]["@id"], "unknown")
            ra_uri = entity[IS_HELD_BY][0]["@id"] if IS_HELD_BY in entity else None
            targets = (
                [item["@id"] for item in entity[HAS_NEXT]]
                if HAS_NEXT in entity
                else []
            )
            return {
                "role_type": role_type,
                "ra": ra_uri,
                "has_next": targets,
            }
    return None
def detect_cycles(ar_data: Dict[str, dict], ar_uris_in_group: set) -> List[List[str]]:
    """Find hasNext cycles among the ARs of one BR/role group.

    Runs an iterative depth-first search over the edges whose target lies
    inside the group (self-loops are excluded; they are reported separately
    as ``self_loop`` anomalies). Each detected back-edge yields a cycle,
    expressed as the list of AR URIs on the current DFS path.
    """
    # Adjacency list restricted to in-group, non-self targets.
    edges: Dict[str, List[str]] = {}
    for uri, info in ar_data.items():
        outgoing = [t for t in info["has_next"] if t in ar_uris_in_group and t != uri]
        if outgoing:
            edges[uri] = outgoing

    finished: set = set()  # nodes whose entire subtree has been explored
    found: List[List[str]] = []

    for origin in ar_uris_in_group:
        if origin in finished:
            continue

        trail: List[str] = []  # current DFS path
        on_trail: set = set()
        # Each frame is (node, index of next neighbor); -1 marks first visit.
        frames: List[Tuple[str, int]] = [(origin, -1)]

        while frames:
            current, next_idx = frames[-1]

            if next_idx == -1:
                if current in on_trail:
                    # Back-edge: the trail from the earlier occurrence of
                    # `current` up to here forms a cycle.
                    found.append(list(trail[trail.index(current):]))
                    frames.pop()
                    continue
                if current in finished:
                    frames.pop()
                    continue
                trail.append(current)
                on_trail.add(current)
                frames[-1] = (current, 0)
                continue

            successors = edges.get(current, [])
            if next_idx < len(successors):
                frames[-1] = (current, next_idx + 1)
                frames.append((successors[next_idx], -1))
            else:
                # Subtree exhausted: unwind the trail and mark finished.
                trail.pop()
                on_trail.discard(current)
                finished.add(current)
                frames.pop()

    return found
def find_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Check one BR/role group of ARs for hasNext chain anomalies.

    Detects, in order: self-loops, ARs with multiple hasNext targets,
    dangling targets outside the group, missing or multiple chain start
    nodes (only for groups with more than one AR), and cycles.
    Returns a list of anomaly dicts sharing a common shape.
    """
    report: List[dict] = []
    group = set(ar_data.keys())

    def record(kind: str, involved: List[dict], details: str) -> None:
        # Every anomaly entry carries the same set of keys.
        report.append({
            "anomaly_type": kind,
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": involved,
            "details": details,
        })

    # Pass 1: ARs whose hasNext points back at themselves.
    for uri, info in ar_data.items():
        if uri in info["has_next"]:
            record(
                "self_loop",
                [_ar_summary(uri, info)],
                f"AR {uri.split('/')[-1]} hasNext points to itself",
            )

    # Pass 2: ARs with more than one hasNext target.
    for uri, info in ar_data.items():
        if len(info["has_next"]) > 1:
            record(
                "multiple_has_next",
                [_ar_summary(uri, info)],
                f"AR {uri.split('/')[-1]} has"
                f" {len(info['has_next'])} hasNext targets",
            )

    # Pass 3: hasNext targets that do not belong to this BR/role group.
    for uri, info in ar_data.items():
        for target in info["has_next"]:
            if target not in group:
                record(
                    "dangling_has_next",
                    [_ar_summary(uri, info)],
                    f"AR {uri.split('/')[-1]} hasNext points to"
                    f" {target.split('/')[-1]} which is not in this"
                    " BR/role group",
                )

    # Pass 4: chain entry points — ARs never targeted by an in-group hasNext.
    incoming = {
        target
        for info in ar_data.values()
        for target in info["has_next"]
        if target in group
    }
    heads = [uri for uri in group if uri not in incoming]

    if len(ar_data) > 1:
        if len(heads) == 0:
            record(
                "no_start_node",
                [_ar_summary(uri, ar_data[uri]) for uri in ar_data],
                f"All {len(ar_data)} ARs are targets of hasNext"
                " (fully circular)",
            )
        elif len(heads) > 1:
            record(
                "multiple_start_nodes",
                [_ar_summary(uri, ar_data[uri]) for uri in heads],
                f"{len(heads)} ARs have no incoming hasNext"
                " (disconnected fragments)",
            )

    # Pass 5: cycles in the hasNext graph.
    for cycle in detect_cycles(ar_data, group):
        short_ids = [uri.split("/")[-1] for uri in cycle]
        record(
            "cycle",
            [_ar_summary(uri, ar_data[uri]) for uri in cycle],
            f"{len(cycle)}-node cycle:"
            f" {' -> '.join(short_ids)} -> {short_ids[0]}",
        )

    return report
def _detect_anomalies_in_file(filepath: str) -> Tuple[str, int, List[dict]]:
    """Scan one BR zip file and return ``(filepath, br_count, anomalies)``.

    Runs inside a pool worker and relies on the (rdf_dir,
    dir_split_number, items_per_file) settings stored by _init_worker.
    """
    assert _worker_config is not None
    rdf_dir, dir_split_number, items_per_file = _worker_config

    found: List[dict] = []
    brs_seen = 0
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            if IS_DOC_CONTEXT_FOR not in entity:
                continue
            brs_seen += 1
            br_uri = entity["@id"]

            # Resolve every AR referenced by this BR; unresolvable ARs
            # are silently skipped.
            ar_data: Dict[str, dict] = {}
            for ar_ref in entity[IS_DOC_CONTEXT_FOR]:
                info = load_ar_data(
                    ar_ref["@id"], rdf_dir, dir_split_number, items_per_file
                )
                if info:
                    ar_data[ar_ref["@id"]] = info

            # hasNext chains are validated per role (author/editor/...).
            by_role: Dict[str, Dict[str, dict]] = {}
            for ar_uri, info in ar_data.items():
                by_role.setdefault(info["role_type"], {})[ar_uri] = info

            for role_type, members in by_role.items():
                found.extend(find_anomalies(br_uri, role_type, members))

    return (filepath, brs_seen, found)
def main() -> None:
    """CLI entry point: scan all BR files in parallel and write a JSON report."""
    parser = argparse.ArgumentParser(
        description="Detect hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-o", "--output", required=True, help="Output JSON report path"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers (default: 4)",
    )
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as config_file:
        settings = yaml.safe_load(config_file)

    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split_number = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    br_dir = os.path.join(rdf_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: BR directory not found at {br_dir}")
        return

    br_files = collect_zip_files(br_dir, only_data=True)
    if not br_files:
        print("No BR zip files found")
        return

    print(
        f"Processing {len(br_files)} BR files with {args.workers} workers..."
    )

    analyzed_brs = 0
    collected: List[dict] = []

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context("forkserver")
    with ctx.Pool(
        args.workers,
        _init_worker,
        (rdf_dir, dir_split_number, items_per_file),
    ) as pool, create_progress() as progress:
        task = progress.add_task(
            "Scanning for anomalies", total=len(br_files)
        )
        # imap_unordered lets results stream back as soon as any worker finishes.
        for _filepath, br_count, file_anomalies in pool.imap_unordered(
            _detect_anomalies_in_file, br_files
        ):
            analyzed_brs += br_count
            collected.extend(file_anomalies)
            progress.update(task, advance=1)

    type_counts = dict(Counter(entry["anomaly_type"] for entry in collected))

    report = {
        "config": os.path.abspath(args.config),
        "rdf_dir": rdf_dir,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_brs_analyzed": analyzed_brs,
        "total_anomalies": len(collected),
        "anomalies_by_type": type_counts,
        "anomalies": collected,
    }

    os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)
    with open(args.output, "wb") as out:
        out.write(orjson.dumps(report, option=orjson.OPT_INDENT_2))

    print(f"Analyzed {analyzed_brs} BRs, found {len(collected)} anomalies")
    for anomaly_type, count in sorted(type_counts.items()):
        print(f" {anomaly_type}: {count}")
    print(f"Report saved to {args.output}")
# Script entry-point guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()