Coverage for meta_prov_fixer / utils.py: 92%
241 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1import os
2from zipfile import ZipFile
3import json
4from collections import defaultdict
5from tqdm import tqdm
6import tarfile
7import time
8import zipfile
9import lzma
10from typing import Generator, List, Literal, Union, Tuple, Iterable, IO, Any, Optional
11from datetime import datetime, date, timezone
12from zoneinfo import ZoneInfo
13from urllib.parse import urlparse
14import warnings
15import functools
16import inspect
17import logging
18import tempfile
19from itertools import islice
20import argparse
def make_json_safe(obj):
    """
    Recursively convert *obj* into a structure made only of JSON-serialisable
    types (dict, list, str, int, float, bool, None).

    - dicts: keys and values are converted recursively;
    - lists, tuples, sets and frozensets become lists;
    - datetime/date objects become ISO 8601 strings;
    - objects exposing ``__dict__`` are converted via ``vars()``;
    - JSON primitives are returned unchanged;
    - anything else falls back to ``str(obj)``.

    :param obj: Any Python object.
    :returns: A JSON-serialisable equivalent of ``obj``.
    """
    if isinstance(obj, dict):
        return {make_json_safe(k): make_json_safe(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple, set, frozenset)):
        # frozenset added: it previously fell through to the str() fallback
        return [make_json_safe(item) for item in obj]
    elif isinstance(obj, (datetime, date)):
        # date added; date.isoformat() equals str(date), so output is unchanged
        return obj.isoformat()
    elif hasattr(obj, '__dict__'):
        return make_json_safe(vars(obj))
    elif isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    else:
        return str(obj)
def get_process_paradata(instance):
    """
    Build a paradata/metadata record describing the method that called this
    function.

    The caller's frame is inspected to recover the name of the calling
    function; the calling instance's attributes are serialised via
    ``make_json_safe``.

    :param instance: The instance (``self``) of the calling method.
    :returns: dict with ``timestamp``, ``class``, ``function`` and ``input``
        (the latter holding the JSON-safe instance attributes).
    """
    current = inspect.currentframe()
    try:
        caller = current.f_back
        caller_name = caller.f_code.co_name
        serialised_attrs = make_json_safe(vars(instance))
        return {
            "timestamp": datetime.now().isoformat(),
            "class": instance.__class__.__name__,
            "function": caller_name,
            "input": {
                "instance_attributes": serialised_attrs,
            },
        }
    finally:
        # Break the frame -> locals -> frame reference cycle (see inspect docs).
        del current
def normalise_datetime(datetime_str: str) -> str:
    """
    Normalise a datetime string (offset naive or aware, with or without microseconds)
    making it a UTC-aware ISO 8601 datetime string with no microseconds specified.

    When converting from offset-naive to offset-aware (where necessary), Italian
    timezone is assumed. UTC is made explicit as 'Z' (not '+00:00'). If the input
    string contains the explicit xsd datatype (^^xsd:string, ^^xsd:dateTime,
    ^^http://www.w3.org/2001/XMLSchema#dateTime, or
    ^^http://www.w3.org/2001/XMLSchema#string), the substring representing the
    datatype is silently removed.

    :param datetime_str: Datetime string, possibly as a timestamp.
    :type datetime_str: str
    :returns: UTC-aware ISO 8601 string without microseconds.
    :rtype: str
    """
    # Strip explicit xsd datatype suffixes, if present.
    for dtype_marker in (
        "^^xsd:dateTime",
        "^^http://www.w3.org/2001/XMLSchema#dateTime",
        "^^xsd:string",
        "^^http://www.w3.org/2001/XMLSchema#string",
    ):
        datetime_str = datetime_str.replace(dtype_marker, "")
    datetime_str = datetime_str.strip()

    # A trailing 'Z' is only understood by fromisoformat() from Python 3.11:
    # normalise it here so the function also works on earlier versions.
    if datetime_str.endswith(("Z", "z")):
        datetime_str = datetime_str[:-1] + "+00:00"

    dt = datetime.fromisoformat(datetime_str)

    # If datetime is naive, assume Europe/Rome
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo("Europe/Rome"))

    # Convert to UTC and strip microseconds
    dt_utc = dt.astimezone(timezone.utc).replace(microsecond=0)

    # strftime is used because isoformat() renders UTC as '+00:00', not 'Z'.
    return dt_utc.strftime('%Y-%m-%dT%H:%M:%SZ')
