Coverage for oc_ds_converter / pubmed / pubmed_processing.py: 72%
496 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import html
7import json
8import os
9import pathlib
10import re
11import warnings
12from os.path import exists
13from typing import List, Tuple
15from bs4 import BeautifulSoup
17from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource
18from oc_ds_converter.lib.cleaner import Cleaner
19from oc_ds_converter.oc_idmanager.doi import DOIManager
20from oc_ds_converter.oc_idmanager.orcid import ORCIDManager
21from oc_ds_converter.oc_idmanager.pmid import PMIDManager
22from oc_ds_converter.pubmed.finder_nih import NIHResourceFinder
23from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI
24from oc_ds_converter.ra_processor import RaProcessor
26warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
class PubmedProcessing(RaProcessor):
    def __init__(self, orcid_index: str | None = None, publishers_filepath_pubmed: str | None = None, journals_filepath: str | None = None, testing: bool = True, exclude_existing: bool = False):
        """Set up the PubMed record processor.

        :params orcid_index: path of the DOI→ORCID index, handed to RaProcessor
        :params publishers_filepath_pubmed: path of the DOI-prefix→publisher JSON cache
            (defaults to support_files/prefix_publishers.json next to this module)
        :params journals_filepath: path of the journal-title→ISSN JSON cache
            (defaults to support_files/issn_jour_ext.json next to this module)
        :params testing: when True, use in-memory fake Redis wrappers instead of
            the live DB-META-BR / DB-META-RA Redis data sources
        :params exclude_existing: stored flag, used by callers of this processor
        """
        super().__init__(orcid_index)
        self.exclude_existing = exclude_existing
        self.nihrf = NIHResourceFinder()
        self.doi_m = DOIManager()
        self.pmid_m = PMIDManager()
        if testing:
            self.BR_redis = FakeRedisWrapper()
            self.RA_redis = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        # Default cache files live in a support_files directory next to this module.
        support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")

        if not journals_filepath:
            # exist_ok avoids the exists()-then-makedirs() race of the original code
            os.makedirs(support_dir, exist_ok=True)
            self.journals_filepath = os.path.join(support_dir, "issn_jour_ext.json")
        else:
            self.journals_filepath = journals_filepath

        if not publishers_filepath_pubmed:
            os.makedirs(support_dir, exist_ok=True)
            self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")
        else:
            self.publishers_filepath = publishers_filepath_pubmed

        # FIX: the journal cache was loaded twice in the original; load it once.
        self.jour_dict = self.issn_data_recover_poci(self.journals_filepath)

        if os.path.exists(self.publishers_filepath):
            with open(self.publishers_filepath, "r", encoding="utf8") as fdp:
                pfp = json.load(fdp)
            # an empty/falsy cache degrades to an empty mapping
            self.publisher_manager = ExtractPublisherDOI(pfp) if pfp else ExtractPublisherDOI({})
        else:
            self.publisher_manager = ExtractPublisherDOI({})
            # seed an empty cache file so later saves have a target
            with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
                json.dump({}, fdp, ensure_ascii=False, indent=4)
