Coverage for meta_prov_fixer / legacy / fix_via_sparql.py: 83%

800 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1import logging 

2import time 

3from typing import List, Tuple, Union, Dict, Set, Generator, Any, Iterable 

4from string import Template 

5from SPARQLWrapper import SPARQLWrapper, JSON, POST 

6from tqdm import tqdm 

7from collections import defaultdict 

8from datetime import date, datetime 

9import os 

10from meta_prov_fixer.utils import * 

11import re 

12from rdflib.plugins.sparql.processor import SPARQLResult 

13from rdflib import Dataset 

14import json 

15from urllib.error import HTTPError, URLError 

16import contextlib 

17 

18# # OC Meta published RDF dumps publication dates and DOIs at the time of writing this code (2025-07-01). 

19# meta_dumps_pub_dates = [ 

20# ('2022-12-19', 'https://doi.org/10.6084/m9.figshare.21747536.v1'), 

21# ('2022-12-20', 'https://doi.org/10.6084/m9.figshare.21747536.v2'), 

22# ('2023-02-15', 'https://doi.org/10.6084/m9.figshare.21747536.v3'), 

23# ('2023-06-28', 'https://doi.org/10.6084/m9.figshare.21747536.v4'), 

24# ('2023-10-26', 'https://doi.org/10.6084/m9.figshare.21747536.v5'), 

25# ('2024-04-06', 'https://doi.org/10.6084/m9.figshare.21747536.v6'), 

26# ('2024-06-17', 'https://doi.org/10.6084/m9.figshare.21747536.v7'), 

27# ('2025-02-02', 'https://doi.org/10.6084/m9.figshare.21747536.v8') 

28# ] 

29 

30 

def simulate_ff_changes(local_named_graph:dict) -> dict:
    """
    Simulate, on a purely local copy, the modifications that FillerFixer.fix_issue()
    would apply to the live triplestore for the given provenance named graph.
    The graph is loaded into an in-memory rdflib Dataset that stands in for the
    remote endpoint; logging is silenced for the duration of the simulation.

    :param local_named_graph: The JSON-LD-formatted dictionary corresponding to a provenance named graph.
    :returns: The JSON-LD object of the (single) graph after the simulated fix.
    :raises ValueError: if the serialized result contains more than one graph.
    """

    logging.disable(logging.CRITICAL)  # mute every log record below CRITICAL while simulating

    try:
        sandbox = Dataset(default_union=True)
        sandbox.parse(data=local_named_graph, format='json-ld')
        fixer = FillerFixer('http://example.org/sparql/')

        def as_sparqlwrapper_json(qres: SPARQLResult) -> dict:
            """
            Naively convert rdflib query results into the SPARQLWrapper-style JSON
            structure (at least for what concerns the bindings['results'] part).
            """
            try:
                return json.loads(qres.serialize(format='json'))
            except Exception as e:
                print(e)
                return None

        def local_query(q):
            """Stand-in for ProvenanceIssueFixer._query, backed by the local Dataset."""
            return as_sparqlwrapper_json(sandbox.query(q))

        def local_update(q):
            """Stand-in for ProvenanceIssueFixer._update, backed by the local Dataset."""
            sandbox.update(q)

        # Redirect the fixer's endpoint I/O onto the in-memory dataset.
        fixer._query = local_query    # overwrite FillerFixer._query()
        fixer._update = local_update  # overwrite FillerFixer._update()

        fixer.fix_issue()

        serialized = json.loads(sandbox.serialize(format='json-ld'))
        if len(serialized) > 1:
            raise ValueError("The input named graph seems to contain multiple graphs!")

        return serialized[0]

    finally:
        logging.disable(logging.NOTSET)  # re-enable normal logging

93 

94 

95 

def make_ff_rename_mapping(issues_fp, mapping_fp):
    """
    Maps the old name (URI) of the snapshots in a graph with a filler to their new name,
    and stores the dictionary to a JSON file.

    Only entries whose URI actually changes are written to the mapping; identity
    mappings (old name == new name) are skipped.

    :param issues_fp: the path to the file storing the results of `FillerFixer.detect_issue()`
    :param mapping_fp: the path to the file where to store the name mapping.
    """

    general_mapping = dict()
    for line in chunker(issues_fp):
        to_delete:set = line[1]['to_delete']
        to_rename:set = line[1]['remaining_snapshots']
        renaming = FillerFixer.map_se_names(to_delete, to_rename)
        for old_name, new_name in renaming.items():
            if old_name != new_name:
                general_mapping[old_name] = new_name

    # BUG FIX: the output file must be opened for writing ('w'), not reading ('r');
    # with mode 'r', json.dump() raises io.UnsupportedOperation ("not writable").
    with open(mapping_fp, 'w', encoding='utf-8') as out:
        json.dump(general_mapping, out)

    return

118 

119 

