Coverage for meta_prov_fixer / src.py: 85%
518 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1from rdflib import Graph, Dataset, URIRef, Literal
2from rdflib.namespace import XSD, PROV, DCTERMS, RDF
3from typing import List, Union, Dict, Tuple, Generator, Iterable, Optional
4from meta_prov_fixer.utils import remove_seq_num, get_seq_num, normalise_datetime, get_previous_meta_dump_uri, get_described_res_omid, \
5read_rdf_dump, get_rdf_prov_filepaths, batched, get_graph_uri_from_se_uri
6import logging
7from string import Template
8import re
9import json
10from tqdm import tqdm
11from sparqlite import SPARQLClient, QueryError, EndpointError
12from typing import TextIO
13from datetime import datetime, date, timezone
14import os
15from pathlib import Path
16import logging
17import traceback
18import time
19from zipfile import ZipFile, ZIP_DEFLATED
20from enum import IntEnum
class Step(IntEnum):
    """
    Ordered stages of the provenance-fixing pipeline.

    IntEnum is used so stages can be compared with ordering operators
    (e.g. ``Step[...] >= step`` in ``Checkpoint.step_completed``) to decide
    whether a stage was already completed for a given file.
    """
    START = 0
    FILLER = 1
    DATETIME = 2
    MISSING_PS = 3
    MULTI_PA = 4
    MULTI_OBJECT = 5
    WRITE_FILE = 6
