Coverage for oc_meta/run/patches/has_next.py: 0%

456 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1from __future__ import annotations 

2 

3import argparse 

4import csv 

5import json 

6import os 

7import re 

8import time 

9import unicodedata 

10import xml.etree.ElementTree as ET 

11from collections import defaultdict 

12from datetime import datetime, timezone 

13from typing import Dict, List, Optional, Tuple 

14 

15import requests 

16import yaml 

17from oc_ocdm.graph import GraphSet 

18from rdflib import URIRef 

19from rich.progress import (BarColumn, MofNCompleteColumn, Progress, 

20 SpinnerColumn, TextColumn, TimeElapsedColumn, 

21 TimeRemainingColumn) 

22from rich_argparse import RichHelpFormatter 

23 

24from oc_meta.core.editor import MetaEditor 

25from oc_meta.run.meta.generate_csv import find_file, load_json_from_file 

26 

# RDF predicate IRIs used when walking the OCDM JSON-LD dump files.
HAS_IDENTIFIER = "http://purl.org/spar/datacite/hasIdentifier"
USES_ID_SCHEME = "http://purl.org/spar/datacite/usesIdentifierScheme"
HAS_LITERAL_VALUE = (
    "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
)
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
WITH_ROLE = "http://purl.org/spar/pro/withRole"
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
FAMILY_NAME = "http://xmlns.com/foaf/0.1/familyName"
GIVEN_NAME = "http://xmlns.com/foaf/0.1/givenName"
FOAF_NAME = "http://xmlns.com/foaf/0.1/name"

# Maps pro: role IRIs to the short role names used throughout this script;
# anything not listed here is treated as "unknown" by the callers.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}

# Column order of the CSV consumed by the Meta ingestion pipeline.
CSV_COLUMNS = [
    "id", "title", "author", "pub_date", "venue",
    "volume", "issue", "page", "type", "publisher", "editor",
]

# External API endpoints queried to rebuild contributor lists.
CROSSREF_BASE = "https://api.crossref.org/works/"
DATACITE_BASE = "https://api.datacite.org/dois/"
PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

# Shared HTTP session; the User-Agent carries a contact address so API
# providers can identify the client.
SESSION = requests.Session()
SESSION.headers.update(
    {"User-Agent": "oc_meta_fixer/1.0 (mailto:arcangelo.massari@unibo.it)"}
)

59 

60 

61def get_supplier_prefix(uri: str) -> str | None: 

