Coverage for meta_prov_fixer / src.py: 85%

518 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1from rdflib import Graph, Dataset, URIRef, Literal 

2from rdflib.namespace import XSD, PROV, DCTERMS, RDF 

3from typing import List, Union, Dict, Tuple, Generator, Iterable, Optional 

4from meta_prov_fixer.utils import remove_seq_num, get_seq_num, normalise_datetime, get_previous_meta_dump_uri, get_described_res_omid, \ 

5read_rdf_dump, get_rdf_prov_filepaths, batched, get_graph_uri_from_se_uri 

6import logging 

7from string import Template 

8import re 

9import json 

10from tqdm import tqdm 

11from sparqlite import SPARQLClient, QueryError, EndpointError 

12from typing import TextIO 

13from datetime import datetime, date, timezone 

14import os 

15from pathlib import Path 

16import logging 

17import traceback 

18import time 

19from zipfile import ZipFile, ZIP_DEFLATED 

20from enum import IntEnum 

21 

22 

class Step(IntEnum):
    """Ordered stages of the per-file provenance-fixing pipeline.

    The integer ordering is significant: checkpoint logic compares members
    with ``>=`` to decide whether a stage can be skipped on resume.
    """

    START = 0         # nothing done yet for the current file
    FILLER = 1        # filler snapshots removed / renamed
    DATETIME = 2      # malformed datetime literals normalised
    MISSING_PS = 3    # missing prov:hadPrimarySource added
    MULTI_PA = 4      # multiple processing agents fixed
    MULTI_OBJECT = 5  # multiple objects for functional properties fixed
    WRITE_FILE = 6    # fixed dataset written to the output file

31 

32 

