Coverage for oc_ds_converter / lib / crossref_style_processing.py: 92%
184 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import html
8import json
9import os
10import re
11from abc import abstractmethod
12from pathlib import Path
14from bs4 import BeautifulSoup
16from oc_ds_converter.datasource.orcid_index import OrcidIndexRedis, PublishersRedis
17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource
18from oc_ds_converter.oc_idmanager import ORCIDManager
19from oc_ds_converter.oc_idmanager.doi import DOIManager
20from oc_ds_converter.oc_idmanager.issn import ISSNManager
21from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager
22from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
23from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
24from oc_ds_converter.ra_processor import RaProcessor
27_WHITESPACE_RE = re.compile(r'\s+')
30class CrossrefStyleProcessing(RaProcessor):
31 """Base class for processors that follow the Crossref-style pattern.
33 This includes common infrastructure for:
34 - Redis-based storage and validation
35 - ORCID index prefetching
36 - Two-pass processing (citing/cited entities)
37 """
39 @staticmethod
40 def clean_markup(text: str) -> str:
41 if '<' in text:
42 soup = BeautifulSoup(text, 'html.parser')
43 text = soup.get_text()
44 return html.unescape(text).replace('\n', '')
@staticmethod
def sanitize_text(value: str) -> str:
    """Decode HTML entities, then squeeze every whitespace run to one space.

    ``html.unescape`` is only called when ``value`` contains ``&``: the
    membership test is cheap, whereas ``html.unescape`` is pure Python and
    scans the whole string.

    The whitespace collapse deliberately happens *after* decoding. Sources
    sometimes encode embedded control characters as numeric character
    references (for example ``&#13;`` or ``&#10;``); the carriage returns
    and line feeds they decode to must be collapsed as well, otherwise they
    would survive into the downstream CSV.

    Leading and trailing whitespace is stripped from the result.
    """
    decoded = html.unescape(value) if '&' in value else value
    single_spaced = _WHITESPACE_RE.sub(' ', decoded)
    return single_spaced.strip()
def __init__(
    self,
    orcid_index: str | None = None,
    publishers_filepath: str | None = None,
    storage_manager: StorageManager | None = None,
    testing: bool = True,
    citing: bool = True,
    use_redis_orcid_index: bool = False,
    use_orcid_api: bool = True,
    use_redis_publishers: bool = False,
):
    """Set up the ID managers, storage back ends and Redis handles.

    Args:
        orcid_index: ORCID index handed to the base-class constructor.
            When given it always takes precedence over the Redis index.
        publishers_filepath: Forwarded unchanged to the base-class
            constructor.
        storage_manager: Persistent store for validation results. A
            ``RedisStorageManager`` is created when omitted.
        testing: Forwarded to every manager built here. When true, the
            BR/RA Redis handles are ``FakeRedisWrapper`` objects instead
            of ``RedisDataSource`` connections. Note that it defaults to
            ``True``.
        citing: Stored as ``self.citing``; not used further in this
            constructor.
        use_redis_orcid_index: Build an ``OrcidIndexRedis`` as the ORCID
            index, but only when ``orcid_index`` is ``None``.
        use_orcid_api: Forwarded to both ``ORCIDManager`` instances as
            ``use_api_service`` and stored as ``self.use_orcid_api``.
        use_redis_publishers: Create a ``PublishersRedis`` handle for
            publisher lookups.
    """
    if orcid_index is None and use_redis_orcid_index:
        index_source = OrcidIndexRedis(testing=testing)
    else:
        index_source = orcid_index
    super().__init__(index_source, publishers_filepath)

    self.citing = citing
    self._testing = testing
    self.use_orcid_api = use_orcid_api
    self.use_redis_publishers = use_redis_publishers
    self._publishers_redis: PublishersRedis | None = (
        PublishersRedis(testing=testing) if use_redis_publishers else None
    )

    # Persistent store (caller-supplied or Redis-backed) versus the
    # in-memory batch store; each gets its own DOI/ORCID manager below.
    self.storage_manager = (
        RedisStorageManager(testing=testing)
        if storage_manager is None
        else storage_manager
    )
    self.temporary_manager = BatchManager()

    self.doi_m = DOIManager(storage_manager=self.storage_manager, testing=testing)
    self.issn_m = ISSNManager()
    self.venue_id_man_dict: dict[str, object] = {"issn": self.issn_m}

    self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, testing=testing)
    self.venue_tmp_id_man_dict: dict[str, object] = {"issn": self.issn_m}

    self.orcid_m = ORCIDManager(
        storage_manager=self.storage_manager,
        testing=testing,
        use_api_service=use_orcid_api,
    )
    self.tmp_orcid_m = ORCIDManager(
        storage_manager=self.temporary_manager,
        testing=testing,
        use_api_service=use_orcid_api,
    )

    if not testing:
        self.BR_redis: FakeRedisWrapper | RedisDataSource = RedisDataSource("DB-META-BR")
        self.RA_redis: FakeRedisWrapper | RedisDataSource = RedisDataSource("DB-META-RA")
    else:
        self.BR_redis = FakeRedisWrapper()
        self.RA_redis = FakeRedisWrapper()

    # Per-batch caches, filled by update_redis_values() and
    # prefetch_doi_orcid_index().
    self._redis_values_br: list[str] = []
    self._redis_values_ra: list[str] = []
    self._doi_orcid_cache: dict[str, set[str]] = {}
