Coverage for oc_ds_converter / openaire / openaire_processing.py: 91%

444 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import csv 

8import html 

9import json 

10import os 

11import os.path 

12import pathlib 

13import re 

14import warnings 

15from os.path import exists 

16from pathlib import Path 

17from re import search 

18 

19from bs4 import BeautifulSoup 

20 

21from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

22from oc_ds_converter.oc_idmanager.arxiv import ArXivManager 

23from oc_ds_converter.oc_idmanager.doi import DOIManager 

24from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

25from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

26from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager 

27from oc_ds_converter.oc_idmanager.orcid import ORCIDManager 

28from oc_ds_converter.oc_idmanager.pmcid import PMCIDManager 

29from oc_ds_converter.oc_idmanager.pmid import PMIDManager 

30from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI 

31from oc_ds_converter.ra_processor import RaProcessor 

32 

33warnings.filterwarnings("ignore", category=UserWarning, module='bs4') 

34 

35 

class OpenaireProcessing(RaProcessor):
    # Converts OpenAIRE dump entities into OpenCitations Meta rows, validating
    # bibliographic-resource (BR) and responsible-agent (RA) identifiers along
    # the way against Redis and the per-schema id managers.
    def __init__(self, orcid_index: str | None = None, publishers_filepath_openaire: str | None = None, storage_manager: StorageManager | None = None, testing: bool = True, exclude_existing: bool = False):
        """Initialise id managers, the publisher mapping and validity storage.

        :param orcid_index: path of the DOI->ORCID index, forwarded to RaProcessor.
        :param publishers_filepath_openaire: optional CSV/JSON file mapping DOI
            prefixes to publisher data; when absent a default JSON path under
            ``support_files`` is used (and created empty if missing).
        :param storage_manager: persistent validity store; defaults to a
            RedisStorageManager when None.
        :param testing: when True, FakeRedisWrapper instances replace the
            production DB-META-BR / DB-META-RA Redis data sources.
        :param exclude_existing: stored as-is for downstream use.
        """
        super(OpenaireProcessing, self).__init__(orcid_index)
        self.exclude_existing = exclude_existing
        self._testing = testing
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self.temporary_manager = BatchManager()

        # Map of OpenAIRE "objectSubType" values to Meta publication types.
        self.types_dict = {
            "Article": "journal article",
            "Part of book or chapter of book": "book chapter",
            "Preprint": "other",
            "Other literature type": "other",
            "Conference object": "proceedings",
            "Doctoral thesis": "dissertation",
            "Book": "book",
            "Thesis": "dissertation",
            "Research": "other",
            "Master thesis": "dissertation",
            "Report": "report",
            "Review": "other",
            "Contribution for newspaper or weekly magazine": "other",
            "Journal": "journal",
            "Presentation": "other",
            "Software Paper": "other",
            "External research report": "report",
            "Data Paper": "other",
            "Project deliverable": "other",
            "Bachelor thesis": "dissertation",
            "Project proposal": "other",
            "Newsletter": "other",
            "Data Management Plan": "data management plan",
            "Software": "computer program",
            "Dataset": "dataset",
            "Audiovisual": "dataset",
            "Image": "dataset",
            "Other dataset type": "dataset",
            "Film": "dataset",
            "UNKNOWN": "other",
            "Other ORP type": "other",
            "InteractiveResource": "other",
            "PhysicalObject": "other",
            "Collection": "other",
            "Patent": "other",
            "Project milestone": "other",
            "Clinical Trial": "other",
            "Bioentity": "other",
            "Sound": "other",
        }
        # Persistent-storage id managers, one per accepted schema.
        self.doi_m = DOIManager(storage_manager=self.storage_manager, testing=testing)
        self.pmid_m = PMIDManager(storage_manager=self.storage_manager, testing=testing)
        self.pmc_m = PMCIDManager(storage_manager=self.storage_manager, testing=testing)
        self.arxiv_m = ArXivManager(storage_manager=self.storage_manager, testing=testing)

        self.orcid_m = ORCIDManager(storage_manager=self.storage_manager, testing=testing)

        # "pmcid" and "pmc" are aliases for the same manager.
        self._id_man_dict = {"doi":self.doi_m, "pmid": self.pmid_m, "pmcid": self.pmc_m,"pmc": self.pmc_m, "arxiv":self.arxiv_m}

        # Temporary storage managers : all data must be stored in tmp storage manager and passed all together to the
        # main storage_manager only once the full file is processed. Checks must be done both on tmp and in
        # storage_manager, so that in case the process breaks while processing a file which does not complete (so
        # without writing the final file) all the data concerning the ids are not stored. Otherwise, the ids saved in
        # a storage_manager db would be considered to have been processed and thus would be ignored by the process
        # and lost.

        self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager, testing=testing)
        self.tmp_pmid_m = PMIDManager(storage_manager=self.temporary_manager, testing=testing)
        self.tmp_pmc_m = PMCIDManager(storage_manager=self.temporary_manager, testing=testing)
        self.tmp_arxiv_m = ArXivManager(storage_manager=self.temporary_manager, testing=testing)

        self.tmp_orcid_m = ORCIDManager(storage_manager=self.temporary_manager, testing=testing)

        self.tmp_id_man_dict = {"doi": self.tmp_doi_m, "pmid": self.tmp_pmid_m, "pmcid": self.tmp_pmc_m, "pmc": self.tmp_pmc_m,
                                "arxiv": self.tmp_arxiv_m}

        # DOI prefixes of repositories (arXiv, figshare, zenodo, ...) ranked by
        # priority (1 = highest). Each prefix appears with and without the
        # "doi:" prefix so lookups work on either form.
        self._doi_prefixes_publishers_dict = {
            "10.48550":{"publisher":"arxiv", "priority":1},
            "doi:10.48550":{"publisher":"arxiv", "priority":1},
            "10.6084":{"publisher":"figshare","priority":1},
            "doi:10.6084":{"publisher":"figshare","priority":1},
            "10.1184":{"publisher": "Carnegie Mellon University", "priority":2},
            "doi:10.1184":{"publisher": "Carnegie Mellon University", "priority":2},
            "10.25384":{"publisher":"sage", "priority":2},
            "doi:10.25384":{"publisher":"sage", "priority":2},
            "10.5281":{"publisher":"zenodo", "priority":3},
            "doi:10.5281":{"publisher":"zenodo", "priority":3},
            "10.5061":{"publisher":"dryad", "priority":4},
            "doi:10.5061":{"publisher":"dryad", "priority":4},
            "10.17605":{"publisher":"psyarxiv", "priority":5},
            "doi:10.17605":{"publisher":"psyarxiv", "priority":5},
            "10.31234": {"publisher":"psyarxiv", "priority":6},
            "doi:10.31234": {"publisher":"psyarxiv", "priority":6},
        }

        if testing:
            self.BR_redis = FakeRedisWrapper()
            self.RA_redis = FakeRedisWrapper()
        else:
            self.BR_redis = RedisDataSource("DB-META-BR")
            self.RA_redis = RedisDataSource("DB-META-RA")

        # Ids pre-validated against Redis for the current batch; populated via
        # update_redis_values().
        self._redis_values_ra = []
        self._redis_values_br = []

        if not publishers_filepath_openaire:

            if not exists(os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")):
                os.makedirs(os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files"))
            self.publishers_filepath = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files",
                                                    "prefix_publishers.json")
        else:
            self.publishers_filepath = publishers_filepath_openaire

        if os.path.exists(self.publishers_filepath):
            pfp = dict()
            csv_headers = ("id", "name", "prefix")
            if self.publishers_filepath.endswith(".csv"):
                # CSV mapping is converted in-memory to the JSON layout.
                with open(self.publishers_filepath, encoding="utf8") as f:
                    csv_reader = csv.DictReader(f, csv_headers)
                    for row in csv_reader:
                        pfp[row["prefix"]] = {"name": row["name"], "crossref_member": row["id"]}
                self.publishers_filepath = self.publishers_filepath.replace(".csv", ".json")
            elif self.publishers_filepath.endswith(".json"):
                with open(self.publishers_filepath, encoding="utf8") as f:
                    pfp = json.load(f)

            if pfp:
                self.publisher_manager = ExtractPublisherDOI(pfp)
            else:
                self.publisher_manager = ExtractPublisherDOI({})
        else:
            # No mapping file yet: start empty and create the file on disk.
            self.publisher_manager = ExtractPublisherDOI({})
            with open(self.publishers_filepath, "w", encoding="utf8") as fdp:
                json.dump({}, fdp, ensure_ascii=False, indent=4)

177 def update_redis_values(self, br, ra): 

178 self._redis_values_br = br 

179 self._redis_values_ra = ra 

180 

    def validated_as(self, id_dict):
        """Return the cached validity of an id (True/False), or None when the
        id has never been validated before.

        :param id_dict: dict with "schema" and "identifier" keys.
        """
        # Check if the validity was already retrieved and thus
        # a) if it is now saved either in the in-memory database, which only concerns data validated
        # during the current file processing;
        # b) or if it is now saved in the storage_manager database, which only concerns data validated
        # during the previous files processing.
        # In memory db is checked first because the dimension is smaller and the check is faster and
        # Because we assume that it is more likely to find the same ids in close positions, e.g.: same
        # citing id in several citations with different cited ids.

        schema = id_dict["schema"].strip().lower()
        id = id_dict["identifier"]

        if schema != "orcid":
            tmp_id_m = self.get_id_manager(schema, self.tmp_id_man_dict)
            if tmp_id_m is None:
                # Unsupported schema: validity cannot be assessed.
                return None
            validity_value = tmp_id_m.validated_as_id(id)

            if validity_value is None:
                # Not seen during the current file: fall back to the
                # persistent storage of previously processed files.
                id_m = self.get_id_manager(schema, self._id_man_dict)
                if id_m is None:
                    return None
                validity_value = id_m.validated_as_id(id)
            return validity_value

        else:
            # ORCIDs have dedicated managers, outside the schema dicts.
            validity_value = self.tmp_orcid_m.validated_as_id(id)
            if validity_value is None:
                validity_value = self.orcid_m.validated_as_id(id)
            return validity_value

212 

213 def get_id_manager(self, schema_or_id, id_man_dict): 

214 """Given as input the string of a schema (e.g.:'pmid') and a dictionary mapping strings of 

215 the schemas to their id managers, the method returns the correct id manager. Note that each 

216 instance of the Preprocessing class needs its own instances of the id managers, in order to 

217 avoid conflicts while validating data""" 

218 if ":" in schema_or_id: 

219 split_id_prefix = schema_or_id.split(":") 

220 schema = split_id_prefix[0] 

221 else: 

222 schema = schema_or_id 

223 id_man = id_man_dict.get(schema) 

224 return id_man 

225 

226 def normalise_any_id(self, id_with_prefix: str) -> str | None: 

227 id_man = self.get_id_manager(id_with_prefix, self._id_man_dict) 

228 if id_man is None: 

229 return None 

230 id_no_pref = ":".join(id_with_prefix.split(":")[1:]) 

231 norm_id_w_pref = id_man.normalise(id_no_pref, include_prefix=True) 

232 return norm_id_w_pref 

233 

234 def get_norm_ids(self, entity): 

235 norm_ids = [] 

236 for e in entity: 

237 e_schema = e.get("schema").strip().lower() 

238 if e_schema in self._id_man_dict: 

239 e_id = self._id_man_dict[e_schema].normalise(e["identifier"], include_prefix=True) 

240 if e_id: 

241 dict_to_append = {"schema": e_schema, "identifier": e_id} 

242 if dict_to_append not in norm_ids: 

243 norm_ids.append(dict_to_append) 

244 return norm_ids 

245 

246 def dict_to_cache(self, dict_to_be_saved, path): 

247 path = Path(path) 

248 parent_dir_path = path.parent.absolute() 

249 if not os.path.exists(parent_dir_path): 

250 Path(parent_dir_path).mkdir(parents=True, exist_ok=True) 

251 with open(path, "w", encoding="utf-8") as fd: 

252 json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4) 

