Coverage for meta_prov_fixer / utils.py: 92%

241 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1import os 

2from zipfile import ZipFile 

3import json 

4from collections import defaultdict 

5from tqdm import tqdm 

6import tarfile 

7import time 

8import zipfile 

9import lzma 

10from typing import Generator, List, Literal, Union, Tuple, Iterable, IO, Any, Optional 

11from datetime import datetime, date, timezone 

12from zoneinfo import ZoneInfo 

13from urllib.parse import urlparse 

14import warnings 

15import functools 

16import inspect 

17import logging 

18import tempfile 

19from itertools import islice 

20import argparse 

21 

22 

23 

def make_json_safe(obj):
    """
    Recursively convert *obj* into a structure built only from JSON-serialisable types.

    Dicts are rebuilt with converted keys and values, sequences and sets become
    lists, datetimes become ISO strings, objects exposing a ``__dict__`` are
    converted through their attributes, primitives pass through unchanged, and
    anything else falls back to its ``str()`` representation.
    """
    if isinstance(obj, dict):
        return {make_json_safe(key): make_json_safe(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [make_json_safe(element) for element in obj]
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, '__dict__'):
        return make_json_safe(vars(obj))
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    return str(obj)

37 

def get_process_paradata(instance):
    """
    Inspect the caller's frame to build a paradata record for a method call.

    :param instance: the instance (``self``) on whose behalf the caller runs;
        a JSON-safe snapshot of its attributes is embedded in the record.
    :returns: dict with ``timestamp`` (local ISO datetime), ``class`` (the
        instance's class name), ``function`` (the calling function's name),
        and ``input.instance_attributes`` (JSON-safe copy of ``vars(instance)``).
    """
    frame = inspect.currentframe()
    try:
        # The frame one level up belongs to the function that called us.
        caller_frame = frame.f_back
        func_name = caller_frame.f_code.co_name

        instance_attrs = make_json_safe(vars(instance))

        return {
            "timestamp": datetime.now().isoformat(),
            "class": instance.__class__.__name__,
            "function": func_name,
            "input": {
                "instance_attributes": instance_attrs,
            },
        }
    finally:
        # Explicitly drop the frame reference to break the reference cycle,
        # as recommended by the inspect module documentation.
        del frame

84 

85 

def normalise_datetime(datetime_str: str) -> str:
    """
    Normalise a datetime string (offset naive or aware, with or without microseconds)
    making it a UTC-aware ISO 8601 datetime string with no microseconds specified.

    When converting from offset-naive to offset-aware (where necessary), Italian
    timezone is assumed. UTC is made explicit as 'Z' (not '+00:00'). If the input
    string contains the explicit xsd datatype (^^xsd:string, ^^xsd:dateTime,
    ^^http://www.w3.org/2001/XMLSchema#dateTime, or
    ^^http://www.w3.org/2001/XMLSchema#string), the substring representing the
    datatype is silently removed.

    :param datetime_str: Datetime string, possibly as a timestamp.
    :type datetime_str: str
    :returns: UTC-aware ISO 8601 string without microseconds.
    :rtype: str
    """
    # Strip explicit xsd datatype suffixes, if any.
    for datatype_suffix in (
        "^^xsd:dateTime",
        "^^http://www.w3.org/2001/XMLSchema#dateTime",
        "^^xsd:string",
        "^^http://www.w3.org/2001/XMLSchema#string",
    ):
        datetime_str = datetime_str.replace(datatype_suffix, "")

    datetime_str = datetime_str.strip()
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11
    # onward: rewrite it as '+00:00' so earlier interpreters parse it too.
    if datetime_str.endswith('Z'):
        datetime_str = datetime_str[:-1] + '+00:00'

    dt = datetime.fromisoformat(datetime_str)

    # If datetime is naive, assume Europe/Rome
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo("Europe/Rome"))

    # Convert to UTC and strip microseconds; format UTC explicitly as 'Z'
    # (isoformat() would emit '+00:00' instead).
    dt_utc = dt.astimezone(timezone.utc).replace(microsecond=0)
    return dt_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

116 

117 