class Checkpoint:
    """
    Persist the progress of the provenance-fixing process to a JSON file so
    that an interrupted run can resume from the last completed file/step.
    """

    def __init__(self, path: str):
        # path: location of the checkpoint JSON file
        self.path = path
        self.state = None   # last persisted state dict, or None when no checkpoint exists
        self.dirty = False  # True when self.state has changes not yet written to disk
        self.load()

    def load(self):
        """Load the checkpoint state from disk; set it to None if the file is missing."""
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as f:
                self.state = json.load(f)
        else:
            self.state = None

    def _atomic_write(self, data: dict, retries=5, delay=0.05):
        """
        Write *data* to a temporary file, then atomically replace the checkpoint file.

        On Windows, os.replace can transiently fail with PermissionError
        (e.g. "[WinError 5] Access denied" on 'fix_prov.checkpoint.json.tmp')
        while another process holds the target, so the replace is retried up
        to *retries* times with *delay* seconds between attempts.
        """
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        for _ in range(retries):
            try:
                os.replace(tmp_path, self.path)
                return
            except PermissionError:
                time.sleep(delay)
        raise PermissionError(f"Failed to replace checkpoint file after {retries} retries")

    def update_state(
        self,
        file_index: int,
        file_path: str,
        step: "Step",
        endpoint_done: bool,
        local_done: bool
    ):
        """Record progress for *file_path* in memory; call flush() to persist it."""
        self.state = {
            "file_index": file_index,
            "file_path": file_path,
            "step": step.name,
            "endpoint_done": endpoint_done,
            "local_done": local_done,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        self.dirty = True

    def flush(self):
        """Persist the in-memory state to disk if it has pending changes."""
        if self.dirty and self.state:
            self._atomic_write(self.state)
            self.dirty = False

    def should_skip_file(self, idx: int) -> bool:
        """Return True when file *idx* was fully processed in a previous run."""
        # bool() honours the annotated return type: previously this returned
        # None (no state) or relied on dict truthiness instead of True/False.
        return bool(self.state and idx < self.state["file_index"])

    def step_completed(self, step: "Step", file_index: int) -> bool:
        """Return True when *step* was already completed for *file_index* in a previous run."""
        if not self.state:
            return False
        return bool((Step[self.state["step"]] >= step) and self.state["file_index"] >= file_index)
94# --- Caching mechanism for filler issues detection --- #
96def _atomic_json_write(path: str, data: dict):
97 tmp = path + ".tmp"
98 with open(tmp, "w", encoding="utf-8") as f:
99 json.dump(data, f)
100 os.replace(tmp, path)
def load_or_prepare_filler_issues(data_dir: str, cache_fp: str = "filler_issues.cache.json"):
    """
    Return the filler issues for *data_dir*, reading them from the JSON cache at
    *cache_fp* when possible and recomputing (and re-caching) them otherwise.

    A cache entry is only reused when its recorded ``data_dir`` matches the
    absolute path of the requested one; a stale, unreadable or corrupted cache
    file is removed and the issues are recomputed.

    :returns: a ``(filler_issues, tot_files, cache_fp)`` tuple, where
        ``filler_issues`` is a list of
        ``(graph_uri, {"to_delete": [...], "remaining_snapshots": [...]})`` pairs.
    """
    if os.path.exists(cache_fp):
        cached = None
        try:
            with open(cache_fp, "r", encoding="utf-8") as f:
                cached = json.load(f)
        except (OSError, json.JSONDecodeError):
            # corrupted/unreadable cache: previously this crashed the whole run;
            # now we invalidate it and fall through to a fresh scan
            logging.warning(f"Could not read filler issues cache at {cache_fp}; recomputing.")

        if cached is not None and cached.get("data_dir") == os.path.abspath(data_dir):
            filler_issues = [
                (URIRef(g), {
                    "to_delete": [URIRef(u) for u in v["to_delete"]],
                    "remaining_snapshots": [URIRef(u) for u in v["remaining_snapshots"]],
                })
                for g, v in cached["filler_issues"]
            ]
            return filler_issues, cached["tot_files"], cache_fp

        # data_dir mismatch or corrupted cache -> invalidate
        os.remove(cache_fp)

    # Cache miss -> compute
    filler_issues, tot_files = prepare_filler_issues(data_dir)

    serializable = [
        (
            str(g),
            {
                "to_delete": [str(u) for u in d["to_delete"]],
                "remaining_snapshots": [str(u) for u in d["remaining_snapshots"]],
            }
        )
        for g, d in filler_issues
    ]

    payload = {
        "data_dir": os.path.abspath(data_dir),
        "created_at": datetime.now(timezone.utc).isoformat(),
        "tot_files": tot_files,
        "filler_issues": serializable,
    }

    _atomic_json_write(cache_fp, payload)

    return filler_issues, tot_files, cache_fp
149class FillerFixerFile:
151 def __init__(self, endpoint):
152 pass
153 self.endpoint = endpoint
155 def detect(graph:Graph) -> Optional[Tuple[URIRef, Dict[str, List[URIRef]]]]:
156 """
157 Detects the issues in the input graph. If no filler snapshots are found, return None.
158 Else, a 2-elements tuple is returned, where the first element is the URIRef object of the
159 graph's identifier, and the second element is a dictionary with "to_delete" and
160 "remaining_snapshots" keys, both having as their value a list of URIRef objects, respectively
161 representing the fillers snapshots that must be deleted and the snapshots that should be kept
162 (but must be renamed).
164 :param graph: the named graph for the provenance of an entity
165 """
167 # out = (URIRef(graph.identifier), {"to_delete":[], "remaining_snapshots":[]})
169 snapshots = list(graph.subjects(unique=True))
170 if len(snapshots) == 1:
171 return None
173 creation_se = URIRef(str(graph.identifier) + 'se/1')
174 fillers = set()
175 remaining = set()
177 for s in snapshots:
178 if s == creation_se:
179 remaining.add(s)
180 continue
181 if (s, URIRef('https://w3id.org/oc/ontology/hasUpdateQuery'), None) not in graph:
182 if (s, DCTERMS.description, None) not in graph:
183 fillers.add(s)
184 else:
185 for desc_val in graph.objects(s, DCTERMS.description, unique=True):
186 if "merged" not in str(desc_val).lower():
187 fillers.add(s)
188 if s not in fillers:
189 remaining.add(s)
191 if not fillers:
192 return None
194 out = (
195 URIRef(graph.identifier),
196 {
197 "to_delete":list(fillers),
198 "remaining_snapshots":list(remaining)
199 }
200 )
202 return out
205 @staticmethod
206 def map_se_names(to_delete:set, remaining: set) -> dict:
207 """
208 Associates a new URI value to each snapshot URI in the union of ``to_delete`` and ``remaining`` (containing snapshot URIs).
210 Values in the mapping dictionary are not unique, i.e., multiple old URIs can be mapped to the same new URI.
211 If ``to_delete`` is empty, the returned dictionary will have identical keys and values, i.e., the URIs will not change.
212 Each URI in ``to_delete`` will be mapped to the new name of the URI in ``remaining`` that immediately precedes it in
213 a sequence ordered by sequence number.
215 **Examples:**
217 .. code-block:: python
219 to_delete = {'https://w3id.org/oc/meta/br/06101234191/prov/se/3'}
220 remaining = {'https://w3id.org/oc/meta/br/06101234191/prov/se/1', 'https://w3id.org/oc/meta/br/06101234191/prov/se/2', 'https://w3id.org/oc/meta/br/06101234191/prov/se/4'}
222 # The returned mapping will be:
223 {
224 'https://w3id.org/oc/meta/br/06101234191/prov/se/1': 'https://w3id.org/oc/meta/br/06101234191/prov/se/1',
225 'https://w3id.org/oc/meta/br/06101234191/prov/se/2': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
226 'https://w3id.org/oc/meta/br/06101234191/prov/se/3': 'https://w3id.org/oc/meta/br/06101234191/prov/se/2',
227 'https://w3id.org/oc/meta/br/06101234191/prov/se/4': 'https://w3id.org/oc/meta/br/06101234191/prov/se/3'
228 }
230 :param to_delete: A set of snapshot URIs that should be deleted.
231 :type to_delete: set
232 :param remaining: A set of URIs of snapshots that should remain in the graph (AFTER BEING RENAMED).
233 :type remaining: set
234 :returns: A dictionary mapping old snapshot URIs to their new URIs.
235 :rtype: dict
236 """
237 to_delete :set = {str(el) for el in to_delete}
238 remaining :set = {str(el) for el in remaining}
239 all_snapshots:list = sorted(to_delete|remaining, key=lambda x: get_seq_num(x)) # sorting is required!
241 mapping = {}
242 sorted_remaining = []
243 base_uri = remove_seq_num(all_snapshots[0])
245 if not all(u.startswith(base_uri) for u in all_snapshots):
246 logging.error(f"All snapshots must start with the same base URI: {base_uri}. Found: {all_snapshots}")
247 raise ValueError(f"Can rename only snapshots that are included in the same named graph.")
249 for old_uri in all_snapshots:
250 if old_uri in remaining:
251 new_uri = f"{base_uri}{len(sorted_remaining)+1}"
252 mapping[old_uri] = new_uri
253 sorted_remaining.append(new_uri)
255 else: # i.e., elif old_uri in to_delete
256 try:
257 new_uri = f"{base_uri}{get_seq_num(sorted_remaining[-1])}"
258 except IndexError:
259 # all snapshots are fillers (must be deleted), including the first one (creation)
260 logging.error(f"The first snapshot {old_uri} is a filler. Cannot rename the remaining snapshots.")
262 mapping[old_uri] = new_uri
264 return mapping
266 @staticmethod
267 def make_global_rename_map(graphs_with_fillers:Iterable):
268 """
269 Create a dictionary of the form <old_subject_uri>:<new_snapshot_name>.
271 :param graphs_with_fillers: an Iterable consisting of all the graphs containing filler snapshots,
272 the snapshots to delete in that graph, and the other snapshot in that same graph (part of
273 which must be renamed).
274 :type graphs_with_fillers: Iterable
275 """
276 out = dict()
278 for g, _dict in graphs_with_fillers:
279 mapping = FillerFixerFile.map_se_names(_dict['to_delete'], _dict['remaining_snapshots'])
280 for k, v in mapping.items():
281 if k != v:
282 out[k] = v
283 return out
286 def fix_local_graph(ds: Dataset, graph:Graph, global_rename_map:dict, fillers_issues_lookup:dict) -> None:
288 # delete all triples where subject is a filler (in local graph)
289 for snapshot_node, _, _, _ in ds.quads((None, None, None, graph.identifier)):
290 if str(snapshot_node) in global_rename_map:
291 if snapshot_node in fillers_issues_lookup[graph.identifier]['to_delete']:
292 ds.remove((URIRef(snapshot_node), None, None, graph.identifier))
294 # replace objects that used to be fillers snapshots (in this graph or in other graphs, using global mapping)
295 for subj, pred, obj, _ in ds.quads((None, None, None, graph.identifier)):
296 new_subj_name = subj
297 new_obj_name = obj
298 if str(subj) in global_rename_map:
299 new_subj_name = URIRef(global_rename_map.get(str(subj), subj))
300 if str(obj) in global_rename_map:
301 new_obj_name = URIRef(global_rename_map.get(str(obj), obj))
302 if (new_subj_name!=subj) or (new_obj_name!=obj):
303 ds.remove((subj, pred, obj, graph.identifier))
304 ds.add((new_subj_name, pred, new_obj_name, graph.identifier))
306 # adapt invalidatedAtTime relationships (in local graph only)
307 unsorted_curr_snpsts_strngs = []
308 for s in ds.graph(graph.identifier).subjects(unique=True):
309 unsorted_curr_snpsts_strngs.append(str(s))
310 snapshots_strings:list = sorted(unsorted_curr_snpsts_strngs, key=lambda x: get_seq_num(str(x)))
312 for s, following_se in zip(snapshots_strings, snapshots_strings[1:]):
313 try:
314 new_invaldt = min(
315 list(ds.graph(graph.identifier).objects(URIRef(following_se), PROV.generatedAtTime, unique=True)),
316 key=lambda x: normalise_datetime(str(x))
317 )
318 except ValueError:
319 # TODO: This should be a very rare case, but consider implementing a more robust handling
320 logging.error(f"Cannot find prov:generatedAtTime for snapshot {following_se} in graph {graph.identifier}. Skipping invalidatedAtTime update for snapshot {s}.")
321 continue
323 for old_invaldt in ds.graph(graph.identifier).objects(URIRef(s), PROV.invalidatedAtTime, unique=True):
324 ds.remove((URIRef(s), PROV.invalidatedAtTime, old_invaldt, graph.identifier))
325 ds.add((URIRef(s), PROV.invalidatedAtTime, new_invaldt, graph.identifier))
327 def build_delete_sparql_query(local_deletions:tuple)->str:
328 """
329 Makes the SPARQL query text for deleting snapshots from the triplestore based on the provided deletions list.
331 :param deletions: A tuple or list where the first element is a graph URI,
332 and the second is a dictionary with `'to_delete'` and `'remaining_snapshots'` sets.
333 """
335 deletion_template = Template("""
336 $dels
337 """)
339 # step 1: delete filler snapshots in the role of subjects
340 dels = []
341 g_uri, values = local_deletions
342 for se_to_delete in values['to_delete']:
343 single_del = f"DELETE WHERE {{ GRAPH <{str(g_uri)}> {{ <{str(se_to_delete)}> ?p ?o . }}}};\n"
344 dels.append(single_del)
345 dels_str = " ".join(dels)
346 query_str = deletion_template.substitute(dels=dels_str)
347 return query_str
349 def build_rename_sparql_query(local_mapping:dict) -> str:
350 """
351 Makes the SPARQL query text to rename snapshots in the triplestore according to the provided mapping.
353 :param local_mapping: A dictionary where keys are old snapshot URIs and values are new snapshot URIs.
354 :type local_mapping: dict
355 """
357 mapping = dict(sorted(local_mapping.items(), key=lambda i: get_seq_num(i[0])))
359 per_snapshot_template = Template("""
360 DELETE {
361 GRAPH ?g {
362 <$old_uri> ?p ?o .
363 ?s ?p2 <$old_uri> .
364 }
365 }
366 INSERT {
367 GRAPH ?g {
368 <$new_uri> ?p ?o .
369 ?s ?p2 <$new_uri> .
370 }
371 }
372 WHERE {
373 GRAPH ?g {
374 {
375 <$old_uri> ?p ?o .
376 }
377 UNION
378 {
379 ?s ?p2 <$old_uri> .
380 }
381 }
382 }
383 """)
386 per_snapshot_portions = []
387 for old_uri, new_uri in mapping.items():
388 if old_uri == new_uri:
389 continue
390 query_portion = per_snapshot_template.substitute(old_uri=old_uri, new_uri=new_uri)
391 per_snapshot_portions.append(query_portion)
393 final_query_str = ";\n".join(per_snapshot_portions)
395 return final_query_str
398 def build_adapt_invaltime_sparql_query(graph_uri: str, local_snapshots: list) -> str:
399 """
400 Update the ``prov:invalidatedAtTime`` property of each snapshot in the provided list to match
401 the value of ``prov:generatedAtTime`` of the following snapshot.
403 :param graph_uri: The URI of the named graph containing the snapshots.
404 :type graph_uri: str
405 :param local_snapshots: A list of snapshot URIs.
406 :type local_snapshots: list
407 :returns: None
408 """
410 snapshots = sorted(local_snapshots, key=lambda x: get_seq_num(x)) # sorting is required!
411 per_snaphot_template = Template("""
412 DELETE {
413 GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?old_time . }
414 }
415 INSERT {
416 GRAPH <$graph> { <$snapshot> prov:invalidatedAtTime ?new_time . }
417 }
418 WHERE {
419 GRAPH <$graph> {
420 OPTIONAL {
421 <$snapshot> prov:invalidatedAtTime ?old_time .
422 }
423 <$following_snapshot> prov:generatedAtTime ?new_time .
424 }
425 }
426 """)
428 per_snapshot_portions = []
430 for s, following_se in zip(snapshots, snapshots[1:]):
431 query_portion = per_snaphot_template.substitute(
432 graph=graph_uri,
433 snapshot=s,
434 following_snapshot=following_se
435 )
437 per_snapshot_portions.append(query_portion)
439 final_query_str = "PREFIX prov: <http://www.w3.org/ns/prov#>\n" + ";\n".join(per_snapshot_portions)
441 return final_query_str
class DateTimeFixerFile:
    """
    Detects and fixes prov:generatedAtTime / prov:invalidatedAtTime literals whose
    lexical form is not a normalised UTC xsd:dateTime ("...T...+00:00" or "...Z").
    """

    # Compiled once instead of re-matching the raw pattern on every call.
    # Accepts e.g. "2024-01-01T00:00:00+00:00" and "2024-01-01T00:00:00Z".
    _PATTERN_DT = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:(?:\+00:00)|Z)$")

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: "Graph") -> "Optional[List[Tuple[URIRef]]]":
        """
        Return a list of (graph_id, subject, predicate, object) tuples for each
        generatedAtTime/invalidatedAtTime value that is not already normalised.
        """
        result = []
        for s, p, o in graph.triples((None, None, None)):
            if p in (PROV.generatedAtTime, PROV.invalidatedAtTime):
                if not DateTimeFixerFile._PATTERN_DT.match(str(o)):
                    result.append((graph.identifier, s, p, o))
        return result

    @staticmethod
    def fix_local_graph(ds: "Dataset", graph: "Graph", to_fix: tuple) -> None:
        """Replace each malformed datetime literal in *ds* with its normalised xsd:dateTime form."""
        for g_uri, subj, prop, obj in to_fix:
            correct_dt_res = Literal(normalise_datetime(str(obj)), datatype=XSD.dateTime)
            ds.remove((subj, prop, obj, g_uri))
            ds.add((subj, prop, correct_dt_res, g_uri))

    @staticmethod
    def build_update_query(to_fix: "List[Tuple[URIRef]]"):
        """
        Build a single SPARQL update deleting each malformed datetime literal and
        inserting its normalised replacement, one quad pair per detected issue.
        """
        template = Template('''
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
            $to_delete
        } ;
        INSERT DATA {
            $to_insert
        }
        ''')

        to_delete = []
        to_insert = []
        for g, s, p, dt in to_fix:
            g = str(g)
            s = str(s)
            p = str(p)
            dt = str(dt)
            to_delete.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{dt}"^^xsd:dateTime . }}\n')
            correct_dt = normalise_datetime(dt)
            to_insert.append(f'GRAPH <{g}> {{ <{s}> <{p}> "{correct_dt}"^^xsd:dateTime . }}\n')

        to_delete_str = "\n ".join(to_delete)
        to_insert_str = "\n ".join(to_insert)
        query = template.substitute(to_delete=to_delete_str, to_insert=to_insert_str)
        return query