253 

    def csv_creator(self, item: dict) -> dict:
        """Map one OpenAIRE entity dict to an OpenCitations Meta table row.

        :param item: the entity's attribute dict (expects keys such as
            "objectSubType", "identifier", "title", "creator", "publisher").
        :returns: a dict with the Meta CSV keys ('id', 'title', 'author',
            'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
            'publisher', 'editor'), or an empty dict when none of the
            entity's ids could be validated.
        """
        #redis_br = item["redis_validity_lists"][0]
        #redis_ra = item["redis_validity_lists"][1]

        #self.update_redis_values(redis_br, redis_ra)

        row = dict()

        doi = []

        keys = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
                'publisher', 'editor']
        for k in keys:
            row[k] = ''

        attributes = item
        # row['type'] √
        att_type = attributes.get("objectSubType")
        if att_type:
            map_type = self.types_dict.get(att_type)
            if not map_type:
                # Unmapped OpenAIRE types fall back to "other".
                map_type = "other"
        else:
            map_type = "other"
        row['type'] = map_type

        # row['id']
        att_identifier_dict_of_lists = attributes.get("identifier")
        valid_ids_list = self.to_validated_id_list(att_identifier_dict_of_lists)

        # Keep a doi for retrieving information related to its prefix (i.e.: publisher, RA..) only in the cases
        # where there is only one doi to refer to or where all the dois have the same prefix.
        if valid_ids_list:
            for id in valid_ids_list:
                if id.startswith("doi:"):
                    doi.append(id[len("doi:"):])
            row['id'] = ' '.join(valid_ids_list)
        else:
            # No validated id: the entity cannot be represented in Meta.
            return {}


        # row['title'] √
        # Strip HTML markup and collapse whitespace; fall back to the raw
        # title when cleaning yields an empty string.
        pub_title = ""
        att_title = attributes.get("title")
        if att_title:
            p_title = att_title
            soup = BeautifulSoup(p_title, 'html.parser')
            title_soup = soup.get_text().replace('\n', '')
            title_soup_space_replaced = ' '.join(title_soup.split())
            title_soup_strip = title_soup_space_replaced.strip()
            clean_tit = html.unescape(title_soup_strip)
            pub_title = clean_tit if clean_tit else p_title

        row['title'] = pub_title

        # row['author'] √
        agents_list = self.add_authors_to_agent_list(attributes, [])
        # Prefer a doi whose prefix is NOT a known repository prefix when
        # looking up agents' information.
        pref_dois = [x for x in doi if x.split("/")[0] not in self._doi_prefixes_publishers_dict]
        if doi:
            best_doi = pref_dois[0] if pref_dois else doi[0]
        else:
            best_doi = ""
        authors_strings_list, editors_string_list = self.get_agents_strings_list(best_doi, agents_list)
        row['author'] = '; '.join(authors_strings_list)

        # row['pub_date'] √
        dates = attributes.get("publicationDate")
        row['pub_date'] = str(dates) if dates else ""

        # row['venue'] — not provided by the OpenAIRE dump.
        row['venue'] = ""

        # row['volume']
        row['volume'] = ""

        # row['issue']
        row['issue'] = ""

        # row['page']
        row['page'] = ""

        # row['publisher'] √
        att_publ = attributes.get("publisher")
        publ = ""
        if att_publ:
            publ = att_publ[0]
        publishers = self.get_publisher_name(doi, publ)

        row['publisher'] = publishers

        # row['editor'] — not provided by the OpenAIRE dump.
        row['editor'] = ""

        try:
            return self.normalise_unicode(row)

        except TypeError:
            # Surface the offending row before re-raising for debugging.
            print(row)
            raise(TypeError)

