Coverage for oc_ds_converter / pubmed / pubmed_processing.py: 74%

496 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-12 21:23 +0000

1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import html 

7import json 

8import os 

9import pathlib 

10import re 

11import warnings 

12from os.path import exists 

13from typing import List, Tuple 

14 

15from bs4 import BeautifulSoup 

16 

17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

18from oc_ds_converter.lib.cleaner import Cleaner 

19from oc_ds_converter.oc_idmanager.doi import DOIManager 

20from oc_ds_converter.oc_idmanager.orcid import ORCIDManager 

21from oc_ds_converter.oc_idmanager.pmid import PMIDManager 

22from oc_ds_converter.pubmed.finder_nih import NIHResourceFinder 

23from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI 

24from oc_ds_converter.ra_processor import RaProcessor, families_match 

25 

26warnings.filterwarnings("ignore", category=UserWarning, module='bs4') 

27 

28 

29class PubmedProcessing(RaProcessor): 

def __init__(self, orcid_index: str | None = None, publishers_filepath_pubmed: str | None = None, journals_filepath: str | None = None, testing: bool = True, exclude_existing: bool = False):
    '''
    Set up the identifier managers, the Redis data sources and the two JSON caches
    (journal titles/ISSNs and DOI-prefix publishers) used while processing PubMed records.

    :params orcid_index: forwarded unchanged to the parent class constructor
    :type orcid_index: str | None
    :params publishers_filepath_pubmed: path of the prefix-to-publisher JSON file; when empty,
        'support_files/prefix_publishers.json' next to this module is used
    :type publishers_filepath_pubmed: str | None
    :params journals_filepath: path of the journal JSON cache; when empty,
        'support_files/issn_jour_ext.json' next to this module is used
    :type journals_filepath: str | None
    :params testing: if True, FakeRedisWrapper instances are used instead of the
        "DB-META-BR" / "DB-META-RA" Redis data sources
    :type testing: bool
    :params exclude_existing: stored as-is on the instance
    :type exclude_existing: bool
    '''
    super().__init__(orcid_index)
    self.exclude_existing = exclude_existing
    self.nihrf = NIHResourceFinder()
    self.doi_m = DOIManager()
    self.pmid_m = PMIDManager()
    if testing:
        self.BR_redis = FakeRedisWrapper()
        self.RA_redis = FakeRedisWrapper()
    else:
        self.BR_redis = RedisDataSource("DB-META-BR")
        self.RA_redis = RedisDataSource("DB-META-RA")

    # Default location of both caches; it is created only when a default path is actually needed.
    support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")

    if not journals_filepath:
        # exist_ok avoids the check-then-create race of a separate exists() test
        os.makedirs(support_dir, exist_ok=True)
        self.journals_filepath = os.path.join(support_dir, "issn_jour_ext.json")
    else:
        self.journals_filepath = journals_filepath

    if not publishers_filepath_pubmed:
        os.makedirs(support_dir, exist_ok=True)
        self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")
    else:
        self.publishers_filepath = publishers_filepath_pubmed

    # The journal cache is read exactly once (it used to be parsed twice in a row).
    self.jour_dict = self.issn_data_recover_poci(self.journals_filepath)

    if os.path.exists(self.publishers_filepath):
        with open(self.publishers_filepath, "r", encoding="utf8") as fdp:
            pfp = json.load(fdp)
        # An empty/falsy JSON payload is treated like an empty mapping
        self.publisher_manager = ExtractPublisherDOI(pfp if pfp else {})
    else:
        self.publisher_manager = ExtractPublisherDOI({})
        # Create the file so that later runs find an (empty) mapping to extend
        with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
            json.dump({}, fdp, ensure_ascii=False, indent=4)

75 

76 

def issn_data_recover_poci(self, path):
    '''
    Load the journal cache stored as JSON at the given path.

    :params path: location of the JSON file
    :returns: the parsed JSON content, or an empty dictionary when the path is
        empty or does not point to an existing file
    '''
    if not path or not os.path.exists(path):
        return dict()
    with open(path, "r", encoding="utf8") as cache_file:
        return json.load(cache_file)