class MissingPrimSourceFixerFile:
    """
    Detects creation snapshots (se/1) that lack prov:hadPrimarySource and fixes
    them by linking the Meta dump published right before their generation time.
    """

    def __init__(self, meta_dumps_pub_dates):
        # (publication_date, dump_url) pairs used to resolve primary sources
        self.meta_dumps = meta_dumps_pub_dates

    @staticmethod
    def detect(graph: "Graph") -> "Optional[Tuple[URIRef, Literal]]":
        """
        Return (creation_snapshot_uri, generated_at_time) when the creation snapshot
        has a generation time but no primary source; otherwise return None.
        """
        creation_se_uri = URIRef(graph.identifier + 'se/1')
        if ((creation_se_uri, PROV.generatedAtTime, None) in graph) and not ((creation_se_uri, PROV.hadPrimarySource, None) in graph):
            try:
                # min() picks the earliest value if several generation times exist
                genTime = min(graph.objects(creation_se_uri, PROV.generatedAtTime))
            except ValueError:
                logging.warning(f"Could not find generatedAtTime value for creation snapshot {creation_se_uri}. Skipping MissingPrimSourceFixerFile detection.")
                return None
            return (creation_se_uri, genTime)

    @staticmethod
    def fix_local_graph(ds: "Dataset", graph: "Graph", to_fix: tuple, meta_dumps) -> None:
        """Add the missing prov:hadPrimarySource quad to the local dataset."""
        primSource_str = get_previous_meta_dump_uri(meta_dumps, str(to_fix[1]))
        ds.add((to_fix[0], PROV.hadPrimarySource, URIRef(primSource_str), graph.identifier))

    @staticmethod
    def build_update_query(to_fix: "List[Tuple[URIRef, Literal]]", meta_dumps):
        """
        Build a SPARQL INSERT DATA update adding prov:hadPrimarySource to each
        (snapshot_uri, generation_time) pair in *to_fix*.
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>

        INSERT DATA {
            $quads
        }
        """)

        fixes = []
        for snapshot_uri, gen_time in to_fix:
            snapshot_uri = str(snapshot_uri)
            gen_time = str(gen_time)
            prim_source_uri = get_previous_meta_dump_uri(meta_dumps, gen_time)
            graph_uri = get_graph_uri_from_se_uri(snapshot_uri)
            fixes.append(f"GRAPH <{graph_uri}> {{ <{snapshot_uri}> prov:hadPrimarySource <{prim_source_uri}> . }}\n")
        quads_str = " ".join(fixes)
        query = template.substitute(quads=quads_str)
        return query