def get_described_res_omid(prov_uri: str) -> str:
    """
    Return the URI of the resource described by a provenance graph or snapshot.

    If a snapshot URI is passed, the '/prov/se/<counter>' suffix is removed;
    if a provenance graph IRI (i.e. the graph name) is passed, the trailing
    '/prov/' suffix is removed.

    :param prov_uri: The provenance URI from which to extract the base URI,
        i.e. either the URI of a snapshot (ending with '/prov/se/<counter>')
        or the name of a provenance graph (ending with '/prov/').
    :type prov_uri: str
    :returns: The URI of the resource described by the provenance entity.
    :rtype: str
    """
    if prov_uri.endswith('/prov/'):
        # Graph name: drop the '/prov/' part.
        return prov_uri.replace("/prov/", '')
    if '/prov/se/' in prov_uri and prov_uri[-1].isdigit():
        # Snapshot URI: drop '/prov/se/<counter>'.
        described, _sep, _counter = prov_uri.rpartition("/prov/se/")
        return described
    raise Exception("The input URI is not a valid URI for a prov:Entity resource or a provenance named graph.")

134 

135 

def get_seq_num(se_uri: str) -> int:
    """
    Return, as an integer, the sequence number of a snapshot (i.e. its counter).

    Note: despite what earlier documentation suggested, this function never
    returns None — it raises when the URI does not end with a digit.

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: Sequence number as integer.
    :rtype: int
    :raises Exception: If the URI does not end with a digit.
    """
    if se_uri[-1].isdigit():
        return int(se_uri.split('/')[-1])
    raise Exception("Sequence number not found in URI")

149 

def remove_seq_num(se_uri: str) -> str:
    """
    Strip the sequence number (counter) from a provenance snapshot URI.

    Example:

    .. code-block:: python

        'https://w3id.org/oc/meta/br/06104375687/prov/se/1' -> 'https://w3id.org/oc/meta/br/06104375687/prov/se/'

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: URI without sequence number.
    :rtype: str
    """
    head = se_uri.rsplit('/', 1)[0]
    return f"{head}/"

166 

def get_graph_uri_from_se_uri(se_uri: str) -> str:
    """
    Return the URI (name) of a provenance named graph starting from the URI of
    one of its snapshots.

    Example:

    .. code-block:: python

        'https://w3id.org/oc/meta/br/06104375687/prov/se/1' -> 'https://w3id.org/oc/meta/br/06104375687/prov/'

    :param se_uri: The URI of the snapshot entity.
    :type se_uri: str
    :returns: The URI of the provenance named graph.
    :rtype: str
    :raises Exception: If the URI does not contain '/prov/se/'.
    """
    if '/prov/se/' in se_uri:
        # Anchor on the full '/prov/se/' marker: splitting on the first bare
        # 'se/' would truncate URIs whose base path itself contains 'se/'.
        return se_uri.rsplit('/prov/se/', 1)[0] + '/prov/'
    raise Exception(f"Invalid URI: {se_uri}")

186 

def get_previous_meta_dump_uri(meta_dumps_pub_dates, dt: str) -> str:
    """
    Return the DOI of the OpenCitations Meta dump that was published before the given date.

    Example of a well-formed ``meta_dumps_pub_dates`` register (note: month/day
    literals must not carry leading zeros, which are a SyntaxError in Python 3):

    .. code-block:: python

        meta_dumps_pub_dates = [
            (datetime.date(2022, 12, 19), 'https://doi.org/10.6084/m9.figshare.21747536.v1'),
            (datetime.date(2022, 12, 20), 'https://doi.org/10.6084/m9.figshare.21747536.v2'),
            (datetime.date(2023, 2, 15), 'https://doi.org/10.6084/m9.figshare.21747536.v3'),
            (datetime.date(2023, 6, 28), 'https://doi.org/10.6084/m9.figshare.21747536.v4'),
            (datetime.date(2023, 10, 26), 'https://doi.org/10.6084/m9.figshare.21747536.v5'),
            (datetime.date(2024, 4, 6), 'https://doi.org/10.6084/m9.figshare.21747536.v6'),
            (datetime.date(2024, 6, 17), 'https://doi.org/10.6084/m9.figshare.21747536.v7'),
            (datetime.date(2025, 2, 2), 'https://doi.org/10.6084/m9.figshare.21747536.v8'),
            (datetime.date(2025, 7, 10), 'https://doi.org/10.5281/zenodo.15855112')
        ]

    The register is assumed to be sorted by ascending publication date.

    :param meta_dumps_pub_dates: List of (date, DOI) tuples for Meta dumps, sorted by date.
    :type meta_dumps_pub_dates: list of (datetime.date, str) tuples
    :param dt: A date string in ISO format (YYYY-MM-DD); longer ISO datetime strings are truncated to the date part.
    :type dt: str
    :returns: The DOI of the previous Meta dump.
    :rtype: str
    """
    d = date.fromisoformat(dt.strip()[:10])
    for idx, (pub_date, doi) in enumerate(meta_dumps_pub_dates):
        if d <= pub_date:
            # If dt predates the publication date of the absolute first Meta
            # dump, fall back to the first Meta dump in the register.
            pos = max(idx - 1, 0)
            return meta_dumps_pub_dates[pos][1]
    warnings.warn(f'[get_previous_meta_dump_uri]: {dt} follows the publication date of the latest Meta dump. The register of published dumps might need to be updated!')
    return meta_dumps_pub_dates[-1][1]  # picks latest dump in the register