87 

88 

def issn_data_to_cache_poci(self, jtitle_issn_dict, path):
    '''
    Serialise the journal dictionary as indented, non-ASCII-escaped JSON,
    overwriting the file at the given path.

    :params jtitle_issn_dict: the dictionary to store
    :params path: destination file
    '''
    with open(path, "w", encoding="utf-8") as cache_file:
        json.dump(jtitle_issn_dict, cache_file, indent=4, ensure_ascii=False)

92 

def prefix_to_publisher_to_cache(self, pref_pub_dict, path):
    '''
    Serialise the prefix-to-publisher dictionary as indented, non-ASCII-escaped JSON,
    overwriting the file at the given path.

    :params pref_pub_dict: the dictionary to store
    :params path: destination file
    '''
    with open(path, "w", encoding="utf-8") as cache_file:
        json.dump(pref_pub_dict, cache_file, indent=4, ensure_ascii=False)

96 

def csv_creator(self, item: dict) -> dict:
    '''
    Build the metadata row of a PubMed record.

    The row is filled only when the record's PMID can be normalised; otherwise it is left empty.
    A DOI is added to the ids only if it is already known to the BR Redis source or passes
    validation. Volume, issue, page and editor are always empty strings, and the type is
    always 'journal article'.

    :params item: the record; 'pmid' is required, while 'doi', 'title', 'authors', 'year'
        and 'journal' are read when present
    :type item: dict
    :returns: dict -- the row, passed through self.normalise_unicode
    :raises IndexError: re-raised unchanged if the publisher lookup fails
    :raises TypeError: re-raised unchanged if the unicode normalisation fails
    '''
    row = dict()
    doi = ""
    pmid = self.pmid_m.normalise(str(item['pmid']))
    if pmid:
        # Start from a row with every expected column set to the empty string.
        # 'volume', 'issue', 'page' and 'editor' stay empty: the dump carries no such data.
        row = {column: '' for column in ('id', 'title', 'author', 'pub_date', 'venue', 'volume',
                                         'issue', 'page', 'type', 'publisher', 'editor')}
        row['type'] = 'journal article'

        # row['id']
        ids_list = ['pmid:' + pmid]
        if item.get('doi'):
            # Reuse the shared manager instead of instantiating a new DOIManager per record
            doi = self.doi_m.normalise(item.get('doi'), include_prefix=False) or ""
            if doi:
                doi_w_pref = "doi:" + doi
                # The Redis lookup comes first so that known DOIs skip the validation
                if self.BR_redis.exists_as_set(doi_w_pref) or self.doi_m.is_valid(doi):
                    ids_list.append(doi_w_pref)
                else:
                    doi = ''
        row['id'] = ' '.join(ids_list)

        # row['title']: strip markup, collapse whitespace, unescape HTML entities
        pub_title = ""
        if item.get("title"):
            p_title = item.get("title")
            title_text = BeautifulSoup(p_title, 'html.parser').get_text().replace('\n', '')
            clean_tit = html.unescape(' '.join(title_text.split()))
            # Fall back on the raw title if cleaning leaves nothing
            pub_title = clean_tit if clean_tit else p_title
        row['title'] = pub_title

        # row['author']
        agents_list = self.add_authors_to_agent_list(item, [])
        authors_strings_list, _ = self.get_agents_strings_list(doi, agents_list)
        if item.get('authors'):
            row['author'] = '; '.join(authors_strings_list)

        # row['pub_date']
        dates = item.get("year")
        row['pub_date'] = str(dates) if dates else ""

        # row['venue']
        row['venue'] = self.get_venue_name(item, pmid)

        # row['publisher']
        if doi:
            try:
                row['publisher'] = self.get_publisher_name(doi)
            except IndexError:
                print(doi, type(doi), row, item)
                # Bare raise keeps the original message and traceback
                raise

    try:
        return self.normalise_unicode(row)
    except TypeError:
        print(row)
        raise

184 

