Coverage for oc_ds_converter / pubmed / pubmed_processing.py: 74%
496 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import html
7import json
8import os
9import pathlib
10import re
11import warnings
12from os.path import exists
13from typing import List, Tuple
15from bs4 import BeautifulSoup
17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource
18from oc_ds_converter.lib.cleaner import Cleaner
19from oc_ds_converter.oc_idmanager.doi import DOIManager
20from oc_ds_converter.oc_idmanager.orcid import ORCIDManager
21from oc_ds_converter.oc_idmanager.pmid import PMIDManager
22from oc_ds_converter.pubmed.finder_nih import NIHResourceFinder
23from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI
24from oc_ds_converter.ra_processor import RaProcessor, families_match
# Ignore UserWarning messages originating from the bs4 module; BeautifulSoup is
# used by this module to strip markup from titles and venue names.
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
class PubmedProcessing(RaProcessor):
    '''
    Processor that turns PubMed dump records into metadata rows (see csv_creator)
    and citation pairs (see get_citations).
    '''

    def __init__(self, orcid_index: str | None = None, publishers_filepath_pubmed: str | None = None, journals_filepath: str | None = None, testing: bool = True, exclude_existing: bool = False):
        '''
        Create the identifier managers, the Redis handles and the two JSON-backed caches
        (journal short title -> extended title/ISSN, DOI prefix -> publisher).

        :params orcid_index: value forwarded unchanged to the parent class initialiser
        :type orcid_index: str | None
        :params publishers_filepath_pubmed: path of the prefix-to-publisher JSON file; when empty,
            "support_files/prefix_publishers.json" next to this module is used
        :type publishers_filepath_pubmed: str | None
        :params journals_filepath: path of the journal JSON file; when empty,
            "support_files/issn_jour_ext.json" next to this module is used
        :type journals_filepath: str | None
        :params testing: if True, FakeRedisWrapper instances are used instead of RedisDataSource
        :type testing: bool
        :params exclude_existing: stored as-is on the instance
        :type exclude_existing: bool
        '''
        super().__init__(orcid_index)
        self.exclude_existing = exclude_existing
        self.nihrf = NIHResourceFinder()
        self.doi_m = DOIManager()
        self.pmid_m = PMIDManager()
        if testing:
            self.BR_redis = FakeRedisWrapper()
            self.RA_redis = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        # Default location of both support files; created only when a default path is needed.
        support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")

        if not journals_filepath:
            # exist_ok avoids the check-then-create race of exists() + makedirs()
            os.makedirs(support_dir, exist_ok=True)
            self.journals_filepath = os.path.join(support_dir, "issn_jour_ext.json")
        else:
            self.journals_filepath = journals_filepath

        if not publishers_filepath_pubmed:
            os.makedirs(support_dir, exist_ok=True)
            self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")
        else:
            self.publishers_filepath = publishers_filepath_pubmed

        # The journal cache is read once (it used to be loaded twice with the same path).
        self.jour_dict = self.issn_data_recover_poci(self.journals_filepath)

        if os.path.exists(self.publishers_filepath):
            with open(self.publishers_filepath, "r", encoding="utf8") as fdp:
                pfp = json.load(fdp)
            # an empty/falsy mapping on disk is treated like no mapping at all
            self.publisher_manager = ExtractPublisherDOI(pfp if pfp else {})
        else:
            self.publisher_manager = ExtractPublisherDOI({})
            # materialise an empty mapping so that the file exists for later updates
            with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
                json.dump({}, fdp, ensure_ascii=False, indent=4)