229 

230 

def validate_meta_dumps_pub_dates(meta_dumps_register: List[Tuple[str, str]]):
    """
    Validate the register of published OpenCitations Meta dumps.

    Each entry must be a 2-tuple of an ISO date string (YYYY-MM-DD) and an
    http(s) URL. Example of a well-formed register:

    .. code-block:: python

        [
            ('2022-12-19', 'https://doi.org/10.6084/m9.figshare.21747536.v1'),
            ('2022-12-20', 'https://doi.org/10.6084/m9.figshare.21747536.v2'),
            ('2023-02-15', 'https://doi.org/10.6084/m9.figshare.21747536.v3'),
            ('2023-06-28', 'https://doi.org/10.6084/m9.figshare.21747536.v4'),
            ('2023-10-26', 'https://doi.org/10.6084/m9.figshare.21747536.v5'),
            ('2024-04-06', 'https://doi.org/10.6084/m9.figshare.21747536.v6'),
            ('2024-06-17', 'https://doi.org/10.6084/m9.figshare.21747536.v7'),
            ('2025-02-02', 'https://doi.org/10.6084/m9.figshare.21747536.v8'),
            ('2025-07-10', 'https://doi.org/10.5281/zenodo.15855112')
        ]

    :param meta_dumps_register: List of tuples (date, DOI) for Meta dumps.
    :type meta_dumps_register: List[Tuple[str, str]]
    :raises ValueError: If the register is incomplete or any entry is malformed.
    :returns: None
    """
    # Validate every item BEFORE sorting: sorting first would crash inside the
    # sort key on malformed entries, making the descriptive per-item errors
    # below unreachable.
    for index, item in enumerate(meta_dumps_register):
        # Check type and length
        if not isinstance(item, tuple):
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Item at index {index} is not a tuple: {item}")
        if len(item) != 2:
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Tuple at index {index} does not have 2 elements: {item}")

        date_str, url = item

        # Validate ISO date
        try:
            datetime.strptime(date_str, r'%Y-%m-%d')
        except ValueError:
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Invalid ISO date at index {index}: {date_str}")

        # Validate URL
        parsed_url = urlparse(url)
        if not (parsed_url.scheme in ('http', 'https') and parsed_url.netloc):
            raise ValueError(f"[validate_meta_dumps_pub_dates]: Invalid URL at index {index}: {url}")

    meta_dumps_register = sorted(meta_dumps_register, key=lambda x: datetime.strptime(x[0], r'%Y-%m-%d'))
    if len(meta_dumps_register) < 9:  # number of published Meta dumps at the time of writing this code (2025-07-30)
        raise ValueError("[validate_meta_dumps_pub_dates]: The list of published Meta dumps is incomplete and must be updated.")

    # warn if the last date is more than 2 months ago
    last_date = datetime.strptime(meta_dumps_register[-1][0], r'%Y-%m-%d')
    if (datetime.now() - last_date).days > 60:
        warnings.warn(f"[validate_meta_dumps_pub_dates]: The latest Meta dump in the register ({last_date.strftime(r'%Y-%m-%d')}) is more than 2 months old. Make sure to update the register with the latest publication dates and DOIs!")
        logging.warning(f"[validate_meta_dumps_pub_dates]: The latest Meta dump in the register ({last_date.strftime(r'%Y-%m-%d')}) is more than 2 months old. Make sure to update the register with the latest publication dates and DOIs!")

284 

285 

def chunker(
    source: Union[str, IO, Iterable],
    batch_size: int,
    skip_first_line: bool = True,
) -> Generator[Tuple[List[Any], int], None, None]:
    """
    Yield batches of Python objects read from a JSONL file path, a file-like
    object, or any iterable.

    :param source: Path to a JSONL file, file-like object, or any iterable of objects.
    :param batch_size: Number of items per batch.
    :param skip_first_line: Skip the first line when reading from a file
        (useful for skipping metadata); ignored for plain iterables.
    :yields: Tuple (batch, line_count), where batch is a list of objects and
        line_count is the number of items consumed so far.
    """

    def _batches(items: Iterable):
        # Accumulate items into lists of at most batch_size elements.
        consumed = 0
        pending: List[Any] = []
        for obj in items:
            pending.append(obj)
            consumed += 1
            if len(pending) >= batch_size:
                yield pending, consumed
                pending = []
        if pending:
            yield pending, consumed

    def _jsonl(lines):
        # Parse non-blank lines as JSON, optionally skipping the first line.
        if skip_first_line:
            next(lines, None)  # Skip metadata
        return (json.loads(line) for line in lines if line.strip())

    if isinstance(source, str):
        with open(source, "r", encoding="utf-8") as fh:
            yield from _batches(_jsonl(fh))
    elif hasattr(source, "read"):
        yield from _batches(_jsonl(source))
    else:
        # Plain iterable of already-parsed objects.
        yield from _batches(iter(source))

