Coverage for oc_meta / run / patches / has_next.py: 0%

456 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import argparse 

8import csv 

9import os 

10import re 

11import time 

12import unicodedata 

13import xml.etree.ElementTree as ET 

14from collections import defaultdict 

15from datetime import datetime, timezone 

16from typing import Dict, List, Optional, Tuple 

17 

18import orjson 

19import requests 

20import yaml 

21from oc_ocdm.graph import GraphSet 

22from rdflib import URIRef 

23from rich_argparse import RichHelpFormatter 

24 

25from oc_meta.core.editor import MetaEditor 

26from oc_meta.lib.console import create_progress 

27from oc_meta.run.meta.generate_csv import find_file, load_json_from_file 

28 

# RDF predicate IRIs used when walking the JSON-LD entity files.
HAS_IDENTIFIER = "http://purl.org/spar/datacite/hasIdentifier"
USES_ID_SCHEME = "http://purl.org/spar/datacite/usesIdentifierScheme"
HAS_LITERAL_VALUE = (
    "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
)
IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
WITH_ROLE = "http://purl.org/spar/pro/withRole"
IS_HELD_BY = "http://purl.org/spar/pro/isHeldBy"
HAS_NEXT = "https://w3id.org/oc/ontology/hasNext"
# FOAF name properties of responsible agents.
FAMILY_NAME = "http://xmlns.com/foaf/0.1/familyName"
GIVEN_NAME = "http://xmlns.com/foaf/0.1/givenName"
FOAF_NAME = "http://xmlns.com/foaf/0.1/name"

# Maps pro: role IRIs to the short role names used throughout this script;
# anything else is treated as "unknown" and routed to manual review.
ROLE_MAP = {
    "http://purl.org/spar/pro/author": "author",
    "http://purl.org/spar/pro/editor": "editor",
    "http://purl.org/spar/pro/publisher": "publisher",
}

# Column order of the CSV produced for Meta ingestion.
CSV_COLUMNS = [
    "id", "title", "author", "pub_date", "venue",
    "volume", "issue", "page", "type", "publisher", "editor",
]

# Base URLs of the external bibliographic APIs queried during a dry run.
CROSSREF_BASE = "https://api.crossref.org/works/"
DATACITE_BASE = "https://api.datacite.org/dois/"
PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

# Shared HTTP session; the descriptive User-Agent with a contact address
# follows the polite-usage conventions of the Crossref API.
SESSION = requests.Session()
SESSION.headers.update(
    {"User-Agent": "oc_meta_fixer/1.0 (mailto:arcangelo.massari@unibo.it)"}
)

61 

62 

63def get_supplier_prefix(uri: str) -> str | None: 