def get_pubmed_pages(self, item: dict) -> str:
    '''
    Return the page interval of the item as produced by self.get_pages.

    The dump carries no page information, so an empty page list is always handed
    to self.get_pages and the item itself is not inspected.

    :params item: the item's dictionary (currently unused)
    :type item: dict
    :returns: str -- whatever self.get_pages returns for an empty page list
    '''
    # NO INFO IN DUMP: to be updated with API DATA
    no_pages = list()
    return self.get_pages(no_pages)

196 

def get_publisher_name(self, doi: str) -> str:
    '''
    Look up the publisher associated with a DOI through the publisher manager.

    :params doi: the item's DOI
    :type doi: str

    :returns: str -- the value given by the publisher manager for the DOI, or an empty
        string when that value is empty or equal to "unidentified".
    '''
    found = self.publisher_manager.extract_publishers_v(doi)
    if not found or found == "unidentified":
        return ""
    return found

215 

def save_updated_pref_publishers_map(self):
    '''
    Write the latest prefix-to-publisher mapping held by the publisher manager
    to the publishers JSON file.
    '''
    self.prefix_to_publisher_to_cache(self.publisher_manager.get_last_map_ver(), self.publishers_filepath)

219 

220 

def get_venue_name(self, item: dict, id: str) -> str:
    '''
    This method deals with generating the venue's name, followed by its ids in square brackets, separated by spaces.
    The abbreviated title found in the item is expanded through the journal cache; when the cache
    entry lacks the extended title or the ISSNs and an id is given, the NIH resource finder is
    queried and the cache file is rewritten.
    HTML tags are deleted and HTML entities unescaped. Finally, square brackets enclosing id-like
    content (or nothing) in the venue name are replaced by round brackets to avoid conflicts with
    the ids enclosures.

    :params item: the item's dictionary
    :type item: dict
    :params id: the identifier passed to the NIH resource finder (the caller in this class passes the normalised PMID)
    :type id: str
    :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'Nutrition & Food Science
        [issn:0034-6659]'. If there are no ids, the output is only the name. Finally, if there is no venue,
        the output is an empty string.
    '''
    short_n = item.get('journal') if item.get('journal') else ""
    venids_list = []
    cont_title = short_n
    if short_n:
        if short_n not in self.jour_dict:
            self.jour_dict[short_n] = {"extended": "", "issn": []}
        jt_data = self.jour_dict[short_n]

        if (not jt_data.get("extended") or not jt_data.get("issn")) and id:
            api_response = self.nihrf._call_api(id)
            if api_response:
                if not jt_data.get("extended"):
                    jt_data["extended"] = self.nihrf._get_extended_j_title(api_response)
                if not jt_data.get("issn"):
                    jt_data["issn"] = [x for x in self.nihrf._get_issn(api_response) if x]
                # Persist the cache only when the API actually answered
                self.issn_data_to_cache_poci(self.jour_dict, self.journals_filepath)

        known_issn = jt_data.get("issn")
        if known_issn:
            # Already-prefixed ids first, then the bare ones with the prefix added
            venids_list = [x for x in known_issn if x.startswith("issn:")]
            venids_list.extend("issn:" + x for x in known_issn if not x.startswith("issn:"))
        # use abbreviated journal title if no extended title is known
        cont_title = jt_data.get("extended") if jt_data.get("extended") else short_n

    cont_title = cont_title.replace('\n', '')
    ven_soup = BeautifulSoup(cont_title, 'html.parser')
    ventit = html.unescape(ven_soup.get_text())
    # Swap the brackets located by the match itself: searching for the captured text with
    # str.find() breaks on '[]', on padded content ('[ issn:1 ]') and on repeated text.
    cont_title = re.sub(
        r'\[\s*((?:[^\s]+:[^\s]+)?(?:\s+[^\s]+:[^\s]+)*)\s*\]',
        lambda found: '(' + found.group(0)[1:-1] + ')',
        ventit,
    )

    # IDS
    if venids_list:
        ids_string = '[' + ' '.join(venids_list) + ']'
        name_and_id = cont_title + ' ' + ids_string if cont_title else ids_string
    else:
        name_and_id = cont_title

    return name_and_id

