Coverage for meta_prov_fixer / legacy / fix_via_sparql.py: 83%
800 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1import logging
2import time
3from typing import List, Tuple, Union, Dict, Set, Generator, Any, Iterable
4from string import Template
5from SPARQLWrapper import SPARQLWrapper, JSON, POST
6from tqdm import tqdm
7from collections import defaultdict
8from datetime import date, datetime
9import os
10from meta_prov_fixer.utils import *
11import re
12from rdflib.plugins.sparql.processor import SPARQLResult
13from rdflib import Dataset
14import json
15from urllib.error import HTTPError, URLError
16import contextlib
18# # OC Meta published RDF dumps publication dates and DOIs at the time of writing this code (2025-07-01).
19# meta_dumps_pub_dates = [
20# ('2022-12-19', 'https://doi.org/10.6084/m9.figshare.21747536.v1'),
21# ('2022-12-20', 'https://doi.org/10.6084/m9.figshare.21747536.v2'),
22# ('2023-02-15', 'https://doi.org/10.6084/m9.figshare.21747536.v3'),
23# ('2023-06-28', 'https://doi.org/10.6084/m9.figshare.21747536.v4'),
24# ('2023-10-26', 'https://doi.org/10.6084/m9.figshare.21747536.v5'),
25# ('2024-04-06', 'https://doi.org/10.6084/m9.figshare.21747536.v6'),
26# ('2024-06-17', 'https://doi.org/10.6084/m9.figshare.21747536.v7'),
27# ('2025-02-02', 'https://doi.org/10.6084/m9.figshare.21747536.v8')
28# ]
def simulate_ff_changes(local_named_graph:dict) -> dict:
    """
    Simulates the changes that FillerFixer would operate on the input local graph in the live triplestore
    when running FillerFixer.fix_issue(). `local_named_graph` is treated as if it was the triplestore itself.

    :param local_named_graph: The JSON-LD-formatted dictionary corresponding to a provenance named graph.
    :returns: The JSON-LD dictionary of the (single) named graph after the simulated fix.
    :raises ValueError: if the simulated dataset serializes to zero or more than one named graph.
    """
    logging.disable(logging.CRITICAL)  # suspend all logs below CRITICAL

    try:
        fake_ff = FillerFixer('http://example.org/sparql/')  # dummy endpoint: never actually contacted
        local_dataset = Dataset(default_union=True)
        # NOTE(review): `data` is passed as a dict; rdflib's parse() conventionally expects a
        # string/bytes source — confirm the installed JSON-LD plugin accepts dicts, otherwise
        # wrap with json.dumps() first.
        local_dataset.parse(data=local_named_graph, format='json-ld')

        def convert_query_results(qres: SPARQLResult) -> dict:
            """
            Naively converts the results of a SPARQL query made with rdflib to a local Graph/Dataset object
            to SPARQLWrapper-like results made to an endpoint (at least for what concerns the bindings['results'] part).
            """
            try:
                return json.loads(qres.serialize(format='json'))
            except Exception as e:
                print(e)
                return None

        def rdflib_query(dataset:Dataset, query: str) -> SPARQLResult:
            """
            Executes a SPARQL query on a given rdflib Dataset and returns the raw rdflib result.
            (Conversion to SPARQLWrapper-like JSON is done separately by convert_query_results.)
            """
            return dataset.query(query)

        def rdflib_update(dataset:Dataset, query: str) -> None:
            """
            Executes a SPARQL update query on a given rdflib Dataset.
            """
            dataset.update(query)

        def local_query(q):
            """Custom function to simulate _query using local Dataset."""
            qres = rdflib_query(local_dataset, q)
            return convert_query_results(qres)

        def local_update(q):
            """Custom function to simulate _update using local Dataset."""
            return rdflib_update(local_dataset, q)

        fake_ff._query = local_query  # overwrite FillerFixer._query()
        fake_ff._update = local_update  # overwrite FillerFixer._update()

        fake_ff.fix_issue()

        graph_object = json.loads(local_dataset.serialize(format='json-ld'))
        # BUG FIX: previously an empty serialization fell through to graph_object[0] and
        # raised a bare IndexError; fail with an explicit, descriptive error instead.
        if not graph_object:
            raise ValueError("The simulated dataset is empty: no named graph to return.")
        if len(graph_object) > 1:
            raise ValueError("The input named graph seems to contain multiple graphs!")
        return graph_object[0]

    finally:
        logging.disable(logging.NOTSET)  # re-enable normal logging
def make_ff_rename_mapping(issues_fp, mapping_fp):
    """
    Maps the old name (URI) of the snapshots in a graph with a filler to their new name,
    and stores the dictionary to a JSON file.

    :param issues_fp: the path to the file storing the results of `FillerFixer.detect_issue()`
    :param mapping_fp: the path to the file where to store the name mapping.
    :returns: None (the mapping is written to ``mapping_fp``).
    """
    general_mapping = dict()
    for line in chunker(issues_fp):
        to_delete:set = line[1]['to_delete']
        to_rename:set = line[1]['remaining_snapshots']
        renaming = FillerFixer.map_se_names(to_delete, to_rename)
        # only record actual renames; identity mappings carry no information
        for old_name, new_name in renaming.items():
            if old_name != new_name:
                general_mapping[old_name] = new_name

    # BUG FIX: the output file must be opened for writing ('w'), not reading ('r') —
    # json.dump() on a read-only handle raises io.UnsupportedOperation.
    with open(mapping_fp, 'w', encoding='utf-8') as out:
        json.dump(general_mapping, out)

    return
