Coverage for oc_meta / run / patches / has_next.py: 0%
456 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import argparse
8import csv
9import os
10import re
11import time
12import unicodedata
13import xml.etree.ElementTree as ET
14from collections import defaultdict
15from datetime import datetime, timezone
16from typing import Dict, List, Optional, Tuple
18import orjson
19import requests
20import yaml
21from oc_ocdm.graph import GraphSet
22from rdflib import URIRef
23from rich_argparse import RichHelpFormatter
25from oc_meta.core.editor import MetaEditor
26from oc_meta.lib.console import create_progress
27from oc_meta.run.meta.generate_csv import find_file, load_json_from_file
# RDF predicate IRIs used when walking the Meta JSON-LD dump files.
HAS_IDENTIFIER = "http://purl.org/spar/datacite/hasIdentifier"
USES_ID_SCHEME = "http://purl.org/spar/datacite/usesIdentifierScheme"
HAS_LITERAL_VALUE = (
    "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
)
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
WITH_ROLE = "http://purl.org/spar/pro/withRole"
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
FAMILY_NAME = "http://xmlns.com/foaf/0.1/familyName"
GIVEN_NAME = "http://xmlns.com/foaf/0.1/givenName"
FOAF_NAME = "http://xmlns.com/foaf/0.1/name"

# Maps pro: role IRIs to the short role labels used throughout this script;
# any other role IRI is treated as "unknown" downstream.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}

# Column order expected by the Meta CSV input format.
CSV_COLUMNS = [
    "id", "title", "author", "pub_date", "venue",
    "volume", "issue", "page", "type", "publisher", "editor",
]

# External API endpoints consulted to rebuild contributor lists.
CROSSREF_BASE = "https://api.crossref.org/works/"
DATACITE_BASE = "https://api.datacite.org/dois/"
PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