282 

def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
    '''
    This function returns the agents list updated with the authors dictionaries, in the correct format.
    The 'authors' value of the item is read as a comma-separated string; each non-blank fragment
    becomes an agent with role 'author', the fragment as 'name' and empty 'family' and 'given'.

    :params item: the item's dictionary (attributes)
    :type item: dict
    :params ag_list: the agent list; it is extended in place
    :type ag_list: list

    :returns: list -- the same list object received as ag_list, with the authors appended.
    '''
    agent_list = ag_list
    if item.get("authors"):
        authors_string = str(item.get("authors")).strip()
        for fragment in authors_string.split(","):
            author = fragment.strip()
            # Filter after stripping: whitespace-only fragments ("A, , B") must not
            # produce agents with an empty name
            if not author:
                continue
            agent_list.append({"role": "author", "name": author, "family": "", "given": ""})
    return agent_list

306 

def find_homonyms(self, lst):
    '''
    Map each agent name to the names of the other agents sharing at least one
    extended (two or more characters) name part with it.

    The name parts of the agent under examination are taken after replacing dots with
    spaces and collapsing whitespace, while the names of the other agents are scanned as they are.
    Agents without a name are ignored, and so are other agents whose dictionary is equal
    to the one under examination.

    :params lst: the agents' dictionaries
    :returns: dict -- name -> list of homonymous names (in no particular order); names without homonyms are absent
    '''
    blanks = re.compile(r"\s+")
    word = re.compile(r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)")

    def words_of(text):
        # Lower-cased extended name parts, without the trailing separator
        return [chunk.strip().lower() for chunk in word.findall(text)]

    found = dict()
    for current in lst:
        current_name = current.get('name')
        if not current_name:
            continue
        flattened = blanks.sub(" ", current_name.replace(".", " ")).strip()
        own_words = words_of(flattened)
        if not own_words:
            continue
        others = [other for other in lst if other.get('name') and other != current]
        sharing = set()
        for other in others:
            other_words = words_of(other.get('name'))
            if any(part in other_words for part in own_words):
                sharing.add(other.get('name'))
        if sharing:
            found[current_name] = list(sharing)

    return found

329 

