Coverage for oc_ds_converter / openaire / openaire_processing.py: 88%
448 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import csv
8import html
9import json
10import os
11import os.path
12import pathlib
13import re
14import warnings
15from os.path import exists
16from pathlib import Path
17from re import search
19from bs4 import BeautifulSoup
21from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource
22from oc_ds_converter.oc_idmanager.arxiv import ArXivManager
23from oc_ds_converter.oc_idmanager.doi import DOIManager
24from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
25from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
26from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager
27from oc_ds_converter.oc_idmanager.orcid import ORCIDManager
28from oc_ds_converter.oc_idmanager.pmcid import PMCIDManager
29from oc_ds_converter.oc_idmanager.pmid import PMIDManager
30from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI
31from oc_ds_converter.ra_processor import RaProcessor
33warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
class OpenaireProcessing(RaProcessor):
    """Turn OpenAIRE entity records into rows with bibliographic metadata.

    The class keeps two families of identifier managers:

    * the *main* managers, backed by ``self.storage_manager``;
    * the *temporary* managers, backed by ``self.temporary_manager`` (a
      ``BatchManager``), whose content is pushed to the main storage by
      :meth:`memory_to_storage`.

    Identifiers listed in ``self._redis_values_br`` / ``self._redis_values_ra``
    (set through :meth:`update_redis_values`) are accepted without calling the
    managers' ``is_valid``.
    """

    def __init__(self, orcid_index: str | None = None, publishers_filepath_openaire: str | None = None, storage_manager: StorageManager | None = None, testing: bool = True, exclude_existing: bool = False):
        """Set up storage, identifier managers, Redis handles and the publisher mapping.

        :param orcid_index: forwarded unchanged to ``RaProcessor.__init__``.
        :param publishers_filepath_openaire: path of a ``.csv`` or ``.json`` prefix/publisher
            mapping. When falsy, ``support_files/prefix_publishers.json`` next to this module
            is used (the directory is created if missing).
        :param storage_manager: main storage; defaults to ``RedisStorageManager(testing=testing)``.
        :param testing: when true, ``FakeRedisWrapper`` replaces the Redis data sources and the
            id managers are created with ``use_api_service=False``.
        :param exclude_existing: stored on the instance as ``self.exclude_existing``.
        """
        super().__init__(orcid_index)
        self.exclude_existing = exclude_existing
        self._testing = testing
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self.temporary_manager = BatchManager()

        # OpenAIRE "objectSubType" -> output type label. Unknown sub-types are mapped to
        # "other" by csv_creator.
        self.types_dict = {
            "Article": "journal article",
            "Part of book or chapter of book": "book chapter",
            "Preprint": "other",
            "Other literature type": "other",
            "Conference object": "proceedings",
            "Doctoral thesis": "dissertation",
            "Book": "book",
            "Thesis": "dissertation",
            "Research": "other",
            "Master thesis": "dissertation",
            "Report": "report",
            "Review": "other",
            "Contribution for newspaper or weekly magazine": "other",
            "Journal": "journal",
            "Presentation": "other",
            "Software Paper": "other",
            "External research report": "report",
            "Data Paper": "other",
            "Project deliverable": "other",
            "Bachelor thesis": "dissertation",
            "Project proposal": "other",
            "Newsletter": "other",
            "Data Management Plan": "data management plan",
            "Software": "computer program",
            "Dataset": "dataset",
            "Audiovisual": "dataset",
            "Image": "dataset",
            "Other dataset type": "dataset",
            "Film": "dataset",
            "UNKNOWN": "other",
            "Other ORP type": "other",
            "InteractiveResource": "other",
            "PhysicalObject": "other",
            "Collection": "other",
            "Patent": "other",
            "Project milestone": "other",
            "Clinical Trial": "other",
            "Bioentity": "other",
            "Sound": "other",
        }

        use_api = not testing
        manager_options = {"use_api_service": use_api, "testing": testing}

        # Main managers: they read/write the persistent storage manager.
        self.doi_m = DOIManager(storage_manager=self.storage_manager, **manager_options)
        self.pmid_m = PMIDManager(storage_manager=self.storage_manager, **manager_options)
        self.pmc_m = PMCIDManager(storage_manager=self.storage_manager, **manager_options)
        self.arxiv_m = ArXivManager(storage_manager=self.storage_manager, **manager_options)
        self.orcid_m = ORCIDManager(storage_manager=self.storage_manager, **manager_options)

        self._id_man_dict = {
            "doi": self.doi_m,
            "pmid": self.pmid_m,
            "pmcid": self.pmc_m,
            "pmc": self.pmc_m,
            "arxiv": self.arxiv_m,
        }

        # Temporary storage managers: all data must be stored in the temporary storage manager
        # and passed all together to the main storage_manager only once the full file is
        # processed. Checks must be done both on the temporary and on the main storage, so that
        # if the process breaks while handling a file (i.e. before the output file is written)
        # the ids of that file are not persisted. Otherwise the ids saved in the main storage
        # would be considered already processed, and would be skipped and lost.
        self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, **manager_options)
        self.tmp_pmid_m = PMIDManager(storage_manager=self.temporary_manager, **manager_options)
        self.tmp_pmc_m = PMCIDManager(storage_manager=self.temporary_manager, **manager_options)
        self.tmp_arxiv_m = ArXivManager(storage_manager=self.temporary_manager, **manager_options)
        self.tmp_orcid_m = ORCIDManager(storage_manager=self.temporary_manager, **manager_options)

        self.tmp_id_man_dict = {
            "doi": self.tmp_doi_m,
            "pmid": self.tmp_pmid_m,
            "pmcid": self.tmp_pmc_m,
            "pmc": self.tmp_pmc_m,
            "arxiv": self.tmp_arxiv_m,
        }

        # DOI prefixes that get special treatment, with and without the "doi:" prefix.
        # A lower "priority" number is tried first by manage_doi_prefixes_priorities.
        self._doi_prefixes_publishers_dict = {
            "10.48550": {"publisher": "arxiv", "priority": 1},
            "doi:10.48550": {"publisher": "arxiv", "priority": 1},
            "10.6084": {"publisher": "figshare", "priority": 1},
            "doi:10.6084": {"publisher": "figshare", "priority": 1},
            "10.1184": {"publisher": "Carnegie Mellon University", "priority": 2},
            "doi:10.1184": {"publisher": "Carnegie Mellon University", "priority": 2},
            "10.25384": {"publisher": "sage", "priority": 2},
            "doi:10.25384": {"publisher": "sage", "priority": 2},
            "10.5281": {"publisher": "zenodo", "priority": 3},
            "doi:10.5281": {"publisher": "zenodo", "priority": 3},
            "10.5061": {"publisher": "dryad", "priority": 4},
            "doi:10.5061": {"publisher": "dryad", "priority": 4},
            "10.17605": {"publisher": "psyarxiv", "priority": 5},
            "doi:10.17605": {"publisher": "psyarxiv", "priority": 5},
            "10.31234": {"publisher": "psyarxiv", "priority": 6},
            "doi:10.31234": {"publisher": "psyarxiv", "priority": 6},
        }

        if testing:
            self.BR_redis = FakeRedisWrapper()
            self.RA_redis = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        self._redis_values_ra = []
        self._redis_values_br = []

        if not publishers_filepath_openaire:
            support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")
            if not exists(support_dir):
                os.makedirs(support_dir)
            self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")
        else:
            self.publishers_filepath = publishers_filepath_openaire

        if os.path.exists(self.publishers_filepath):
            pfp = dict()
            csv_headers = ("id", "name", "prefix")
            if self.publishers_filepath.endswith(".csv"):
                with open(self.publishers_filepath, encoding="utf8") as f:
                    for row in csv.DictReader(f, csv_headers):
                        pfp[row["prefix"]] = {"name": row["name"], "crossref_member": row["id"]}
                # From now on the mapping is referred to through its JSON counterpart.
                self.publishers_filepath = self.publishers_filepath.replace(".csv", ".json")
            elif self.publishers_filepath.endswith(".json"):
                with open(self.publishers_filepath, encoding="utf8") as f:
                    pfp = json.load(f)

            # An empty/falsy mapping is replaced by an empty dict.
            self.publisher_manager = ExtractPublisherDOI(pfp or {})
        else:
            # No mapping available yet: start empty and create the JSON file.
            self.publisher_manager = ExtractPublisherDOI({})
            with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
                json.dump({}, fdp, ensure_ascii=False, indent=4)

    def update_redis_values(self, br, ra):
        """Replace the collections of BR and RA ids treated as already present in Redis."""
        self._redis_values_br = br
        self._redis_values_ra = ra

    def validated_as(self, id_dict):
        """Return the validity value already recorded for an identifier, if any.

        The temporary (in-memory) storage, which only concerns the file currently being
        processed, is queried first because it is smaller and because the same ids are assumed
        to recur in close positions (e.g. the same citing id in several citations). Only when
        it returns ``None`` is the main storage queried.

        :param id_dict: dict with the keys ``"schema"`` and ``"identifier"``.
        :returns: whatever ``validated_as_id`` returns, or ``None`` when no manager exists for
            the schema.
        """
        schema = id_dict["schema"].strip().lower()
        identifier = id_dict["identifier"]

        if schema == "orcid":
            validity_value = self.tmp_orcid_m.validated_as_id(identifier)
            if validity_value is None:
                validity_value = self.orcid_m.validated_as_id(identifier)
            return validity_value

        tmp_id_m = self.get_id_manager(schema, self.tmp_id_man_dict)
        if tmp_id_m is None:
            return None
        validity_value = tmp_id_m.validated_as_id(identifier)
        if validity_value is None:
            id_m = self.get_id_manager(schema, self._id_man_dict)
            if id_m is None:
                return None
            validity_value = id_m.validated_as_id(identifier)
        return validity_value

    def get_id_manager(self, schema_or_id, id_man_dict):
        """Return the id manager registered in ``id_man_dict`` for a schema.

        ``schema_or_id`` is either a bare schema (e.g. ``'pmid'``) or a prefixed identifier
        (e.g. ``'pmid:123'``); in the latter case the text before the first ``':'`` is used as
        schema. ``None`` is returned when the schema is not a key of ``id_man_dict``.
        """
        # split(":", 1)[0] yields the whole string when there is no colon.
        schema = schema_or_id.split(":", 1)[0]
        return id_man_dict.get(schema)

    def normalise_any_id(self, id_with_prefix: str) -> str | None:
        """Normalise a prefixed identifier with the main manager of its schema.

        :returns: the manager's ``normalise(..., include_prefix=True)`` result, or ``None``
            when the prefix does not correspond to a known manager.
        """
        id_man = self.get_id_manager(id_with_prefix, self._id_man_dict)
        if id_man is None:
            return None
        # Everything after the first colon is the identifier (it may itself contain colons).
        id_no_pref = ":".join(id_with_prefix.split(":")[1:])
        return id_man.normalise(id_no_pref, include_prefix=True)

    def get_norm_ids(self, entity):
        """Normalise the identifier dicts of an entity.

        :param entity: iterable of dicts with ``"schema"`` and ``"identifier"`` keys.
        :returns: list of ``{"schema", "identifier"}`` dicts, without duplicates and in input
            order, limited to the schemas in ``self._id_man_dict`` and to the identifiers the
            manager could normalise.
        """
        norm_ids = []
        for e in entity:
            e_schema = e.get("schema").strip().lower()
            if e_schema not in self._id_man_dict:
                continue
            e_id = self._id_man_dict[e_schema].normalise(e["identifier"], include_prefix=True)
            if not e_id:
                continue
            normalised = {"schema": e_schema, "identifier": e_id}
            if normalised not in norm_ids:
                norm_ids.append(normalised)
        return norm_ids

    def dict_to_cache(self, dict_to_be_saved, path):
        """Dump ``dict_to_be_saved`` as indented UTF-8 JSON to ``path``, creating parent dirs."""
        path = Path(path)
        parent_dir_path = path.parent.absolute()
        if not os.path.exists(parent_dir_path):
            parent_dir_path.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as fd:
            json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)

    def csv_creator(self, item: dict) -> dict:
        """Build the metadata row of an OpenAIRE entity.

        :param item: the entity attributes; the keys read are ``objectSubType``,
            ``identifier``, ``title``, ``creator`` (via :meth:`add_authors_to_agent_list`),
            ``publicationDate`` and ``publisher``.
        :returns: the row passed through ``normalise_unicode``, or an empty dict when no valid
            identifier is available for the entity.
        :raises TypeError: re-raised (after printing the row) when ``normalise_unicode`` fails.
        """
        keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
                'publisher', 'editor']
        # venue, volume, issue, page and editor are never filled and stay ''.
        row = {k: '' for k in keys}
        attributes = item

        # row['type']: unknown or missing sub-types fall back to "other".
        att_type = attributes.get("objectSubType")
        row['type'] = (self.types_dict.get(att_type) if att_type else None) or "other"

        # row['id']
        valid_ids_list = self.to_validated_id_list(attributes.get("identifier"))
        if not valid_ids_list:
            return {}
        # DOIs (without the "doi:" prefix) are kept to retrieve prefix-related information
        # such as the publisher.
        doi = [valid_id[len("doi:"):] for valid_id in valid_ids_list if valid_id.startswith("doi:")]
        row['id'] = ' '.join(valid_ids_list)

        # row['title']: strip markup, collapse whitespace, unescape HTML entities.
        pub_title = ""
        att_title = attributes.get("title")
        if att_title:
            title_text = BeautifulSoup(att_title, 'html.parser').get_text().replace('\n', '')
            clean_tit = html.unescape(' '.join(title_text.split()).strip())
            # If cleaning leaves nothing, keep the raw title.
            pub_title = clean_tit if clean_tit else att_title
        row['title'] = pub_title

        # row['author']: a DOI whose prefix is not in the special-prefix table is preferred.
        agents_list = self.add_authors_to_agent_list(attributes, [])
        pref_dois = [x for x in doi if x.split("/")[0] not in self._doi_prefixes_publishers_dict]
        if pref_dois:
            best_doi = pref_dois[0]
        elif doi:
            best_doi = doi[0]
        else:
            best_doi = ""
        authors_strings_list, _ = self.get_agents_strings_list(best_doi, agents_list)
        row['author'] = '; '.join(authors_strings_list)

        # row['pub_date']
        dates = attributes.get("publicationDate")
        row['pub_date'] = str(dates) if dates else ""

        # row['publisher']: only the first listed publisher is considered.
        att_publ = attributes.get("publisher")
        publ = att_publ[0] if att_publ else ""
        row['publisher'] = self.get_publisher_name(doi, publ)

        try:
            return self.normalise_unicode(row)
        except TypeError:
            print(row)
            # Bare raise keeps the original exception, message and traceback
            # (the previous `raise(TypeError)` replaced them with an empty TypeError).
            raise

    def get_publisher_name(self, doi_list: list, item: dict | str) -> str:
        '''
        Return the publisher's name, followed by its Crossref member id when the prefix
        mapping confirms it.

        :params doi_list: the item's DOIs (without the "doi:" prefix)
        :type doi_list: list
        :params item: the publisher dictionary; its "name" value is the publisher's name
        :type item: dict
        :returns: str -- 'NAME [crossref:ID]' (for example, 'American Medical Association (AMA) [crossref:10]') when one of the DOI prefixes is in the mapping, its mapped name equals NAME (case-insensitively, ignoring surrounding spaces) and a member id is known. Otherwise only the name. An empty string if item is empty, is not a dict, has no "name" key or its name is not a string.
        '''
        if not item or not isinstance(item, dict) or "name" not in item:
            return ""

        name_value = item["name"]
        publisher: str = name_value if isinstance(name_value, str) else ""

        if publisher and doi_list:
            for doi in doi_list:
                prefix = doi.split('/')[0] if doi else ""
                if not prefix or prefix not in self.publisher_manager._prefix_to_data_dict:
                    continue
                prefix_data = self.publisher_manager.extract_publishers_v(doi, enable_extraagencies=False, get_all_prefix_data=True, skip_update=True)
                if not prefix_data:
                    continue
                # "not found" / "unidentified" are placeholders and count as missing values.
                member = prefix_data.get("crossref_member") if prefix_data.get("crossref_member") not in {"not found", None} else ""
                retrieved_publisher_name = prefix_data.get("name") if prefix_data.get("name") not in {"unidentified", None} else ""
                if isinstance(retrieved_publisher_name, str) and publisher.lower().strip() == retrieved_publisher_name.lower().strip():
                    return f'{publisher} [crossref:{member}]' if member else publisher

        return publisher

    def manage_arxiv_single_id(self, id_dict_list):
        """Convert the first id of the list to an arXiv id when it is an arXiv id or arXiv DOI.

        :param id_dict_list: list of id dicts; only the first element is inspected.
        :returns: ``[{"schema": "arxiv", "identifier": <normalised id>}]`` for an ``arxiv`` id
            or a DOI with prefix ``10.48550``; ``None`` when such an id cannot be normalised by
            the arXiv manager; the input list unchanged in every other case (including an
            empty list).
        """
        if not id_dict_list:
            return id_dict_list

        ent = id_dict_list[0]
        schema = ent.get("schema")
        if isinstance(schema, str):
            schema = schema.strip().lower()

        if schema == "doi":
            identifier = ent.get("identifier")
            # The dot is escaped so that only a literal "10.<registrant>" prefix matches.
            matches = re.findall(r"(10\.\d{4,9})", identifier.split('/')[0])
            if not matches:
                return id_dict_list
            pref = matches[0]
            if pref != "10.48550":
                # Not an arXiv DOI: nothing to convert.
                return id_dict_list
            if identifier.startswith("doi:"):
                identifier = identifier[len("doi:"):]
            candidate = identifier.replace(pref, "")
        elif schema == "arxiv":
            candidate = ent.get("identifier")
        else:
            return id_dict_list

        arxiv_id = self._id_man_dict["arxiv"].normalise(candidate, include_prefix=True)
        if not arxiv_id:
            return None
        return [{"schema": "arxiv", "identifier": arxiv_id}]

    def _first_usable_doi(self, id_dict_list, prefixes):
        """Return the first id dict whose DOI prefix is in ``prefixes`` and that is usable.

        A DOI is usable when its normalised form is listed in ``self._redis_values_br`` or,
        failing that, when the temporary DOI manager validates it. ``None`` if there is none.
        """
        for id_dict in id_dict_list:
            if id_dict.get("identifier").split("/")[0] not in prefixes:
                continue
            norm_id = self.doi_m.normalise(id_dict["identifier"], include_prefix=True)
            if norm_id is None:
                continue
            # Ids known to Redis are accepted as they are; the others are validated first.
            if norm_id in self._redis_values_br or self.tmp_doi_m.is_valid(norm_id):
                return id_dict
        return None

    def manage_doi_prefixes_priorities(self, id_dict_list):
        """Choose which DOIs to keep among DOIs whose prefixes are all in the special table.

        Every identifier in ``id_dict_list`` is expected to have a prefix that is a key of
        ``self._doi_prefixes_publishers_dict`` (the prefix is used to index that dict).

        Selection, in order:

        1. Priority-1 prefixes (arXiv / figshare): versioned DOIs are returned as they are;
           otherwise an arXiv DOI is converted through :meth:`manage_arxiv_single_id` and a
           figshare DOI gets ``"v1"`` appended.
        2. With two or more Zenodo DOIs, the one with the highest numeric record id.
        3. Otherwise, the first usable DOI (see :meth:`_first_usable_doi`), trying the
           priorities from the best one present in the input onwards.

        :returns: a list of id dicts (possibly empty), or ``None`` when the arXiv conversion
            fails.
        """
        registry = self._doi_prefixes_publishers_dict
        result_id_dict_list = []

        def prefix_of(id_dict):
            return id_dict.get("identifier").split("/")[0]

        def with_v1(id_dict):
            # "identifier" is re-inserted last, as a copy: the input dict is left untouched.
            upd_dict = {k: v for k, v in id_dict.items() if k != "identifier"}
            upd_dict["identifier"] = id_dict.get("identifier") + "v1"
            return upd_dict

        priority_prefixes = {k for k, v in registry.items() if v.get("priority") == 1}
        arxiv_or_figshare_dois = [x for x in id_dict_list if prefix_of(x) in priority_prefixes]

        if len(arxiv_or_figshare_dois) == 1:
            id_dict = arxiv_or_figshare_dois[0]
            if search(r"v\d+", id_dict.get("identifier")):
                # It is necessarily a figshare DOI (arXiv has versions only in arXiv ids,
                # not in arXiv DOIs).
                return arxiv_or_figshare_dois
            if registry[prefix_of(id_dict)].get("publisher") == "arxiv":
                return self.manage_arxiv_single_id([id_dict])
            result_id_dict_list.append(with_v1(id_dict))
            return result_id_dict_list

        elif len(arxiv_or_figshare_dois) > 1:
            versioned_arxiv_or_figshare_dois = [x for x in arxiv_or_figshare_dois if search(r"v\d+", x.get("identifier"))]
            if versioned_arxiv_or_figshare_dois:
                return versioned_arxiv_or_figshare_dois

            # Only one id is kept, in order to avoid multiple ids of the same schema for the
            # same entity without a reasonable explanation: an arXiv DOI is preferred.
            for id_dict in arxiv_or_figshare_dois:
                if registry[prefix_of(id_dict)].get("publisher") == "arxiv":
                    return self.manage_arxiv_single_id([id_dict])

            for id_dict in arxiv_or_figshare_dois:
                if registry[prefix_of(id_dict)].get("publisher") == "figshare":
                    result_id_dict_list.append(with_v1(id_dict))
                    return result_id_dict_list

        else:
            zenodo_ids_list = [x for x in id_dict_list if registry[prefix_of(x)].get("publisher") == "zenodo"]
            if len(zenodo_ids_list) >= 2:

                def record_number(id_dict):
                    return id_dict.get("identifier").replace("doi:", "").replace("10.5281/zenodo.", "")

                record_numbers = []
                for id_dict in zenodo_ids_list:
                    try:
                        record_numbers.append(int(record_number(id_dict)))
                    except ValueError:
                        # Non-numeric Zenodo suffixes cannot be ranked and are ignored.
                        pass
                if record_numbers:
                    last_assigned_id = str(max(record_numbers))
                    for id_dict in zenodo_ids_list:
                        if record_number(id_dict) == last_assigned_id:
                            result_id_dict_list.append(id_dict)
                            return result_id_dict_list
            else:
                # Lower number == better priority; start from the best one in the input.
                prefix_set = {prefix_of(x) for x in id_dict_list}
                current_priority = min(registry[p]["priority"] for p in prefix_set)
                worst_priority = max(v["priority"] for v in registry.values())
                candidates = id_dict_list

                while True:
                    prefixes_w_current_priority = {k for k, v in registry.items() if v["priority"] == current_priority}
                    chosen = self._first_usable_doi(candidates, prefixes_w_current_priority)
                    if chosen is not None:
                        result_id_dict_list.append(chosen)
                        return result_id_dict_list
                    if not candidates or current_priority >= worst_priority:
                        break
                    # Nothing usable at this priority: drop those ids and try the next one.
                    candidates = [x for x in candidates if x["identifier"].split("/")[0] not in prefixes_w_current_priority]
                    current_priority += 1

        return result_id_dict_list

    def _select_ids_to_validate(self, id_dicts):
        """Pick, among several ids of one entity, the group that has to be validated.

        Groups are tried in this order and the first non-empty one wins: PMIDs and DOIs with
        a prefix outside the special table; PMC ids; arXiv ids; DOIs with a special prefix
        (reduced through :meth:`manage_doi_prefixes_priorities`). An empty list is returned
        when every group is empty.
        """
        special_prefixes = self._doi_prefixes_publishers_dict

        preferred = [x for x in id_dicts if x.get("schema") == "pmid" or (x.get("schema") == "doi" and x.get("identifier").split('/')[0] not in special_prefixes)]
        if preferred:
            return preferred

        pmc_ids = [x for x in id_dicts if x.get("schema") == "pmc" or x.get("schema") == "pmcid"]
        if pmc_ids:
            return pmc_ids

        arxiv_ids = [x for x in id_dicts if x.get("schema") == "arxiv"]
        if arxiv_ids:
            return arxiv_ids

        special_dois = [x for x in id_dicts if x.get("schema") == "doi" and x.get("identifier").split('/')[0] in special_prefixes]
        if special_dois:
            return self.manage_doi_prefixes_priorities(special_dois)

        return []

    def to_validated_id_list(self, id_dict_of_list):
        """Return the valid ids (with prefix) of an entity.

        :param id_dict_of_list: dict with two keys: ``"valid"``, a list of id dicts whose
            ``"identifier"`` is accepted as it is, and ``"to_be_val"``, a list of id dicts
            still to be checked.
        :returns: a list of identifiers in no particular order, or ``None`` when there are no
            already-valid ids and either nothing is left to validate or the only id to
            validate is an arXiv id / arXiv DOI that cannot be turned into an arXiv id.

        Each id to be checked is accepted when it is listed in ``self._redis_values_br``;
        otherwise the temporary manager of its schema is asked to validate it.
        """
        valid_id_set = {x["identifier"] for x in id_dict_of_list["valid"]}
        to_be_processed_input = id_dict_of_list["to_be_val"]

        if valid_id_set:
            # Something is already valid: only PMIDs and DOIs with a prefix outside the
            # special table are worth validating.
            to_be_processed_id_dict_list = [x for x in to_be_processed_input if x.get("schema") == "pmid" or (x.get("schema") == "doi" and x.get("identifier").split('/')[0] not in self._doi_prefixes_publishers_dict)]
        elif len(to_be_processed_input) == 1:
            # With a single id, an arXiv id or an arXiv DOI is replaced by the normalised
            # arXiv id. If that is not possible, the process is interrupted.
            to_be_processed_id_dict_list = self.manage_arxiv_single_id(to_be_processed_input)
            if not to_be_processed_id_dict_list:
                return None
        elif len(to_be_processed_input) > 1:
            to_be_processed_id_dict_list = self._select_ids_to_validate(to_be_processed_input)
        else:
            return None

        for ent in to_be_processed_id_dict_list or []:
            schema = ent.get("schema")
            norm_id = ent.get("identifier")
            if schema is None or norm_id is None:
                continue
            tmp_id_man = self.get_id_manager(schema, self.tmp_id_man_dict)
            if tmp_id_man is None:
                continue
            if schema in {"pmid", "pmcid", "pmc", "arxiv", "doi"}:
                if norm_id in self._redis_values_br:
                    # The id found in Redis is also written to the temporary storage, so
                    # that the following steps treat it as already taken into account.
                    tmp_id_man.storage_manager.set_value(norm_id, True)
                    valid_id_set.add(norm_id)
                # If the id is not in Redis, validate it before accepting it.
                elif tmp_id_man.is_valid(norm_id):
                    valid_id_set.add(norm_id)

        return list(valid_id_set)

    def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
        '''
        Append one agent dictionary per creator of the item to the agents list.

        :params item: the item's dictionary (attributes), ag_list: the agent list
        :type item: dict, ag_list: list

        :returns: list -- ag_list itself (it is extended in place). Each added agent has the
            keys "role" ("author"), "name", "family" and "given" (both always empty) and,
            when a valid ORCID is found, "orcid".
        '''
        agent_list = ag_list
        creators = item.get("creator")
        if creators:
            for author in creators:
                agent = {
                    "role": "author",
                    "name": author.get("name") if author.get("name") else "",
                    "family": "",
                    "given": "",
                }
                orcid_id = self.find_openaire_orcid(author.get("identifiers"))
                if orcid_id:
                    agent["orcid"] = orcid_id
                agent_list.append(agent)

        return agent_list

    def find_openaire_orcid(self, all_author_ids, doi=None):
        """Return a valid, normalised ORCID among an author's identifiers.

        :param all_author_ids: iterable of id dicts (``"schema"``, ``"identifier"``) or a
            falsy value.
        :param doi: optional DOI passed to ``orcid_finder`` to look the ORCID up in the ORCID
            index before any other check.
        :returns: the normalised ORCID (with prefix) or an empty string. When several ORCIDs
            are already recorded as valid, the last one wins.
        """
        orcid = ""
        if not all_author_ids:
            return orcid

        for author_id in all_author_ids:
            schema = author_id.get("schema")
            identifier = author_id.get("identifier")
            if not isinstance(schema, str) or schema.lower().strip() != "orcid":
                continue
            if not isinstance(identifier, str):
                continue
            norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
            if norm_orcid is None:
                continue

            # Check whether a validity value is already in memory or in storage.
            validity_value_orcid = self.validated_as({"identifier": norm_orcid, "schema": schema})
            if validity_value_orcid is True:
                orcid = norm_orcid
            elif validity_value_orcid is None:
                # Check in the ORCID index using the provided DOI before any API validation.
                if doi:
                    found_orcids = self.orcid_finder(doi)
                    if found_orcids and norm_orcid.split(':')[1] in found_orcids:
                        self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
                        orcid = norm_orcid

                # If no ORCID has been selected so far, check Redis and then validate.
                if not orcid:
                    if norm_orcid in self._redis_values_ra:
                        orcid = norm_orcid
                    elif self.tmp_orcid_m.is_valid(norm_orcid):
                        orcid = norm_orcid

        return orcid

    def memory_to_storage(self):
        """Copy the validity values held by the temporary manager to the main storage, then
        clear the temporary manager."""
        kv_in_memory = self.temporary_manager.get_validity_list_of_tuples()
        self.storage_manager.set_multi_value(kv_in_memory)
        self.temporary_manager.delete_storage()

    def extract_all_ids(self, citation):
        """Collect the normalised ids of the entities and creators of a citation.

        :param citation: dict with the keys ``"source"`` and ``"target"``; each value has an
            ``"identifier"`` list of id dicts and, optionally, a ``"creator"`` list whose
            items may carry an ``"identifiers"`` list.
        :returns: ``(all_br, all_ra)``: two lists without duplicates, in no particular order,
            containing the normalised entity ids and the normalised ORCIDs respectively.
            Identifiers that cannot be normalised are left out.
        """
        all_br = set()
        all_ra = set()

        # For both the source and the target entity.
        for entity in (citation["source"], citation["target"]):
            # Normalise every BR id with the manager of its schema.
            for br_id in entity["identifier"]:
                schema = br_id.get("schema").strip().lower()
                if schema not in self._id_man_dict:
                    continue
                norm_id = self._id_man_dict[schema].normalise(br_id["identifier"], include_prefix=True)
                if norm_id:
                    all_br.add(norm_id)

            for creator in entity.get("creator") or []:
                for creator_id in creator.get("identifiers") or []:
                    schema = creator_id.get("schema")
                    identifier = creator_id.get("identifier")
                    # Same schema test as find_openaire_orcid: case-insensitive, strings only.
                    if not isinstance(schema, str) or schema.lower().strip() != "orcid":
                        continue
                    if not isinstance(identifier, str):
                        continue
                    norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
                    # normalise() gives None for malformed ORCIDs: never add it to the result.
                    if norm_orcid:
                        all_ra.add(norm_orcid)

        return list(all_br), list(all_ra)

    def get_redis_validity_list(self, id_list, redis_db):
        """Return the ids of ``id_list`` reported as existing by the selected Redis database.

        :param id_list: iterable of ids.
        :param redis_db: ``"ra"`` (``self.RA_redis``) or ``"br"`` (``self.BR_redis``).
        :raises ValueError: for any other value of ``redis_db``.
        """
        ids = list(id_list)
        if redis_db == "ra":
            redis_source = self.RA_redis
        elif redis_db == "br":
            redis_source = self.BR_redis
        else:
            raise ValueError("redis_db must be either 'ra' or 'br'")

        validity = redis_source.mexists_as_set(ids)
        # NOTE(review): this indexes `ids` by position in `validity`, i.e. it assumes
        # `mexists_as_set` yields one truth value per id, in the same order as `ids`.
        # If it returns a set of the existing ids (as its name suggests), this selection
        # is wrong -- confirm against the Redis wrapper implementation.
        return [ids[i] for i, v in enumerate(validity) if v]