Coverage for oc_ds_converter / pubmed / pubmed_processing.py: 72%

496 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import html 

7import json 

8import os 

9import pathlib 

10import re 

11import warnings 

12from os.path import exists 

13from typing import List, Tuple 

14 

15from bs4 import BeautifulSoup 

16 

17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

18from oc_ds_converter.lib.cleaner import Cleaner 

19from oc_ds_converter.oc_idmanager.doi import DOIManager 

20from oc_ds_converter.oc_idmanager.orcid import ORCIDManager 

21from oc_ds_converter.oc_idmanager.pmid import PMIDManager 

22from oc_ds_converter.pubmed.finder_nih import NIHResourceFinder 

23from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI 

24from oc_ds_converter.ra_processor import RaProcessor 

25 

# Silence bs4's UserWarnings module-wide: BeautifulSoup is fed plain-text
# titles/venue strings below, which can trigger its heuristic warnings.
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')

27 

28 

class PubmedProcessing(RaProcessor):
    def __init__(self, orcid_index: str | None = None, publishers_filepath_pubmed: str | None = None, journals_filepath: str | None = None, testing: bool = True, exclude_existing: bool = False):
        """Initialise the PubMed dump processor.

        :params orcid_index: path of the DOI->ORCID index (forwarded to RaProcessor)
        :params publishers_filepath_pubmed: optional path of the DOI-prefix -> publisher JSON map
        :params journals_filepath: optional path of the journal-title -> ISSN JSON cache
        :params testing: when True, use in-memory fake Redis wrappers instead of the Meta Redis DBs
        :params exclude_existing: flag stored for downstream processing
        """
        super().__init__(orcid_index)
        self.exclude_existing = exclude_existing
        self.nihrf = NIHResourceFinder()
        self.doi_m = DOIManager()
        self.pmid_m = PMIDManager()
        if testing:
            self.BR_redis = FakeRedisWrapper()
            self.RA_redis = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        # Default support files live next to this module.
        support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")

        if not journals_filepath:
            # exist_ok avoids the check-then-create race of the original code.
            os.makedirs(support_dir, exist_ok=True)
            self.journals_filepath = os.path.join(support_dir, "issn_jour_ext.json")
        else:
            self.journals_filepath = journals_filepath

        if not publishers_filepath_pubmed:
            os.makedirs(support_dir, exist_ok=True)
            self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")
        else:
            self.publishers_filepath = publishers_filepath_pubmed

        # Journal-title -> {"extended", "issn"} cache.
        # Fix: the original loaded this file twice in a row.
        self.jour_dict = self.issn_data_recover_poci(self.journals_filepath)

        if os.path.exists(self.publishers_filepath):
            with open(self.publishers_filepath, "r", encoding="utf8") as fdp:
                pfp = json.load(fdp)
            self.publisher_manager = ExtractPublisherDOI(pfp) if pfp else ExtractPublisherDOI({})
        else:
            # No map on disk yet: start empty and create the file.
            self.publisher_manager = ExtractPublisherDOI({})
            with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
                json.dump({}, fdp, ensure_ascii=False, indent=4)

75 

76 

77 def issn_data_recover_poci(self, path): 

78 journal_issn_dict = dict() 

79 if not path: 

80 return journal_issn_dict 

81 if not os.path.exists(path): 

82 return journal_issn_dict 

83 else: 

84 with open(path, "r", encoding="utf8") as fd: 

85 journal_issn_dict = json.load(fd) 

86 return journal_issn_dict 

87 

88 

89 def issn_data_to_cache_poci(self, jtitle_issn_dict, path): 

90 with open(path, "w", encoding="utf-8") as fd: 

91 json.dump(jtitle_issn_dict, fd, ensure_ascii=False, indent=4) 

92 

93 def prefix_to_publisher_to_cache(self, pref_pub_dict, path): 

94 with open(path, "w", encoding="utf-8") as fd: 

95 json.dump(pref_pub_dict, fd, ensure_ascii=False, indent=4) 