class MultiPAFixerFile:
    """
    Detects snapshots attributed to multiple processing agents (including Meta's
    default ingestion agent pa/1) and normalises the attribution to pa/2.
    """

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: "Graph") -> "Optional[List[Tuple[URIRef]]]":
        """
        Return (graph_id, subject) pairs for each subject with more than one
        prov:wasAttributedTo value including pa/1.

        NOTE: iterates unique subjects; the previous implementation iterated all
        wasAttributedTo triples and appended one entry per triple, so every
        flagged subject (which by definition has >= 2 such triples) was
        duplicated in the result, inflating counters and update queries.
        """
        result = []
        for s in graph.subjects(PROV.wasAttributedTo, unique=True):
            processing_agents = list(graph.objects(s, PROV.wasAttributedTo, unique=True))
            if len(processing_agents) > 1 and URIRef('https://w3id.org/oc/meta/prov/pa/1') in processing_agents:
                result.append((graph.identifier, s))
        return result

    @staticmethod
    def fix_local_graph(ds: "Dataset", graph: "Graph", to_fix: "List[Tuple[URIRef]]") -> None:
        """Replace all prov:wasAttributedTo values of each flagged subject with pa/2."""
        for g_uri, subj in to_fix:
            # materialise before removing: mutating while iterating the live
            # store can skip items
            for obj in list(ds.objects(subj, PROV.wasAttributedTo, unique=True)):
                ds.remove((subj, PROV.wasAttributedTo, obj, g_uri))
            ds.add((subj, PROV.wasAttributedTo, URIRef('https://w3id.org/oc/meta/prov/pa/2'), g_uri))

    @staticmethod
    def build_update_query(to_fix: "List[Tuple[URIRef]]"):
        """
        Build a SPARQL update removing the curator's ORCID and the default
        ingestion agent (pa/1) from each flagged snapshot and attributing pa/2.
        """
        template = Template("""
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        DELETE DATA {
            $quads_to_delete
        } ;
        INSERT DATA {
            $quads_to_insert
        }
        """)

        to_delete = []
        to_insert = []
        for g, s in to_fix:
            g = str(g)
            s = str(s)
            to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://orcid.org/0000-0002-8420-0696> . }}\n")  # deletes Arcangelo's ORCID
            to_delete.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/1> . }}\n")  # deletes Meta's default processing agent (for ingestions only)
            to_insert.append(f"GRAPH <{g}> {{ <{s}> prov:wasAttributedTo <https://w3id.org/oc/meta/prov/pa/2> . }}\n")  # inserts Meta's processing agent for modification processes

        to_delete_str = " ".join(to_delete)
        to_insert_str = " ".join(to_insert)
        query = template.substitute(quads_to_delete=to_delete_str, quads_to_insert=to_insert_str)
        return query