def get_described_res_omid(prov_uri: str) -> str:
    """
    Return the URI of the resource described by a provenance graph or snapshot by removing the
    '/prov/se/<counter>' suffix if a snapshot URI is passed, or the '/prov/' suffix if a provenance graph IRI (i.e. the graph name) is passed.

    :param prov_uri: The provenance URI from which to extract the base URI, i.e. either the URI of a snapshot (ending with '/prov/se/<counter>') or the name of a provenance graph (ending with '/prov/').
    :type prov_uri: str
    :returns: The URI of the resource described by the provenance entity.
    :rtype: str
    :raises Exception: if the URI is neither a snapshot URI nor a provenance graph name.
    """
    if prov_uri.endswith('/prov/'):
        # removesuffix (not replace) so that only the trailing '/prov/' is
        # dropped, even if the substring also occurs earlier in the URI.
        return prov_uri.removesuffix('/prov/')
    elif '/prov/se/' in prov_uri and prov_uri[-1].isdigit():
        return prov_uri.rsplit("/prov/se/", 1)[0]
    else:
        raise Exception("The input URI is not a valid URI for a prov:Entity resource or a provenance named graph.")
def get_seq_num(se_uri: str) -> int:
    """
    Return as an integer the sequence number of a snapshot (i.e. its counter).

    Note: the original docstring promised ``None`` when the URI does not end
    with a number, but the implementation has always raised instead; the
    documented contract is corrected here to match the actual behaviour.

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: Sequence number as integer.
    :rtype: int
    :raises ValueError: if the URI is empty or does not end with a digit.
    """
    # Guard against the empty string as well (se_uri[-1] would raise IndexError).
    if se_uri and se_uri[-1].isdigit():
        return int(se_uri.rsplit('/', 1)[-1])
    # ValueError is a subclass of Exception, so existing callers catching
    # the previous generic Exception keep working.
    raise ValueError("Sequence number not found in URI")
def remove_seq_num(se_uri: str) -> str:
    """
    Strip the trailing sequence number (counter) from a snapshot URI.

    Example:

    .. code-block:: python

        'https://w3id.org/oc/meta/br/06104375687/prov/se/1' -> 'https://w3id.org/oc/meta/br/06104375687/prov/se/'

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: URI without sequence number.
    :rtype: str
    """
    # Drop everything after the last '/' and re-append the trailing slash.
    prefix = se_uri.rsplit('/', 1)[0]
    return f"{prefix}/"
def get_graph_uri_from_se_uri(se_uri: str) -> str:
    """
    Return the URI (name) of a provenance named graph starting from the URI of one of its snapshots.

    Example:

    .. code-block:: python

        'https://w3id.org/oc/meta/br/06104375687/prov/se/1' -> 'https://w3id.org/oc/meta/br/06104375687/prov/'

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: The URI of the provenance named graph.
    :rtype: str
    :raises Exception: if the URI does not contain '/prov/se/'.
    """
    if '/prov/se/' in se_uri:
        # Split on the full '/prov/se/' marker rather than just 'se/': an
        # accidental 'se/' earlier in the URI (e.g. inside a path segment)
        # would otherwise truncate the result at the wrong point.
        return se_uri.split('/prov/se/', 1)[0] + '/prov/'
    else:
        raise Exception(f"Invalid URI: {se_uri}")
def get_previous_meta_dump_uri(meta_dumps_pub_dates, dt: str) -> str:
    """
    Return the DOI of the OpenCitations Meta dump that was published before the given date.

    Example of a well-formed ``meta_dumps_pub_dates`` register:

    .. code-block:: python

        meta_dumps_pub_dates = [
            (datetime.date(2022, 12, 19), 'https://doi.org/10.6084/m9.figshare.21747536.v1'),
            (datetime.date(2022, 12, 20), 'https://doi.org/10.6084/m9.figshare.21747536.v2'),
            (datetime.date(2025, 7, 10), 'https://doi.org/10.5281/zenodo.15855112'),
        ]

    :param meta_dumps_pub_dates: List of tuples (date, DOI) for Meta dumps, sorted by date.
    :type meta_dumps_pub_dates: list of (`datetime.date`, str) tuples.
    :param dt: A date string in ISO format (YYYY-MM-DD), possibly with a time part (ignored).
    :type dt: str
    :returns: The DOI of the previous Meta dump.
    :rtype: str
    """
    target = date.fromisoformat(dt.strip()[:10])
    for position, (pub_date, _doi) in enumerate(meta_dumps_pub_dates):
        if target <= pub_date:
            # If dt predates even the first published dump, fall back to
            # that first dump instead of indexing at -1.
            return meta_dumps_pub_dates[max(position - 1, 0)][1]
    # target follows every registered publication date: pick the latest dump.
    warnings.warn(f'[get_previous_meta_dump_uri]: {dt} follows the publication date of the latest Meta dump. The register of published dumps might need to be updated!')
    return meta_dumps_pub_dates[-1][1]
