Coverage for oc_meta/run/patches/has_next.py: 0%
456 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1from __future__ import annotations
3import argparse
4import csv
5import json
6import os
7import re
8import time
9import unicodedata
10import xml.etree.ElementTree as ET
11from collections import defaultdict
12from datetime import datetime, timezone
13from typing import Dict, List, Optional, Tuple
15import requests
16import yaml
17from oc_ocdm.graph import GraphSet
18from rdflib import URIRef
19from rich.progress import (BarColumn, MofNCompleteColumn, Progress,
20 SpinnerColumn, TextColumn, TimeElapsedColumn,
21 TimeRemainingColumn)
22from rich_argparse import RichHelpFormatter
24from oc_meta.core.editor import MetaEditor
25from oc_meta.run.meta.generate_csv import find_file, load_json_from_file
# Predicate URIs used when reading entities from the JSON-LD RDF dumps.
HAS_IDENTIFIER = "http://purl.org/spar/datacite/hasIdentifier"
USES_ID_SCHEME = "http://purl.org/spar/datacite/usesIdentifierScheme"
HAS_LITERAL_VALUE = (
    "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
)
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
WITH_ROLE = "http://purl.org/spar/pro/withRole"
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
FAMILY_NAME = "http://xmlns.com/foaf/0.1/familyName"
GIVEN_NAME = "http://xmlns.com/foaf/0.1/givenName"
FOAF_NAME = "http://xmlns.com/foaf/0.1/name"

# Maps pro: role URIs to the short role names used throughout this script;
# unmapped roles are treated as "unknown".
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}

# Column order of the CSV accepted as Meta input (see generate_csv).
CSV_COLUMNS = [
    "id", "title", "author", "pub_date", "venue",
    "volume", "issue", "page", "type", "publisher", "editor",
]

# External API endpoints queried to rebuild contributor lists.
CROSSREF_BASE = "https://api.crossref.org/works/"
DATACITE_BASE = "https://api.datacite.org/dois/"
PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