77 def issn_data_recover_poci(self, path):
78 journal_issn_dict = dict()
79 if not path:
80 return journal_issn_dict
81 if not os.path.exists(path):
82 return journal_issn_dict
83 else:
84 with open(path, "r", encoding="utf8") as fd:
85 journal_issn_dict = json.load(fd)
86 return journal_issn_dict
89 def issn_data_to_cache_poci(self, jtitle_issn_dict, path):
90 with open(path, "w", encoding="utf-8") as fd:
91 json.dump(jtitle_issn_dict, fd, ensure_ascii=False, indent=4)
93 def prefix_to_publisher_to_cache(self, pref_pub_dict, path):
94 with open(path, "w", encoding="utf-8") as fd:
95 json.dump(pref_pub_dict, fd, ensure_ascii=False, indent=4)
97 def csv_creator(self, item: dict) -> dict:
98 row = dict()
99 doi = ""
100 pmid = self.pmid_m.normalise(str(item['pmid']))
101 if pmid:
102 # create empty row
103 keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
104 'publisher', 'editor']
105 for k in keys:
106 row[k] = ''
108 attributes = item
110 # row['type']
111 row['type'] = 'journal article'
113 # row['id']
114 ids_list = list()
115 ids_list.append(str('pmid:' + pmid))
116 if attributes.get('doi'):
117 doi = DOIManager().normalise(attributes.get('doi'), include_prefix=False)
118 if doi:
119 doi_w_pref = "doi:"+doi
120 if self.BR_redis.exists_as_set(doi_w_pref):
121 ids_list.append(doi_w_pref)
122 elif self.doi_m.is_valid(doi):
123 ids_list.append(doi_w_pref)
124 else:
125 doi = ''
128 row['id'] = ' '.join(ids_list)
130 # row['title']
131 pub_title = ""
132 if attributes.get("title"):
133 p_title = attributes.get("title")
134 soup = BeautifulSoup(p_title, 'html.parser')
135 title_soup = soup.get_text().replace('\n', '')
136 title_soup_space_replaced = ' '.join(title_soup.split())
137 title_soup_strip = title_soup_space_replaced.strip()
138 clean_tit = html.unescape(title_soup_strip)
139 pub_title = clean_tit if clean_tit else p_title
141 row['title'] = pub_title
143 agents_list = self.add_authors_to_agent_list(attributes, [])
144 authors_strings_list, editors_string_list = self.get_agents_strings_list(doi, agents_list)
146 # row['author']
147 if attributes.get('authors'):
148 row['author'] = '; '.join(authors_strings_list)
150 # row['pub_date']
151 dates = attributes.get("year")
152 row['pub_date'] = str(dates) if dates else ""
154 # row['venue']
155 row['venue'] = self.get_venue_name(attributes, pmid)
157 # row['volume']
158 row['volume'] = ""
160 # row['issue']
161 row['issue'] = ""
163 # row['page']
164 row['page'] = "" #self.get_pubmed_pages(attributes)
166 # row['publisher']
167 if doi:
168 try:
169 row['publisher'] = self.get_publisher_name(doi)
170 except IndexError:
171 print(doi, type(doi), row, item)
172 raise(IndexError)
173 else:
174 row['publisher'] = ""
176 # row['editor']
177 row['editor'] = ""
179 try:
180 return self.normalise_unicode(row)
181 except TypeError:
182 print(row)
183 raise(TypeError)
185 def get_pubmed_pages(self, item: dict) -> str:
186 '''
187 This function returns the pages interval.
189 :params item: the item's dictionary
190 :type item: dict
191 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string.
192 '''
193 page_list = []
194 ''' NO INFO IN DUMP: to be updated with API DATA'''
195 return self.get_pages(page_list)
197 def get_publisher_name(self, doi: str) -> str:
198 '''
199 This function aims to return a publisher's name and id. If a mapping was provided,
200 it is used to find the publisher's standardized name from its id or DOI prefix.
202 :params doi: the item's DOI
203 :type doi: str
205 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example,
206 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output
207 is only the name. Finally, if there is no publisher, the output is an empty string.
208 '''
210 publisher_name = self.publisher_manager.extract_publishers_v(doi)
211 if publisher_name and publisher_name != "unidentified":
212 return publisher_name
213 else:
214 return ""
216 def save_updated_pref_publishers_map(self):
217 upd_dict = self.publisher_manager.get_last_map_ver()
218 self.prefix_to_publisher_to_cache(upd_dict, self.publishers_filepath)
221 def get_venue_name(self, item: dict, id: str) -> str:
222 '''
223 This method deals with generating the venue's name, followed by id in square brackets, separated by spaces.
224 HTML tags are deleted and HTML entities escaped. In addition, any ISBN and ISSN are validated.
225 Finally, the square brackets in the venue name are replaced by round brackets to avoid conflicts with the ids enclosures.
227 :params item: the item's dictionary
228 :type item: dict
229 :params row: a CSV row
230 :type row: dict
231 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'Nutrition & Food Science
232 [issn:0034-6659]'. If the id does not exist, the output is only the name. Finally, if there is no venue,
233 the output is an empty string.
234 '''
236 short_n = item.get('journal') if item.get('journal') else ""
237 venids_list = []
238 cont_title = short_n
239 if short_n:
240 if short_n not in self.jour_dict.keys():
241 self.jour_dict[short_n] = {"extended": "", "issn": []}
242 if not self.jour_dict[short_n].get("extended") or not self.jour_dict[short_n].get("issn"):
243 if id:
244 api_response = self.nihrf._call_api(id)
245 if api_response:
246 if not self.jour_dict[short_n].get("extended"):
247 self.jour_dict[short_n]["extended"] = self.nihrf._get_extended_j_title(api_response)
248 if not self.jour_dict[short_n].get("issn"):
249 issn_dict_list_valid = [x for x in self.nihrf._get_issn(api_response) if x]
250 self.jour_dict[short_n]["issn"] = issn_dict_list_valid
251 self.issn_data_to_cache_poci(self.jour_dict, self.journals_filepath)
253 if short_n in self.jour_dict.keys():
254 jt_data = self.jour_dict.get(short_n)
255 if jt_data.get("issn"):
256 venids_list = [x for x in jt_data.get("issn") if x.startswith("issn:")]
257 venids_list_integration = ["issn:"+x for x in jt_data.get("issn") if not x.startswith("issn:")]
258 venids_list.extend(venids_list_integration)
259 extended_jt = jt_data.get("extended") if jt_data.get("extended") else short_n
260 cont_title = extended_jt
262 # use abbreviated journal title if no mapping was provided
263 cont_title = cont_title.replace('\n', '')
264 ven_soup = BeautifulSoup(cont_title, 'html.parser')
265 ventit = html.unescape(ven_soup.get_text())
266 ambiguous_brackets = re.search('\[\s*((?:[^\s]+:[^\s]+)?(?:\s+[^\s]+:[^\s]+)*)\s*\]', ventit)
267 if ambiguous_brackets:
268 match = ambiguous_brackets.group(1)
269 open_bracket = ventit.find(match) - 1
270 close_bracket = ventit.find(match) + len(match)
271 ventit = ventit[:open_bracket] + '(' + ventit[open_bracket + 1:]
272 ventit = ventit[:close_bracket] + ')' + ventit[close_bracket + 1:]
273 cont_title = ventit
275 # IDS
276 if venids_list:
277 name_and_id = cont_title + ' [' + ' '.join(venids_list) + ']' if cont_title else '[' + ' '.join(venids_list) + ']'
278 else:
279 name_and_id = cont_title
281 return name_and_id
283 def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
284 '''
285 This function returns the the agents list updated with the authors dictionaries, in the correct format.
287 :params item: the item's dictionary (attributes), ag_list: the agent list
288 :type item: dict, ag_list: list
290 :returns: listthe agents list updated with the authors dictionaries, in the correct format.
291 '''
292 agent_list = ag_list
293 if item.get("authors"):
294 authors_string = str(item.get("authors")).strip()
295 authors_split_list = [a.strip() for a in authors_string.split(",") if a]
296 for author in authors_split_list:
298 agent = {}
299 agent["role"] = "author"
300 agent["name"] = author
301 missing_names = [x for x in ["family", "given", "name"] if x not in agent]
302 for mn in missing_names:
303 agent[mn] = ""
304 agent_list.append(agent)
305 return agent_list
307 def find_homonyms(self, lst):
308 homonyms_dict = dict()
309 multi_space = re.compile(r"\s+")
310 extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"
311 for d in lst:
312 if d.get('name'):
313 name = d.get('name')
314 author = name.replace(".", " ")
315 author = multi_space.sub(" ", author).strip()
316 re_extended = re.findall(extend_pattern, author)
317 extended = [(s.strip()).lower() for s in re_extended]
318 d_hom_set = set()
319 for i in extended:
320 dicts_to_check = [dct for dct in lst if dct.get('name') and dct != d]
321 homonyms = [dct.get('name') for dct in dicts_to_check if
322 i in [(s.strip()).lower() for s in re.findall(extend_pattern, dct.get('name'))]]
323 for n in homonyms:
324 d_hom_set.add(n)
325 if d_hom_set:
326 homonyms_dict[d.get('name')] = list(d_hom_set)
328 return homonyms_dict
    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        '''
        Turn the agents' dictionaries into output strings, adding '[orcid:...]' where an ORCID
        is available, and split them by role.

        An ORCID is taken from the agent itself ('orcid' or 'ORCID' key) when present; otherwise,
        if a DOI is given, the candidates returned by orcid_finder for that DOI are compared with
        the agent's name parts.

        :params doi: the DOI of the item; when falsy no ORCID lookup is attempted
        :type doi: str
        :params agents_list: dictionaries with at least the 'role' key and optionally 'family',
            'given', 'name', 'orcid'/'ORCID'
        :type agents_list: List[dict]
        :returns: Tuple[list, list] -- the authors' strings and the editors' strings
        '''
        # computed on the raw names, before they are cleaned below
        homonyms_dict = self.find_homonyms(agents_list)
        # agent strings that have already been emitted together with an ORCID
        hom_w_orcid = set()
        authors_strings_list = list()
        editors_string_list = list()
        dict_orcid = None
        multi_space = re.compile(r"\s+")
        inits_pattern = r"([A-Z]|[ÄŐŰÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽÑ]){1}(?:\s|$)"
        extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"

        # look candidates up only if at least one agent lacks an ORCID and a DOI is available
        if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list) and doi:
            dict_orcid = self.orcid_finder(doi)
        agents_list = [
            {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
            else v for k, v in
             agent_dict.items()} for agent_dict in agents_list]
        for agent in agents_list:
            cur_role = agent['role']
            f_name = None
            g_name = None
            name = None
            agent_string = None
            if agent.get('family') and agent.get('given'):
                f_name = agent['family']
                g_name = agent['given']
                agent_string = f_name + ', ' + g_name
            elif agent.get('name'):
                name = agent['name']
                # a comma in the name is read as the 'family, given' separator
                f_name = name.split(",")[0].strip() if "," in name else None
                g_name = name.split(",")[-1].strip() if "," in name else None

                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name


            if agent_string is None:
                if agent.get('family') and not agent.get('given'):
                    if g_name:
                        agent_string = agent['family'] + ', ' + g_name
                    else:
                        agent_string = agent['family'] + ', '
                elif agent.get('given') and not agent.get('family'):
                    if f_name:
                        agent_string = f_name + ', ' + agent['given']
                    else:
                        agent_string = ', ' + agent['given']
                elif agent.get('name'):
                    agent_string = agent.get('name')

            orcid = None
            if 'orcid' in agent:
                if isinstance(agent['orcid'], list):
                    orcid = str(agent['orcid'][0])
                else:
                    orcid = str(agent['orcid'])
            elif 'ORCID' in agent:
                if isinstance(agent['ORCID'], list):
                    orcid = str(agent['ORCID'][0])
                else:
                    orcid = str(agent['ORCID'])
            if orcid:
                # an ORCID supplied with the agent is kept only if its check digit is correct
                orcid_manager = ORCIDManager(use_api_service=False)
                orcid = orcid_manager.normalise(orcid, include_prefix=False)
                orcid = orcid if orcid_manager.check_digit(orcid) else None

            elif dict_orcid and f_name:
                for ori in dict_orcid:
                    # each candidate value is split on ', ' into family and (optionally) given name
                    orc_n: List[str] = dict_orcid[ori].split(', ')
                    orc_f = orc_n[0].lower()
                    orc_g = orc_n[1] if len(orc_n) == 2 else None
                    if families_match(f_name, orc_f):
                        if g_name and orc_g:
                            # If there are several authors with the same surname
                            if len([person for person in agents_list if 'family' in person if person['family'] if
                                    families_match(person['family'], orc_f)]) > 1:
                                # If there are several authors with the same surname and the same given names' initials
                                if len([person for person in agents_list if 'given' in person if person['given'] if
                                        person['given'][0].lower() == orc_g[0].lower()]) > 1:
                                    homonyms_list = [person for person in agents_list if 'given' in person if
                                                     person['given'] if person['given'].lower() == orc_g.lower()]
                                    # If there are homonyms
                                    if len(homonyms_list) > 1:
                                        # If such homonyms have different roles from the current role
                                        if [person for person in homonyms_list if person['role'] != cur_role]:
                                            if orc_g.lower() == g_name.lower():
                                                orcid = ori
                                    else:
                                        if orc_g.lower() == g_name.lower():
                                            orcid = ori
                                elif orc_g[0].lower() == g_name[0].lower():
                                    orcid = ori
                            # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
                            elif any([person for person in agents_list if 'given' in person if person['given'] if
                                      person['given'].lower() == f_name.lower()]):
                                if orc_g.lower() == g_name.lower():
                                    orcid = ori
                            else:
                                orcid = ori
                        else:
                            orcid = ori

            # If the ra name can't be clearly divided in given name and surname
            elif dict_orcid and name:
                for ori in dict_orcid:
                    try_match = True
                    if name in homonyms_dict.keys():
                        # among agents sharing a name part, only the one compute_affinity selects is tried
                        get_best_affinity = self.compute_affinity(dict_orcid[ori], homonyms_dict.keys())
                        if name != get_best_affinity:
                            try_match = False
                    if try_match:
                        orc_n: List[str] = dict_orcid[ori].split(', ')
                        orc_f = orc_n[0].lower()
                        orc_g = orc_n[1] if len(orc_n) == 2 else None

                        author = name.replace(".", " ")
                        author = multi_space.sub(" ", author).strip()
                        re_inits = re.findall(inits_pattern, author)
                        re_extended = re.findall(extend_pattern, author)
                        initials = [(x.strip()).lower() for x in re_inits]
                        extended = [(s.strip()).lower() for s in re_extended]
                        author_dict = {"initials": initials, "extended": extended}

                        surname_match = True if [x for x in author_dict["extended"] if x in orc_f.split()] else False
                        if orc_g:
                            name_match_all = True if [x for x in author_dict["extended"] if x in orc_g.split()] else False
                            name_match_init = True if [x for x in author_dict["initials"] if any(
                                element.startswith(x) and element not in author_dict["extended"] for
                                element in orc_g.split())] else False
                        else:
                            name_match_all = False
                            name_match_init = False
                        # the surname must match, plus the given name either in full or by initial
                        matches = (surname_match and (name_match_all or name_match_init))

                        if matches:
                            # managing cases in which a name string was already retrieved but the one
                            # provided by the mapping is better
                            f_name = orc_f
                            if not g_name:
                                g_name = orc_g
                            elif g_name:
                                if len(g_name.strip()) < len(orc_g.strip()):
                                    g_name = orc_g
                            orcid = ori

            if agent_string is None:
                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name
                elif f_name and not g_name:
                    agent_string = f_name + ', '
                elif g_name and not f_name:
                    agent_string = ', ' + g_name
            elif agent_string == agent.get('name') and f_name and g_name:
                # the undivided name is replaced once family and given name have been recovered
                agent_string = f_name + ', ' + g_name


            if agent_string and orcid:
                agent_string = self.uppercase_initials(agent_string)
                # the ORCID is attached only the first time a given agent string is met
                if agent_string not in hom_w_orcid:
                    hom_w_orcid.add(agent_string)
                    agent_string += ' [' + 'orcid:' + str(orcid) + ']'

            if agent_string:
                agent_string = self.uppercase_initials(agent_string)

                if agent['role'] == 'author':
                    authors_strings_list.append(agent_string)
                elif agent['role'] == 'editor':
                    editors_string_list.append(agent_string)

        return authors_strings_list, editors_string_list