def validate_meta_dumps_pub_dates(meta_dumps_register: List[Tuple[str, str]]):
    """
    Validate the register of published OpenCitations Meta dumps.

    Example of a well-formed register:

    .. code-block:: python

        [
            ('2022-12-19', 'https://doi.org/10.6084/m9.figshare.21747536.v1'),
            ('2022-12-20', 'https://doi.org/10.6084/m9.figshare.21747536.v2'),
            ('2025-07-10', 'https://doi.org/10.5281/zenodo.15855112')
        ]

    :param meta_dumps_register: List of tuples (date, DOI) for Meta dumps.
    :type meta_dumps_register: List[Tuple[str, str]]
    :raises ValueError: if the register is incomplete or malformed.
    :returns: None
    """
    ordered = sorted(meta_dumps_register, key=lambda pair: datetime.strptime(pair[0], r'%Y-%m-%d'))

    # 9 = number of published Meta dumps at the time of writing this code (2025-07-30)
    if len(ordered) < 9:
        raise ValueError("[validate_meta_dumps_pub_dates]: The list of published Meta dumps is incomplete and must be updated.")

    # Warn (and log) if the latest registered dump is more than 2 months old.
    last_date = datetime.strptime(ordered[-1][0], r'%Y-%m-%d')
    if (datetime.now() - last_date).days > 60:
        stale_msg = f"[validate_meta_dumps_pub_dates]: The latest Meta dump in the register ({last_date.strftime(r'%Y-%m-%d')}) is more than 2 months old. Make sure to update the register with the latest publication dates and DOIs!"
        warnings.warn(stale_msg)
        logging.warning(stale_msg)

    for index, item in enumerate(ordered):
        # Each entry must be a 2-tuple (date string, DOI URL).
        if not isinstance(item, tuple):
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Item at index {index} is not a tuple: {item}")
        if len(item) != 2:
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Tuple at index {index} does not have 2 elements: {item}")

        date_str, url = item

        # The first element must be a valid ISO date.
        try:
            datetime.strptime(date_str, r'%Y-%m-%d')
        except ValueError:
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Invalid ISO date at index {index}: {date_str}")

        # The second element must be an absolute http(s) URL.
        parsed_url = urlparse(url)
        if not (parsed_url.scheme in ('http', 'https') and parsed_url.netloc):
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Invalid URL at index {index}: {url}")
def chunker(
    source: Union[str, IO, Iterable],
    batch_size: int,
    skip_first_line: bool = True,
) -> Generator[Tuple[List[Any], int], None, None]:
    """
    Yield batches of Python objects from a file or iterable.

    :param source: Path to a JSONL file, a file-like object, or any iterable of objects.
    :param batch_size: Number of items per batch.
    :param skip_first_line: Skip the first line when reading from a file (useful for metadata headers).
    :yields: Tuples ``(batch, line_count)`` where ``batch`` is a list of objects and
        ``line_count`` is the number of the last processed line.
    """

    def _batches(items: Iterable):
        # Accumulate items and flush a batch every `batch_size` items; the
        # running counter reports how many items have been consumed so far.
        pending, consumed = [], 0
        for obj in items:
            pending.append(obj)
            consumed += 1
            if len(pending) >= batch_size:
                yield pending, consumed
                pending = []
        if pending:
            yield pending, consumed

    if isinstance(source, str):
        # Path to a JSONL file on disk.
        with open(source, "r", encoding="utf-8") as handle:
            if skip_first_line:
                next(handle, None)  # skip metadata header
            yield from _batches(json.loads(ln) for ln in handle if ln.strip())
    elif hasattr(source, "read"):
        # Already-open file-like object with JSONL content.
        if skip_first_line:
            next(source, None)
        yield from _batches(json.loads(ln) for ln in source if ln.strip())
    else:
        # Plain iterable of already-parsed objects.
        yield from _batches(iter(source))