class ProvenanceIssueFixer:
    # Base class shared by all concrete fixers: holds endpoint configuration,
    # checkpointing, optional issue logging, and the retry-wrapped SPARQL
    # read (_query) / write (_update) primitives.
    def __init__(self, endpoint: str, dump_dir:Union[str, None]=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        Base class for fixing provenance issues via SPARQL queries.
        Initializes the SPARQL endpoint and sets up the query method.
        Classes dedicated to fixing specific issues should inherit from this class and implement the `detect_issue` and `fix_issue` methods.

        :param endpoint: The SPARQL endpoint URL.
        :param dump_dir: Path to the directory storing the JSON-LD dump files for provenance.
            If provided, the fixer will read from these files in the error detection phase (instead of querying the SPARQL endpoint directly) (default: None).
        :param issues_log_dir: If provided, the path to the directory where the data involved in a query-detected issue will be logged. If None, data is kept in memory.
        :param checkpoint_fp: Path to the checkpoint file for resuming interrupted processes (default: 'checkpoint.json').
        """

        self.endpoint = endpoint
        # self.sparql = SPARQLWrapper(self.endpoint)
        self.dump_dir = dump_dir or None  # normalise falsy values (e.g. '') to None
        # self.sparql.setReturnFormat(JSON)
        # self.sparql.setMethod(POST)
        self.checkpoint_mngr = CheckpointManager(checkpoint_fp)
        # Failed update queries are appended here, one newline-escaped query per line.
        self.failed_queries_fp = f"prov_fix_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt"

        if issues_log_dir:
            # One JSONL issues file per concrete fixer class.
            self.issues_log_fp = os.path.join(issues_log_dir, f"{type(self).__qualname__}.jsonl")
        else:
            self.issues_log_fp = None

        if self.issues_log_fp:
            os.makedirs(os.path.dirname(self.issues_log_fp), exist_ok=True)

    def _query(self, query: str, retries: int = 3, delay: float = 5.0) -> Union[dict, None]:
        """
        Run a read query against ``self.endpoint``, retrying on transient failures.

        A fresh SPARQLWrapper connection is created per call; the HTTP response is
        drained and closed explicitly to release the connection.

        :param query: the SPARQL query string.
        :param retries: maximum number of attempts before giving up.
        :param delay: base of the exponential backoff between attempts (sleeps delay**(attempt+1) seconds).
        :returns: the converted query result, or None if all attempts failed.
        :raises URLError: re-raised when the endpoint refuses connections on the last attempt.
        """

        time.sleep(0.1) # slight delay to avoid overwhelming the endpoint
        for attempt in range(retries):
            try:
                # self.sparql.setQuery(query)
                # return self.sparql.query().convert()

                # create a new connection for every query to avoid memory leak
                sparql = SPARQLWrapper(self.endpoint)
                sparql.setMethod(POST)
                sparql.setQuery(query)

                res = sparql.query()
                with contextlib.closing(res.response):
                    finalres = res.convert()
                    res.response.read()  # drain any remaining bytes so the connection can be reused/closed

                return finalres

            except HTTPError as e:
                # Virtuoso is up, but rejected the query
                if e.code == 503:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error 503: {e.reason}. Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error {e.code}: {e.reason}. Retrying...")

            except URLError as e:
                # Network-level errors (connection refused, dropped, etc.)
                if "connection refused" in str(e.reason).lower():
                    logging.error(f"[Attempt {attempt+1}] Virtuoso appears DOWN (connection refused): {e.reason}. Retrying...")
                    if attempt == retries - 1:
                        logging.error(f"[Attempt {attempt+1}] Virtuoso appeared DOWN (connection refused) for {retries} times: {e.reason}. Killing process.")
                        raise e # kill whole process
                    time.sleep(60.0) # sleep additional 60 seconds in case Virtuoso is restarting
                elif "closed connection" in str(e.reason).lower():
                    logging.warning(f"[Attempt {attempt+1}] Connection closed mid-request (?). Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] URL error: {e.reason}")

            except Exception as e: # catch-all for other exceptions
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            if attempt < retries - 1:
                time.sleep(delay**(attempt+1)) # exponential backoff
            else:
                logging.error("Max retries reached. Query failed.")
                return None

    def _update(self, update_query: str, retries: int = 3, delay: float = 5.0) -> None:
        """
        Run a SPARQL UPDATE against ``self.endpoint``, retrying on transient failures.

        On final failure the update query is appended (newline-escaped) to
        ``self.failed_queries_fp`` so it can be replayed later.

        :param update_query: the SPARQL update string.
        :param retries: maximum number of attempts before giving up.
        :param delay: base of the exponential backoff between attempts (sleeps delay**(attempt+1) seconds).
        :raises URLError: re-raised when the endpoint refuses connections on the last attempt.
        """
        time.sleep(0.1) # slight delay to avoid overwhelming the endpoint
        for attempt in range(retries):
            try:
                # self.sparql.setQuery(update_query)
                # self.sparql.query()
                # return

                # create a new connection for every query to avoid memory leak
                sparql = SPARQLWrapper(self.endpoint)
                sparql.setMethod(POST)
                sparql.setQuery(update_query)

                res = sparql.query()
                with contextlib.closing(res.response):
                    res.response.read()  # drain the response; updates have no payload to convert
                return

            except HTTPError as e:
                # Virtuoso is up, but rejected the query
                if e.code == 503:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error 503: {e.reason}. Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error {e.code}: {e.reason}. Retrying...")

            except URLError as e:
                # Network-level errors (connection refused, dropped, etc.)
                if "connection refused" in str(e.reason).lower():
                    logging.error(f"[Attempt {attempt+1}] Virtuoso appears DOWN (connection refused): {e.reason}. Retrying...")
                    if attempt == retries -1:
                        logging.error("Max retries reached. Update failed.")
                        # persist the failed update before killing the process
                        with open(self.failed_queries_fp, "a") as f:
                            f.write(update_query.replace("\n", "\\n") + "\n")
                        logging.error(f"[Attempt {attempt+1}] Virtuoso appeared DOWN (connection refused) for {retries} times: {e.reason}. Killing process.")
                        raise e # kill whole process
                    time.sleep(60.0) # sleep additional 60 seconds in case Virtuoso is restarting
                elif "closed connection" in str(e.reason).lower():
                    logging.warning(f"[Attempt {attempt+1}] Connection closed mid-request (?). Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] URL error: {e.reason}")

            except Exception as e: # catch-all for other exceptions
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            if attempt < retries - 1:
                time.sleep(delay**(attempt+1)) # exponential backoff
            else:
                logging.error("Max retries reached. Update failed.")
                # keep the lost update on disk for a later replay
                with open(self.failed_queries_fp, "a") as f:
                    f.write(update_query.replace("\n", "\\n") + "\n")

    def _paginate_query(self, query_template: str, limit: int = 10000, sleep: float = 0.5) -> Generator[List[Dict[str, Any]], None, None]:
        """
        Executes a paginated SPARQL SELECT query and yields result bindings in batches.

        This method is designed to handle pagination by incrementing the OFFSET in the
        provided SPARQL query template. It assumes that the query returns results in a
        structure compatible with the SPARQL JSON results format.

        :param query_template: A SPARQL query string with two `%d` placeholders for offset and limit values (in that order).
        :type query_template: str
        :param limit: The number of results to fetch per page. Defaults to 10000.
        :type limit: int
        :param sleep: Number of seconds to wait between successive queries to avoid overwhelming the endpoint. Defaults to 0.5 seconds.
        :type sleep: float

        :yield: A list of SPARQL result bindings (each a dictionary with variable bindings).
        :rtype: Generator[List[Dict[str, Any]], None, None]
        """
        offset = 0
        while True:
            query = query_template % (offset, limit)
            logging.debug(f"Fetching results {offset} to {offset + limit}...")
            result_batch = self._query(query)

            # _query returns None after exhausting its retries: stop paginating.
            if result_batch is None:
                break

            bindings = result_batch["results"]["bindings"]
            if not bindings:
                break  # an empty page means all results have been consumed

            yield bindings
            offset += limit
            time.sleep(sleep)

    def detect_issue(self):
        """Find the occurrences of the issue this fixer targets. Must be overridden."""
        raise NotImplementedError("Subclasses must implement `detect_issue()`.")

    def fix_issue(self):
        """Repair the occurrences found by `detect_issue()`. Must be overridden."""
        raise NotImplementedError("Subclasses must implement `fix_issue()`.")

290 

291 

292# (1) Delete filler snapshots and fix the rest of the graph -> move to daughter class FillerFixer 

class FillerFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to filler snapshots in the OpenCitations Meta provenance dataset.

    A "filler" snapshot, as detected here, is any snapshot that is not the creation
    snapshot (``.../prov/se/1``), has no ``oc:hasUpdateQuery``, and whose description
    does not mention a merge. Fillers are deleted and the remaining snapshots are
    renamed so that the sequence numbers stay contiguous.
    """
    def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)

    def detect_issue(self, limit=10000) -> List[Tuple[str, Dict[str, Set[str]]]]:
        """
        Fetch snapshots that are fillers and need to be deleted, grouped by their named graph.

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples, where the first element is a graph URI and the second element is a
            dictionary with 'to_delete' and 'remaining_snapshots' as keys and a set as value of both keys.
            Returns None when results are written to ``self.issues_log_fp`` instead.
        :rtype: List[Tuple[str, Dict[str, Set[str]]]]
        """
        grouped_result = defaultdict(lambda: {'to_delete': set(), 'remaining_snapshots': set()})

        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX oc: <https://w3id.org/oc/ontology/>

        SELECT ?g ?snapshot ?other_se
        WHERE {
            {
                SELECT DISTINCT ?g ?snapshot ?other_se
                WHERE {
                    GRAPH ?g {
                        {
                            ?snapshot a prov:Entity .
                            OPTIONAL {
                                ?snapshot dcterms:description ?description .
                            }
                            FILTER (!regex(str(?snapshot), "/prov/se/1$"))
                            FILTER (!bound(?description) || !CONTAINS(LCASE(str(?description)), "merged"))
                            FILTER NOT EXISTS {
                                ?snapshot oc:hasUpdateQuery ?q .
                            }
                        }
                        ?other_se ?p ?o .
                        FILTER (?other_se != ?snapshot)
                    }
                }
                ORDER BY ?g
            }
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)
        try:
            # Pagination loop
            logging.info(f"[{self.__class__.__name__}] Fetching filler snapshots (to be deleted) with pagination...")
            for current_bindings in self._paginate_query(template, limit):
                # group query results
                for b in current_bindings:
                    g = b['g']['value']
                    snapshot = b['snapshot']['value']
                    other_se = b['other_se']['value']

                    grouped_result[g]['to_delete'].add(snapshot)
                    grouped_result[g]['remaining_snapshots'].add(other_se)
                    # the following is necessary when there are multiple filler snapshots in the same graph
                    grouped_result[g]['remaining_snapshots'].difference_update(grouped_result[g]['to_delete'])

            logging.info(f"{sum([len(d['to_delete']) for d in grouped_result.values()])} snapshots marked for deletion.")
            if self.issues_log_fp:
                for obj in list(dict(grouped_result).items()):
                    obj = make_json_safe(obj)
                    res_log.write(json.dumps(obj, ensure_ascii=False) + '\n')

        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
        return list(dict(grouped_result).items()) if not self.issues_log_fp else None

    def detect_issue_from_files(self):
        """
        Detect filler snapshots by reading local JSON-LD dump files instead of querying the SPARQL endpoint directly.

        Applies the same filler criteria as :meth:`detect_issue` (not se/1, no update
        query, description not mentioning "merged") to each graph in the dump.
        """

        grouped_result = defaultdict(lambda: {'to_delete': set(), 'remaining_snapshots': set()})

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        try:
            logging.info(f"[{self.__class__.__name__}] Fetching filler snapshots (to be deleted) from RDF files...")
            for graph_obj in read_rdf_dump(self.dump_dir):
                # a single-snapshot graph cannot contain a filler plus a remaining snapshot
                if len(graph_obj['@graph']) <= 1:
                    continue
                graph_uri = graph_obj['@id']
                fillers = set()

                for snapshot_obj in graph_obj['@graph']:
                    se_uri = snapshot_obj['@id']
                    if snapshot_obj.get('http://purl.org/dc/terms/description'):
                        desc:list = [d['@value'].lower() for d in snapshot_obj['http://purl.org/dc/terms/description']]
                    else:
                        desc = ['']
                    if (
                        not snapshot_obj.get('https://w3id.org/oc/ontology/hasUpdateQuery')
                        and get_seq_num(se_uri) != 1
                        and not any('merged' in d for d in desc)
                    ):
                        fillers.add(se_uri)
                if fillers:
                    other_se :set = {se_obj['@id'] for se_obj in graph_obj['@graph']} - fillers

                    grouped_result.update({graph_uri: {'to_delete': fillers, 'remaining_snapshots': other_se}})

            logging.info(f"{sum([len(d['to_delete']) for d in grouped_result.values()])} snapshots marked for deletion.")
            if self.issues_log_fp:
                for obj in list(dict(grouped_result).items()):
                    obj = make_json_safe(obj)
                    res_log.write(json.dumps(obj, ensure_ascii=False) + '\n')

        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
        return list(dict(grouped_result).items()) if not self.issues_log_fp else None

    def batch_fix_graphs_with_fillers(self, deletions: Union[str, List[Tuple[str, Dict[str, Set[str]]]]], batch_size=200) -> None:
        """
        Deletes snapshots from the triplestore based on the provided deletions list, renames the remaining snapshot entities with relevant URIs (in the
        whole dataset) and adapts the time relationships of the remaining entities (in the named graph that contained the filler snapshot(s)).

        :param deletions: A list object or the string filepath to a JSON file storing the object.
            Each item/line is a tuple where the first element is a graph URI,
            and the second is a dictionary with `'to_delete'` and `'remaining_snapshots'` sets.
        :type deletions: Union[str, List[Tuple[str, Dict[str, Set[str]]]]]
        :param batch_size: number of graphs processed per checkpointed batch.
        """

        deletion_template = Template("""
        $dels
        """)

        logging.info(f"[{self.__class__.__name__}] Fixing graphs with filler snapshots in batches...")

        # checkpointing only makes sense when issues are logged to file
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        for batch_idx, (batch, line_num) in checkpointed_batch(
            deletions,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_fix_fillers",
            ckpnt_mngr=ckpt_mg
        ):
            try:

                # step 1: delete filler snapshots in the role of subjects
                dels = []
                for g_uri, values in batch:
                    for se_to_delete in values['to_delete']:
                        single_del = f"DELETE WHERE {{ GRAPH <{g_uri}> {{ <{se_to_delete}> ?p ?o . }}}};\n"
                        dels.append(single_del)
                dels_str = " ".join(dels)

                query = deletion_template.substitute(dels=dels_str)

                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Deleted fillers for all graphs in Batch {batch_idx}. Fixing single quads in the same batch...")

                # step 2: delete filler snapshots in the role of objects and rename remaining snapshots
                for g, _dict in batch:
                    mapping = self.map_se_names(_dict['to_delete'], _dict['remaining_snapshots'])
                    self.rename_snapshots(mapping)

                    # step 3: adapt values of prov:invalidatedAtTime for the entities existing now, identified by "new" URIs
                    new_names = list(set(mapping.values()))
                    self.adapt_invalidatedAtTime(g, new_names)

                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} (filler deletion + renaming + adapting time sequence) completed.")

            except Exception as e:
                logging.error(f"Error while fixing graph with filler snapshot(s) in Batch {batch_idx}, lines {line_num-batch_size} to {line_num}: {e}")
                # BUG FIX: removed duplicated "while while" in the printed message
                print(f"Error while fixing graph with filler snapshot(s) in Batch {batch_idx}, lines {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None

    @staticmethod
    def map_se_names(to_delete:set, remaining: set) -> dict:
        """
        Associates a new URI value to each snapshot URI in the union of ``to_delete`` and ``remaining`` (containing snapshot URIs).

        Values in the mapping dictionary are not unique, i.e., multiple old URIs can be mapped to the same new URI.
        If ``to_delete`` is empty, the returned dictionary will have identical keys and values, i.e., the URIs will not change.
        Each URI in ``to_delete`` will be mapped to the new name of the URI in ``remaining`` that immediately precedes it in
        a sequence ordered by sequence number.

        **Examples:**

        .. code-block:: python

            to_delete = {'https://w3id.org/oc/meta/br/06101234191/prov/se/3'}
            remaining = {'https://w3id.org/oc/meta/br/06101234191/prov/se/1', 'https://w3id.org/oc/meta/br/06101234191/prov/se/2', 'https://w3id.org/oc/meta/br/06101234191/prov/se/4'}

            # The returned mapping will be:
            {
                'https://w3id.org/oc/meta/br/06101234191/prov/se/1': 'https://w3id.org/oc/meta/br/06101234191/prov/se/1',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/2': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/3': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/4': 'https://w3id.org/oc/meta/br/06101234191/prov/se/3'
            }

        :param to_delete: A set of snapshot URIs that should be deleted.
        :type to_delete: set
        :param remaining: A set of URIs of snapshots that should remain in the graph (AFTER BEING RENAMED).
        :type remaining: set
        :returns: A dictionary mapping old snapshot URIs to their new URIs.
        :rtype: dict
        :raises ValueError: if the snapshots span multiple base URIs, or if the first
            snapshot in the sequence is itself a filler (nothing precedes it to map it to).
        """
        to_delete = set(to_delete)
        remaining = set(remaining)
        all_snapshots:list = sorted(to_delete|remaining, key=lambda x: get_seq_num(x)) # sorting is required!

        mapping = {}
        sorted_remaining = []
        base_uri = remove_seq_num(all_snapshots[0])

        if not all(u.startswith(base_uri) for u in all_snapshots):
            logging.error(f"All snapshots must start with the same base URI: {base_uri}. Found: {all_snapshots}")
            raise ValueError(f"Can rename only snapshots that are included in the same named graph.")

        for old_uri in all_snapshots:
            if old_uri in remaining:
                new_uri = f"{base_uri}{len(sorted_remaining)+1}"
                mapping[old_uri] = new_uri
                sorted_remaining.append(new_uri)

            else: # i.e., elif old_uri in to_delete
                try:
                    new_uri = f"{base_uri}{get_seq_num(sorted_remaining[-1])}"
                except IndexError:
                    # all snapshots are fillers (must be deleted), including the first one (creation)
                    logging.error(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")
                    # BUG FIX: previously execution fell through with `new_uri` unbound,
                    # raising a misleading NameError on the next line; fail explicitly instead.
                    raise ValueError(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")

                mapping[old_uri] = new_uri

        return mapping

    def rename_snapshots(self, mapping):
        """
        Renames snapshots in the triplestore according to the provided mapping.

        :param mapping: A dictionary where keys are old snapshot URIs and values are new snapshot URIs.
        :type mapping: dict
        """
        # TODO: consider modifying the query template to support bulk updates (using UNION in the WHERE block)

        # !IMPORTANT: mapping is sorted ascendingly by the sequence number of old URI (get_seq_num on mapping's keys)
        # This is required, otherwise newly inserted URIs might be deleted when iterating over mapping's items
        mapping = dict(sorted(mapping.items(), key=lambda i: get_seq_num(i[0])))

        template = Template("""
        DELETE {
            GRAPH ?g {
                <$old_uri> ?p ?o .
                ?s ?p2 <$old_uri> .
            }
        }
        INSERT {
            GRAPH ?g {
                <$new_uri> ?p ?o .
                ?s ?p2 <$new_uri> .
            }
        }
        WHERE {
            GRAPH ?g {
                {
                    <$old_uri> ?p ?o .
                }
                UNION
                {
                    ?s ?p2 <$old_uri> .
                }
            }
        }
        """)

        for old_uri, new_uri in mapping.items():
            if old_uri == new_uri:
                continue  # identity mappings need no update
            query = template.substitute(old_uri=old_uri, new_uri=new_uri)
            self._update(query)

    def adapt_invalidatedAtTime(self, graph_uri: str, snapshots: list) -> None:
        """
        Update the ``prov:invalidatedAtTime`` property of each snapshot in the provided list to match
        the value of ``prov:generatedAtTime`` of the following snapshot.

        :param graph_uri: The URI of the named graph containing the snapshots.
        :type graph_uri: str
        :param snapshots: A list of snapshot URIs.
        :type snapshots: list
        :returns: None
        """

        # TODO: consider modifying the query template to support bulk updates
        snapshots = sorted(snapshots, key=lambda x: get_seq_num(x)) # sorting is required!
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        # WITH <$graph>
        DELETE {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?old_time . }
        }
        INSERT {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?new_time . }
        }
        WHERE {
            GRAPH <$graph> {
                OPTIONAL {
                    <$snapshot> prov:invalidatedAtTime ?old_time .
                }
                <$following_snapshot> prov:generatedAtTime ?new_time .
            }
        }
        """)

        # pair each snapshot with its immediate successor; the last one keeps its value
        for s, following_se in zip(snapshots, snapshots[1:]):
            query = template.substitute(
                graph=graph_uri,
                snapshot=s,
                following_snapshot=following_se
            )

            self._update(query)

    def fix_issue(self, batch_size=200):
        """
        Detect and fix all graphs containing filler snapshots.

        Detection either runs in memory, via the SPARQL endpoint, or from local dump
        files, depending on ``self.issues_log_fp`` and ``self.dump_dir``; fixing is
        delegated to :meth:`batch_fix_graphs_with_fillers`.

        :param batch_size: number of graphs fixed per batch.
        """

        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect filler snapshots
        if not self.issues_log_fp: # keep all the issues in memory
            to_fix = self.detect_issue()
        else: # writes all issues to self.issues_log_fp
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir: # read from the SPARQL endpoint
                    self.detect_issue()
                else: # read from local RDF dump files
                    self.detect_issue_from_files()
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # BUG FIX: batch_size was previously accepted but never forwarded,
        # so callers could not actually change the batch size.
        if not self.issues_log_fp:
            self.batch_fix_graphs_with_fillers(to_fix, batch_size=batch_size)
        else:
            self.batch_fix_graphs_with_fillers(self.issues_log_fp, batch_size=batch_size)

        logging.info(f"[{self.__class__.__name__}] Fixing graphs with filler snapshots terminated.")

668 

669 

670# (2) DATETIME values correction -> move in daughter class DateTimeFixer 

671class DateTimeFixer(ProvenanceIssueFixer): 

672 """ 

673 A class to fix issues related to ill-formed datetime values in the OpenCitations Meta provenance dataset. 

674 

675 The following datetime formats are considered ill-formed or to be normalized: 

676 - Datetime values without timezone information (e.g. 2020-04-22T12:00:00). 

677 - Datetime values including microseconds (e.g. 2020-04-22T12:00:00.123456Z). 

678 - Datetime values with timezone offsets different from UTC (e.g. 2020-04-22T12:00:00+01:00). 

679 - All or some of the above combined (e.g. 2020-04-22T12:00:00.123456+02:00). 

680 """ 

681 def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'): 

682 super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp) 

683 

684 def detect_issue(self, limit=10000) -> Union[None, List[Tuple[str]]]: 

685 """ 

686 Fetch all quads where the datetime object value is not syntactically correct or complete, including cases where 

687 the timezone is not specified (making the datetime impossible to compare with other offset-aware datetimes)  

688 and/or where the time value includes microseconds. Querying is paginated. 

689 

690 :param limit: The number of results to fetch per page. 

691 :type limit: int 

692 :returns: List of tuples (graph URI, subject, predicate, datetime value). 

693 :rtype: Union[None, List[Tuple[str]]] 

694 """ 

695 result = [] 

696 counter = 0 

697 

698 template = r''' 

699 PREFIX prov: <http://www.w3.org/ns/prov#> 

700  

701 SELECT ?g ?s ?p ?dt 

702 WHERE { 

703 { 

704 SELECT ?g ?s ?p ?dt WHERE { 

705 GRAPH ?g { 

706 VALUES ?p {prov:generatedAtTime prov:invalidatedAtTime} 

707 ?s ?p ?dt . 

708 FILTER (!REGEX(STR(?dt), "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:(?:\\+00:00)|Z)$")) 

709 } 

710 } 

711 ORDER BY ?s 

712 } 

713 } 

714 OFFSET %d 

715 LIMIT %d 

716 ''' 

717 paradata = get_process_paradata(self) 

718 if self.issues_log_fp: 

719 res_log = open(self.issues_log_fp, 'w', encoding='utf-8') 

720 res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n') 

721 else: 

722 logging.info(paradata) 

723 

724 try: 

725 # Pagination loop 

726 logging.info(f"[{self.__class__.__name__}] Fetching ill-formed datetime values with pagination...") 

727 for current_bindings in self._paginate_query(template, limit): 

728 # group query results 

729 for b in current_bindings: 

730 g = b['g']['value'] 

731 s = b['s']['value'] 

732 p = b['p']['value'] 

733 dt = b['dt']['value'] 

734 

735 counter +=1 

736 out_row = (g, s, p, dt) 

737 

738 if not self.issues_log_fp: 

739 result.append(out_row) 

740 else: 

741 out_row = make_json_safe(out_row) 

742 res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n') 

743 

744 logging.info(f"Fetched {counter} quads with a badly formed datetime object value.") 

745 

746 finally: 

747 if self.issues_log_fp: 

748 res_log.close() 

749 

750 if self.issues_log_fp: 

751 self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1) 

752 return result if not self.issues_log_fp else None # if issues_log_fp is provided, the results are logged to the file and not returned 

753 

754 def detect_issue_from_files(self, modified_graphs:dict) -> List[Tuple[str]]: 

755 """ 

756 Detect ill-formed datetime values by reading local JSON-LD dump files instead of querying the SPARQL endpoint directly. 

757 

758 :param modified_graphs: A dictionary where keys are graph URIs and values are the modified graph objects 

759 """ 

760 

761 result = [] 

762 counter = 0 

763 

764 paradata = get_process_paradata(self) 

765 if self.issues_log_fp: 

766 res_log = open(self.issues_log_fp, 'w', encoding='utf-8') 

767 res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n') 

768 else: 

769 logging.info(paradata) 

770 

771 try: 

772 logging.info(f"[{self.__class__.__name__}] Fetching ill-formed datetime values from RDF files...") 

773 

774 gentime_prop = 'http://www.w3.org/ns/prov#generatedAtTime' 

775 invaltime_prop = 'http://www.w3.org/ns/prov#invalidatedAtTime' 

776 

777 for graph_obj in read_rdf_dump(self.dump_dir): 

778 if graph_obj['@id'] in modified_graphs: 

779 graph_obj = modified_graphs[graph_obj['@id']] # use the graph already modified by FillerFixer (simulation) 

780 

781 for se_obj in graph_obj['@graph']: 

782 genTime_vals = [] 

783 invalTime_vals = [] 

784 if se_obj.get(gentime_prop): 

785 genTime_vals = [d['@value'] for d in se_obj[gentime_prop]] 

786 if se_obj.get(invaltime_prop): 

787 invalTime_vals = [d['@value'] for d in se_obj[invaltime_prop]] 

788 

789 

790 pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:(?:\+00:00)|Z)$" 

791 

792 for prop, values in [(gentime_prop, genTime_vals), (invaltime_prop, invalTime_vals)]: 

793 if values: 

794 for dt in values: 

795 if not re.match(pattern, dt): 

796 g = graph_obj['@id'] 

797 s = se_obj['@id'] 

798 p = prop 

799 

800 counter +=1 

801 out_row = (g, s, p, dt) 

802 

803 if not self.issues_log_fp: 

804 result.append(out_row) 

805 else: 

806 out_row = make_json_safe(out_row) 

807 res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n') 

808 

809 logging.info(f"Fetched {counter} quads with a badly formed datetime object value.") 

810 

811 finally: 

812 if self.issues_log_fp: 

813 res_log.close() 

814 

815 if self.issues_log_fp: 

816 self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1) 

817 

818 return result if not self.issues_log_fp else None # if issues_log_fp is provided, the results are logged to the file and not returned 

819 

820 def batch_fix_illformed_datetimes(self, quads: Union[list, str], batch_size=200) -> None: 

821 """ 

822 Replace the datetime object of each quad in ``quads`` with its correct version (offset-aware and without microseconds). 

823 Note that ``xsd:dateTime`` is always made explicit in newly inserted values. 

824 

825 .. note:: 

826 If a snapshot has multiple objects for ``prov:invalidatedAtTime`` or ``prov:generatedAtTime`` (though the latter should never  

827 be the case), they all get replaced with their respective normalised value. 

828 

829 :param quads: List of quads to fix (in memory) or a path to a file containing quads in JSON Lines format. 

830 :type quads: Union[list, str] 

831 :param batch_size: Number of quads to process per batch. 

832 :type batch_size: int 

833 :returns: None 

834 """ 

835 

836 template = Template(''' 

837 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> 

838 

839 DELETE DATA { 

840 $to_delete 

841 } ; 

842 INSERT DATA { 

843 $to_insert 

844 } 

845 ''') 

846 

847 ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None 

848 

849 logging.info(f"[{self.__class__.__name__}] Fixing ill-formed datetime values in batches...") 

850 

851 for batch_idx, (batch, line_num) in checkpointed_batch( 

852 quads, 

853 batch_size, 

854 fixer_name=self.__class__.__name__, 

855 phase="batch_fix_illformed_datetimes", 

856 ckpnt_mngr=ckpt_mg 

857 ): 

858 try: 

859 to_delete = [] 

860 to_insert = [] 

861 for g, s, p, dt in batch: 

862 

863 to_delete.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{dt}"^^xsd:dateTime . }}\n') 

864 correct_dt = normalise_datetime(dt) 

865 to_insert.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{correct_dt}"^^xsd:dateTime . }}\n') 

866 

867 to_delete_str = " ".join(to_delete) 

868 to_insert_str = " ".join(to_insert) 

869 query = template.substitute(to_delete=to_delete_str, to_insert=to_insert_str) 

870 

871 self._update(query) 

872 logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.") 

873 

874 except Exception as e: 

875 logging.error(f"Error while fixing datetime values in Batch {batch_idx} for quads {line_num-batch_size} to {line_num}: {e}") 

876 print(f"Error while fixing datetime values in Batch {batch_idx} for quads {line_num-batch_size} to {line_num}: {e}") 

877 raise e 

878 return None 

879 

880 def fix_issue(self, modified_graphs:dict=None): 

881 

882 ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None 

883 

884 # step 0: detect ill-formed datetime values 

885 if not self.issues_log_fp: 

886 to_fix = self.detect_issue() # keep all the issues in memory 

887 else: 

888 if not detection_completed(self.__class__.__name__, ckpt_mg): 

889 if os.path.exists(self.issues_log_fp): 

890 logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.") 

891 if not self.dump_dir: 

892 self.detect_issue() # writes all issues to self.issues_log_fp 

893 else: 

894 self.detect_issue_from_files(modified_graphs) 

895 else: 

896 logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}") 

897 

898 # step 1: 

899 if not self.issues_log_fp: 

900 self.batch_fix_illformed_datetimes(to_fix) 

901 else: 

902 self.batch_fix_illformed_datetimes(self.issues_log_fp) 

903 logging.info(f"[{self.__class__.__name__}] Fixing ill-formed datetime values terminated.") 

904 

905# (3) correct creation snapshots without primary source -> move to daughter class MissingPrimSourceFixer 

class MissingPrimSourceFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to creation snapshots that do not have a primary source in the OpenCitations Meta provenance dataset.

    The fix assigns, as primary source, the DOI of the latest OpenCitations Meta dump
    published before the snapshot's generation time.
    """
    def __init__(self, endpoint: str, meta_dumps_pub_dates: List[Tuple[str, str]], dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :type endpoint: str
        :param meta_dumps_pub_dates: Register of published OpenCitations Meta dumps, in the form: [(<ISO format date 1>, <dump DOI1>), (<ISO format date 2>, <dump DOI2>), ...]
        :type meta_dumps_pub_dates: List[Tuple[str, str]]
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)
        validate_meta_dumps_pub_dates(meta_dumps_pub_dates)  # raises errors if something wrong
        # Keep the register sorted by publication date so lookup of the dump preceding a given datetime is straightforward.
        self.meta_dumps_pub_dates = sorted([(date.fromisoformat(d), doi) for d, doi in meta_dumps_pub_dates], key=lambda x: x[0])

    def detect_issue(self, limit=10000) -> Union[str, List[Tuple[str, str]]]:
        """
        Fetch creation snapshots that do not have a primary source.

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples with snapshot URI and generation time.
        :rtype: Union[str, List[Tuple[str, str]]]
        """
        results = []
        counter = 0

        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>

        SELECT ?s ?genTime WHERE {
            {
                SELECT DISTINCT ?s ?genTime WHERE {
                    GRAPH ?g {
                        ?s a prov:Entity ;
                            prov:generatedAtTime ?genTime .
                        FILTER NOT EXISTS { ?s prov:hadPrimarySource ?anySource }
                        FILTER(REGEX(STR(?s), "/prov/se/1$")) # escape the dollar sign if using string.Template compiling
                    }
                }
                ORDER BY ?s
            }
        }
        OFFSET %d
        LIMIT %d
        """

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        # Pagination loop
        logging.info(f"[{self.__class__.__name__}] Fetching creation snapshots without a primary source with pagination...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    s = b["s"]["value"]
                    gen_time = b["genTime"]["value"]
                    counter += 1
                    out_row = (s, gen_time)

                    if not self.issues_log_fp:
                        results.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')

            logging.info(f"Found {counter} creation snapshots without a primary source.")
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return results if not self.issues_log_fp else None  # (<snapshot uri>, <gen. time>)


    def detect_issue_from_files(self, modified_graphs:dict) -> Union[str, List[Tuple[str, str]]]:
        """
        Detect creation snapshots without a primary source by reading local JSON-LD dump files
        instead of querying the SPARQL endpoint directly.

        :param modified_graphs: A dictionary where keys are graph URIs and values are the modified graph objects
            (graphs already changed by FillerFixer in simulation); these replace the corresponding graphs read from the dump.
        :returns: A list of (snapshot URI, generation time) tuples when no issues log file is configured; otherwise ``None``.
        """
        results = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching creation snapshots without a primary source from RDF files...")
        try:
            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']]  # use the graph already modified by FillerFixer (simulation)
                for se_obj in graph_obj['@graph']:
                    s = se_obj['@id']
                    if get_seq_num(s) != 1:  # only creation snapshots (.../prov/se/1) are relevant
                        continue
                    gentime_prop = 'http://www.w3.org/ns/prov#generatedAtTime'

                    if se_obj.get(gentime_prop) and not se_obj.get('http://www.w3.org/ns/prov#hadPrimarySource'):
                        genTime_values = [d['@value'] for d in se_obj[gentime_prop]]
                        if len(genTime_values) == 1:
                            gen_time = normalise_datetime(genTime_values[0])
                        elif len(genTime_values) > 1:
                            gen_time = min([normalise_datetime(dt) for dt in genTime_values], key=lambda x: datetime.fromisoformat(x))  # take the earliest datetime value

                        counter += 1
                        out_row = (s, gen_time)

                        if not self.issues_log_fp:
                            results.append(out_row)
                        else:
                            out_row = make_json_safe(out_row)
                            res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')

            logging.info(f"Found {counter} creation snapshots without a primary source.")
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return results if not self.issues_log_fp else None  # (<snapshot uri>, <gen. time>)

    def batch_insert_missing_primsource(self, creations_to_fix: Union[str, List[Tuple[str, str]]], batch_size=200):
        """
        Insert primary sources for creation snapshots that do not have one, in batches.

        :param creations_to_fix: A list of tuples where each tuple contains the snapshot URI and the generation time, representing all the creation snapshots that must be fixed.
        :type creations_to_fix: Union[str, List[Tuple[str, str]]]
        :param batch_size: The number of snapshots to process in each batch.
        :type batch_size: int
        :returns: None
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        INSERT DATA {
            $quads
        }
        """)

        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Inserting missing primary sources in batches...")

        for batch_idx, (batch, line_num) in checkpointed_batch(
            creations_to_fix,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_insert_missing_primsource",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                quads = []
                for snapshot_uri, gen_time in batch:
                    # Primary source = latest Meta dump published before the snapshot's generation time.
                    prim_source_uri = get_previous_meta_dump_uri(self.meta_dumps_pub_dates, gen_time)
                    graph_uri = get_graph_uri_from_se_uri(snapshot_uri)
                    quads.append(f"GRAPH <{graph_uri}> {{ <{snapshot_uri}> prov:hadPrimarySource <{prim_source_uri}> . }}\n")
                quads_str = " ".join(quads)
                query = template.substitute(quads=quads_str)

                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                # Fixed copy-pasted message: this fixer inserts *missing* primary sources.
                logging.error(f"Error while inserting missing primary sources in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                print(f"Error while inserting missing primary sources in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None


    def fix_issue(self, modified_graphs:dict=None):
        """
        Run the full detect-then-fix pipeline for creation snapshots missing a primary source.

        :param modified_graphs: Graphs already modified by FillerFixer (simulation),
            forwarded to file-based detection when a dump directory is configured.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect creation snapshots missing a primary source
        if not self.issues_log_fp:
            to_fix = self.detect_issue()
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue()  # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: insert primary source for the snapshots
        if not self.issues_log_fp:
            self.batch_insert_missing_primsource(to_fix)
        else:
            self.batch_insert_missing_primsource(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing creation snapshots without a primary source terminated.")

1108 

1109 

1110# (4) Correct snapshots with multiple objects for prov:wasAttributedTo -> move to daughter class MultiPAFixer

class MultiPAFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to snapshots that have multiple objects for the ``prov:wasAttributedTo`` property in the OpenCitations Meta provenance dataset.

    A snapshot is considered faulty when it is attributed both to the default ingestion
    agent <https://w3id.org/oc/meta/prov/pa/1> and to at least one other agent; the fix
    replaces both attributions with the single modification agent <https://w3id.org/oc/meta/prov/pa/2>.
    """
    def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        # No extra state beyond what the base fixer manages (endpoint, dump dir, issues log, checkpointing).
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)

    def detect_issue(self, limit=10000):
        """
        Fetch graph-snapshot pairs where the snapshot has more than one object for the ``prov:wasAttributedTo`` property.

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: List of tuples (graph URI, snapshot URI).
        :rtype: List[Tuple[str, str]]
        """
        result = []
        counter = 0

        # %d placeholders are filled by self._paginate_query with OFFSET/LIMIT values.
        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?g ?s
        WHERE {
            {
                SELECT ?g ?s
                WHERE {
                    GRAPH ?g {
                        ?s prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> .
                        FILTER EXISTS {
                            ?s prov:wasAttributedTo ?other_pa
                            FILTER (?other_pa != <https://w3id.org/oc/meta/prov/pa/1>)
                        }
                    }
                }
                ORDER BY ?g
            }
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching snapshots with multiple objects for prov:wasAttributedTo...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    g = b['g']['value']
                    s = b['s']['value']
                    counter +=1
                    out_row = (g, s)

                    # Either accumulate in memory or stream to the JSON Lines issues log.
                    if not self.issues_log_fp:
                        result.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} snapshots with multiple objects for prov:wasAttributedTo.")
        finally:
            # Close the log file even on failure mid-pagination.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return result if not self.issues_log_fp else None


    def detect_issue_from_files(self, modified_graphs:dict):
        """
        Detect snapshots with multiple objects for ``prov:wasAttributedTo`` by reading local
        JSON-LD dump files instead of querying the SPARQL endpoint directly.

        :param modified_graphs: A dictionary where keys are graph URIs and values are the modified graph objects
            (graphs already changed by FillerFixer in simulation); these replace the corresponding graphs read from the dump.
        :returns: A list of (graph URI, snapshot URI) tuples when no issues log file is configured; otherwise ``None``.
        """
        result = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching snapshots with multiple objects for prov:wasAttributedTo from RDF files...")
        try:
            attribut_prop = 'http://www.w3.org/ns/prov#wasAttributedTo'
            default_val = 'https://w3id.org/oc/meta/prov/pa/1'
            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']]  # use the graph already modified by FillerFixer (simulation)
                for se_obj in graph_obj['@graph']:
                    if len(se_obj.get(attribut_prop, [])) <= 1:
                        continue
                    attribut_vals = [d['@id'] for d in se_obj[attribut_prop]]
                    # Same condition as the SPARQL variant: default agent present plus at least one other agent.
                    if default_val in attribut_vals and any(v for v in attribut_vals if v != default_val):
                        g = graph_obj['@id']
                        s = se_obj['@id']

                        counter +=1
                        out_row = (g, s)

                        if not self.issues_log_fp:
                            result.append(out_row)
                        else:
                            out_row = make_json_safe(out_row)
                            res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} snapshots with multiple objects for prov:wasAttributedTo.")
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return result if not self.issues_log_fp else None


    def batch_fix_extra_pa(self, multi_pa_snapshots:Union[str, List[Tuple[str]]], batch_size=200):
        """
        Delete triples where the value of ``prov:wasAttributedTo`` is <https://w3id.org/oc/meta/prov/pa/1> if there 
        is at least another processing agent for the same snapshot subject.

        :param multi_pa_snapshots: A list of tuples where each tuple contains a graph URI and a snapshot URI.
        :type multi_pa_snapshots: Union[str, List[Tuple[str]]]
        :param batch_size: Number of snapshots to process per batch.
        :type batch_size: int
        :returns: None
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
            $quads_to_delete
        } ;
        INSERT DATA {
            $quads_to_insert
        }
        """)

        # Checkpointing only makes sense when issues are streamed from a file.
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Fixing snapshots with multiple processing agents in batches...")

        for batch_idx, (batch, line_num) in checkpointed_batch(
            multi_pa_snapshots,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_fix_extra_pa",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                to_delete = []
                to_insert = []
                for g, s in batch:
                    to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://orcid.org/0000-0002-8420-0696> . }}\n")  # deletes Arcangelo's ORCID
                    to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> . }}\n")  # deletes Meta's default processing agent (for ingestions only)
                    to_insert.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/2> . }}\n")  # inserts Meta's processing agent for modification processes

                to_delete_str = " ".join(to_delete)
                to_insert_str = " ".join(to_insert)
                query = template.substitute(quads_to_delete=to_delete_str, quads_to_insert=to_insert_str)

                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                logging.error(f"Error while fixing multiple processing agents in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                print(f"Error while fixing multiple processing agents in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None

    def fix_issue(self, modified_graphs:dict=None):
        """
        Run the full detect-then-fix pipeline for snapshots with multiple processing agents.

        :param modified_graphs: Graphs already modified by FillerFixer (simulation),
            forwarded to file-based detection when a dump directory is configured.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect graph and snapshots that have an additional object besides the typical <https://w3id.org/oc/meta/prov/pa/1> for prov:wasAttributedTo
        if not self.issues_log_fp:
            to_fix = self.detect_issue()
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue()  # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: delete all objects for prov:wasAttributedTo and insert only <https://w3id.org/oc/meta/prov/pa/2>
        if not self.issues_log_fp:
            self.batch_fix_extra_pa(to_fix)
        else:
            self.batch_fix_extra_pa(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing graphs with multiple processing agents terminated.")

1309 

1310 

1311# (5) Correct graphs where at least one snapshots has too many objects for specific properties -> move to daughter class MultiObjectFixer 

class MultiObjectFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to graphs where at least one snapshot has too many objects for specific properties in the OpenCitations Meta provenance dataset.

    Graphs containing a snapshot with multiple values for a 1-cardinality property
    (``prov:invalidatedAtTime``, ``prov:hadPrimarySource``, ``oc:hasUpdateQuery``)
    are considered unrecoverable and are reset to a single fresh creation snapshot.
    """
    def __init__(self, endpoint:str, meta_dumps_pub_dates: List[Tuple[str, str]], dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :type endpoint: str
        :param meta_dumps_pub_dates: Register of published OpenCitations Meta dumps, in the form: [(<ISO format date 1>, <dump DOI1>), (<ISO format date 2>, <dump DOI2>), ...]
        :type meta_dumps_pub_dates: List[Tuple[str, str]]
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)
        self.pa_uri = "https://w3id.org/oc/meta/prov/pa/1"  # URI of the processing agent to be used as object of prov:wasAttributedTo for newly created snapshots, which is always the default one
        validate_meta_dumps_pub_dates(meta_dumps_pub_dates)  # raises errors if something wrong
        self.meta_dumps_pub_dates = sorted([(date.fromisoformat(d), doi) for d, doi in meta_dumps_pub_dates], key=lambda x: x[0])

    def detect_issue(self, limit=10000) -> Union[None, List[str]]:
        """
        Fetch graphs containing at least one snapshot with multiple objects for 
        a property that only admits one (e.g. ``oc:hasUpdateQuery``).

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples (graph URI, generation time).
        :rtype: Union[None, List[str]]
        """
        output = []
        counter = 0

        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX oc: <https://w3id.org/oc/ontology/>

        SELECT ?g ?genTime
        WHERE {
            {
                SELECT DISTINCT ?g
                WHERE {
                    GRAPH ?g {
                        VALUES ?p { prov:invalidatedAtTime prov:hadPrimarySource oc:hasUpdateQuery }
                        ?s ?p ?o ;
                            a prov:Entity .
                        FILTER EXISTS {
                            ?s ?p ?o2 .
                            FILTER (?o2 != ?o)
                        }
                    }
                }
                ORDER BY ?g
            }
            BIND (IRI(CONCAT(str(?g), "se/1")) AS ?target)
            ?target prov:generatedAtTime ?genTime .
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)


        logging.info(f"[{self.__class__.__name__}] Fetching URIs of graphs containing snapshots with too many objects...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    g = b['g']['value']
                    gen_time = b['genTime']['value']
                    counter += 1
                    out_row = (g, gen_time)
                    if not self.issues_log_fp:
                        output.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} distinct graphs containing snapshots with too many objects for some properties.")
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return output if not self.issues_log_fp else None

    def detect_issue_from_files(self, modified_graphs:dict) -> Union[None, List[str]]:
        """
        Detect graphs containing snapshots with multiple objects for 1-cardinality properties
        by reading local JSON-LD dump files instead of querying the SPARQL endpoint directly.

        :param modified_graphs: A dictionary where keys are graph URIs and values are the modified graph objects
            (graphs already changed by FillerFixer in simulation); these replace the corresponding graphs read from the dump.
        :returns: A list of (graph URI, generation time) tuples when no issues log file is configured; otherwise ``None``.
        """
        output = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)


        logging.info(f"[{self.__class__.__name__}] Fetching URIs of graphs containing snapshots with too many objects from RDF files...")
        try:
            inval_dt_prop = 'http://www.w3.org/ns/prov#invalidatedAtTime'
            prim_source_prop = 'http://www.w3.org/ns/prov#hadPrimarySource'
            upd_query_prop = 'https://w3id.org/oc/ontology/hasUpdateQuery'

            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']]  # use the graph already modified by FillerFixer (simulation)
                switch = False
                for se_obj in graph_obj['@graph']:
                    # check if any property among [inval_dt_prop, prim_source_prop, upd_query_prop] has more than one value
                    if any(len(se_obj.get(p, [])) > 1 for p in [inval_dt_prop, prim_source_prop, upd_query_prop]):
                        switch = True
                        break
                if not switch:
                    continue

                g = graph_obj['@id']
                gen_time = None

                # Retrieve the generation time of the creation snapshot (<graph>se/1) of the faulty graph.
                for se_obj in graph_obj['@graph']:
                    if se_obj['@id'] == g + 'se/1':
                        genTime_values = [d['@value'] for d in se_obj.get('http://www.w3.org/ns/prov#generatedAtTime', [])]
                        if len(genTime_values) == 1:
                            gen_time = normalise_datetime(genTime_values[0])
                        elif len(genTime_values) > 1:
                            gen_time = min([normalise_datetime(dt) for dt in genTime_values], key=lambda x: datetime.fromisoformat(x))  # take the earliest datetime value
                        break
                else:  # i.e. no break
                    logging.warning(f"No creation snapshot found for graph {g}. Skipping...")
                    continue

                counter += 1
                out_row = (g, gen_time)
                if not self.issues_log_fp:
                    output.append(out_row)
                else:
                    out_row = make_json_safe(out_row)
                    res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} distinct graphs containing snapshots with too many objects for some properties.")
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return output if not self.issues_log_fp else None


    def reset_multi_object_graphs(self, graphs:Union[str, list], batch_size=200):
        """
        Reset each graph in ``graphs`` by deleting the existing snapshots and creating a new 
        creation snapshot, which will be the only one left for that graph.

        :param graphs: A list of tuples (graph URI, generation time) for graphs that have too many objects for properties that only admit one.
        :type graphs: Union[str, list]
        :param batch_size: Number of graphs to process per checkpointed batch (updates are executed with individual queries anyway).
        :type batch_size: int
        :returns: None
        """

        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        CLEAR GRAPH <$graph> ;
        INSERT DATA {
            GRAPH <$graph> {
                <$creation_snapshot> prov:hadPrimarySource <$primary_source> ;
                    prov:wasAttributedTo <$processing_agent> ;
                    prov:specializationOf <$specialization_of> ;
                    dcterms:description "$description" ;
                    rdf:type prov:Entity ;
                    prov:generatedAtTime "$gen_time"^^xsd:dateTime .
            }
        }
        """)

        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Resetting graphs with too many objects by creating a new single creation snapshot...")

        for batch_idx, (batch, line_num) in checkpointed_batch(
            graphs,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="reset_multi_object_graphs",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                for g, gen_time in batch:
                    creation_se = g + 'se/1'
                    # Strip any datatype suffix the stored value might carry before re-inserting it as a plain lexical form.
                    gen_time = gen_time.replace("^^xsd:dateTime", "")
                    gen_time = gen_time.replace("^^http://www.w3.org/2001/XMLSchema#dateTime", "")
                    prim_source = get_previous_meta_dump_uri(self.meta_dumps_pub_dates, gen_time)
                    processing_agent = self.pa_uri
                    referent = get_described_res_omid(g)
                    desc = f"The entity '{referent}' has been created."

                    query = template.substitute(
                        graph = g,
                        creation_snapshot = creation_se,
                        primary_source = prim_source,
                        processing_agent = processing_agent,
                        specialization_of = referent,
                        description = desc,
                        gen_time = gen_time
                    )

                    self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                # Fixed missing space between "graphs" and the line range in the messages below.
                logging.error(f"Error while resetting graphs in Batch {batch_idx} for graphs {line_num-batch_size} to {line_num}: {e}")
                print(f"Error while resetting graphs in Batch {batch_idx} for graphs {line_num-batch_size} to {line_num}: {e}")
                raise e
        logging.debug(f"[{self.__class__.__name__}] Graph-resetting process terminated.")
        return None


    def fix_issue(self, modified_graphs:dict=None):
        """
        Run the full detect-then-fix pipeline for graphs with multiple objects for 1-cardinality properties.

        :param modified_graphs: Graphs already modified by FillerFixer (simulation),
            forwarded to file-based detection when a dump directory is configured.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect graphs and snapshots with multiple objects for 1-cardinality properties
        if not self.issues_log_fp:
            to_fix = self.detect_issue()
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue()  # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: reset graphs with a snapshot having extra objects
        if not self.issues_log_fp:
            self.reset_multi_object_graphs(to_fix)
        else:
            self.reset_multi_object_graphs(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing graphs with multiple objects for 1-cardinality properties terminated.")

1557 

1558 

def fix_process(
    endpoint: str,
    meta_dumps_pub_dates: List[Tuple[str, str]],
    issues_log_dir: Union[str, None] = None,
    dry_run: bool = False,
    checkpoint_fp='checkpoint.json'
):
    """
    Function wrapping all the single fix operations into a single process,
    with strictly ordered steps, checkpointing, and timing.

    :param endpoint: SPARQL endpoint URL
    :param meta_dumps_pub_dates: List of (date, url) tuples for meta dumps.
    :param issues_log_dir: Directory in which to write files storing detected issues.
    :param dry_run: If True, only print what would be done, don't actually do it.
    :param checkpoint_fp: Path to checkpoint file. If None, no checkpointing is performed.
    """

    ckpt_mngr = CheckpointManager(checkpoint_fp)  # checkpoint file will be deleted if program terminates succesfully

    # Strictly ordered: later fixers assume the earlier ones have already run.
    fixers = (
        FillerFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        DateTimeFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MissingPrimSourceFixer(endpoint, meta_dumps_pub_dates, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiPAFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiObjectFixer(endpoint, meta_dumps_pub_dates, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
    )
    steps = tuple(fixer.__class__.__name__ for fixer in fixers)

    timer = TimedProcess(total_phases=len(fixers))
    timer.start()

    for i, fixer in enumerate(fixers):
        try:
            logging.info(f"Created instance of {fixer.__class__.__qualname__}.")

            # --- resume logic: skip completed fixer ---
            if ckpt_mngr:
                state = ckpt_mngr.load()
                last_fixer = state.get("fixer", steps[0])
                # FIX: guard the membership first — a checkpoint written by
                # another pipeline (or an older version) may carry a fixer
                # name not in `steps`, and steps.index() would then raise an
                # unhandled ValueError. Treat an unknown name as "no progress".
                later_started = last_fixer in steps and steps.index(last_fixer) > i
                if (
                    state.get("fixer") == fixer.__class__.__name__
                    and state.get("phase") == "done"
                ) or later_started:  # skip if a later fixer was reached
                    logging.info(f"Skipping {fixer.__class__.__name__} (already completed).")
                    continue

            # --- run fixer ---
            timer.start_phase()
            if not dry_run:
                fixer.fix_issue()
            else:
                logging.debug(f"[fix_process]: Would run {fixer.__class__.__name__}")
            phase_time = timer.end_phase()

        finally:
            # Always surface the latest checkpoint state, even on failure.
            # (Message made consistent with fix_process_reading_from_files.)
            if ckpt_mngr:
                logging.info(f"Latest checkpoint state: {ckpt_mngr.load()}")

        # --- log progress --- (skipped fixers `continue` past this point)
        elapsed, remaining = timer.eta(i)
        logging.info(
            f"{fixer.__class__.__name__} finished in {phase_time:.1f}s | "
            f"Elapsed: {elapsed/60:.1f}m | ETA: {remaining/60:.1f}m"
        )

        # --- mark fixer as done ---
        if ckpt_mngr:
            ckpt_mngr.save(fixer.__class__.__name__, "done", -1)
            logging.info(f"{fixer.__class__.__name__} completed.")

    # clear checkpoint only when the whole pipeline is done
    logging.info(" ----- All fixing operations terminated. ----- ")
    if ckpt_mngr:
        ckpt_mngr.clear()
    return None

1636 

1637 

1638 

def fix_process_reading_from_files(
    endpoint: str,
    dump_dir:str,
    meta_dumps_pub_dates: List[Tuple[str, str]],
    issues_log_dir: str,
    dry_run: bool = False,
    checkpoint_fp='checkpoint.json'
):
    """
    Function wrapping all the single fix operations into a single process,
    with strictly ordered steps, checkpointing, and timing. Reads from RDF dump files for detecting issues.

    :param endpoint: SPARQL endpoint URL
    :param dump_dir: Directory containing RDF dump files to read from.
    :param meta_dumps_pub_dates: List of (date, url) tuples for meta dumps.
    :param issues_log_dir: Directory in which to write files storing detected issues.
    :param dry_run: If True, only print what would be done, don't actually do it.
    :param checkpoint_fp: Path to checkpoint file. If None, no checkpointing is performed.
    """

    ckpt_mngr = CheckpointManager(checkpoint_fp)  # checkpoint file will be deleted if program terminates succesfully

    # Strictly ordered: FillerFixer MUST run first, since the remaining fixers
    # detect issues against graphs simulating FillerFixer's changes.
    fixers = (
        FillerFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        DateTimeFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MissingPrimSourceFixer(endpoint, meta_dumps_pub_dates, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiPAFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiObjectFixer(endpoint, meta_dumps_pub_dates, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
    )
    steps = tuple(fixer.__class__.__name__ for fixer in fixers)
    timer = TimedProcess(total_phases=len(fixers))
    timer.start()

    modified_graphs_mapping = dict()  # to keep track of graphs modified by FillerFixer
    # Persisted to disk so a resumed run (after a crash past phase 0) can
    # still feed the simulated graphs to the remaining fixers.
    TMP_FILE_SIMULATED_GRAPHS = os.path.join(issues_log_dir if issues_log_dir else '.', 'temp_simulated_graphs.json')

    for i, fixer in enumerate(fixers):
        try:
            logging.info(f"Created instance of {fixer.__class__.__qualname__}.")

            # --- resume logic: skip completed fixer ---
            if ckpt_mngr:
                state = ckpt_mngr.load()
                last_fixer = state.get("fixer", steps[0])
                # FIX: guard the membership first — an unknown fixer name in
                # the checkpoint (e.g. written by the non-file pipeline) would
                # make steps.index() raise an unhandled ValueError here.
                later_started = last_fixer in steps and steps.index(last_fixer) > i
                if (
                    state.get("fixer") == fixer.__class__.__name__
                    and state.get("phase") == "done"
                ) or later_started:  # skip if a later fixer was reached
                    logging.info(f"Skipping {fixer.__class__.__name__} (already completed).")
                    continue

            # --- run fixer ---
            timer.start_phase()
            if not dry_run:
                if i == 0:  # i.e. FillerFixer
                    fixer.fix_issue()
                    mod_graphs_uris = load_modified_graphs_uris(fixer.issues_log_fp)
                    logging.info(f"Simulating FillerFixer changes for {len(mod_graphs_uris)} graphs...")
                    modified_graphs_mapping.update(
                        {g['@id']: simulate_ff_changes(g) for g in read_rdf_dump(dump_dir) if g['@id'] in mod_graphs_uris}
                    )

                    # save simulated graphs to temp file
                    with open(TMP_FILE_SIMULATED_GRAPHS, 'w', encoding='utf-8') as tf:
                        json.dump(modified_graphs_mapping, tf)

                else:  # i.e. all other fixers
                    with open(TMP_FILE_SIMULATED_GRAPHS, 'r', encoding='utf-8') as tf:
                        modified_graphs_mapping = json.load(tf)
                    fixer.fix_issue(modified_graphs=modified_graphs_mapping)

            else:
                logging.debug(f"[fix_process]: Would run {fixer.__class__.__name__}")
            phase_time = timer.end_phase()

        finally:
            # Always surface the latest checkpoint state, even on failure.
            if ckpt_mngr:
                logging.info(f"Latest checkpoint state: {ckpt_mngr.load()}")

        # --- log progress --- (skipped fixers `continue` past this point)
        elapsed, remaining = timer.eta(i)
        logging.info(
            f"{fixer.__class__.__name__} finished in {phase_time:.1f}s | "
            f"Elapsed: {elapsed/60:.1f}m | ETA: {remaining/60:.1f}m"
        )

        # --- mark fixer as done ---
        if ckpt_mngr:
            ckpt_mngr.save(fixer.__class__.__name__, "done", -1)
            logging.info(f"{fixer.__class__.__name__} completed.")

    # clear checkpoint and remove temp file for simulated graphs only when the whole pipeline is done
    logging.info(" ----- All fixing operations terminated. ----- ")
    if ckpt_mngr:
        ckpt_mngr.clear()
    # NOTE(review): temp-file removal was deliberately disabled upstream;
    # kept disabled here to preserve behavior.
    # if os.path.exists(TMP_FILE_SIMULATED_GRAPHS):
    #     os.remove(TMP_FILE_SIMULATED_GRAPHS)  # remove temp file storing simulated graphs

    return None