Coverage for oc_ds_converter / lib / process_utils.py: 81%
88 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import csv
6import json
7import os
8from pathlib import Path
10from filelock import BaseFileLock
12from oc_ds_converter.oc_idmanager.oc_data_storage.in_memory_manager import InMemoryStorageManager
13from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
14from oc_ds_converter.oc_idmanager.oc_data_storage.sqlite_manager import SqliteStorageManager
15from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
def get_storage_manager(storage_path: str | None, testing: bool) -> StorageManager:
    """Pick the storage backend that matches ``storage_path``.

    A missing or empty ``storage_path`` yields a ``RedisStorageManager`` built
    with the given ``testing`` flag. Otherwise the file extension decides:
    ``.db`` yields a ``SqliteStorageManager`` and ``.json`` yields an
    ``InMemoryStorageManager``, both constructed with ``storage_path``.

    When ``storage_path`` does not exist yet, its parent directory is created
    (if missing) before the extension is checked.

    Raises:
        ValueError: If ``storage_path`` ends with neither ``.db`` nor ``.json``.
    """
    if not storage_path:
        return RedisStorageManager(testing=testing)

    if not os.path.exists(storage_path):
        # Make sure the directory that will hold the storage file is present.
        containing_dir = os.path.abspath(os.path.join(storage_path, os.pardir))
        if not os.path.exists(containing_dir):
            Path(containing_dir).mkdir(parents=True, exist_ok=True)

    if storage_path.endswith(".db"):
        return SqliteStorageManager(storage_path)
    if storage_path.endswith(".json"):
        return InMemoryStorageManager(storage_path)
    raise ValueError(f"Storage path must end with .db or .json, got: {storage_path}")
32def normalize_cache_path(cache: str | None) -> str:
33 if cache:
34 if not cache.endswith(".json"):
35 return os.path.join(os.getcwd(), "cache.json")
36 parent_dir = os.path.abspath(os.path.join(cache, os.pardir))
37 if not os.path.exists(parent_dir):
38 Path(parent_dir).mkdir(parents=True, exist_ok=True)
39 return cache
40 return os.path.join(os.getcwd(), "cache.json")
def init_process_cache(cache_path: str, lock: BaseFileLock) -> dict[str, list[str]]:
    """Load the processing cache from ``cache_path``, creating it if needed.

    The default cache is ``{"citing": [], "cited": []}``. If the file exists
    and contains valid JSON, the parsed content is returned as-is. If the
    file is missing or cannot be decoded as JSON, the default cache is
    written to ``cache_path`` and returned.

    ``lock`` is used as a context manager around each file access; the read
    and the (optional) write acquire it separately.
    """
    processed: dict[str, list[str]] = {"citing": [], "cited": []}
    needs_write = not os.path.exists(cache_path)

    if not needs_write:
        with lock, open(cache_path, "r", encoding="utf-8") as handle:
            try:
                processed = json.load(handle)
            except json.JSONDecodeError:
                # Unreadable cache: fall back to the default and rewrite it.
                needs_write = True

    if needs_write:
        with lock, open(cache_path, "w", encoding="utf-8") as handle:
            json.dump(processed, handle)

    return processed
def mark_file_completed(
    cache_path: str,
    lock: BaseFileLock,
    filename: str,
    processing_citing: bool,
) -> None:
    """Record ``filename`` as processed in the JSON cache at ``cache_path``.

    Only the base name of ``filename`` is stored. It is appended to the
    ``"citing"`` list when ``processing_citing`` is true, otherwise to the
    ``"cited"`` list; a name that is already listed is not added twice.
    The whole read-modify-write cycle runs while ``lock`` is held.

    A cache file that exists but cannot be decoded as JSON (for example an
    empty or truncated file) is treated as an empty cache instead of raising,
    matching how ``init_process_cache`` recovers from the same situation.
    """
    iteration_key = "citing" if processing_citing else "cited"
    file_basename = Path(filename).name
    with lock:
        cache_dict: dict[str, list[str]] = {}
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as f:
                try:
                    cache_dict = json.load(f)
                except json.JSONDecodeError:
                    # The previous content is unrecoverable; start over so the
                    # current file can still be recorded.
                    cache_dict = {}
        completed = cache_dict.setdefault(iteration_key, [])
        if file_basename not in completed:
            completed.append(file_basename)
        with open(cache_path, "w", encoding="utf-8") as f:
            json.dump(cache_dict, f)
def delete_cache_files(cache_path: str) -> None:
    """Remove the cache file and its companion ``.lock`` file, if present.

    The lock file path is ``cache_path`` with ``".lock"`` appended. Paths
    that do not exist are skipped.
    """
    for candidate in (cache_path, cache_path + ".lock"):
        if os.path.exists(candidate):
            os.remove(candidate)
def create_output_dirs(csv_dir: str) -> str:
    """Ensure the CSV output directory and its citations sibling exist.

    The citations directory is ``csv_dir`` with ``"_citations"`` appended.
    Each directory is created (including intermediate directories) only when
    its path does not exist yet.

    Returns:
        The path of the citations directory.
    """
    citations_dir = csv_dir + "_citations"
    for directory in (csv_dir, citations_dir):
        if not os.path.exists(directory):
            os.makedirs(directory)
    return citations_dir
def write_csv_output(filepath: str, rows: list[dict[str, str]]) -> None:
    """Write ``rows`` to ``filepath`` as a comma-separated CSV with a header.

    Nothing is written (and no file is created) when ``rows`` is empty.
    Column names are taken from the keys of the first row. Non-numeric values
    are quoted with ``"`` and ``\\`` is configured as the escape character.
    """
    if len(rows) == 0:
        return
    column_names = rows[0].keys()
    with open(filepath, 'w', newline='', encoding='utf-8') as csv_handle:
        writer = csv.DictWriter(
            csv_handle,
            fieldnames=column_names,
            delimiter=',',
            quotechar='"',
            quoting=csv.QUOTE_NONNUMERIC,
            escapechar='\\',
        )
        writer.writeheader()
        writer.writerows(rows)
def cleanup_storage(testing: bool) -> None:
    """Build a ``RedisStorageManager`` with ``testing`` and call ``delete_storage`` on it."""
    RedisStorageManager(testing=testing).delete_storage()
def is_file_in_cache(
    cache_dict: dict[str, list[str]],
    filename: str,
    processing_citing: bool,
) -> bool:
    """Tell whether ``filename`` is already listed in the cache.

    Only the base name of ``filename`` is compared. The ``"citing"`` list is
    consulted when ``processing_citing`` is true, the ``"cited"`` list
    otherwise. A missing or empty list means the file is not cached.
    """
    section = "citing" if processing_citing else "cited"
    recorded = cache_dict.get(section)
    if not recorded:
        return False
    return Path(filename).name in recorded