Coverage for oc_ds_converter / oc_idmanager / pmid.py: 74%
297 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8import re
9from datetime import datetime
10from re import match, sub
11from time import sleep
12from urllib.parse import quote
14from bs4 import BeautifulSoup
15from oc_ds_converter.oc_idmanager import *
16from oc_ds_converter.oc_idmanager.base import IdentifierManager
17from requests import ReadTimeout, get
18from requests.exceptions import ConnectionError
20from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
21from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
class PMIDManager(IdentifierManager):
    """This class implements an identifier manager for pmid identifier"""

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """PMID manager constructor.

        :param use_api_service: when True, ``exists`` queries the PubMed web
            service to verify identifiers; when False only cached/syntactic
            checks are possible.
        :param storage_manager: cache backend for validation results; when
            None a :class:`RedisStorageManager` is created.
        :param testing: forwarded to the default RedisStorageManager so that
            tests run against a separate store.
        """
        super().__init__()
        self._api = "https://pubmed.ncbi.nlm.nih.gov/"
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self._p = "pmid:"
        self._im = ISSNManager()
        # Regexes for parsing the MEDLINE plain-text record returned by the
        # PubMed "?format=pubmed" page: each field tag (TI, FAU, DP, ...)
        # starts a line and continuation lines are indented by six spaces.
        self._doi_regex = r"(?<=^AID\s-\s).*\[doi\]\s*\n"
        self._pmid_regex = r"(?<=PMID-\s)[1-9]\d*"
        self._title_regex = r"(?<=^TI\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._author_regex = r"(?<=^FAU\s-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._date_regex = r"DP\s+-\s+(\d{4}(\s?(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))?(\s?((3[0-1])|([1-2][0-9])|([0]?[1-9])))?)"
        self._issn_regex = r"(?<=^IS\s{2}-\s)[0-9]{4}-[0-9]{3}[0-9X]"
        self._journal_regex = r"(?<=^JT\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._volume_regex = r"(?<=^VI\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._issue_regex = r"(?<=^IP\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._page_regex = r"(?<=^PG\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._type_regex = r"(?<=^PT\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._publisher_regex = r"(?<=^PB\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._editor_regex = r"((?<=^FED\s-\s)|(?<=^ED\s{2}-\s))(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
55 def validated_as_id(self, id_string):
56 arxiv_vaidation_value = self.storage_manager.get_value(id_string)
57 if isinstance(arxiv_vaidation_value, bool):
58 return arxiv_vaidation_value
59 else:
60 return None
62 def is_valid(self, pmid, get_extra_info=False):
63 pmid = self.normalise(pmid, include_prefix=True)
65 if not pmid:
66 return False
67 else:
68 pmid_vaidation_value = self.storage_manager.get_value(pmid)
69 if isinstance(pmid_vaidation_value, bool):
70 return pmid_vaidation_value
71 else:
72 if get_extra_info:
73 info = self.exists(pmid, get_extra_info=True)
74 self.storage_manager.set_full_value(pmid,info[1])
75 return (info[0] and self.syntax_ok(pmid)), info[1]
76 validity_check = self.syntax_ok(pmid) and self.exists(pmid)
77 self.storage_manager.set_value(pmid, validity_check)
79 return validity_check
83 def normalise(self, id_string, include_prefix=False):
84 id_string = str(id_string)
85 try:
86 pmid_string = sub(r"^0+", "", sub(r"\0+", "", (sub(r"[^\d+]", "", id_string))))
87 return "%s%s" % (self._p if include_prefix else "", pmid_string)
88 except:
89 # Any error in processing the PMID will return None
90 return None
92 def syntax_ok(self, id_string):
93 if not id_string.startswith(self._p):
94 id_string = self._p + id_string
95 return True if match(r"^pmid:[1-9]\d*$", id_string) else False
    def exists(self, pmid_full, get_extra_info=False, allow_extra_api=None):
        """Check against pubmed.ncbi.nlm.nih.gov whether ``pmid_full``
        identifies an existing publication.

        :param pmid_full: the PMID, with or without the ``pmid:`` prefix.
        :param get_extra_info: when True, return ``(bool, dict)`` where the
            dict holds the record metadata (or ``{"id": ..., "valid": ...}``).
        :param allow_extra_api: unused; kept for interface compatibility with
            the other identifier managers.
        """
        valid_bool = True
        pmid = pmid_full
        pmid_p = "pmid:"+pmid if not pmid.startswith("pmid:") else pmid
        if self._use_api_service:
            pmid = self.normalise(pmid_full)
            pmid_p = self.normalise(pmid_full, include_prefix=True)
            if pmid is not None:
                # Up to three attempts against the PubMed website.
                tentative = 3
                while tentative:
                    tentative -= 1
                    try:
                        r = get(
                            self._api + quote(pmid) + "/?format=pubmed",
                            headers=self._headers,
                            timeout=30,
                        )
                        if r.status_code == 200:
                            r.encoding = "utf-8"
                            # The "?format=pubmed" page embeds the MEDLINE
                            # plain-text record inside the element with
                            # id="article-details".
                            soup = BeautifulSoup(r.text, features="lxml")
                            txt_obj = str(soup.find(id="article-details"))
                            match_pmid = re.finditer(self._pmid_regex, txt_obj, re.MULTILINE)
                            # A non-empty "PMID- ..." field confirms existence.
                            for matchNum_pmid, match_p in enumerate(match_pmid, start=1):
                                m_pmid = match_p.group()
                                if m_pmid:
                                    if get_extra_info:
                                        result = self.extra_info(txt_obj)
                                        result["id"] = pmid_p
                                        return True, result
                                    return True
                        elif r.status_code == 404:
                            if get_extra_info:
                                return False, {"id":pmid_p, "valid": False}
                            return False
                        # NOTE(review): any other status code (e.g. 5xx) just
                        # retries until the attempts run out.
                    except ReadTimeout:
                        # Do nothing, just try again
                        pass
                    except ConnectionError:
                        # Sleep 5 seconds, then try again
                        sleep(5)
                # All attempts exhausted without a definitive 200/404 answer:
                # treat the identifier as not confirmed.
                valid_bool = False
            else:
                # Normalisation failed: not a PMID at all.
                if get_extra_info:
                    return False, {"id":pmid_p, "valid": False}
                return False
        # NOTE(review): with the API service disabled this falls through with
        # valid_bool still True, i.e. existence is assumed — confirm intended.
        if get_extra_info:
            return valid_bool, {"id":pmid_p, "valid": valid_bool}
        return valid_bool
147 def extra_info(self, api_response, choose_api=None, info_dict={}):
148 result = {}
149 result["valid"] = True
151 try:
152 title = ""
153 match_title = re.finditer(self._title_regex, api_response, re.MULTILINE)
154 for matchNum_tit, match_tit in enumerate(match_title, start=1):
155 m_title = match_tit.group()
156 if m_title:
157 ts = re.sub(r"\s+", " ", m_title)
158 t = re.sub(r"\n", " ", ts)
159 norm_title = t.strip()
160 if norm_title is not None:
161 title = norm_title
162 break
163 except:
164 title = ""
166 result["title"] = title
168 try:
169 authors = set()
170 fa_aut = re.finditer(self._author_regex, api_response, re.MULTILINE)
171 for matchNum_aut, match_au in enumerate(fa_aut, start=1):
172 m_aut = match_au.group()
173 if m_aut:
174 fau = re.sub(r"\s+", " ", m_aut)
175 nlfau = re.sub(r"\n", " ", fau)
176 norm_fau = nlfau.strip()
177 if norm_fau is not None:
178 authors.add(norm_fau)
179 authorsList = list(authors)
180 except:
181 authorsList = []
183 result["author"] = authorsList
185 try:
186 date = re.search(self._date_regex,
187 api_response,
188 re.IGNORECASE,
189 ).group(1)
190 re_search = re.search(
191 r"(\d{4})\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+((3[0-1])|([1-2][0-9])|([0]?[1-9]))",
192 date,
193 re.IGNORECASE,
194 )
195 if re_search is not None:
196 src = re_search.group(0)
197 datetime_object = datetime.strptime(src, "%Y %b %d")
198 pmid_date = datetime.strftime(datetime_object, "%Y-%m-%d")
199 else:
200 re_search = re.search(
201 r"(\d{4})\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)",
202 date,
203 re.IGNORECASE,
204 )
205 if re_search is not None:
206 src = re_search.group(0)
207 datetime_object = datetime.strptime(src, "%Y %b")
208 pmid_date = datetime.strftime(datetime_object, "%Y-%m")
209 else:
210 re_search = re.search(r"(\d{4})", date)
211 if re_search is not None:
212 src = re.search(r"(\d{4})", date).group(0)
213 datetime_object = datetime.strptime(src, "%Y")
214 pmid_date = datetime.strftime(datetime_object, "%Y")
215 else:
216 pmid_date = ""
217 except:
218 pmid_date = ""
219 result["pub_date"] = pmid_date
221 try:
222 issnset = set()
223 fa_issn = re.finditer(self._issn_regex, api_response, re.MULTILINE)
224 for matchNum_issn, match_issn in enumerate(fa_issn, start=1):
225 m_issn = match_issn.group()
226 if m_issn:
227 norm_issn = self._im.normalise(m_issn, include_prefix=True)
228 if norm_issn is not None:
229 issnset.add(norm_issn)
230 issnlist = list(issnset)
231 except:
232 issnlist = []
234 # CONTINUA DA QUI
236 try:
237 jur_title = ""
238 fa_jur_title = re.finditer(self._journal_regex, api_response, re.MULTILINE)
239 for matchNum_title, match_tit in enumerate(fa_jur_title, start=1):
240 m_title = match_tit.group()
241 if m_title:
242 s_jt = re.sub(r"\s+", " ", m_title)
243 n_jt = re.sub(r"\n", " ", s_jt)
244 norm_jour = n_jt.strip()
245 if norm_jour is not None:
246 jur_title = norm_jour
247 break
248 except:
249 jur_title = ""
251 result["venue"] = (
252 f'{jur_title} {[x for x in issnlist]}' if jur_title else str(issnlist).replace(",", "")).replace("'", "")
254 try:
255 volume = ""
256 fa_volume = re.finditer(self._volume_regex, api_response, re.MULTILINE)
257 for matchNum_volume, match_vol in enumerate(fa_volume, start=1):
258 m_vol = match_vol.group()
259 if m_vol:
260 vol = re.sub(r"\s+", " ", m_vol)
261 norm_volume = vol.strip()
262 if norm_volume is not None:
263 volume = norm_volume
264 break
265 except:
266 volume = ""
268 result["volume"] = volume
270 try:
271 issue = ""
272 fa_issue = re.finditer(self._issue_regex, api_response, re.MULTILINE)
273 for matchNum_issue, match_issue in enumerate(fa_issue, start=1):
274 m_issue = match_issue.group()
275 if m_issue:
276 s_issue = re.sub(r"\s+", " ", m_issue)
277 n_issue = re.sub(r"\n", " ", s_issue)
278 norm_issue = n_issue.strip()
279 if norm_issue is not None:
280 issue = norm_issue
281 break
282 except:
283 issue = ""
285 result["issue"] = issue
287 try:
288 pag = ""
289 fa_pag = re.finditer(self._page_regex, api_response, re.MULTILINE)
290 for matchNum_pag, match_pag in enumerate(fa_pag, start=1):
291 m_pag = match_pag.group()
292 if m_pag:
293 s_pg = re.sub(r"\s+", " ", m_pag)
294 n_pg = re.sub(r"\n", " ", s_pg)
295 norm_pag = n_pg.strip()
296 if norm_pag is not None:
297 pag = norm_pag
298 break
299 except:
300 pag = ""
302 result["page"] = pag
304 try:
305 pub_types = set()
306 types = re.finditer(self._type_regex, api_response, re.MULTILINE)
307 for matchNum_types, match_types in enumerate(types, start=1):
308 m_type = match_types.group()
309 if m_type:
310 s_ty = re.sub(r"\s+", " ", m_type)
311 b_ty = re.sub(r"\n", " ", s_ty)
312 norm_type = b_ty.strip().lower()
313 if norm_type is not None:
314 pub_types.add(norm_type)
315 typeslist = list(pub_types)
316 except:
317 typeslist = []
319 result["type"] = typeslist
321 try:
322 publisher = set()
323 publishers = re.finditer(self._publisher_regex, api_response, re.MULTILINE)
324 for matchNum_publishers, match_publishers in enumerate(publishers, start=1):
325 m_publishers = match_publishers.group()
326 if m_publishers:
327 s_pbs = re.sub(r"\s+", " ", m_publishers)
328 n_pbs = re.sub(r"\n", " ", s_pbs)
329 norm_pbs = n_pbs.strip()
330 if norm_pbs is not None:
331 publisher.add(norm_pbs)
332 publisherlist = list(publisher)
333 except:
334 publisherlist = []
336 result["publisher"] = publisherlist
338 try:
339 editor = set()
340 editors = re.finditer(self._editor_regex, api_response, re.MULTILINE)
341 for matchNum_editors, match_editors in enumerate(editors, start=1):
342 m_editors = match_editors.group()
343 if m_editors:
344 s_ed = re.sub(r"\s+", " ", m_editors)
345 n_ed = re.sub(r"\n", " ", s_ed)
346 norm_ed = n_ed.strip()
347 if norm_ed is not None:
348 editor.add(norm_ed)
349 editorlist = list(editor)
350 except:
351 editorlist = []
353 result["editor"] = editorlist
355 doi = ""
356 try:
357 map_doi = re.finditer(self._doi_regex, api_response, re.MULTILINE)
358 for matchNum_doi, match_doi in enumerate(map_doi, start=1):
359 m_doi = match_doi.group()
360 if m_doi:
361 id = re.sub(r"\s+", " ", m_doi)
362 n_id = re.sub(r"\n", " ", id)
363 n_id_strip = n_id.strip()
365 if n_id_strip.endswith('[doi]'):
366 n_id_strip = n_id_strip[:-5]
367 dm = DOIManager()
368 norm_id = dm.normalise(n_id_strip)
369 if norm_id is not None:
370 doi = norm_id
371 break
372 else:
373 doi = ""
374 except:
375 doi = ""
377 result["doi"] = doi
379 return result