def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
    '''
    Build the display strings of the agents of a publication, attaching an ORCID where one
    is declared in the agent's dictionary or can be matched through self.orcid_finder.

    :params doi: the DOI of the publication; when it is empty no ORCID lookup is attempted
    :type doi: str
    :params agents_list: the agents' dictionaries; 'role' is required, while 'family', 'given',
        'name', 'orcid' and 'ORCID' are read when present
    :type agents_list: List[dict]
    :returns: Tuple[list, list] -- the strings of the agents whose role is 'author' and of those
        whose role is 'editor'. An agent with an ORCID is rendered as 'NAME [orcid:ID]'.
    '''
    # NOTE(review): homonyms are computed on the names as received, i.e. before the
    # Cleaner pass below, while the lookup further down uses the cleaned name -- confirm
    homonyms_dict = self.find_homonyms(agents_list)
    # Agent strings that already received an ORCID: the same string is never tagged twice
    hom_w_orcid = set()
    authors_strings_list = list()
    editors_string_list = list()
    dict_orcid = None
    multi_space = re.compile(r"\s+")
    # A single upper-case letter followed by whitespace or end of string
    inits_pattern = r"([A-Z]|[ÄŐŰÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽÑ]){1}(?:\s|$)"
    # Two or more letters (apostrophe and hyphen included) followed by whitespace or end of string
    extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"

    # The lookup is needed only if at least one agent lacks an ORCID key and a DOI is available.
    # The result is iterated as a mapping whose values are split on ', ' into family and given name.
    if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list) and doi:
        dict_orcid = self.orcid_finder(doi)
    # Clean the name fields of every agent; other keys are copied untouched
    agents_list = [
        {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
        else v for k, v in
         agent_dict.items()} for agent_dict in agents_list]
    for agent in agents_list:
        cur_role = agent['role']
        f_name = None
        g_name = None
        name = None
        agent_string = None
        if agent.get('family') and agent.get('given'):
            f_name = agent['family']
            g_name = agent['given']
            agent_string = f_name + ', ' + g_name
        elif agent.get('name'):
            name = agent['name']
            # A comma in the name is read as 'family, given'
            f_name = name.split(",")[0].strip() if "," in name else None
            g_name = name.split(",")[-1].strip() if "," in name else None

            if f_name and g_name:
                agent_string = f_name + ', ' + g_name


        # Fallbacks for agents with only one of the two name parts, or with an undivided name
        if agent_string is None:
            if agent.get('family') and not agent.get('given'):
                if g_name:
                    agent_string = agent['family'] + ', ' + g_name
                else:
                    agent_string = agent['family'] + ', '
            elif agent.get('given') and not agent.get('family'):
                if f_name:
                    agent_string = f_name + ', ' + agent['given']
                else:
                    agent_string = ', ' + agent['given']
            elif agent.get('name'):
                agent_string = agent.get('name')

        # An ORCID declared on the agent takes precedence; of a list only the first item is used
        orcid = None
        if 'orcid' in agent:
            if isinstance(agent['orcid'], list):
                orcid = str(agent['orcid'][0])
            else:
                orcid = str(agent['orcid'])
        elif 'ORCID' in agent:
            if isinstance(agent['ORCID'], list):
                orcid = str(agent['ORCID'][0])
            else:
                orcid = str(agent['ORCID'])
        if orcid:
            # The declared ORCID is kept only if its check digit is valid
            orcid_manager = ORCIDManager(use_api_service=False)
            orcid = orcid_manager.normalise(orcid, include_prefix=False)
            orcid = orcid if orcid_manager.check_digit(orcid) else None

        elif dict_orcid and f_name:
            for ori in dict_orcid:
                orc_n: List[str] = dict_orcid[ori].split(', ')
                orc_f = orc_n[0].lower()
                orc_g = orc_n[1] if len(orc_n) == 2 else None
                if families_match(f_name, orc_f):
                    if g_name and orc_g:
                        # If there are several authors with the same surname
                        if len([person for person in agents_list if 'family' in person if person['family'] if
                                families_match(person['family'], orc_f)]) > 1:
                            # If there are several authors with the same surname and the same given names' initials
                            if len([person for person in agents_list if 'given' in person if person['given'] if
                                    person['given'][0].lower() == orc_g[0].lower()]) > 1:
                                homonyms_list = [person for person in agents_list if 'given' in person if
                                                 person['given'] if person['given'].lower() == orc_g.lower()]
                                # If there are homonyms
                                if len(homonyms_list) > 1:
                                    # If such homonyms have different roles from the current role
                                    if [person for person in homonyms_list if person['role'] != cur_role]:
                                        if orc_g.lower() == g_name.lower():
                                            orcid = ori
                                else:
                                    if orc_g.lower() == g_name.lower():
                                        orcid = ori
                            elif orc_g[0].lower() == g_name[0].lower():
                                orcid = ori
                        # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
                        elif any([person for person in agents_list if 'given' in person if person['given'] if
                                  person['given'].lower() == f_name.lower()]):
                            if orc_g.lower() == g_name.lower():
                                orcid = ori
                        else:
                            orcid = ori
                    else:
                        orcid = ori

        # If the ra name can't be clearly divided in given name and surname
        elif dict_orcid and name:
            for ori in dict_orcid:
                try_match = True
                # Among homonyms, only the name with the best affinity to the ORCID name is matched
                if name in homonyms_dict.keys():
                    get_best_affinity = self.compute_affinity(dict_orcid[ori], homonyms_dict.keys())
                    if name != get_best_affinity:
                        try_match = False
                if try_match:
                    orc_n: List[str] = dict_orcid[ori].split(', ')
                    orc_f = orc_n[0].lower()
                    orc_g = orc_n[1] if len(orc_n) == 2 else None

                    # Split the undivided name into initials and extended parts
                    author = name.replace(".", " ")
                    author = multi_space.sub(" ", author).strip()
                    re_inits = re.findall(inits_pattern, author)
                    re_extended = re.findall(extend_pattern, author)
                    initials = [(x.strip()).lower() for x in re_inits]
                    extended = [(s.strip()).lower() for s in re_extended]
                    author_dict = {"initials": initials, "extended": extended}

                    surname_match = True if [x for x in author_dict["extended"] if x in orc_f.split()] else False
                    if orc_g:
                        # NOTE(review): orc_g is not lower-cased here while the name parts are -- confirm
                        name_match_all = True if [x for x in author_dict["extended"] if x in orc_g.split()] else False
                        name_match_init = True if [x for x in author_dict["initials"] if any(
                            element.startswith(x) and element not in author_dict["extended"] for
                            element in orc_g.split())] else False
                    else:
                        name_match_all = False
                        name_match_init = False
                    # A match needs the surname plus the given name, either in full or as an initial
                    matches = (surname_match and (name_match_all or name_match_init))

                    if matches:
                        # managing cases in which a name string was already retrieved but the one
                        # provided by the mapping is better
                        f_name = orc_f
                        if not g_name:
                            g_name = orc_g
                        elif g_name:
                            if len(g_name.strip()) < len(orc_g.strip()):
                                g_name = orc_g
                        orcid = ori

        # Rebuild the string from the name parts possibly refined by the ORCID matching
        if agent_string is None:
            if f_name and g_name:
                agent_string = f_name + ', ' + g_name
            elif f_name and not g_name:
                agent_string = f_name + ', '
            elif g_name and not f_name:
                agent_string = ', ' + g_name
        elif agent_string == agent.get('name') and f_name and g_name:
            agent_string = f_name + ', ' + g_name


        if agent_string and orcid:
            agent_string = self.uppercase_initials(agent_string)
            # The ORCID is attached only the first time a given agent string is met
            if agent_string not in hom_w_orcid:
                hom_w_orcid.add(agent_string)
                agent_string += ' [' + 'orcid:' + str(orcid) + ']'

        if agent_string:
            agent_string = self.uppercase_initials(agent_string)

            # Agents with any other role are dropped
            if agent['role'] == 'author':
                authors_strings_list.append(agent_string)
            elif agent['role'] == 'editor':
                editors_string_list.append(agent_string)

    return authors_strings_list, editors_string_list