119 def update_redis_values(self, br: list[str], ra: list[str] | None = None) -> None:
120 self._redis_values_br = [
121 x for x in (
122 self.doi_m.normalise(b, include_prefix=True) for b in (br or [])
123 ) if x
124 ]
125 self._redis_values_ra = [
126 x for x in (
127 self.orcid_m.normalise(r, include_prefix=True) for r in (ra or [])
128 ) if x
129 ]
def prefetch_doi_orcid_index(self, dois: list[str]) -> None:
    """Load the ORCID index entries for ``dois`` into the per-batch cache.

    Every DOI is normalised with its schema prefix; DOIs that normalise to
    a falsy value are skipped. The surviving keys are fetched from the
    ORCID index in a single ``get_values_batch`` call, and the result
    replaces ``self._doi_orcid_cache`` wholesale.
    """
    lookup_keys: list[str] = []
    for raw_doi in dois:
        normalised = self.doi_m.normalise(raw_doi, include_prefix=True)
        if normalised:
            lookup_keys.append(normalised)
    self._doi_orcid_cache = self.orcid_index.get_values_batch(lookup_keys)
def orcid_finder(self, doi: str) -> dict[str, str]:
    """Return the ``ORCID -> name`` mapping cached for ``doi``.

    The DOI is normalised with its schema prefix and looked up in
    ``self._doi_orcid_cache`` (filled by ``prefetch_doi_orcid_index``).
    Each cached entry is scanned for the first ORCID-shaped token; the
    text before it, minus the single character immediately preceding the
    ORCID, becomes the name (stripped and lower-cased). Entries are
    presumably shaped like ``"Family, Given [0000-0000-0000-0000]"`` —
    TODO confirm against the index writer.

    Returns:
        A dict keyed by bare ORCID. It is empty when the DOI cannot be
        normalised, has no cache entry, or no entry contains an ORCID.
    """
    norm_doi = self.doi_m.normalise(doi, include_prefix=True)
    if not norm_doi:
        return {}
    people = self._doi_orcid_cache.get(norm_doi)
    if not people:
        return {}
    found: dict[str, str] = {}
    for person in people:
        match = re.search(r'\d{4}-\d{4}-\d{4}-\d{3}[\dX]', person)
        if match is None:
            continue
        # Drop the one separator character in front of the ORCID. Clamp at
        # zero: when the ORCID opens the string, ``start - 1`` would be -1
        # and the slice would return the ORCID minus its last character
        # instead of an empty name.
        name_end = max(match.start() - 1, 0)
        found[match.group(0)] = person[:name_end].strip().lower()
    return found
def memory_to_storage(self) -> None:
    """Move the validation results held in memory into the persistent store.

    The ``(id, validity)`` tuples collected by the temporary manager are
    written to ``self.storage_manager`` in one ``set_multi_value`` call,
    after which the temporary storage is deleted. Nothing happens when the
    temporary manager has no results.
    """
    pending = self.temporary_manager.get_validity_list_of_tuples()
    if not pending:
        # NOTE(review): assumes delete_storage() belongs to the same
        # conditional as set_multi_value(), i.e. it is skipped when there
        # is nothing to flush — confirm against the original indentation.
        return
    self.storage_manager.set_multi_value(pending)
    self.temporary_manager.delete_storage()