class MultiObjectFixerFile:
    """
    Detects graphs where a snapshot carries multiple values for a functional
    provenance property (invalidatedAtTime, hadPrimarySource, hasUpdateQuery)
    and fixes them by rebuilding the graph with a single, clean creation snapshot.
    """

    def __init__(self):
        pass

    @staticmethod
    def detect(graph: "Graph") -> "Optional[Tuple[URIRef, Literal]]":
        """
        Return (graph_id, creation_generated_at_time) as soon as any subject in
        *graph* has more than one value for a functional property; None otherwise.
        """
        creation_se_uri = URIRef(graph.identifier + 'se/1')

        for prop in {PROV.invalidatedAtTime, PROV.hadPrimarySource, URIRef('https://w3id.org/oc/ontology/hasUpdateQuery')}:
            for s in graph.subjects():
                if len(list(graph.objects(s, prop, unique=True))) > 1:
                    try:
                        # earliest generation time of the creation snapshot
                        creation_gen_time = min(graph.objects(creation_se_uri, PROV.generatedAtTime, unique=True))
                    except ValueError:
                        logging.error(f"Could not find generatedAtTime value for creation snapshot {creation_se_uri}. Skipping MultiObjectFixerFile detection.")
                        return None
                    return (graph.identifier, creation_gen_time)

    @staticmethod
    def fix_local_graph(ds: "Dataset", graph: "Graph", to_fix: tuple, meta_dumps) -> None:
        """Clear the local graph and re-add only a well-formed creation snapshot."""
        creation_se_uri = URIRef(graph.identifier + 'se/1')
        genTime_str = to_fix[1]
        primSource_str = get_previous_meta_dump_uri(meta_dumps, genTime_str)
        referent = URIRef(get_described_res_omid(str(creation_se_uri)))
        desc = Literal(f"The entity '{str(referent)}' has been created.")
        triples_to_add = (
            (creation_se_uri, PROV.hadPrimarySource, URIRef(primSource_str)),
            (creation_se_uri, PROV.wasAttributedTo, URIRef('https://w3id.org/oc/meta/prov/pa/1')),
            (creation_se_uri, PROV.specializationOf, referent),
            (creation_se_uri, DCTERMS.description, desc),
            (creation_se_uri, RDF.type, PROV.Entity),
            (creation_se_uri, PROV.generatedAtTime, Literal(genTime_str, datatype=XSD.dateTime))
        )

        # drop everything in the graph, then rebuild the creation snapshot only
        ds.remove((None, None, None, graph.identifier))
        for t in triples_to_add:
            quad = t + (graph.identifier, )
            ds.add(quad)

    @staticmethod
    def build_update_query(to_fix, meta_dumps, pa_uri="https://w3id.org/oc/meta/prov/pa/1"):
        """
        Build a SPARQL update that clears each affected graph and re-inserts a
        minimal, well-formed creation snapshot.

        :param to_fix: iterable of (graph_uri, creation_generated_at_time) pairs
        :param meta_dumps: sorted (date, url) pairs of published Meta dumps
        :param pa_uri: processing agent attributed to the re-created snapshot
        """
        prefixes = """
        PREFIX prov: <http://www.w3.org/ns/prov#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n\n
        """

        per_graph_template = Template("""
        CLEAR GRAPH <$graph> ;
        INSERT DATA {
            GRAPH <$graph> {
                <$creation_snapshot> prov:hadPrimarySource <$primary_source> ;
                    prov:wasAttributedTo <$processing_agent> ;
                    prov:specializationOf <$specialization_of> ;
                    dcterms:description "$description" ;
                    rdf:type prov:Entity ;
                    prov:generatedAtTime "$gen_time"^^xsd:dateTime .
            }
        }
        """)

        query_parts = []
        for g, gen_time in to_fix:
            g = str(g)
            gen_time = str(gen_time)
            creation_se = g + 'se/1'
            # strip any datatype suffix that may survive stringification
            gen_time = gen_time.replace("^^xsd:dateTime", "")
            gen_time = gen_time.replace("^^http://www.w3.org/2001/XMLSchema#dateTime", "")
            prim_source = get_previous_meta_dump_uri(meta_dumps, gen_time)
            processing_agent = pa_uri
            referent = get_described_res_omid(g)
            desc = f"The entity '{referent}' has been created."

            per_graph_part = per_graph_template.substitute(
                graph = g,
                creation_snapshot = creation_se,
                primary_source = prim_source,
                processing_agent = processing_agent,
                specialization_of = referent,
                description = desc,
                gen_time = gen_time
            )
            query_parts.append(per_graph_part)

        query = prefixes + " ; \n\n".join(query_parts)
        return query