96 

97 def csv_creator(self, item: dict) -> dict: 

98 row = dict() 

99 doi = "" 

100 pmid = self.pmid_m.normalise(str(item['pmid'])) 

101 if pmid: 

102 # create empty row 

103 keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 

104 'publisher', 'editor'] 

105 for k in keys: 

106 row[k] = '' 

107 

108 attributes = item 

109 

110 # row['type'] 

111 row['type'] = 'journal article' 

112 

113 # row['id'] 

114 ids_list = list() 

115 ids_list.append(str('pmid:' + pmid)) 

116 if attributes.get('doi'): 

117 doi = DOIManager().normalise(attributes.get('doi'), include_prefix=False) 

118 if doi: 

119 doi_w_pref = "doi:"+doi 

120 if self.BR_redis.exists_as_set(doi_w_pref): 

121 ids_list.append(doi_w_pref) 

122 elif self.doi_m.is_valid(doi): 

123 ids_list.append(doi_w_pref) 

124 else: 

125 doi = '' 

126 

127 

128 row['id'] = ' '.join(ids_list) 

129 

130 # row['title'] 

131 pub_title = "" 

132 if attributes.get("title"): 

133 p_title = attributes.get("title") 

134 soup = BeautifulSoup(p_title, 'html.parser') 

135 title_soup = soup.get_text().replace('\n', '') 

136 title_soup_space_replaced = ' '.join(title_soup.split()) 

137 title_soup_strip = title_soup_space_replaced.strip() 

138 clean_tit = html.unescape(title_soup_strip) 

139 pub_title = clean_tit if clean_tit else p_title 

140 

141 row['title'] = pub_title 

142 

143 agents_list = self.add_authors_to_agent_list(attributes, []) 

144 authors_strings_list, editors_string_list = self.get_agents_strings_list(doi, agents_list) 

145 

146 # row['author'] 

147 if attributes.get('authors'): 

148 row['author'] = '; '.join(authors_strings_list) 

149 

150 # row['pub_date'] 

151 dates = attributes.get("year") 

152 row['pub_date'] = str(dates) if dates else "" 

153 

154 # row['venue'] 

155 row['venue'] = self.get_venue_name(attributes, pmid) 

156 

157 # row['volume'] 

158 row['volume'] = "" 

159 

160 # row['issue'] 

161 row['issue'] = "" 

162 

163 # row['page'] 

164 row['page'] = "" #self.get_pubmed_pages(attributes) 

165 

166 # row['publisher'] 

167 if doi: 

168 try: 

169 row['publisher'] = self.get_publisher_name(doi) 

170 except IndexError: 

171 print(doi, type(doi), row, item) 

172 raise(IndexError) 

173 else: 

174 row['publisher'] = "" 

175 

176 # row['editor'] 

177 row['editor'] = "" 

178 

179 try: 

180 return self.normalise_unicode(row) 

181 except TypeError: 

182 print(row) 

183 raise(TypeError) 

184 

185 def get_pubmed_pages(self, item: dict) -> str: 

186 ''' 

187 This function returns the pages interval. 

188 

189 :params item: the item's dictionary 

190 :type item: dict 

191 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string. 

192 ''' 

193 page_list = [] 

194 ''' NO INFO IN DUMP: to be updated with API DATA''' 

195 return self.get_pages(page_list) 

196 

197 def get_publisher_name(self, doi: str) -> str: 

198 ''' 

199 This function aims to return a publisher's name and id. If a mapping was provided, 

200 it is used to find the publisher's standardized name from its id or DOI prefix. 

201 

202 :params doi: the item's DOI 

203 :type doi: str 

204 

205 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 

206 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output 

207 is only the name. Finally, if there is no publisher, the output is an empty string. 

208 ''' 

209 

210 publisher_name = self.publisher_manager.extract_publishers_v(doi) 

211 if publisher_name and publisher_name != "unidentified": 

212 return publisher_name 

213 else: 

214 return "" 

215 

216 def save_updated_pref_publishers_map(self): 

