Coverage for oc_ds_converter / lib / crossref_style_processing.py: 92%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import html 

8import json 

9import os 

10import re 

11from abc import abstractmethod 

12from pathlib import Path 

13 

14from bs4 import BeautifulSoup 

15 

16from oc_ds_converter.datasource.orcid_index import OrcidIndexRedis, PublishersRedis 

17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

18from oc_ds_converter.oc_idmanager import ORCIDManager 

19from oc_ds_converter.oc_idmanager.doi import DOIManager 

20from oc_ds_converter.oc_idmanager.issn import ISSNManager 

21from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager 

22from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

23from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

24from oc_ds_converter.ra_processor import RaProcessor 

25 

26 

class CrossrefStyleProcessing(RaProcessor):
    """Base class for processors that follow the Crossref-style pattern.

    This includes common infrastructure for:
    - Redis-based storage and validation
    - ORCID index prefetching
    - Two-pass processing (citing/cited entities)

    Subclasses provide the ``_extract_*`` hooks and ``extract_all_ids``;
    ``csv_creator`` assembles the hooks' results into one metadata row.
    """

    # Hyphenated ORCID iD (the final character may be the checksum letter X).
    # Compiled once so ``orcid_finder`` does not look the pattern up per entry.
    _ORCID_PATTERN = re.compile(r'\d{4}-\d{4}-\d{4}-\d{3}[\dX]')

    @staticmethod
    def clean_markup(text: str) -> str:
        """Return *text* with markup removed, entities decoded and newlines dropped."""
        if '<' in text:
            # Only run the HTML parser when the string can actually hold a tag.
            text = BeautifulSoup(text, 'html.parser').get_text()
        return html.unescape(text).replace('\n', '')

    def __init__(
        self,
        orcid_index: str | None = None,
        publishers_filepath: str | None = None,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
        citing: bool = True,
        use_redis_orcid_index: bool = False,
        use_orcid_api: bool = True,
        use_redis_publishers: bool = False,
    ):
        """Set up storage, identifier managers and Redis handles.

        Args:
            orcid_index: Forwarded to ``RaProcessor.__init__``. When it is
                ``None`` and ``use_redis_orcid_index`` is true, an
                ``OrcidIndexRedis`` instance is forwarded instead.
            publishers_filepath: Forwarded to ``RaProcessor.__init__``.
            storage_manager: Persistent validity storage; a
                ``RedisStorageManager`` is created when omitted.
            testing: Passed to every manager created here; when true the
                BR/RA Redis handles are ``FakeRedisWrapper`` instances.
            citing: Stored as ``self.citing``.
            use_redis_orcid_index: See ``orcid_index``.
            use_orcid_api: Passed to the ORCID managers as
                ``use_api_service``; also gates ORCID validation in
                ``to_validated_id_list``.
            use_redis_publishers: When true, a ``PublishersRedis`` instance
                is created and used by ``get_publisher_by_prefix``.
        """
        if use_redis_orcid_index and orcid_index is None:
            orcid_index_obj = OrcidIndexRedis(testing=testing)
        else:
            orcid_index_obj = orcid_index
        super().__init__(orcid_index_obj, publishers_filepath)

        self.citing = citing
        self._testing = testing
        self.use_orcid_api = use_orcid_api
        self.use_redis_publishers = use_redis_publishers
        self._publishers_redis: PublishersRedis | None = None
        if use_redis_publishers:
            self._publishers_redis = PublishersRedis(testing=testing)

        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        # In-memory staging area; flushed to ``storage_manager`` by
        # ``memory_to_storage``.
        self.temporary_manager = BatchManager()

        self.doi_m = DOIManager(storage_manager=self.storage_manager, testing=testing)
        self.issn_m = ISSNManager()

        self.venue_id_man_dict: dict[str, object] = {"issn": self.issn_m}

        self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, testing=testing)

        self.venue_tmp_id_man_dict: dict[str, object] = {"issn": self.issn_m}

        self.orcid_m = ORCIDManager(
            storage_manager=self.storage_manager, testing=testing, use_api_service=use_orcid_api
        )
        self.tmp_orcid_m = ORCIDManager(
            storage_manager=self.temporary_manager, testing=testing, use_api_service=use_orcid_api
        )

        if testing:
            self.BR_redis: FakeRedisWrapper | RedisDataSource = FakeRedisWrapper()
            self.RA_redis: FakeRedisWrapper | RedisDataSource = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        self._redis_values_br: list[str] = []
        self._redis_values_ra: list[str] = []
        self._doi_orcid_cache: dict[str, set[str]] = {}

    def update_redis_values(self, br: list[str], ra: list[str] | None = None) -> None:
        """Replace the cached BR/RA id lists with normalised copies of *br* and *ra*.

        Ids whose normalised form is falsy are dropped. ``None`` or empty
        inputs produce empty lists.
        """
        normalised_br = (self.doi_m.normalise(b, include_prefix=True) for b in (br or []))
        self._redis_values_br = [x for x in normalised_br if x]
        normalised_ra = (self.orcid_m.normalise(r, include_prefix=True) for r in (ra or []))
        self._redis_values_ra = [x for x in normalised_ra if x]

    def prefetch_doi_orcid_index(self, dois: list[str]) -> None:
        """Load the ORCID index entries for *dois* into the local cache.

        DOIs that do not normalise to a truthy value are skipped. The cache
        is replaced, not merged, on every call.
        """
        keys = [
            norm for doi in dois
            if (norm := self.doi_m.normalise(doi, include_prefix=True))
        ]
        self._doi_orcid_cache = self.orcid_index.get_values_batch(keys)

    def orcid_finder(self, doi: str) -> dict[str, str]:
        """Return ``{orcid: lowercased name}`` for the prefetched entries of *doi*.

        Only the cache filled by ``prefetch_doi_orcid_index`` is consulted.
        An empty dict is returned when the DOI does not normalise or has no
        cached entries. Entries without an ORCID-shaped substring are ignored.
        """
        norm_doi = self.doi_m.normalise(doi, include_prefix=True)
        if not norm_doi:
            return {}
        people = self._doi_orcid_cache.get(norm_doi)
        if not people:
            return {}
        found: dict[str, str] = {}
        for person in people:
            match = self._ORCID_PATTERN.search(person)
            if match is None:
                continue
            # The name is everything before the ORCID, minus the single
            # character directly in front of it (presumably an opening
            # bracket — verify against the index format). Clamp at 0: when
            # the ORCID starts the string, ``start - 1`` would be -1 and the
            # slice would return almost the whole ORCID as the "name".
            name_end = max(match.start() - 1, 0)
            found[match.group(0)] = person[:name_end].strip().lower()
        return found

    def memory_to_storage(self) -> None:
        """Flush staged validity values to persistent storage, then clear the stage."""
        kv_in_memory = self.temporary_manager.get_validity_list_of_tuples()
        if kv_in_memory:
            self.storage_manager.set_multi_value(kv_in_memory)
        # The temporary storage is cleared even when there was nothing to flush.
        self.temporary_manager.delete_storage()

    def get_id_manager(
        self, schema_or_id: str, id_man_dict: dict[str, object]
    ) -> object | None:
        """Return the manager registered for a schema, or ``None`` if absent.

        *schema_or_id* may be a bare schema (``"issn"``) or a prefixed id
        (``"issn:1234-5678"``); the text before the first colon is the key.
        """
        schema = schema_or_id.partition(":")[0]
        return id_man_dict.get(schema)

    def dict_to_cache(self, dict_to_be_saved: dict[str, list[str]], path: str) -> None:
        """Write *dict_to_be_saved* as indented UTF-8 JSON to *path*.

        Missing parent directories are created first.
        """
        path_obj = Path(path)
        # ``exist_ok=True`` already covers an existing directory, so no
        # separate existence check (and no check-then-create race) is needed.
        path_obj.parent.mkdir(parents=True, exist_ok=True)
        with open(path_obj, "w", encoding="utf-8") as fd:
            json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)

    def get_redis_validity_list(self, id_list: list[str], redis_db: str) -> list[str]:
        """Return the ids from *id_list* that the chosen Redis DB reports as present.

        Args:
            id_list: Ids to check.
            redis_db: ``"ra"`` or ``"br"``.

        Raises:
            ValueError: If *redis_db* is neither ``"ra"`` nor ``"br"``.
        """
        ids = list(id_list)
        if redis_db == "ra":
            source = self.RA_redis
        elif redis_db == "br":
            source = self.BR_redis
        else:
            raise ValueError("redis_db must be either 'ra' or 'br'")
        # NOTE(review): assumes ``mexists_as_set`` returns one truthy/falsy
        # flag per input id, in input order — confirm against the data source.
        validity = source.mexists_as_set(ids)
        return [identifier for identifier, present in zip(ids, validity) if present]

    def validated_as(self, id_dict: dict[str, str]) -> bool | None:
        """Return the stored validity of an id, or ``None`` if unknown.

        The temporary manager is consulted first and the persistent one only
        when the temporary manager has no answer. Schemas other than
        ``orcid`` and ``doi`` always yield ``None``.
        """
        schema = id_dict["schema"].strip().lower()
        identifier = id_dict["identifier"]

        if schema == "orcid":
            tmp_manager, manager = self.tmp_orcid_m, self.orcid_m
        elif schema == "doi":
            tmp_manager, manager = self.tmp_doi_m, self.doi_m
        else:
            return None

        validity_value = tmp_manager.validated_as_id(identifier)
        if validity_value is None:
            validity_value = manager.validated_as_id(identifier)
        return validity_value

    def to_validated_id_list(self, norm_id_dict: dict[str, str]) -> list[str]:
        """Return ``[id]`` when the normalised id is valid, else ``[]``.

        An id found in the cached Redis values is accepted directly and
        recorded as valid in the temporary storage. Otherwise the temporary
        manager's ``is_valid`` decides; for ORCIDs that check is skipped
        when ``use_orcid_api`` is false.
        """
        valid_id_list: list[str] = []
        norm_id = norm_id_dict["id"]
        schema = norm_id_dict["schema"]

        if schema == "doi":
            if norm_id in self._redis_values_br:
                self.tmp_doi_m.storage_manager.set_value(norm_id, True)
                valid_id_list.append(norm_id)
            elif self.tmp_doi_m.is_valid(norm_id):
                valid_id_list.append(norm_id)

        elif schema == "orcid":
            if norm_id in self._redis_values_ra:
                self.tmp_orcid_m.storage_manager.set_value(norm_id, True)
                valid_id_list.append(norm_id)
            elif self.use_orcid_api and self.tmp_orcid_m.is_valid(norm_id):
                # With the API disabled, an ORCID not already in Redis is
                # left unvalidated rather than checked.
                valid_id_list.append(norm_id)

        return valid_id_list

    def get_publisher_by_prefix(self, prefix: str) -> tuple[str, str] | None:
        """Look up publisher by DOI prefix. Returns (name, member_id) or None.

        When Redis publishers are enabled, only Redis is consulted; the
        in-memory ``publishers_mapping`` is not used as a fallback.
        """
        if self.use_redis_publishers and self._publishers_redis:
            pub_data = self._publishers_redis.get_by_prefix(prefix)
            if pub_data:
                # NOTE(review): reads the private ``_r`` client directly and
                # stringifies the raw reply; a ``None`` or ``bytes`` reply
                # would become "None" / "b'...'" — confirm the reply type.
                member_id = self._publishers_redis._r.get(
                    f"{self._publishers_redis.DOI_PREFIX_KEY}{prefix}"
                )
                return str(pub_data['name']), str(member_id)
            return None
        if self.publishers_mapping:
            for member, data in self.publishers_mapping.items():
                if prefix in data['prefixes']:
                    return str(data['name']), member
        return None

    def _extract_volume(self, item: dict) -> str:
        """Return ``item['volume']``, or an empty string when the key is missing."""
        return item.get('volume', '')

    def _extract_issue(self, item: dict) -> str:
        """Return ``item['issue']``, or an empty string when the key is missing."""
        return item.get('issue', '')

    def csv_creator(self, item: dict) -> dict:
        """Build one metadata row from *item* using the ``_extract_*`` hooks.

        Returns an empty dict when the item has no DOI or the DOI does not
        normalise. Otherwise returns the row after ``normalise_unicode``.
        """
        doi = self._extract_doi(item)
        if not doi:
            return {}
        norm_id = self.doi_m.normalise(doi, include_prefix=True)
        if not norm_id:
            return {}

        authors_list = self._extract_agents(item)
        authors_string_list, editors_string_list = self.get_agents_strings_list(doi, authors_list)

        metadata = {
            'id': norm_id,
            'title': self._extract_title(item),
            'author': '; '.join(authors_string_list),
            'issue': self._extract_issue(item),
            'volume': self._extract_volume(item),
            'venue': self._extract_venue(item),
            'pub_date': self._extract_pub_date(item),
            'page': self._extract_pages(item),
            'type': self._extract_type(item),
            'publisher': self._extract_publisher(item),
            'editor': '; '.join(editors_string_list),
        }
        return self.normalise_unicode(metadata)

    @abstractmethod
    def _extract_doi(self, item: dict) -> str:
        """Extract DOI from item dict."""
        ...

    @abstractmethod
    def _extract_title(self, item: dict) -> str:
        """Extract title from item dict."""
        ...

    @abstractmethod
    def _extract_agents(self, item: dict) -> list[dict]:
        """Extract list of agents (authors/editors) from item dict."""
        ...

    @abstractmethod
    def _extract_venue(self, item: dict) -> str:
        """Extract venue name and IDs from item dict."""
        ...

    @abstractmethod
    def _extract_pub_date(self, item: dict) -> str:
        """Extract publication date from item dict."""
        ...

    @abstractmethod
    def _extract_pages(self, item: dict) -> str:
        """Extract page range from item dict."""
        ...

    @abstractmethod
    def _extract_type(self, item: dict) -> str:
        """Extract publication type from item dict."""
        ...

    @abstractmethod
    def _extract_publisher(self, item: dict) -> str:
        """Extract publisher name from item dict."""
        ...

    @abstractmethod
    def extract_all_ids(self, entity_dict: dict, is_citing: bool) -> tuple[list[str], list[str]]:
        """Extract all IDs from entity dict for validation."""
        ...