class Checkpoint:
    """Persist and restore the progress of the provenance-fixing process.

    The state is a small JSON document recording the last processed file and
    step; it is written atomically so that a crash never leaves a half-written
    checkpoint on disk.

    :param path: filesystem path of the checkpoint JSON file.
    """

    def __init__(self, path: str):
        self.path = path
        self.state: Optional[dict] = None  # last persisted/updated progress, or None
        self.dirty = False  # True when self.state has changes not yet flushed to disk
        self.load()

    def load(self) -> None:
        """Load the checkpoint state from disk; set it to None if no file exists."""
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as f:
                self.state = json.load(f)
        else:
            self.state = None

    def _atomic_write(self, data: dict, retries=5, delay=0.05):
        """Atomically replace the checkpoint file with *data* serialised as JSON.

        On Windows ``os.replace`` can transiently fail with PermissionError
        (e.g. "[WinError 5] Accesso negato: 'fix_prov.checkpoint.json.tmp' ->
        'fix_prov.checkpoint.json'") while another process holds the file, so
        the replace is retried a few times before giving up.

        :raises PermissionError: if the replace still fails after *retries* attempts.
        """
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        for _ in range(retries):
            try:
                os.replace(tmp_path, self.path)
                return
            except PermissionError:
                time.sleep(delay)
        raise PermissionError(f"Failed to replace checkpoint file after {retries} retries")

    def update_state(
        self,
        file_index: int,
        file_path: str,
        step: "Step",
        endpoint_done: bool,
        local_done: bool
    ):
        """Record the latest completed position in memory; call flush() to persist it."""
        self.state = {
            "file_index": file_index,
            "file_path": file_path,
            "step": step.name,
            "endpoint_done": endpoint_done,
            "local_done": local_done,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        self.dirty = True

    def flush(self):
        """Write the in-memory state to disk if it has unsaved changes."""
        if self.dirty and self.state:
            self._atomic_write(self.state)
            self.dirty = False

    def should_skip_file(self, idx: int) -> bool:
        """Return True if the file at position *idx* was already fully processed.

        The result is wrapped in bool() because the bare ``and`` expression
        could otherwise leak None (no state yet) to callers expecting a bool.
        """
        return bool(self.state and idx < self.state["file_index"])

    def step_completed(self, step: "Step", file_index: int) -> bool:
        """Return True if *step* has already been completed for *file_index*."""
        if not self.state:
            return False
        return bool((Step[self.state["step"]] >= step) and self.state["file_index"] >= file_index)

92 

93 

94# --- Caching mechanism for filler issues detection --- # 

95 

96def _atomic_json_write(path: str, data: dict): 

97 tmp = path + ".tmp" 

98 with open(tmp, "w", encoding="utf-8") as f: 

99 json.dump(data, f) 

100 os.replace(tmp, path) 

101 

102 

def load_or_prepare_filler_issues(data_dir: str, cache_fp: str = "filler_issues.cache.json"):
    """
    Return the filler issues for *data_dir*, reading them from the JSON cache
    when possible and recomputing (and re-caching) them otherwise.

    The cache is invalidated (deleted and rebuilt) when it refers to a
    different ``data_dir`` or when it cannot be parsed — e.g. a truncated file
    left by a crash. The original code let ``json.JSONDecodeError`` propagate
    in that case, aborting the whole run.

    :param data_dir: directory of the provenance dump the issues refer to.
    :param cache_fp: path of the JSON cache file.
    :returns: a 3-tuple ``(filler_issues, tot_files, cache_fp)`` where
        ``filler_issues`` is a list of ``(graph_uri, {"to_delete": [...],
        "remaining_snapshots": [...]})`` tuples with URIRef values.
    """
    abs_data_dir = os.path.abspath(data_dir)

    if os.path.exists(cache_fp):
        try:
            with open(cache_fp, "r", encoding="utf-8") as f:
                cached = json.load(f)
        except (json.JSONDecodeError, OSError):
            logging.warning(f"Filler issues cache at {cache_fp} is unreadable or corrupt. Rebuilding it.")
            cached = None

        if cached is not None and cached.get("data_dir") == abs_data_dir:
            filler_issues = [
                (URIRef(g), {
                    "to_delete": [URIRef(u) for u in v["to_delete"]],
                    "remaining_snapshots": [URIRef(u) for u in v["remaining_snapshots"]],
                })
                for g, v in cached["filler_issues"]
            ]
            return filler_issues, cached["tot_files"], cache_fp

        # data_dir mismatch or corrupt cache -> invalidate it
        os.remove(cache_fp)

    # Cache miss -> compute
    filler_issues, tot_files = prepare_filler_issues(data_dir)

    # URIRef objects are not JSON-serialisable: store plain strings
    serializable = [
        (
            str(g),
            {
                "to_delete": [str(u) for u in d["to_delete"]],
                "remaining_snapshots": [str(u) for u in d["remaining_snapshots"]],
            }
        )
        for g, d in filler_issues
    ]

    payload = {
        "data_dir": abs_data_dir,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "tot_files": tot_files,
        "filler_issues": serializable,
    }

    _atomic_json_write(cache_fp, payload)

    return filler_issues, tot_files, cache_fp

146 

147 

148 

class FillerFixerFile:
    """Detects and repairs "filler" snapshots in OpenCitations Meta provenance graphs.

    A filler is a non-creation snapshot that carries no update query and whose
    description (if any) does not document a merge: such snapshots add no
    information, so they are deleted and the surviving snapshots are renamed to
    keep their sequence numbers consecutive.
    """

    def __init__(self, endpoint):
        # SPARQL endpoint used to apply fixes remotely.
        # (A stray `pass` statement that preceded this assignment was removed.)
        self.endpoint = endpoint

    @staticmethod
    def detect(graph: Graph) -> Optional[Tuple[URIRef, Dict[str, List[URIRef]]]]:
        """
        Detects the issues in the input graph. If no filler snapshots are found, return None.
        Else, a 2-elements tuple is returned, where the first element is the URIRef object of the
        graph's identifier, and the second element is a dictionary with "to_delete" and
        "remaining_snapshots" keys, both having as their value a list of URIRef objects, respectively
        representing the fillers snapshots that must be deleted and the snapshots that should be kept
        (but must be renamed).

        :param graph: the named graph for the provenance of an entity
        """
        snapshots = list(graph.subjects(unique=True))
        if len(snapshots) == 1:
            # a single snapshot is necessarily the creation one: nothing to fix
            return None

        creation_se = URIRef(str(graph.identifier) + 'se/1')
        fillers = set()
        remaining = set()

        for s in snapshots:
            if s == creation_se:
                remaining.add(s)
                continue
            # a non-creation snapshot is a filler when it has no update query and
            # its description (when present) does not mention a merge
            if (s, URIRef('https://w3id.org/oc/ontology/hasUpdateQuery'), None) not in graph:
                if (s, DCTERMS.description, None) not in graph:
                    fillers.add(s)
                else:
                    for desc_val in graph.objects(s, DCTERMS.description, unique=True):
                        if "merged" not in str(desc_val).lower():
                            fillers.add(s)
            if s not in fillers:
                remaining.add(s)

        if not fillers:
            return None

        return (
            URIRef(graph.identifier),
            {
                "to_delete": list(fillers),
                "remaining_snapshots": list(remaining)
            }
        )

    @staticmethod
    def map_se_names(to_delete: set, remaining: set) -> dict:
        """
        Associates a new URI value to each snapshot URI in the union of ``to_delete`` and ``remaining`` (containing snapshot URIs).

        Values in the mapping dictionary are not unique, i.e., multiple old URIs can be mapped to the same new URI.
        If ``to_delete`` is empty, the returned dictionary will have identical keys and values, i.e., the URIs will not change.
        Each URI in ``to_delete`` will be mapped to the new name of the URI in ``remaining`` that immediately precedes it in
        a sequence ordered by sequence number.

        **Examples:**

        .. code-block:: python

            to_delete = {'https://w3id.org/oc/meta/br/06101234191/prov/se/3'}
            remaining = {'https://w3id.org/oc/meta/br/06101234191/prov/se/1', 'https://w3id.org/oc/meta/br/06101234191/prov/se/2', 'https://w3id.org/oc/meta/br/06101234191/prov/se/4'}

            # The returned mapping will be:
            {
                'https://w3id.org/oc/meta/br/06101234191/prov/se/1': 'https://w3id.org/oc/meta/br/06101234191/prov/se/1',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/2': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/3': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/4': 'https://w3id.org/oc/meta/br/06101234191/prov/se/3'
            }

        :param to_delete: A set of snapshot URIs that should be deleted.
        :type to_delete: set
        :param remaining: A set of URIs of snapshots that should remain in the graph (AFTER BEING RENAMED).
        :type remaining: set
        :returns: A dictionary mapping old snapshot URIs to their new URIs.
        :rtype: dict
        :raises ValueError: if the snapshots belong to different named graphs, or if the
            creation snapshot itself is a filler (no surviving snapshot precedes a filler).
        """
        to_delete: set = {str(el) for el in to_delete}
        remaining: set = {str(el) for el in remaining}
        all_snapshots: list = sorted(to_delete | remaining, key=lambda x: get_seq_num(x))  # sorting is required!

        mapping = {}
        sorted_remaining = []
        base_uri = remove_seq_num(all_snapshots[0])

        if not all(u.startswith(base_uri) for u in all_snapshots):
            logging.error(f"All snapshots must start with the same base URI: {base_uri}. Found: {all_snapshots}")
            raise ValueError("Can rename only snapshots that are included in the same named graph.")

        for old_uri in all_snapshots:
            if old_uri in remaining:
                new_uri = f"{base_uri}{len(sorted_remaining)+1}"
                sorted_remaining.append(new_uri)
            else:  # i.e., old_uri is in to_delete
                if not sorted_remaining:
                    # All snapshots preceding this one are fillers, including the creation
                    # snapshot: there is no surviving snapshot to map this filler to.
                    # (The original code logged and then crashed with a NameError on an
                    # unbound new_uri; raise an explicit error instead.)
                    logging.error(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")
                    raise ValueError(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")
                new_uri = f"{base_uri}{get_seq_num(sorted_remaining[-1])}"
            mapping[old_uri] = new_uri

        return mapping

    @staticmethod
    def make_global_rename_map(graphs_with_fillers: Iterable):
        """
        Create a dictionary of the form <old_subject_uri>:<new_snapshot_name>.

        Only the entries that actually change name are kept (identity mappings
        are dropped).

        :param graphs_with_fillers: an Iterable consisting of all the graphs containing filler snapshots,
            the snapshots to delete in that graph, and the other snapshot in that same graph (part of
            which must be renamed).
        :type graphs_with_fillers: Iterable
        """
        out = dict()
        for g, _dict in graphs_with_fillers:
            mapping = FillerFixerFile.map_se_names(_dict['to_delete'], _dict['remaining_snapshots'])
            for k, v in mapping.items():
                if k != v:
                    out[k] = v
        return out

    @staticmethod
    def fix_local_graph(ds: Dataset, graph: Graph, global_rename_map: dict, fillers_issues_lookup: dict) -> None:
        """
        Apply the filler fixes to *graph* inside the local dataset *ds*: delete filler
        snapshots, rename the surviving ones (as subjects and as objects, using the
        global mapping), and realign ``prov:invalidatedAtTime`` with the following
        snapshot's ``prov:generatedAtTime``.
        """
        # delete all triples where subject is a filler (in local graph);
        # the quads are materialised first: removing while iterating is unsafe
        for snapshot_node, _, _, _ in list(ds.quads((None, None, None, graph.identifier))):
            if str(snapshot_node) in global_rename_map:
                if snapshot_node in fillers_issues_lookup[graph.identifier]['to_delete']:
                    ds.remove((URIRef(snapshot_node), None, None, graph.identifier))

        # replace subjects/objects that used to be fillers snapshots
        # (in this graph or in other graphs, using global mapping)
        for subj, pred, obj, _ in list(ds.quads((None, None, None, graph.identifier))):
            new_subj_name = URIRef(global_rename_map[str(subj)]) if str(subj) in global_rename_map else subj
            new_obj_name = URIRef(global_rename_map[str(obj)]) if str(obj) in global_rename_map else obj
            if (new_subj_name != subj) or (new_obj_name != obj):
                ds.remove((subj, pred, obj, graph.identifier))
                ds.add((new_subj_name, pred, new_obj_name, graph.identifier))

        # adapt invalidatedAtTime relationships (in local graph only)
        snapshots_strings: list = sorted(
            (str(s) for s in ds.graph(graph.identifier).subjects(unique=True)),
            key=lambda x: get_seq_num(str(x))
        )

        for s, following_se in zip(snapshots_strings, snapshots_strings[1:]):
            try:
                new_invaldt = min(
                    list(ds.graph(graph.identifier).objects(URIRef(following_se), PROV.generatedAtTime, unique=True)),
                    key=lambda x: normalise_datetime(str(x))
                )
            except ValueError:
                # TODO: This should be a very rare case, but consider implementing a more robust handling
                logging.error(f"Cannot find prov:generatedAtTime for snapshot {following_se} in graph {graph.identifier}. Skipping invalidatedAtTime update for snapshot {s}.")
                continue

            for old_invaldt in list(ds.graph(graph.identifier).objects(URIRef(s), PROV.invalidatedAtTime, unique=True)):
                ds.remove((URIRef(s), PROV.invalidatedAtTime, old_invaldt, graph.identifier))
            ds.add((URIRef(s), PROV.invalidatedAtTime, new_invaldt, graph.identifier))

    @staticmethod
    def build_delete_sparql_query(local_deletions: tuple) -> str:
        """
        Makes the SPARQL query text for deleting snapshots from the triplestore based on the provided deletions list.

        :param local_deletions: A tuple or list where the first element is a graph URI,
            and the second is a dictionary with `'to_delete'` and `'remaining_snapshots'` sets.
        """
        g_uri, values = local_deletions
        # one DELETE WHERE statement per filler snapshot; the original wrapped the
        # joined statements in a single-placeholder Template that added nothing
        dels = [
            f"DELETE WHERE {{ GRAPH <{str(g_uri)}> {{ <{str(se_to_delete)}> ?p ?o . }}}};\n"
            for se_to_delete in values['to_delete']
        ]
        return " ".join(dels)

    @staticmethod
    def build_rename_sparql_query(local_mapping: dict) -> str:
        """
        Makes the SPARQL query text to rename snapshots in the triplestore according to the provided mapping.

        :param local_mapping: A dictionary where keys are old snapshot URIs and values are new snapshot URIs.
        :type local_mapping: dict
        """
        # process snapshots in ascending sequence-number order
        mapping = dict(sorted(local_mapping.items(), key=lambda i: get_seq_num(i[0])))

        per_snapshot_template = Template("""
        DELETE {
            GRAPH ?g {
                <$old_uri> ?p ?o .
                ?s ?p2 <$old_uri> .
            }
        }
        INSERT {
            GRAPH ?g {
                <$new_uri> ?p ?o .
                ?s ?p2 <$new_uri> .
            }
        }
        WHERE {
            GRAPH ?g {
                {
                    <$old_uri> ?p ?o .
                }
                UNION
                {
                    ?s ?p2 <$old_uri> .
                }
            }
        }
        """)

        per_snapshot_portions = [
            per_snapshot_template.substitute(old_uri=old_uri, new_uri=new_uri)
            for old_uri, new_uri in mapping.items()
            if old_uri != new_uri  # identity mappings need no update
        ]

        return ";\n".join(per_snapshot_portions)

    @staticmethod
    def build_adapt_invaltime_sparql_query(graph_uri: str, local_snapshots: list) -> str:
        """
        Build a SPARQL update setting the ``prov:invalidatedAtTime`` of each snapshot in the
        provided list to the value of ``prov:generatedAtTime`` of the following snapshot.

        :param graph_uri: The URI of the named graph containing the snapshots.
        :type graph_uri: str
        :param local_snapshots: A list of snapshot URIs.
        :type local_snapshots: list
        :returns: the SPARQL update query string.
        :rtype: str
        """
        snapshots = sorted(local_snapshots, key=lambda x: get_seq_num(x))  # sorting is required!
        per_snapshot_template = Template("""
        DELETE {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?old_time . }
        }
        INSERT {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?new_time . }
        }
        WHERE {
            GRAPH <$graph> {
                OPTIONAL {
                    <$snapshot> prov:invalidatedAtTime ?old_time .
                }
                <$following_snapshot> prov:generatedAtTime ?new_time .
            }
        }
        """)

        per_snapshot_portions = [
            per_snapshot_template.substitute(
                graph=graph_uri,
                snapshot=s,
                following_snapshot=following_se
            )
            for s, following_se in zip(snapshots, snapshots[1:])
        ]

        return "PREFIX prov: <http://www.w3.org/ns/prov#>\n" + ";\n".join(per_snapshot_portions)

442 

443 

444 

class DateTimeFixerFile:
    """Detects and fixes ``prov:generatedAtTime`` / ``prov:invalidatedAtTime`` literals
    whose lexical form is not a UTC ``xsd:dateTime`` ending in ``+00:00`` or ``Z``."""

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: Graph) -> List[Tuple[URIRef, URIRef, URIRef, Literal]]:
        """
        Detect datetime values not serialised as ``YYYY-MM-DDThh:mm:ss`` followed by
        ``+00:00`` or ``Z``.

        :param graph: the named graph for the provenance of an entity.
        :returns: a (possibly empty) list of ``(graph_id, subject, predicate, object)``
            tuples to fix. Note: the original annotation claimed Optional 1-tuples,
            but the method always returns a list of 4-tuples.
        """
        result = []
        pattern_dt = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:(?:\+00:00)|Z)$"

        for s, p, o in graph.triples((None, None, None)):
            if p in (PROV.generatedAtTime, PROV.invalidatedAtTime):
                if not re.match(pattern_dt, str(o)):
                    result.append((graph.identifier, s, p, o))
        return result

    @staticmethod
    def fix_local_graph(ds: Dataset, graph: Graph, to_fix: tuple) -> None:
        """Replace each malformed datetime literal in *ds* with its normalised xsd:dateTime form."""
        for g_uri, subj, prop, obj in to_fix:
            correct_dt_res = Literal(normalise_datetime(str(obj)), datatype=XSD.dateTime)
            ds.remove((subj, prop, obj, g_uri))
            ds.add((subj, prop, correct_dt_res, g_uri))

    @staticmethod
    def build_update_query(to_fix: List[Tuple[URIRef, URIRef, URIRef, Literal]]) -> str:
        """Build one SPARQL DELETE DATA / INSERT DATA query fixing every datetime in *to_fix*."""
        template = Template('''
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
        $to_delete
        } ;
        INSERT DATA {
        $to_insert
        }
        ''')

        to_delete = []
        to_insert = []
        # no defensive copy needed: to_fix is only read here
        for g, s, p, dt in to_fix:
            g = str(g)
            s = str(s)
            p = str(p)
            dt = str(dt)

            to_delete.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{dt}"^^xsd:dateTime . }}\n')
            correct_dt = normalise_datetime(dt)
            to_insert.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{correct_dt}"^^xsd:dateTime . }}\n')

        to_delete_str = "\n ".join(to_delete)
        to_insert_str = "\n ".join(to_insert)
        return template.substitute(to_delete=to_delete_str, to_insert=to_insert_str)

497 

498 

class MissingPrimSourceFixerFile:
    """Adds a ``prov:hadPrimarySource`` statement to creation snapshots that lack one,
    pointing at the Meta dump published immediately before the snapshot's generation time."""

    def __init__(self, meta_dumps_pub_dates):
        # (publication_date, dump_uri) pairs of the published Meta dumps
        self.meta_dumps = meta_dumps_pub_dates

    @staticmethod
    def detect(graph: Graph) -> Optional[Tuple[URIRef, Literal]]:
        """
        Return ``(creation_snapshot_uri, generation_time)`` if the creation snapshot of
        *graph* has a ``prov:generatedAtTime`` but no ``prov:hadPrimarySource``;
        otherwise return None.

        :param graph: the named graph for the provenance of an entity.
        """
        creation_se_uri = URIRef(graph.identifier + 'se/1')
        if ((creation_se_uri, PROV.generatedAtTime, None) in graph) and not ((creation_se_uri, PROV.hadPrimarySource, None) in graph):
            try:
                # min() raises ValueError on an empty iterable; pick the earliest value
                genTime = min(graph.objects(creation_se_uri, PROV.generatedAtTime))
            except ValueError:
                logging.warning(f"Could not find generatedAtTime value for creation snapshot {creation_se_uri}. Skipping MissingPrimSourceFixerFile detection.")
                return None
            return (creation_se_uri, genTime)
        return None  # explicit: nothing to fix for this graph

    @staticmethod
    def fix_local_graph(ds: Dataset, graph: Graph, to_fix: tuple, meta_dumps) -> None:
        """Add the missing prov:hadPrimarySource quad to the local dataset *ds*."""
        primSource_str = get_previous_meta_dump_uri(meta_dumps, str(to_fix[1]))
        ds.add((to_fix[0], PROV.hadPrimarySource, URIRef(primSource_str), graph.identifier))

    @staticmethod
    def build_update_query(to_fix: List[Tuple[URIRef, Literal]], meta_dumps) -> str:
        """
        Build a SPARQL INSERT DATA query adding prov:hadPrimarySource to every
        creation snapshot in *to_fix*.

        :param to_fix: list of ``(snapshot_uri, generation_time)`` pairs from detect().
        :param meta_dumps: register of published Meta dumps used to pick the primary source.
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        INSERT DATA {
        $quads
        }
        """)

        fixes = []
        for snapshot_uri, gen_time in to_fix:
            snapshot_uri = str(snapshot_uri)
            gen_time = str(gen_time)
            prim_source_uri = get_previous_meta_dump_uri(meta_dumps, gen_time)
            graph_uri = get_graph_uri_from_se_uri(snapshot_uri)
            fixes.append(f"GRAPH <{graph_uri}> {{ <{snapshot_uri}> prov:hadPrimarySource <{prim_source_uri}> . }}\n")
        quads_str = " ".join(fixes)
        return template.substitute(quads=quads_str)