217 upd_dict = self.publisher_manager.get_last_map_ver() 

218 self.prefix_to_publisher_to_cache(upd_dict, self.publishers_filepath) 

219 

220 

221 def get_venue_name(self, item: dict, id: str) -> str: 

222 ''' 

223 This method deals with generating the venue's name, followed by id in square brackets, separated by spaces. 

224 HTML tags are deleted and HTML entities escaped. In addition, any ISBN and ISSN are validated. 

225 Finally, the square brackets in the venue name are replaced by round brackets to avoid conflicts with the ids enclosures. 

226 

227 :params item: the item's dictionary 

228 :type item: dict 

229 :params row: a CSV row 

230 :type row: dict 

231 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'Nutrition & Food Science 

232 [issn:0034-6659]'. If the id does not exist, the output is only the name. Finally, if there is no venue, 

233 the output is an empty string. 

234 ''' 

235 

236 short_n = item.get('journal') if item.get('journal') else "" 

237 venids_list = [] 

238 cont_title = short_n 

239 if short_n: 

240 if short_n not in self.jour_dict.keys(): 

241 self.jour_dict[short_n] = {"extended": "", "issn": []} 

242 if not self.jour_dict[short_n].get("extended") or not self.jour_dict[short_n].get("issn"): 

243 if id: 

244 api_response = self.nihrf._call_api(id) 

245 if api_response: 

246 if not self.jour_dict[short_n].get("extended"): 

247 self.jour_dict[short_n]["extended"] = self.nihrf._get_extended_j_title(api_response) 

248 if not self.jour_dict[short_n].get("issn"): 

249 issn_dict_list_valid = [x for x in self.nihrf._get_issn(api_response) if x] 

250 self.jour_dict[short_n]["issn"] = issn_dict_list_valid 

251 self.issn_data_to_cache_poci(self.jour_dict, self.journals_filepath) 

252 

253 if short_n in self.jour_dict.keys(): 

254 jt_data = self.jour_dict.get(short_n) 

255 if jt_data.get("issn"): 

256 venids_list = [x for x in jt_data.get("issn") if x.startswith("issn:")] 

257 venids_list_integration = ["issn:"+x for x in jt_data.get("issn") if not x.startswith("issn:")] 

258 venids_list.extend(venids_list_integration) 

259 extended_jt = jt_data.get("extended") if jt_data.get("extended") else short_n 

260 cont_title = extended_jt 

261 

262 # use abbreviated journal title if no mapping was provided 

263 cont_title = cont_title.replace('\n', '') 

264 ven_soup = BeautifulSoup(cont_title, 'html.parser') 

265 ventit = html.unescape(ven_soup.get_text()) 

266 ambiguous_brackets = re.search('\[\s*((?:[^\s]+:[^\s]+)?(?:\s+[^\s]+:[^\s]+)*)\s*\]', ventit) 

267 if ambiguous_brackets: 

268 match = ambiguous_brackets.group(1) 

269 open_bracket = ventit.find(match) - 1 

270 close_bracket = ventit.find(match) + len(match) 

271 ventit = ventit[:open_bracket] + '(' + ventit[open_bracket + 1:] 

272 ventit = ventit[:close_bracket] + ')' + ventit[close_bracket + 1:] 

273 cont_title = ventit 

274 

275 # IDS 

276 if venids_list: 

277 name_and_id = cont_title + ' [' + ' '.join(venids_list) + ']' if cont_title else '[' + ' '.join(venids_list) + ']' 

278 else: 

279 name_and_id = cont_title 

280 

281 return name_and_id 

282 

283 def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list: 

284 ''' 

285 This function returns the the agents list updated with the authors dictionaries, in the correct format. 

286 

287 :params item: the item's dictionary (attributes), ag_list: the agent list 

288 :type item: dict, ag_list: list 

289 

290 :returns: listthe agents list updated with the authors dictionaries, in the correct format. 

291 ''' 

292 agent_list = ag_list 