329 

330 

class CheckpointManager:
    """Persist and restore a small JSON progress record on disk."""

    def __init__(self, path="checkpoint.json"):
        # Location of the JSON checkpoint file.
        self.path = path

    def save(self, fixer_name: str, phase: str, batch_idx: int):
        """Always save checkpoint with a consistent schema."""
        state = {
            "fixer": fixer_name,     # which fixer
            "phase": phase,          # "done" or the phase name
            "batch_idx": batch_idx,  # -1 if not batch-related
        }
        with open(self.path, "w") as fh:
            json.dump(state, fh)
        logging.debug(f"[Checkpoint] Saved: {state}")

    def load(self):
        """Return the stored state dict, or an empty dict when no checkpoint exists."""
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as fh:
            return json.load(fh)

    def clear(self):
        """Delete the checkpoint file, if present."""
        if os.path.exists(self.path):
            os.remove(self.path)

355 

356 

def checkpointed_batch(stream, batch_size, fixer_name=None, phase=None, ckpnt_mngr=None):
    """
    Yield batches with optional checkpointing.

    - stream can be:
        * an iterable of objects (list, generator, etc.)
        * a file path (str) to a JSONL file

    :param batch_size: number of items per batch (coerced to int).
    :param fixer_name: fixer name matched against the saved checkpoint when resuming.
    :param phase: phase name matched against the saved checkpoint when resuming.
    :param ckpnt_mngr: optional CheckpointManager. The default is now ``None``:
        the previous default was the typing expression
        ``Union[None, CheckpointManager]`` itself (an annotation mistakenly used
        as a default value), which is not None and has no ``load()`` method, so
        calling without an explicit manager crashed.
    :yields: tuples ``(batch_idx, (batch, line_num))``.
    """

    batch_size = int(batch_size)

    source_iter = chunker(stream, batch_size)

    # --- no checkpoint: just yield batches ---
    if ckpnt_mngr is None:
        for idx, (batch, line_num) in enumerate(source_iter):
            yield idx, (batch, line_num)
        return

    # --- resumable checkpoint mode ---
    state = ckpnt_mngr.load()
    last_idx = -1
    if (
        state.get("fixer") == fixer_name
        and state.get("phase") == phase
        and "batch_idx" in state
    ):
        last_idx = state["batch_idx"]
        logging.info(f"Resuming {fixer_name}, in phase {phase} from after batch {last_idx}")

    for idx, (batch, line_num) in enumerate(source_iter):
        # Skip batches already processed before the last saved checkpoint.
        if idx <= last_idx:
            continue
        yield idx, (batch, line_num)
        ckpnt_mngr.save(fixer_name, phase, idx)

391 

392 

def detection_completed(fixer_name=None, checkpoint=None) -> bool:
    """
    Check the checkpoint state to see if a given fixer's detection step is completed.

    :param fixer_name: name of the fixer to look for in the checkpoint.
    :param checkpoint: optional CheckpointManager. The default is now ``None``:
        the previous default was the typing expression
        ``Union[None, CheckpointManager]`` itself, which is truthy and has no
        ``load()`` method, so calling without an explicit checkpoint crashed.
    :returns: True if the checkpoint records this fixer, else False.
    """
    if checkpoint:
        state = checkpoint.load()
        # If detection is not completed, the checkpoint file should retain the
        # name of the previous fixer, so a name match means detection finished.
        if state.get("fixer") == fixer_name:
            return True
        else:
            return False
    return False

404 

405 