# Shared HTTP session; the mailto in the User-Agent identifies the operator
# to the queried APIs.
SESSION = requests.Session()
SESSION.headers.update(
    {"User-Agent": "oc_meta_fixer/1.0 (mailto:arcangelo.massari@unibo.it)"}
)
61def get_supplier_prefix(uri: str) -> str | None:
62 match = re.match(r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$", uri)
63 if match is None:
64 return None
65 return match.group(3)
def extract_omid_number(uri: str) -> int:
    """Return the trailing numeric identifier of an OMID URI as an int."""
    _, _, tail = uri.rpartition("/")
    return int(tail)
def normalize_name(name: str) -> str:
    """Normalise a name for comparison: strip diacritics, lowercase, trim.

    Returns "" for empty or falsy input.
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower().strip()
def normalize_orcid(orcid: str) -> str:
    """Canonicalise an ORCID for comparison: drop URL prefixes, trim, uppercase.

    Returns "" for empty or falsy input.
    """
    if not orcid:
        return ""
    bare = orcid.replace("https://orcid.org/", "")
    bare = bare.replace("http://orcid.org/", "")
    return bare.strip().upper()
def find_entity_in_file(
    uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Optional[dict]:
    """Locate the JSON-LD node for *uri* in the split RDF storage.

    Resolves the file that should contain the entity via find_file(), then
    scans its graphs for a node whose "@id" equals *uri*. Returns the node
    dict, or None when the file or the entity is absent.
    """
    filepath = find_file(rdf_dir, dir_split, items_per_file, uri)
    if not filepath:
        return None
    hits = (
        entity
        for graph in load_json_from_file(filepath)
        for entity in graph["@graph"]
        if entity["@id"] == uri
    )
    return next(hits, None)
def load_br_identifiers(
    br_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Dict[str, str]:
    """Collect the external identifiers attached to a bibliographic resource.

    Returns a mapping of scheme -> literal value (e.g. {"doi": "10.1/x"});
    empty when the BR or its identifier entities cannot be resolved.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or HAS_IDENTIFIER not in br_entity:
        return {}
    identifiers: Dict[str, str] = {}
    for id_ref in br_entity[HAS_IDENTIFIER]:
        id_entity = find_entity_in_file(
            id_ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_entity:
            continue
        if USES_ID_SCHEME in id_entity and HAS_LITERAL_VALUE in id_entity:
            # The scheme name is the path segment after "/datacite/" in the
            # scheme URI (e.g. ".../datacite/doi" -> "doi").
            scheme_uri = id_entity[USES_ID_SCHEME][0]["@id"]
            scheme = scheme_uri.split("/datacite/")[1]
            identifiers[scheme] = id_entity[HAS_LITERAL_VALUE][0]["@value"]
    return identifiers
def load_ra_info(
    ra_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> dict:
    """Load the name parts and ORCID of a responsible agent.

    Always returns a dict with the keys "family_name", "given_name", "name"
    and "orcid"; each value is None when the corresponding datum is missing
    or the RA entity cannot be found.
    """
    info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
    ra_entity = find_entity_in_file(ra_uri, rdf_dir, dir_split, items_per_file)
    if not ra_entity:
        return info
    name_fields = (
        ("family_name", FAMILY_NAME),
        ("given_name", GIVEN_NAME),
        ("name", FOAF_NAME),
    )
    for target, predicate in name_fields:
        if predicate in ra_entity:
            info[target] = ra_entity[predicate][0]["@value"]
    # Take the first identifier whose scheme is orcid, if any.
    for id_ref in ra_entity.get(HAS_IDENTIFIER, []):
        id_entity = find_entity_in_file(
            id_ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_entity:
            continue
        if USES_ID_SCHEME not in id_entity or HAS_LITERAL_VALUE not in id_entity:
            continue
        if id_entity[USES_ID_SCHEME][0]["@id"].split("/datacite/")[1] == "orcid":
            info["orcid"] = id_entity[HAS_LITERAL_VALUE][0]["@value"]
            break
    return info
def load_all_ars_for_br_role(
    br_uri: str, role_type: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> List[dict]:
    """Load every agent role (AR) of *role_type* attached to a bibliographic resource.

    Each returned item carries both the graph links and display data for the
    AR and its responsible agent (RA):
    ``{"ar", "ra", "ra_name", "ra_family", "ra_given", "ra_orcid", "has_next"}``.
    Returns an empty list when the BR cannot be found or has no ARs.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or IS_DOC_CONTEXT_FOR not in br_entity:
        return []
    result = []
    for ar_ref in br_entity[IS_DOC_CONTEXT_FOR]:
        ar_uri = ar_ref["@id"]
        ar_entity = find_entity_in_file(ar_uri, rdf_dir, dir_split, items_per_file)
        if not ar_entity or WITH_ROLE not in ar_entity:
            continue
        role_uri = ar_entity[WITH_ROLE][0]["@id"]
        # Roles outside ROLE_MAP map to "unknown" and only match if the
        # caller explicitly asked for role_type == "unknown".
        ar_role = ROLE_MAP.get(role_uri, "unknown")
        if ar_role != role_type:
            continue
        ra_uri = None
        if IS_HELD_BY in ar_entity:
            ra_uri = ar_entity[IS_HELD_BY][0]["@id"]
        has_next = []
        if HAS_NEXT in ar_entity:
            has_next = [item["@id"] for item in ar_entity[HAS_NEXT]]
        ra_info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
        if ra_uri:
            ra_info = load_ra_info(ra_uri, rdf_dir, dir_split, items_per_file)
        # Display name: prefer "Family, Given"; fall back to foaf:name.
        ra_name = ""
        if ra_info["family_name"] or ra_info["given_name"]:
            parts = [ra_info["family_name"] or "", ra_info["given_name"] or ""]
            ra_name = f"{parts[0]}, {parts[1]}"
        elif ra_info["name"]:
            ra_name = ra_info["name"]
        result.append({
            "ar": ar_uri,
            "ra": ra_uri,
            "ra_name": ra_name,
            "ra_family": ra_info["family_name"],
            "ra_given": ra_info["given_name"],
            "ra_orcid": ra_info["orcid"],
            "has_next": has_next,
        })
    return result
200def _strip_orcid_url(orcid: str) -> str:
201 if not orcid:
202 return orcid
203 return (
204 orcid.replace("https://orcid.org/", "")
205 .replace("http://orcid.org/", "")
206 )
def fetch_crossref(doi: str) -> Optional[dict]:
    """Query the Crossref works API for *doi*.

    Returns None on HTTP 404 (unknown DOI); raises requests.HTTPError on
    other error statuses. On success returns a dict with "author",
    "editor", "publisher", "publisher_crossref_id" (the Crossref member
    id) and "source" == "crossref".
    """
    resp = SESSION.get(CROSSREF_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    msg = resp.json()["message"]

    def _people(key: str) -> list:
        # Authors and editors share the same record shape in Crossref.
        return [
            {
                "family": person.get("family", ""),
                "given": person.get("given", ""),
                "orcid": _strip_orcid_url(person.get("ORCID")),
                "position": pos,
            }
            for pos, person in enumerate(msg.get(key, []))
        ]

    return {
        "author": _people("author"),
        "editor": _people("editor"),
        "publisher": msg.get("publisher", ""),
        "publisher_crossref_id": msg.get("member"),
        "source": "crossref",
    }
def fetch_datacite(doi: str) -> Optional[dict]:
    """Fetch creators, editors and publisher for *doi* from the DataCite API.

    Returns None on HTTP 404 (unknown DOI); raises requests.HTTPError on
    other error statuses. The result has the same shape as
    fetch_crossref()'s, with "source" == "datacite" and no Crossref
    member id.
    """
    resp = SESSION.get(DATACITE_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    attrs = resp.json()["data"]["attributes"]
    authors = []
    for i, c in enumerate(attrs.get("creators", [])):
        # Take the first ORCID among the creator's name identifiers, if any.
        orcid = None
        for ni in c.get("nameIdentifiers", []):
            if ni.get("nameIdentifierScheme", "").upper() == "ORCID":
                orcid = _strip_orcid_url(ni["nameIdentifier"])
                break
        authors.append({
            "family": c.get("familyName", ""),
            "given": c.get("givenName", ""),
            "orcid": orcid,
            "position": i,
        })
    editors = []
    # Editors are contributors with contributorType == "Editor"; positions
    # count only editors, hence the separate counter.
    editor_idx = 0
    for c in attrs.get("contributors", []):
        if c.get("contributorType") != "Editor":
            continue
        orcid = None
        for ni in c.get("nameIdentifiers", []):
            if ni.get("nameIdentifierScheme", "").upper() == "ORCID":
                orcid = _strip_orcid_url(ni["nameIdentifier"])
                break
        editors.append({
            "family": c.get("familyName", ""),
            "given": c.get("givenName", ""),
            "orcid": orcid,
            "position": editor_idx,
        })
        editor_idx += 1
    # Some DataCite payloads return publisher as an object carrying a "name".
    publisher = attrs.get("publisher", "")
    if isinstance(publisher, dict):
        publisher = publisher.get("name", "")
    return {
        "author": authors,
        "editor": editors,
        "publisher": publisher,
        "publisher_crossref_id": None,
        "source": "datacite",
    }
def fetch_pubmed(pmid: str) -> Optional[dict]:
    """Fetch the author list for *pmid* via NCBI E-utilities (efetch, XML).

    Returns None when the response contains no Article element; raises
    requests.HTTPError on HTTP errors. PubMed responses carry no editor or
    publisher data here, so those fields are returned empty.
    """
    params = {"db": "pubmed", "id": pmid, "rettype": "xml", "retmode": "xml"}
    resp = SESSION.get(PUBMED_BASE, params=params, timeout=30)
    resp.raise_for_status()
    root = ET.fromstring(resp.content)
    article = root.find(".//Article")
    if article is None:
        return None
    authors = []
    author_list = article.find("AuthorList")
    if author_list is not None:
        for i, author_elem in enumerate(author_list.findall("Author")):
            # Pick the first ORCID identifier attached to the author, if any.
            orcid = None
            for ident in author_elem.findall("Identifier"):
                if ident.get("Source") == "ORCID" and ident.text is not None:
                    orcid = _strip_orcid_url(ident.text)
                    break
            authors.append({
                "family": author_elem.findtext("LastName", ""),
                "given": author_elem.findtext("ForeName", ""),
                "orcid": orcid,
                "position": i,
            })
    return {
        "author": authors,
        "editor": [],
        "publisher": "",
        "publisher_crossref_id": None,
        "source": "pubmed",
    }
def fetch_api_data(identifiers: Dict[str, str]) -> Tuple[Optional[dict], str]:
    """Resolve a record from external APIs, trying sources in priority order.

    For a DOI, Crossref is tried first, then DataCite; for a PMID, PubMed.
    Network failures are swallowed so the next source can be attempted.
    Returns (data, "scheme:value") on success, (None, "") otherwise.
    """
    if "doi" in identifiers:
        doi = identifiers["doi"]
        for fetcher in (fetch_crossref, fetch_datacite):
            try:
                data = fetcher(doi)
            except requests.RequestException:
                continue
            if data:
                return data, f"doi:{doi}"
    if "pmid" in identifiers:
        pmid = identifiers["pmid"]
        try:
            # Stay under NCBI's request-rate limit before hitting efetch.
            time.sleep(0.34)
            data = fetch_pubmed(pmid)
            if data:
                return data, f"pmid:{pmid}"
        except requests.RequestException:
            pass
    return None, ""
def match_ars_to_api(
    ar_infos: List[dict], api_entries: List[dict]
) -> List[str]:
    """Align stored agent roles with the people returned by an external API.

    Matching is two-pass and order-dependent: ORCID matches are taken
    first (strongest signal), then family-name matches, with each API
    position consumed at most once. Returns the matched AR URIs ordered
    by API position; empty when there are no API entries.
    """
    if not api_entries:
        return []
    # Index API entries by normalised ORCID and by normalised family name
    # (a family name may map to several positions — homonyms).
    orcid_to_pos = {}
    for entry in api_entries:
        if entry["orcid"]:
            orcid_to_pos[normalize_orcid(entry["orcid"])] = entry["position"]
    name_to_positions = defaultdict(list)
    for entry in api_entries:
        if entry["family"]:
            name_to_positions[normalize_name(entry["family"])].append(entry["position"])
    ar_to_position: Dict[str, int] = {}
    used_positions: set = set()
    # Visit ARs in OMID order so ties resolve deterministically.
    sorted_ars = sorted(ar_infos, key=lambda a: extract_omid_number(a["ar"]))
    # Pass 1: exact ORCID matches.
    for ar in sorted_ars:
        if ar["ra_orcid"]:
            norm = normalize_orcid(ar["ra_orcid"])
            if norm in orcid_to_pos:
                pos = orcid_to_pos[norm]
                if pos not in used_positions:
                    ar_to_position[ar["ar"]] = pos
                    used_positions.add(pos)
    # Pass 2: family-name matches for still-unmatched ARs, taking the
    # first free position among homonyms.
    for ar in sorted_ars:
        if ar["ar"] in ar_to_position:
            continue
        if ar["ra_family"]:
            norm = normalize_name(ar["ra_family"])
            if norm in name_to_positions:
                for pos in name_to_positions[norm]:
                    if pos not in used_positions:
                        ar_to_position[ar["ar"]] = pos
                        used_positions.add(pos)
                        break
    ordered = sorted(ar_to_position.items(), key=lambda x: x[1])
    return [uri for uri, _ in ordered]
def match_publisher_ars(
    ar_infos: List[dict], api_publisher: str
) -> List[str]:
    """Return the first AR whose agent name equals the API publisher name.

    Comparison uses normalize_name() on both sides; ARs are scanned in
    OMID order. Returns a one-element list, or [] when *api_publisher* is
    empty or nothing matches.
    """
    if not api_publisher:
        return []
    target = normalize_name(api_publisher)
    ordered = sorted(ar_infos, key=lambda info: extract_omid_number(info["ar"]))
    for info in ordered:
        if info["ra_name"] and normalize_name(info["ra_name"]) == target:
            return [info["ar"]]
    return []
def format_person_for_csv(entry: dict) -> str:
    """Render a person dict as "Family, Given [orcid:...]" for the Meta CSV.

    Missing name parts are omitted; the ORCID bracket is appended only when
    present. Returns "" when the entry has neither name part.
    """
    parts = [part for part in (entry["family"], entry["given"]) if part]
    name_str = ", ".join(parts)
    if entry["orcid"]:
        name_str += f" [orcid:{entry['orcid']}]"
    return name_str
def build_csv_row(identifier: str, role_type: str, api_data: dict) -> dict:
    """Build a partial Meta-input CSV row for one role of one resource.

    Only the "id" column plus the column matching *role_type* is filled;
    unknown role types yield just the "id" column.
    """
    row = {"id": identifier}
    if role_type in ("author", "editor"):
        people = api_data[role_type]
        row[role_type] = "; ".join(format_person_for_csv(p) for p in people)
    elif role_type == "publisher":
        name = api_data["publisher"]
        member = api_data["publisher_crossref_id"]
        row["publisher"] = f"{name} [crossref:{member}]" if member else name
    return row
def generate_csv(corrections: List[dict], output_path: str) -> None:
    """Write the Meta-input CSV for all corrections with status "ready".

    Rows are grouped by (BR, identifier); later corrections for the same
    key fill in any columns still empty. Parent directories of
    *output_path* are created as needed.
    """
    grouped: Dict[Tuple[str, str], dict] = {}
    for correction in corrections:
        if correction["status"] != "ready":
            continue
        key = (correction["br"], correction["identifier"])
        row = grouped.setdefault(key, {col: "" for col in CSV_COLUMNS})
        row["id"] = correction["identifier"]
        for col in CSV_COLUMNS:
            value = correction["csv_row"].get(col, "")
            if value:
                row[col] = value

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        writer.writerows(grouped.values())

    print(f"CSV for Meta saved to {output_path} ({len(grouped)} rows)")
459def _ar_summary(ar: dict) -> dict:
460 return {
461 "ar": ar["ar"],
462 "ra": ar["ra"],
463 "ra_name": ar["ra_name"],
464 "has_next": ar["has_next"],
465 }
def dry_run(
    config_path: str, anomaly_path: str, output_path: str, csv_output: Optional[str]
) -> None:
    """Build a correction plan from an anomaly report without changing data.

    For each (BR, role_type) anomaly group: load the BR's stored agent
    roles from the RDF dump, resolve its external identifiers, fetch the
    authoritative record from Crossref/DataCite/PubMed, and match stored
    ARs against it. Each group yields a correction entry with status
    "ready", "manual_review", "no_identifiers" or "api_error". The plan is
    written as JSON to *output_path*; when *csv_output* is given, a
    Meta-input CSV is also generated for the "ready" entries.
    """
    with open(config_path, encoding="utf-8") as f:
        settings = yaml.safe_load(f)
    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    with open(anomaly_path, encoding="utf-8") as f:
        report = json.load(f)

    # Group anomalies so each (BR, role) pair is analysed exactly once.
    groups: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for anomaly in report["anomalies"]:
        key = (anomaly["br"], anomaly["role_type"])
        groups[key].append(anomaly)

    corrections = []
    summary = {
        "total_brs": len(groups),
        "api_resolved": 0,
        "no_identifiers": 0,
        "api_error": 0,
        "manual_review_needed": 0,
    }

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task("Analyzing anomalies", total=len(groups))
        for (br_uri, role_type), anomalies in groups.items():
            anomaly_types = list({a["anomaly_type"] for a in anomalies})
            ar_infos = load_all_ars_for_br_role(
                br_uri, role_type, rdf_dir, dir_split, items_per_file
            )
            ar_summaries = [_ar_summary(ar) for ar in ar_infos]
            all_ar_uris = [ar["ar"] for ar in ar_infos]

            # Roles outside ROLE_MAP cannot be repaired automatically.
            if role_type == "unknown":
                summary["manual_review_needed"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "manual_review",
                })
                progress.update(task, advance=1)
                continue

            identifiers = load_br_identifiers(
                br_uri, rdf_dir, dir_split, items_per_file
            )

            # Without an external identifier there is nothing to query.
            if not identifiers:
                summary["no_identifiers"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "no_identifiers",
                })
                progress.update(task, advance=1)
                continue

            api_data, identifier_str = fetch_api_data(identifiers)

            if not api_data:
                summary["api_error"] += 1
                # Record the first known identifier anyway, to ease manual
                # follow-up on unresolved records.
                id_str = next(
                    (f"{k}:{v}" for k, v in identifiers.items()), None
                )
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": id_str,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "api_error",
                })
                progress.update(task, advance=1)
                continue

            # People (author/editor) and publishers are matched against the
            # API record with different strategies.
            extra_fields = {}
            if role_type in ("author", "editor"):
                api_entries = api_data[role_type]
                extra_fields[f"api_{role_type}s"] = api_entries
                api_matched = match_ars_to_api(ar_infos, api_entries)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = len(api_entries) > 0
            else:
                api_publisher = api_data["publisher"]
                extra_fields["api_publisher"] = api_publisher
                api_matched = match_publisher_ars(ar_infos, api_publisher)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = bool(api_publisher)

            if has_api_data:
                status = "ready"
                summary["api_resolved"] += 1
            else:
                status = "manual_review"
                summary["manual_review_needed"] += 1

            # A "ready" plan removes hasNext links and deletes every current
            # AR; the role list is rebuilt from the CSV fed back into Meta.
            operations = []
            if status == "ready":
                for ar_uri in all_ar_uris:
                    operations.append({"action": "remove_next", "ar": ar_uri})
                for ar_uri in all_ar_uris:
                    operations.append(
                        {"action": "delete_ar", "ar": ar_uri, "br": br_uri}
                    )

            csv_row = (
                build_csv_row(identifier_str, role_type, api_data)
                if status == "ready"
                else {}
            )

            correction = {
                "br": br_uri,
                "role_type": role_type,
                "anomalies": anomaly_types,
                "source": api_data["source"],
                "identifier": identifier_str,
                **extra_fields,
                "current_ars": ar_summaries,
                "csv_row": csv_row,
                "delete_ars": all_ar_uris if status == "ready" else [],
                "operations": operations,
                "status": status,
            }
            corrections.append(correction)
            progress.update(task, advance=1)

    plan = {
        "generated": datetime.now(timezone.utc).isoformat(),
        "config": os.path.abspath(config_path),
        "summary": summary,
        "corrections": corrections,
    }

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2, ensure_ascii=False)

    print(f"Correction plan saved to {output_path}")
    print(f" Total groups: {summary['total_brs']}")
    print(f" API resolved: {summary['api_resolved']}")
    print(f" No identifiers: {summary['no_identifiers']}")
    print(f" API errors: {summary['api_error']}")
    print(f" Manual review: {summary['manual_review_needed']}")

    if csv_output:
        generate_csv(corrections, csv_output)
def apply_correction(editor: MetaEditor, correction: dict) -> None:
    """Apply one "ready" correction: unlink and delete the BR's broken ARs.

    The BR and all its current ARs are imported from the triplestore into a
    fresh GraphSet; each AR in ``correction["delete_ars"]`` has its hasNext
    links removed, is detached from the BR, and is marked for deletion
    before the GraphSet is saved.

    Raises:
        ValueError: if the BR URI carries no recognisable supplier prefix,
            or an expected entity is missing after the import.
    """
    br_uri = correction["br"]
    supplier_prefix = get_supplier_prefix(br_uri)
    # Raise instead of assert: asserts are stripped under `python -O`, and
    # execute() relies on exceptions to log and count failures.
    if supplier_prefix is None:
        raise ValueError(f"No supplier prefix found in BR URI: {br_uri}")
    g_set = GraphSet(
        editor.base_iri,
        supplier_prefix=supplier_prefix,
        custom_counter_handler=editor.counter_handler,
    )
    entities_to_import = [URIRef(br_uri)]
    for ar_info in correction["current_ars"]:
        entities_to_import.append(URIRef(ar_info["ar"]))
    editor.reader.import_entities_from_triplestore(
        g_set=g_set,
        ts_url=editor.endpoint,
        entities=entities_to_import,
        resp_agent=editor.resp_agent,
        enable_validation=False,
        batch_size=10,
    )
    br_entity = g_set.get_entity(URIRef(br_uri))
    if br_entity is None:
        raise ValueError(f"BR entity not found after import: {br_uri}")
    for ar_uri in correction["delete_ars"]:
        ar_entity = g_set.get_entity(URIRef(ar_uri))
        if ar_entity is None:
            raise ValueError(f"AR entity not found after import: {ar_uri}")
        ar_entity.remove_next()  # type: ignore[attr-defined]
        br_entity.remove_contributor(ar_entity)  # type: ignore[attr-defined]
        ar_entity.mark_as_to_be_deleted()
    editor.save(g_set, supplier_prefix)
def execute(config_path: str, plan_path: str, resp_agent: str) -> None:
    """Apply every "ready" correction from a previously generated plan.

    Failures on individual corrections are printed and counted but do not
    stop the batch; a final success/failure tally is printed at the end.
    """
    with open(plan_path, encoding="utf-8") as f:
        plan = json.load(f)

    editor = MetaEditor(config_path, resp_agent)
    ready_corrections = [c for c in plan["corrections"] if c["status"] == "ready"]

    print(f"Executing {len(ready_corrections)} corrections...")

    succeeded = 0
    failed = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task(
            "Applying corrections", total=len(ready_corrections)
        )
        for correction in ready_corrections:
            try:
                apply_correction(editor, correction)
                succeeded += 1
            except Exception as e:
                # Keep going: one failing correction must not abort the run.
                print(
                    f" Error fixing {correction['br']}"
                    f" ({correction['role_type']}): {e}"
                )
                failed += 1
            progress.update(task, advance=1)

    print(f"Execution complete: {succeeded} succeeded, {failed} failed")
def main() -> None:
    """Command-line entry point: dispatch to dry-run or execute mode."""
    parser = argparse.ArgumentParser(
        description="Fix hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    # (flags, options) pairs for every CLI switch.
    option_specs = [
        (
            ("-c", "--config"),
            {"required": True, "help": "Meta config YAML file path"},
        ),
        (
            ("-a", "--anomalies"),
            {"help": "Anomaly report JSON file path (for dry run)"},
        ),
        (
            ("-o", "--output"),
            {"help": "Output correction plan JSON path (for dry run)"},
        ),
        (
            ("--csv-output",),
            {"help": "Output CSV path for Meta input (for dry run)"},
        ),
        (
            ("--dry-run",),
            {
                "action": "store_true",
                "help": "Generate correction plan without applying",
            },
        ),
        (
            ("--execute",),
            {"metavar": "PLAN", "help": "Execute corrections from a reviewed plan"},
        ),
        (
            ("-r", "--resp-agent"),
            {"help": "Responsible agent URI (for execute mode)"},
        ),
    ]
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    if args.dry_run:
        if not args.anomalies or not args.output:
            parser.error("--dry-run requires -a/--anomalies and -o/--output")
        dry_run(args.config, args.anomalies, args.output, args.csv_output)
    elif args.execute:
        if not args.resp_agent:
            parser.error("--execute requires -r/--resp-agent")
        execute(args.config, args.execute, args.resp_agent)
    else:
        parser.error("Specify either --dry-run or --execute")
# Run the CLI only when invoked as a script, not on import.
if __name__ == "__main__":
    main()