293 if item.get("authors"): 

294 authors_string = str(item.get("authors")).strip() 

295 authors_split_list = [a.strip() for a in authors_string.split(",") if a] 

296 for author in authors_split_list: 

297 

298 agent = {} 

299 agent["role"] = "author" 

300 agent["name"] = author 

301 missing_names = [x for x in ["family", "given", "name"] if x not in agent] 

302 for mn in missing_names: 

303 agent[mn] = "" 

304 agent_list.append(agent) 

305 return agent_list 

306 

307 def find_homonyms(self, lst): 

308 homonyms_dict = dict() 

309 multi_space = re.compile(r"\s+") 

310 extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)" 

311 for d in lst: 

312 if d.get('name'): 

313 name = d.get('name') 

314 author = name.replace(".", " ") 

315 author = multi_space.sub(" ", author).strip() 

316 re_extended = re.findall(extend_pattern, author) 

317 extended = [(s.strip()).lower() for s in re_extended] 

318 d_hom_set = set() 

319 for i in extended: 

320 dicts_to_check = [dct for dct in lst if dct.get('name') and dct != d] 

321 homonyms = [dct.get('name') for dct in dicts_to_check if 

322 i in [(s.strip()).lower() for s in re.findall(extend_pattern, dct.get('name'))]] 

323 for n in homonyms: 

324 d_hom_set.add(n) 

325 if d_hom_set: 

326 homonyms_dict[d.get('name')] = list(d_hom_set) 

327 

328 return homonyms_dict 