542 

543 

544 

class MultiPAFixerFile:
    """Fixes snapshots attributed to more than one processing agent, replacing all
    attributions with the single agent for modification processes (pa/2)."""

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: Graph) -> List[Tuple[URIRef, URIRef]]:
        """
        Return ``(graph_id, snapshot)`` pairs for snapshots with more than one
        ``prov:wasAttributedTo`` object that include the default ingestion agent (pa/1).

        Subjects are iterated uniquely: the original triple-based scan appended the
        same subject once per attribution triple, producing duplicate fix entries.
        """
        result = []
        for s in graph.subjects(PROV.wasAttributedTo, unique=True):
            processing_agents = list(graph.objects(s, PROV.wasAttributedTo, unique=True))
            if len(processing_agents) > 1 and URIRef('https://w3id.org/oc/meta/prov/pa/1') in processing_agents:
                result.append((graph.identifier, s))
        return result

    @staticmethod
    def fix_local_graph(ds: Dataset, graph: Graph, to_fix: List[Tuple[URIRef, URIRef]]) -> None:
        """Replace every prov:wasAttributedTo object of the flagged snapshots with pa/2 in *ds*."""
        for g_uri, subj in to_fix:
            # materialise before removing: mutating the store while iterating it is unsafe
            for obj in list(ds.objects(subj, PROV.wasAttributedTo, unique=True)):
                ds.remove((subj, PROV.wasAttributedTo, obj, g_uri))
            ds.add((subj, PROV.wasAttributedTo, URIRef('https://w3id.org/oc/meta/prov/pa/2'), g_uri))

    @staticmethod
    def build_update_query(to_fix: List[Tuple[URIRef, URIRef]]) -> str:
        """Build a SPARQL update replacing the multiple processing agents with pa/2 for each snapshot."""
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
        $quads_to_delete
        } ;
        INSERT DATA {
        $quads_to_insert
        }
        """)

        to_delete = []
        to_insert = []
        for g, s in to_fix:
            g = str(g)
            s = str(s)
            to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://orcid.org/0000-0002-8420-0696> . }}\n")  # deletes Arcangelo's ORCID
            to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> . }}\n")  # deletes Meta's default processing agent (for ingestions only)
            to_insert.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/2> . }}\n")  # inserts Meta's processsing agent for modification processes

        to_delete_str = " ".join(to_delete)
        to_insert_str = " ".join(to_insert)
        return template.substitute(quads_to_delete=to_delete_str, quads_to_insert=to_insert_str)

597 

598 

class MultiObjectFixerFile:
    """Detects graphs where a snapshot has multiple objects for a functional property
    (invalidatedAtTime, hadPrimarySource, hasUpdateQuery) and rebuilds the whole
    graph as a single, clean creation snapshot."""

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: Graph) -> Optional[Tuple[URIRef, Literal]]:
        """
        Return ``(graph_id, creation_generated_at_time)`` if any snapshot in *graph* has
        more than one object for a functional property; otherwise return None.

        :param graph: the named graph for the provenance of an entity.
        """
        creation_se_uri = URIRef(graph.identifier + 'se/1')

        for prop in {PROV.invalidatedAtTime, PROV.hadPrimarySource, URIRef('https://w3id.org/oc/ontology/hasUpdateQuery')}:
            # unique=True: the bare subjects() iterator yields a subject once per triple,
            # so the same subject would be re-checked repeatedly
            for s in graph.subjects(unique=True):
                if len(list(graph.objects(s, prop, unique=True))) > 1:
                    try:
                        creation_gen_time = min(graph.objects(creation_se_uri, PROV.generatedAtTime, unique=True))
                    except ValueError:
                        logging.error(f"Could not find generatedAtTime value for creation snapshot {creation_se_uri}. Skipping MultiObjectFixerFile detection.")
                        return None
                    return (graph.identifier, creation_gen_time)
        return None  # explicit: no functional property has multiple objects

    @staticmethod
    def fix_local_graph(ds: Dataset, graph: Graph, to_fix: tuple, meta_dumps) -> None:
        """Clear the whole named graph in *ds* and rebuild it as a single creation snapshot."""
        creation_se_uri = URIRef(graph.identifier + 'se/1')
        genTime_str = to_fix[1]
        primSource_str = get_previous_meta_dump_uri(meta_dumps, genTime_str)
        referent = URIRef(get_described_res_omid(str(creation_se_uri)))
        desc = Literal(f"The entity '{str(referent)}' has been created.")
        triples_to_add = (
            (creation_se_uri, PROV.hadPrimarySource, URIRef(primSource_str)),
            (creation_se_uri, PROV.wasAttributedTo, URIRef('https://w3id.org/oc/meta/prov/pa/1')),
            (creation_se_uri, PROV.specializationOf, referent),
            (creation_se_uri, DCTERMS.description, desc),
            (creation_se_uri, RDF.type, PROV.Entity),
            (creation_se_uri, PROV.generatedAtTime, Literal(genTime_str, datatype=XSD.dateTime))
        )

        # wipe the graph, then re-add only the canonical creation snapshot
        ds.remove((None, None, None, graph.identifier))
        for t in triples_to_add:
            ds.add(t + (graph.identifier,))

    @staticmethod
    def build_update_query(to_fix, meta_dumps, pa_uri="https://w3id.org/oc/meta/prov/pa/1") -> str:
        """
        Build a SPARQL update that clears each affected graph and re-inserts a single
        creation snapshot.

        :param to_fix: iterable of ``(graph_uri, generation_time)`` pairs from detect().
        :param meta_dumps: register of published Meta dumps used to pick the primary source.
        :param pa_uri: processing agent attributed to the rebuilt creation snapshot.
        """
        prefixes = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n\n
        """

        per_graph_template = Template("""
        CLEAR GRAPH <$graph> ;
        INSERT DATA {
            GRAPH <$graph> {
                <$creation_snapshot> prov:hadPrimarySource <$primary_source> ;
                    prov:wasAttributedTo <$processing_agent> ;
                    prov:specializationOf <$specialization_of> ;
                    dcterms:description "$description" ;
                    rdf:type prov:Entity ;
                    prov:generatedAtTime "$gen_time"^^xsd:dateTime .
            }
        }
        """)

        query_parts = []
        for g, gen_time in to_fix:
            g = str(g)
            gen_time = str(gen_time)
            creation_se = g + 'se/1'
            # strip any datatype suffix that leaked into the lexical form
            gen_time = gen_time.replace("^^xsd:dateTime", "")
            gen_time = gen_time.replace("^^http://www.w3.org/2001/XMLSchema#dateTime", "")
            prim_source = get_previous_meta_dump_uri(meta_dumps, gen_time)
            referent = get_described_res_omid(g)
            desc = f"The entity '{referent}' has been created."

            query_parts.append(per_graph_template.substitute(
                graph=g,
                creation_snapshot=creation_se,
                primary_source=prim_source,
                processing_agent=pa_uri,
                specialization_of=referent,
                description=desc,
                gen_time=gen_time
            ))

        return prefixes + " ; \n\n".join(query_parts)