353 

354 def get_publisher_name(self, doi_list: list, item: dict | str) -> str: 

355 ''' 

356 This function aims to return a publisher's name and id. If a mapping was provided, 

357 it is used to find the publisher's standardized name from its id or DOI prefix. 

358 

359 :params doi: the item's DOI 

360 :type doi_list: list 

361 :params item: the item's dictionary 

362 :type item: dict 

363 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output is only the name. Finally, if there is no publisher, the output is an empty string. 

364 ''' 

365 if not item or not isinstance(item, dict): 

366 return "" 

367 elif "name" not in item: 

368 return "" 

369 

370 name_value = item["name"] 

371 publisher: str = name_value if isinstance(name_value, str) else "" 

372 

373 if publisher and doi_list: 

374 for doi in doi_list: 

375 prefix = doi.split('/')[0] if doi else "" 

376 if prefix: 

377 if prefix in self.publisher_manager._prefix_to_data_dict: 

378 prefix_data = self.publisher_manager.extract_publishers_v(doi, enable_extraagencies=False,get_all_prefix_data=True, skip_update=True) 

379 if prefix_data: 

380 member = prefix_data.get("crossref_member") if prefix_data.get("crossref_member") not in {"not found", None} else "" 

381 retrieved_publisher_name = prefix_data.get("name") if prefix_data.get("name") not in {"unidentified", None} else "" 