329 

    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        """Build the 'author' and 'editor' CSV strings for *agents_list*.

        Each agent is rendered as 'FAMILY, GIVEN' (or the raw name when it
        cannot be split), optionally followed by ' [orcid:...]' when an ORCID
        is carried by the agent dict itself or recovered through
        ``self.orcid_finder(doi)``.

        :params doi: the item's DOI, used for the ORCID lookup
        :params agents_list: agent dicts with at least a 'role' and name data
        :returns: Tuple[list, list] -- (author strings, editor strings)
        """
        homonyms_dict = self.find_homonyms(agents_list)
        # Strings that already received an ORCID: guards against attaching the
        # same ORCID to a second identical (homonym) string.
        hom_w_orcid = set()
        authors_strings_list = list()
        editors_string_list = list()
        dict_orcid = None
        multi_space = re.compile(r"\s+")
        # Single capital letter (initial) / two-or-more-letter name part,
        # each followed by whitespace or end of string.
        inits_pattern = r"([A-Z]|[ÄŐŰÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽÑ]){1}(?:\s|$)"
        extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"

        # Look up the ORCID index only if at least one agent lacks an ORCID.
        if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list) and doi:
            dict_orcid = self.orcid_finder(doi)
        # Clean name-bearing fields; every other key/value is kept untouched.
        agents_list = [
            {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
             else v for k, v in
             agent_dict.items()} for agent_dict in agents_list]
        for agent in agents_list:
            cur_role = agent['role']
            f_name = None
            g_name = None
            name = None
            agent_string = None
            if agent.get('family') and agent.get('given'):
                f_name = agent['family']
                g_name = agent['given']
                agent_string = f_name + ', ' + g_name
            elif agent.get('name'):
                name = agent['name']
                # A comma in the raw name is taken as the family/given split.
                f_name = name.split(",")[0].strip() if "," in name else None
                g_name = name.split(",")[-1].strip() if "," in name else None

                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name

            if agent_string is None:
                # Partial data: build the best string we can from what exists.
                if agent.get('family') and not agent.get('given'):
                    if g_name:
                        agent_string = agent['family'] + ', ' + g_name
                    else:
                        agent_string = agent['family'] + ', '
                elif agent.get('given') and not agent.get('family'):
                    if f_name:
                        agent_string = f_name + ', ' + agent['given']
                    else:
                        agent_string = ', ' + agent['given']
                elif agent.get('name'):
                    agent_string = agent.get('name')

            # ORCID carried directly by the agent dict (either key casing).
            orcid = None
            if 'orcid' in agent:
                if isinstance(agent['orcid'], list):
                    orcid = str(agent['orcid'][0])
                else:
                    orcid = str(agent['orcid'])
            elif 'ORCID' in agent:
                if isinstance(agent['ORCID'], list):
                    orcid = str(agent['ORCID'][0])
                else:
                    orcid = str(agent['ORCID'])
            if orcid:
                # Validate the declared ORCID (offline: checksum only).
                orcid_manager = ORCIDManager(data=dict(), use_api_service=False)
                orcid = orcid_manager.normalise(orcid, include_prefix=False)
                orcid = orcid if orcid_manager.check_digit(orcid) else None

            elif dict_orcid and f_name:
                # Try to match the agent against the ORCID index entries
                # ("FAMILY, GIVEN" strings keyed by ORCID).
                for ori in dict_orcid:
                    orc_n: List[str] = dict_orcid[ori].split(', ')
                    orc_f = orc_n[0].lower()
                    orc_g = orc_n[1] if len(orc_n) == 2 else None
                    if f_name.lower() in orc_f.lower() or orc_f.lower() in f_name.lower():
                        if g_name and orc_g:
                            # If there are several authors with the same surname
                            if len([person for person in agents_list if 'family' in person if person['family'] if
                                    person['family'].lower() in orc_f.lower() or orc_f.lower() in person[
                                        'family'].lower()]) > 1:
                                # If there are several authors with the same surname and the same given names' initials
                                if len([person for person in agents_list if 'given' in person if person['given'] if
                                        person['given'][0].lower() == orc_g[0].lower()]) > 1:
                                    homonyms_list = [person for person in agents_list if 'given' in person if
                                                     person['given'] if person['given'].lower() == orc_g.lower()]
                                    # If there are homonyms
                                    if len(homonyms_list) > 1:
                                        # If such homonyms have different roles from the current role
                                        if [person for person in homonyms_list if person['role'] != cur_role]:
                                            if orc_g.lower() == g_name.lower():
                                                orcid = ori
                                        else:
                                            if orc_g.lower() == g_name.lower():
                                                orcid = ori
                                    elif orc_g[0].lower() == g_name[0].lower():
                                        orcid = ori
                                # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
                                elif any([person for person in agents_list if 'given' in person if person['given'] if
                                          person['given'].lower() == f_name.lower()]):
                                    if orc_g.lower() == g_name.lower():
                                        orcid = ori
                                else:
                                    orcid = ori
                        else:
                            orcid = ori

            # If the ra name can't be clearly divided in given name and surname
            elif dict_orcid and name:
                for ori in dict_orcid:
                    try_match = True
                    if name in homonyms_dict.keys():
                        # Among the known homonyms, only the name with the best
                        # affinity to this index entry may claim the ORCID.
                        get_best_affinity = self.compute_affinity(dict_orcid[ori], homonyms_dict.keys())
                        if name != get_best_affinity:
                            try_match = False
                    if try_match:
                        orc_n: List[str] = dict_orcid[ori].split(', ')
                        orc_f = orc_n[0].lower()
                        orc_g = orc_n[1] if len(orc_n) == 2 else None

                        # Tokenise the undivided name into initials and
                        # extended parts for the comparison below.
                        author = name.replace(".", " ")
                        author = multi_space.sub(" ", author).strip()
                        re_inits = re.findall(inits_pattern, author)
                        re_extended = re.findall(extend_pattern, author)
                        initials = [(x.strip()).lower() for x in re_inits]
                        extended = [(s.strip()).lower() for s in re_extended]
                        author_dict = {"initials": initials, "extended": extended}

                        surname_match = True if [x for x in author_dict["extended"] if x in orc_f.split()] else False
                        if orc_g:
                            name_match_all = True if [x for x in author_dict["extended"] if x in orc_g.split()] else False
                            name_match_init = True if [x for x in author_dict["initials"] if any(
                                element.startswith(x) and element not in author_dict["extended"] for
                                element in orc_g.split())] else False
                        else:
                            name_match_all = False
                            name_match_init = False
                        matches = (surname_match and (name_match_all or name_match_init))

                        if matches:
                            # managing cases in which a name string was already retrieved but the one
                            # provided by the mapping is better
                            f_name = orc_f
                            if not g_name:
                                g_name = orc_g
                            elif g_name:
                                if len(g_name.strip()) < len(orc_g.strip()):
                                    g_name = orc_g
                            orcid = ori

            if agent_string is None:
                # Rebuild the display string from any names recovered above.
                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name
                elif f_name and not g_name:
                    agent_string = f_name + ', '
                elif g_name and not f_name:
                    agent_string = ', ' + g_name
            elif agent_string == agent.get('name') and f_name and g_name:
                agent_string = f_name + ', ' + g_name

            if agent_string and orcid:
                agent_string = self.uppercase_initials(agent_string)
                # Attach the ORCID only the first time this exact string
                # appears, so homonym strings do not all claim it.
                if agent_string not in hom_w_orcid:
                    hom_w_orcid.add(agent_string)
                    agent_string += ' [' + 'orcid:' + str(orcid) + ']'

            if agent_string:
                agent_string = self.uppercase_initials(agent_string)

                if agent['role'] == 'author':
                    authors_strings_list.append(agent_string)
                elif agent['role'] == 'editor':
                    editors_string_list.append(agent_string)

        return authors_strings_list, editors_string_list

