Coverage for oc_ds_converter / oc_idmanager / pmid.py: 74%

297 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8import re 

9from datetime import datetime 

10from re import match, sub 

11from time import sleep 

12from urllib.parse import quote 

13 

14from bs4 import BeautifulSoup 

15from oc_ds_converter.oc_idmanager import * 

16from oc_ds_converter.oc_idmanager.base import IdentifierManager 

17from requests import ReadTimeout, get 

18from requests.exceptions import ConnectionError 

19 

20from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

21from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

22 

23 

24 

class PMIDManager(IdentifierManager):
    """This class implements an identifier manager for pmid identifier.

    It validates PMID syntax, checks existence against the PubMed website,
    and extracts bibliographic metadata from records in the "pubmed"
    (MEDLINE tagged) text format.
    """

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """PMID manager constructor.

        :param use_api_service: whether to query PubMed for identifiers
            that are not found in the local storage.
        :param storage_manager: cache of validation results; a
            RedisStorageManager is created when None is given.
        :param testing: forwarded to the default RedisStorageManager.
        """
        super(PMIDManager, self).__init__()
        self._api = "https://pubmed.ncbi.nlm.nih.gov/"
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self._p = "pmid:"
        self._im = ISSNManager()
        # Regexes over the MEDLINE tagged format returned by
        # "?format=pubmed": each field is a 2-4 letter tag, a dash, the
        # value; continuation lines are indented by six spaces.
        self._doi_regex = r"(?<=^AID\s-\s).*\[doi\]\s*\n"
        self._pmid_regex = r"(?<=PMID-\s)[1-9]\d*"
        self._title_regex = r"(?<=^TI\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._author_regex = r"(?<=^FAU\s-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._date_regex = r"DP\s+-\s+(\d{4}(\s?(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))?(\s?((3[0-1])|([1-2][0-9])|([0]?[1-9])))?)"
        self._issn_regex = r"(?<=^IS\s{2}-\s)[0-9]{4}-[0-9]{3}[0-9X]"
        self._journal_regex = r"(?<=^JT\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._volume_regex = r"(?<=^VI\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._issue_regex = r"(?<=^IP\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._page_regex = r"(?<=^PG\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._type_regex = r"(?<=^PT\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._publisher_regex = r"(?<=^PB\s{2}-\s)(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"
        self._editor_regex = r"((?<=^FED\s-\s)|(?<=^ED\s{2}-\s))(.+?)*(\n\s{6}(.+?)*)*(?=(?:\n[A-Z]{2,4}\s{,2}-\s*|$))"

    def validated_as_id(self, id_string):
        """Return the cached validity of ``id_string``, or None if unknown.

        :param id_string: the identifier to look up in the storage manager.
        :return: True/False when a boolean validity was cached, else None.
        """
        # Local name fixed: the original said "arxiv_vaidation_value",
        # a typo copy-pasted from another identifier manager.
        cached_value = self.storage_manager.get_value(id_string)
        if isinstance(cached_value, bool):
            return cached_value
        return None

    def is_valid(self, pmid, get_extra_info=False):
        """Validate a PMID, consulting the storage cache before the API.

        :param pmid: the identifier, with or without the "pmid:" prefix.
        :param get_extra_info: if True, also return the metadata dict.
        :return: bool, or (bool, dict) when ``get_extra_info`` is True and
            the value was not already cached.
        """
        pmid = self.normalise(pmid, include_prefix=True)
        if not pmid:
            return False
        cached_validity = self.storage_manager.get_value(pmid)
        if isinstance(cached_validity, bool):
            return cached_validity
        if get_extra_info:
            info = self.exists(pmid, get_extra_info=True)
            self.storage_manager.set_full_value(pmid, info[1])
            return (info[0] and self.syntax_ok(pmid)), info[1]
        validity_check = self.syntax_ok(pmid) and self.exists(pmid)
        self.storage_manager.set_value(pmid, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Normalise a PMID: keep digits only and strip leading zeros.

        :param id_string: the raw identifier (any type; converted to str).
        :param include_prefix: prepend "pmid:" to the result when True.
        :return: the normalised string, or None on any processing error.
        """
        id_string = str(id_string)
        try:
            # Bug fix: the original pattern [^\d+] treated '+' as a literal
            # class member, so plus signs survived normalisation and later
            # failed syntax_ok; [^\d] keeps digits only. The original also
            # ran a dead sub(r"\0+", ...) pass stripping NUL bytes that the
            # first substitution has already removed.
            pmid_string = sub(r"^0+", "", sub(r"[^\d]", "", id_string))
            return "%s%s" % (self._p if include_prefix else "", pmid_string)
        except Exception:
            # Any error in processing the PMID will return None
            return None

    def syntax_ok(self, id_string):
        """Return True if ``id_string`` is a syntactically valid PMID.

        The "pmid:" prefix is added when missing; a valid PMID is a
        positive integer without leading zeros.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return True if match(r"^pmid:[1-9]\d*$", id_string) else False

    def exists(self, pmid_full, get_extra_info=False, allow_extra_api=None):
        """Check whether a PMID exists by querying the PubMed website.

        :param pmid_full: the PMID, with or without the "pmid:" prefix.
        :param get_extra_info: if True, also return a metadata dict.
        :param allow_extra_api: unused; kept for interface compatibility.
        :return: bool, or (bool, dict) when ``get_extra_info`` is True.
        """
        valid_bool = True
        pmid = pmid_full
        pmid_p = "pmid:" + pmid if not pmid.startswith("pmid:") else pmid
        if self._use_api_service:
            pmid = self.normalise(pmid_full)
            pmid_p = self.normalise(pmid_full, include_prefix=True)
            if pmid is not None:
                tentative = 3  # up to three attempts on transient failures
                while tentative:
                    tentative -= 1
                    try:
                        r = get(
                            self._api + quote(pmid) + "/?format=pubmed",
                            headers=self._headers,
                            timeout=30,
                        )
                        if r.status_code == 200:
                            r.encoding = "utf-8"
                            soup = BeautifulSoup(r.text, features="lxml")
                            # The tagged record is embedded in the page
                            # element with id "article-details".
                            txt_obj = str(soup.find(id="article-details"))
                            for match_p in re.finditer(self._pmid_regex, txt_obj, re.MULTILINE):
                                if match_p.group():
                                    if get_extra_info:
                                        result = self.extra_info(txt_obj)
                                        result["id"] = pmid_p
                                        return True, result
                                    return True
                        elif r.status_code == 404:
                            if get_extra_info:
                                return False, {"id": pmid_p, "valid": False}
                            return False
                    except ReadTimeout:
                        # Do nothing, just try again
                        pass
                    except ConnectionError:
                        # Sleep 5 seconds, then try again
                        sleep(5)
                    # No decisive answer on this attempt: assume invalid
                    # unless a later attempt succeeds.
                    valid_bool = False
            else:
                if get_extra_info:
                    return False, {"id": pmid_p, "valid": False}
                return False
        if get_extra_info:
            return valid_bool, {"id": pmid_p, "valid": valid_bool}
        return valid_bool

    def _extract_first(self, regex, api_response):
        """Return the first ``regex`` match with whitespace collapsed, or ""."""
        try:
            for field_match in re.finditer(regex, api_response, re.MULTILINE):
                raw = field_match.group()
                if raw:
                    # Collapse runs of whitespace (including the 6-space
                    # continuation indents) into single spaces.
                    return re.sub(r"\s+", " ", raw).strip()
        except Exception:
            pass
        return ""

    def _extract_all(self, regex, api_response, lower=False):
        """Return the de-duplicated list of all ``regex`` matches.

        :param lower: lowercase each value (used for publication types).
        """
        values = set()
        try:
            for field_match in re.finditer(regex, api_response, re.MULTILINE):
                raw = field_match.group()
                if raw:
                    collapsed = re.sub(r"\s+", " ", raw).strip()
                    values.add(collapsed.lower() if lower else collapsed)
        except Exception:
            return []
        return list(values)

    def _extract_issns(self, api_response):
        """Return the list of normalised ISSNs found in the record."""
        issnset = set()
        try:
            for issn_match in re.finditer(self._issn_regex, api_response, re.MULTILINE):
                raw = issn_match.group()
                if raw:
                    norm_issn = self._im.normalise(raw, include_prefix=True)
                    if norm_issn is not None:
                        issnset.add(norm_issn)
        except Exception:
            return []
        return list(issnset)

    def _parse_pub_date(self, api_response):
        """Parse the DP field into "%Y-%m-%d", "%Y-%m" or "%Y", or ""."""
        try:
            date = re.search(self._date_regex,
                             api_response,
                             re.IGNORECASE,
                             ).group(1)
            # Try year-month-day first, then year-month, then bare year.
            re_search = re.search(
                r"(\d{4})\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+((3[0-1])|([1-2][0-9])|([0]?[1-9]))",
                date,
                re.IGNORECASE,
            )
            if re_search is not None:
                datetime_object = datetime.strptime(re_search.group(0), "%Y %b %d")
                return datetime.strftime(datetime_object, "%Y-%m-%d")
            re_search = re.search(
                r"(\d{4})\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)",
                date,
                re.IGNORECASE,
            )
            if re_search is not None:
                datetime_object = datetime.strptime(re_search.group(0), "%Y %b")
                return datetime.strftime(datetime_object, "%Y-%m")
            re_search = re.search(r"(\d{4})", date)
            if re_search is not None:
                datetime_object = datetime.strptime(re_search.group(0), "%Y")
                return datetime.strftime(datetime_object, "%Y")
            return ""
        except Exception:
            return ""

    def _extract_doi(self, api_response):
        """Return the normalised DOI from the AID "[doi]" field, or ""."""
        try:
            for doi_match in re.finditer(self._doi_regex, api_response, re.MULTILINE):
                raw = doi_match.group()
                if raw:
                    # Renamed from "id", which shadowed the builtin.
                    candidate = re.sub(r"\s+", " ", raw).strip()
                    if candidate.endswith('[doi]'):
                        candidate = candidate[:-5]
                    norm_id = DOIManager().normalise(candidate)
                    if norm_id is not None:
                        return norm_id
        except Exception:
            return ""
        return ""

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Extract bibliographic metadata from a MEDLINE tagged record.

        :param api_response: the record text in "pubmed" format.
        :param choose_api: unused; kept for interface compatibility.
        :param info_dict: unused; kept for interface compatibility (the
            original mutable default ``{}`` was replaced by ``None``).
        :return: dict with keys valid, title, author, pub_date, venue,
            volume, issue, page, type, publisher, editor, doi.
        """
        result = {}
        result["valid"] = True
        result["title"] = self._extract_first(self._title_regex, api_response)
        result["author"] = self._extract_all(self._author_regex, api_response)
        result["pub_date"] = self._parse_pub_date(api_response)

        issnlist = self._extract_issns(api_response)
        jur_title = self._extract_first(self._journal_regex, api_response)
        # Venue keeps the original formatting quirks: journal title followed
        # by the python repr of the ISSN list, with quotes (and, when there
        # is no title, commas) stripped.
        result["venue"] = (
            f'{jur_title} {[x for x in issnlist]}' if jur_title else str(issnlist).replace(",", "")).replace("'", "")

        result["volume"] = self._extract_first(self._volume_regex, api_response)
        result["issue"] = self._extract_first(self._issue_regex, api_response)
        result["page"] = self._extract_first(self._page_regex, api_response)
        result["type"] = self._extract_all(self._type_regex, api_response, lower=True)
        result["publisher"] = self._extract_all(self._publisher_regex, api_response)
        result["editor"] = self._extract_all(self._editor_regex, api_response)
        result["doi"] = self._extract_doi(api_response)
        return result