64 match = re.match(r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$", uri) 

65 if match is None: 

66 return None 

67 return match.group(3) 

68 

69 

def extract_omid_number(uri: str) -> int:
    """Return the trailing numeric counter of an OMID URI as an int."""
    _, _, tail = uri.rpartition("/")
    return int(tail)

72 

73 

def normalize_name(name: str) -> str:
    """Lower-case *name*, strip surrounding whitespace, and drop diacritics.

    Used so that names from different sources compare equal regardless of
    accents, case, or stray whitespace. Falsy input yields ``""``.
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower().strip()

79 

80 

def normalize_orcid(orcid: str) -> str:
    """Reduce an ORCID to its bare, upper-cased identifier for comparison.

    Both the https and http ``orcid.org`` URL prefixes are removed and
    surrounding whitespace is trimmed. Falsy input yields ``""``.
    """
    if not orcid:
        return ""
    bare = orcid.replace("https://orcid.org/", "")
    bare = bare.replace("http://orcid.org/", "")
    return bare.strip().upper()

90 

91 

def find_entity_in_file(
    uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Optional[dict]:
    """Locate the JSON-LD entity whose ``@id`` equals *uri* in the RDF store.

    The containing file is resolved via ``find_file`` using the store's
    directory-split parameters. Returns ``None`` when the file cannot be
    resolved or the entity is absent from it.
    """
    filepath = find_file(rdf_dir, dir_split, items_per_file, uri)
    if not filepath:
        return None
    for graph in load_json_from_file(filepath):
        hits = [entity for entity in graph["@graph"] if entity["@id"] == uri]
        if hits:
            return hits[0]
    return None

104 

105 

def load_br_identifiers(
    br_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> Dict[str, str]:
    """Collect the external identifiers of a bibliographic resource.

    Follows every ``datacite:hasIdentifier`` link of the BR and resolves
    each identifier entity's scheme and literal value. Returns a mapping
    ``scheme -> value`` (e.g. ``{"doi": "10.1000/x"}``); identifiers that
    cannot be resolved or lack scheme/value are skipped.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or HAS_IDENTIFIER not in br_entity:
        return {}
    identifiers: Dict[str, str] = {}
    for id_ref in br_entity[HAS_IDENTIFIER]:
        id_entity = find_entity_in_file(
            id_ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_entity:
            continue
        if USES_ID_SCHEME not in id_entity or HAS_LITERAL_VALUE not in id_entity:
            continue
        # The scheme name is the last path segment of the datacite IRI.
        scheme_uri = id_entity[USES_ID_SCHEME][0]["@id"]
        scheme = scheme_uri.split("/datacite/")[1]
        identifiers[scheme] = id_entity[HAS_LITERAL_VALUE][0]["@value"]
    return identifiers

125 

126 

def load_ra_info(
    ra_uri: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> dict:
    """Read name parts and ORCID of a responsible agent from the RDF store.

    Always returns a dict with the keys ``family_name``, ``given_name``,
    ``name`` and ``orcid``; each value is ``None`` when not recorded (or
    when the agent entity itself cannot be found).
    """
    info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
    ra_entity = find_entity_in_file(ra_uri, rdf_dir, dir_split, items_per_file)
    if not ra_entity:
        return info
    # Pick up whichever FOAF name properties the agent carries.
    for key, prop in (
        ("family_name", FAMILY_NAME),
        ("given_name", GIVEN_NAME),
        ("name", FOAF_NAME),
    ):
        if prop in ra_entity:
            info[key] = ra_entity[prop][0]["@value"]
    # Scan the agent's identifiers for the first resolvable ORCID.
    for id_ref in ra_entity.get(HAS_IDENTIFIER, []):
        id_entity = find_entity_in_file(
            id_ref["@id"], rdf_dir, dir_split, items_per_file
        )
        if not id_entity:
            continue
        if USES_ID_SCHEME not in id_entity or HAS_LITERAL_VALUE not in id_entity:
            continue
        scheme = id_entity[USES_ID_SCHEME][0]["@id"].split("/datacite/")[1]
        if scheme == "orcid":
            info["orcid"] = id_entity[HAS_LITERAL_VALUE][0]["@value"]
            break
    return info

157 

158 

def load_all_ars_for_br_role(
    br_uri: str, role_type: str, rdf_dir: str, dir_split: int, items_per_file: int
) -> List[dict]:
    """Load all agent roles (ARs) of a BR that carry the given role type.

    Walks the BR's ``pro:isDocumentContextFor`` links, keeps only ARs whose
    ``pro:withRole`` maps to *role_type* via ROLE_MAP, and enriches each
    with its responsible agent's names/ORCID and its ``hasNext`` targets.
    Returns a list of dicts with keys: ``ar``, ``ra``, ``ra_name``,
    ``ra_family``, ``ra_given``, ``ra_orcid``, ``has_next``.
    """
    br_entity = find_entity_in_file(br_uri, rdf_dir, dir_split, items_per_file)
    if not br_entity or IS_DOC_CONTEXT_FOR not in br_entity:
        return []
    result = []
    for ar_ref in br_entity[IS_DOC_CONTEXT_FOR]:
        ar_uri = ar_ref["@id"]
        ar_entity = find_entity_in_file(ar_uri, rdf_dir, dir_split, items_per_file)
        # ARs without a role statement cannot be classified; skip them.
        if not ar_entity or WITH_ROLE not in ar_entity:
            continue
        role_uri = ar_entity[WITH_ROLE][0]["@id"]
        ar_role = ROLE_MAP.get(role_uri, "unknown")
        if ar_role != role_type:
            continue
        ra_uri = None
        if IS_HELD_BY in ar_entity:
            ra_uri = ar_entity[IS_HELD_BY][0]["@id"]
        # hasNext may list several successors when the chain is anomalous.
        has_next = []
        if HAS_NEXT in ar_entity:
            has_next = [item["@id"] for item in ar_entity[HAS_NEXT]]
        ra_info = {"family_name": None, "given_name": None, "name": None, "orcid": None}
        if ra_uri:
            ra_info = load_ra_info(ra_uri, rdf_dir, dir_split, items_per_file)
        # Prefer "Family, Given"; fall back to the plain foaf:name (used
        # e.g. by organizations such as publishers).
        ra_name = ""
        if ra_info["family_name"] or ra_info["given_name"]:
            parts = [ra_info["family_name"] or "", ra_info["given_name"] or ""]
            ra_name = f"{parts[0]}, {parts[1]}"
        elif ra_info["name"]:
            ra_name = ra_info["name"]
        result.append({
            "ar": ar_uri,
            "ra": ra_uri,
            "ra_name": ra_name,
            "ra_family": ra_info["family_name"],
            "ra_given": ra_info["given_name"],
            "ra_orcid": ra_info["orcid"],
            "has_next": has_next,
        })
    return result

200 

201 

202def _strip_orcid_url(orcid: str) -> str: 

203 if not orcid: 

204 return orcid 

205 return ( 

206 orcid.replace("https://orcid.org/", "") 

207 .replace("http://orcid.org/", "") 

208 ) 

209 

210 

def fetch_crossref(doi: str) -> Optional[dict]:
    """Fetch author/editor/publisher metadata for *doi* from Crossref.

    Returns ``None`` on a 404 (DOI unknown to Crossref); raises
    ``requests.HTTPError`` for other error responses. On success returns a
    dict with ``author``, ``editor`` (lists of person dicts with
    ``family``/``given``/``orcid``/``position``), ``publisher``,
    ``publisher_crossref_id`` (the Crossref member id) and ``source``.
    """
    response = SESSION.get(CROSSREF_BASE + doi, timeout=30)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    message = response.json()["message"]

    def people_of(key: str) -> list:
        # Crossref lists contributors in citation order; keep the index
        # as the "position" so later matching can rebuild that order.
        return [
            {
                "family": person.get("family", ""),
                "given": person.get("given", ""),
                "orcid": _strip_orcid_url(person.get("ORCID")),
                "position": idx,
            }
            for idx, person in enumerate(message.get(key, []))
        ]

    return {
        "author": people_of("author"),
        "editor": people_of("editor"),
        "publisher": message.get("publisher", ""),
        "publisher_crossref_id": message.get("member"),
        "source": "crossref",
    }

240 

241 

def fetch_datacite(doi: str) -> Optional[dict]:
    """Fetch creator/editor/publisher metadata for *doi* from DataCite.

    Returns ``None`` on a 404 (DOI unknown to DataCite); raises
    ``requests.HTTPError`` for other error responses. The returned dict
    mirrors :func:`fetch_crossref`, except ``publisher_crossref_id`` is
    always ``None`` (DataCite has no Crossref member ids).
    """
    response = SESSION.get(DATACITE_BASE + doi, timeout=30)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    attributes = response.json()["data"]["attributes"]

    def orcid_of(person: dict) -> Optional[str]:
        # DataCite stores ORCIDs among generic name identifiers;
        # take the first one flagged with the ORCID scheme.
        for name_id in person.get("nameIdentifiers", []):
            if name_id.get("nameIdentifierScheme", "").upper() == "ORCID":
                return _strip_orcid_url(name_id["nameIdentifier"])
        return None

    authors = [
        {
            "family": creator.get("familyName", ""),
            "given": creator.get("givenName", ""),
            "orcid": orcid_of(creator),
            "position": idx,
        }
        for idx, creator in enumerate(attributes.get("creators", []))
    ]
    # Editors are a subset of the generic contributors list; their
    # positions count only editors, matching the Crossref shape.
    editor_contributors = [
        contrib for contrib in attributes.get("contributors", [])
        if contrib.get("contributorType") == "Editor"
    ]
    editors = [
        {
            "family": contrib.get("familyName", ""),
            "given": contrib.get("givenName", ""),
            "orcid": orcid_of(contrib),
            "position": idx,
        }
        for idx, contrib in enumerate(editor_contributors)
    ]
    publisher = attributes.get("publisher", "")
    if isinstance(publisher, dict):
        # Some DataCite payloads wrap the publisher in an object.
        publisher = publisher.get("name", "")
    return {
        "author": authors,
        "editor": editors,
        "publisher": publisher,
        "publisher_crossref_id": None,
        "source": "datacite",
    }

288 

289 

def fetch_pubmed(pmid: str) -> Optional[dict]:
    """Fetch the author list for *pmid* from the PubMed efetch API.

    PubMed supplies no editor or publisher information, so those fields
    come back empty. Returns ``None`` when the response contains no
    ``Article`` element; raises ``requests.HTTPError`` on HTTP failure.
    """
    query = {"db": "pubmed", "id": pmid, "rettype": "xml", "retmode": "xml"}
    response = SESSION.get(PUBMED_BASE, params=query, timeout=30)
    response.raise_for_status()
    article = ET.fromstring(response.content).find(".//Article")
    if article is None:
        return None
    author_list = article.find("AuthorList")
    author_elems = [] if author_list is None else author_list.findall("Author")
    authors = []
    for position, author in enumerate(author_elems):
        orcid = None
        for identifier in author.findall("Identifier"):
            if identifier.get("Source") == "ORCID" and identifier.text is not None:
                orcid = _strip_orcid_url(identifier.text)
                break
        authors.append({
            "family": author.findtext("LastName", ""),
            "given": author.findtext("ForeName", ""),
            "orcid": orcid,
            "position": position,
        })
    return {
        "author": authors,
        "editor": [],
        "publisher": "",
        "publisher_crossref_id": None,
        "source": "pubmed",
    }

320 

321 

def fetch_api_data(identifiers: Dict[str, str]) -> Tuple[Optional[dict], str]:
    """Resolve *identifiers* against external APIs, in order of preference.

    Tries Crossref then DataCite for a DOI, then PubMed for a PMID.
    Returns ``(api_data, "scheme:value")`` on the first success and
    ``(None, "")`` when nothing could be fetched. Network failures
    (``requests.RequestException``) are swallowed so the next source
    can be tried.
    """
    if "doi" in identifiers:
        doi = identifiers["doi"]
        for fetcher in (fetch_crossref, fetch_datacite):
            try:
                data = fetcher(doi)
            except requests.RequestException:
                continue
            if data:
                return data, f"doi:{doi}"
    if "pmid" in identifiers:
        pmid = identifiers["pmid"]
        try:
            # Brief pause before hitting the NCBI E-utilities endpoint
            # (presumably to respect its rate limits — see NCBI policy).
            time.sleep(0.34)
            data = fetch_pubmed(pmid)
            if data:
                return data, f"pmid:{pmid}"
        except requests.RequestException:
            pass
    return None, ""

347 

348 

def match_ars_to_api(
    ar_infos: List[dict], api_entries: List[dict]
) -> List[str]:
    """Match local agent roles to API people and return AR URIs in API order.

    Matching runs in two passes over ARs sorted by OMID number: first by
    normalized ORCID (strongest signal), then by normalized family name
    for ARs still unmatched. Each API position is consumed at most once,
    so duplicate names map to distinct positions. ARs that match nothing
    are omitted; the result is ordered by the matched API position.
    """
    if not api_entries:
        return []
    # Index the API people by ORCID and by family name.
    orcid_to_pos = {}
    for entry in api_entries:
        if entry["orcid"]:
            orcid_to_pos[normalize_orcid(entry["orcid"])] = entry["position"]
    # A family name may occur several times; keep all its positions.
    name_to_positions = defaultdict(list)
    for entry in api_entries:
        if entry["family"]:
            name_to_positions[normalize_name(entry["family"])].append(entry["position"])
    ar_to_position: Dict[str, int] = {}
    used_positions: set = set()
    # Deterministic AR order: ascending OMID number.
    sorted_ars = sorted(ar_infos, key=lambda a: extract_omid_number(a["ar"]))
    # Pass 1: ORCID matches.
    for ar in sorted_ars:
        if ar["ra_orcid"]:
            norm = normalize_orcid(ar["ra_orcid"])
            if norm in orcid_to_pos:
                pos = orcid_to_pos[norm]
                if pos not in used_positions:
                    ar_to_position[ar["ar"]] = pos
                    used_positions.add(pos)
    # Pass 2: family-name matches for ARs not matched by ORCID; take the
    # first still-unused position recorded for that name.
    for ar in sorted_ars:
        if ar["ar"] in ar_to_position:
            continue
        if ar["ra_family"]:
            norm = normalize_name(ar["ra_family"])
            if norm in name_to_positions:
                for pos in name_to_positions[norm]:
                    if pos not in used_positions:
                        ar_to_position[ar["ar"]] = pos
                        used_positions.add(pos)
                        break
    # Emit AR URIs sorted by their matched API position.
    ordered = sorted(ar_to_position.items(), key=lambda x: x[1])
    return [uri for uri, _ in ordered]

386 

387 

def match_publisher_ars(
    ar_infos: List[dict], api_publisher: str
) -> List[str]:
    """Find the single AR whose agent name equals the API publisher name.

    Comparison is diacritic- and case-insensitive via ``normalize_name``;
    ARs are scanned in ascending OMID order and only the first match is
    returned, as a one-element list (empty list when nothing matches or
    *api_publisher* is empty).
    """
    if not api_publisher:
        return []
    target = normalize_name(api_publisher)
    ordered = sorted(ar_infos, key=lambda info: extract_omid_number(info["ar"]))
    matches = [
        info["ar"]
        for info in ordered
        if info["ra_name"] and normalize_name(info["ra_name"]) == target
    ]
    return matches[:1]

398 

399 

def format_person_for_csv(entry: dict) -> str:
    """Render an API person entry as ``"Family, Given [orcid:...]"``.

    Missing name parts are omitted (falling back to whichever part is
    present, or the empty string); the ORCID suffix is appended only
    when an ORCID is available.
    """
    family, given = entry["family"], entry["given"]
    if family and given:
        rendered = f"{family}, {given}"
    else:
        rendered = family or given or ""
    if entry["orcid"]:
        rendered += f" [orcid:{entry['orcid']}]"
    return rendered

414 

415 

def build_csv_row(identifier: str, role_type: str, api_data: dict) -> dict:
    """Build a partial Meta CSV row carrying only the corrected role column.

    The row always contains ``id``; exactly one of ``author``, ``editor``
    or ``publisher`` is filled depending on *role_type*. For publishers,
    the Crossref member id is appended as ``[crossref:<id>]`` when known.
    """
    row = {"id": identifier}
    if role_type in ("author", "editor"):
        people = api_data[role_type]
        row[role_type] = "; ".join(format_person_for_csv(p) for p in people)
    elif role_type == "publisher":
        name = api_data["publisher"]
        member_id = api_data["publisher_crossref_id"]
        row["publisher"] = f"{name} [crossref:{member_id}]" if member_id else name
    return row

434 

435 

def generate_csv(corrections: List[dict], output_path: str) -> None:
    """Write the "ready" corrections as a CSV suitable as Meta input.

    Rows are grouped by ``(br, identifier)`` so that author, editor and
    publisher corrections for the same resource merge into a single line;
    later corrections fill in (or overwrite) non-empty columns. The output
    directory is created if needed.
    """
    grouped: Dict[Tuple[str, str], dict] = {}
    for correction in corrections:
        if correction["status"] != "ready":
            continue
        key = (correction["br"], correction["identifier"])
        if key not in grouped:
            grouped[key] = {col: "" for col in CSV_COLUMNS}
            grouped[key]["id"] = correction["identifier"]
        row = grouped[key]
        for col in CSV_COLUMNS:
            value = correction["csv_row"].get(col, "")
            if value:
                row[col] = value

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        writer.writerows(grouped.values())

    print(f"CSV for Meta saved to {output_path} ({len(grouped)} rows)")

459 

460 

461def _ar_summary(ar: dict) -> dict: 

462 return { 

463 "ar": ar["ar"], 

464 "ra": ar["ra"], 

465 "ra_name": ar["ra_name"], 

466 "has_next": ar["has_next"], 

467 } 

468 

469 

def dry_run(
    config_path: str, anomaly_path: str, output_path: str, csv_output: Optional[str]
) -> None:
    """Analyze an anomaly report and write a correction plan, changing no data.

    Anomalies are grouped by (BR, role type); for each group the current
    ARs are loaded from the RDF files, the BR's external identifiers are
    resolved against Crossref/DataCite/PubMed, and a correction entry with
    a status ("ready", "manual_review", "no_identifiers" or "api_error")
    is produced. The plan JSON is written to *output_path*; when
    *csv_output* is given, "ready" corrections are also exported as a
    Meta-input CSV.
    """
    with open(config_path, encoding="utf-8") as f:
        settings = yaml.safe_load(f)
    # Parameters describing the split layout of the RDF file store.
    rdf_dir = os.path.join(settings["output_rdf_dir"], "rdf")
    dir_split = settings["dir_split_number"]
    items_per_file = settings["items_per_file"]

    with open(anomaly_path, "rb") as f:
        report = orjson.loads(f.read())

    # Group anomalies so each (BR, role) pair is processed exactly once.
    groups: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for anomaly in report["anomalies"]:
        key = (anomaly["br"], anomaly["role_type"])
        groups[key].append(anomaly)

    corrections = []
    summary = {
        "total_brs": len(groups),
        "api_resolved": 0,
        "no_identifiers": 0,
        "api_error": 0,
        "manual_review_needed": 0,
    }

    with create_progress() as progress:
        task = progress.add_task("Analyzing anomalies", total=len(groups))
        for (br_uri, role_type), anomalies in groups.items():
            anomaly_types = list({a["anomaly_type"] for a in anomalies})
            ar_infos = load_all_ars_for_br_role(
                br_uri, role_type, rdf_dir, dir_split, items_per_file
            )
            ar_summaries = [_ar_summary(ar) for ar in ar_infos]
            all_ar_uris = [ar["ar"] for ar in ar_infos]

            # Unknown roles cannot be matched to any API field: flag for
            # manual review and move on.
            if role_type == "unknown":
                summary["manual_review_needed"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "manual_review",
                })
                progress.update(task, advance=1)
                continue

            identifiers = load_br_identifiers(
                br_uri, rdf_dir, dir_split, items_per_file
            )

            # Without an external identifier there is nothing to query.
            if not identifiers:
                summary["no_identifiers"] += 1
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": None,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "no_identifiers",
                })
                progress.update(task, advance=1)
                continue

            api_data, identifier_str = fetch_api_data(identifiers)

            # All API lookups failed; record the first identifier we had
            # so the entry can be retried or reviewed later.
            if not api_data:
                summary["api_error"] += 1
                id_str = next(
                    (f"{k}:{v}" for k, v in identifiers.items()), None
                )
                corrections.append({
                    "br": br_uri,
                    "role_type": role_type,
                    "anomalies": anomaly_types,
                    "source": None,
                    "identifier": id_str,
                    "current_ars": ar_summaries,
                    "csv_row": {},
                    "delete_ars": [],
                    "operations": [],
                    "status": "api_error",
                })
                progress.update(task, advance=1)
                continue

            # Compare the current ARs with the API data for this role.
            extra_fields = {}
            if role_type in ("author", "editor"):
                api_entries = api_data[role_type]
                extra_fields[f"api_{role_type}s"] = api_entries
                api_matched = match_ars_to_api(ar_infos, api_entries)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = len(api_entries) > 0
            else:
                api_publisher = api_data["publisher"]
                extra_fields["api_publisher"] = api_publisher
                api_matched = match_publisher_ars(ar_infos, api_publisher)
                extra_fields["api_matched_ars"] = api_matched
                has_api_data = bool(api_publisher)

            # Only proceed automatically when the API actually supplied
            # data for this role; otherwise a human must decide.
            if has_api_data:
                status = "ready"
                summary["api_resolved"] += 1
            else:
                status = "manual_review"
                summary["manual_review_needed"] += 1

            # Plan: first break every hasNext link, then delete the ARs
            # (they will be re-created from the CSV by Meta).
            operations = []
            if status == "ready":
                for ar_uri in all_ar_uris:
                    operations.append({"action": "remove_next", "ar": ar_uri})
                for ar_uri in all_ar_uris:
                    operations.append(
                        {"action": "delete_ar", "ar": ar_uri, "br": br_uri}
                    )

            csv_row = (
                build_csv_row(identifier_str, role_type, api_data)
                if status == "ready"
                else {}
            )

            correction = {
                "br": br_uri,
                "role_type": role_type,
                "anomalies": anomaly_types,
                "source": api_data["source"],
                "identifier": identifier_str,
                **extra_fields,
                "current_ars": ar_summaries,
                "csv_row": csv_row,
                "delete_ars": all_ar_uris if status == "ready" else [],
                "operations": operations,
                "status": status,
            }
            corrections.append(correction)
            progress.update(task, advance=1)

    plan = {
        "generated": datetime.now(timezone.utc).isoformat(),
        "config": os.path.abspath(config_path),
        "summary": summary,
        "corrections": corrections,
    }

    output_dir = os.path.dirname(os.path.abspath(output_path))
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "wb") as f:
        f.write(orjson.dumps(plan, option=orjson.OPT_INDENT_2))

    print(f"Correction plan saved to {output_path}")
    print(f" Total groups: {summary['total_brs']}")
    print(f" API resolved: {summary['api_resolved']}")
    print(f" No identifiers: {summary['no_identifiers']}")
    print(f" API errors: {summary['api_error']}")
    print(f" Manual review: {summary['manual_review_needed']}")

    if csv_output:
        generate_csv(corrections, csv_output)