500 

501 

def compute_affinity(self, s, lst):
    '''
    Compare a target name with a collection of candidate names and return the candidate
    selected by self.get_best_match.

    Every name is lower-cased and split into extended parts (more than one character) and
    initials (one character). For each candidate a report counts the parts identical to the
    target's ones, the initials compatible with an unpaired extended part of the other name,
    and the candidate's parts left unpaired.

    :params s: the target name; dots and commas are treated as separators
    :type s: str
    :params lst: the candidate names; only dots are treated as separators
    :returns: the value returned by self.get_best_match for the target and the reports
    '''

    def split_parts(text, commas_too):
        # str.split() already collapses any run of whitespace, newlines included, so no
        # further normalisation is needed (the former str.replace(r"\s+", " ") calls
        # looked for that literal text and never matched).
        lowered = text.lower().replace(".", " ")
        if commas_too:
            lowered = lowered.replace(",", " ")
        parts = lowered.split()
        return [p for p in parts if len(p) > 1], [p for p in parts if len(p) == 1]

    def pair_up(initials_pool, names_pool):
        # Pair each unpaired initial with the first unpaired name starting with it.
        # Both pools are consumed in place; the number of pairs is returned.
        paired = 0
        names_snapshot = list(names_pool)
        for initial in list(initials_pool):
            if initial not in initials_pool:
                continue
            for full_part in names_snapshot:
                if full_part in names_pool and full_part.startswith(initial):
                    initials_pool.remove(initial)
                    names_pool.remove(full_part)
                    paired += 1
                    break
        return paired

    extended, initials = split_parts(s, commas_too=True)
    target_agent_dict = {"initials": initials, "extended": extended}

    report_dicts = {}
    for ag in lst:
        # NOTE(review): commas are not stripped from the candidates, unlike the target -- confirm
        ag_extended, ag_initials = split_parts(ag, commas_too=False)

        # Working copies: what is still unpaired on each side
        left_target_ext = list(extended)
        left_target_init = list(initials)
        left_ag_ext = list(ag_extended)
        left_ag_init = list(ag_initials)

        ext_matched = 0
        for part in ag_extended:
            if part in left_target_ext:
                ext_matched += 1
                left_target_ext.remove(part)
                left_ag_ext.remove(part)

        init_matched = 0
        for part in ag_initials:
            if part in left_target_init:
                init_matched += 1
                left_target_init.remove(part)
                left_ag_init.remove(part)

        # check the remaining unpaired
        # first, the extra initials of the candidate against the remaining extended names of the target
        init_compatible = pair_up(left_ag_init, left_target_ext)
        # then, the remaining initials of the target against the remaining extended names of the candidate
        ext_compatible = pair_up(left_target_init, left_ag_ext)

        report_dicts[ag] = {
            "ext_matched": ext_matched,
            "init_matched": init_matched,
            "ext_compatible": ext_compatible,
            "init_compatible": init_compatible,
            "ext_not_compatible": len(left_ag_ext),
            "init_not_compatible": len(left_ag_init),
        }

    return self.get_best_match(target_agent_dict, report_dicts)