501 

502 

503 def compute_affinity(self, s, lst): 

504 s = s.replace(r"\s+", " ") 

505 s = s.replace(r"\n+", " ") 

506 name = s.lower() 

507 agent = name.replace(".", " ") 

508 agent = agent.replace(",", " ") 

509 agent = agent.strip() 

510 agent_name_parts = agent.split() 

511 extended = [x for x in agent_name_parts if len(x) > 1] 

512 initials = [x for x in agent_name_parts if len(x) == 1] 

513 

514 target_agent_dict = {"initials": initials, "extended": extended} 

515 

516 report_dicts = {} 

517 for ag in lst: 

518 name = ag.lower() 

519 name = name.replace(r"\s+", " ") 

520 name = name.replace(r"\n+", " ") 

521 agent = name.replace(".", " ") 

522 agent = agent.strip() 

523 agent_name_parts = agent.split() 

524 ag_extended = [x for x in agent_name_parts if len(x) > 1] 

525 ag_initials = [x for x in agent_name_parts if len(x) == 1] 

526 

527 copy_ext_target = [x for x in extended] 

528 copy_init_target = [x for x in initials] 

529 copy_ag_ext = [x for x in ag_extended] 

530 copy_ag_init = [x for x in ag_initials] 

531 

532 ext_matched = 0 

533 init_matched = 0 

534 

535 for i in ag_extended: 

536 if i in copy_ext_target: 

537 ext_matched += 1 

538 copy_ext_target.remove(i) 

539 copy_ag_ext.remove(i) 

540 

541 for ii in ag_initials: 

542 if ii in copy_init_target: 

543 init_matched += 1 

544 copy_init_target.remove(ii) 

545 copy_ag_init.remove(ii) 

546 

547 # check the remaining unpaired 

548 # check first if the extra initials in the ra name can be paired with the remaining extended names 

549 init_compatible = 0 

550 

551 if copy_ag_init and copy_ext_target: 

552 remaining_ag_initials = [x for x in copy_ag_init] 

553 remaining_tar_extended = [x for x in copy_ext_target] 

554 

555 for ri in remaining_ag_initials: 

556 if ri in copy_ag_init: 

557 for re in remaining_tar_extended: 

558 if re in copy_ext_target: 

559 if re.startswith(ri): 

560 copy_ag_init.remove(ri) 

561 copy_ext_target.remove(re) 

562 init_compatible += 1 

563 break 

564 

565 # check if the remaining initials of the target name are compatible with the remaining extended names of the ra 