class CheckpointManager:
    """Persist and restore minimal pipeline progress state as a JSON file."""

    def __init__(self, path="checkpoint.json"):
        self.path = path  # location of the checkpoint JSON file

    def save(self, fixer_name: str, phase: str, batch_idx: int):
        """Always save checkpoint with a consistent schema."""
        state = {
            "fixer": fixer_name,     # which fixer
            "phase": phase,          # "done" or the phase name
            "batch_idx": batch_idx,  # -1 if not batch-related
        }
        with open(self.path, "w") as out:
            json.dump(state, out)
        logging.debug(f"[Checkpoint] Saved: {state}")

    def load(self):
        """Return the stored state dict, or an empty dict when no checkpoint exists."""
        try:
            with open(self.path) as src:
                return json.load(src)
        except FileNotFoundError:
            return {}

    def clear(self):
        """Delete the checkpoint file; a no-op when it does not exist."""
        try:
            os.remove(self.path)
        except FileNotFoundError:
            pass
def checkpointed_batch(stream, batch_size, fixer_name=None, phase=None, ckpnt_mngr: Optional["CheckpointManager"] = None):
    """
    Yield ``(batch_index, (batch, line_num))`` pairs with optional checkpoint-based resuming.

    - stream can be:
        * an iterable of objects (list, generator, etc.)
        * a file path (str) to a JSONL file

    :param stream: Source of items, delegated to :func:`chunker`.
    :param batch_size: Number of items per batch.
    :param fixer_name: Name of the fixer, matched against the stored checkpoint.
    :param phase: Phase name, matched against the stored checkpoint.
    :param ckpnt_mngr: Optional :class:`CheckpointManager`. BUGFIX: the default was
        previously the typing object ``Union[None, CheckpointManager]`` (a value,
        not an annotation), which is not ``None``, so the no-checkpoint branch was
        unreachable and omitting the manager crashed with AttributeError.
    """
    batch_size = int(batch_size)
    source_iter = chunker(stream, batch_size)

    # --- no checkpoint: just yield batches ---
    if ckpnt_mngr is None:
        for idx, (batch, line_num) in enumerate(source_iter):
            yield idx, (batch, line_num)
        return

    # --- resumable checkpoint mode ---
    state = ckpnt_mngr.load()
    last_idx = -1
    if (
        state.get("fixer") == fixer_name
        and state.get("phase") == phase
        and "batch_idx" in state
    ):
        last_idx = state["batch_idx"]
        logging.info(f"Resuming {fixer_name}, in phase {phase} from after batch {last_idx}")

    for idx, (batch, line_num) in enumerate(source_iter):
        if idx <= last_idx:
            continue  # already processed before the interruption
        yield idx, (batch, line_num)
        ckpnt_mngr.save(fixer_name, phase, idx)
def detection_completed(fixer_name=None, checkpoint: Optional["CheckpointManager"] = None) -> bool:
    """
    Check the checkpoint state to see whether a given fixer's detection step is completed.

    BUGFIX: the ``checkpoint`` default was previously the typing object
    ``Union[None, CheckpointManager]`` (a value, not an annotation), which is
    truthy, so calling without a checkpoint crashed with AttributeError on
    ``.load()`` instead of returning False.

    :param fixer_name: Name of the fixer to look up in the checkpoint state.
    :param checkpoint: Optional :class:`CheckpointManager`; when None, returns False.
    :returns: True if the stored checkpoint belongs to ``fixer_name``, else False.
    """
    if checkpoint:
        state = checkpoint.load()
        # If detection is not completed, the checkpoint file should retain
        # the name of the previous fixer.
        return state.get("fixer") == fixer_name
    return False
class TimedProcess:
    """Track wall-clock time across a fixed number of phases and estimate remaining time."""

    def __init__(self, total_phases):
        self.total_phases = total_phases  # expected number of phases overall
        self.phase_times = []             # durations (seconds) of completed phases
        self.start_time = None            # set by start()
        self.current_phase_start = None   # set by start_phase()

    def start(self):
        """Record the overall start time."""
        self.start_time = time.time()

    def start_phase(self):
        """Mark the beginning of the current phase."""
        self.current_phase_start = time.time()

    def end_phase(self):
        """Close the current phase; store and return its duration in seconds."""
        phase_duration = time.time() - self.current_phase_start
        self.phase_times.append(phase_duration)
        return phase_duration

    def eta(self, current_phase_idx):
        """
        Return ``(elapsed, remaining)`` in seconds, where ``remaining`` is the
        average completed-phase duration times the number of phases left.
        """
        elapsed = time.time() - self.start_time
        if self.phase_times:
            avg = sum(self.phase_times) / len(self.phase_times)
        else:
            avg = 0
        remaining = (self.total_phases - current_phase_idx - 1) * avg
        return elapsed, remaining