690 

691 

692 

def prepare_filler_issues(data_dir) -> Tuple[List[tuple], int]:
    """
    Scan every provenance file under *data_dir* and collect the filler issues
    found in each named graph.

    :param data_dir: directory containing the provenance JSON-LD dump files.
    :returns: a pair ``(issues, tot_files)`` where ``issues`` is the list of
        non-None results of ``FillerFixerFile.detect`` and ``tot_files`` is the
        number of provenance files scanned.
    """
    issues = []
    n_files = len(get_rdf_prov_filepaths(data_dir))

    progress = tqdm(
        read_rdf_dump(data_dir, whole_file=True),
        desc='Detecting graphs with fillers...',
        total=n_files,
        dynamic_ncols=True
    )
    for file_data in progress:
        dataset = Dataset(default_union=True)
        dataset.parse(data=json.dumps(file_data), format='json-ld')

        for named_graph in dataset.graphs():
            detected = FillerFixerFile.detect(named_graph)
            if detected:
                issues.append(detected)

    return issues, n_files

713 

714 

715 

def _log_failed_update(failed_log_fp: str, update_query: str, exc: Exception) -> None:
    """Append the failed query (newlines escaped) and the failure reason to the failure log."""
    with open(failed_log_fp, 'a', encoding='utf-8') as lf:
        lf.write(update_query.replace("\n", "\\n") + "\n")
        lf.write(f"# Failure: {type(exc).__name__}: {exc}\n\n")
        lf.flush()