160 def get_id_manager(
161 self, schema_or_id: str, id_man_dict: dict[str, object]
162 ) -> object | None:
163 if ":" in schema_or_id:
164 schema = schema_or_id.split(":")[0]
165 else:
166 schema = schema_or_id
167 return id_man_dict.get(schema)
def dict_to_cache(self, dict_to_be_saved: dict[str, list[str]], path: str) -> None:
    """Write ``dict_to_be_saved`` to ``path`` as indented UTF-8 JSON.

    Missing parent directories are created first. An existing file at
    ``path`` is overwritten. Non-ASCII characters are written verbatim
    (``ensure_ascii=False``) rather than as ``\\uXXXX`` escapes.

    Args:
        dict_to_be_saved: JSON-serialisable mapping to persist.
        path: Destination file path.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            opened for writing.
    """
    target = Path(path)
    # mkdir(exist_ok=True) already tolerates an existing directory, so a
    # separate existence check beforehand is redundant.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as fd:
        json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)
def get_redis_validity_list(self, id_list: list[str], redis_db: str) -> list[str]:
    """Return the members of ``id_list`` that the chosen Redis DB reports as present.

    Args:
        id_list: Identifiers to check; copied into a list before use.
        redis_db: ``"ra"`` to query ``self.RA_redis`` or ``"br"`` to query
            ``self.BR_redis``.

    Returns:
        The identifiers whose positional flag in the Redis answer is
        truthy, in input order.

    Raises:
        ValueError: If ``redis_db`` is neither ``"ra"`` nor ``"br"``.
    """
    ids = list(id_list)
    if redis_db == "ra":
        source = self.RA_redis
    elif redis_db == "br":
        source = self.BR_redis
    else:
        raise ValueError("redis_db must be either 'ra' or 'br'")
    # NOTE(review): the result is consumed as a sequence of flags parallel
    # to ``ids``, although the method name ``mexists_as_set`` suggests a
    # set of keys — confirm the actual return type.
    flags = source.mexists_as_set(ids)
    return [ids[position] for position, present in enumerate(flags) if present]
187 def validated_as(self, id_dict: dict[str, str]) -> bool | None:
188 schema = id_dict["schema"].strip().lower()
189 identifier = id_dict["identifier"]
191 if schema == "orcid":
192 validity_value = self.tmp_orcid_m.validated_as_id(identifier)
193 if validity_value is None:
194 validity_value = self.orcid_m.validated_as_id(identifier)
195 return validity_value
197 if schema == "doi":
198 validity_value = self.tmp_doi_m.validated_as_id(identifier)
199 if validity_value is None:
200 validity_value = self.doi_m.validated_as_id(identifier)
201 return validity_value
202 return None
def to_validated_id_list(self, norm_id_dict: dict[str, str]) -> list[str]:
    """Return ``[norm_id]`` when the identifier is accepted, otherwise ``[]``.

    ``norm_id_dict`` must contain ``"id"`` (the normalised identifier) and
    ``"schema"``.

    * ``doi``: accepted when it is among the values loaded by
      ``update_redis_values`` (and then recorded as valid in the temporary
      storage), or when the temporary DOI manager's ``is_valid`` accepts
      it.
    * ``orcid``: accepted when it is among the loaded values (and then
      recorded as valid likewise); otherwise ``is_valid`` is consulted
      only if ``self.use_orcid_api`` is true.
    * any other schema: never accepted.
    """
    norm_id = norm_id_dict["id"]
    schema = norm_id_dict["schema"]

    if schema == "doi":
        if norm_id in self._redis_values_br:
            self.tmp_doi_m.storage_manager.set_value(norm_id, True)
            return [norm_id]
        return [norm_id] if self.tmp_doi_m.is_valid(norm_id) else []

    if schema == "orcid":
        if norm_id in self._redis_values_ra:
            self.tmp_orcid_m.storage_manager.set_value(norm_id, True)
            return [norm_id]
        # With the ORCID API disabled, an ORCID not already known is
        # rejected without calling is_valid().
        if self.use_orcid_api and self.tmp_orcid_m.is_valid(norm_id):
            return [norm_id]

    return []