def get_rdf_prov_filepaths(data_dir):
    """
    Recursively collect the paths of provenance files under *data_dir*.

    Only files located directly inside a directory named 'prov' and whose
    name ends in '.zip', '.json.xz' or '.json' are returned.

    :param data_dir: Root directory to walk.
    :returns: List of matching file paths.
    """
    matches = []
    for current_dir, _subdirs, files in os.walk(data_dir):
        if os.path.basename(current_dir) != 'prov':
            continue
        matches.extend(
            os.path.join(current_dir, name)
            for name in files
            if name.endswith(('.zip', '.json.xz', '.json'))
        )
    return matches
def read_rdf_dump(data_dir: str, whole_file: bool = False, ordered: bool = True, include_fp=False) -> Generator[Union[dict, list, tuple], None, None]:
    """
    Iterates over the files in any given directory storing OpenCitations Meta RDF **PROVENANCE** files
    and yields the JSON-LD data. If `whole_file` is False (default), yields single named graphs as
    dictionaries, else if `whole_file` is True yields the whole parsed JSON file as a list of dictionaries.

    :param data_dir: Path to the directory containing the decompressed provenance archive.
    :param whole_file: (default: False) If True, yield whole files, else single named graphs.
    :param ordered: (default: True) If True, sort all files to process deterministically.
    :param include_fp: (default: False) If True, yield a tuple where the second element is a filepath.
    :yield: Dictionary or list of dictionaries corresponding to (a) named graph(s). If include_fp is True, yields a tuple
        where the second element is the path of the file from which the graph or dataset was parsed.
    """

    def _emit(data, fp):
        # Shared yielding logic for every archive format: either the whole
        # parsed file or each named graph individually, optionally paired
        # with the source file path. (Previously triplicated per format.)
        if whole_file:
            yield data if not include_fp else (data, fp)
        else:
            for g in data:
                yield g if not include_fp else (g, fp)

    paths = get_rdf_prov_filepaths(data_dir)

    if ordered:
        paths.sort()  # deterministic global order

    for fp in paths:

        # ZIP archives
        if fp.endswith('.zip'):
            with ZipFile(fp) as archive:
                members = [f for f in archive.filelist if f.filename.endswith('.json')]
                if ordered:
                    members.sort(key=lambda z: z.filename)

                for m in members:
                    with archive.open(m.filename) as f:
                        yield from _emit(json.load(f), fp)

        # LZMA (.json.xz)
        elif fp.endswith('.json.xz'):
            with lzma.open(fp, 'rt', encoding='utf-8') as f:
                yield from _emit(json.load(f), fp)

        # Plain JSON
        elif fp.endswith('.json'):
            # skip if compressed duplicate exists
            if os.path.exists(fp + '.xz') or os.path.exists(fp.removesuffix('.json') + '.zip'):
                continue

            with open(fp, 'r', encoding='utf-8') as f:
                yield from _emit(json.load(f), fp)
def load_modified_graphs_uris(stream: Union[str, Iterable]) -> set:
    """
    Collect the URIs of modified graphs from a stream of (graph_uri, data) pairs.

    :param stream: JSONL file path or iterable accepted by :func:`chunker`;
        each item is expected to be a 2-element (graph_uri, dict) pair.
    :returns: Set of graph URIs.
    """
    uris = set()
    # Process in large chunks to keep memory bounded for big inputs.
    for chunk, _count in chunker(stream, 10000):
        for graph_uri, _graph_dict in chunk:
            uris.add(graph_uri)
    return uris
def batched(iterable, n, strict=False):
    """
    Yield successive tuples of *n* items from *iterable*; the final tuple may
    be shorter unless ``strict`` is True.

    Backport of :func:`itertools.batched` (Python >= 3.12; ``strict`` added in 3.13).
    Example: ``batched('ABCDEFG', 2)`` → ``AB CD EF G``.

    :raises ValueError: if ``n`` < 1, or if ``strict`` and the final batch is incomplete.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            return
        if strict and len(chunk) != n:
            raise ValueError('batched(): incomplete batch')
        yield chunk