382 if isinstance(retrieved_publisher_name, str): 

383 if publisher.lower().strip() == retrieved_publisher_name.lower().strip(): 

384 return f'{publisher} [crossref:{member}]' if member else publisher 

385 

386 return publisher 

387 

388 def manage_arxiv_single_id(self, id_dict_list): 

389 result_dict_list = [] 

390 arxiv_id = "" 

391 is_arxiv = False 

392 ent = id_dict_list[0] 

393 schema = ent.get("schema") 

394 if isinstance(schema, str): 

395 schema = schema.strip().lower() 

396 if schema == "doi": 

397 id = ent.get("identifier") 

398 splitted_pref = id.split('/')[0] 

399 pref = re.findall(r"(10.\d{4,9})", splitted_pref)[0] 

400 if pref == "10.48550": 

401 if id.startswith("doi:"): 

402 id = id[len("doi:"):] 

403 id_no_pref = id.replace(pref,"") 

404 

405 arxiv_id = self._id_man_dict["arxiv"].normalise(id_no_pref, include_prefix=True) 

406 if not arxiv_id: 

407 return None 

408 else: 

409 is_arxiv = True 

410 elif schema == "arxiv": 

411 id = ent.get("identifier") 

412 arxiv_id = self._id_man_dict["arxiv"].normalise(id, include_prefix=True) 