62 match = re.match(r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$", uri) 

63 if match is None: 

64 return None 

65 return match.group(3) 

66 

67 

def extract_omid_number(uri: str) -> int:
    """Return the trailing sequential number of an OMID URI as an int."""
    _, _, tail = uri.rpartition("/")
    return int(tail)

70 

71 

def normalize_name(name: str) -> str:
    """Lower-case *name*, trim surrounding space and strip diacritics."""
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower().strip()

77 

78 

def normalize_orcid(orcid: str) -> str:
    """Canonicalise an ORCID: drop URL prefixes, trim and upper-case."""
    if not orcid:
        return ""
    bare = orcid.replace("https://orcid.org/", "")
    bare = bare.replace("http://orcid.org/", "")
    return bare.strip().upper()

88 

89 

def find_entity_in_file(
    uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Optional[dict]:
    """Locate the JSON-LD node for *uri* inside the split RDF dump.

    Returns the entity dict, or None when the containing file or the
    entity itself cannot be found.
    """
    filepath = find_file(rdf_dir, dir_split, items_per_file, uri)
    if not filepath:
        return None
    for graph in load_json_from_file(filepath):
        for node in graph["@graph"]:
            if node["@id"] == uri:
                return node
    return None

102 

103 

def load_br_identifiers(
    br_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Dict[str, str]:
    """Collect the external identifiers of a bibliographic resource.

    Returns a mapping from identifier scheme (e.g. ``doi``, ``pmid``) to
    literal value; empty when the BR or its identifiers are missing.
    """
    identifiers: Dict[str, str] = {}
    br_node = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_node or HAS_IDENTIFIER not in br_node:
        return identifiers
    for ref in br_node[HAS_IDENTIFIER]:
        id_node = find_entity_in_file(
            ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_node:
            continue
        if USES_ID_SCHEME in id_node and HAS_LITERAL_VALUE in id_node:
            # The scheme name is the path segment after "/datacite/".
            scheme = id_node[USES_ID_SCHEME][0]["@id"].split("/datacite/")[1]
            identifiers[scheme] = id_node[HAS_LITERAL_VALUE][0]["@value"]
    return identifiers

123 

124 

def load_ra_info(
    ra_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> dict:
    """Load name parts and ORCID of a responsible agent.

    Always returns a dict with the keys ``family_name``, ``given_name``,
    ``name`` and ``orcid``; values are None when not present in the dump.
    """
    info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
    ra_node = find_entity_in_file(ra_uri, rdf_dir, dir_split, items_per_file)
    if not ra_node:
        return info
    if FAMILY_NAME in ra_node:
        info["family_name"] = ra_node[FAMILY_NAME][0]["@value"]
    if GIVEN_NAME in ra_node:
        info["given_name"] = ra_node[GIVEN_NAME][0]["@value"]
    if FOAF_NAME in ra_node:
        info["name"] = ra_node[FOAF_NAME][0]["@value"]
    # Scan the agent's identifiers for the first ORCID, then stop.
    for ref in ra_node.get(HAS_IDENTIFIER, []):
        id_node = find_entity_in_file(
            ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_node:
            continue
        if USES_ID_SCHEME not in id_node or HAS_LITERAL_VALUE not in id_node:
            continue
        scheme = id_node[USES_ID_SCHEME][0]["@id"].split("/datacite/")[1]
        if scheme == "orcid":
            info["orcid"] = id_node[HAS_LITERAL_VALUE][0]["@value"]
            break
    return info

155 

156 

def load_all_ars_for_br_role(
    br_uri: str, role_type: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> List[dict]:
    """Collect every agent role (AR) of *br_uri* matching *role_type*.

    Walks the BR's pro:isDocumentContextFor links, keeps only ARs whose
    pro:withRole maps to *role_type* via ROLE_MAP, and enriches each AR
    with the held agent's name parts, ORCID and hasNext targets.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or IS_DOC_CONTEXT_FOR not in br_entity:
        return []
    result = []
    for ar_ref in br_entity[IS_DOC_CONTEXT_FOR]:
        ar_uri = ar_ref["@id"]
        ar_entity = find_entity_in_file(ar_uri, rdf_dir, dir_split, items_per_file)
        # Skip ARs that cannot be loaded or that carry no role statement.
        if not ar_entity or WITH_ROLE not in ar_entity:
            continue
        role_uri = ar_entity[WITH_ROLE][0]["@id"]
        ar_role = ROLE_MAP.get(role_uri, "unknown")
        if ar_role != role_type:
            continue
        ra_uri = None
        if IS_HELD_BY in ar_entity:
            ra_uri = ar_entity[IS_HELD_BY][0]["@id"]
        has_next = []
        if HAS_NEXT in ar_entity:
            has_next = [item["@id"] for item in ar_entity[HAS_NEXT]]
        ra_info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
        if ra_uri:
            ra_info = load_ra_info(ra_uri, rdf_dir, dir_split, items_per_file)
        # Prefer "Family, Given"; fall back to the plain foaf:name.
        ra_name = ""
        if ra_info["family_name"] or ra_info["given_name"]:
            parts = [ra_info["family_name"] or "", ra_info["given_name"] or ""]
            ra_name = f"{parts[0]}, {parts[1]}"
        elif ra_info["name"]:
            ra_name = ra_info["name"]
        result.append({
            "ar": ar_uri,
            "ra": ra_uri,
            "ra_name": ra_name,
            "ra_family": ra_info["family_name"],
            "ra_given": ra_info["given_name"],
            "ra_orcid": ra_info["orcid"],
            "has_next": has_next,
        })
    return result

198 

199 

200def _strip_orcid_url(orcid: str) -> str: 

201 if not orcid: 

202 return orcid 

203 return ( 

204 orcid.replace("https://orcid.org/", "") 

205 .replace("http://orcid.org/", "") 

206 ) 

207 

208 

def fetch_crossref(doi: str) -> Optional[dict]:
    """Fetch author/editor/publisher data for *doi* from Crossref.

    Returns None on a 404; raises requests.HTTPError on other HTTP
    failures.
    """
    resp = SESSION.get(CROSSREF_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    msg = resp.json()["message"]

    def contributors(key: str) -> list:
        # Crossref lists people in citation order; record it as "position".
        return [
            {
                "family": person.get("family", ""),
                "given": person.get("given", ""),
                "orcid": _strip_orcid_url(person.get("ORCID")),
                "position": idx,
            }
            for idx, person in enumerate(msg.get(key, []))
        ]

    return {
        "author": contributors("author"),
        "editor": contributors("editor"),
        "publisher": msg.get("publisher", ""),
        "publisher_crossref_id": msg.get("member"),
        "source": "crossref",
    }

238 

239 

def fetch_datacite(doi: str) -> Optional[dict]:
    """Fetch author/editor/publisher data for *doi* from DataCite.

    Returns None on a 404; raises requests.HTTPError on other HTTP
    failures.
    """
    resp = SESSION.get(DATACITE_BASE + doi, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    attrs = resp.json()["data"]["attributes"]

    def first_orcid(person: dict) -> Optional[str]:
        # Return the first nameIdentifier whose scheme is ORCID, if any.
        for ident in person.get("nameIdentifiers", []):
            if ident.get("nameIdentifierScheme", "").upper() == "ORCID":
                return _strip_orcid_url(ident["nameIdentifier"])
        return None

    authors = [
        {
            "family": creator.get("familyName", ""),
            "given": creator.get("givenName", ""),
            "orcid": first_orcid(creator),
            "position": idx,
        }
        for idx, creator in enumerate(attrs.get("creators", []))
    ]

    editors = []
    for contributor in attrs.get("contributors", []):
        if contributor.get("contributorType") != "Editor":
            continue
        editors.append({
            "family": contributor.get("familyName", ""),
            "given": contributor.get("givenName", ""),
            "orcid": first_orcid(contributor),
            # Editors are numbered by their order among Editor contributors.
            "position": len(editors),
        })

    # The publisher may be a plain string or an object carrying a "name".
    publisher = attrs.get("publisher", "")
    if isinstance(publisher, dict):
        publisher = publisher.get("name", "")

    return {
        "author": authors,
        "editor": editors,
        "publisher": publisher,
        "publisher_crossref_id": None,
        "source": "datacite",
    }

286 

287 

def fetch_pubmed(pmid: str) -> Optional[dict]:
    """Fetch the author list for *pmid* via the PubMed efetch endpoint.

    Returns None when the response carries no <Article> element; raises
    requests.HTTPError on HTTP failures.
    """
    params = {"db": "pubmed", "id": pmid, "rettype": "xml", "retmode": "xml"}
    resp = SESSION.get(PUBMED_BASE, params=params, timeout=30)
    resp.raise_for_status()
    root = ET.fromstring(resp.content)
    article = root.find(".//Article")
    if article is None:
        return None

    author_list = article.find("AuthorList")
    author_elems = [] if author_list is None else author_list.findall("Author")
    authors = []
    for position, elem in enumerate(author_elems):
        orcid = None
        for ident in elem.findall("Identifier"):
            if ident.get("Source") == "ORCID" and ident.text is not None:
                orcid = _strip_orcid_url(ident.text)
                break
        authors.append({
            "family": elem.findtext("LastName", ""),
            "given": elem.findtext("ForeName", ""),
            "orcid": orcid,
            "position": position,
        })

    # This endpoint yields no editor or publisher data.
    return {
        "author": authors,
        "editor": [],
        "publisher": "",
        "publisher_crossref_id": None,
        "source": "pubmed",
    }

318 

319 

def fetch_api_data(identifiers: Dict[str, str]) -> Tuple[Optional[dict], str]:
    """Resolve agent data for a BR from its external identifiers.

    Tries, in order: Crossref by DOI, DataCite by the same DOI, then
    PubMed by PMID. A network failure at one step falls through to the
    next source.

    Returns:
        (api_data, "scheme:value") on success, (None, "") otherwise.
    """
    if "doi" in identifiers:
        doi = identifiers["doi"]
        try:
            result = fetch_crossref(doi)
            if result:
                return result, f"doi:{doi}"
        except requests.RequestException:
            pass  # fall through to DataCite with the same DOI
        try:
            result = fetch_datacite(doi)
            if result:
                return result, f"doi:{doi}"
        except requests.RequestException:
            pass  # fall through to PubMed if a PMID is available
    if "pmid" in identifiers:
        pmid = identifiers["pmid"]
        try:
            # Throttle to stay under NCBI's ~3 requests/second limit.
            time.sleep(0.34)
            result = fetch_pubmed(pmid)
            if result:
                return result, f"pmid:{pmid}"
        except requests.RequestException:
            pass
    return None, ""

345 

346 

def match_ars_to_api(
    ar_infos: List[dict], api_entries: List[dict]
) -> List[str]:
    """Match agent-role URIs to API people and return them in API order.

    Matching is two-pass: first by normalised ORCID, then by normalised
    family name; each API position is consumed at most once. ARs that
    match nothing are omitted from the result.
    """
    if not api_entries:
        return []

    # Lookup tables built from the API side.
    orcid_positions = {
        normalize_orcid(entry["orcid"]): entry["position"]
        for entry in api_entries
        if entry["orcid"]
    }
    family_positions = defaultdict(list)
    for entry in api_entries:
        if entry["family"]:
            family_positions[normalize_name(entry["family"])].append(entry["position"])

    assigned: Dict[str, int] = {}
    taken: set = set()
    by_omid = sorted(ar_infos, key=lambda a: extract_omid_number(a["ar"]))

    # Pass 1: exact ORCID matches win.
    for ar in by_omid:
        if not ar["ra_orcid"]:
            continue
        pos = orcid_positions.get(normalize_orcid(ar["ra_orcid"]))
        if pos is not None and pos not in taken:
            assigned[ar["ar"]] = pos
            taken.add(pos)

    # Pass 2: family-name fallback for still-unmatched ARs.
    for ar in by_omid:
        if ar["ar"] in assigned or not ar["ra_family"]:
            continue
        for pos in family_positions.get(normalize_name(ar["ra_family"]), []):
            if pos not in taken:
                assigned[ar["ar"]] = pos
                taken.add(pos)
                break

    return [uri for uri, _ in sorted(assigned.items(), key=lambda kv: kv[1])]

384 

385 

def match_publisher_ars(
    ar_infos: List[dict], api_publisher: str
) -> List[str]:
    """Return the single AR whose agent name equals the API publisher.

    Comparison is case- and diacritics-insensitive; at most one AR (the
    one with the lowest OMID number) is returned.
    """
    if not api_publisher:
        return []
    wanted = normalize_name(api_publisher)
    ordered = sorted(ar_infos, key=lambda a: extract_omid_number(a["ar"]))
    for ar in ordered:
        if ar["ra_name"] and normalize_name(ar["ra_name"]) == wanted:
            return [ar["ar"]]
    return []

396 

397 

def format_person_for_csv(entry: dict) -> str:
    """Render an API person as ``Family, Given [orcid:...]`` for the CSV.

    Missing name parts are skipped; the ORCID tag is appended only when
    present.
    """
    name_parts = [part for part in (entry["family"], entry["given"]) if part]
    name_str = ", ".join(name_parts)
    if entry["orcid"]:
        name_str += f" [orcid:{entry['orcid']}]"
    return name_str

412 

413 

def build_csv_row(identifier: str, role_type: str, api_data: dict) -> dict:
    """Build a partial Meta CSV row carrying only the corrected column.

    The row always holds the BR identifier; exactly one of the author,
    editor or publisher columns is filled from the API data.
    """
    row = {"id": identifier}
    if role_type in ("author", "editor"):
        people = api_data[role_type]
        row[role_type] = "; ".join(format_person_for_csv(p) for p in people)
    elif role_type == "publisher":
        name = api_data["publisher"]
        member_id = api_data["publisher_crossref_id"]
        # Tag the publisher with its Crossref member id when available.
        row["publisher"] = f"{name} [crossref:{member_id}]" if member_id else name
    return row

432 

433 

def generate_csv(corrections: List[dict], output_path: str) -> None:
    """Write the 'ready' corrections as a Meta-input CSV.

    Corrections sharing the same (BR, identifier) pair are merged into a
    single row; later non-empty column values overwrite earlier ones.
    The output directory is created if needed.
    """
    ready = [c for c in corrections if c["status"] == "ready"]
    grouped: Dict[Tuple[str, str], dict] = {}
    for c in ready:
        key = (c["br"], c["identifier"])
        if key not in grouped:
            # Start from an all-empty row so every CSV column exists.
            grouped[key] = {col: "" for col in CSV_COLUMNS}
            grouped[key]["id"] = c["identifier"]
        for col in CSV_COLUMNS:
            value = c["csv_row"].get(col, "")
            if value:
                grouped[key][col] = value

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        for row in grouped.values():
            writer.writerow(row)

    print(f"CSV for Meta saved to {output_path} ({len(grouped)} rows)")

457 

458 

459def _ar_summary(ar: dict) -> dict: 

460 return { 

461 "ar": ar["ar"], 

462 "ra": ar["ra"], 

463 "ra_name": ar["ra_name"], 

464 "has_next": ar["has_next"], 

465 } 

466 

467 

def dry_run(
    config_path: str, anomaly_path: str, output_path: str, csv_output: Optional[str]
) -> None:
    """Analyse anomalies, query external APIs and write a correction plan.

    Groups the anomaly report by (BR, role type), resolves each group
    against Crossref/DataCite/PubMed and writes a JSON plan to
    *output_path*. When *csv_output* is given, also writes a Meta-input
    CSV of the 'ready' corrections. Nothing is modified in the
    triplestore — execution happens separately via `execute`.
    """
    with open(config_path, encoding="utf-8") as f:
        settings = yaml.safe_load(f)
    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    with open(anomaly_path, encoding="utf-8") as f:
        report = json.load(f)

    # One work unit per (bibliographic resource, role type) pair.
    groups: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for anomaly in report["anomalies"]:
        key = (anomaly["br"], anomaly["role_type"])
        groups[key].append(anomaly)

    corrections = []
    summary = {
        "total_brs": len(groups),
        "api_resolved": 0,
        "no_identifiers": 0,
        "api_error": 0,
        "manual_review_needed": 0,
    }

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task("Analyzing anomalies", total=len(groups))
        for (br_uri, role_type), anomalies in groups.items():
            anomaly_types = list({a["anomaly_type"] for a in anomalies})
            ar_infos = load_all_ars_for_br_role(
                br_uri, role_type, rdf_dir, dir_split, items_per_file
            )
            ar_summaries = [_ar_summary(ar) for ar in ar_infos]
            all_ar_uris = [ar["ar"] for ar in ar_infos]

            # Roles outside ROLE_MAP cannot be fixed automatically.
            if role_type == "unknown":
                summary["manual_review_needed"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "manual_review",
                })
                progress.update(task, advance=1)
                continue

            identifiers = load_br_identifiers(
                br_uri, rdf_dir, dir_split, items_per_file
            )

            # Without an external identifier there is nothing to query.
            if not identifiers:
                summary["no_identifiers"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "no_identifiers",
                })
                progress.update(task, advance=1)
                continue

            api_data, identifier_str = fetch_api_data(identifiers)

            if not api_data:
                summary["api_error"] += 1
                # Keep the first identifier so the record stays traceable.
                id_str = next(
                    (f"{k}:{v}" for k, v in identifiers.items()), None
                )
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": id_str,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "api_error",
                })
                progress.update(task, advance=1)
                continue

            # Role-specific matching of current ARs against the API data.
            extra_fields = {}
            if role_type in ("author", "editor"):
                api_entries = api_data[role_type]
                extra_fields[f"api_{role_type}s"] = api_entries
                api_matched = match_ars_to_api(ar_infos, api_entries)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = len(api_entries) > 0
            else:
                api_publisher = api_data["publisher"]
                extra_fields["api_publisher"] = api_publisher
                api_matched = match_publisher_ars(ar_infos, api_publisher)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = bool(api_publisher)

            if has_api_data:
                status = "ready"
                summary["api_resolved"] += 1
            else:
                status = "manual_review"
                summary["manual_review_needed"] += 1

            # A 'ready' plan first severs every hasNext link, then deletes
            # all current ARs (the CSV re-ingestion recreates them).
            operations = []
            if status == "ready":
                for ar_uri in all_ar_uris:
                    operations.append({"action": "remove_next", "ar": ar_uri})
                for ar_uri in all_ar_uris:
                    operations.append(
                        {"action": "delete_ar", "ar": ar_uri, "br": br_uri}
                    )

            csv_row = (
                build_csv_row(identifier_str, role_type, api_data)
                if status == "ready"
                else {}
            )

            correction = {
                "br": br_uri,
                "role_type": role_type,
                "anomalies": anomaly_types,
                "source": api_data["source"],
                "identifier": identifier_str,
                **extra_fields,
                "current_ars": ar_summaries,
                "csv_row": csv_row,
                "delete_ars": all_ar_uris if status == "ready" else [],
                "operations": operations,
                "status": status,
            }
            corrections.append(correction)
            progress.update(task, advance=1)

    plan = {
        "generated": datetime.now(timezone.utc).isoformat(),
        "config": os.path.abspath(config_path),
        "summary": summary,
        "corrections": corrections,
    }

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2, ensure_ascii=False)

    print(f"Correction plan saved to {output_path}")
    print(f"  Total groups: {summary['total_brs']}")
    print(f"  API resolved: {summary['api_resolved']}")
    print(f"  No identifiers: {summary['no_identifiers']}")
    print(f"  API errors: {summary['api_error']}")
    print(f"  Manual review: {summary['manual_review_needed']}")

    if csv_output:
        generate_csv(corrections, csv_output)

645 

646 

def apply_correction(editor: MetaEditor, correction: dict) -> None:
    """Apply one 'ready' correction: unlink and delete its agent roles.

    Imports the BR and its current ARs from the triplestore into a fresh
    GraphSet, removes each AR's hasNext links, detaches the ARs listed in
    ``correction["delete_ars"]`` from the BR, marks them for deletion and
    saves the GraphSet through the editor.

    Raises:
        ValueError: if the BR URI carries no supplier prefix, or an
            expected entity is missing from the triplestore import.
    """
    br_uri = correction["br"]
    supplier_prefix = get_supplier_prefix(br_uri)
    if supplier_prefix is None:
        # Explicit raise instead of assert: asserts are stripped under -O,
        # and a missing prefix must always abort this correction.
        raise ValueError(f"cannot derive supplier prefix from {br_uri}")
    g_set = GraphSet(
        editor.base_iri,
        supplier_prefix=supplier_prefix,
        custom_counter_handler=editor.counter_handler,
    )
    entities_to_import = [URIRef(br_uri)]
    for ar_info in correction["current_ars"]:
        entities_to_import.append(URIRef(ar_info["ar"]))
    editor.reader.import_entities_from_triplestore(
        g_set=g_set,
        ts_url=editor.endpoint,
        entities=entities_to_import,
        resp_agent=editor.resp_agent,
        enable_validation=False,
        batch_size=10,
    )
    br_entity = g_set.get_entity(URIRef(br_uri))
    if br_entity is None:
        raise ValueError(f"BR {br_uri} not found after triplestore import")
    for ar_uri in correction["delete_ars"]:
        ar_entity = g_set.get_entity(URIRef(ar_uri))
        if ar_entity is None:
            raise ValueError(f"AR {ar_uri} not found after triplestore import")
        ar_entity.remove_next()  # type: ignore[attr-defined]
        br_entity.remove_contributor(ar_entity)  # type: ignore[attr-defined]
        ar_entity.mark_as_to_be_deleted()
    editor.save(g_set, supplier_prefix)

676 

677 

def execute(config_path: str, plan_path: str, resp_agent: str) -> None:
    """Apply every 'ready' correction from a previously reviewed plan.

    Loads the plan JSON, applies each ready correction through a
    MetaEditor, and reports per-item failures without stopping the run.
    """
    with open(plan_path, encoding="utf-8") as f:
        plan = json.load(f)

    editor = MetaEditor(config_path, resp_agent)
    ready_corrections = [c for c in plan["corrections"] if c["status"] == "ready"]

    print(f"Executing {len(ready_corrections)} corrections...")

    succeeded = 0
    failed = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task(
            "Applying corrections", total=len(ready_corrections)
        )
        for correction in ready_corrections:
            try:
                apply_correction(editor, correction)
                succeeded += 1
            except Exception as e:
                # One failing BR must not abort the whole batch; report
                # it and continue with the remaining corrections.
                print(
                    f"  Error fixing {correction['br']}"
                    f" ({correction['role_type']}): {e}"
                )
                failed += 1
            progress.update(task, advance=1)

    print(f"Execution complete: {succeeded} succeeded, {failed} failed")

713 

714 

def main() -> None:
    """CLI entry point: parse arguments and dispatch to dry-run or execute."""
    parser = argparse.ArgumentParser(
        description="Fix hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-a", "--anomalies", help="Anomaly report JSON file path (for dry run)"
    )
    parser.add_argument(
        "-o", "--output", help="Output correction plan JSON path (for dry run)"
    )
    parser.add_argument(
        "--csv-output", help="Output CSV path for Meta input (for dry run)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Generate correction plan without applying",
    )
    parser.add_argument(
        "--execute", metavar="PLAN", help="Execute corrections from a reviewed plan"
    )
    parser.add_argument(
        "-r", "--resp-agent", help="Responsible agent URI (for execute mode)"
    )
    args = parser.parse_args()

    if args.dry_run:
        # Dry-run mode needs both the anomaly report and a plan destination.
        if not (args.anomalies and args.output):
            parser.error("--dry-run requires -a/--anomalies and -o/--output")
        dry_run(args.config, args.anomalies, args.output, args.csv_output)
        return
    if args.execute:
        if not args.resp_agent:
            parser.error("--execute requires -r/--resp-agent")
        execute(args.config, args.execute, args.resp_agent)
        return
    parser.error("Specify either --dry-run or --execute")

755 

756 

# Script entry point.
if __name__ == "__main__":
    main()