def sparql_update(
    client: "SPARQLClient",
    update_query: str,
    failed_log_fp: str,
) -> bool:
    """
    Execute a SPARQL UPDATE via client.update().

    Uses the client's built-in retry settings. If the update still fails
    after all retries, writes the query to the file at *failed_log_fp*
    (the duplicated logging code of the two except branches is factored
    into the module-private helper ``_log_failed_update``).

    Returns:
        True if the update succeeded, False if it failed and was logged.
    """
    try:
        client.update(update_query)
        return True  # success

    except QueryError as exc:
        # log syntax errors that weren't recoverable
        msg = str(exc)[:1000].replace('\n', '\\n ')
        logging.warning(f"SPARQL UPDATE failed after retries: {type(exc).__name__}: {msg}...")
        _log_failed_update(failed_log_fp, update_query, exc)
        return False

    except EndpointError as e:
        # log endpoint errors that weren't recoverable
        msg = str(e)[:1000].replace('\n', '\\n ')
        logging.warning(f"SPARQL UPDATE failed after retries: {type(e).__name__}: {msg}...")
        _log_failed_update(failed_log_fp, update_query, e)

        # fixed typo: "udpates" -> "updates"
        logging.warning(f"EndpointError --> Possible DB warmup underway: sleeping 15 minutes before sending other updates...")
        time.sleep(900)
        return False

    finally:
        time.sleep(0.1)  # brief pause to avoid overwhelming the endpoint with rapid retries or subsequent updates