413 if not arxiv_id: 

414 return None 

415 else: 

416 is_arxiv = True 

417 else: 

418 return id_dict_list 

419 if is_arxiv: 

420 result_dict_list = [{"schema": "arxiv", "identifier": arxiv_id}] 

421 if not result_dict_list: 

422 return id_dict_list 

423 

424 return result_dict_list 

425 

    def manage_doi_prefixes_priorities(self, id_dict_list):
        """Pick the most authoritative DOI(s) among repository-prefixed DOIs.

        Prefixes in self._doi_prefixes_publishers_dict are ranked by priority
        (1 = highest: arxiv/figshare). The method returns a reduced id dict
        list accordingly, appending a "v1" suffix to unversioned figshare
        DOIs and delegating single arXiv DOIs to manage_arxiv_single_id.

        :param id_dict_list: list of {"schema", "identifier"} dicts whose
            DOI prefixes are all present in the repository-prefix mapping.
        :returns: the selected (possibly rewritten) id dict list.
        """
        result_id_dict_list= []
        priority_prefixes = [k for k,v in self._doi_prefixes_publishers_dict.items() if v.get("priority")==1]
        arxiv_or_figshare_dois = [x for x in id_dict_list if x.get("identifier").split("/")[0] in priority_prefixes]
        if len(arxiv_or_figshare_dois) == 1:
            id_dict = arxiv_or_figshare_dois[0]
            is_arxiv = self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "arxiv"
            has_version = search(r"v\d+", id_dict.get("identifier"))
            if has_version: # It is necessarily a figshare doi (ARXIV have version only in arxiv id and not in arxiv dois)
                #√
                return arxiv_or_figshare_dois
            else:
                if not is_arxiv:
                    # Unversioned figshare doi: point it to its first version.
                    upd_id = id_dict.get("identifier") + "v1"
                    upd_dict = {k:v for k,v in id_dict.items() if k!= "identifier"}
                    upd_dict["identifier"] = upd_id
                    result_id_dict_list.append(upd_dict)
                    # √
                    return result_id_dict_list
                else:
                    # √
                    return self.manage_arxiv_single_id([id_dict])

        elif len(arxiv_or_figshare_dois) > 1:
            versioned_arxiv_or_figshare_dois = [x for x in arxiv_or_figshare_dois if search(r"v\d+", x.get("identifier"))]
            if versioned_arxiv_or_figshare_dois:
                # √
                return versioned_arxiv_or_figshare_dois
            else:
                for id_dict in arxiv_or_figshare_dois:
                    if self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "arxiv":
                        # in order to avoid multiple ids of the same schema for the same entity without a reasonable expl.
                        # √
                        return self.manage_arxiv_single_id([id_dict])

                for id_dict in arxiv_or_figshare_dois:
                    if self._doi_prefixes_publishers_dict[id_dict.get("identifier").split("/")[0]].get("publisher") == "figshare":
                        version = "v1"
                        upd_dict = {k:v for k,v in id_dict.items() if k != "identifier"}
                        upd_id = id_dict.get("identifier") + version
                        upd_dict["identifier"] = upd_id
                        result_id_dict_list.append(upd_dict)
                        # √
                        return result_id_dict_list
        else:
            # No priority-1 prefix present: handle zenodo dois first.
            zenodo_ids_list = [x for x in id_dict_list if self._doi_prefixes_publishers_dict[x.get("identifier").split("/")[0]].get("publisher") == "zenodo"]
            if len(zenodo_ids_list) >= 2:
                list_of_id_n_str = [x["identifier"].replace("doi:", "").replace("10.5281/zenodo.", "") for x in zenodo_ids_list]
                list_of_id_n_int = []
                for n in list_of_id_n_str:
                    try:
                        int_n = int(n)
                        list_of_id_n_int.append(int_n)
                    except ValueError:
                        # Non-numeric suffixes are ignored.
                        pass
                if list_of_id_n_int:
                    # The highest numeric suffix is the most recently assigned
                    # zenodo doi, i.e. the most specific one.
                    last_assigned_id = str(max(list_of_id_n_int))
                    for id_dict in zenodo_ids_list:
                        if id_dict.get("identifier").replace("doi:", "").replace("10.5281/zenodo.", "") == last_assigned_id:
                            result_id_dict_list.append(id_dict)
                    # √
                    return result_id_dict_list
            else:
                prefix_set = {x.get("identifier").split("/")[0] for x in id_dict_list}
                priorities = [self._doi_prefixes_publishers_dict[p]["priority"] for p in prefix_set]
                # Lower number means higher priority.
                max_priority = min(priorities)

                prefixes_w_max_priority = {k for k,v in self._doi_prefixes_publishers_dict.items() if v["priority"] == max_priority}

                for id_dict in id_dict_list:
                    if id_dict.get("identifier").split("/")[0] in prefixes_w_max_priority:
                        norm_id = self.doi_m.normalise(id_dict["identifier"], include_prefix=True)
                        if norm_id is None:
                            continue
                        #if self.BR_redis.get(norm_id):
                        if norm_id in self._redis_values_br:
                            result_id_dict_list.append(id_dict)
                            return result_id_dict_list
                        # if the id is not in redis db, validate it before appending
                        elif self.tmp_doi_m.is_valid(norm_id):
                            result_id_dict_list.append(id_dict)
                            return result_id_dict_list

                if not result_id_dict_list:

                    # No id of the current priority level was valid: drop that
                    # level and retry with the next one, down to priority 6.
                    while id_dict_list and max_priority < 7:

                        id_dict_list = [x for x in id_dict_list if x["identifier"].split("/")[0] not in prefixes_w_max_priority]
                        max_priority += 1
                        prefixes_w_max_priority = {k for k, v in self._doi_prefixes_publishers_dict.items() if
                                                   v["priority"] == max_priority}

                        for id_dict in id_dict_list:
                            if id_dict.get("identifier").split("/")[0] in prefixes_w_max_priority:
                                norm_id = self.doi_m.normalise(id_dict["identifier"], include_prefix=True)
                                if norm_id is None:
                                    continue
                                #if self.BR_redis.get(norm_id):
                                if norm_id in self._redis_values_br:
                                    result_id_dict_list.append(id_dict)
                                    return result_id_dict_list
                                # if the id is not in redis db, validate it before appending
                                elif self.tmp_doi_m.is_valid(norm_id):
                                    result_id_dict_list.append(id_dict)
                                    return result_id_dict_list

        return result_id_dict_list