def prepare_filler_issues(data_dir) -> Tuple[List[tuple], int]:
    """
    Scan every provenance file under *data_dir* and collect the filler issues
    detected in each named graph.

    :returns: a (issues, tot_files) pair, where issues is a list of the
        non-empty results of FillerFixerFile.detect.
    """
    issues = []
    tot_files = len(get_rdf_prov_filepaths(data_dir))

    progress = tqdm(
        read_rdf_dump(data_dir, whole_file=True),
        desc='Detecting graphs with fillers...',
        total=tot_files,
        dynamic_ncols=True
    )
    for file_data in progress:
        dataset = Dataset(default_union=True)
        dataset.parse(data=json.dumps(file_data), format='json-ld')

        for named_graph in dataset.graphs():
            detected = FillerFixerFile.detect(named_graph)
            if detected:
                issues.append(detected)
    return issues, tot_files
def sparql_update(
    client: "SPARQLClient",
    update_query: str,
    failed_log_fp: str,
) -> bool:
    """
    Execute a SPARQL UPDATE via client.update().

    Uses the client's built-in retry settings. If the update still fails
    after all retries, the query and the failure are appended to *failed_log_fp*.
    On EndpointError the function additionally sleeps 15 minutes, assuming a
    possible database warmup is underway.

    Returns:
        True if the update succeeded, False if it failed and was logged.
    """
    try:
        client.update(update_query)
        return True  # success

    # Both exception kinds share the same logging path (previously duplicated
    # verbatim in two separate handlers); only the warmup sleep differs.
    except (QueryError, EndpointError) as exc:
        msg = str(exc)[:1000].replace('\n', '\\n ')
        logging.warning(f"SPARQL UPDATE failed after retries: {type(exc).__name__}: {msg}...")
        with open(failed_log_fp, 'a', encoding='utf-8') as lf:
            lf.write(update_query.replace("\n", "\\n") + "\n")
            lf.write(f"# Failure: {type(exc).__name__}: {exc}\n\n")
            lf.flush()

        if isinstance(exc, EndpointError):
            logging.warning(f"EndpointError --> Possible DB warmup underway: sleeping 15 minutes before sending other updates...")
            time.sleep(900)
        return False

    finally:
        time.sleep(0.1)  # brief pause to avoid overwhelming the endpoint with rapid retries or subsequent updates