77 def issn_data_recover_poci(self, path):
78 journal_issn_dict = dict()
79 if not path:
80 return journal_issn_dict
81 if not os.path.exists(path):
82 return journal_issn_dict
83 else:
84 with open(path, "r", encoding="utf8") as fd:
85 journal_issn_dict = json.load(fd)
86 return journal_issn_dict
89 def issn_data_to_cache_poci(self, jtitle_issn_dict, path):
90 with open(path, "w", encoding="utf-8") as fd:
91 json.dump(jtitle_issn_dict, fd, ensure_ascii=False, indent=4)
93 def prefix_to_publisher_to_cache(self, pref_pub_dict, path):
94 with open(path, "w", encoding="utf-8") as fd:
95 json.dump(pref_pub_dict, fd, ensure_ascii=False, indent=4)
    def csv_creator(self, item: dict) -> dict:
        """Convert one PubMed dump record into a Meta-style tabular row.

        :params item: the item's dictionary (expects at least 'pmid'; optionally
            'doi', 'title', 'authors', 'year', 'journal')
        :type item: dict
        :returns: dict -- a row with keys id/title/author/pub_date/venue/volume/
            issue/page/type/publisher/editor; an empty dict is normalised and
            returned when the pmid does not validate.
        """
        row = dict()
        doi = ""
        pmid = self.pmid_m.normalise(str(item['pmid']))
        if pmid:
            # create empty row
            keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
                    'publisher', 'editor']
            for k in keys:
                row[k] = ''

            attributes = item

            # row['type']: every record of this dump is treated as a journal article
            row['type'] = 'journal article'

            # row['id']: always the pmid; the DOI is added only when it is either
            # already known to Meta (Redis BR set) or valid per the DOI manager
            ids_list = list()
            ids_list.append(str('pmid:' + pmid))
            if attributes.get('doi'):
                doi = DOIManager().normalise(attributes.get('doi'), include_prefix=False)
                if doi:
                    doi_w_pref = "doi:"+doi
                    if self.BR_redis.exists_as_set(doi_w_pref):
                        ids_list.append(doi_w_pref)
                    elif self.doi_m.is_valid(doi):
                        ids_list.append(doi_w_pref)
                    else:
                        # invalid DOI: blank it so it is not used for the publisher lookup below
                        doi = ''

            row['id'] = ' '.join(ids_list)

            # row['title']: strip HTML tags, squeeze whitespace, unescape entities;
            # fall back to the raw title when cleaning leaves nothing
            pub_title = ""
            if attributes.get("title"):
                p_title = attributes.get("title")
                soup = BeautifulSoup(p_title, 'html.parser')
                title_soup = soup.get_text().replace('\n', '')
                title_soup_space_replaced = ' '.join(title_soup.split())
                title_soup_strip = title_soup_space_replaced.strip()
                clean_tit = html.unescape(title_soup_strip)
                pub_title = clean_tit if clean_tit else p_title

            row['title'] = pub_title

            agents_list = self.add_authors_to_agent_list(attributes, [])
            authors_strings_list, editors_string_list = self.get_agents_strings_list(doi, agents_list)

            # row['author']
            if attributes.get('authors'):
                row['author'] = '; '.join(authors_strings_list)

            # row['pub_date']
            dates = attributes.get("year")
            row['pub_date'] = str(dates) if dates else ""

            # row['venue']
            row['venue'] = self.get_venue_name(attributes, pmid)

            # row['volume']: not provided by the dump
            row['volume'] = ""

            # row['issue']: not provided by the dump
            row['issue'] = ""

            # row['page']: not provided by the dump
            row['page'] = "" #self.get_pubmed_pages(attributes)

            # row['publisher']: resolved from the DOI prefix mapping (empty without a DOI)
            if doi:
                try:
                    row['publisher'] = self.get_publisher_name(doi)
                except IndexError:
                    # debugging aid before re-raising: dump the offending record
                    print(doi, type(doi), row, item)
                    raise(IndexError)
            else:
                row['publisher'] = ""

            # row['editor']: not provided by the dump
            row['editor'] = ""

        try:
            return self.normalise_unicode(row)
        except TypeError:
            # debugging aid before re-raising: dump the offending row
            print(row)
            raise(TypeError)