533 

    def to_validated_id_list(self, id_dict_of_list):
        """this method takes in input a list of id dictionaries and returns a list valid and existent ids with prefixes.
        For each id, a first validation try is made by checking its presence in META db. If the id is not in META db yet,
        a second attempt is made by using the specific id-schema API

        :param id_dict_of_list: dict with a "valid" list (already-validated id
            dicts) and a "to_be_val" list (id dicts still to validate).
        :returns: list of validated prefixed ids, or None when nothing could
            be validated at all.
        """
        valid_id_set = set([x["identifier"] for x in id_dict_of_list["valid"]])
        to_be_processed_input = id_dict_of_list["to_be_val"]
        to_be_processed_id_dict_list = []
        # If there is only an id, check whether it is either an arxiv id or an arxiv doi. In this cases, if there is a
        # versioned arxiv id, it is kept as such. Otherwise both the arxiv doi and the not versioned arxiv id are replaced
        # with the v1 version of the arxiv id. If it is not possible to retrieve an arxiv id from the only id which is
        # either declared as an arxiv id or starts with the arxiv doi prefix, return None and interrupt the process
        if len(valid_id_set) == 0:
            if len(to_be_processed_input) == 1:
                single_id_dict_list = self.manage_arxiv_single_id(to_be_processed_input)
                if single_id_dict_list:
                    to_be_processed_id_dict_list = single_id_dict_list
                else:
                    # The only id could not be resolved: abort.
                    return
            elif len(to_be_processed_input)> 1:
                # Selection cascade: pmids and publisher (non-repository)
                # dois first, ...
                second_selection_list = [x for x in to_be_processed_input if x.get("schema") == "pmid" or (x.get("schema") =="doi" and x.get("identifier").split('/')[0] not in self._doi_prefixes_publishers_dict)]
                if second_selection_list:
                    to_be_processed_id_dict_list = second_selection_list
                else:
                    # ... then pmc ids, ...
                    third_selection = [x for x in to_be_processed_input if x.get("schema") == "pmc" or x.get("schema") == "pmcid"]
                    if third_selection:
                        to_be_processed_id_dict_list = third_selection
                    else:
                        # ... then arxiv ids, ...
                        fourth_selection = [x for x in to_be_processed_input if x.get("schema") == "arxiv"]
                        if fourth_selection:
                            to_be_processed_id_dict_list = fourth_selection
                        else:
                            # ... finally repository dois, reduced by prefix priority.
                            fifth_selection = [x for x in to_be_processed_input if x.get("schema") == "doi" and x.get("identifier").split('/')[0] in self._doi_prefixes_publishers_dict]
                            if fifth_selection:
                                to_be_processed_id_dict_list = self.manage_doi_prefixes_priorities(fifth_selection)

                            else:
                                return None
        else:
            # Some ids are already valid: only pmids and publisher dois are
            # additionally validated.
            to_be_processed_id_dict_list = [x for x in to_be_processed_input if x.get("schema") == "pmid" or (x.get("schema") == "doi" and x.get("identifier").split('/')[0] not in self._doi_prefixes_publishers_dict)]

        if to_be_processed_id_dict_list:
            for ent in to_be_processed_id_dict_list:
                schema = ent.get("schema")
                norm_id = ent.get("identifier")
                if schema is None or norm_id is None:
                    continue
                tmp_id_man = self.get_id_manager(schema, self.tmp_id_man_dict)
                if tmp_id_man is None:
                    continue
                if schema in {"pmid", "pmcid", "pmc", "arxiv", "doi"}:
                    #if self.BR_redis.get(norm_id):
                    if norm_id in self._redis_values_br:
                        tmp_id_man.storage_manager.set_value(norm_id, True) # This way an id found in redis is saved in the storage too,
                        # so it is already taken into account in the following steps
                        valid_id_set.add(norm_id)
                    # if the id is not in redis db, validate it before appending
                    elif tmp_id_man.is_valid(norm_id): # This way a validated id is saved in the storage too,
                        # so it is already taken into account in the following steps
                        valid_id_set.add(norm_id)

        valid_id_list = list(valid_id_set)
        return valid_id_list