762def fix_provenance_process(
763 endpoint,
764 data_dir,
765 out_dir,
766 meta_dumps_register,
767 dry_run_db=False,
768 dry_run_files=False,
769 dry_run_callback=None,
770 chunk_size=100,
771 failed_queries_fp=f"prov_fix_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt",
772 overwrite_ok=False,
773 resume=True,
774 checkpoint_fp="fix_prov.checkpoint.json",
775 cache_fp="filler_issues.cache.json",
776 client_recreate_interval=100,
777 zip_output=True,
778 ):
779 """
780 Fix OpenCitations Meta provenance issues found in RDF dump files and optionally apply fixes to a
781 SPARQL endpoint and output files.
783 This function processes all provenance JSON-LD files in ``data_dir`` and detects common
784 issues: fillers, invalid datetime formats, missing primary sources, multiple processing
785 agents, and multiple object occurrences. It applies fixes locally to an ``rdflib.Dataset``
786 corresponding to each file and, unless ``dry_run_db`` is ``True``, issues SPARQL UPDATE
787 requests to ``endpoint``. If ``dry_run_files`` is ``False``, fixed datasets are dumped to
788 a file in a subdirectory of ``out_dir`` with a path derived from the input file path
789 relative to ``data_dir`` (else, no output files are written).
791 :param str endpoint: URL of the SPARQL endpoint used to apply updates.
792 :param str data_dir: Path to the directory containing provenance JSON-LD dump files to
793 process.
794 :param str out_dir: Path where fixed JSON-LD files will be written.
795 :param Iterable[Tuple[str, str]] meta_dumps_register: Iterable of ``(publication_date_iso,
796 dump_url)`` pairs used to compute primary source URIs.
797 :param bool dry_run_db: If ``True``, no SPARQL updates are sent to the endpoint.
798 Defaults to ``False``.
799 :param bool dry_run_files: If ``True``, no output files are written to ``out_dir``.
800 Defaults to ``False``.
801 :param callable dry_run_callback: Callback invoked when ``dry_run_db`` is ``True`` with
802 signature ``(file_path, (ff_issues, dt_issues, mps_issues, pa_issues, mo_issues))``.
803 :param int chunk_size: Number of items per SPARQL update batch. Defaults to ``100``.
804 :param str failed_queries_fp: Path to a log file where failing SPARQL queries are appended.
805 Defaults to ``prov_fix_failed_queries_YYYY-MM-DD.txt``.
806 :param bool overwrite_ok: When ``False``, a :class:`FileExistsError` is raised if a target
807 output file already exists. Defaults to ``False``.
808 :param bool resume: When ``True``, use the checkpoint file to skip already-processed files
809 and steps. Defaults to ``True``.
810 :param str checkpoint_fp: Path of the checkpoint JSON file used to record progress for
811 resuming.
812 :param str cache_fp: Path to the filler issues cache used to avoid re-scanning ``data_dir``.
813 :param int client_recreate_interval: Number of files to process before recreating the SPARQLClient
814 to prevent pycurl's accumulated state from degrading performance. Defaults to ``100``.
815 This is necessary because pycurl's Curl object accumulates internal state (DNS cache,
816 connection pool, SSL/TLS session state) over hundreds of thousands of requests,
817 causing progressive performance degradation.
818 :param bool zip_output: If ``True``, output files are compressed using zip. Defaults to ``True``.
820 :returns: None
821 :rtype: None
823 :raises FileExistsError: If ``overwrite_ok`` is False and an output file already exists.
824 :raises RuntimeError: If the function would write inside ``data_dir`` (safeguard to avoid
825 corrupting input).
826 :raises Exception: Other exceptions may be raised for unexpected errors or endpoint
827 failures.
829 Side effects
830 - Writes fixed JSON-LD files into ``out_dir`` (unless ``dry_run``).
831 - May send SPARQL UPDATE requests to ``endpoint`` (unless ``dry_run``).
832 - Creates or updates ``checkpoint_fp`` and ``cache_fp`` files.
833 - Logs summary information and error details.
834 """
836 start_time = time.time()
838 os.makedirs(out_dir, exist_ok=True)
839 logging.info(f"[Provenance fixing process paradata]: {locals()}") # log parameters
841 checkpoint = Checkpoint(checkpoint_fp)
842 client = SPARQLClient(endpoint)
843 ff_c, dt_c, mps_c, pa_c, mo_c = 0, 0, 0, 0, 0 # counters for issues
844 client_reset_counter = 0 # Track files processed with current client instance
845 times_per_file = []
847 try:
848 logging.info("Provenance fixing process started.")
849 logging.info(f"Checkpoint state: {str(checkpoint.state)}")
850 logging.info("Detecting Filler issues (via cache or new scan)...")
851 # check cache for filler issues or get them
852 filler_issues, tot_files, filler_cache_fp = load_or_prepare_filler_issues(data_dir, cache_fp)
853 rename_mapping = FillerFixerFile.make_global_rename_map(filler_issues)
854 graphs_with_fillers = {t[0]: t[1] for t in filler_issues}
856 meta_dumps = sorted(
857 [(date.fromisoformat(d), url) for d, url in meta_dumps_register],
858 key=lambda x: x[0]
859 )
861 logging.info("Processing RDF dump files...")
862 for file_index, (file_data, fp) in enumerate(
863 tqdm(
864 read_rdf_dump(data_dir, whole_file=True, include_fp=True),
865 desc="Processing RDF dump files...",
866 total=tot_files,
867 dynamic_ncols=True
868 )
869 ):
871 start_file = time.time()
873 if resume and checkpoint.should_skip_file(file_index):
874 continue
876 stringified_data = json.dumps(file_data)
877 d = Dataset(default_union=True)
878 d.parse(data=stringified_data, format='json-ld')
880 ff_issues_in_file = []
881 dt_issues = []
882 mps_issues = []
883 pa_issues = []
884 mo_issues = []
886 # ---------------- FILLER FIXER ----------------
887 if not (resume and checkpoint.step_completed(Step.FILLER, file_index)):
889 for graph in d.graphs():
890 if graph.identifier == d.default_graph.identifier:
891 continue
892 ff_to_fix_val = graphs_with_fillers.get(graph.identifier)
893 if ff_to_fix_val:
894 ff_issues_in_file.append((graph.identifier, ff_to_fix_val))
895 ff_c += 1
896 FillerFixerFile.fix_local_graph(d, graph, rename_mapping, graphs_with_fillers)
898 for chunk in batched(ff_issues_in_file, chunk_size):
899 for t in chunk:
900 g_id = str(t[0])
901 to_delete = [str(i) for i in t[1]['to_delete']]
902 to_rename = [str(i) for i in t[1]['remaining_snapshots']]
903 local_mapping = FillerFixerFile.map_se_names(to_delete, to_rename)
904 newest_names = list(set(local_mapping.values()))
906 if not dry_run_db:
907 sparql_update(client,
908 FillerFixerFile.build_delete_sparql_query(t),
909 failed_queries_fp)
910 sparql_update(client,
911 FillerFixerFile.build_rename_sparql_query(local_mapping),
912 failed_queries_fp)
913 sparql_update(client,
914 FillerFixerFile.build_adapt_invaltime_sparql_query(g_id, newest_names),
915 failed_queries_fp)
917 checkpoint.update_state(file_index, fp, Step.FILLER, endpoint_done=True, local_done=False)
919 # ---------------- DATETIME FIXER ----------------
920 if not (resume and checkpoint.step_completed(Step.DATETIME, file_index)):
922 for graph in d.graphs():
923 if graph.identifier != d.default_graph.identifier:
924 issues = DateTimeFixerFile.detect(graph)
925 if issues:
926 dt_issues.extend(issues)
927 dt_c += len(issues)
928 DateTimeFixerFile.fix_local_graph(d, graph, issues)
930 if not dry_run_db:
931 for chunk in batched(dt_issues, chunk_size):
932 sparql_update(client,
933 DateTimeFixerFile.build_update_query(chunk),
934 failed_queries_fp)
936 checkpoint.update_state(file_index, fp, Step.DATETIME, endpoint_done=True, local_done=False)
938 # ---------------- MISSING PRIMARY SOURCE ----------------
939 if not (resume and checkpoint.step_completed(Step.MISSING_PS, file_index)):
941 for graph in d.graphs():
942 if graph.identifier != d.default_graph.identifier:
943 issue = MissingPrimSourceFixerFile.detect(graph)
944 if issue:
945 mps_issues.append(issue)
946 mps_c += 1
947 MissingPrimSourceFixerFile.fix_local_graph(d, graph, issue, meta_dumps)
949 if not dry_run_db:
950 for chunk in batched(mps_issues, chunk_size):
951 sparql_update(client,
952 MissingPrimSourceFixerFile.build_update_query(chunk, meta_dumps),
953 failed_queries_fp)
955 checkpoint.update_state(file_index, fp, Step.MISSING_PS, endpoint_done=True, local_done=False)
957 # ---------------- MULTI PA FIXER ----------------
958 if not (resume and checkpoint.step_completed(Step.MULTI_PA, file_index)):
960 for graph in d.graphs():
961 if graph.identifier != d.default_graph.identifier:
962 issues = MultiPAFixerFile.detect(graph)
963 if issues:
964 pa_issues.extend(issues)
965 pa_c += len(issues)
966 MultiPAFixerFile.fix_local_graph(d, graph, issues)
968 if not dry_run_db:
969 for chunk in batched(pa_issues, chunk_size):
970 sparql_update(client,
971 MultiPAFixerFile.build_update_query(chunk),
972 failed_queries_fp)
974 checkpoint.update_state(file_index, fp, Step.MULTI_PA, endpoint_done=True, local_done=False)
976 # ---------------- MULTI OBJECT FIXER ----------------
977 if not (resume and checkpoint.step_completed(Step.MULTI_OBJECT, file_index)):
979 for graph in d.graphs():
980 if graph.identifier != d.default_graph.identifier:
981 issue = MultiObjectFixerFile.detect(graph)
982 if issue:
983 mo_issues.append(issue)
984 mo_c += 1
985 MultiObjectFixerFile.fix_local_graph(d, graph, issue, meta_dumps)
987 if not dry_run_db:
988 for chunk in batched(mo_issues, chunk_size):
989 sparql_update(client,
990 MultiObjectFixerFile.build_update_query(chunk, meta_dumps),
991 failed_queries_fp)
993 checkpoint.update_state(file_index, fp, Step.MULTI_OBJECT, endpoint_done=True, local_done=False)
995 # ---------------- WRITE OUTPUT (FIXED) FILE ----------------
996 if not (resume and checkpoint.step_completed(Step.WRITE_FILE, file_index)):
998 if not dry_run_files:
999 abs_data_dir = Path(data_dir).resolve()
1000 abs_out_dir = Path(out_dir).resolve()
1002 rel_path = Path(fp).resolve().relative_to(abs_data_dir)
1003 fixed_fp = abs_out_dir / rel_path
1004 fixed_fp = fixed_fp.with_suffix('.json')
1006 fixed_fp.parent.mkdir(parents=True, exist_ok=True)
1008 if os.path.isfile(fixed_fp) and not overwrite_ok:
1009 raise FileExistsError(f"{fixed_fp} already exists")
1011 if abs_data_dir in fixed_fp.parents: # safeguard for not corrupting input data
1012 raise RuntimeError(f"Refusing to write inside data_dir! {fixed_fp}")
1014 out_data = d.serialize(format='json-ld', indent=None, separators=(', ', ': '))
1016 if zip_output:
1017 with ZipFile(fixed_fp.with_suffix('.zip'), 'w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
1018 zipf.writestr(fixed_fp.name, out_data)
1019 else:
1020 with open(fixed_fp, 'w', encoding='utf-8') as out_file:
1021 out_file.write(out_data)
1023 checkpoint.update_state(file_index, fp, Step.WRITE_FILE, endpoint_done=True, local_done=True)
1025 checkpoint.flush()
1027 # Periodically recreate SPARQLClient to prevent pycurl's accumulated state degradation
1028 client_reset_counter += 1
1029 if not dry_run_db and client_reset_counter >= client_recreate_interval:
1030 logging.debug(f"Recreating SPARQLClient after {client_reset_counter} files to clear accumulated pycurl state")
1031 client.close()
1032 client = SPARQLClient(endpoint)
1033 client_reset_counter = 0
1035 if dry_run_db and dry_run_callback: # use callback function to use issues found in each file
1036 dry_run_callback(fp, (ff_issues_in_file, dt_issues, mps_issues, pa_issues, mo_issues))
1038 elapsed_file :float = time.time() - start_file
1039 times_per_file.append(elapsed_file)
1040 if file_index % 500 == 0:
1041 avg_time = sum(times_per_file)/len(times_per_file)
1042 est_remaining = avg_time * (tot_files - file_index - 1)
1043 logging.info(f"Average time per file with last {len(times_per_file)} files: {avg_time:.2f} seconds. Estimated remaining time: {est_remaining/3600:.2f} hours.")
1044 times_per_file = []
1047 # successful termination -> cleanup
1048 elapsed = time.time() - start_time
1049 logging.info(f"Provenance fixing process completed successfully in {elapsed/3600:.2f} hours.")
1050 print(f"Provenance fixing process completed successfully in {elapsed/3600:.2f} hours.")
1051 logging.info(f"Total Filler issues found and fixed: {ff_c}")
1052 logging.info(f"Total DateTime issues found and fixed: {dt_c}")
1053 logging.info(f"Total Missing Primary Source issues found and fixed: {mps_c}")
1054 logging.info(f"Total Multiple Processing Agent issues found and fixed: {pa_c}")
1055 logging.info(f"Total Multiple Object issues found and fixed: {mo_c}")
1057 if os.path.exists(filler_cache_fp):
1058 os.remove(filler_cache_fp)
1059 if os.path.exists(checkpoint.path):
1060 os.remove(checkpoint.path)
1062 except (Exception, KeyboardInterrupt) as e:
1063 print(traceback.print_exc())
1064 if type(e) == KeyboardInterrupt:
1065 logging.error("KeyboardInterrupt")
1066 else:
1067 logging.error(e)
1068 elapsed = time.time() - start_time
1069 logging.info(f"Process ran for {elapsed/3600:.2f} hours before interruption.")
1070 logging.info(f"Checkpoint state at process interruption: {checkpoint.state}")
1072 finally:
1073 checkpoint.flush()
1074 client.close()