595 

596 

597 

def add_editors_to_agent_list(self, item: dict, ag_list: list) -> list:
    '''
    Placeholder for adding the editors' dictionaries to the agents list: the dump carries no
    editor information, so the list is handed back as it was received.

    :params item: the item's dictionary (attributes), currently unused
    :type item: dict
    :params ag_list: the agent list
    :type ag_list: list

    :returns: list -- the very same list received as ag_list, unchanged.
    '''
    # NO INFO IN DUMP: to be updated with API DATA
    return ag_list

612 

def get_citing_pmid(self, meta_dict: dict) -> str:
    '''
    Extract the PMID from the space-separated 'id' field of a metadata row.

    :params meta_dict: the metadata row
    :type meta_dict: dict
    :returns: str -- the only 'pmid:'-prefixed id of the row; an empty string when the field
        is missing or empty, or when it holds zero or more than one PMID.
    '''
    raw_ids = meta_dict.get("id")
    if not raw_ids:
        return ""
    pmids = [token for token in raw_ids.split() if token.startswith("pmid:")]
    # we expect only one pmid for each entity
    return pmids[0] if len(pmids) == 1 else ""

622 

def get_citations(self, validated_pmid, item: dict) -> list:
    '''
    Build the citations going from a citing PMID to the PMIDs listed in the item's references.

    :params validated_pmid: the citing PMID, with or without the 'pmid:' prefix
    :type validated_pmid: str
    :params item: the item's dictionary; 'references' is read as a whitespace-separated string of ids
    :type item: dict
    :returns: list -- unique (citing, cited) tuples of prefixed PMIDs, in order of first appearance.
        The list is empty when the citing id is not numeric or when there are no references.
        References that are not integers, are equal to zero or cannot be normalised are skipped.
    '''
    citing = validated_pmid
    if not citing.startswith("pmid:"):
        try:
            citing = "pmid:" + str(int(citing))
        except ValueError:
            return []

    references_string = item.get("references")
    # A record without references used to fail on None.split()
    if not references_string:
        return []

    # A dict is used as an insertion-ordered set, so that the output order is deterministic
    addressed_citations = dict()
    for cited_id in references_string.split():
        try:
            id_n = int(cited_id)
            if id_n:
                norm_cited = self.pmid_m.normalise(str(id_n), include_prefix=True)
                if norm_cited:
                    addressed_citations[(citing, norm_cited)] = None
        except ValueError:
            # best effort: malformed references are ignored
            continue

    return list(addressed_citations)

652 

653 