596 

597 def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list: 

598 ''' 

599 This function returns the the agents list updated with the authors dictionaries, in the correct format. 

600 

601 :params item: the item's dictionary (attributes), ag_list: the agent list 

602 :type item: dict, ag_list: list 

603 

604 :returns: list the agents list updated with the authors dictionaries, in the correct format. 

605 ''' 

606 

607 agent_list = ag_list 

608 creators = item.get("creator") 

609 if creators: 

610 for author in creators: 

611 agent = {} 

612 agent["role"] = "author" 

613 agent["name"] = author.get("name") if author.get("name") else "" 

614 missing_names = [x for x in ["family", "given"] if x not in agent] 

615 for mn in missing_names: 

616 agent[mn] = "" 

617 all_ids = author.get("identifiers") 

618 orcid_id = self.find_openaire_orcid(all_ids) 

619 if orcid_id: 

620 agent["orcid"] = orcid_id 

621 agent_list.append(agent) 

622 

623 return agent_list 

624 

    def find_openaire_orcid(self, all_author_ids, doi=None):
        """Return the normalised 'orcid:'-prefixed id of an author, or ''.

        The ORCID is accepted when it is already cached as valid, when the
        DOI->ORCID index confirms it for *doi*, when it appears in the RA
        redis validity list, or — as a last resort — when the ORCID manager
        validates it via API.

        :param all_author_ids: list of {"schema", "identifier"} dicts.
        :param doi: optional DOI used for the ORCID-index lookup.
        """
        orcid = ""
        if all_author_ids:
            for id in all_author_ids:
                schema = id.get("schema")
                identifier = id.get("identifier")
                if isinstance(schema, str):
                    if schema.lower().strip() == "orcid":
                        if isinstance(identifier, str):
                            norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
                            if norm_orcid is None:
                                # Malformed orcid: try the next candidate id.
                                continue
                            ## Check orcid presence in memory and storage before validating the id
                            validity_value_orcid = self.validated_as({"identifier":norm_orcid, "schema": schema})
                            if validity_value_orcid is True:
                                orcid = norm_orcid
                            elif validity_value_orcid is None:
                                # Check in ORCID index using provided DOI before any API validation
                                if doi:
                                    found_orcids = self.orcid_finder(doi)
                                    if found_orcids and norm_orcid.split(':')[1] in found_orcids:
                                        # Cache the confirmation for later steps.
                                        self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
                                        orcid = norm_orcid

                                # If not found in index, check Redis and API
                                if not orcid:
                                    if norm_orcid in self._redis_values_ra:
                                        orcid = norm_orcid
                                    # if the id is not in redis db, validate it before appending
                                    elif self.tmp_orcid_m.is_valid(norm_orcid):
                                        orcid = norm_orcid

        return orcid