760 

761 

762def fix_provenance_process( 

763 endpoint, 

764 data_dir, 

765 out_dir, 

766 meta_dumps_register, 

767 dry_run_db=False, 

768 dry_run_files=False, 

769 dry_run_callback=None, 

770 chunk_size=100, 

771 failed_queries_fp=f"prov_fix_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt", 

772 overwrite_ok=False, 

773 resume=True, 

774 checkpoint_fp="fix_prov.checkpoint.json", 

775 cache_fp="filler_issues.cache.json", 

776 client_recreate_interval=100, 

777 zip_output=True, 

778 ): 

779 """ 

780 Fix OpenCitations Meta provenance issues found in RDF dump files and optionally apply fixes to a 

781 SPARQL endpoint and output files. 

782 

783 This function processes all provenance JSON-LD files in ``data_dir`` and detects common 

784 issues: fillers, invalid datetime formats, missing primary sources, multiple processing 

785 agents, and multiple object occurrences. It applies fixes locally to an ``rdflib.Dataset``  

786 corresponding to each file and, unless ``dry_run_db`` is ``True``, issues SPARQL UPDATE  

787 requests to ``endpoint``. If ``dry_run_files`` is ``False``, fixed datasets are dumped to  

788 a file in a subdirectory of ``out_dir`` with a path derived from the input file path  

789 relative to ``data_dir`` (else, no output files are written). 

790 

791 :param str endpoint: URL of the SPARQL endpoint used to apply updates. 

792 :param str data_dir: Path to the directory containing provenance JSON-LD dump files to 

793 process. 

794 :param str out_dir: Path where fixed JSON-LD files will be written. 

795 :param Iterable[Tuple[str, str]] meta_dumps_register: Iterable of ``(publication_date_iso, 

796 dump_url)`` pairs used to compute primary source URIs. 

797 :param bool dry_run_db: If ``True``, no SPARQL updates are sent to the endpoint. 

798 Defaults to ``False``. 

799 :param bool dry_run_files: If ``True``, no output files are written to ``out_dir``. 

800 Defaults to ``False``. 

801 :param callable dry_run_callback: Callback invoked when ``dry_run_db`` is ``True`` with 

802 signature ``(file_path, (ff_issues, dt_issues, mps_issues, pa_issues, mo_issues))``. 

803 :param int chunk_size: Number of items per SPARQL update batch. Defaults to ``100``. 

804 :param str failed_queries_fp: Path to a log file where failing SPARQL queries are appended. 

805 Defaults to ``prov_fix_failed_queries_YYYY-MM-DD.txt``. 

806 :param bool overwrite_ok: When ``False``, a :class:`FileExistsError` is raised if a target 

807 output file already exists. Defaults to ``False``. 

808 :param bool resume: When ``True``, use the checkpoint file to skip already-processed files 

809 and steps. Defaults to ``True``. 

810 :param str checkpoint_fp: Path of the checkpoint JSON file used to record progress for 

811 resuming. 

812 :param str cache_fp: Path to the filler issues cache used to avoid re-scanning ``data_dir``. 

813 :param int client_recreate_interval: Number of files to process before recreating the SPARQLClient 

814 to prevent pycurl's accumulated state from degrading performance. Defaults to ``100``. 

815 This is necessary because pycurl's Curl object accumulates internal state (DNS cache, 

816 connection pool, SSL/TLS session state) over hundreds of thousands of requests, 

817 causing progressive performance degradation. 

818 :param bool zip_output: If ``True``, output files are compressed using zip. Defaults to ``True``. 

819 

820 :returns: None 

821 :rtype: None 

822 

823 :raises FileExistsError: If ``overwrite_ok`` is False and an output file already exists. 

824 :raises RuntimeError: If the function would write inside ``data_dir`` (safeguard to avoid 

825 corrupting input). 

826 :raises Exception: Other exceptions may be raised for unexpected errors or endpoint 

827 failures. 

828 

829 Side effects 

830 - Writes fixed JSON-LD files into ``out_dir`` (unless ``dry_run``). 

831 - May send SPARQL UPDATE requests to ``endpoint`` (unless ``dry_run``). 

832 - Creates or updates ``checkpoint_fp`` and ``cache_fp`` files. 

833 - Logs summary information and error details. 

834 """ 