class ProvenanceIssueFixer:
    """
    Base class for fixing provenance issues via SPARQL queries.

    Classes dedicated to fixing specific issues should inherit from this class and
    implement the `detect_issue` and `fix_issue` methods.
    """
    def __init__(self, endpoint: str, dump_dir:Union[str, None]=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        Initializes the SPARQL endpoint and sets up the query method.

        :param endpoint: The SPARQL endpoint URL.
        :param dump_dir: Path to the directory storing the JSON-LD dump files for provenance.
            If provided, the fixer will read from these files in the error detection phase (instead of querying the SPARQL endpoint directly) (default: None).
        :param issues_log_dir: If provided, the path to the directory where the data involved in a query-detected issue will be logged. If None, data is kept in memory.
        :param checkpoint_fp: Path to the checkpoint file for resuming interrupted processes (default: 'checkpoint.json').
        """
        self.endpoint = endpoint
        self.dump_dir = dump_dir or None  # normalise falsy values (e.g. '') to None
        self.checkpoint_mngr = CheckpointManager(checkpoint_fp)
        # updates that keep failing after all retries are appended here for later replay
        self.failed_queries_fp = f"prov_fix_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt"

        if issues_log_dir:
            self.issues_log_fp = os.path.join(issues_log_dir, f"{type(self).__qualname__}.jsonl")
        else:
            self.issues_log_fp = None

        if self.issues_log_fp:
            os.makedirs(os.path.dirname(self.issues_log_fp), exist_ok=True)

    def _query(self, query: str, retries: int = 3, delay: float = 5.0) -> Union[dict, None]:
        """
        Execute a SPARQL SELECT query with retries and exponential backoff.

        :param query: The SPARQL query string.
        :param retries: Maximum number of attempts before giving up.
        :param delay: Base of the backoff (sleeps ``delay**(attempt+1)`` seconds between attempts).
        :returns: The parsed JSON results as a dict, or None if all attempts failed.
        :raises URLError: if the endpoint appears down (connection refused) on the last attempt.
        """
        time.sleep(0.1)  # slight delay to avoid overwhelming the endpoint
        for attempt in range(retries):
            try:
                # create a new connection for every query to avoid memory leak
                sparql = SPARQLWrapper(self.endpoint)
                sparql.setMethod(POST)
                # BUG FIX: explicitly request JSON results. Without this, SPARQLWrapper defaults
                # to XML and convert() would not yield the dict that callers expect
                # (e.g. _paginate_query reads result["results"]["bindings"]).
                sparql.setReturnFormat(JSON)
                sparql.setQuery(query)

                res = sparql.query()
                with contextlib.closing(res.response):
                    finalres = res.convert()
                    res.response.read()  # drain the stream so the connection can be released

                return finalres

            except HTTPError as e:
                # Virtuoso is up, but rejected the query
                if e.code == 503:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error 503: {e.reason}. Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error {e.code}: {e.reason}. Retrying...")

            except URLError as e:
                # Network-level errors (connection refused, dropped, etc.)
                if "connection refused" in str(e.reason).lower():
                    logging.error(f"[Attempt {attempt+1}] Virtuoso appears DOWN (connection refused): {e.reason}. Retrying...")
                    if attempt == retries - 1:
                        logging.error(f"[Attempt {attempt+1}] Virtuoso appeared DOWN (connection refused) for {retries} times: {e.reason}. Killing process.")
                        raise e  # kill whole process
                    time.sleep(60.0)  # sleep additional 60 seconds in case Virtuoso is restarting
                elif "closed connection" in str(e.reason).lower():
                    logging.warning(f"[Attempt {attempt+1}] Connection closed mid-request (?). Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] URL error: {e.reason}")

            except Exception as e:  # catch-all for other exceptions
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            if attempt < retries - 1:
                time.sleep(delay**(attempt+1))  # exponential backoff
            else:
                logging.error("Max retries reached. Query failed.")
        return None

    def _update(self, update_query: str, retries: int = 3, delay: float = 5.0) -> None:
        """
        Execute a SPARQL UPDATE query with retries and exponential backoff.
        Updates that fail after all retries are appended (newline-escaped) to ``self.failed_queries_fp``.

        :param update_query: The SPARQL update string.
        :param retries: Maximum number of attempts before giving up.
        :param delay: Base of the backoff (sleeps ``delay**(attempt+1)`` seconds between attempts).
        :raises URLError: if the endpoint appears down (connection refused) on the last attempt.
        """
        time.sleep(0.1)  # slight delay to avoid overwhelming the endpoint
        for attempt in range(retries):
            try:
                # create a new connection for every query to avoid memory leak
                sparql = SPARQLWrapper(self.endpoint)
                sparql.setMethod(POST)
                sparql.setQuery(update_query)

                res = sparql.query()
                with contextlib.closing(res.response):
                    res.response.read()  # drain and close; updates have no payload to convert
                return

            except HTTPError as e:
                # Virtuoso is up, but rejected the query
                if e.code == 503:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error 503: {e.reason}. Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] HTTP error {e.code}: {e.reason}. Retrying...")

            except URLError as e:
                # Network-level errors (connection refused, dropped, etc.)
                if "connection refused" in str(e.reason).lower():
                    logging.error(f"[Attempt {attempt+1}] Virtuoso appears DOWN (connection refused): {e.reason}. Retrying...")
                    if attempt == retries - 1:
                        logging.error("Max retries reached. Update failed.")
                        # persist the failed update before killing the process so it can be replayed
                        with open(self.failed_queries_fp, "a") as f:
                            f.write(update_query.replace("\n", "\\n") + "\n")
                        logging.error(f"[Attempt {attempt+1}] Virtuoso appeared DOWN (connection refused) for {retries} times: {e.reason}. Killing process.")
                        raise e  # kill whole process
                    time.sleep(60.0)  # sleep additional 60 seconds in case Virtuoso is restarting
                elif "closed connection" in str(e.reason).lower():
                    logging.warning(f"[Attempt {attempt+1}] Connection closed mid-request (?). Retrying...")
                else:
                    logging.warning(f"[Attempt {attempt+1}] URL error: {e.reason}")

            except Exception as e:  # catch-all for other exceptions
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            if attempt < retries - 1:
                time.sleep(delay**(attempt+1))  # exponential backoff
            else:
                logging.error("Max retries reached. Update failed.")
                with open(self.failed_queries_fp, "a") as f:
                    f.write(update_query.replace("\n", "\\n") + "\n")

    def _paginate_query(self, query_template: str, limit: int = 10000, sleep: float = 0.5) -> Generator[List[Dict[str, Any]], None, None]:
        """
        Executes a paginated SPARQL SELECT query and yields result bindings in batches.

        This method is designed to handle pagination by incrementing the OFFSET in the
        provided SPARQL query template. It assumes that the query returns results in a
        structure compatible with the SPARQL JSON results format.

        :param query_template: A SPARQL query string with two `%d` placeholders for offset and limit values (in that order).
        :type query_template: str
        :param limit: The number of results to fetch per page. Defaults to 10000.
        :type limit: int
        :param sleep: Number of seconds to wait between successive queries to avoid overwhelming the endpoint. Defaults to 0.5 seconds.
        :type sleep: float

        :yield: A list of SPARQL result bindings (each a dictionary with variable bindings).
        :rtype: Generator[List[Dict[str, Any]], None, None]
        """
        offset = 0
        while True:
            query = query_template % (offset, limit)
            logging.debug(f"Fetching results {offset} to {offset + limit}...")
            result_batch = self._query(query)

            if result_batch is None:  # query failed after all retries
                break

            bindings = result_batch["results"]["bindings"]
            if not bindings:  # no more pages
                break

            yield bindings
            offset += limit
            time.sleep(sleep)

    def detect_issue(self):
        raise NotImplementedError("Subclasses must implement `detect_issue()`.")

    def fix_issue(self):
        raise NotImplementedError("Subclasses must implement `fix_issue()`.")
292# (1) Delete filler snapshots and fix the rest of the graph -> move to daughter class FillerFixer
class FillerFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to filler snapshots in the OpenCitations Meta provenance dataset.
    """
    def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :param dump_dir: Path to the directory storing the JSON-LD dump files for provenance (optional).
        :param issues_log_dir: If provided, directory where detected issues are logged to file; otherwise data is kept in memory.
        :param checkpoint_fp: Path to the checkpoint file for resuming interrupted processes.
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)

    def detect_issue(self, limit=10000) -> List[Tuple[str, Dict[str, Set[str]]]]:
        """
        Fetch snapshots that are fillers and need to be deleted, grouped by their named graph.

        A filler is a snapshot that is not the first one (``.../prov/se/1``), has no
        ``oc:hasUpdateQuery``, and whose description (if any) does not mention "merged".

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples, where the first element is a graph URI and the second element is a
            dictionary with 'to_delete' and 'remaining_snapshots' as keys and a set as value of both keys.
            None when ``self.issues_log_fp`` is set (results are streamed to file instead).
        :rtype: List[Tuple[str, Dict[str, Set[str]]]]
        """
        grouped_result = defaultdict(lambda: {'to_delete': set(), 'remaining_snapshots': set()})

        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX oc: <https://w3id.org/oc/ontology/>

        SELECT ?g ?snapshot ?other_se
        WHERE {
            {
                SELECT DISTINCT ?g ?snapshot ?other_se
                WHERE {
                    GRAPH ?g {
                        {
                            ?snapshot a prov:Entity .
                            OPTIONAL {
                                ?snapshot dcterms:description ?description .
                            }
                            FILTER (!regex(str(?snapshot), "/prov/se/1$"))
                            FILTER (!bound(?description) || !CONTAINS(LCASE(str(?description)), "merged"))
                            FILTER NOT EXISTS {
                                ?snapshot oc:hasUpdateQuery ?q .
                            }
                        }
                        ?other_se ?p ?o .
                        FILTER (?other_se != ?snapshot)
                    }
                }
                ORDER BY ?g
            }
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)
        try:
            # Pagination loop
            logging.info(f"[{self.__class__.__name__}] Fetching filler snapshots (to be deleted) with pagination...")
            for current_bindings in self._paginate_query(template, limit):
                # group query results
                for b in current_bindings:
                    g = b['g']['value']
                    snapshot = b['snapshot']['value']
                    other_se = b['other_se']['value']

                    grouped_result[g]['to_delete'].add(snapshot)
                    grouped_result[g]['remaining_snapshots'].add(other_se)
                    # the following is necessary when there are multiple filler snapshots in the same graph
                    grouped_result[g]['remaining_snapshots'].difference_update(grouped_result[g]['to_delete'])

            logging.info(f"{sum(len(d['to_delete']) for d in grouped_result.values())} snapshots marked for deletion.")
            if self.issues_log_fp:
                for obj in list(dict(grouped_result).items()):
                    obj = make_json_safe(obj)
                    res_log.write(json.dumps(obj, ensure_ascii=False) + '\n')
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
        return list(dict(grouped_result).items()) if not self.issues_log_fp else None

    def detect_issue_from_files(self):
        """
        Detect filler snapshots by reading local JSON-LD dump files instead of querying the SPARQL endpoint directly.

        Applies the same filler definition as :meth:`detect_issue`.

        :returns: Same shape as :meth:`detect_issue`; None when results are written to ``self.issues_log_fp``.
        """
        grouped_result = defaultdict(lambda: {'to_delete': set(), 'remaining_snapshots': set()})

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        try:
            logging.info(f"[{self.__class__.__name__}] Fetching filler snapshots (to be deleted) from RDF files...")
            for graph_obj in read_rdf_dump(self.dump_dir):
                if len(graph_obj['@graph']) <= 1:
                    # a single-snapshot graph cannot contain a filler (se/1 is never a filler)
                    continue
                graph_uri = graph_obj['@id']
                fillers = set()

                for snapshot_obj in graph_obj['@graph']:
                    se_uri = snapshot_obj['@id']
                    if snapshot_obj.get('http://purl.org/dc/terms/description'):
                        desc:list = [d['@value'].lower() for d in snapshot_obj['http://purl.org/dc/terms/description']]
                    else:
                        desc = ['']
                    # filler = no update query, not the first snapshot, not a merge snapshot
                    if (
                        not snapshot_obj.get('https://w3id.org/oc/ontology/hasUpdateQuery')
                        and get_seq_num(se_uri) != 1
                        and not any('merged' in d for d in desc)
                    ):
                        fillers.add(se_uri)
                if fillers:
                    other_se:set = {se_obj['@id'] for se_obj in graph_obj['@graph']} - fillers
                    grouped_result[graph_uri] = {'to_delete': fillers, 'remaining_snapshots': other_se}

            logging.info(f"{sum(len(d['to_delete']) for d in grouped_result.values())} snapshots marked for deletion.")
            if self.issues_log_fp:
                for obj in list(dict(grouped_result).items()):
                    obj = make_json_safe(obj)
                    res_log.write(json.dumps(obj, ensure_ascii=False) + '\n')
        finally:
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
        return list(dict(grouped_result).items()) if not self.issues_log_fp else None

    def batch_fix_graphs_with_fillers(self, deletions: Union[str, List[Tuple[str, Dict[str, Set[str]]]]], batch_size=200) -> None:
        """
        Deletes snapshots from the triplestore based on the provided deletions list, renames the remaining snapshot entities with relevant URIs (in the
        whole dataset) and adapts the time relationships of the remaining entities (in the named graph that contained the filler snapshot(s)).

        :param deletions: A list object or the string filepath to a JSON file storing the object.
            Each item/line is a tuple where the first element is a graph URI,
            and the second is a dictionary with `'to_delete'` and `'remaining_snapshots'` sets.
        :type deletions: Union[str, List[Tuple[str, Dict[str, Set[str]]]]]
        :param batch_size: Number of graphs to process per update batch.
        :type batch_size: int
        """
        deletion_template = Template("""
        $dels
        """)

        logging.info(f"[{self.__class__.__name__}] Fixing graphs with filler snapshots in batches...")

        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        for batch_idx, (batch, line_num) in checkpointed_batch(
            deletions,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_fix_fillers",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                # step 1: delete filler snapshots in the role of subjects
                dels = []
                for g_uri, values in batch:
                    for se_to_delete in values['to_delete']:
                        single_del = f"DELETE WHERE {{ GRAPH <{g_uri}> {{ <{se_to_delete}> ?p ?o . }}}};\n"
                        dels.append(single_del)
                dels_str = " ".join(dels)
                query = deletion_template.substitute(dels=dels_str)
                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Deleted fillers for all graphs in Batch {batch_idx}. Fixing single quads in the same batch...")

                # step 2: delete filler snapshots in the role of objects and rename remaining snapshots
                for g, _dict in batch:
                    mapping = self.map_se_names(_dict['to_delete'], _dict['remaining_snapshots'])
                    self.rename_snapshots(mapping)

                    # step 3: adapt values of prov:invalidatedAtTime for the entities existing now, identified by "new" URIs
                    new_names = list(set(mapping.values()))
                    self.adapt_invalidatedAtTime(g, new_names)

                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} (filler deletion + renaming + adapting time sequence) completed.")

            except Exception as e:
                logging.error(f"Error while fixing graph with filler snapshot(s) in Batch {batch_idx}, lines {line_num-batch_size} to {line_num}: {e}")
                # fixed doubled word ("while while") in the console message
                print(f"Error while fixing graph with filler snapshot(s) in Batch {batch_idx}, lines {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None

    @staticmethod
    def map_se_names(to_delete:set, remaining: set) -> dict:
        """
        Associates a new URI value to each snapshot URI in the union of ``to_delete`` and ``remaining`` (containing snapshot URIs).

        Values in the mapping dictionary are not unique, i.e., multiple old URIs can be mapped to the same new URI.
        If ``to_delete`` is empty, the returned dictionary will have identical keys and values, i.e., the URIs will not change.
        Each URI in ``to_delete`` will be mapped to the new name of the URI in ``remaining`` that immediately precedes it in
        a sequence ordered by sequence number.

        **Examples:**

        .. code-block:: python

            to_delete = {'https://w3id.org/oc/meta/br/06101234191/prov/se/3'}
            remaining = {'https://w3id.org/oc/meta/br/06101234191/prov/se/1', 'https://w3id.org/oc/meta/br/06101234191/prov/se/2', 'https://w3id.org/oc/meta/br/06101234191/prov/se/4'}

            # The returned mapping will be:
            {
                'https://w3id.org/oc/meta/br/06101234191/prov/se/1': 'https://w3id.org/oc/meta/br/06101234191/prov/se/1',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/2': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/3': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
                'https://w3id.org/oc/meta/br/06101234191/prov/se/4': 'https://w3id.org/oc/meta/br/06101234191/prov/se/3'
            }

        :param to_delete: A set of snapshot URIs that should be deleted.
        :type to_delete: set
        :param remaining: A set of URIs of snapshots that should remain in the graph (AFTER BEING RENAMED).
        :type remaining: set
        :returns: A dictionary mapping old snapshot URIs to their new URIs.
        :rtype: dict
        :raises ValueError: if the snapshots belong to different named graphs, or if the earliest
            snapshot in the graph is itself a filler (no predecessor to inherit a name from).
        """
        to_delete = set(to_delete)
        remaining = set(remaining)
        all_snapshots:list = sorted(to_delete|remaining, key=lambda x: get_seq_num(x))  # sorting is required!

        mapping = {}
        sorted_remaining = []
        base_uri = remove_seq_num(all_snapshots[0])

        if not all(u.startswith(base_uri) for u in all_snapshots):
            logging.error(f"All snapshots must start with the same base URI: {base_uri}. Found: {all_snapshots}")
            raise ValueError(f"Can rename only snapshots that are included in the same named graph.")

        for old_uri in all_snapshots:
            if old_uri in remaining:
                new_uri = f"{base_uri}{len(sorted_remaining)+1}"
                mapping[old_uri] = new_uri
                sorted_remaining.append(new_uri)

            else:  # i.e., elif old_uri in to_delete
                try:
                    new_uri = f"{base_uri}{get_seq_num(sorted_remaining[-1])}"
                except IndexError:
                    # all snapshots preceding this one are fillers, including the first one (creation)
                    logging.error(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")
                    # BUG FIX: previously execution fell through and referenced `new_uri`, which is
                    # unbound here, masking the real problem with a NameError. Fail explicitly instead.
                    raise ValueError(f"Cannot map {old_uri}: the earliest snapshot in the graph is a filler.")
                mapping[old_uri] = new_uri

        return mapping

    def rename_snapshots(self, mapping):
        """
        Renames snapshots in the triplestore according to the provided mapping.

        :param mapping: A dictionary where keys are old snapshot URIs and values are new snapshot URIs.
        :type mapping: dict
        """
        # TODO: consider modifying the query template to support bulk updates (using UNION in the WHERE block)

        # !IMPORTANT: mapping is sorted ascendingly by the sequence number of old URI (get_sequence_number on mapping's keys)
        # This is required, otherwise newly inserted URIs might be deleted when iterating over mapping's items
        mapping = dict(sorted(mapping.items(), key=lambda i: get_seq_num(i[0])))

        template = Template("""
        DELETE {
            GRAPH ?g {
                <$old_uri> ?p ?o .
                ?s ?p2 <$old_uri> .
            }
        }
        INSERT {
            GRAPH ?g {
                <$new_uri> ?p ?o .
                ?s ?p2 <$new_uri> .
            }
        }
        WHERE {
            GRAPH ?g {
                {
                    <$old_uri> ?p ?o .
                }
                UNION
                {
                    ?s ?p2 <$old_uri> .
                }
            }
        }
        """)

        for old_uri, new_uri in mapping.items():
            if old_uri == new_uri:  # identity mapping: nothing to do
                continue
            query = template.substitute(old_uri=old_uri, new_uri=new_uri)
            self._update(query)

    def adapt_invalidatedAtTime(self, graph_uri: str, snapshots: list) -> None:
        """
        Update the ``prov:invalidatedAtTime`` property of each snapshot in the provided list to match
        the value of ``prov:generatedAtTime`` of the following snapshot.

        :param graph_uri: The URI of the named graph containing the snapshots.
        :type graph_uri: str
        :param snapshots: A list of snapshot URIs.
        :type snapshots: list
        :returns: None
        """
        # TODO: consider modifying the query template to support bulk updates
        snapshots = sorted(snapshots, key=lambda x: get_seq_num(x))  # sorting is required!
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        DELETE {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?old_time . }
        }
        INSERT {
            GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?new_time . }
        }
        WHERE {
            GRAPH <$graph> {
                OPTIONAL {
                    <$snapshot> prov:invalidatedAtTime ?old_time .
                }
                <$following_snapshot> prov:generatedAtTime ?new_time .
            }
        }
        """)

        # pairwise walk: each snapshot inherits the generation time of its successor
        for s, following_se in zip(snapshots, snapshots[1:]):
            query = template.substitute(
                graph=graph_uri,
                snapshot=s,
                following_snapshot=following_se
            )
            self._update(query)

    def fix_issue(self, batch_size=200):
        """
        End-to-end fix: detect filler snapshots (from the endpoint or from local dump files),
        then delete them, rename the surviving snapshots and re-align their time sequence.

        :param batch_size: Number of detected graphs to process per update batch.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect filler snapshots
        if not self.issues_log_fp:  # keep all the issues in memory
            to_fix = self.detect_issue()
        else:  # writes all issues to self.issues_log_fp
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:  # read from the SPARQL endpoint
                    self.detect_issue()
                else:  # read from local RDF dump files
                    self.detect_issue_from_files()
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # BUG FIX: forward batch_size (it was previously accepted but silently ignored)
        if not self.issues_log_fp:
            self.batch_fix_graphs_with_fillers(to_fix, batch_size=batch_size)
        else:
            self.batch_fix_graphs_with_fillers(self.issues_log_fp, batch_size=batch_size)

        logging.info(f"[{self.__class__.__name__}] Fixing graphs with filler snapshots terminated.")