# Shared HTTP session; the mailto User-Agent identifies us politely to the APIs.
SESSION = requests.Session()
SESSION.headers.update(
    {"User-Agent": "oc_meta_fixer/1.0 (mailto:arcangelo.massari@unibo.it)"}
)
63def get_supplier_prefix(uri: str) -> str | None:
64 match = re.match(r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$", uri)
65 if match is None:
66 return None
67 return match.group(3)
def extract_omid_number(uri: str) -> int:
    """Return the trailing numeric identifier of an OMID URI as an int."""
    _, _, tail = uri.rpartition("/")
    return int(tail)
def normalize_name(name: str) -> str:
    """Lower-case *name*, strip surrounding whitespace and drop accents.

    Accents are removed via NFKD decomposition followed by dropping the
    combining marks; empty/None input yields "".
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_like = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return ascii_like.lower().strip()
def normalize_orcid(orcid: str) -> str:
    """Return *orcid* upper-cased with any orcid.org URL prefix removed.

    Empty/None input yields "".
    """
    if not orcid:
        return ""
    bare = orcid.replace("https://orcid.org/", "")
    bare = bare.replace("http://orcid.org/", "")
    return bare.strip().upper()
def find_entity_in_file(
    uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Optional[dict]:
    """Locate the JSON-LD node for *uri* inside the split RDF dump.

    Uses find_file to resolve which dump file should contain the entity,
    then scans every graph in that file. Returns the entity dict, or None
    when the file or the entity cannot be found.
    """
    filepath = find_file(rdf_dir, dir_split, items_per_file, uri)
    if not filepath:
        return None
    for graph in load_json_from_file(filepath):
        for node in graph["@graph"]:
            if node["@id"] == uri:
                return node
    return None
def load_br_identifiers(
    br_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Dict[str, str]:
    """Collect the external identifiers of a bibliographic resource.

    Returns a mapping from identifier scheme (e.g. ``doi``, ``pmid``) to
    its literal value; empty when the BR or its identifier entities
    cannot be loaded from the dump.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or HAS_IDENTIFIER not in br_entity:
        return {}
    result: Dict[str, str] = {}
    for id_ref in br_entity[HAS_IDENTIFIER]:
        id_entity = find_entity_in_file(
            id_ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_entity:
            continue
        if USES_ID_SCHEME not in id_entity or HAS_LITERAL_VALUE not in id_entity:
            continue
        scheme_iri = id_entity[USES_ID_SCHEME][0]["@id"]
        # Fix: the previous split("/datacite/")[1] raised IndexError for a
        # scheme IRI that does not contain "/datacite/"; skip such schemes.
        _, sep, scheme = scheme_iri.partition("/datacite/")
        if not sep:
            continue
        result[scheme] = id_entity[HAS_LITERAL_VALUE][0]["@value"]
    return result
def load_ra_info(
    ra_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> dict:
    """Load name parts and ORCID for a responsible agent.

    Returns a dict with keys ``family_name``, ``given_name``, ``name``
    and ``orcid``; every value is None when the RA entity cannot be
    loaded or lacks the corresponding property.
    """
    ra_entity = find_entity_in_file(ra_uri, rdf_dir, dir_split, items_per_file)
    if not ra_entity:
        return {"family_name": None, "given_name": None, "name": None, "orcid": None}
    family = None
    given = None
    name = None
    orcid = None
    if FAMILY_NAME in ra_entity:
        family = ra_entity[FAMILY_NAME][0]["@value"]
    if GIVEN_NAME in ra_entity:
        given = ra_entity[GIVEN_NAME][0]["@value"]
    if FOAF_NAME in ra_entity:
        name = ra_entity[FOAF_NAME][0]["@value"]
    if HAS_IDENTIFIER in ra_entity:
        # Only the first ORCID identifier is kept.
        for id_ref in ra_entity[HAS_IDENTIFIER]:
            id_entity = find_entity_in_file(
                id_ref["@id"], rdf_dir, dir_split, items_per_file
            )
            if not id_entity:
                continue
            if USES_ID_SCHEME not in id_entity or HAS_LITERAL_VALUE not in id_entity:
                continue
            scheme_iri = id_entity[USES_ID_SCHEME][0]["@id"]
            # Fix: the previous split("/datacite/")[1] raised IndexError for
            # scheme IRIs not containing "/datacite/"; skip those instead.
            _, sep, scheme = scheme_iri.partition("/datacite/")
            if sep and scheme == "orcid":
                orcid = id_entity[HAS_LITERAL_VALUE][0]["@value"]
                break
    return {"family_name": family, "given_name": given, "name": name, "orcid": orcid}
def load_all_ars_for_br_role(
    br_uri: str, role_type: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> List[dict]:
    """List every agent role of *br_uri* whose role matches *role_type*.

    Each returned entry carries the AR and RA URIs, a display name built
    from the RA's name parts ("Family, Given" or the foaf:name fallback),
    the RA's ORCID (if any), and the AR's hasNext targets.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or IS_DOC_CONTEXT_FOR not in br_entity:
        return []
    collected: List[dict] = []
    for ar_ref in br_entity[IS_DOC_CONTEXT_FOR]:
        ar_uri = ar_ref["@id"]
        ar_entity = find_entity_in_file(ar_uri, rdf_dir, dir_split, items_per_file)
        if not ar_entity or WITH_ROLE not in ar_entity:
            continue
        if ROLE_MAP.get(ar_entity[WITH_ROLE][0]["@id"], "unknown") != role_type:
            continue
        ra_uri = ar_entity[IS_HELD_BY][0]["@id"] if IS_HELD_BY in ar_entity else None
        next_uris = [item["@id"] for item in ar_entity.get(HAS_NEXT, [])]
        if ra_uri:
            ra_info = load_ra_info(ra_uri, rdf_dir, dir_split, items_per_file)
        else:
            ra_info = {
                "family_name": None, "given_name": None, "name": None, "orcid": None
            }
        family = ra_info["family_name"]
        given = ra_info["given_name"]
        if family or given:
            ra_name = f"{family or ''}, {given or ''}"
        elif ra_info["name"]:
            ra_name = ra_info["name"]
        else:
            ra_name = ""
        collected.append({
            "ar": ar_uri,
            "ra": ra_uri,
            "ra_name": ra_name,
            "ra_family": family,
            "ra_given": given,
            "ra_orcid": ra_info["orcid"],
            "has_next": next_uris,
        })
    return collected
202def _strip_orcid_url(orcid: str) -> str:
203 if not orcid:
204 return orcid
205 return (
206 orcid.replace("https://orcid.org/", "")
207 .replace("http://orcid.org/", "")
208 )
def fetch_crossref(doi: str) -> Optional[dict]:
    """Fetch author/editor/publisher metadata for *doi* from Crossref.

    Returns None on HTTP 404; raises requests.HTTPError for any other
    error status. The result dict carries positional contributor lists
    plus the Crossref member id of the publisher (if present).
    """
    resp = SESSION.get(CROSSREF_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    msg = resp.json()["message"]

    def contributors(key: str) -> list:
        # Crossref people are already ordered; keep their list index.
        return [
            {
                "family": person.get("family", ""),
                "given": person.get("given", ""),
                "orcid": _strip_orcid_url(person.get("ORCID")),
                "position": idx,
            }
            for idx, person in enumerate(msg.get(key, []))
        ]

    return {
        "author": contributors("author"),
        "editor": contributors("editor"),
        "publisher": msg.get("publisher", ""),
        "publisher_crossref_id": msg.get("member"),
        "source": "crossref",
    }
def fetch_datacite(doi: str) -> Optional[dict]:
    """Fetch creator/editor/publisher metadata for *doi* from DataCite.

    Returns None on HTTP 404; raises requests.HTTPError for any other
    error status. Editors are the contributors with contributorType
    "Editor"; DataCite never supplies a Crossref member id.
    """
    resp = SESSION.get(DATACITE_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    attrs = resp.json()["data"]["attributes"]

    def orcid_of(person: dict) -> Optional[str]:
        # First ORCID-scheme name identifier wins.
        for ni in person.get("nameIdentifiers", []):
            if ni.get("nameIdentifierScheme", "").upper() == "ORCID":
                return _strip_orcid_url(ni["nameIdentifier"])
        return None

    authors = [
        {
            "family": creator.get("familyName", ""),
            "given": creator.get("givenName", ""),
            "orcid": orcid_of(creator),
            "position": idx,
        }
        for idx, creator in enumerate(attrs.get("creators", []))
    ]
    editor_people = [
        c for c in attrs.get("contributors", [])
        if c.get("contributorType") == "Editor"
    ]
    editors = [
        {
            "family": person.get("familyName", ""),
            "given": person.get("givenName", ""),
            "orcid": orcid_of(person),
            "position": idx,
        }
        for idx, person in enumerate(editor_people)
    ]
    publisher = attrs.get("publisher", "")
    if isinstance(publisher, dict):
        # Some DataCite payloads wrap the publisher in an object.
        publisher = publisher.get("name", "")
    return {
        "author": authors,
        "editor": editors,
        "publisher": publisher,
        "publisher_crossref_id": None,
        "source": "datacite",
    }
def fetch_pubmed(pmid: str) -> Optional[dict]:
    """Fetch the author list for *pmid* via the NCBI efetch endpoint.

    Returns None when the response XML contains no Article element.
    PubMed provides no editor or publisher data, so those fields are
    returned empty.
    """
    params = {"db": "pubmed", "id": pmid, "rettype": "xml", "retmode": "xml"}
    resp = SESSION.get(PUBMED_BASE, params=params, timeout=30)
    resp.raise_for_status()
    article = ET.fromstring(resp.content).find(".//Article")
    if article is None:
        return None
    authors: list = []
    author_list = article.find("AuthorList")
    if author_list is not None:
        for position, author_elem in enumerate(author_list.findall("Author")):
            orcid = None
            for ident in author_elem.findall("Identifier"):
                if ident.get("Source") == "ORCID" and ident.text is not None:
                    orcid = _strip_orcid_url(ident.text)
                    break
            authors.append({
                "family": author_elem.findtext("LastName", ""),
                "given": author_elem.findtext("ForeName", ""),
                "orcid": orcid,
                "position": position,
            })
    return {
        "author": authors,
        "editor": [],
        "publisher": "",
        "publisher_crossref_id": None,
        "source": "pubmed",
    }
def fetch_api_data(identifiers: Dict[str, str]) -> Tuple[Optional[dict], str]:
    """Resolve *identifiers* against Crossref, DataCite, then PubMed.

    Tries the DOI first (Crossref, then DataCite as fallback) and then
    the PMID; network errors are swallowed so the next source is still
    attempted. Returns (api_data, "scheme:value") on success, or
    (None, "") when nothing resolves.
    """
    if "doi" in identifiers:
        doi = identifiers["doi"]
        for fetcher in (fetch_crossref, fetch_datacite):
            try:
                result = fetcher(doi)
            except requests.RequestException:
                continue
            if result:
                return result, f"doi:{doi}"
    if "pmid" in identifiers:
        pmid = identifiers["pmid"]
        try:
            # Pause between requests — presumably to respect NCBI's
            # rate limit; confirm against E-utilities usage policy.
            time.sleep(0.34)
            result = fetch_pubmed(pmid)
            if result:
                return result, f"pmid:{pmid}"
        except requests.RequestException:
            pass
    return None, ""
def match_ars_to_api(
    ar_infos: List[dict], api_entries: List[dict]
) -> List[str]:
    """Order AR URIs by matching their agents to API contributor positions.

    Matching is two-pass: exact ORCID first, then normalized family name;
    each API position is consumed at most once. ARs are visited in OMID
    numeric order, and the returned URIs are sorted by matched position.
    Unmatched ARs are omitted; empty list when the API gave no entries.
    """
    if not api_entries:
        return []

    def clean_orcid(value) -> str:
        # Local equivalent of normalize_orcid: bare, upper-cased ORCID.
        if not value:
            return ""
        bare = value.replace("https://orcid.org/", "")
        return bare.replace("http://orcid.org/", "").strip().upper()

    def clean_name(value) -> str:
        # Local equivalent of normalize_name: accent-free lower-case.
        if not value:
            return ""
        decomposed = unicodedata.normalize("NFKD", value)
        return "".join(
            ch for ch in decomposed if not unicodedata.combining(ch)
        ).lower().strip()

    def omid_number(uri: str) -> int:
        return int(uri.rsplit("/", 1)[-1])

    orcid_to_pos = {
        clean_orcid(entry["orcid"]): entry["position"]
        for entry in api_entries
        if entry["orcid"]
    }
    name_to_positions: Dict[str, List[int]] = {}
    for entry in api_entries:
        if entry["family"]:
            name_to_positions.setdefault(
                clean_name(entry["family"]), []
            ).append(entry["position"])

    matched: Dict[str, int] = {}
    taken: set = set()
    ordered_ars = sorted(ar_infos, key=lambda a: omid_number(a["ar"]))
    # Pass 1: unambiguous ORCID matches.
    for ar in ordered_ars:
        if not ar["ra_orcid"]:
            continue
        pos = orcid_to_pos.get(clean_orcid(ar["ra_orcid"]))
        if pos is not None and pos not in taken:
            matched[ar["ar"]] = pos
            taken.add(pos)
    # Pass 2: family-name matches for ARs still unassigned.
    for ar in ordered_ars:
        if ar["ar"] in matched or not ar["ra_family"]:
            continue
        for pos in name_to_positions.get(clean_name(ar["ra_family"]), []):
            if pos not in taken:
                matched[ar["ar"]] = pos
                taken.add(pos)
                break
    return [uri for uri, _ in sorted(matched.items(), key=lambda kv: kv[1])]
def match_publisher_ars(
    ar_infos: List[dict], api_publisher: str
) -> List[str]:
    """Return the first publisher AR whose agent name equals *api_publisher*.

    Comparison is accent- and case-insensitive; ARs are tried in OMID
    numeric order. Empty list when nothing matches or the API gave no
    publisher name.
    """
    if not api_publisher:
        return []

    def fold(value: str) -> str:
        # Local equivalent of normalize_name: accent-free lower-case.
        decomposed = unicodedata.normalize("NFKD", value)
        return "".join(
            ch for ch in decomposed if not unicodedata.combining(ch)
        ).lower().strip()

    target = fold(api_publisher)
    for ar in sorted(ar_infos, key=lambda a: int(a["ar"].rsplit("/", 1)[-1])):
        if ar["ra_name"] and fold(ar["ra_name"]) == target:
            return [ar["ar"]]
    return []
def format_person_for_csv(entry: dict) -> str:
    """Render an API contributor entry as ``Family, Given [orcid:…]``.

    Missing name parts degrade gracefully; the ORCID suffix is appended
    only when present.
    """
    family, given = entry["family"], entry["given"]
    if family and given:
        rendered = f"{family}, {given}"
    else:
        rendered = family or given or ""
    if entry["orcid"]:
        rendered = f"{rendered} [orcid:{entry['orcid']}]"
    return rendered
def build_csv_row(identifier: str, role_type: str, api_data: dict) -> dict:
    """Build a partial Meta CSV row carrying only the corrected role column.

    The ``id`` cell always holds *identifier*; depending on *role_type*
    exactly one of the author/editor/publisher columns is filled from the
    API data (publisher gets a ``[crossref:member]`` suffix when known).
    """
    row = {"id": identifier}
    if role_type in ("author", "editor"):
        people = api_data[role_type]
        row[role_type] = "; ".join(format_person_for_csv(p) for p in people)
    elif role_type == "publisher":
        name = api_data["publisher"]
        member_id = api_data["publisher_crossref_id"]
        row["publisher"] = f"{name} [crossref:{member_id}]" if member_id else name
    return row
def generate_csv(corrections: List[dict], output_path: str) -> None:
    """Write the "ready" corrections as a Meta-input CSV at *output_path*.

    Rows are grouped by (BR, identifier) so that e.g. author and editor
    fixes for the same work end up on a single line; for each column the
    last non-empty value wins. The output directory is created if needed.
    """
    grouped: Dict[Tuple[str, str], dict] = {}
    for correction in corrections:
        if correction["status"] != "ready":
            continue
        key = (correction["br"], correction["identifier"])
        if key not in grouped:
            grouped[key] = {col: "" for col in CSV_COLUMNS}
            grouped[key]["id"] = correction["identifier"]
        for col in CSV_COLUMNS:
            cell = correction["csv_row"].get(col, "")
            if cell:
                grouped[key][col] = cell

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        writer.writerows(grouped.values())

    print(f"CSV for Meta saved to {output_path} ({len(grouped)} rows)")
461def _ar_summary(ar: dict) -> dict:
462 return {
463 "ar": ar["ar"],
464 "ra": ar["ra"],
465 "ra_name": ar["ra_name"],
466 "has_next": ar["has_next"],
467 }
def dry_run(
    config_path: str, anomaly_path: str, output_path: str, csv_output: Optional[str]
) -> None:
    """Analyse the anomaly report and write a correction plan without editing data.

    For every (BR, role) group of anomalies, loads the current agent roles
    from the RDF dump, resolves the BR's identifiers against the external
    APIs, and records either a ready-to-apply correction or the reason a
    manual review is needed. The plan (with a per-status summary) is
    written as JSON to *output_path*; if *csv_output* is given, the
    "ready" corrections are also exported as a Meta-input CSV.
    """
    with open(config_path, encoding="utf-8") as f:
        settings = yaml.safe_load(f)
    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    with open(anomaly_path, "rb") as f:
        report = orjson.loads(f.read())

    # Group anomalies so each BR/role pair is analysed (and queried) once.
    groups: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for anomaly in report["anomalies"]:
        key = (anomaly["br"], anomaly["role_type"])
        groups[key].append(anomaly)

    corrections = []
    summary = {
        "total_brs": len(groups),
        "api_resolved": 0,
        "no_identifiers": 0,
        "api_error": 0,
        "manual_review_needed": 0,
    }

    with create_progress() as progress:
        task = progress.add_task("Analyzing anomalies", total=len(groups))
        for (br_uri, role_type), anomalies in groups.items():
            anomaly_types = list({a["anomaly_type"] for a in anomalies})
            ar_infos = load_all_ars_for_br_role(
                br_uri, role_type, rdf_dir, dir_split, items_per_file
            )
            ar_summaries = [_ar_summary(ar) for ar in ar_infos]
            all_ar_uris = [ar["ar"] for ar in ar_infos]

            # Roles outside ROLE_MAP cannot be rebuilt from API data.
            if role_type == "unknown":
                summary["manual_review_needed"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "manual_review",
                })
                progress.update(task, advance=1)
                continue

            identifiers = load_br_identifiers(
                br_uri, rdf_dir, dir_split, items_per_file
            )

            # Without any external identifier there is nothing to query.
            if not identifiers:
                summary["no_identifiers"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "no_identifiers",
                })
                progress.update(task, advance=1)
                continue

            api_data, identifier_str = fetch_api_data(identifiers)

            # Every API failed or returned nothing for this BR.
            if not api_data:
                summary["api_error"] += 1
                # Record the first identifier so the reviewer knows what was tried.
                id_str = next(
                    (f"{k}:{v}" for k, v in identifiers.items()), None
                )
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": id_str,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "api_error",
                })
                progress.update(task, advance=1)
                continue

            # Match current ARs against the API contributor list (or the
            # publisher name) and decide whether the fix can be automated.
            extra_fields = {}
            if role_type in ("author", "editor"):
                api_entries = api_data[role_type]
                extra_fields[f"api_{role_type}s"] = api_entries
                api_matched = match_ars_to_api(ar_infos, api_entries)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = len(api_entries) > 0
            else:
                api_publisher = api_data["publisher"]
                extra_fields["api_publisher"] = api_publisher
                api_matched = match_publisher_ars(ar_infos, api_publisher)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = bool(api_publisher)

            if has_api_data:
                status = "ready"
                summary["api_resolved"] += 1
            else:
                status = "manual_review"
                summary["manual_review_needed"] += 1

            # A "ready" fix deletes every current AR (hasNext links first);
            # the role list is then rebuilt by Meta from the CSV row.
            operations = []
            if status == "ready":
                for ar_uri in all_ar_uris:
                    operations.append({"action": "remove_next", "ar": ar_uri})
                for ar_uri in all_ar_uris:
                    operations.append(
                        {"action": "delete_ar", "ar": ar_uri, "br": br_uri}
                    )

            csv_row = (
                build_csv_row(identifier_str, role_type, api_data)
                if status == "ready"
                else {}
            )

            correction = {
                "br": br_uri,
                "role_type": role_type,
                "anomalies": anomaly_types,
                "source": api_data["source"],
                "identifier": identifier_str,
                **extra_fields,
                "current_ars": ar_summaries,
                "csv_row": csv_row,
                "delete_ars": all_ar_uris if status == "ready" else [],
                "operations": operations,
                "status": status,
            }
            corrections.append(correction)
            progress.update(task, advance=1)

    plan = {
        "generated": datetime.now(timezone.utc).isoformat(),
        "config": os.path.abspath(config_path),
        "summary": summary,
        "corrections": corrections,
    }

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "wb") as f:
        f.write(orjson.dumps(plan, option=orjson.OPT_INDENT_2))

    print(f"Correction plan saved to {output_path}")
    print(f" Total groups: {summary['total_brs']}")
    print(f" API resolved: {summary['api_resolved']}")
    print(f" No identifiers: {summary['no_identifiers']}")
    print(f" API errors: {summary['api_error']}")
    print(f" Manual review: {summary['manual_review_needed']}")

    if csv_output:
        generate_csv(corrections, csv_output)
def apply_correction(editor: MetaEditor, correction: dict) -> None:
    """Apply one "ready" correction: detach and delete the BR's stale ARs.

    Imports the BR and all of its current ARs from the triplestore into a
    fresh GraphSet, then for each AR slated for deletion removes its
    hasNext link and its contributor link from the BR, marks it deleted,
    and finally persists the graph through the editor.
    """
    br_uri = correction["br"]
    supplier_prefix = get_supplier_prefix(br_uri)
    # Every OMID produced by Meta should carry a supplier prefix.
    assert supplier_prefix is not None
    g_set = GraphSet(
        editor.base_iri,
        supplier_prefix=supplier_prefix,
        custom_counter_handler=editor.counter_handler,
    )
    # Import the BR plus all its ARs so the removals happen on live data.
    entities_to_import = [URIRef(br_uri)]
    for ar_info in correction["current_ars"]:
        entities_to_import.append(URIRef(ar_info["ar"]))
    editor.reader.import_entities_from_triplestore(
        g_set=g_set,
        ts_url=editor.endpoint,
        entities=entities_to_import,
        resp_agent=editor.resp_agent,
        enable_validation=False,
        batch_size=10,
    )
    br_entity = g_set.get_entity(br_uri)
    assert br_entity is not None
    for ar_uri in correction["delete_ars"]:
        ar_entity = g_set.get_entity(ar_uri)
        assert ar_entity is not None
        # Break the hasNext chain before unlinking and deleting the AR.
        ar_entity.remove_next()  # type: ignore[attr-defined]
        br_entity.remove_contributor(ar_entity)  # type: ignore[attr-defined]
        ar_entity.mark_as_to_be_deleted()
    editor.save(g_set, supplier_prefix)
def execute(config_path: str, plan_path: str, resp_agent: str) -> None:
    """Apply every "ready" correction from a reviewed plan file.

    Failures on individual corrections are printed and counted but do not
    stop the run; a success/failure tally is reported at the end.
    """
    with open(plan_path, "rb") as f:
        plan = orjson.loads(f.read())

    editor = MetaEditor(config_path, resp_agent)
    ready_corrections = [c for c in plan["corrections"] if c["status"] == "ready"]

    print(f"Executing {len(ready_corrections)} corrections...")

    succeeded = 0
    failed = 0
    with create_progress() as progress:
        task = progress.add_task(
            "Applying corrections", total=len(ready_corrections)
        )
        for correction in ready_corrections:
            try:
                apply_correction(editor, correction)
                succeeded += 1
            except Exception as e:
                # Best-effort batch: report the failure and keep going.
                print(
                    f" Error fixing {correction['br']}"
                    f" ({correction['role_type']}): {e}"
                )
                failed += 1
            progress.update(task, advance=1)

    print(f"Execution complete: {succeeded} succeeded, {failed} failed")
def main() -> None:
    """CLI entry point: parse arguments and dispatch to dry_run or execute.

    Exactly one of --dry-run (which requires -a and -o) or --execute
    (which requires -r) must be given; otherwise argparse reports an error.
    """
    parser = argparse.ArgumentParser(
        description="Fix hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-a", "--anomalies", help="Anomaly report JSON file path (for dry run)"
    )
    parser.add_argument(
        "-o", "--output", help="Output correction plan JSON path (for dry run)"
    )
    parser.add_argument(
        "--csv-output", help="Output CSV path for Meta input (for dry run)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Generate correction plan without applying",
    )
    parser.add_argument(
        "--execute", metavar="PLAN", help="Execute corrections from a reviewed plan"
    )
    parser.add_argument(
        "-r", "--resp-agent", help="Responsible agent URI (for execute mode)"
    )
    args = parser.parse_args()

    if args.dry_run:
        if not args.anomalies or not args.output:
            parser.error("--dry-run requires -a/--anomalies and -o/--output")
        dry_run(args.config, args.anomalies, args.output, args.csv_output)
    elif args.execute:
        if not args.resp_agent:
            parser.error("--execute requires -r/--resp-agent")
        execute(args.config, args.execute, args.resp_agent)
    else:
        parser.error("Specify either --dry-run or --execute")
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()