658 

659 def memory_to_storage(self): 

660 kv_in_memory = self.temporary_manager.get_validity_list_of_tuples() 

661 self.storage_manager.set_multi_value(kv_in_memory) 

662 self.temporary_manager.delete_storage() 

663 

664 def extract_all_ids(self, citation): 

665 all_br = set() 

666 all_ra = set() 

667 

668 d1 = citation["source"] 

669 d2 = citation["target"] 

670 

671 source_and_target = [d1, d2] 

672 

673 #for both source and target entity 

674 for d in source_and_target: 

675 # get all the br ids 

676 br_ids_dicts = d["identifier"] 

677 #and use the correct id manager to normalise 

678 for br_id in br_ids_dicts: 

679 schema = br_id.get("schema").strip().lower() 

680 if schema in self._id_man_dict: 

681 norm_id = self._id_man_dict[schema].normalise(br_id["identifier"], include_prefix=True) 

682 if norm_id: 

683 # if it was possible to normalise the id according to one of the schemas accepted in oc, add 

684 # the id to the set of retrieved br ids for the citation. 

685 all_br.add(norm_id) 

686 creators = d.get("creator") 

687 if creators: 

688 for c in creators: 

689 c_ids = c.get("identifiers") 

690 if c_ids: 

691 norm_orcids = {self.orcid_m.normalise(x.get("identifier"), include_prefix=True) for x in c_ids if x.get("schema") in {"ORCID", "orcid"}} 

692 if norm_orcids: 

693 # if it was possible to normalise any id according to orcid schema, add 

694 # the norm_orcids to the set of retrieved ra ids for the citation. 

695 all_ra.update(norm_orcids) 

696 all_br = list(all_br) 

697 all_ra = list(all_ra) 

698 return all_br, all_ra 

699 

700 def get_redis_validity_list(self, id_list, redis_db): 

701 ids = list(id_list) 

702 if redis_db == "ra": 

703 validity = self.RA_redis.mexists_as_set(ids) 

704 return [ids[i] for i, v in enumerate(validity) if v] 

705 elif redis_db == "br": 

706 validity = self.BR_redis.mexists_as_set(ids) 

707 return [ids[i] for i, v in enumerate(validity) if v] 

708 else: 

709 raise ValueError("redis_db must be either 'ra' or 'br'") 

710