185 def get_pubmed_pages(self, item: dict) -> str:
186 '''
187 This function returns the pages interval.
189 :params item: the item's dictionary
190 :type item: dict
191 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string.
192 '''
193 page_list = []
194 ''' NO INFO IN DUMP: to be updated with API DATA'''
195 return self.get_pages(page_list)
197 def get_publisher_name(self, doi: str) -> str:
198 '''
199 This function aims to return a publisher's name and id. If a mapping was provided,
200 it is used to find the publisher's standardized name from its id or DOI prefix.
202 :params doi: the item's DOI
203 :type doi: str
205 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example,
206 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output
207 is only the name. Finally, if there is no publisher, the output is an empty string.
208 '''
210 publisher_name = self.publisher_manager.extract_publishers_v(doi)
211 if publisher_name and publisher_name != "unidentified":
212 return publisher_name
213 else:
214 return ""
216 def save_updated_pref_publishers_map(self):
217 upd_dict = self.publisher_manager.get_last_map_ver()
218 self.prefix_to_publisher_to_cache(upd_dict, self.publishers_filepath)
221 def get_venue_name(self, item: dict, id: str) -> str:
222 '''
223 This method deals with generating the venue's name, followed by id in square brackets, separated by spaces.
224 HTML tags are deleted and HTML entities escaped. In addition, any ISBN and ISSN are validated.
225 Finally, the square brackets in the venue name are replaced by round brackets to avoid conflicts with the ids enclosures.
227 :params item: the item's dictionary
228 :type item: dict
229 :params row: a CSV row
230 :type row: dict
231 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'Nutrition & Food Science
232 [issn:0034-6659]'. If the id does not exist, the output is only the name. Finally, if there is no venue,
233 the output is an empty string.
234 '''
236 short_n = item.get('journal') if item.get('journal') else ""
237 venids_list = []
238 cont_title = short_n
239 if short_n:
240 if short_n not in self.jour_dict.keys():
241 self.jour_dict[short_n] = {"extended": "", "issn": []}
242 if not self.jour_dict[short_n].get("extended") or not self.jour_dict[short_n].get("issn"):
243 if id:
244 api_response = self.nihrf._call_api(id)
245 if api_response:
246 if not self.jour_dict[short_n].get("extended"):
247 self.jour_dict[short_n]["extended"] = self.nihrf._get_extended_j_title(api_response)
248 if not self.jour_dict[short_n].get("issn"):
249 issn_dict_list_valid = [x for x in self.nihrf._get_issn(api_response) if x]
250 self.jour_dict[short_n]["issn"] = issn_dict_list_valid
251 self.issn_data_to_cache_poci(self.jour_dict, self.journals_filepath)
253 if short_n in self.jour_dict.keys():
254 jt_data = self.jour_dict.get(short_n)
255 if jt_data.get("issn"):
256 venids_list = [x for x in jt_data.get("issn") if x.startswith("issn:")]
257 venids_list_integration = ["issn:"+x for x in jt_data.get("issn") if not x.startswith("issn:")]
258 venids_list.extend(venids_list_integration)
259 extended_jt = jt_data.get("extended") if jt_data.get("extended") else short_n
260 cont_title = extended_jt
262 # use abbreviated journal title if no mapping was provided
263 cont_title = cont_title.replace('\n', '')
264 ven_soup = BeautifulSoup(cont_title, 'html.parser')
265 ventit = html.unescape(ven_soup.get_text())
266 ambiguous_brackets = re.search('\[\s*((?:[^\s]+:[^\s]+)?(?:\s+[^\s]+:[^\s]+)*)\s*\]', ventit)
267 if ambiguous_brackets:
268 match = ambiguous_brackets.group(1)
269 open_bracket = ventit.find(match) - 1
270 close_bracket = ventit.find(match) + len(match)
271 ventit = ventit[:open_bracket] + '(' + ventit[open_bracket + 1:]
272 ventit = ventit[:close_bracket] + ')' + ventit[close_bracket + 1:]
273 cont_title = ventit
275 # IDS
276 if venids_list:
277 name_and_id = cont_title + ' [' + ' '.join(venids_list) + ']' if cont_title else '[' + ' '.join(venids_list) + ']'
278 else:
279 name_and_id = cont_title
281 return name_and_id
283 def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
284 '''
285 This function returns the the agents list updated with the authors dictionaries, in the correct format.
287 :params item: the item's dictionary (attributes), ag_list: the agent list
288 :type item: dict, ag_list: list
290 :returns: listthe agents list updated with the authors dictionaries, in the correct format.
291 '''
292 agent_list = ag_list
293 if item.get("authors"):
294 authors_string = str(item.get("authors")).strip()
295 authors_split_list = [a.strip() for a in authors_string.split(",") if a]
296 for author in authors_split_list:
298 agent = {}
299 agent["role"] = "author"
300 agent["name"] = author
301 missing_names = [x for x in ["family", "given", "name"] if x not in agent]
302 for mn in missing_names:
303 agent[mn] = ""
304 agent_list.append(agent)
305 return agent_list
307 def find_homonyms(self, lst):
308 homonyms_dict = dict()
309 multi_space = re.compile(r"\s+")
310 extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"
311 for d in lst:
312 if d.get('name'):
313 name = d.get('name')
314 author = name.replace(".", " ")
315 author = multi_space.sub(" ", author).strip()
316 re_extended = re.findall(extend_pattern, author)
317 extended = [(s.strip()).lower() for s in re_extended]
318 d_hom_set = set()
319 for i in extended:
320 dicts_to_check = [dct for dct in lst if dct.get('name') and dct != d]
321 homonyms = [dct.get('name') for dct in dicts_to_check if
322 i in [(s.strip()).lower() for s in re.findall(extend_pattern, dct.get('name'))]]
323 for n in homonyms:
324 d_hom_set.add(n)
325 if d_hom_set:
326 homonyms_dict[d.get('name')] = list(d_hom_set)
328 return homonyms_dict
    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        """Build the 'Family, Given [orcid:ID]' display strings for all agents.

        ORCID attribution works in three tiers: an orcid declared in the agent
        dict itself is validated offline with ORCIDManager; otherwise, when
        *doi* is given, the DOI→ORCID index (self.orcid_finder) is consulted
        and matched against the agent's family/given names; when the name
        cannot be split into family and given parts, a token-based fuzzy match
        (with homonym disambiguation via compute_affinity) is attempted.

        :params doi: the item's DOI, used to query the ORCID index
        :type doi: str
        :params agents_list: the agents' dictionaries (role/name/family/given, optional orcid/ORCID)
        :type agents_list: List[dict]
        :returns: Tuple[list, list] -- the authors' and the editors' display strings
        """
        homonyms_dict = self.find_homonyms(agents_list)
        # strings that already received an orcid: the same string must not get
        # the orcid suffix twice when homonym strings collide
        hom_w_orcid = set()
        authors_strings_list = list()
        editors_string_list = list()
        dict_orcid = None
        multi_space = re.compile(r"\s+")
        inits_pattern = r"([A-Z]|[ÄŐŰÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽÑ]){1}(?:\s|$)"
        extend_pattern = r"[a-zA-Z'\-áéíóúäëïöüÄłŁőŐűŰZàáâäãåąčćęèéêëėįìíîïłńòóôöõøùúûüųūÿýżźñçčšžÀÁÂÄÃÅĄĆČĖĘÈÉÊËÌÍÎÏĮŁŃÒÓÔÖÕØÙÚÛÜŲŪŸÝŻŹÑßÇŒÆČŠŽñÑâê]{2,}(?:\s|$)"

        # consult the ORCID index only if some agent lacks a declared orcid
        if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list) and doi:
            dict_orcid = self.orcid_finder(doi)
        # clean the name-bearing fields of every agent
        agents_list = [
            {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
             else v for k, v in
             agent_dict.items()} for agent_dict in agents_list]
        for agent in agents_list:
            cur_role = agent['role']
            f_name = None
            g_name = None
            name = None
            agent_string = None
            if agent.get('family') and agent.get('given'):
                f_name = agent['family']
                g_name = agent['given']
                agent_string = f_name + ', ' + g_name
            elif agent.get('name'):
                # try to split an undifferentiated name on the comma
                name = agent['name']
                f_name = name.split(",")[0].strip() if "," in name else None
                g_name = name.split(",")[-1].strip() if "," in name else None

                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name

            # partial-information fallbacks
            if agent_string is None:
                if agent.get('family') and not agent.get('given'):
                    if g_name:
                        agent_string = agent['family'] + ', ' + g_name
                    else:
                        agent_string = agent['family'] + ', '
                elif agent.get('given') and not agent.get('family'):
                    if f_name:
                        agent_string = f_name + ', ' + agent['given']
                    else:
                        agent_string = ', ' + agent['given']
                elif agent.get('name'):
                    agent_string = agent.get('name')

            orcid = None
            if 'orcid' in agent:
                if isinstance(agent['orcid'], list):
                    orcid = str(agent['orcid'][0])
                else:
                    orcid = str(agent['orcid'])
            elif 'ORCID' in agent:
                if isinstance(agent['ORCID'], list):
                    orcid = str(agent['ORCID'][0])
                else:
                    orcid = str(agent['ORCID'])
            if orcid:
                # validate the declared orcid offline (checksum only, no API)
                orcid_manager = ORCIDManager(data=dict(), use_api_service=False)
                orcid = orcid_manager.normalise(orcid, include_prefix=False)
                orcid = orcid if orcid_manager.check_digit(orcid) else None

            elif dict_orcid and f_name:
                # match the agent's family/given names against the index entries
                for ori in dict_orcid:
                    orc_n: List[str] = dict_orcid[ori].split(', ')
                    orc_f = orc_n[0].lower()
                    orc_g = orc_n[1] if len(orc_n) == 2 else None
                    if f_name.lower() in orc_f.lower() or orc_f.lower() in f_name.lower():
                        if g_name and orc_g:
                            # If there are several authors with the same surname
                            if len([person for person in agents_list if 'family' in person if person['family'] if
                                    person['family'].lower() in orc_f.lower() or orc_f.lower() in person[
                                        'family'].lower()]) > 1:
                                # If there are several authors with the same surname and the same given names' initials
                                if len([person for person in agents_list if 'given' in person if person['given'] if
                                        person['given'][0].lower() == orc_g[0].lower()]) > 1:
                                    homonyms_list = [person for person in agents_list if 'given' in person if
                                                     person['given'] if person['given'].lower() == orc_g.lower()]
                                    # If there are homonyms
                                    if len(homonyms_list) > 1:
                                        # If such homonyms have different roles from the current role
                                        if [person for person in homonyms_list if person['role'] != cur_role]:
                                            if orc_g.lower() == g_name.lower():
                                                orcid = ori
                                    else:
                                        if orc_g.lower() == g_name.lower():
                                            orcid = ori
                                        elif orc_g[0].lower() == g_name[0].lower():
                                            orcid = ori
                                # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
                                elif any([person for person in agents_list if 'given' in person if person['given'] if
                                          person['given'].lower() == f_name.lower()]):
                                    if orc_g.lower() == g_name.lower():
                                        orcid = ori
                                else:
                                    orcid = ori
                            else:
                                orcid = ori

            # If the ra name can't be clearly divided in given name and surname
            elif dict_orcid and name:
                for ori in dict_orcid:
                    try_match = True
                    # among homonym names, only the best-affinity one may claim this orcid
                    if name in homonyms_dict.keys():
                        get_best_affinity = self.compute_affinity(dict_orcid[ori], homonyms_dict.keys())
                        if name != get_best_affinity:
                            try_match = False
                    if try_match:
                        orc_n: List[str] = dict_orcid[ori].split(', ')
                        orc_f = orc_n[0].lower()
                        orc_g = orc_n[1] if len(orc_n) == 2 else None

                        # tokenise the name into initials and extended tokens
                        author = name.replace(".", " ")
                        author = multi_space.sub(" ", author).strip()
                        re_inits = re.findall(inits_pattern, author)
                        re_extended = re.findall(extend_pattern, author)
                        initials = [(x.strip()).lower() for x in re_inits]
                        extended = [(s.strip()).lower() for s in re_extended]
                        author_dict = {"initials": initials, "extended": extended}

                        # require a surname token match plus a given-name match
                        # (full token or compatible initial)
                        surname_match = True if [x for x in author_dict["extended"] if x in orc_f.split()] else False
                        if orc_g:
                            name_match_all = True if [x for x in author_dict["extended"] if x in orc_g.split()] else False
                            name_match_init = True if [x for x in author_dict["initials"] if any(
                                element.startswith(x) and element not in author_dict["extended"] for
                                element in orc_g.split())] else False
                        else:
                            name_match_all = False
                            name_match_init = False
                        matches = (surname_match and (name_match_all or name_match_init))

                        if matches:
                            # managing cases in which a name string was already retrieved but the one
                            # provided by the mapping is better
                            f_name = orc_f
                            if not g_name:
                                g_name = orc_g
                            elif g_name:
                                if len(g_name.strip()) < len(orc_g.strip()):
                                    g_name = orc_g
                            orcid = ori

            # rebuild the display string if the index lookup improved the names
            if agent_string is None:
                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name
                elif f_name and not g_name:
                    agent_string = f_name + ', '
                elif g_name and not f_name:
                    agent_string = ', ' + g_name
            elif agent_string == agent.get('name') and f_name and g_name:
                agent_string = f_name + ', ' + g_name

            if agent_string and orcid:
                agent_string = self.uppercase_initials(agent_string)
                if agent_string not in hom_w_orcid:
                    hom_w_orcid.add(agent_string)
                    agent_string += ' [' + 'orcid:' + str(orcid) + ']'

            if agent_string:
                agent_string = self.uppercase_initials(agent_string)

                if agent['role'] == 'author':
                    authors_strings_list.append(agent_string)
                elif agent['role'] == 'editor':
                    editors_string_list.append(agent_string)

        return authors_strings_list, editors_string_list