640 

641 

def apply_correction(editor: MetaEditor, correction: dict) -> None:
    """Apply one "ready" correction to the triplestore via oc_ocdm.

    Imports the BR and all of its current ARs into a fresh GraphSet, then
    for every AR slated for deletion: drops its hasNext link, detaches it
    from the BR, and marks it deleted. Finally persists the GraphSet
    through the editor. Raises AssertionError when the BR's supplier
    prefix cannot be derived or an expected entity is missing.
    """
    br_uri = correction["br"]
    supplier_prefix = get_supplier_prefix(br_uri)
    assert supplier_prefix is not None
    g_set = GraphSet(
        editor.base_iri,
        supplier_prefix=supplier_prefix,
        custom_counter_handler=editor.counter_handler,
    )
    # Import the BR plus every AR recorded in the plan, so local edits
    # operate on the current triplestore state.
    entities_to_import = [URIRef(br_uri)]
    for ar_info in correction["current_ars"]:
        entities_to_import.append(URIRef(ar_info["ar"]))
    editor.reader.import_entities_from_triplestore(
        g_set=g_set,
        ts_url=editor.endpoint,
        entities=entities_to_import,
        resp_agent=editor.resp_agent,
        enable_validation=False,
        batch_size=10,
    )
    br_entity = g_set.get_entity(br_uri)
    assert br_entity is not None
    for ar_uri in correction["delete_ars"]:
        ar_entity = g_set.get_entity(ar_uri)
        assert ar_entity is not None
        # Break the chain before deleting, then detach from the BR.
        ar_entity.remove_next()  # type: ignore[attr-defined]
        br_entity.remove_contributor(ar_entity)  # type: ignore[attr-defined]
        ar_entity.mark_as_to_be_deleted()
    editor.save(g_set, supplier_prefix)