227 def get_publisher_by_prefix(self, prefix: str) -> tuple[str, str] | None:
228 """Look up publisher by DOI prefix. Returns (name, member_id) or None."""
229 if self.use_redis_publishers and self._publishers_redis:
230 pub_data = self._publishers_redis.get_by_prefix(prefix)
231 if pub_data:
232 member_id = self._publishers_redis._r.get(
233 f"{self._publishers_redis.DOI_PREFIX_KEY}{prefix}"
234 )
235 return str(pub_data['name']), str(member_id)
236 return None
237 if self.publishers_mapping:
238 for member, data in self.publishers_mapping.items():
239 if prefix in data['prefixes']:
240 return str(data['name']), member
241 return None
243 def _extract_volume(self, item: dict) -> str:
244 return item.get('volume', '')
246 def _extract_issue(self, item: dict) -> str:
247 return item.get('issue', '')
def csv_creator(self, item: dict) -> dict:
    """Build one output row from a source ``item``.

    The DOI is extracted and normalised first; when either step yields a
    falsy value the item is skipped and ``{}`` is returned. Otherwise the
    ``_extract_*`` hooks fill the row, agents are split into author and
    editor strings by ``get_agents_strings_list`` (joined with ``'; '``),
    and the finished row is passed through ``normalise_unicode``.

    Returns:
        The row dict with keys ``id``, ``title``, ``author``, ``issue``,
        ``volume``, ``venue``, ``pub_date``, ``page``, ``type``,
        ``publisher`` and ``editor``, or ``{}`` when the item has no
        usable DOI.
    """
    doi = self._extract_doi(item)
    norm_id = self.doi_m.normalise(doi, include_prefix=True) if doi else None
    if not norm_id:
        return {}

    agents = self._extract_agents(item)
    # The raw (not normalised) DOI is what get_agents_strings_list receives.
    author_strings, editor_strings = self.get_agents_strings_list(doi, agents)

    row = {
        'id': norm_id,
        'title': self._extract_title(item),
        'author': '; '.join(author_strings),
        'issue': self._extract_issue(item),
        'volume': self._extract_volume(item),
        'venue': self._extract_venue(item),
        'pub_date': self._extract_pub_date(item),
        'page': self._extract_pages(item),
        'type': self._extract_type(item),
        'publisher': self._extract_publisher(item),
        'editor': '; '.join(editor_strings),
    }
    return self.normalise_unicode(row)
275 @abstractmethod
276 def _extract_doi(self, item: dict) -> str:
277 """Extract DOI from item dict."""
278 ...
280 @abstractmethod
281 def _extract_title(self, item: dict) -> str:
282 """Extract title from item dict."""
283 ...
285 @abstractmethod
286 def _extract_agents(self, item: dict) -> list[dict]:
287 """Extract list of agents (authors/editors) from item dict."""
288 ...
290 @abstractmethod
291 def _extract_venue(self, item: dict) -> str:
292 """Extract venue name and IDs from item dict."""
293 ...
295 @abstractmethod
296 def _extract_pub_date(self, item: dict) -> str:
297 """Extract publication date from item dict."""
298 ...
300 @abstractmethod
301 def _extract_pages(self, item: dict) -> str:
302 """Extract page range from item dict."""
303 ...
305 @abstractmethod
306 def _extract_type(self, item: dict) -> str:
307 """Extract publication type from item dict."""
308 ...
310 @abstractmethod
311 def _extract_publisher(self, item: dict) -> str:
312 """Extract publisher name from item dict."""
313 ...
@abstractmethod
def extract_all_ids(self, entity_dict: dict, is_citing: bool) -> tuple[list[str], list[str]]:
    """Collect every identifier in ``entity_dict`` that needs validation.

    Abstract hook that concrete processors must override.

    Args:
        entity_dict: Source record to scan.
        is_citing: Whether the record is being handled as a citing entity.

    Returns:
        Two lists of identifier strings. Presumably the first holds
        bibliographic-resource IDs and the second responsible-agent IDs,
        mirroring ``update_redis_values(br, ra)`` — TODO confirm against
        the concrete implementations.
    """