503 def compute_affinity(self, s, lst):
504 s = s.replace(r"\s+", " ")
505 s = s.replace(r"\n+", " ")
506 name = s.lower()
507 agent = name.replace(".", " ")
508 agent = agent.replace(",", " ")
509 agent = agent.strip()
510 agent_name_parts = agent.split()
511 extended = [x for x in agent_name_parts if len(x) > 1]
512 initials = [x for x in agent_name_parts if len(x) == 1]
514 target_agent_dict = {"initials": initials, "extended": extended}
516 report_dicts = {}
517 for ag in lst:
518 name = ag.lower()
519 name = name.replace(r"\s+", " ")
520 name = name.replace(r"\n+", " ")
521 agent = name.replace(".", " ")
522 agent = agent.strip()
523 agent_name_parts = agent.split()
524 ag_extended = [x for x in agent_name_parts if len(x) > 1]
525 ag_initials = [x for x in agent_name_parts if len(x) == 1]
527 copy_ext_target = [x for x in extended]
528 copy_init_target = [x for x in initials]
529 copy_ag_ext = [x for x in ag_extended]
530 copy_ag_init = [x for x in ag_initials]
532 ext_matched = 0
533 init_matched = 0
535 for i in ag_extended:
536 if i in copy_ext_target:
537 ext_matched += 1
538 copy_ext_target.remove(i)
539 copy_ag_ext.remove(i)
541 for ii in ag_initials:
542 if ii in copy_init_target:
543 init_matched += 1
544 copy_init_target.remove(ii)
545 copy_ag_init.remove(ii)
547 # check the remaining unpaired
548 # check first if the extra initials in the ra name can be paired with the remaining extended names
549 init_compatible = 0
551 if copy_ag_init and copy_ext_target:
552 remaining_ag_initials = [x for x in copy_ag_init]
553 remaining_tar_extended = [x for x in copy_ext_target]
555 for ri in remaining_ag_initials:
556 if ri in copy_ag_init:
557 for re in remaining_tar_extended:
558 if re in copy_ext_target:
559 if re.startswith(ri):
560 copy_ag_init.remove(ri)
561 copy_ext_target.remove(re)
562 init_compatible += 1
563 break
565 # check if the remaining initials of the target name are compatible with the remaining extended names of the ra
566 ext_compatible = 0
568 if copy_ag_ext and copy_init_target:
569 remaining_tar_initials = [x for x in copy_init_target]
570 remaining_ag_extended = [x for x in copy_ag_ext]
572 for ri in remaining_tar_initials:
573 if ri in copy_init_target:
574 for re in remaining_ag_extended:
575 if re in copy_ag_ext:
576 if re.startswith(ri):
577 copy_ag_ext.remove(re)
578 copy_init_target.remove(ri)
579 ext_compatible += 1
580 break
581 ext_not_compatible = len(copy_ag_ext)
582 init_not_compatible = len(copy_ag_init)
584 cur_agent_dict = {
585 "ext_matched": ext_matched,
586 "init_matched": init_matched,
587 "ext_compatible": ext_compatible,
588 "init_compatible": init_compatible,
589 "ext_not_compatible": ext_not_compatible,
590 "init_not_compatible": init_not_compatible,
591 }
593 report_dicts[ag] = cur_agent_dict
594 best_match_name = self.get_best_match(target_agent_dict, report_dicts)
595 return best_match_name
599 def add_editors_to_agent_list(self, item: dict, ag_list: list) -> list:
600 '''
601 This function returns the the agents list updated with the editors dictionaries, in the correct format.
603 :params item: the item's dictionary (attributes), ag_list: the agent list
604 :type item: dict, ag_list: list
606 :returns: listthe agents list updated with the authors dictionaries, in the correct format.
607 '''
609 agent_list = ag_list
611 ''' NO INFO IN DUMP: to be updated with API DATA'''
612 return agent_list
614 def get_citing_pmid(self, meta_dict:dict) -> str:
615 citing_pmid = ""
616 id_string = meta_dict.get("id")
617 if id_string:
618 id_list = id_string.split()
619 pmid_list = [x for x in id_list if x.startswith("pmid:")]
620 if len(pmid_list) == 1:
621 citing_pmid = pmid_list[0] # we expect only one pmid for each entity
622 return citing_pmid
624 def get_citations(self, validated_pmid, item:dict) -> list:
625 addressed_citations = set()
627 citing = validated_pmid
628 if not citing.startswith("pmid:"):
629 try:
630 int_pmid = int(citing)
631 citing = "pmid:" + str(int_pmid)
632 except ValueError:
633 return []
635 references_string = item.get("references")
636 cited_ids = references_string.split()
638 for cited_id in cited_ids:
639 try:
640 id_n = int(cited_id)
642 if id_n:
643 norm_cited = self.pmid_m.normalise(str(id_n), include_prefix=True)
645 if norm_cited:
646 addressed_citations.add((citing, norm_cited))
647 except ValueError:
648 pass
650 addressed_citations_list = list(addressed_citations)
652 return addressed_citations_list
    def get_best_match(self, target_agent_dict, report_dicts):
        """Select the candidate name whose token-match report best fits the target.

        :params target_agent_dict: dict with 'initials' and 'extended' token
            lists of the target name (as produced by compute_affinity)
        :params report_dicts: dict mapping each candidate name to its tally of
            matched / compatible / incompatible extended names and initials
        :returns: str -- the best-matching candidate name, or "" when no
            candidate is an acceptable match.
        """
        # no candidate shares even one extended name token: no match possible
        if max([v.get("ext_matched") for k,v in report_dicts.items()]) == 0:
            return ""
        elif max([v.get("ext_matched") for k,v in report_dicts.items()]) == 1:
            # a single extended-name match only: require some further agreement
            # and at most one leftover incompatible token of each kind
            min_comp_dict = {k:v for k,v in report_dicts.items() if v.get("ext_matched") ==1 and (
                    (v.get("init_matched") >= 1 or v.get("ext_compatible")>=1 or v.get("init_compatible")>=1)
                    and
                    (v.get("ext_not_compatible")<= 1 and v.get("init_not_compatible")<= 1)
                    )}
            if not min_comp_dict:
                return ""

        len_target_init = len(target_agent_dict["initials"])
        len_target_ext = len(target_agent_dict["extended"])
        if len_target_init + len_target_ext >= 1:

            # Case 1: There is a perfect match with no exceedings: return it
            complete_matches = {k: v for k, v in report_dicts.items() if
                                v["ext_matched"] == len_target_ext and v["init_matched"] == len_target_init and v[
                                    "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
            if complete_matches:
                for k in complete_matches.keys():
                    return k
            # Case 2: There is a complete match with all the extended names and the initials of the target are compatible
            match_all_extended_comp_ext = {k: v for k, v in report_dicts.items() if v["ext_matched"] == len_target_ext and (
                    v["init_matched"] + v["ext_compatible"] == len_target_init) and v["init_not_compatible"] == 0 and v[
                    "ext_not_compatible"] == 0}
            if match_all_extended_comp_ext:
                if len(match_all_extended_comp_ext) == 1:
                    for k in match_all_extended_comp_ext.keys():
                        return k
                else:
                    # tie-break on the highest number of matched initials
                    return [k for k, v in match_all_extended_comp_ext.items() if
                            v["init_matched"] == max([v["init_matched"] for v in match_all_extended_comp_ext.values()])][0]

            # Case 3: Get max extended names match + compatible extended/initials
            max_comp_exc_ext = max([v["ext_matched"] for v in report_dicts.values()])
            match_max_extended_comp_init = {k: v for k, v in report_dicts.items() if
                                            v["ext_matched"] == max_comp_exc_ext and (
                                                    v["ext_matched"] + v["init_compatible"] == len_target_ext) and (
                                                    v["init_matched"] + v["ext_compatible"] == len_target_init) and v[
                                                "init_not_compatible"] == 0 and v["ext_not_compatible"] == 0}
            if match_max_extended_comp_init:
                if len(match_max_extended_comp_init) == 1:
                    for k in match_max_extended_comp_init.keys():
                        return k
                else:
                    return [k for k, v in match_max_extended_comp_init.items() if
                            v["init_matched"] == max([v["init_matched"] for v in match_max_extended_comp_init.values()])][0]

            # Case 4 (suboptimal cases), get best compatibility
            scores_dict = dict()

            for k, v in report_dicts.items():
                score = 0

                # weighted fraction of the target's extended names accounted for
                p_match_ext = 0
                if len_target_ext:
                    p_match_ext = v["ext_matched"] / len_target_ext
                    if p_match_ext < 1:
                        if v["init_compatible"]:
                            p_match_ext = (v["init_compatible"] * 0.2 + v["ext_matched"]) / len_target_ext

                # weighted fraction of the target's initials accounted for
                p_match_init = 0
                if len_target_init:
                    p_match_init = v["init_matched"] / len_target_init
                    if p_match_init < 1:
                        if v["ext_compatible"]:
                            p_match_init = (v["ext_compatible"] * 0.7 + v["init_matched"]) / len_target_init

                # penalties for candidate tokens that fit nothing in the target
                total_len_name_parts_target = len_target_ext + len_target_init
                if v["ext_not_compatible"]:
                    p_inc_ext = v["ext_not_compatible"] * 0.7 / total_len_name_parts_target
                else:
                    p_inc_ext = 0
                if v["init_not_compatible"]:
                    p_inc_init = v["init_not_compatible"] * 0.2 / total_len_name_parts_target
                else:
                    p_inc_init = 0
                score = p_match_ext + p_match_init - p_inc_init - p_inc_ext
                scores_dict[k] = score
            # accept the top score only when it is unique
            result = [k for k, v in scores_dict.items() if v == max(scores_dict.values())]
            if len(result) == 1:
                return result[0]
            else:
                return ""
        return ""