def get_best_match(self, target_agent_dict, report_dicts):
    '''
    Select, among the compared names, the one that best fits the target name.

    :params target_agent_dict: the target name parts, as {"initials": [...], "extended": [...]}
    :type target_agent_dict: dict
    :params report_dicts: for each candidate name, the counters "ext_matched", "init_matched",
        "ext_compatible", "init_compatible", "ext_not_compatible" and "init_not_compatible"
    :type report_dicts: dict
    :returns: str -- the best candidate name, or an empty string when there are no candidates,
        no candidate shares an extended name part with the target, the only evidence is too weak,
        or the final scoring ends in a tie.
    '''
    # No candidates: max() below would raise on an empty sequence
    if not report_dicts:
        return ""

    top_ext_matched = max(v.get("ext_matched") for v in report_dicts.values())
    if top_ext_matched == 0:
        return ""
    if top_ext_matched == 1:
        # A single shared extended part is accepted only with some supporting evidence
        # and at most one unpaired part of each kind
        supported = any(
            v.get("ext_matched") == 1
            and (v.get("init_matched") >= 1 or v.get("ext_compatible") >= 1 or v.get("init_compatible") >= 1)
            and v.get("ext_not_compatible") <= 1
            and v.get("init_not_compatible") <= 1
            for v in report_dicts.values()
        )
        if not supported:
            return ""

    len_target_init = len(target_agent_dict["initials"])
    len_target_ext = len(target_agent_dict["extended"])
    total_len_name_parts_target = len_target_ext + len_target_init
    if total_len_name_parts_target < 1:
        return ""

    def nothing_left(v):
        # The candidate has no name part left unpaired
        return v["init_not_compatible"] == 0 and v["ext_not_compatible"] == 0

    def most_initials(candidates):
        # First candidate with the highest number of identical initials
        top = max(v["init_matched"] for v in candidates.values())
        return next(k for k, v in candidates.items() if v["init_matched"] == top)

    # Case 1: There is a perfect match with no exceedings: return it
    for k, v in report_dicts.items():
        if v["ext_matched"] == len_target_ext and v["init_matched"] == len_target_init and nothing_left(v):
            return k

    # Case 2: There is a complete match with all the extended names and the initials of the target are compatible
    match_all_extended_comp_ext = {
        k: v for k, v in report_dicts.items()
        if v["ext_matched"] == len_target_ext
        and v["init_matched"] + v["ext_compatible"] == len_target_init
        and nothing_left(v)
    }
    if match_all_extended_comp_ext:
        return most_initials(match_all_extended_comp_ext)

    # Case 3: Get max extended names match + compatible extended/initials
    match_max_extended_comp_init = {
        k: v for k, v in report_dicts.items()
        if v["ext_matched"] == top_ext_matched
        and v["ext_matched"] + v["init_compatible"] == len_target_ext
        and v["init_matched"] + v["ext_compatible"] == len_target_init
        and nothing_left(v)
    }
    if match_max_extended_comp_init:
        return most_initials(match_max_extended_comp_init)

    # Case 4 (suboptimal cases), get best compatibility
    scores_dict = dict()
    for k, v in report_dicts.items():
        p_match_ext = 0
        if len_target_ext:
            p_match_ext = v["ext_matched"] / len_target_ext
            if p_match_ext < 1 and v["init_compatible"]:
                # a compatible initial counts for a fraction of an extended match
                p_match_ext = (v["init_compatible"] * 0.2 + v["ext_matched"]) / len_target_ext

        p_match_init = 0
        if len_target_init:
            p_match_init = v["init_matched"] / len_target_init
            if p_match_init < 1 and v["ext_compatible"]:
                p_match_init = (v["ext_compatible"] * 0.7 + v["init_matched"]) / len_target_init

        # penalties for the candidate's unpaired parts
        p_inc_ext = v["ext_not_compatible"] * 0.7 / total_len_name_parts_target
        p_inc_init = v["init_not_compatible"] * 0.2 / total_len_name_parts_target
        scores_dict[k] = p_match_ext + p_match_init - p_inc_init - p_inc_ext

    top_score = max(scores_dict.values())
    result = [k for k, v in scores_dict.items() if v == top_score]
    # a tie is not a reliable answer
    return result[0] if len(result) == 1 else ""