Coverage for oc_meta / run / find / hasnext_anomalies.py: 0%
182 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1from __future__ import annotations
3import argparse
4import json
5import os
6from collections import Counter
7from datetime import datetime, timezone
8from multiprocessing import Pool
9from typing import Dict, List, Optional, Tuple
11import yaml
12from rich.progress import (BarColumn, MofNCompleteColumn, Progress,
13 SpinnerColumn, TextColumn, TimeElapsedColumn,
14 TimeRemainingColumn)
15from rich_argparse import RichHelpFormatter
17from oc_meta.run.meta.generate_csv import find_file, load_json_from_file
# Maps PRO role URIs to the short role names used in anomaly reports.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}
# Predicate linking an agent role (AR) to its successor in the role chain.
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
# Predicate linking a bibliographic resource (BR) to its agent roles.
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
# Predicate giving the role type (author/editor/publisher) of an AR.
WITH_ROLE = "http://purl.org/spar/pro/withRole"
# Predicate linking an AR to the responsible agent (RA) that holds it.
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"

# Per-process settings (rdf_dir, dir_split_number, items_per_file),
# populated by _init_worker in each Pool worker process.
_worker_config: Optional[Tuple[str, int, int]] = None
32def _init_worker(rdf_dir: str, dir_split_number: int, items_per_file: int) -> None:
33 global _worker_config
34 _worker_config = (rdf_dir, dir_split_number, items_per_file)
37def _ar_summary(ar_uri: str, info: dict) -> dict:
38 return {
39 "ar": ar_uri,
40 "ra": info["ra"],
41 "has_next": info["has_next"],
42 }
def load_ar_data(
    ar_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[dict]:
    """Load role, agent and hasNext details for a single AR entity.

    Locates the zip file that should contain *ar_uri*, parses it, and
    returns a dict with ``role_type``, ``ra`` and ``has_next``.  Returns
    ``None`` when the file or the entity cannot be found.
    """
    ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
    if not ar_file:
        return None
    for graph in load_json_from_file(ar_file):
        for entity in graph.get("@graph", []):
            if entity["@id"] != ar_uri:
                continue
            # Missing role predicate maps to "unknown", same as an
            # unrecognized role URI.
            role_type = "unknown"
            if WITH_ROLE in entity:
                role_type = ROLE_MAP.get(entity[WITH_ROLE][0]["@id"], "unknown")
            ra_uri = entity[IS_HELD_BY][0]["@id"] if IS_HELD_BY in entity else None
            next_uris = (
                [item["@id"] for item in entity[HAS_NEXT]]
                if HAS_NEXT in entity
                else []
            )
            return {
                "role_type": role_type,
                "ra": ra_uri,
                "has_next": next_uris,
            }
    return None
def detect_cycles(ar_data: Dict[str, dict], ar_uris_in_group: set) -> List[List[str]]:
    """Find directed cycles in the hasNext graph of one BR/role group.

    Runs an iterative depth-first search over the hasNext edges that stay
    inside *ar_uris_in_group*.  Each cycle is returned as the list of AR
    URIs along it.  Self-loops are excluded here (they are reported
    separately as the "self_loop" anomaly).
    """
    # Adjacency list restricted to in-group, non-self edges.
    adj: Dict[str, List[str]] = {}
    for ar_uri, info in ar_data.items():
        targets = [
            t for t in info["has_next"] if t in ar_uris_in_group and t != ar_uri
        ]
        if targets:
            adj[ar_uri] = targets
    # Nodes whose entire reachable subtree has been explored ("black");
    # they can never be part of a newly discovered cycle.
    globally_visited: set = set()
    cycles: List[List[str]] = []
    for start in ar_uris_in_group:
        if start in globally_visited:
            continue
        # `path`/`path_set` hold the current DFS chain ("gray" nodes).
        # Stack entries are (node, next-neighbor-index); an index of -1
        # marks a node that has not been entered yet.
        path: List[str] = []
        path_set: set = set()
        stack: List[Tuple[str, int]] = [(start, -1)]
        while stack:
            node, ni = stack[-1]
            if ni == -1:
                if node in path_set:
                    # Back edge onto the current path: record the cycle
                    # from its first occurrence to the top of the path.
                    cycle_start = path.index(node)
                    cycles.append(list(path[cycle_start:]))
                    stack.pop()
                    continue
                if node in globally_visited:
                    stack.pop()
                    continue
                path.append(node)
                path_set.add(node)
                # Mark as entered; start iterating neighbors from 0.
                stack[-1] = (node, 0)
                continue
            neighbors = adj.get(node, [])
            if ni < len(neighbors):
                # Advance the neighbor cursor before descending.
                stack[-1] = (node, ni + 1)
                stack.append((neighbors[ni], -1))
            else:
                # All neighbors explored: blacken the node and unwind.
                path.pop()
                path_set.discard(node)
                globally_visited.add(node)
                stack.pop()
    return cycles
def find_anomalies(
    br_uri: str, role_type: str, ar_data: Dict[str, dict]
) -> List[dict]:
    """Detect hasNext chain anomalies for one BR/role group.

    Args:
        br_uri: URI of the bibliographic resource owning the ARs.
        role_type: Role shared by the group ("author", "editor", ...).
        ar_data: Mapping of AR URI -> {"role_type", "ra", "has_next"}.

    Returns:
        A list of anomaly records: self loops, multiple/dangling hasNext
        targets, missing or multiple chain start nodes, and cycles.
    """
    anomalies: List[dict] = []
    ar_uris_in_group = set(ar_data.keys())

    def _record(anomaly_type: str, ars_involved: List[dict], details: str) -> None:
        # Every anomaly record shares the same envelope.
        anomalies.append({
            "anomaly_type": anomaly_type,
            "br": br_uri,
            "role_type": role_type,
            "ars_involved": ars_involved,
            "details": details,
        })

    # 1. ARs whose hasNext points back to themselves.
    for ar_uri, info in ar_data.items():
        if ar_uri in info["has_next"]:
            _record(
                "self_loop",
                [_ar_summary(ar_uri, info)],
                f"AR {ar_uri.split('/')[-1]} hasNext points to itself",
            )

    # 2. ARs with more than one outgoing hasNext (chains must be linear).
    for ar_uri, info in ar_data.items():
        if len(info["has_next"]) > 1:
            _record(
                "multiple_has_next",
                [_ar_summary(ar_uri, info)],
                f"AR {ar_uri.split('/')[-1]} has"
                f" {len(info['has_next'])} hasNext targets",
            )

    # 3. hasNext targets that fall outside this BR/role group.
    for ar_uri, info in ar_data.items():
        for target in info["has_next"]:
            if target not in ar_uris_in_group:
                _record(
                    "dangling_has_next",
                    [_ar_summary(ar_uri, info)],
                    f"AR {ar_uri.split('/')[-1]} hasNext points to"
                    f" {target.split('/')[-1]} which is not in this"
                    " BR/role group",
                )

    # 4. Chain-shape checks: a well-formed chain of N>1 ARs has exactly
    # one node with no incoming hasNext edge.
    referenced_ars = set()
    for info in ar_data.values():
        for target in info["has_next"]:
            if target in ar_uris_in_group:
                referenced_ars.add(target)
    # Sorted for deterministic report output: iterating a set of strings
    # varies across runs under hash randomization.
    start_nodes = sorted(ar for ar in ar_uris_in_group if ar not in referenced_ars)
    if len(ar_data) > 1:
        if len(start_nodes) == 0:
            _record(
                "no_start_node",
                [_ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in ar_data],
                f"All {len(ar_data)} ARs are targets of hasNext"
                " (fully circular)",
            )
        elif len(start_nodes) > 1:
            _record(
                "multiple_start_nodes",
                [_ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in start_nodes],
                f"{len(start_nodes)} ARs have no incoming hasNext"
                " (disconnected fragments)",
            )

    # 5. Cycles in the hasNext graph (self-loops already reported above).
    for cycle in detect_cycles(ar_data, ar_uris_in_group):
        cycle_ids = [uri.split("/")[-1] for uri in cycle]
        _record(
            "cycle",
            [_ar_summary(ar_uri, ar_data[ar_uri]) for ar_uri in cycle],
            f"{len(cycle)}-node cycle:"
            f" {' -> '.join(cycle_ids)} -> {cycle_ids[0]}",
        )

    return anomalies
def _detect_anomalies_in_file(filepath: str) -> Tuple[str, int, List[dict]]:
    """Scan one BR zip file; return (filepath, BR count, anomaly records)."""
    assert _worker_config is not None
    rdf_dir, dir_split_number, items_per_file = _worker_config
    collected: List[dict] = []
    brs_seen = 0
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            if IS_DOC_CONTEXT_FOR not in entity:
                continue
            brs_seen += 1
            br_uri = entity["@id"]

            # Resolve every AR this BR references; unresolvable ARs are
            # simply skipped.
            ar_data: Dict[str, dict] = {}
            for ar in entity[IS_DOC_CONTEXT_FOR]:
                info = load_ar_data(
                    ar["@id"], rdf_dir, dir_split_number, items_per_file
                )
                if info:
                    ar_data[ar["@id"]] = info

            # hasNext chains are per-role, so group ARs by role type.
            role_groups: Dict[str, Dict[str, dict]] = {}
            for ar_uri, info in ar_data.items():
                role_groups.setdefault(info["role_type"], {})[ar_uri] = info

            for role_type, group in role_groups.items():
                collected.extend(find_anomalies(br_uri, role_type, group))
    return (filepath, brs_seen, collected)
def main() -> None:
    """CLI entry point: scan all BR files and write a JSON anomaly report."""
    parser = argparse.ArgumentParser(
        description="Detect hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-o", "--output", required=True, help="Output JSON report path"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers (default: 4)",
    )
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as f:
        settings = yaml.safe_load(f)

    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split_number = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    br_dir = os.path.join(rdf_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: BR directory not found at {br_dir}")
        return

    # Gather BR data zips, skipping provenance subdirectories.
    zip_paths: List[str] = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        zip_paths.extend(
            os.path.join(root, name) for name in files if name.endswith(".zip")
        )
    zip_paths.sort()

    if not zip_paths:
        print("No BR zip files found")
        return

    print(
        f"Processing {len(zip_paths)} BR files with {args.workers} workers..."
    )

    total_brs = 0
    all_anomalies: List[dict] = []

    # Workers get the RDF layout via the Pool initializer so each process
    # can resolve AR files on its own.
    with Pool(
        args.workers,
        _init_worker,
        (rdf_dir, dir_split_number, items_per_file),
    ) as pool:
        progress_columns = (
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        )
        with Progress(*progress_columns) as progress:
            task = progress.add_task(
                "Scanning for anomalies", total=len(zip_paths)
            )
            results = pool.imap_unordered(_detect_anomalies_in_file, zip_paths)
            for _filepath, br_count, anomalies in results:
                total_brs += br_count
                all_anomalies.extend(anomalies)
                progress.update(task, advance=1)

    anomalies_by_type = dict(
        Counter(record["anomaly_type"] for record in all_anomalies)
    )

    report = {
        "config": os.path.abspath(args.config),
        "rdf_dir": rdf_dir,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_brs_analyzed": total_brs,
        "total_anomalies": len(all_anomalies),
        "anomalies_by_type": anomalies_by_type,
        "anomalies": all_anomalies,
    }

    os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"Analyzed {total_brs} BRs, found {len(all_anomalies)} anomalies")
    for atype, count in sorted(anomalies_by_type.items()):
        print(f" {atype}: {count}")
    print(f"Report saved to {args.output}")
# Script entry point: run the anomaly scan when executed directly.
if __name__ == "__main__":
    main()