566 ext_compatible = 0 

567 

568 if copy_ag_ext and copy_init_target: 

569 remaining_tar_initials = [x for x in copy_init_target] 

570 remaining_ag_extended = [x for x in copy_ag_ext] 

571 

572 for ri in remaining_tar_initials: 

573 if ri in copy_init_target: 

574 for re in remaining_ag_extended: 

575 if re in copy_ag_ext: 

576 if re.startswith(ri): 

577 copy_ag_ext.remove(re) 

578 copy_init_target.remove(ri) 

579 ext_compatible += 1 

580 break 

581 ext_not_compatible = len(copy_ag_ext) 

582 init_not_compatible = len(copy_ag_init) 

583 

584 cur_agent_dict = { 

585 "ext_matched": ext_matched, 

586 "init_matched": init_matched, 

587 "ext_compatible": ext_compatible, 

588 "init_compatible": init_compatible, 

589 "ext_not_compatible": ext_not_compatible, 

590 "init_not_compatible": init_not_compatible, 

591 } 

592 

593 report_dicts[ag] = cur_agent_dict 

594 best_match_name = self.get_best_match(target_agent_dict, report_dicts) 

595 return best_match_name 

596 

597 

598 

599 def add_editors_to_agent_list(self, item: dict, ag_list: list) -> list: 

600 ''' 

601 This function returns the the agents list updated with the editors dictionaries, in the correct format. 

602 

603 :params item: the item's dictionary (attributes), ag_list: the agent list 

604 :type item: dict, ag_list: list 

605 

606 :returns: listthe agents list updated with the authors dictionaries, in the correct format. 

607 ''' 

608 

609 agent_list = ag_list 

610 

611 ''' NO INFO IN DUMP: to be updated with API DATA''' 

612 return agent_list 

613 

614 def get_citing_pmid(self, meta_dict:dict) -> str: 

615 citing_pmid = "" 

616 id_string = meta_dict.get("id") 

617 if id_string: 

618 id_list = id_string.split() 

619 pmid_list = [x for x in id_list if x.startswith("pmid:")] 

620 if len(pmid_list) == 1: 

621 citing_pmid = pmid_list[0] # we expect only one pmid for each entity 

622 return citing_pmid 

623 

624 def get_citations(self, validated_pmid, item:dict) -> list: 

625 addressed_citations = set() 

626 

627 citing = validated_pmid 

628 if not citing.startswith("pmid:"): 

629 try: 

630 int_pmid = int(citing) 

631 citing = "pmid:" + str(int_pmid) 

632 except ValueError: 

633 return [] 

634 

635 references_string = item.get("references") 

636 cited_ids = references_string.split() 

637 

638 for cited_id in cited_ids: 

639 try: 

640 id_n = int(cited_id) 

641 

642 if id_n: 

643 norm_cited = self.pmid_m.normalise(str(id_n), include_prefix=True) 

644 

645 if norm_cited: 

646 addressed_citations.add((citing, norm_cited)) 

647 except ValueError: 

648 pass 

649 

650 addressed_citations_list = list(addressed_citations) 

651 

652 return addressed_citations_list 

653 

