Coverage for oc_ds_converter / openaire / openaire_processing.py: 91%
444 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import csv
8import html
9import json
10import os
11import os.path
12import pathlib
13import re
14import warnings
15from os.path import exists
16from pathlib import Path
17from re import search
19from bs4 import BeautifulSoup
21from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource
22from oc_ds_converter.oc_idmanager.arxiv import ArXivManager
23from oc_ds_converter.oc_idmanager.doi import DOIManager
24from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
25from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
26from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager
27from oc_ds_converter.oc_idmanager.orcid import ORCIDManager
28from oc_ds_converter.oc_idmanager.pmcid import PMCIDManager
29from oc_ds_converter.oc_idmanager.pmid import PMIDManager
30from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI
31from oc_ds_converter.ra_processor import RaProcessor
33warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
36class OpenaireProcessing(RaProcessor):
37 def __init__(self, orcid_index: str | None = None, publishers_filepath_openaire: str | None = None, storage_manager: StorageManager | None = None, testing: bool = True, exclude_existing: bool = False):
38 super(OpenaireProcessing, self).__init__(orcid_index)
39 self.exclude_existing = exclude_existing
40 self._testing = testing
41 if storage_manager is None:
42 self.storage_manager = RedisStorageManager(testing=testing)
43 else:
44 self.storage_manager = storage_manager
46 self.temporary_manager = BatchManager()
48 self.types_dict = {
49 "Article": "journal article",
50 "Part of book or chapter of book": "book chapter",
51 "Preprint": "other",
52 "Other literature type": "other",
53 "Conference object": "proceedings",
54 "Doctoral thesis": "dissertation",
55 "Book": "book",
56 "Thesis": "dissertation",
57 "Research": "other",
58 "Master thesis": "dissertation",
59 "Report": "report",
60 "Review": "other",
61 "Contribution for newspaper or weekly magazine": "other",
62 "Journal": "journal",
63 "Presentation": "other",
64 "Software Paper": "other",
65 "External research report": "report",
66 "Data Paper": "other",
67 "Project deliverable": "other",
68 "Bachelor thesis": "dissertation",
69 "Project proposal": "other",
70 "Newsletter": "other",
71 "Data Management Plan": "data management plan",
72 "Software": "computer program",
73 "Dataset": "dataset",
74 "Audiovisual": "dataset",
75 "Image": "dataset",
76 "Other dataset type": "dataset",
77 "Film": "dataset",
78 "UNKNOWN": "other",
79 "Other ORP type": "other",
80 "InteractiveResource": "other",
81 "PhysicalObject": "other",
82 "Collection": "other",
83 "Patent": "other",
84 "Project milestone": "other",
85 "Clinical Trial": "other",
86 "Bioentity": "other",
87 "Sound": "other",
88 }
89 self.doi_m = DOIManager(storage_manager=self.storage_manager, testing=testing)
90 self.pmid_m = PMIDManager(storage_manager=self.storage_manager, testing=testing)
91 self.pmc_m = PMCIDManager(storage_manager=self.storage_manager, testing=testing)
92 self.arxiv_m = ArXivManager(storage_manager=self.storage_manager, testing=testing)
94 self.orcid_m = ORCIDManager(storage_manager=self.storage_manager, testing=testing)
96 self._id_man_dict = {"doi":self.doi_m, "pmid": self.pmid_m, "pmcid": self.pmc_m,"pmc": self.pmc_m, "arxiv":self.arxiv_m}
98 # Temporary storage managers : all data must be stored in tmp storage manager and passed all together to the
99 # main storage_manager only once the full file is processed. Checks must be done both on tmp and in
100 # storage_manager, so that in case the process breaks while processing a file which does not complete (so
101 # without writing the final file) all the data concerning the ids are not stored. Otherwise, the ids saved in
102 # a storage_manager db would be considered to have been processed and thus would be ignored by the process
103 # and lost.
105 self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, testing=testing)
106 self.tmp_pmid_m = PMIDManager(storage_manager=self.temporary_manager, testing=testing)
107 self.tmp_pmc_m = PMCIDManager(storage_manager=self.temporary_manager, testing=testing)
108 self.tmp_arxiv_m = ArXivManager(storage_manager=self.temporary_manager, testing=testing)
110 self.tmp_orcid_m = ORCIDManager(storage_manager=self.temporary_manager, testing=testing)
112 self.tmp_id_man_dict = {"doi": self.tmp_doi_m, "pmid": self.tmp_pmid_m, "pmcid": self.tmp_pmc_m, "pmc": self.tmp_pmc_m,
113 "arxiv": self.tmp_arxiv_m}
116 self._doi_prefixes_publishers_dict = {
117 "10.48550":{"publisher":"arxiv", "priority":1},
118 "doi:10.48550":{"publisher":"arxiv", "priority":1},
119 "10.6084":{"publisher":"figshare","priority":1},
120 "doi:10.6084":{"publisher":"figshare","priority":1},
121 "10.1184":{"publisher": "Carnegie Mellon University", "priority":2},
122 "doi:10.1184":{"publisher": "Carnegie Mellon University", "priority":2},
123 "10.25384":{"publisher":"sage", "priority":2},
124 "doi:10.25384":{"publisher":"sage", "priority":2},
125 "10.5281":{"publisher":"zenodo", "priority":3},
126 "doi:10.5281":{"publisher":"zenodo", "priority":3},
127 "10.5061":{"publisher":"dryad", "priority":4},
128 "doi:10.5061":{"publisher":"dryad", "priority":4},
129 "10.17605":{"publisher":"psyarxiv", "priority":5},
130 "doi:10.17605":{"publisher":"psyarxiv", "priority":5},
131 "10.31234": {"publisher":"psyarxiv", "priority":6},
132 "doi:10.31234": {"publisher":"psyarxiv", "priority":6},
133 }
135 if testing:
136 self.BR_redis = FakeRedisWrapper()
137 self.RA_redis = FakeRedisWrapper()
138 else:
139 self.BR_redis = RedisDataSource("DB-META-BR")
140 self.RA_redis = RedisDataSource("DB-META-RA")
142 self._redis_values_ra = []
143 self._redis_values_br = []
146 if not publishers_filepath_openaire:
148 if not exists(os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")):
149 os.makedirs(os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files"))
150 self.publishers_filepath = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files",
151 "prefix_publishers.json")
152 else:
153 self.publishers_filepath = publishers_filepath_openaire
155 if os.path.exists(self.publishers_filepath):
156 pfp = dict()
157 csv_headers = ("id", "name", "prefix")
158 if self.publishers_filepath.endswith(".csv"):
159 with open(self.publishers_filepath, encoding="utf8") as f:
160 csv_reader = csv.DictReader(f, csv_headers)
161 for row in csv_reader:
162 pfp[row["prefix"]] = {"name": row["name"], "crossref_member": row["id"]}
163 self.publishers_filepath = self.publishers_filepath.replace(".csv", ".json")
164 elif self.publishers_filepath.endswith(".json"):
165 with open(self.publishers_filepath, encoding="utf8") as f:
166 pfp = json.load(f)
168 if pfp:
169 self.publisher_manager = ExtractPublisherDOI(pfp)
170 else:
171 self.publisher_manager = ExtractPublisherDOI({})
172 else:
173 self.publisher_manager = ExtractPublisherDOI({})
174 with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
175 json.dump({}, fdp, ensure_ascii=False, indent=4)
177 def update_redis_values(self, br, ra):
178 self._redis_values_br = br
179 self._redis_values_ra = ra
181 def validated_as(self, id_dict):
182 # Check if the validity was already retrieved and thus
183 # a) if it is now saved either in the in-memory database, which only concerns data validated
184 # during the current file processing;
185 # b) or if it is now saved in the storage_manager database, which only concerns data validated
186 # during the previous files processing.
187 # In memory db is checked first because the dimension is smaller and the check is faster and
188 # Because we assume that it is more likely to find the same ids in close positions, e.g.: same
189 # citing id in several citations with different cited ids.
191 schema = id_dict["schema"].strip().lower()
192 id = id_dict["identifier"]
194 if schema != "orcid":
195 tmp_id_m = self.get_id_manager(schema, self.tmp_id_man_dict)
196 if tmp_id_m is None:
197 return None
198 validity_value = tmp_id_m.validated_as_id(id)
200 if validity_value is None:
201 id_m = self.get_id_manager(schema, self._id_man_dict)
202 if id_m is None:
203 return None
204 validity_value = id_m.validated_as_id(id)
205 return validity_value
207 else:
208 validity_value = self.tmp_orcid_m.validated_as_id(id)
209 if validity_value is None:
210 validity_value = self.orcid_m.validated_as_id(id)
211 return validity_value
213 def get_id_manager(self, schema_or_id, id_man_dict):
214 """Given as input the string of a schema (e.g.:'pmid') and a dictionary mapping strings of
215 the schemas to their id managers, the method returns the correct id manager. Note that each
216 instance of the Preprocessing class needs its own instances of the id managers, in order to
217 avoid conflicts while validating data"""
218 if ":" in schema_or_id:
219 split_id_prefix = schema_or_id.split(":")
220 schema = split_id_prefix[0]
221 else:
222 schema = schema_or_id
223 id_man = id_man_dict.get(schema)
224 return id_man
226 def normalise_any_id(self, id_with_prefix: str) -> str | None:
227 id_man = self.get_id_manager(id_with_prefix, self._id_man_dict)
228 if id_man is None:
229 return None
230 id_no_pref = ":".join(id_with_prefix.split(":")[1:])
231 norm_id_w_pref = id_man.normalise(id_no_pref, include_prefix=True)
232 return norm_id_w_pref
234 def get_norm_ids(self, entity):
235 norm_ids = []
236 for e in entity:
237 e_schema = e.get("schema").strip().lower()
238 if e_schema in self._id_man_dict:
239 e_id = self._id_man_dict[e_schema].normalise(e["identifier"], include_prefix=True)
240 if e_id:
241 dict_to_append = {"schema": e_schema, "identifier": e_id}
242 if dict_to_append not in norm_ids:
243 norm_ids.append(dict_to_append)
244 return norm_ids
246 def dict_to_cache(self, dict_to_be_saved, path):
247 path = Path(path)
248 parent_dir_path = path.parent.absolute()
249 if not os.path.exists(parent_dir_path):
250 Path(parent_dir_path).mkdir(parents=True, exist_ok=True)
251 with open(path, "w", encoding="utf-8") as fd:
252 json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)
254 def csv_creator(self, item: dict) -> dict:
255 #redis_br = item["redis_validity_lists"][0]
256 #redis_ra = item["redis_validity_lists"][1]
258 #self.update_redis_values(redis_br, redis_ra)
260 row = dict()
262 doi = []
264 keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
265 'publisher', 'editor']
266 for k in keys:
267 row[k] = ''
269 attributes = item
270 # row['type'] √
271 att_type = attributes.get("objectSubType")
272 if att_type:
273 map_type = self.types_dict.get(att_type)
274 if not map_type:
275 map_type = "other"
276 else:
277 map_type = "other"
278 row['type'] = map_type
280 # row['id']
281 att_identifier_dict_of_lists = attributes.get("identifier")
282 valid_ids_list = self.to_validated_id_list(att_identifier_dict_of_lists)
284 # Keep a doi for retrieving information related to its prefix (i.e.: publisher, RA..) only in the cases
285 # where there is only one doi to refer to or where all the dois have the same prefix.
286 if valid_ids_list:
287 for id in valid_ids_list:
288 if id.startswith("doi:"):
289 doi.append(id[len("doi:"):])
290 row['id'] = ' '.join(valid_ids_list)
291 else:
292 return {}
295 # row['title'] √
296 pub_title = ""
297 att_title = attributes.get("title")
298 if att_title:
299 p_title = att_title
300 soup = BeautifulSoup(p_title, 'html.parser')
301 title_soup = soup.get_text().replace('\n', '')
302 title_soup_space_replaced = ' '.join(title_soup.split())
303 title_soup_strip = title_soup_space_replaced.strip()
304 clean_tit = html.unescape(title_soup_strip)
305 pub_title = clean_tit if clean_tit else p_title
307 row['title'] = pub_title
309 # row['author'] √
310 agents_list = self.add_authors_to_agent_list(attributes, [])
311 pref_dois = [x for x in doi if x.split("/")[0] not in self._doi_prefixes_publishers_dict]
312 if doi:
313 best_doi = pref_dois[0] if pref_dois else doi[0]
314 else:
315 best_doi = ""
316 authors_strings_list, editors_string_list = self.get_agents_strings_list(best_doi, agents_list)
317 row['author'] = '; '.join(authors_strings_list)
319 # row['pub_date'] √
320 dates = attributes.get("publicationDate")
321 row['pub_date'] = str(dates) if dates else ""
323 # row['venue']
324 row['venue'] = ""
326 # row['volume']
327 row['volume'] = ""
329 # row['issue']
330 row['issue'] = ""
332 # row['page']
333 row['page'] = ""
335 # row['publisher'] √
336 att_publ = attributes.get("publisher")
337 publ = ""
338 if att_publ:
339 publ = att_publ[0]
340 publishers = self.get_publisher_name(doi, publ)
342 row['publisher'] = publishers
344 # row['editor']
345 row['editor'] = ""
347 try:
348 return self.normalise_unicode(row)
350 except TypeError:
351 print(row)
352 raise(TypeError)
354 def get_publisher_name(self, doi_list: list, item: dict | str) -> str:
355 '''
356 This function aims to return a publisher's name and id. If a mapping was provided,
357 it is used to find the publisher's standardized name from its id or DOI prefix.
359 :params doi: the item's DOI
360 :type doi_list: list
361 :params item: the item's dictionary
362 :type item: dict
363 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output is only the name. Finally, if there is no publisher, the output is an empty string.
364 '''
365 if not item or not isinstance(item, dict):
366 return ""
367 elif "name" not in item:
368 return ""
370 name_value = item["name"]
371 publisher: str = name_value if isinstance(name_value, str) else ""
373 if publisher and doi_list:
374 for doi in doi_list:
375 prefix = doi.split('/')[0] if doi else ""
376 if prefix:
377 if prefix in self.publisher_manager._prefix_to_data_dict:
378 prefix_data = self.publisher_manager.extract_publishers_v(doi, enable_extraagencies=False,get_all_prefix_data=True, skip_update=True)
379 if prefix_data:
380 member = prefix_data.get("crossref_member") if prefix_data.get("crossref_member") not in {"not found", None} else ""
381 retrieved_publisher_name = prefix_data.get("name") if prefix_data.get("name") not in {"unidentified", None} else ""
382 if isinstance(retrieved_publisher_name, str):
383 if publisher.lower().strip() == retrieved_publisher_name.lower().strip():
384 return f'{publisher} [crossref:{member}]' if member else publisher
386 return publisher
388 def manage_arxiv_single_id(self, id_dict_list):
389 result_dict_list = []
390 arxiv_id = ""
391 is_arxiv = False
392 ent = id_dict_list[0]
393 schema = ent.get("schema")
394 if isinstance(schema, str):
395 schema = schema.strip().lower()
396 if schema == "doi":
397 id = ent.get("identifier")
398 splitted_pref = id.split('/')[0]
399 pref = re.findall(r"(10.\d{4,9})", splitted_pref)[0]
400 if pref == "10.48550":
401 if id.startswith("doi:"):
402 id = id[len("doi:"):]
403 id_no_pref = id.replace(pref,"")
405 arxiv_id = self._id_man_dict["arxiv"].normalise(id_no_pref, include_prefix=True)
406 if not arxiv_id:
407 return None
408 else:
409 is_arxiv = True
410 elif schema == "arxiv":
411 id = ent.get("identifier")
412 arxiv_id = self._id_man_dict["arxiv"].normalise(id, include_prefix=True)
413 if not arxiv_id:
414 return None
415 else:
416 is_arxiv = True
417 else:
418 return id_dict_list
419 if is_arxiv:
420 result_dict_list = [{"schema": "arxiv", "identifier": arxiv_id}]
421 if not result_dict_list:
422 return id_dict_list
424 return result_dict_list
426 def manage_doi_prefixes_priorities(self, id_dict_list):
427 result_id_dict_list= []
428 priority_prefixes = [k for k,v in self._doi_prefixes_publishers_dict.items() if v.get("priority")==1]
429 arxiv_or_figshare_dois = [x for x in id_dict_list if x.get("identifier").split("/")[0] in priority_prefixes]
430 if len(arxiv_or_figshare_dois) == 1:
431 id_dict = arxiv_or_figshare_dois[0]
432 is_arxiv = self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "arxiv"
433 has_version = search(r"v\d+", id_dict.get("identifier"))
434 if has_version: # It is necessarily a figshare doi (ARXIV have version only in arxiv id and not in arxiv dois)
435 #√
436 return arxiv_or_figshare_dois
437 else:
438 if not is_arxiv:
439 upd_id = id_dict.get("identifier") + "v1"
440 upd_dict = {k:v for k,v in id_dict.items() if k!= "identifier"}
441 upd_dict["identifier"] = upd_id
442 result_id_dict_list.append(upd_dict)
443 # √
444 return result_id_dict_list
445 else:
446 # √
447 return self.manage_arxiv_single_id([id_dict])
449 elif len(arxiv_or_figshare_dois) > 1:
450 versioned_arxiv_or_figshare_dois = [x for x in arxiv_or_figshare_dois if search(r"v\d+", x.get("identifier"))]
451 if versioned_arxiv_or_figshare_dois:
452 # √
453 return versioned_arxiv_or_figshare_dois
454 else:
455 for id_dict in arxiv_or_figshare_dois:
456 if self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "arxiv":
457 # in order to avoid multiple ids of the same schema for the same entity without a reasonable expl.
458 # √
459 return self.manage_arxiv_single_id([id_dict])
461 for id_dict in arxiv_or_figshare_dois:
462 if self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "figshare":
463 version = "v1"
464 upd_dict = {k:v for k,v in id_dict.items() if k != "identifier"}
465 upd_id = id_dict.get("identifier") + version
466 upd_dict["identifier"] = upd_id
467 result_id_dict_list.append(upd_dict)
468 # √
469 return result_id_dict_list
470 else:
471 zenodo_ids_list = [x for x in id_dict_list if self._doi_prefixes_publishers_dict[x.get("identifier").split("/")[0]].get("publisher") == "zenodo"]
472 if len(zenodo_ids_list) >= 2:
473 list_of_id_n_str = [x["identifier"].replace("doi:", "").replace("10.5281/zenodo.", "") for x in zenodo_ids_list]
474 list_of_id_n_int = []
475 for n in list_of_id_n_str:
476 try:
477 int_n = int(n)
478 list_of_id_n_int.append(int_n)
479 except ValueError:
480 pass
481 if list_of_id_n_int:
482 last_assigned_id = str(max(list_of_id_n_int))
483 for id_dict in zenodo_ids_list:
484 if id_dict.get("identifier").replace("doi:", "").replace("10.5281/zenodo.", "") == last_assigned_id:
485 result_id_dict_list.append(id_dict)
486 # √
487 return result_id_dict_list
488 else:
489 prefix_set = {x.get("identifier").split("/")[0] for x in id_dict_list}
490 priorities = [self._doi_prefixes_publishers_dict[p]["priority"] for p in prefix_set]
491 max_priority = min(priorities)
493 prefixes_w_max_priority = {k for k,v in self._doi_prefixes_publishers_dict.items() if v["priority"] == max_priority}
495 for id_dict in id_dict_list:
496 if id_dict.get("identifier").split("/")[0] in prefixes_w_max_priority:
497 norm_id = self.doi_m.normalise(id_dict["identifier"], include_prefix=True)
498 if norm_id is None:
499 continue
500 #if self.BR_redis.get(norm_id):
501 if norm_id in self._redis_values_br:
502 result_id_dict_list.append(id_dict)
503 return result_id_dict_list
504 # if the id is not in redis db, validate it before appending
505 elif self.tmp_doi_m.is_valid(norm_id):
506 result_id_dict_list.append(id_dict)
507 return result_id_dict_list
509 if not result_id_dict_list:
511 while id_dict_list and max_priority < 7:
513 id_dict_list = [x for x in id_dict_list if x["identifier"].split("/")[0] not in prefixes_w_max_priority]
514 max_priority += 1
515 prefixes_w_max_priority = {k for k, v in self._doi_prefixes_publishers_dict.items() if
516 v["priority"] == max_priority}
518 for id_dict in id_dict_list:
519 if id_dict.get("identifier").split("/")[0] in prefixes_w_max_priority:
520 norm_id = self.doi_m.normalise(id_dict["identifier"], include_prefix=True)
521 if norm_id is None:
522 continue
523 #if self.BR_redis.get(norm_id):
524 if norm_id in self._redis_values_br:
525 result_id_dict_list.append(id_dict)
526 return result_id_dict_list
527 # if the id is not in redis db, validate it before appending
528 elif self.tmp_doi_m.is_valid(norm_id):
529 result_id_dict_list.append(id_dict)
530 return result_id_dict_list
532 return result_id_dict_list
534 def to_validated_id_list(self, id_dict_of_list):
535 """this method takes in input a list of id dictionaries and returns a list valid and existent ids with prefixes.
536 For each id, a first validation try is made by checking its presence in META db. If the id is not in META db yet,
537 a second attempt is made by using the specific id-schema API"""
538 valid_id_set = set([x["identifier"] for x in id_dict_of_list["valid"]])
539 to_be_processed_input = id_dict_of_list["to_be_val"]
540 to_be_processed_id_dict_list = []
541 # If there is only an id, check whether it is either an arxiv id or an arxiv doi. In this cases, if there is a
542 # versioned arxiv id, it is kept as such. Otherwise both the arxiv doi and the not versioned arxiv id are replaced
543 # with the v1 version of the arxiv id. If it is not possible to retrieve an arxiv id from the only id which is
544 # either declared as an arxiv id or starts with the arxiv doi prefix, return None and interrupt the process
545 if len(valid_id_set) == 0:
546 if len(to_be_processed_input) == 1:
547 single_id_dict_list = self.manage_arxiv_single_id(to_be_processed_input)
548 if single_id_dict_list:
549 to_be_processed_id_dict_list = single_id_dict_list
550 else:
551 return
552 elif len(to_be_processed_input)> 1:
553 second_selection_list = [x for x in to_be_processed_input if x.get("schema") == "pmid" or (x.get("schema") =="doi" and x.get("identifier").split('/')[0] not in self._doi_prefixes_publishers_dict)]
554 if second_selection_list:
555 to_be_processed_id_dict_list = second_selection_list
556 else:
557 third_selection = [x for x in to_be_processed_input if x.get("schema") == "pmc" or x.get("schema") == "pmcid"]
558 if third_selection:
559 to_be_processed_id_dict_list = third_selection
560 else:
561 fourth_selection = [x for x in to_be_processed_input if x.get("schema") == "arxiv"]
562 if fourth_selection:
563 to_be_processed_id_dict_list = fourth_selection
564 else:
565 fifth_selection = [x for x in to_be_processed_input if x.get("schema") == "doi" and x.get("identifier").split('/')[0] in self._doi_prefixes_publishers_dict]
566 if fifth_selection:
567 to_be_processed_id_dict_list = self.manage_doi_prefixes_priorities(fifth_selection)
569 else:
570 return None
571 else:
572 to_be_processed_id_dict_list = [x for x in to_be_processed_input if x.get("schema") == "pmid" or (x.get("schema") == "doi" and x.get("identifier").split('/')[0] not in self._doi_prefixes_publishers_dict)]
574 if to_be_processed_id_dict_list:
575 for ent in to_be_processed_id_dict_list:
576 schema = ent.get("schema")
577 norm_id = ent.get("identifier")
578 if schema is None or norm_id is None:
579 continue
580 tmp_id_man = self.get_id_manager(schema, self.tmp_id_man_dict)
581 if tmp_id_man is None:
582 continue
583 if schema in {"pmid", "pmcid", "pmc", "arxiv", "doi"}:
584 #if self.BR_redis.get(norm_id):
585 if norm_id in self._redis_values_br:
586 tmp_id_man.storage_manager.set_value(norm_id, True) #In questo modo l'id presente in redis viene inserito anche nello storage e risulta già
587 # preso in considerazione negli step successivi
588 valid_id_set.add(norm_id)
589 # if the id is not in redis db, validate it before appending
590 elif tmp_id_man.is_valid(norm_id):#In questo modo l'id presente in redis viene inserito anche nello storage e risulta già
591 # preso in considerazione negli step successivi
592 valid_id_set.add(norm_id)
594 valid_id_list = list(valid_id_set)
595 return valid_id_list
597 def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
598 '''
599 This function returns the the agents list updated with the authors dictionaries, in the correct format.
601 :params item: the item's dictionary (attributes), ag_list: the agent list
602 :type item: dict, ag_list: list
604 :returns: list the agents list updated with the authors dictionaries, in the correct format.
605 '''
607 agent_list = ag_list
608 creators = item.get("creator")
609 if creators:
610 for author in creators:
611 agent = {}
612 agent["role"] = "author"
613 agent["name"] = author.get("name") if author.get("name") else ""
614 missing_names = [x for x in ["family", "given"] if x not in agent]
615 for mn in missing_names:
616 agent[mn] = ""
617 all_ids = author.get("identifiers")
618 orcid_id = self.find_openaire_orcid(all_ids)
619 if orcid_id:
620 agent["orcid"] = orcid_id
621 agent_list.append(agent)
623 return agent_list
625 def find_openaire_orcid(self, all_author_ids, doi=None):
626 orcid = ""
627 if all_author_ids:
628 for id in all_author_ids:
629 schema = id.get("schema")
630 identifier = id.get("identifier")
631 if isinstance(schema, str):
632 if schema.lower().strip() == "orcid":
633 if isinstance(identifier, str):
634 norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
635 if norm_orcid is None:
636 continue
637 ## Check orcid presence in memory and storage before validating the id
638 validity_value_orcid = self.validated_as({"identifier":norm_orcid, "schema": schema})
639 if validity_value_orcid is True:
640 orcid = norm_orcid
641 elif validity_value_orcid is None:
642 # Check in ORCID index using provided DOI before any API validation
643 if doi:
644 found_orcids = self.orcid_finder(doi)
645 if found_orcids and norm_orcid.split(':')[1] in found_orcids:
646 self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
647 orcid = norm_orcid
649 # If not found in index, check Redis and API
650 if not orcid:
651 if norm_orcid in self._redis_values_ra:
652 orcid = norm_orcid
653 # if the id is not in redis db, validate it before appending
654 elif self.tmp_orcid_m.is_valid(norm_orcid):
655 orcid = norm_orcid
657 return orcid
659 def memory_to_storage(self):
660 kv_in_memory = self.temporary_manager.get_validity_list_of_tuples()
661 self.storage_manager.set_multi_value(kv_in_memory)
662 self.temporary_manager.delete_storage()
664 def extract_all_ids(self, citation):
665 all_br = set()
666 all_ra = set()
668 d1 = citation["source"]
669 d2 = citation["target"]
671 source_and_target = [d1, d2]
673 #for both source and target entity
674 for d in source_and_target:
675 # get all the br ids
676 br_ids_dicts = d["identifier"]
677 #and use the correct id manager to normalise
678 for br_id in br_ids_dicts:
679 schema = br_id.get("schema").strip().lower()
680 if schema in self._id_man_dict:
681 norm_id = self._id_man_dict[schema].normalise(br_id["identifier"], include_prefix=True)
682 if norm_id:
683 # if it was possible to normalise the id according to one of the schemas accepted in oc, add
684 # the id to the set of retrieved br ids for the citation.
685 all_br.add(norm_id)
686 creators = d.get("creator")
687 if creators:
688 for c in creators:
689 c_ids = c.get("identifiers")
690 if c_ids:
691 norm_orcids = {self.orcid_m.normalise(x.get("identifier"), include_prefix=True) for x in c_ids if x.get("schema") in {"ORCID", "orcid"}}
692 if norm_orcids:
693 # if it was possible to normalise any id according to orcid schema, add
694 # the norm_orcids to the set of retrieved ra ids for the citation.
695 all_ra.update(norm_orcids)
696 all_br = list(all_br)
697 all_ra = list(all_ra)
698 return all_br, all_ra
700 def get_redis_validity_list(self, id_list, redis_db):
701 ids = list(id_list)
702 if redis_db == "ra":
703 validity = self.RA_redis.mexists_as_set(ids)
704 return [ids[i] for i, v in enumerate(validity) if v]
705 elif redis_db == "br":
706 validity = self.BR_redis.mexists_as_set(ids)
707 return [ids[i] for i, v in enumerate(validity) if v]
708 else:
709 raise ValueError("redis_db must be either 'ra' or 'br'")