class TimedProcess:
    """Track elapsed time across a fixed number of phases and estimate the time remaining."""

    def __init__(self, total_phases):
        self.total_phases = total_phases   # how many phases the process has overall
        self.phase_times = []              # durations (seconds) of completed phases
        self.start_time = None             # wall-clock start of the whole process
        self.current_phase_start = None    # wall-clock start of the phase in progress

    def start(self):
        """Mark the start of the whole process."""
        self.start_time = time.time()

    def start_phase(self):
        """Mark the start of a new phase."""
        self.current_phase_start = time.time()

    def end_phase(self):
        """Record and return the duration of the phase in progress."""
        phase_duration = time.time() - self.current_phase_start
        self.phase_times.append(phase_duration)
        return phase_duration

    def eta(self, current_phase_idx):
        """Return (elapsed, remaining): seconds since start and an estimate of
        seconds left, based on the average duration of completed phases."""
        total_elapsed = time.time() - self.start_time
        mean_phase = sum(self.phase_times) / len(self.phase_times) if self.phase_times else 0
        left = (self.total_phases - current_phase_idx - 1) * mean_phase
        return total_elapsed, left

429 

430 

431 

def get_rdf_prov_filepaths(data_dir):
    """
    Walk *data_dir* and collect the paths of provenance RDF files.

    Only files located directly inside a directory named 'prov' and ending in
    '.zip', '.json.xz' or '.json' are collected.

    :param data_dir: root directory to scan.
    :returns: list of matching file paths.
    """
    collected = []
    wanted_suffixes = ('.zip', '.json.xz', '.json')
    for dirpath, _dirs, filenames in os.walk(data_dir):
        if os.path.basename(dirpath) != 'prov':
            continue
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            if full_path.endswith(wanted_suffixes):
                collected.append(full_path)
    return collected

442 

443 

def read_rdf_dump(data_dir: str, whole_file: bool = False, ordered: bool = True, include_fp=False) -> Generator[Union[dict, list, tuple], None, None]:
    """
    Iterates over the files in any given directory storing OpenCitations Meta RDF **PROVENANCE** files
    and yields the JSON-LD data. If `whole_file` is False (default), yields single named graphs as
    dictionaries, else if `whole_file` is True yields the whole parsed JSON file as a list of dictionaries.

    :param data_dir: Path to the directory containing the decompressed provenance archive.
    :param whole_file: (default: False) If True, yield whole files, else single named graphs.
    :param ordered: (default: True) If True, sort all files to process deterministically.
    :param include_fp: (default: False) If True, yield a tuple where the second element is a filepath.
    :yield: Dictionary or list of dictionaries corresponding to (a) named graph(s). If include_fp is True, yields a tuple
        where the second element is the path of the file from which the graph or dataset was parsed.
    """

    def _emit(data, fp):
        # Single place implementing the whole_file / include_fp yield logic,
        # shared by all three file formats.
        if whole_file:
            yield data if not include_fp else (data, fp)
        else:
            for g in data:
                yield g if not include_fp else (g, fp)

    paths = get_rdf_prov_filepaths(data_dir)

    if ordered:
        paths.sort()  # deterministic global order

    for fp in paths:

        # ZIP archives
        if fp.endswith('.zip'):
            with ZipFile(fp) as archive:
                members = [f for f in archive.filelist if f.filename.endswith('.json')]
                if ordered:
                    members.sort(key=lambda z: z.filename)

                for m in members:
                    with archive.open(m.filename) as f:
                        yield from _emit(json.load(f), fp)

        # LZMA (.json.xz)
        elif fp.endswith('.json.xz'):
            with lzma.open(fp, 'rt', encoding='utf-8') as f:
                yield from _emit(json.load(f), fp)

        # Plain JSON
        elif fp.endswith('.json'):
            # skip if compressed duplicate exists
            if os.path.exists(fp + '.xz') or os.path.exists(fp.removesuffix('.json') + '.zip'):
                continue

            with open(fp, 'r', encoding='utf-8') as f:
                yield from _emit(json.load(f), fp)

504 

505 

506 

def load_modified_graphs_uris(stream: Union[str, Iterable]) -> set:
    """
    Collect the set of graph URIs from *stream* (a JSONL file path or an
    iterable), where each streamed item is a pair whose first element is the
    graph URI (the second element, a dict, is ignored).
    """
    uris = set()
    for batch, _count in chunker(stream, 10000):
        for graph_uri, _graph_data in batch:
            uris.add(graph_uri)
    return uris

513 

514 

def batched(iterable, n, strict=False):
    """
    Batch elements of *iterable* into tuples of length *n*; the final batch may
    be shorter unless ``strict`` is True, in which case an incomplete final
    batch raises ValueError.

    Backport of ``itertools.batched`` (stdlib from Python 3.12; ``strict``
    added in 3.13). Example: batched('ABCDEFG', 2) -> AB CD EF G.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            break
        if strict and len(chunk) != n:
            raise ValueError('batched(): incomplete batch')
        yield chunk