654 

    def get_best_match(self, target_agent_dict, report_dicts):
        """Pick from *report_dicts* (candidate name -> the statistics dict
        produced by ``compute_affinity``) the candidate best matching the
        target name described by *target_agent_dict* ({"initials": [...],
        "extended": [...]}); return "" when no candidate is acceptable.
        """
        # No candidate shares even one extended name part: give up.
        if max([v.get("ext_matched") for k, v in report_dicts.items()]) == 0:
            return ""
        elif max([v.get("ext_matched") for k, v in report_dicts.items()]) == 1:
            # With a single extended match, require some additional evidence
            # (matched/compatible parts) and at most one incompatible leftover.
            min_comp_dict = {k: v for k, v in report_dicts.items() if v.get("ext_matched") == 1 and (
                (v.get("init_matched") >= 1 or v.get("ext_compatible") >= 1 or v.get("init_compatible") >= 1)
                and
                (v.get("ext_not_compatible") <= 1 and v.get("init_not_compatible") <= 1)
            )}
            if not min_comp_dict:
                return ""

        len_target_init = len(target_agent_dict["initials"])
        len_target_ext = len(target_agent_dict["extended"])
        if len_target_init + len_target_ext >= 1:

            # Case 1: There is a perfect match with no exceedings: return it
            complete_matches = {k: v for k, v in report_dicts.items() if
                                v["ext_matched"] == len_target_ext and v["init_matched"] == len_target_init and v[
                                    "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
            if complete_matches:
                for k in complete_matches.keys():
                    return k
            # Case 2: There is a complete match with all the extended names and the initials of the target are compatible
            match_all_extended_comp_ext = {k: v for k, v in report_dicts.items() if v["ext_matched"] == len_target_ext and (
                v["init_matched"] + v["ext_compatible"] == len_target_init) and v["init_not_compatible"] == 0 and v[
                "ext_not_compatible"] == 0}
            if match_all_extended_comp_ext:
                if len(match_all_extended_comp_ext) == 1:
                    for k in match_all_extended_comp_ext.keys():
                        return k
                else:
                    # Tie-break on the highest number of matched initials.
                    return [k for k, v in match_all_extended_comp_ext.items() if
                            v["init_matched"] == max([v["init_matched"] for v in match_all_extended_comp_ext.values()])][0]

            # Case 3: Get max extended names match + compatible extended/initials
            max_comp_exc_ext = max([v["ext_matched"] for v in report_dicts.values()])
            match_max_extended_comp_init = {k: v for k, v in report_dicts.items() if
                                            v["ext_matched"] == max_comp_exc_ext and (
                                                v["ext_matched"] + v["init_compatible"] == len_target_ext) and (
                                                v["init_matched"] + v["ext_compatible"] == len_target_init) and v[
                                                "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
            if match_max_extended_comp_init:
                if len(match_max_extended_comp_init) == 1:
                    for k in match_max_extended_comp_init.keys():
                        return k
                else:
                    return [k for k, v in match_max_extended_comp_init.items() if
                            v["init_matched"] == max([v["init_matched"] for v in match_max_extended_comp_init.values()])][0]

            # Case 4 (suboptimal cases), get best compatibility
            scores_dict = dict()

            for k, v in report_dicts.items():
                score = 0

                # Extended coverage: full matches weigh 1, initials compatible
                # with an unmatched extended part weigh 0.2.
                p_match_ext = 0
                if len_target_ext:
                    p_match_ext = v["ext_matched"] / len_target_ext
                    if p_match_ext < 1:
                        if v["init_compatible"]:
                            p_match_ext = (v["init_compatible"] * 0.2 + v["ext_matched"]) / len_target_ext

                # Initials coverage: extended parts compatible with an
                # unmatched target initial weigh 0.7.
                p_match_init = 0
                if len_target_init:
                    p_match_init = v["init_matched"] / len_target_init
                    if p_match_init < 1:
                        if v["ext_compatible"]:
                            p_match_init = (v["ext_compatible"] * 0.7 + v["init_matched"]) / len_target_init

                # Penalties for candidate parts that could not be paired at all.
                total_len_name_parts_target = len_target_ext + len_target_init
                if v["ext_not_compatible"]:
                    p_inc_ext = v["ext_not_compatible"] * 0.7 / total_len_name_parts_target
                else:
                    p_inc_ext = 0
                if v["init_not_compatible"]:
                    p_inc_init = v["init_not_compatible"] * 0.2 / total_len_name_parts_target
                else:
                    p_inc_init = 0
                score = p_match_ext + p_match_init - p_inc_init - p_inc_ext
                scores_dict[k] = score
            # A unique top score wins; a tie is treated as no decision.
            result = [k for k, v in scores_dict.items() if v == max(scores_dict.values())]
            if len(result) == 1:
                return result[0]
            else:
                return ""
        return ""