835 

836 start_time = time.time() 

837 

838 os.makedirs(out_dir, exist_ok=True) 

839 logging.info(f"[Provenance fixing process paradata]: {locals()}") # log parameters 

840 

841 checkpoint = Checkpoint(checkpoint_fp) 

842 client = SPARQLClient(endpoint) 

843 ff_c, dt_c, mps_c, pa_c, mo_c = 0, 0, 0, 0, 0 # counters for issues 

844 client_reset_counter = 0 # Track files processed with current client instance 

845 times_per_file = [] 

846 

847 try: 

848 logging.info("Provenance fixing process started.") 

849 logging.info(f"Checkpoint state: {str(checkpoint.state)}") 

850 logging.info("Detecting Filler issues (via cache or new scan)...") 

851 # check cache for filler issues or get them 

852 filler_issues, tot_files, filler_cache_fp = load_or_prepare_filler_issues(data_dir, cache_fp) 

853 rename_mapping = FillerFixerFile.make_global_rename_map(filler_issues) 

854 graphs_with_fillers = {t[0]: t[1] for t in filler_issues} 

855 

856 meta_dumps = sorted( 

857 [(date.fromisoformat(d), url) for d, url in meta_dumps_register], 

858 key=lambda x: x[0] 

859 ) 

860 

861 logging.info("Processing RDF dump files...") 

862 for file_index, (file_data, fp) in enumerate( 

863 tqdm( 

864 read_rdf_dump(data_dir, whole_file=True, include_fp=True), 

865 desc="Processing RDF dump files...", 

866 total=tot_files, 

867 dynamic_ncols=True 

868 ) 

869 ): 

870 

871 start_file = time.time() 

872 

873 if resume and checkpoint.should_skip_file(file_index): 

874 continue 

875 

876 stringified_data = json.dumps(file_data) 

877 d = Dataset(default_union=True) 

878 d.parse(data=stringified_data, format='json-ld') 

879 

880 ff_issues_in_file = [] 

881 dt_issues = [] 

882 mps_issues = [] 

883 pa_issues = [] 

884 mo_issues = [] 

885 

886 # ---------------- FILLER FIXER ---------------- 

887 if not (resume and checkpoint.step_completed(Step.FILLER, file_index)): 

888 

889 for graph in d.graphs(): 

890 if graph.identifier == d.default_graph.identifier: 

891 continue 

892 ff_to_fix_val = graphs_with_fillers.get(graph.identifier) 

893 if ff_to_fix_val: 

894 ff_issues_in_file.append((graph.identifier, ff_to_fix_val)) 

895 ff_c += 1 

896 FillerFixerFile.fix_local_graph(d, graph, rename_mapping, graphs_with_fillers) 

897 

898 for chunk in batched(ff_issues_in_file, chunk_size): 

899 for t in chunk: 

900 g_id = str(t[0]) 

901 to_delete = [str(i) for i in t[1]['to_delete']] 

902 to_rename = [str(i) for i in t[1]['remaining_snapshots']] 

903 local_mapping = FillerFixerFile.map_se_names(to_delete, to_rename) 

904 newest_names = list(set(local_mapping.values())) 

905 

906 if not dry_run_db: 

907 sparql_update(client, 

908 FillerFixerFile.build_delete_sparql_query(t), 

909 failed_queries_fp) 

910 sparql_update(client, 

911 FillerFixerFile.build_rename_sparql_query(local_mapping), 

912 failed_queries_fp) 

913 sparql_update(client, 

914 FillerFixerFile.build_adapt_invaltime_sparql_query(g_id, newest_names), 

915 failed_queries_fp) 

916 

917 checkpoint.update_state(file_index, fp, Step.FILLER, endpoint_done=True, local_done=False) 

918 

919 # ---------------- DATETIME FIXER ---------------- 

920 if not (resume and checkpoint.step_completed(Step.DATETIME, file_index)): 

921 

922 for graph in d.graphs(): 

923 if graph.identifier != d.default_graph.identifier: 

924 issues = DateTimeFixerFile.detect(graph) 

925 if issues: 

926 dt_issues.extend(issues) 

927 dt_c += len(issues) 

928 DateTimeFixerFile.fix_local_graph(d, graph, issues) 

929 

930 if not dry_run_db: 

931 for chunk in batched(dt_issues, chunk_size): 

932 sparql_update(client, 

933 DateTimeFixerFile.build_update_query(chunk), 

934 failed_queries_fp) 

935 

936 checkpoint.update_state(file_index, fp, Step.DATETIME, endpoint_done=True, local_done=False) 

937 

938 # ---------------- MISSING PRIMARY SOURCE ---------------- 

939 if not (resume and checkpoint.step_completed(Step.MISSING_PS, file_index)): 

940 

941 for graph in d.graphs(): 

942 if graph.identifier != d.default_graph.identifier: 

943 issue = MissingPrimSourceFixerFile.detect(graph) 

944 if issue: 

945 mps_issues.append(issue) 

946 mps_c += 1 

947 MissingPrimSourceFixerFile.fix_local_graph(d, graph, issue, meta_dumps) 

948 

949 if not dry_run_db: 

950 for chunk in batched(mps_issues, chunk_size): 

951 sparql_update(client, 

952 MissingPrimSourceFixerFile.build_update_query(chunk, meta_dumps), 

953 failed_queries_fp) 

954 