502 def compute_affinity(self, s, lst):
503 s = s.replace(r"\s+", " ")
504 s = s.replace(r"\n+", " ")
505 name = s.lower()
506 agent = name.replace(".", " ")
507 agent = agent.replace(",", " ")
508 agent = agent.strip()
509 agent_name_parts = agent.split()
510 extended = [x for x in agent_name_parts if len(x) > 1]
511 initials = [x for x in agent_name_parts if len(x) == 1]
513 target_agent_dict = {"initials": initials, "extended": extended}
515 report_dicts = {}
516 for ag in lst:
517 name = ag.lower()
518 name = name.replace(r"\s+", " ")
519 name = name.replace(r"\n+", " ")
520 agent = name.replace(".", " ")
521 agent = agent.strip()
522 agent_name_parts = agent.split()
523 ag_extended = [x for x in agent_name_parts if len(x) > 1]
524 ag_initials = [x for x in agent_name_parts if len(x) == 1]
526 copy_ext_target = [x for x in extended]
527 copy_init_target = [x for x in initials]
528 copy_ag_ext = [x for x in ag_extended]
529 copy_ag_init = [x for x in ag_initials]
531 ext_matched = 0
532 init_matched = 0
534 for i in ag_extended:
535 if i in copy_ext_target:
536 ext_matched += 1
537 copy_ext_target.remove(i)
538 copy_ag_ext.remove(i)
540 for ii in ag_initials:
541 if ii in copy_init_target:
542 init_matched += 1
543 copy_init_target.remove(ii)
544 copy_ag_init.remove(ii)
546 # check the remaining unpaired
547 # check first if the extra initials in the ra name can be paired with the remaining extended names
548 init_compatible = 0
550 if copy_ag_init and copy_ext_target:
551 remaining_ag_initials = [x for x in copy_ag_init]
552 remaining_tar_extended = [x for x in copy_ext_target]
554 for ri in remaining_ag_initials:
555 if ri in copy_ag_init:
556 for re in remaining_tar_extended:
557 if re in copy_ext_target:
558 if re.startswith(ri):
559 copy_ag_init.remove(ri)
560 copy_ext_target.remove(re)
561 init_compatible += 1
562 break
564 # check if the remaining initials of the target name are compatible with the remaining extended names of the ra
565 ext_compatible = 0
567 if copy_ag_ext and copy_init_target:
568 remaining_tar_initials = [x for x in copy_init_target]
569 remaining_ag_extended = [x for x in copy_ag_ext]
571 for ri in remaining_tar_initials:
572 if ri in copy_init_target:
573 for re in remaining_ag_extended:
574 if re in copy_ag_ext:
575 if re.startswith(ri):
576 copy_ag_ext.remove(re)
577 copy_init_target.remove(ri)
578 ext_compatible += 1
579 break
580 ext_not_compatible = len(copy_ag_ext)
581 init_not_compatible = len(copy_ag_init)
583 cur_agent_dict = {
584 "ext_matched": ext_matched,
585 "init_matched": init_matched,
586 "ext_compatible": ext_compatible,
587 "init_compatible": init_compatible,
588 "ext_not_compatible": ext_not_compatible,
589 "init_not_compatible": init_not_compatible,
590 }
592 report_dicts[ag] = cur_agent_dict
593 best_match_name = self.get_best_match(target_agent_dict, report_dicts)
594 return best_match_name
598 def add_editors_to_agent_list(self, item: dict, ag_list: list) -> list:
599 '''
600 This function returns the the agents list updated with the editors dictionaries, in the correct format.
602 :params item: the item's dictionary (attributes), ag_list: the agent list
603 :type item: dict, ag_list: list
605 :returns: listthe agents list updated with the authors dictionaries, in the correct format.
606 '''
608 agent_list = ag_list
610 ''' NO INFO IN DUMP: to be updated with API DATA'''
611 return agent_list
613 def get_citing_pmid(self, meta_dict:dict) -> str:
614 citing_pmid = ""
615 id_string = meta_dict.get("id")
616 if id_string:
617 id_list = id_string.split()
618 pmid_list = [x for x in id_list if x.startswith("pmid:")]
619 if len(pmid_list) == 1:
620 citing_pmid = pmid_list[0] # we expect only one pmid for each entity
621 return citing_pmid
623 def get_citations(self, validated_pmid, item:dict) -> list:
624 addressed_citations = set()
626 citing = validated_pmid
627 if not citing.startswith("pmid:"):
628 try:
629 int_pmid = int(citing)
630 citing = "pmid:" + str(int_pmid)
631 except ValueError:
632 return []
634 references_string = item.get("references")
635 cited_ids = references_string.split()
637 for cited_id in cited_ids:
638 try:
639 id_n = int(cited_id)
641 if id_n:
642 norm_cited = self.pmid_m.normalise(str(id_n), include_prefix=True)
644 if norm_cited:
645 addressed_citations.add((citing, norm_cited))
646 except ValueError:
647 pass
649 addressed_citations_list = list(addressed_citations)
651 return addressed_citations_list
654 def get_best_match(self, target_agent_dict, report_dicts):
655 if max([v.get("ext_matched") for k,v in report_dicts.items()]) == 0:
656 return ""
657 elif max([v.get("ext_matched") for k,v in report_dicts.items()]) == 1:
658 min_comp_dict = {k:v for k,v in report_dicts.items() if v.get("ext_matched") ==1 and (
659 (v.get("init_matched") >= 1 or v.get("ext_compatible")>=1 or v.get("init_compatible")>=1)
660 and
661 (v.get("ext_not_compatible")<= 1 and v.get("init_not_compatible")<= 1)
662 )}
663 if not min_comp_dict:
664 return ""
668 len_target_init = len(target_agent_dict["initials"])
669 len_target_ext = len(target_agent_dict["extended"])
670 if len_target_init + len_target_ext >= 1:
672 # Case 1: There is a perfect match with no exceedings: return it
673 complete_matches = {k: v for k, v in report_dicts.items() if
674 v["ext_matched"] == len_target_ext and v["init_matched"] == len_target_init and v[
675 "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
676 if complete_matches:
677 for k in complete_matches.keys():
678 return k
679 # Case 2: There is a complete match with all the extended names and the initials of the target are compatible
680 match_all_extended_comp_ext = {k: v for k, v in report_dicts.items() if v["ext_matched"] == len_target_ext and (
681 v["init_matched"] + v["ext_compatible"] == len_target_init) and v["init_not_compatible"] == 0 and v[
682 "ext_not_compatible"] == 0}
683 if match_all_extended_comp_ext:
684 if len(match_all_extended_comp_ext) == 1:
685 for k in match_all_extended_comp_ext.keys():
686 return k
687 else:
688 return [k for k, v in match_all_extended_comp_ext.items() if
689 v["init_matched"] == max([v["init_matched"] for v in match_all_extended_comp_ext.values()])][0]
691 # Case 3: Get max extended names match + compatible extended/initials
692 max_comp_exc_ext = max([v["ext_matched"] for v in report_dicts.values()])
693 match_max_extended_comp_init = {k: v for k, v in report_dicts.items() if
694 v["ext_matched"] == max_comp_exc_ext and (
695 v["ext_matched"] + v["init_compatible"] == len_target_ext) and (
696 v["init_matched"] + v["ext_compatible"] == len_target_init) and v[
697 "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
698 if match_max_extended_comp_init:
699 if len(match_max_extended_comp_init) == 1:
700 for k in match_max_extended_comp_init.keys():
701 return k
702 else:
703 return [k for k, v in match_max_extended_comp_init.items() if
704 v["init_matched"] == max([v["init_matched"] for v in match_max_extended_comp_init.values()])][0]
706 # Case 4 (suboptimal cases), get best compatibility
707 scores_dict = dict()
709 for k, v in report_dicts.items():
710 score = 0
712 p_match_ext = 0
713 if len_target_ext:
714 p_match_ext = v["ext_matched"] / len_target_ext
715 if p_match_ext < 1:
716 if v["init_compatible"]:
717 p_match_ext = (v["init_compatible"] * 0.2 + v["ext_matched"]) / len_target_ext
719 p_match_init = 0
720 if len_target_init:
721 p_match_init = v["init_matched"] / len_target_init
722 if p_match_init < 1:
723 if v["ext_compatible"]:
724 p_match_init = (v["ext_compatible"] * 0.7 + v["init_matched"]) / len_target_init
726 total_len_name_parts_target = len_target_ext + len_target_init
727 if v["ext_not_compatible"]:
728 p_inc_ext = v["ext_not_compatible"] * 0.7 / total_len_name_parts_target
729 else:
730 p_inc_ext = 0
731 if v["init_not_compatible"]:
732 p_inc_init = v["init_not_compatible"] * 0.2 / total_len_name_parts_target
733 else:
734 p_inc_init = 0
735 score = p_match_ext + p_match_init - p_inc_init - p_inc_ext
736 scores_dict[k] = score
737 result = [k for k, v in scores_dict.items() if v == max(scores_dict.values())]
738 if len(result) == 1:
739 return result[0]
740 else:
741 return ""
742 return ""