671 

672 

def execute(config_path: str, plan_path: str, resp_agent: str) -> None:
    """Apply every "ready" correction from a reviewed plan to the triplestore.

    A failure on an individual correction is reported and counted but
    does not stop the run; a final success/failure tally is printed.
    """
    with open(plan_path, "rb") as f:
        plan = orjson.loads(f.read())

    editor = MetaEditor(config_path, resp_agent)
    ready = [c for c in plan["corrections"] if c["status"] == "ready"]

    print(f"Executing {len(ready)} corrections...")

    outcomes = {"ok": 0, "failed": 0}
    with create_progress() as progress:
        task = progress.add_task("Applying corrections", total=len(ready))
        for correction in ready:
            try:
                apply_correction(editor, correction)
                outcomes["ok"] += 1
            except Exception as e:
                # Report and continue: one broken BR must not abort the batch.
                print(
                    f" Error fixing {correction['br']}"
                    f" ({correction['role_type']}): {e}"
                )
                outcomes["failed"] += 1
            progress.update(task, advance=1)

    print(
        f"Execution complete: {outcomes['ok']} succeeded,"
        f" {outcomes['failed']} failed"
    )

701 

702 

def main() -> None:
    """Command-line entry point: dispatch to dry-run or execute mode."""
    parser = argparse.ArgumentParser(
        description="Fix hasNext chain anomalies in RDF data",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Meta config YAML file path"
    )
    parser.add_argument(
        "-a", "--anomalies", help="Anomaly report JSON file path (for dry run)"
    )
    parser.add_argument(
        "-o", "--output", help="Output correction plan JSON path (for dry run)"
    )
    parser.add_argument(
        "--csv-output", help="Output CSV path for Meta input (for dry run)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Generate correction plan without applying",
    )
    parser.add_argument(
        "--execute", metavar="PLAN", help="Execute corrections from a reviewed plan"
    )
    parser.add_argument(
        "-r", "--resp-agent", help="Responsible agent URI (for execute mode)"
    )
    args = parser.parse_args()

    if not args.dry_run and not args.execute:
        parser.error("Specify either --dry-run or --execute")
    if args.dry_run:
        # Dry run needs both the anomaly report and a plan destination.
        if not args.anomalies or not args.output:
            parser.error("--dry-run requires -a/--anomalies and -o/--output")
        dry_run(args.config, args.anomalies, args.output, args.csv_output)
    else:
        # Execute mode must attribute changes to a responsible agent.
        if not args.resp_agent:
            parser.error("--execute requires -r/--resp-agent")
        execute(args.config, args.execute, args.resp_agent)

743 

744 

# Allow the module to be run directly as a script.
if __name__ == "__main__":
    main()