Coverage for oc_ds_converter / lib / process_utils.py: 81%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import csv 

6import json 

7import os 

8from pathlib import Path 

9 

10from filelock import BaseFileLock 

11 

12from oc_ds_converter.oc_idmanager.oc_data_storage.in_memory_manager import InMemoryStorageManager 

13from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

14from oc_ds_converter.oc_idmanager.oc_data_storage.sqlite_manager import SqliteStorageManager 

15from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

16 

17 

def get_storage_manager(storage_path: str | None, testing: bool) -> StorageManager:
    """Build the storage backend matching *storage_path*.

    A ``.db`` path yields a SQLite-backed manager and a ``.json`` path an
    in-memory manager persisted to that file; any other non-empty path is
    rejected.  With no path at all, a Redis-backed manager is returned.

    :param storage_path: target file for the storage, or None for Redis
    :param testing: forwarded to the Redis manager (test database selection
        — presumably; confirm against RedisStorageManager)
    :raises ValueError: if *storage_path* has an unsupported extension
    """
    if not storage_path:
        return RedisStorageManager(testing=testing)
    if not os.path.exists(storage_path):
        # Make sure the directory that will hold the storage file exists.
        containing_dir = os.path.abspath(os.path.join(storage_path, os.pardir))
        if not os.path.exists(containing_dir):
            Path(containing_dir).mkdir(parents=True, exist_ok=True)
    if storage_path.endswith(".db"):
        return SqliteStorageManager(storage_path)
    if storage_path.endswith(".json"):
        return InMemoryStorageManager(storage_path)
    raise ValueError(f"Storage path must end with .db or .json, got: {storage_path}")

30 

31 

32def normalize_cache_path(cache: str | None) -> str: 

33 if cache: 

34 if not cache.endswith(".json"): 

35 return os.path.join(os.getcwd(), "cache.json") 

36 parent_dir = os.path.abspath(os.path.join(cache, os.pardir)) 

37 if not os.path.exists(parent_dir): 

38 Path(parent_dir).mkdir(parents=True, exist_ok=True) 

39 return cache 

40 return os.path.join(os.getcwd(), "cache.json") 

41 

42 

def init_process_cache(cache_path: str, lock: BaseFileLock) -> dict[str, list[str]]:
    """Load the shared process cache, creating it when absent or corrupt.

    The cache maps the iteration kind (``"citing"``/``"cited"``) to the
    list of already-processed file names.  All file access happens while
    holding *lock*, so concurrent workers see a consistent file.
    """
    cache: dict[str, list[str]] = {"citing": [], "cited": []}
    needs_rewrite = not os.path.exists(cache_path)
    if not needs_rewrite:
        with lock, open(cache_path, "r", encoding="utf-8") as handle:
            try:
                cache = json.load(handle)
            except json.JSONDecodeError:
                # Corrupt cache file: keep the defaults and rewrite it below.
                needs_rewrite = True
    if needs_rewrite:
        with lock, open(cache_path, "w", encoding="utf-8") as handle:
            json.dump(cache, handle)
    return cache

60 

61 

def mark_file_completed(
    cache_path: str,
    lock: BaseFileLock,
    filename: str,
    processing_citing: bool,
) -> None:
    """Record *filename* (basename only) as done for the current pass.

    The cache file is re-read, updated, and rewritten while holding
    *lock*, so updates from concurrent workers are not lost.

    :param processing_citing: True marks the file under ``"citing"``,
        False under ``"cited"``.
    """
    key = "citing" if processing_citing else "cited"
    with lock:
        state: dict[str, list[str]] = {}
        if os.path.exists(cache_path):
            with open(cache_path, "r", encoding="utf-8") as handle:
                state = json.load(handle)
        completed = state.setdefault(key, [])
        base = Path(filename).name
        if base not in completed:
            completed.append(base)
        with open(cache_path, "w", encoding="utf-8") as handle:
            json.dump(state, handle)

81 

82 

def delete_cache_files(cache_path: str) -> None:
    """Remove the cache file and its companion ``.lock`` file, if present."""
    for candidate in (cache_path, cache_path + ".lock"):
        if os.path.exists(candidate):
            os.remove(candidate)

89 

90 

def create_output_dirs(csv_dir: str) -> str:
    """Ensure *csv_dir* and its sibling ``<csv_dir>_citations`` exist.

    Uses ``os.makedirs(..., exist_ok=True)`` instead of the original
    check-then-create pattern: the existence check was racy when several
    worker processes started concurrently (both could see "missing" and
    one would then crash on the duplicate ``makedirs``).

    :param csv_dir: directory for the metadata CSV output
    :returns: the path of the citations output directory
    """
    os.makedirs(csv_dir, exist_ok=True)
    citations_dir = csv_dir + "_citations"
    os.makedirs(citations_dir, exist_ok=True)
    return citations_dir

98 

99 

def write_csv_output(filepath: str, rows: list[dict[str, str]]) -> None:
    """Dump *rows* to *filepath* as CSV.

    The header row is taken from the keys of the first row; nothing is
    written (no file is created) when *rows* is empty.
    """
    if not rows:
        return
    fieldnames = rows[0].keys()
    with open(filepath, 'w', newline='', encoding='utf-8') as sink:
        writer = csv.DictWriter(
            sink,
            fieldnames,
            delimiter=',',
            quotechar='"',
            quoting=csv.QUOTE_NONNUMERIC,
            escapechar='\\',
        )
        writer.writeheader()
        writer.writerows(rows)

110 

111 

def cleanup_storage(testing: bool) -> None:
    """Drop everything held in the Redis-backed storage.

    :param testing: forwarded to RedisStorageManager (test database
        selection — presumably; confirm against that class).
    """
    RedisStorageManager(testing=testing).delete_storage()

115 

116 

def is_file_in_cache(
    cache_dict: dict[str, list[str]],
    filename: str,
    processing_citing: bool,
) -> bool:
    """Report whether *filename* (compared by basename only) was already
    processed in the current pass.

    :param processing_citing: True checks the ``"citing"`` list,
        False the ``"cited"`` list.
    """
    key = "citing" if processing_citing else "cited"
    completed = cache_dict.get(key)
    return bool(completed and Path(filename).name in completed)