Coverage for oc_ds_converter / lib / crossref_style_processing.py: 92%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-12 21:23 +0000

1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import html 

8import json 

9import os 

10import re 

11from abc import abstractmethod 

12from pathlib import Path 

13 

14from bs4 import BeautifulSoup 

15 

16from oc_ds_converter.datasource.orcid_index import OrcidIndexRedis, PublishersRedis 

17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

18from oc_ds_converter.oc_idmanager import ORCIDManager 

19from oc_ds_converter.oc_idmanager.doi import DOIManager 

20from oc_ds_converter.oc_idmanager.issn import ISSNManager 

21from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager 

22from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

23from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

24from oc_ds_converter.ra_processor import RaProcessor 

25 

26 

# Matches any run of one or more whitespace characters. Compiled once at
# import time; sanitize_text uses it to collapse each run to a single space.
_WHITESPACE_RE = re.compile(r'\s+')

28 

29 

class CrossrefStyleProcessing(RaProcessor):
    """Base class for processors that follow the Crossref-style pattern.

    This includes common infrastructure for:
    - Redis-based storage and validation
    - ORCID index prefetching
    - Two-pass processing (citing/cited entities)

    Subclasses provide the source-specific ``_extract_*`` hooks and
    ``extract_all_ids``; ``csv_creator`` combines the hook results into a
    single metadata row.
    """

    # Bare ORCID iD: four dash-separated groups of four, the very last
    # character being a digit or ``X``. Compiled once for ``orcid_finder``.
    _ORCID_RE = re.compile(r'\d{4}-\d{4}-\d{4}-\d{3}[\dX]')

    @staticmethod
    def clean_markup(text: str) -> str:
        """Return *text* without markup, HTML entities or newline characters.

        The HTML parser is only invoked when a ``<`` is present; entity
        decoding and newline removal are always applied.
        """
        if '<' in text:
            soup = BeautifulSoup(text, 'html.parser')
            text = soup.get_text()
        return html.unescape(text).replace('\n', '')

    @staticmethod
    def sanitize_text(value: str) -> str:
        """Resolve HTML entities and collapse every whitespace run to one space.

        Entity resolution runs only when ``&`` appears in the input (cheap
        membership test) because ``html.unescape`` is pure Python and scans
        the whole string. Running the collapse *after* the unescape is
        deliberate: sources sometimes encode embedded control characters as
        ``&#13;&#10;``, and the decoded ``\\r``/``\\n`` must be collapsed too
        or they will survive into the downstream CSV.

        Leading and trailing whitespace is stripped from the result.
        """
        if '&' in value:
            value = html.unescape(value)
        return _WHITESPACE_RE.sub(' ', value).strip()

    def __init__(
        self,
        orcid_index: str | None = None,
        publishers_filepath: str | None = None,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
        citing: bool = True,
        use_redis_orcid_index: bool = False,
        use_orcid_api: bool = True,
        use_redis_publishers: bool = False,
    ):
        """Set up storage, identifier managers and Redis data sources.

        Args:
            orcid_index: Forwarded to ``RaProcessor.__init__``. When it is
                ``None`` and ``use_redis_orcid_index`` is true, an
                ``OrcidIndexRedis`` instance is forwarded instead.
            publishers_filepath: Forwarded to ``RaProcessor.__init__``.
            storage_manager: Persistent validity storage. Defaults to a
                ``RedisStorageManager`` built with the same ``testing`` flag.
            testing: Forwarded to every manager created here; when true the
                BR/RA lookups use ``FakeRedisWrapper`` instead of
                ``RedisDataSource``.
            citing: Stored as ``self.citing``; not used further in this class.
            use_redis_orcid_index: See ``orcid_index``.
            use_orcid_api: Forwarded to both ORCID managers as
                ``use_api_service`` and consulted by ``to_validated_id_list``.
            use_redis_publishers: When true, publisher lookups go through a
                ``PublishersRedis`` instance.
        """
        orcid_index_obj = (
            OrcidIndexRedis(testing=testing)
            if use_redis_orcid_index and orcid_index is None
            else orcid_index
        )
        super().__init__(orcid_index_obj, publishers_filepath)
        self.citing = citing
        self._testing = testing
        self.use_orcid_api = use_orcid_api
        self.use_redis_publishers = use_redis_publishers
        self._publishers_redis: PublishersRedis | None = None
        if use_redis_publishers:
            self._publishers_redis = PublishersRedis(testing=testing)

        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        # In-memory staging area; flushed by memory_to_storage().
        self.temporary_manager = BatchManager()

        # Managers backed by the persistent storage.
        self.doi_m = DOIManager(storage_manager=self.storage_manager, testing=testing)
        self.issn_m = ISSNManager()

        self.venue_id_man_dict: dict[str, object] = {"issn": self.issn_m}

        # Managers backed by the temporary (batch) storage.
        self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, testing=testing)

        self.venue_tmp_id_man_dict: dict[str, object] = {"issn": self.issn_m}

        self.orcid_m = ORCIDManager(
            storage_manager=self.storage_manager, testing=testing, use_api_service=use_orcid_api
        )
        self.tmp_orcid_m = ORCIDManager(
            storage_manager=self.temporary_manager, testing=testing, use_api_service=use_orcid_api
        )

        if testing:
            self.BR_redis: FakeRedisWrapper | RedisDataSource = FakeRedisWrapper()
            self.RA_redis: FakeRedisWrapper | RedisDataSource = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        # Normalised identifiers already known to Redis (see update_redis_values).
        self._redis_values_br: list[str] = []
        self._redis_values_ra: list[str] = []
        # Normalised DOI -> index entries, filled by prefetch_doi_orcid_index.
        self._doi_orcid_cache: dict[str, set[str]] = {}

    def update_redis_values(self, br: list[str], ra: list[str] | None = None) -> None:
        """Replace the cached lists of identifiers known to Redis.

        Every DOI in *br* and every ORCID in *ra* is normalised with its
        prefix; values whose normalisation is falsy are dropped. ``None`` or
        empty inputs yield empty lists.
        """
        self._redis_values_br = [
            norm_br for b in (br or [])
            if (norm_br := self.doi_m.normalise(b, include_prefix=True))
        ]
        self._redis_values_ra = [
            norm_ra for r in (ra or [])
            if (norm_ra := self.orcid_m.normalise(r, include_prefix=True))
        ]

    def prefetch_doi_orcid_index(self, dois: list[str]) -> None:
        """Load the ORCID index entries for *dois* into the local cache.

        DOIs that cannot be normalised are skipped. The previous cache
        content is replaced by the result of a single batch lookup.
        """
        keys = [
            norm for doi in dois
            if (norm := self.doi_m.normalise(doi, include_prefix=True))
        ]
        self._doi_orcid_cache = self.orcid_index.get_values_batch(keys)

    def orcid_finder(self, doi: str) -> dict[str, str]:
        """Return ``{orcid: lower-cased name}`` for the cached entries of *doi*.

        Only the cache filled by ``prefetch_doi_orcid_index`` is consulted.
        An empty dict is returned when the DOI cannot be normalised or has no
        cached entries; entries without an ORCID are ignored.
        """
        norm_doi = self.doi_m.normalise(doi, include_prefix=True)
        if not norm_doi:
            return {}
        people = self._doi_orcid_cache.get(norm_doi)
        if not people:
            return {}
        found: dict[str, str] = {}
        for person in people:
            match = self._ORCID_RE.search(person)
            if match is None:
                continue
            # The name is everything before the character that precedes the
            # ORCID (presumably an opening bracket - confirm against the
            # index format). The clamp keeps an entry that *starts* with the
            # ORCID from turning into a ``[:-1]`` slice, which would store a
            # truncated ORCID as the name.
            name = person[:max(match.start() - 1, 0)]
            found[match.group(0)] = name.strip().lower()
        return found

    def memory_to_storage(self) -> None:
        """Flush validity results from the temporary manager to storage.

        The temporary storage is cleared afterwards, whether or not it held
        any values.
        """
        kv_in_memory = self.temporary_manager.get_validity_list_of_tuples()
        if kv_in_memory:
            self.storage_manager.set_multi_value(kv_in_memory)
        self.temporary_manager.delete_storage()

    def get_id_manager(
        self, schema_or_id: str, id_man_dict: dict[str, object]
    ) -> object | None:
        """Return the manager registered for a schema, or ``None``.

        *schema_or_id* may be a bare schema (``"issn"``) or a prefixed
        identifier (``"issn:1234-5678"``); the text before the first colon is
        used as the lookup key in *id_man_dict*.
        """
        # partition() yields the whole string when no colon is present.
        schema = schema_or_id.partition(":")[0]
        return id_man_dict.get(schema)

    def dict_to_cache(self, dict_to_be_saved: dict[str, list[str]], path: str) -> None:
        """Write *dict_to_be_saved* as indented UTF-8 JSON to *path*.

        Missing parent directories are created first.
        """
        path_obj = Path(path)
        # exist_ok already covers the "directory is there" case, so no
        # separate (race-prone) existence check is needed.
        path_obj.parent.absolute().mkdir(parents=True, exist_ok=True)
        with open(path_obj, "w", encoding="utf-8") as fd:
            json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)

    def get_redis_validity_list(self, id_list: list[str], redis_db: str) -> list[str]:
        """Return the identifiers of *id_list* that Redis reports as existing.

        Args:
            id_list: Identifiers to check.
            redis_db: ``"ra"`` to query ``self.RA_redis`` or ``"br"`` to
                query ``self.BR_redis``.

        Raises:
            ValueError: If *redis_db* is neither ``"ra"`` nor ``"br"``.
        """
        ids = list(id_list)
        if redis_db == "ra":
            source = self.RA_redis
        elif redis_db == "br":
            source = self.BR_redis
        else:
            raise ValueError("redis_db must be either 'ra' or 'br'")
        # Assumes mexists_as_set returns one flag per id, in input order -
        # confirm against the data-source implementation.
        validity = source.mexists_as_set(ids)
        return [id_ for id_, exists in zip(ids, validity) if exists]

    def validated_as(self, id_dict: dict[str, str]) -> bool | None:
        """Return the stored validity of an identifier, if any.

        *id_dict* must contain ``"schema"`` and ``"identifier"``. The
        temporary manager is asked first and the persistent one only when the
        former answers ``None``. Schemas other than ``orcid`` and ``doi``
        (compared stripped and lower-cased) yield ``None``.
        """
        schema = id_dict["schema"].strip().lower()
        identifier = id_dict["identifier"]

        if schema == "orcid":
            tmp_manager, manager = self.tmp_orcid_m, self.orcid_m
        elif schema == "doi":
            tmp_manager, manager = self.tmp_doi_m, self.doi_m
        else:
            return None

        validity_value = tmp_manager.validated_as_id(identifier)
        if validity_value is None:
            validity_value = manager.validated_as_id(identifier)
        return validity_value

    def to_validated_id_list(self, norm_id_dict: dict[str, str]) -> list[str]:
        """Return ``[id]`` when the normalised identifier is valid, else ``[]``.

        *norm_id_dict* must contain ``"id"`` and ``"schema"``. Identifiers
        already listed in the Redis caches are accepted directly and recorded
        as valid in the temporary storage; otherwise the temporary manager's
        ``is_valid`` decides. ORCIDs unknown to Redis are rejected without a
        validity check when ``use_orcid_api`` is false.
        """
        valid_id_list: list[str] = []
        norm_id = norm_id_dict["id"]
        schema = norm_id_dict["schema"]

        if schema == "doi":
            if norm_id in self._redis_values_br:
                self.tmp_doi_m.storage_manager.set_value(norm_id, True)
                valid_id_list.append(norm_id)
            elif self.tmp_doi_m.is_valid(norm_id):
                valid_id_list.append(norm_id)

        elif schema == "orcid":
            if norm_id in self._redis_values_ra:
                self.tmp_orcid_m.storage_manager.set_value(norm_id, True)
                valid_id_list.append(norm_id)
            elif self.use_orcid_api and self.tmp_orcid_m.is_valid(norm_id):
                valid_id_list.append(norm_id)

        return valid_id_list

    def get_publisher_by_prefix(self, prefix: str) -> tuple[str, str] | None:
        """Look up publisher by DOI prefix. Returns (name, member_id) or None.

        When Redis publishers are enabled only Redis is consulted; the
        file-based ``publishers_mapping`` is used solely otherwise.
        """
        if self.use_redis_publishers and self._publishers_redis:
            pub_data = self._publishers_redis.get_by_prefix(prefix)
            if pub_data:
                # NOTE(review): this reaches into the private ``_r`` client,
                # and ``str()`` of a missing key or a bytes reply would give
                # ``'None'`` / ``"b'...'"`` - confirm the client decodes
                # responses and the key always exists when pub_data does.
                member_id = self._publishers_redis._r.get(
                    f"{self._publishers_redis.DOI_PREFIX_KEY}{prefix}"
                )
                return str(pub_data['name']), str(member_id)
            return None
        if self.publishers_mapping:
            for member, data in self.publishers_mapping.items():
                if prefix in data['prefixes']:
                    return str(data['name']), member
        return None

    def _extract_volume(self, item: dict) -> str:
        """Return ``item['volume']``, or an empty string when the key is absent."""
        return item.get('volume', '')

    def _extract_issue(self, item: dict) -> str:
        """Return ``item['issue']``, or an empty string when the key is absent."""
        return item.get('issue', '')

    def csv_creator(self, item: dict) -> dict:
        """Build the metadata row for *item* from the ``_extract_*`` hooks.

        Returns an empty dict when the item has no DOI or the DOI cannot be
        normalised; otherwise the row passed through ``normalise_unicode``.
        """
        doi = self._extract_doi(item)
        if not doi:
            return {}
        norm_id = self.doi_m.normalise(doi, include_prefix=True)
        if not norm_id:
            return {}

        authors_list = self._extract_agents(item)
        authors_string_list, editors_string_list = self.get_agents_strings_list(doi, authors_list)

        metadata = {
            'id': norm_id,
            'title': self._extract_title(item),
            'author': '; '.join(authors_string_list),
            'issue': self._extract_issue(item),
            'volume': self._extract_volume(item),
            'venue': self._extract_venue(item),
            'pub_date': self._extract_pub_date(item),
            'page': self._extract_pages(item),
            'type': self._extract_type(item),
            'publisher': self._extract_publisher(item),
            'editor': '; '.join(editors_string_list)
        }
        return self.normalise_unicode(metadata)

    @abstractmethod
    def _extract_doi(self, item: dict) -> str:
        """Extract DOI from item dict."""
        ...

    @abstractmethod
    def _extract_title(self, item: dict) -> str:
        """Extract title from item dict."""
        ...

    @abstractmethod
    def _extract_agents(self, item: dict) -> list[dict]:
        """Extract list of agents (authors/editors) from item dict."""
        ...

    @abstractmethod
    def _extract_venue(self, item: dict) -> str:
        """Extract venue name and IDs from item dict."""
        ...

    @abstractmethod
    def _extract_pub_date(self, item: dict) -> str:
        """Extract publication date from item dict."""
        ...

    @abstractmethod
    def _extract_pages(self, item: dict) -> str:
        """Extract page range from item dict."""
        ...

    @abstractmethod
    def _extract_type(self, item: dict) -> str:
        """Extract publication type from item dict."""
        ...

    @abstractmethod
    def _extract_publisher(self, item: dict) -> str:
        """Extract publisher name from item dict."""
        ...

    @abstractmethod
    def extract_all_ids(self, entity_dict: dict, is_citing: bool) -> tuple[list[str], list[str]]:
        """Extract all IDs from entity dict for validation."""
        ...