955 checkpoint.update_state(file_index, fp, Step.MISSING_PS, endpoint_done=True, local_done=False) 

956 

957 # ---------------- MULTI PA FIXER ---------------- 

958 if not (resume and checkpoint.step_completed(Step.MULTI_PA, file_index)): 

959 

960 for graph in d.graphs(): 

961 if graph.identifier != d.default_graph.identifier: 

962 issues = MultiPAFixerFile.detect(graph) 

963 if issues: 

964 pa_issues.extend(issues) 

965 pa_c += len(issues) 

966 MultiPAFixerFile.fix_local_graph(d, graph, issues) 

967 

968 if not dry_run_db: 

969 for chunk in batched(pa_issues, chunk_size): 

970 sparql_update(client, 

971 MultiPAFixerFile.build_update_query(chunk), 

972 failed_queries_fp) 

973 

974 checkpoint.update_state(file_index, fp, Step.MULTI_PA, endpoint_done=True, local_done=False) 

975 

976 # ---------------- MULTI OBJECT FIXER ---------------- 

977 if not (resume and checkpoint.step_completed(Step.MULTI_OBJECT, file_index)): 

978 

979 for graph in d.graphs(): 

980 if graph.identifier != d.default_graph.identifier: 

981 issue = MultiObjectFixerFile.detect(graph) 

982 if issue: 

983 mo_issues.append(issue) 

984 mo_c += 1 

985 MultiObjectFixerFile.fix_local_graph(d, graph, issue, meta_dumps) 

986 

987 if not dry_run_db: 

988 for chunk in batched(mo_issues, chunk_size): 

989 sparql_update(client, 

990 MultiObjectFixerFile.build_update_query(chunk, meta_dumps), 

991 failed_queries_fp) 

992 

993 checkpoint.update_state(file_index, fp, Step.MULTI_OBJECT, endpoint_done=True, local_done=False) 

994 

995 # ---------------- WRITE OUTPUT (FIXED) FILE ---------------- 

996 if not (resume and checkpoint.step_completed(Step.WRITE_FILE, file_index)): 

997 

998 if not dry_run_files: 

999 abs_data_dir = Path(data_dir).resolve() 

1000 abs_out_dir = Path(out_dir).resolve() 

1001 

1002 rel_path = Path(fp).resolve().relative_to(abs_data_dir) 

1003 fixed_fp = abs_out_dir / rel_path 

1004 fixed_fp = fixed_fp.with_suffix('.json') 

1005 

1006 fixed_fp.parent.mkdir(parents=True, exist_ok=True) 

1007 

1008 if os.path.isfile(fixed_fp) and not overwrite_ok: 

1009 raise FileExistsError(f"{fixed_fp} already exists") 

1010 

1011 if abs_data_dir in fixed_fp.parents: # safeguard for not corrupting input data 

1012 raise RuntimeError(f"Refusing to write inside data_dir! {fixed_fp}") 

1013 

1014 out_data = d.serialize(format='json-ld', indent=None, separators=(', ', ': ')) 

1015 

1016 if zip_output: 

1017 with ZipFile(fixed_fp.with_suffix('.zip'), 'w', compression=ZIP_DEFLATED, allowZip64=True) as zipf: 

1018 zipf.writestr(fixed_fp.name, out_data) 

1019 else: 

1020 with open(fixed_fp, 'w', encoding='utf-8') as out_file: 

1021 out_file.write(out_data) 

1022 

1023 checkpoint.update_state(file_index, fp, Step.WRITE_FILE, endpoint_done=True, local_done=True) 

1024 

1025 checkpoint.flush() 

1026 

1027 # Periodically recreate SPARQLClient to prevent pycurl's accumulated state degradation 

1028 client_reset_counter += 1 

1029 if not dry_run_db and client_reset_counter >= client_recreate_interval: 

1030 logging.debug(f"Recreating SPARQLClient after {client_reset_counter} files to clear accumulated pycurl state") 

1031 client.close() 

1032 client = SPARQLClient(endpoint) 

1033 client_reset_counter = 0 

1034 

1035 if dry_run_db and dry_run_callback: # use callback function to use issues found in each file 

1036 dry_run_callback(fp, (ff_issues_in_file, dt_issues, mps_issues, pa_issues, mo_issues)) 

1037 

1038 elapsed_file :float = time.time() - start_file 

1039 times_per_file.append(elapsed_file) 

1040 if file_index % 500 == 0: 

1041 avg_time = sum(times_per_file)/len(times_per_file) 

1042 est_remaining = avg_time * (tot_files - file_index - 1) 

1043 logging.info(f"Average time per file with last {len(times_per_file)} files: {avg_time:.2f} seconds. Estimated remaining time: {est_remaining/3600:.2f} hours.") 

1044 times_per_file = [] 

1045 

1046 

1047 # successful termination -> cleanup 

1048 elapsed = time.time() - start_time 

1049 logging.info(f"Provenance fixing process completed successfully in {elapsed/3600:.2f} hours.") 

1050 print(f"Provenance fixing process completed successfully in {elapsed/3600:.2f} hours.") 

1051 logging.info(f"Total Filler issues found and fixed: {ff_c}") 

1052 logging.info(f"Total DateTime issues found and fixed: {dt_c}") 

1053 logging.info(f"Total Missing Primary Source issues found and fixed: {mps_c}") 

1054 logging.info(f"Total Multiple Processing Agent issues found and fixed: {pa_c}") 

1055 logging.info(f"Total Multiple Object issues found and fixed: {mo_c}") 

1056 

1057 if os.path.exists(filler_cache_fp): 

1058 os.remove(filler_cache_fp) 

1059 if os.path.exists(checkpoint.path): 

1060 os.remove(checkpoint.path) 

1061 

1062 except (Exception, KeyboardInterrupt) as e: 

1063 print(traceback.print_exc()) 

1064 if type(e) == KeyboardInterrupt: 

1065 logging.error("KeyboardInterrupt") 

1066 else: 

1067 logging.error(e) 

1068 elapsed = time.time() - start_time 

1069 logging.info(f"Process ran for {elapsed/3600:.2f} hours before interruption.") 

1070 logging.info(f"Checkpoint state at process interruption: {checkpoint.state}") 

1071 

1072 finally: 

1073 checkpoint.flush() 

1074 client.close()