670# (2) DATETIME values correction -> move in daughter class DateTimeFixer
671class DateTimeFixer(ProvenanceIssueFixer):
672 """
673 A class to fix issues related to ill-formed datetime values in the OpenCitations Meta provenance dataset.
675 The following datetime formats are considered ill-formed or to be normalized:
676 - Datetime values without timezone information (e.g. 2020-04-22T12:00:00).
677 - Datetime values including microseconds (e.g. 2020-04-22T12:00:00.123456Z).
678 - Datetime values with timezone offsets different from UTC (e.g. 2020-04-22T12:00:00+01:00).
679 - All or some of the above combined (e.g. 2020-04-22T12:00:00.123456+02:00).
680 """
    def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        Initialize the fixer by delegating to the :class:`ProvenanceIssueFixer` base class.

        :param endpoint: The SPARQL endpoint URL.
        :param dump_dir: Path to the directory storing the JSON-LD dump files for provenance (optional).
        :param issues_log_dir: If provided, directory where detected issues are logged to file; otherwise data is kept in memory.
        :param checkpoint_fp: Path to the checkpoint file for resuming interrupted processes.
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)
684 def detect_issue(self, limit=10000) -> Union[None, List[Tuple[str]]]:
685 """
686 Fetch all quads where the datetime object value is not syntactically correct or complete, including cases where
687 the timezone is not specified (making the datetime impossible to compare with other offset-aware datetimes)
688 and/or where the time value includes microseconds. Querying is paginated.
690 :param limit: The number of results to fetch per page.
691 :type limit: int
692 :returns: List of tuples (graph URI, subject, predicate, datetime value).
693 :rtype: Union[None, List[Tuple[str]]]
694 """
695 result = []
696 counter = 0
698 template = r'''
699 PREFIX prov: <http://www.w3.org/ns/prov#>
701 SELECT ?g ?s ?p ?dt
702 WHERE {
703 {
704 SELECT ?g ?s ?p ?dt WHERE {
705 GRAPH ?g {
706 VALUES ?p {prov:generatedAtTime prov:invalidatedAtTime}
707 ?s ?p ?dt .
708 FILTER (!REGEX(STR(?dt), "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:(?:\\+00:00)|Z)$"))
709 }
710 }
711 ORDER BY ?s
712 }
713 }
714 OFFSET %d
715 LIMIT %d
716 '''
717 paradata = get_process_paradata(self)
718 if self.issues_log_fp:
719 res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
720 res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
721 else:
722 logging.info(paradata)
724 try:
725 # Pagination loop
726 logging.info(f"[{self.__class__.__name__}] Fetching ill-formed datetime values with pagination...")
727 for current_bindings in self._paginate_query(template, limit):
728 # group query results
729 for b in current_bindings:
730 g = b['g']['value']
731 s = b['s']['value']
732 p = b['p']['value']
733 dt = b['dt']['value']
735 counter +=1
736 out_row = (g, s, p, dt)
738 if not self.issues_log_fp:
739 result.append(out_row)
740 else:
741 out_row = make_json_safe(out_row)
742 res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
744 logging.info(f"Fetched {counter} quads with a badly formed datetime object value.")
746 finally:
747 if self.issues_log_fp:
748 res_log.close()
750 if self.issues_log_fp:
751 self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
752 return result if not self.issues_log_fp else None # if issues_log_fp is provided, the results are logged to the file and not returned
754 def detect_issue_from_files(self, modified_graphs:dict) -> List[Tuple[str]]:
755 """
756 Detect ill-formed datetime values by reading local JSON-LD dump files instead of querying the SPARQL endpoint directly.
758 :param modified_graphs: A dictionary where keys are graph URIs and values are the modified graph objects
759 """
761 result = []
762 counter = 0
764 paradata = get_process_paradata(self)
765 if self.issues_log_fp:
766 res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
767 res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
768 else:
769 logging.info(paradata)
771 try:
772 logging.info(f"[{self.__class__.__name__}] Fetching ill-formed datetime values from RDF files...")
774 gentime_prop = 'http://www.w3.org/ns/prov#generatedAtTime'
775 invaltime_prop = 'http://www.w3.org/ns/prov#invalidatedAtTime'
777 for graph_obj in read_rdf_dump(self.dump_dir):
778 if graph_obj['@id'] in modified_graphs:
779 graph_obj = modified_graphs[graph_obj['@id']] # use the graph already modified by FillerFixer (simulation)
781 for se_obj in graph_obj['@graph']:
782 genTime_vals = []
783 invalTime_vals = []
784 if se_obj.get(gentime_prop):
785 genTime_vals = [d['@value'] for d in se_obj[gentime_prop]]
786 if se_obj.get(invaltime_prop):
787 invalTime_vals = [d['@value'] for d in se_obj[invaltime_prop]]
790 pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:(?:\+00:00)|Z)$"
792 for prop, values in [(gentime_prop, genTime_vals), (invaltime_prop, invalTime_vals)]:
793 if values:
794 for dt in values:
795 if not re.match(pattern, dt):
796 g = graph_obj['@id']
797 s = se_obj['@id']
798 p = prop
800 counter +=1
801 out_row = (g, s, p, dt)
803 if not self.issues_log_fp:
804 result.append(out_row)
805 else:
806 out_row = make_json_safe(out_row)
807 res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
809 logging.info(f"Fetched {counter} quads with a badly formed datetime object value.")
811 finally:
812 if self.issues_log_fp:
813 res_log.close()
815 if self.issues_log_fp:
816 self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)
818 return result if not self.issues_log_fp else None # if issues_log_fp is provided, the results are logged to the file and not returned
820 def batch_fix_illformed_datetimes(self, quads: Union[list, str], batch_size=200) -> None:
821 """
822 Replace the datetime object of each quad in ``quads`` with its correct version (offset-aware and without microseconds).
823 Note that ``xsd:dateTime`` is always made explicit in newly inserted values.
825 .. note::
826 If a snapshot has multiple objects for ``prov:invalidatedAtTime`` or ``prov:generatedAtTime`` (though the latter should never
827 be the case), they all get replaced with their respective normalised value.
829 :param quads: List of quads to fix (in memory) or a path to a file containing quads in JSON Lines format.
830 :type quads: Union[list, str]
831 :param batch_size: Number of quads to process per batch.
832 :type batch_size: int
833 :returns: None
834 """
836 template = Template('''
837 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
839 DELETE DATA {
840 $to_delete
841 } ;
842 INSERT DATA {
843 $to_insert
844 }
845 ''')
847 ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None
849 logging.info(f"[{self.__class__.__name__}] Fixing ill-formed datetime values in batches...")
851 for batch_idx, (batch, line_num) in checkpointed_batch(
852 quads,
853 batch_size,
854 fixer_name=self.__class__.__name__,
855 phase="batch_fix_illformed_datetimes",
856 ckpnt_mngr=ckpt_mg
857 ):
858 try:
859 to_delete = []
860 to_insert = []
861 for g, s, p, dt in batch:
863 to_delete.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{dt}"^^xsd:dateTime . }}\n')
864 correct_dt = normalise_datetime(dt)
865 to_insert.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{correct_dt}"^^xsd:dateTime . }}\n')
867 to_delete_str = " ".join(to_delete)
868 to_insert_str = " ".join(to_insert)
869 query = template.substitute(to_delete=to_delete_str, to_insert=to_insert_str)
871 self._update(query)
872 logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
874 except Exception as e:
875 logging.error(f"Error while fixing datetime values in Batch {batch_idx} for quads {line_num-batch_size} to {line_num}: {e}")
876 print(f"Error while fixing datetime values in Batch {batch_idx} for quads {line_num-batch_size} to {line_num}: {e}")
877 raise e
878 return None
880 def fix_issue(self, modified_graphs:dict=None):
882 ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None
884 # step 0: detect ill-formed datetime values
885 if not self.issues_log_fp:
886 to_fix = self.detect_issue() # keep all the issues in memory
887 else:
888 if not detection_completed(self.__class__.__name__, ckpt_mg):
889 if os.path.exists(self.issues_log_fp):
890 logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
891 if not self.dump_dir:
892 self.detect_issue() # writes all issues to self.issues_log_fp
893 else:
894 self.detect_issue_from_files(modified_graphs)
895 else:
896 logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")
898 # step 1:
899 if not self.issues_log_fp:
900 self.batch_fix_illformed_datetimes(to_fix)
901 else:
902 self.batch_fix_illformed_datetimes(self.issues_log_fp)
903 logging.info(f"[{self.__class__.__name__}] Fixing ill-formed datetime values terminated.")
905# (3) correct creation snapshots without primary source -> move to daughter class MissingPrimSourceFixer
class MissingPrimSourceFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to creation snapshots that do not have a primary source in the OpenCitations Meta provenance dataset.

    A creation snapshot is the first snapshot of an entity's provenance chain
    (URI ending in ``/prov/se/1``). Each one is expected to carry a
    ``prov:hadPrimarySource`` statement; this fixer inserts the missing
    statement, pointing to the Meta dump published before the snapshot's
    generation time.
    """
    def __init__(self, endpoint: str, meta_dumps_pub_dates: List[Tuple[str, str]], dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :type endpoint: str
        :param meta_dumps_pub_dates: Register of published OpenCitations Meta dumps, in the form: [(<ISO format date 1>, <dump DOI1>), (<ISO format date 2>, <dump DOI2>), ...]
        :type meta_dumps_pub_dates: List[Tuple[str, str]]
        :param dump_dir: Optional directory with RDF dump files; when set, detection reads from files instead of querying the endpoint.
        :param issues_log_dir: Optional directory for the JSON Lines issues log; when set, detected issues are streamed to disk instead of kept in memory.
        :param checkpoint_fp: Path of the checkpoint file used to resume interrupted runs.
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)
        validate_meta_dumps_pub_dates(meta_dumps_pub_dates) # raises errors if something wrong
        # Keep dumps as (date, doi) pairs sorted chronologically, so the dump
        # preceding a given generation time can be looked up reliably.
        self.meta_dumps_pub_dates = sorted([(date.fromisoformat(d), doi) for d, doi in meta_dumps_pub_dates], key=lambda x: x[0])

    def detect_issue(self, limit=10000) -> Union[str, List[Tuple[str, str]]]:
        """
        Fetch creation snapshots that do not have a primary source.

        Results are either accumulated in memory (and returned) or streamed to
        ``self.issues_log_fp`` as JSON Lines, depending on configuration.

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples with snapshot URI and generation time, or ``None`` when results are written to the issues log file.
        :rtype: Union[str, List[Tuple[str, str]]]
        """
        results = []
        counter = 0

        # Inner sub-select keeps DISTINCT + ORDER BY stable across
        # OFFSET/LIMIT pages; the %d placeholders are filled by _paginate_query.
        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>

        SELECT ?s ?genTime WHERE {
            {
                SELECT DISTINCT ?s ?genTime WHERE {
                    GRAPH ?g {
                        ?s a prov:Entity ;
                            prov:generatedAtTime ?genTime .
                        FILTER NOT EXISTS { ?s prov:hadPrimarySource ?anySource }
                        FILTER(REGEX(STR(?s), "/prov/se/1$")) # escape the dollar sign if using string.Template compiling
                    }
                }
                ORDER BY ?s
            }
        }
        OFFSET %d
        LIMIT %d
        """

        # Process metadata goes on the first line of the issues log
        # (or is just logged when running fully in memory).
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        # Pagination loop
        logging.info(f"[{self.__class__.__name__}] Fetching creation snapshots without a primary source with pagination...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    s = b["s"]["value"]
                    gen_time = b["genTime"]["value"]
                    # results.append((s, gen_time))
                    counter +=1
                    out_row = (s, gen_time)

                    if not self.issues_log_fp:
                        results.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')

            logging.info(f"Found {counter} creation snapshots without a primary source.")
        finally:
            # Close the log even if pagination fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return results if not self.issues_log_fp else None # (<snapshot uri>, <gen. time>)

    def detect_issue_from_files(self, modified_graphs:dict) -> Union[str, List[Tuple[str, str]]]:
        """
        File-based counterpart of :meth:`detect_issue`: scan the JSON-LD dump in
        ``self.dump_dir`` for creation snapshots lacking ``prov:hadPrimarySource``.

        :param modified_graphs: Mapping from graph URI to the graph object as
            already modified by FillerFixer (simulation); these replace the
            on-disk versions during the scan.
        :returns: List of (snapshot URI, generation time) tuples, or ``None`` when results are written to the issues log file.
        """
        results = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching creation snapshots without a primary source from RDF files...")
        try:
            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']] # use the graph already modified by FillerFixer (simulation)
                for se_obj in graph_obj['@graph']:
                    s = se_obj['@id']
                    # Only creation snapshots (sequence number 1) are relevant.
                    if get_seq_num(s) != 1:
                        continue
                    gentime_prop = 'http://www.w3.org/ns/prov#generatedAtTime'

                    if se_obj.get(gentime_prop) and not se_obj.get('http://www.w3.org/ns/prov#hadPrimarySource'):
                        genTime_values = [d['@value'] for d in se_obj[gentime_prop]]
                        if len(genTime_values) == 1:
                            gen_time = normalise_datetime(genTime_values[0])
                        elif len(genTime_values) > 1:
                            gen_time = min([normalise_datetime(dt) for dt in genTime_values], key=lambda x: datetime.fromisoformat(x)) # take the earliest datetime value

                        counter +=1
                        out_row = (s, gen_time)

                        if not self.issues_log_fp:
                            results.append(out_row)
                        else:
                            out_row = make_json_safe(out_row)
                            res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')

            logging.info(f"Found {counter} creation snapshots without a primary source.")
        finally:
            # Close the log even if the scan fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return results if not self.issues_log_fp else None # (<snapshot uri>, <gen. time>)

    def batch_insert_missing_primsource(self, creations_to_fix: Union[str, List[Tuple[str, str]]], batch_size=200):
        """
        Insert primary sources for creation snapshots that do not have one, in batches.

        For each snapshot, the primary source is the Meta dump published before
        the snapshot's generation time.

        :param creations_to_fix: A list of tuples where each tuple contains the snapshot URI and the generation time, representing all the creation snapshots that must be fixed. May also be the path of a JSON Lines issues log.
        :type creations_to_fix: Union[str, List[Tuple[str, str]]]
        :param batch_size: The number of snapshots to process in each batch.
        :type batch_size: int
        :returns: None
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        INSERT DATA {
            $quads
        }
        """)

        # Checkpointing only applies in file-backed mode.
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Inserting missing primary sources in batches...")

        for batch_idx, (batch, line_num) in checkpointed_batch(
            creations_to_fix,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_insert_missing_primsource",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                quads = []
                for snapshot_uri, gen_time in batch:
                    prim_source_uri = get_previous_meta_dump_uri(self.meta_dumps_pub_dates, gen_time)
                    graph_uri = get_graph_uri_from_se_uri(snapshot_uri)
                    quads.append(f"GRAPH <{graph_uri}> {{ <{snapshot_uri}> prov:hadPrimarySource <{prim_source_uri}> . }}\n")
                quads_str = " ".join(quads)
                query = template.substitute(quads=quads_str)

                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                logging.error(f"Error while fixing multiple primary source in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                print(f"Error while fixing multiple primary source in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None

    def fix_issue(self, modified_graphs:dict=None):
        """
        Detect and fix creation snapshots missing a primary source.

        :param modified_graphs: Mapping of graphs already modified by FillerFixer
            (simulation); only used in file-based detection.
            NOTE(review): when ``self.dump_dir`` is set this must not be
            ``None`` — confirm callers always pass it in that mode.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect creation snapshots missing a primary source
        if not self.issues_log_fp:
            to_fix = self.detect_issue() # keep all the issues in memory
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue() # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: insert primary source for the snapshots
        if not self.issues_log_fp:
            self.batch_insert_missing_primsource(to_fix)
        else:
            self.batch_insert_missing_primsource(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing creation snapshots without a primary source terminated.")
# TODO: (4) Correct snapshots with multiple objects for prov:wasAttributedTo -> move to daughter class MultiPAFixer
class MultiPAFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to snapshots that have multiple objects for the ``prov:wasAttributedTo`` property in the OpenCitations Meta provenance dataset.

    A snapshot attributed both to the default ingestion agent
    (``.../prov/pa/1``) and to some other agent is considered faulty: the fix
    removes the default agent (and Arcangelo's ORCID) and re-attributes the
    snapshot to ``.../prov/pa/2``.
    """
    def __init__(self, endpoint: str, dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :param dump_dir: Optional directory with RDF dump files; when set, detection reads from files instead of querying the endpoint.
        :param issues_log_dir: Optional directory for the JSON Lines issues log.
        :param checkpoint_fp: Path of the checkpoint file used to resume interrupted runs.
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)

    def detect_issue(self, limit=10000):
        """
        Fetch graph-snapshot pairs where the snapshot has more than one object for the ``prov:wasAttributedTo`` property.

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: List of tuples (graph URI, snapshot URI), or ``None`` when results are written to the issues log file.
        :rtype: List[Tuple[str, str]]
        """
        result = []
        counter = 0

        # Selects only snapshots attributed to the default agent pa/1 AND to at
        # least one other agent; inner sub-select keeps ORDER BY stable across
        # OFFSET/LIMIT pages.
        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?g ?s
        WHERE {
            {
                SELECT ?g ?s
                WHERE {
                    GRAPH ?g {
                        ?s prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> .
                        FILTER EXISTS {
                            ?s prov:wasAttributedTo ?other_pa
                            FILTER (?other_pa != <https://w3id.org/oc/meta/prov/pa/1>)
                        }
                    }
                }
                ORDER BY ?g
            }
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching snapshots with multiple objects for prov:wasAttributedTo...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    g = b['g']['value']
                    s = b['s']['value']
                    counter +=1
                    out_row = (g, s)

                    if not self.issues_log_fp:
                        result.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} snapshots with multiple objects for prov:wasAttributedTo.")
        finally:
            # Close the log even if pagination fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return result if not self.issues_log_fp else None

    def detect_issue_from_files(self, modified_graphs:dict):
        """
        File-based counterpart of :meth:`detect_issue`: scan the JSON-LD dump in
        ``self.dump_dir`` for snapshots attributed both to the default agent and
        to at least one other agent.

        :param modified_graphs: Mapping from graph URI to the graph object as
            already modified by FillerFixer (simulation).
        :returns: List of (graph URI, snapshot URI) tuples, or ``None`` when results are written to the issues log file.
        """
        result = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching snapshots with multiple objects for prov:wasAttributedTo from RDF files...")
        try:
            attribut_prop = 'http://www.w3.org/ns/prov#wasAttributedTo'
            default_val = 'https://w3id.org/oc/meta/prov/pa/1'
            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']] # use the graph already modified by FillerFixer (simulation)
                for se_obj in graph_obj['@graph']:
                    # Snapshots with at most one agent need no fix.
                    if len(se_obj.get(attribut_prop, [])) <= 1:
                        continue
                    attribut_vals = [d['@id'] for d in se_obj[attribut_prop]]
                    if default_val in attribut_vals and any(v for v in attribut_vals if v != default_val):
                        g = graph_obj['@id']
                        s = se_obj['@id']

                        counter +=1
                        out_row = (g, s)

                        if not self.issues_log_fp:
                            result.append(out_row)
                        else:
                            out_row = make_json_safe(out_row)
                            res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} snapshots with multiple objects for prov:wasAttributedTo.")
        finally:
            # Close the log even if the scan fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return result if not self.issues_log_fp else None

    def batch_fix_extra_pa(self, multi_pa_snapshots:Union[str, List[Tuple[str]]], batch_size=200):
        """
        Delete triples where the value of ``prov:wasAttributedTo`` is <https://w3id.org/oc/meta/prov/pa/1> if there
        is at least another processing agent for the same snapshot subject.

        Each affected snapshot is re-attributed to
        <https://w3id.org/oc/meta/prov/pa/2> (modification processes agent).

        :param multi_pa_snapshots: A list of tuples where each tuple contains a graph URI and a snapshot URI. May also be the path of a JSON Lines issues log.
        :type multi_pa_snapshots: Union[str, List[Tuple[str]]]
        :param batch_size: Number of snapshots to process per batch.
        :type batch_size: int
        :returns: None
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
            $quads_to_delete
        } ;
        INSERT DATA {
            $quads_to_insert
        }
        """)

        # Checkpointing only applies in file-backed mode.
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Fixing snapshots with multiple processing agents in batches...")

        for batch_idx, (batch, line_num) in checkpointed_batch(
            multi_pa_snapshots,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="batch_fix_extra_pa",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                to_delete = []
                to_insert = []
                for g, s in batch:
                    to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://orcid.org/0000-0002-8420-0696> . }}\n") # deletes Arcangelo's ORCID
                    to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> . }}\n") # deletes Meta's default processing agent (for ingestions only)
                    to_insert.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/2> . }}\n") # inserts Meta's processing agent for modification processes

                to_delete_str = " ".join(to_delete)
                to_insert_str = " ".join(to_insert)
                query = template.substitute(quads_to_delete=to_delete_str, quads_to_insert=to_insert_str)

                self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                logging.error(f"Error while fixing multiple processing agents in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                print(f"Error while fixing multiple processing agents in Batch {batch_idx} for snapshots {line_num-batch_size} to {line_num}: {e}")
                raise e
        return None

    def fix_issue(self, modified_graphs:dict=None):
        """
        Detect and fix snapshots with multiple processing agents.

        :param modified_graphs: Mapping of graphs already modified by FillerFixer
            (simulation); only used in file-based detection.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect graph and snapshots that have an additional object besides the typical <https://w3id.org/oc/meta/prov/pa/1> for prov:wasAttributedTo
        if not self.issues_log_fp:
            to_fix = self.detect_issue()
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue() # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: delete all objects for prov:wasAttributedTo and insert only <https://w3id.org/oc/meta/prov/pa/2>
        if not self.issues_log_fp:
            self.batch_fix_extra_pa(to_fix)
        else:
            self.batch_fix_extra_pa(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing graphs with multiple processing agents terminated.")
1311# (5) Correct graphs where at least one snapshots has too many objects for specific properties -> move to daughter class MultiObjectFixer
class MultiObjectFixer(ProvenanceIssueFixer):
    """
    A class to fix issues related to graphs where at least one snapshot has too many objects for specific properties in the OpenCitations Meta provenance dataset.

    The affected single-valued properties are ``prov:invalidatedAtTime``,
    ``prov:hadPrimarySource`` and ``oc:hasUpdateQuery``. Since the provenance
    chain of such graphs cannot be trusted, the whole graph is reset to a
    single, newly built creation snapshot.
    """
    def __init__(self, endpoint:str, meta_dumps_pub_dates: List[Tuple[str, str]], dump_dir:str=None, issues_log_dir:Union[str, None]=None, checkpoint_fp='checkpoint.json'):
        """
        :param endpoint: The SPARQL endpoint URL.
        :param meta_dumps_pub_dates: Register of published OpenCitations Meta dumps, in the form: [(<ISO format date 1>, <dump DOI1>), (<ISO format date 2>, <dump DOI2>), ...]
        :type meta_dumps_pub_dates: List[Tuple[str, str]]
        :param dump_dir: Optional directory with RDF dump files; when set, detection reads from files instead of querying the endpoint.
        :param issues_log_dir: Optional directory for the JSON Lines issues log.
        :param checkpoint_fp: Path of the checkpoint file used to resume interrupted runs.
        """
        super().__init__(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp)
        self.pa_uri = "https://w3id.org/oc/meta/prov/pa/1" # URI of the processing agent to be used as objects of prov:wasAttributedTo for newly created snapshots, which is always the default one
        validate_meta_dumps_pub_dates(meta_dumps_pub_dates) # raises errors if something wrong
        # Keep dumps as (date, doi) pairs sorted chronologically.
        self.meta_dumps_pub_dates = sorted([(date.fromisoformat(d), doi) for d, doi in meta_dumps_pub_dates], key=lambda x: x[0])

    def detect_issue(self, limit=10000) -> Union[None, List[str]]:
        """
        Fetch graphs containing at least one snapshot with multiple objects for
        a property that only admits one (e.g. ``oc:hasUpdateQuery``).

        :param limit: The number of results to fetch per page.
        :type limit: int
        :returns: A list of tuples (graph URI, generation time), or ``None`` when results are written to the issues log file.
        :rtype: Union[None, List[str]]
        """
        output = []
        counter = 0

        # The inner sub-select finds offending graphs; the outer part binds the
        # graph's creation snapshot (…se/1) to retrieve its generation time.
        template = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX oc: <https://w3id.org/oc/ontology/>

        SELECT ?g ?genTime
        WHERE {
            {
                SELECT DISTINCT ?g
                WHERE {
                    GRAPH ?g {
                        VALUES ?p { prov:invalidatedAtTime prov:hadPrimarySource oc:hasUpdateQuery }
                        ?s ?p ?o ;
                            a prov:Entity .
                        FILTER EXISTS {
                            ?s ?p ?o2 .
                            FILTER (?o2 != ?o)
                        }
                    }
                }
                ORDER BY ?g
            }
            BIND (IRI(CONCAT(str(?g), "se/1")) AS ?target)
            ?target prov:generatedAtTime ?genTime .
        }
        OFFSET %d
        LIMIT %d
        """
        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching URIs of graphs containing snapshots with too many objects...")
        try:
            for current_bindings in self._paginate_query(template, limit):
                for b in current_bindings:
                    g = b['g']['value']
                    gen_time = b['genTime']['value']
                    counter += 1
                    out_row = (g, gen_time)
                    if not self.issues_log_fp:
                        output.append(out_row)
                    else:
                        out_row = make_json_safe(out_row)
                        res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} distinct graphs containing snapshots with too many objects for some properties.")
        finally:
            # Close the log even if pagination fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return output if not self.issues_log_fp else None

    def detect_issue_from_files(self, modified_graphs:dict) -> Union[None, List[str]]:
        """
        File-based counterpart of :meth:`detect_issue`: scan the JSON-LD dump in
        ``self.dump_dir`` for graphs with a snapshot carrying multiple values
        for a single-valued property.

        :param modified_graphs: Mapping from graph URI to the graph object as
            already modified by FillerFixer (simulation).
        :returns: List of (graph URI, generation time) tuples, or ``None`` when results are written to the issues log file.
        """
        output = []
        counter = 0

        paradata = get_process_paradata(self)
        if self.issues_log_fp:
            res_log = open(self.issues_log_fp, 'w', encoding='utf-8')
            res_log.write(json.dumps(paradata, ensure_ascii=False) + '\n')
        else:
            logging.info(paradata)

        logging.info(f"[{self.__class__.__name__}] Fetching URIs of graphs containing snapshots with too many objects from RDF files...")
        try:
            inval_dt_prop = 'http://www.w3.org/ns/prov#invalidatedAtTime'
            prim_source_prop = 'http://www.w3.org/ns/prov#hadPrimarySource'
            upd_query_prop = 'https://w3id.org/oc/ontology/hasUpdateQuery'

            for graph_obj in read_rdf_dump(self.dump_dir):
                if graph_obj['@id'] in modified_graphs:
                    graph_obj = modified_graphs[graph_obj['@id']] # use the graph already modified by FillerFixer (simulation)
                switch = False
                for se_obj in graph_obj['@graph']:
                    # check if any property among [inval_dt_prop, prim_source_prop, upd_query_prop] has more than one value
                    if any(len(se_obj.get(p, [])) > 1 for p in [inval_dt_prop, prim_source_prop, upd_query_prop]):
                        switch = True
                        break
                if not switch:
                    continue

                g = graph_obj['@id']
                gen_time = None

                # Look up the creation snapshot to get the graph's generation time.
                # NOTE(review): assumes the graph URI ends with '/'; if se/1
                # exists but lacks generatedAtTime, gen_time stays None and is
                # still emitted — confirm downstream handling.
                for se_obj in graph_obj['@graph']:
                    if se_obj['@id'] == g + 'se/1':
                        genTime_values = [d['@value'] for d in se_obj.get('http://www.w3.org/ns/prov#generatedAtTime', [])]
                        if len(genTime_values) == 1:
                            gen_time = normalise_datetime(genTime_values[0])
                        elif len(genTime_values) > 1:
                            gen_time = min([normalise_datetime(dt) for dt in genTime_values], key=lambda x: datetime.fromisoformat(x)) # take the earliest datetime value
                        break
                else: # i.e. no break
                    logging.warning(f"No creation snapshot found for graph {g}. Skipping...")
                    continue

                counter += 1
                out_row = (g, gen_time)
                if not self.issues_log_fp:
                    output.append(out_row)
                else:
                    out_row = make_json_safe(out_row)
                    res_log.write(json.dumps(out_row, ensure_ascii=False) + '\n')
            logging.info(f"Found {counter} distinct graphs containing snapshots with too many objects for some properties.")
        finally:
            # Close the log even if the scan fails halfway through.
            if self.issues_log_fp:
                res_log.close()

        if self.issues_log_fp:
            # -1 marks detection as fully completed in the checkpoint.
            self.checkpoint_mngr.save(self.__class__.__name__, 'detection_done', -1)

        return output if not self.issues_log_fp else None

    def reset_multi_object_graphs(self, graphs:Union[str, list], batch_size=200):
        """
        Reset each graph in ``graphs`` by deleting the existing snapshots and creating a new
        creation snapshot, which will be the only one left for that graph.

        :param graphs: A list of tuples (graph URI, generation time) for graphs that have too many objects for properties that only admit one. May also be the path of a JSON Lines issues log.
        :type graphs: Union[str, list]
        :param batch_size: Number of graphs per checkpointed batch (each graph is still reset with its own UPDATE query).
        :type batch_size: int
        :returns: None
        """

        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        CLEAR GRAPH <$graph> ;
        INSERT DATA {
            GRAPH <$graph> {
                <$creation_snapshot> prov:hadPrimarySource <$primary_source> ;
                    prov:wasAttributedTo <$processing_agent> ;
                    prov:specializationOf <$specialization_of> ;
                    dcterms:description "$description" ;
                    rdf:type prov:Entity ;
                    prov:generatedAtTime "$gen_time"^^xsd:dateTime .
            }
        }
        """)

        # Checkpointing only applies in file-backed mode.
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        logging.info(f"[{self.__class__.__name__}] Resetting graphs with too many objects by creating a new single creation snapshot...")

        # batch_size = 200 # updates are executed with individual queries anyway

        for batch_idx, (batch, line_num) in checkpointed_batch(
            graphs,
            batch_size,
            fixer_name=self.__class__.__name__,
            phase="reset_multi_object_graphs",
            ckpnt_mngr=ckpt_mg
        ):
            try:
                for g, gen_time in batch:
                    # NOTE(review): assumes the graph URI ends with '/' — confirm.
                    creation_se = g + 'se/1'
                    # Strip any datatype suffix that may have leaked into the stored value.
                    gen_time = gen_time.replace("^^xsd:dateTime", "")
                    gen_time = gen_time.replace("^^http://www.w3.org/2001/XMLSchema#dateTime", "")
                    prim_source = get_previous_meta_dump_uri(self.meta_dumps_pub_dates, gen_time)
                    processing_agent = self.pa_uri
                    referent = get_described_res_omid(g)
                    desc = f"The entity '{referent}' has been created."

                    query = template.substitute(
                        graph = g,
                        creation_snapshot = creation_se,
                        primary_source = prim_source,
                        processing_agent = processing_agent,
                        specialization_of = referent,
                        description = desc,
                        gen_time = gen_time
                    )

                    self._update(query)
                logging.debug(f"[{self.__class__.__name__}] Batch {batch_idx} completed.")
            except Exception as e:
                logging.error(f"Error while resetting graphs in Batch {batch_idx} for graphs{line_num-batch_size} to {line_num}: {e}")
                print(f"Error while resetting graphs in Batch {batch_idx} for graphs{line_num-batch_size} to {line_num}: {e}")
                raise e
        logging.debug(f"[{self.__class__.__name__}] Graph-resetting process terminated.")
        return None

    def fix_issue(self, modified_graphs:dict=None):
        """
        Detect and reset graphs whose snapshots carry multiple objects for
        single-valued properties.

        :param modified_graphs: Mapping of graphs already modified by FillerFixer
            (simulation); only used in file-based detection.
        """
        ckpt_mg = self.checkpoint_mngr if self.issues_log_fp else None

        # step 0: detect graphs and snapshots with multiple objects for 1-cardinality properties
        if not self.issues_log_fp:
            to_fix = self.detect_issue()
        else:
            if not detection_completed(self.__class__.__name__, ckpt_mg):
                if os.path.exists(self.issues_log_fp):
                    logging.warning(f"Issues log file {self.issues_log_fp} already exists and will be overwritten.")
                if not self.dump_dir:
                    self.detect_issue() # writes all issues to self.issues_log_fp
                else:
                    self.detect_issue_from_files(modified_graphs)
            else:
                logging.warning(f"[{self.__class__.__name__}] Issues already detected: reading from file {self.issues_log_fp}")

        # step 1: reset graphs with a snapshot having extra objects
        if not self.issues_log_fp:
            self.reset_multi_object_graphs(to_fix)
        else:
            self.reset_multi_object_graphs(self.issues_log_fp)
        logging.info(f"[{self.__class__.__name__}] Fixing graphs with multiple objects for 1-cardinality properties terminated.")
def fix_process(
    endpoint: str,
    meta_dumps_pub_dates: List[Tuple[str, str]],
    issues_log_dir: Union[str, None] = None,
    dry_run: bool = False,
    checkpoint_fp='checkpoint.json'
):
    """
    Function wrapping all the single fix operations into a single process,
    with strictly ordered steps, checkpointing, and timing.

    Fixers run in a fixed order (FillerFixer, DateTimeFixer,
    MissingPrimSourceFixer, MultiPAFixer, MultiObjectFixer); the checkpoint
    file allows resuming after an interruption by skipping fixers that have
    already completed.

    :param endpoint: SPARQL endpoint URL
    :param meta_dumps_pub_dates: List of (date, url) tuples for meta dumps.
    :param issues_log_dir: Directory in which to write files storing detected issues.
    :param dry_run: If True, only print what would be done, don't actually do it.
    :param checkpoint_fp: Path to checkpoint file. If None, no checkpointing is performed.
    """

    ckpt_mngr = CheckpointManager(checkpoint_fp) # checkpoint file will be deleted if program terminates succesfully

    fixers = (
        FillerFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        DateTimeFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MissingPrimSourceFixer(endpoint, meta_dumps_pub_dates, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiPAFixer(endpoint, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiObjectFixer(endpoint, meta_dumps_pub_dates, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
    )
    steps = tuple(fixer.__class__.__name__ for fixer in fixers)

    timer = TimedProcess(total_phases=len(fixers))
    timer.start()

    for i, fixer in enumerate(fixers):
        try:
            logging.info(f"Created instance of {fixer.__class__.__qualname__}.")

            # --- resume logic: skip completed fixer ---
            if ckpt_mngr:
                state = ckpt_mngr.load()
                last_fixer = state.get("fixer")
                # FIX: previously `steps.index(state.get("fixer", steps[0]))`
                # raised ValueError when the checkpoint recorded a fixer name
                # not in this pipeline (e.g. a stale checkpoint file from
                # another run). An unknown name now simply means "not skipped".
                if (
                    last_fixer == fixer.__class__.__name__
                    and state.get("phase") == "done"
                ) or (
                    last_fixer in steps and steps.index(last_fixer) > i # skip if a later fixer was completed
                ):
                    logging.info(f"Skipping {fixer.__class__.__name__} (already completed).")
                    continue

            # --- run fixer ---
            timer.start_phase()
            if not dry_run:
                fixer.fix_issue()
            else:
                logging.debug(f"[fix_process]: Would run {fixer.__class__.__name__}")
            phase_time = timer.end_phase()
        finally:
            # Always surface the latest checkpoint state, even on failure.
            if ckpt_mngr:
                logging.info(ckpt_mngr.load())

        # --- log progress ---
        elapsed, remaining = timer.eta(i)
        logging.info(
            f"{fixer.__class__.__name__} finished in {phase_time:.1f}s | "
            f"Elapsed: {elapsed/60:.1f}m | ETA: {remaining/60:.1f}m"
        )

        # --- mark fixer as done ---
        if ckpt_mngr:
            ckpt_mngr.save(fixer.__class__.__name__, "done", -1)
            logging.info(f"{fixer.__class__.__name__} completed.")

    # clear checkpoint only when the whole pipeline is done
    logging.info(" ----- All fixing operations terminated. ----- ")
    if ckpt_mngr:
        ckpt_mngr.clear()
    return None
def fix_process_reading_from_files(
    endpoint: str,
    dump_dir: str,
    meta_dumps_pub_dates: List[Tuple[str, str]],
    issues_log_dir: str,
    dry_run: bool = False,
    checkpoint_fp='checkpoint.json'
):
    """
    Function wrapping all the single fix operations into a single process,
    with strictly ordered steps, checkpointing, and timing. Reads from RDF dump files for detecting issues.

    :param endpoint: SPARQL endpoint URL
    :param dump_dir: Directory containing RDF dump files to read from.
    :param meta_dumps_pub_dates: List of (date, url) tuples for meta dumps.
    :param issues_log_dir: Directory in which to write files storing detected issues.
    :param dry_run: If True, only print what would be done, don't actually do it.
    :param checkpoint_fp: Path to checkpoint file. If None, no checkpointing is performed.
    """
    # Honour the documented contract: a None checkpoint path disables checkpointing
    # entirely (previously a CheckpointManager was built unconditionally, which made
    # every `if ckpt_mngr:` guard below vacuous). The checkpoint file is deleted if
    # the program terminates successfully.
    ckpt_mngr = CheckpointManager(checkpoint_fp) if checkpoint_fp else None

    fixers = (
        FillerFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        DateTimeFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MissingPrimSourceFixer(endpoint, meta_dumps_pub_dates, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiPAFixer(endpoint, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
        MultiObjectFixer(endpoint, meta_dumps_pub_dates, dump_dir=dump_dir, issues_log_dir=issues_log_dir, checkpoint_fp=checkpoint_fp),
    )
    steps = tuple(fixer.__class__.__name__ for fixer in fixers)
    timer = TimedProcess(total_phases=len(fixers))
    timer.start()

    modified_graphs_mapping = dict()  # to keep track of graphs modified by FillerFixer
    # Simulated graphs are persisted to a temp file so that a resumed run can reload
    # them instead of re-simulating (store to file for checkpointing).
    TMP_FILE_SIMULATED_GRAPHS = os.path.join(issues_log_dir if issues_log_dir else '.', 'temp_simulated_graphs.json')

    for i, fixer in enumerate(fixers):
        logging.info(f"Created instance of {fixer.__class__.__qualname__}.")

        # --- resume logic: skip completed fixer ---
        if ckpt_mngr:
            state = ckpt_mngr.load()
            recorded = state.get("fixer")
            finished_here = (
                recorded == fixer.__class__.__name__
                and state.get("phase") == "done"
            )
            # `recorded in steps` protects steps.index() from raising ValueError
            # when the checkpoint file holds a stale/unknown fixer name.
            finished_later = recorded in steps and steps.index(recorded) > i  # skip if a later fixer was completed
            if finished_here or finished_later:
                logging.info(f"Skipping {fixer.__class__.__name__} (already completed).")
                continue

        # --- run fixer ---
        timer.start_phase()
        try:
            if not dry_run:
                if i == 0:  # i.e. FillerFixer
                    fixer.fix_issue()
                    mod_graphs_uris = load_modified_graphs_uris(fixer.issues_log_fp)
                    logging.info(f"Simulating FillerFixer changes for {len(mod_graphs_uris)} graphs...")
                    modified_graphs_mapping.update(
                        {g['@id']: simulate_ff_changes(g) for g in read_rdf_dump(dump_dir) if g['@id'] in mod_graphs_uris}
                    )
                    # save simulated graphs to temp file
                    with open(TMP_FILE_SIMULATED_GRAPHS, 'w', encoding='utf-8') as tf:
                        json.dump(modified_graphs_mapping, tf)
                else:  # i.e. all other fixers
                    with open(TMP_FILE_SIMULATED_GRAPHS, 'r', encoding='utf-8') as tf:
                        modified_graphs_mapping = json.load(tf)
                    fixer.fix_issue(modified_graphs=modified_graphs_mapping)
            else:
                logging.debug(f"[fix_process]: Would run {fixer.__class__.__name__}")
        finally:
            # Only diagnostics belong here: this runs even when the fixer raises.
            if ckpt_mngr:
                logging.info(f"Latest checkpoint state: {ckpt_mngr.load()}")

        # Everything below runs only on success. Keeping it out of `finally`
        # avoids referencing `phase_time` when the iteration was skipped or
        # failed, and ensures a *failed* fixer is never marked "done" (which
        # would make a resumed run silently skip it).
        phase_time = timer.end_phase()

        # --- log progress ---
        elapsed, remaining = timer.eta(i)
        logging.info(
            f"{fixer.__class__.__name__} finished in {phase_time:.1f}s | "
            f"Elapsed: {elapsed/60:.1f}m | ETA: {remaining/60:.1f}m"
        )

        # --- mark fixer as done ---
        if ckpt_mngr:
            ckpt_mngr.save(fixer.__class__.__name__, "done", -1)
            logging.info(f"{fixer.__class__.__name__} completed.")

    # clear checkpoint and remove temp file for simulated graphs only when the whole pipeline is done
    logging.info(" ----- All fixing operations terminated. ----- ")
    if ckpt_mngr:
        ckpt_mngr.clear()
    # if os.path.exists(TMP_FILE_SIMULATED_GRAPHS):
    #     os.remove(TMP_FILE_SIMULATED_GRAPHS)  # remove temp file storing simulated graphs
    return None