Coverage for oc_ds_converter / openaire / openaire_processing.py: 88%

448 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-12 21:23 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import csv 

8import html 

9import json 

10import os 

11import os.path 

12import pathlib 

13import re 

14import warnings 

15from os.path import exists 

16from pathlib import Path 

17from re import search 

18 

19from bs4 import BeautifulSoup 

20 

21from oc_ds_converter.datasource.redis import FakeRedisWrapper, RedisDataSource 

22from oc_ds_converter.oc_idmanager.arxiv import ArXivManager 

23from oc_ds_converter.oc_idmanager.doi import DOIManager 

24from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

25from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

26from oc_ds_converter.oc_idmanager.oc_data_storage.batch_manager import BatchManager 

27from oc_ds_converter.oc_idmanager.orcid import ORCIDManager 

28from oc_ds_converter.oc_idmanager.pmcid import PMCIDManager 

29from oc_ds_converter.oc_idmanager.pmid import PMIDManager 

30from oc_ds_converter.pubmed.get_publishers import ExtractPublisherDOI 

31from oc_ds_converter.ra_processor import RaProcessor 

32 

# Silence every UserWarning-category warning raised from the ``bs4`` module;
# BeautifulSoup is only used in this file to strip markup from titles.
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')

34 

35 

36class OpenaireProcessing(RaProcessor): 

def __init__(
    self,
    orcid_index: str | None = None,
    publishers_filepath_openaire: str | None = None,
    storage_manager: StorageManager | None = None,
    testing: bool = True,
    exclude_existing: bool = False,
):
    """Build the identifier managers, Redis handles and publisher mapping.

    :param orcid_index: forwarded unchanged to ``RaProcessor.__init__``.
    :param publishers_filepath_openaire: path of a ``.csv`` or ``.json``
        prefix-to-publisher mapping. When falsy, the file
        ``support_files/prefix_publishers.json`` next to this module is used
        (the directory is created if missing).
    :param storage_manager: persistent validity storage; defaults to
        ``RedisStorageManager(testing=testing)``.
    :param testing: when True, ``FakeRedisWrapper`` instances replace the
        real Redis data sources and the id managers are built with
        ``use_api_service=False``.
    :param exclude_existing: stored on the instance as ``exclude_existing``.
    """
    super(OpenaireProcessing, self).__init__(orcid_index)
    self.exclude_existing = exclude_existing
    self._testing = testing
    self.storage_manager = (
        RedisStorageManager(testing=testing) if storage_manager is None else storage_manager
    )

    self.temporary_manager = BatchManager()

    # OpenAIRE "objectSubType" label -> output type label.
    self.types_dict = {
        "Article": "journal article",
        "Part of book or chapter of book": "book chapter",
        "Preprint": "other",
        "Other literature type": "other",
        "Conference object": "proceedings",
        "Doctoral thesis": "dissertation",
        "Book": "book",
        "Thesis": "dissertation",
        "Research": "other",
        "Master thesis": "dissertation",
        "Report": "report",
        "Review": "other",
        "Contribution for newspaper or weekly magazine": "other",
        "Journal": "journal",
        "Presentation": "other",
        "Software Paper": "other",
        "External research report": "report",
        "Data Paper": "other",
        "Project deliverable": "other",
        "Bachelor thesis": "dissertation",
        "Project proposal": "other",
        "Newsletter": "other",
        "Data Management Plan": "data management plan",
        "Software": "computer program",
        "Dataset": "dataset",
        "Audiovisual": "dataset",
        "Image": "dataset",
        "Other dataset type": "dataset",
        "Film": "dataset",
        "UNKNOWN": "other",
        "Other ORP type": "other",
        "InteractiveResource": "other",
        "PhysicalObject": "other",
        "Collection": "other",
        "Patent": "other",
        "Project milestone": "other",
        "Clinical Trial": "other",
        "Bioentity": "other",
        "Sound": "other",
    }

    use_api = not testing

    # Managers bound to the persistent storage.
    persistent_kwargs = {
        "use_api_service": use_api,
        "storage_manager": self.storage_manager,
        "testing": testing,
    }
    self.doi_m = DOIManager(**persistent_kwargs)
    self.pmid_m = PMIDManager(**persistent_kwargs)
    self.pmc_m = PMCIDManager(**persistent_kwargs)
    self.arxiv_m = ArXivManager(**persistent_kwargs)
    self.orcid_m = ORCIDManager(**persistent_kwargs)

    self._id_man_dict = {
        "doi": self.doi_m,
        "pmid": self.pmid_m,
        "pmcid": self.pmc_m,
        "pmc": self.pmc_m,
        "arxiv": self.arxiv_m,
    }

    # Managers bound to the temporary (per-file) storage. Validity data is
    # collected here while a file is being processed and moved to the
    # persistent storage only once the whole file is done, so that an
    # interrupted run does not leave ids marked as processed when their
    # output was never written. Lookups must therefore consult both the
    # temporary and the persistent storage.
    temporary_kwargs = {
        "use_api_service": use_api,
        "storage_manager": self.temporary_manager,
        "testing": testing,
    }
    self.tmp_doi_m = DOIManager(**temporary_kwargs)
    self.tmp_pmid_m = PMIDManager(**temporary_kwargs)
    self.tmp_pmc_m = PMCIDManager(**temporary_kwargs)
    self.tmp_arxiv_m = ArXivManager(**temporary_kwargs)
    self.tmp_orcid_m = ORCIDManager(**temporary_kwargs)

    self.tmp_id_man_dict = {
        "doi": self.tmp_doi_m,
        "pmid": self.tmp_pmid_m,
        "pmcid": self.tmp_pmc_m,
        "pmc": self.tmp_pmc_m,
        "arxiv": self.tmp_arxiv_m,
    }

    # DOI prefixes that receive special handling; a lower "priority" number
    # is examined first. Each prefix is listed with and without "doi:".
    self._doi_prefixes_publishers_dict = {
        "10.48550": {"publisher": "arxiv", "priority": 1},
        "doi:10.48550": {"publisher": "arxiv", "priority": 1},
        "10.6084": {"publisher": "figshare", "priority": 1},
        "doi:10.6084": {"publisher": "figshare", "priority": 1},
        "10.1184": {"publisher": "Carnegie Mellon University", "priority": 2},
        "doi:10.1184": {"publisher": "Carnegie Mellon University", "priority": 2},
        "10.25384": {"publisher": "sage", "priority": 2},
        "doi:10.25384": {"publisher": "sage", "priority": 2},
        "10.5281": {"publisher": "zenodo", "priority": 3},
        "doi:10.5281": {"publisher": "zenodo", "priority": 3},
        "10.5061": {"publisher": "dryad", "priority": 4},
        "doi:10.5061": {"publisher": "dryad", "priority": 4},
        "10.17605": {"publisher": "psyarxiv", "priority": 5},
        "doi:10.17605": {"publisher": "psyarxiv", "priority": 5},
        "10.31234": {"publisher": "psyarxiv", "priority": 6},
        "doi:10.31234": {"publisher": "psyarxiv", "priority": 6},
    }

    if testing:
        self.BR_redis, self.RA_redis = FakeRedisWrapper(), FakeRedisWrapper()
    else:
        self.BR_redis, self.RA_redis = RedisDataSource("DB-META-BR"), RedisDataSource("DB-META-RA")

    self._redis_values_ra = []
    self._redis_values_br = []

    if publishers_filepath_openaire:
        self.publishers_filepath = publishers_filepath_openaire
    else:
        support_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "support_files")
        if not exists(support_dir):
            os.makedirs(support_dir)
        self.publishers_filepath = os.path.join(support_dir, "prefix_publishers.json")

    if not os.path.exists(self.publishers_filepath):
        # No mapping yet: start empty and create an empty JSON file.
        self.publisher_manager = ExtractPublisherDOI({})
        with open(self.publishers_filepath, "w", encoding="utf8") as new_file:
            json.dump({}, new_file, ensure_ascii=False, indent=4)
    else:
        prefix_map = {}
        if self.publishers_filepath.endswith(".csv"):
            with open(self.publishers_filepath, encoding="utf8") as handle:
                for record in csv.DictReader(handle, ("id", "name", "prefix")):
                    prefix_map[record["prefix"]] = {
                        "name": record["name"],
                        "crossref_member": record["id"],
                    }
            # From now on the mapping is referred to by its JSON path.
            self.publishers_filepath = self.publishers_filepath.replace(".csv", ".json")
        elif self.publishers_filepath.endswith(".json"):
            with open(self.publishers_filepath, encoding="utf8") as handle:
                prefix_map = json.load(handle)

        self.publisher_manager = ExtractPublisherDOI(prefix_map if prefix_map else {})

177 

def update_redis_values(self, br, ra):
    """Replace the cached collections of ids known to Redis.

    :param br: bibliographic-resource ids, stored as ``_redis_values_br``.
    :param ra: responsible-agent ids, stored as ``_redis_values_ra``.
    """
    self._redis_values_br, self._redis_values_ra = br, ra

181 

def validated_as(self, id_dict):
    """Return the already-recorded validity of an id, if any.

    The temporary (current file) storage is consulted first, then the
    persistent storage: the temporary one is smaller, and the same id tends
    to recur in nearby records (e.g. one citing id across many citations).

    :param id_dict: mapping with a ``"schema"`` and an ``"identifier"`` key.
    :returns: whatever ``validated_as_id`` of the relevant manager returns,
        or None when nothing is recorded or the schema has no manager.
    """
    schema = id_dict["schema"].strip().lower()
    identifier = id_dict["identifier"]

    if schema == "orcid":
        recorded = self.tmp_orcid_m.validated_as_id(identifier)
        if recorded is None:
            recorded = self.orcid_m.validated_as_id(identifier)
        return recorded

    temporary_manager = self.get_id_manager(schema, self.tmp_id_man_dict)
    if temporary_manager is None:
        return None
    recorded = temporary_manager.validated_as_id(identifier)
    if recorded is not None:
        return recorded

    persistent_manager = self.get_id_manager(schema, self._id_man_dict)
    if persistent_manager is None:
        return None
    return persistent_manager.validated_as_id(identifier)

213 

def get_id_manager(self, schema_or_id, id_man_dict):
    """Look up the id manager for a schema name or a prefixed identifier.

    :param schema_or_id: either a bare schema (e.g. ``'pmid'``) or a string
        of the form ``'schema:value'``; only the text before the first
        colon is used as the lookup key.
    :param id_man_dict: mapping from schema names to id manager instances.
    :returns: the matching manager, or None when the schema is not a key.
    """
    schema = schema_or_id.partition(":")[0]
    return id_man_dict.get(schema)

226 

227 def normalise_any_id(self, id_with_prefix: str) -> str | None: 

228 id_man = self.get_id_manager(id_with_prefix, self._id_man_dict) 

229 if id_man is None: 

230 return None 

231 id_no_pref = ":".join(id_with_prefix.split(":")[1:]) 

232 norm_id_w_pref = id_man.normalise(id_no_pref, include_prefix=True) 

233 return norm_id_w_pref 

234 

def get_norm_ids(self, entity):
    """Normalise the identifiers of an entity, dropping unsupported ones.

    Entries are skipped when their schema is missing or not a string, when
    the identifier is missing, when the schema has no manager in
    ``_id_man_dict``, or when the manager cannot normalise the value.
    Duplicates (same schema and normalised identifier) are reported once,
    in order of first appearance.

    :param entity: iterable of dicts with ``"schema"`` and ``"identifier"``.
    :returns: list of ``{"schema": ..., "identifier": ...}`` dicts where the
        schema is lower-cased and the identifier is normalised with prefix.
    """
    norm_ids = []
    already_emitted = set()
    for raw_id in entity:
        schema = raw_id.get("schema")
        identifier = raw_id.get("identifier")
        # A malformed record must not abort the whole entity.
        if not isinstance(schema, str) or identifier is None:
            continue
        schema = schema.strip().lower()
        manager = self._id_man_dict.get(schema)
        if manager is None:
            continue
        normalised = manager.normalise(identifier, include_prefix=True)
        if not normalised:
            continue
        key = (schema, normalised)
        if key not in already_emitted:
            already_emitted.add(key)
            norm_ids.append({"schema": schema, "identifier": normalised})
    return norm_ids

246 

def dict_to_cache(self, dict_to_be_saved, path):
    """Serialise a dictionary as indented UTF-8 JSON at ``path``.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is (``ensure_ascii=False``).

    :param dict_to_be_saved: JSON-serialisable object to write.
    :param path: destination file path (string or path-like).
    """
    destination = Path(path)
    folder = destination.parent.absolute()
    if not os.path.exists(folder):
        folder.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as out_file:
        json.dump(dict_to_be_saved, out_file, ensure_ascii=False, indent=4)

254 

def csv_creator(self, item: dict) -> dict:
    """Convert one OpenAIRE entity into an output row.

    The row has the keys ``id``, ``title``, ``author``, ``pub_date``,
    ``venue``, ``volume``, ``issue``, ``page``, ``type``, ``publisher`` and
    ``editor``. ``venue``, ``volume``, ``issue``, ``page`` and ``editor``
    are always left empty by this method.

    :param item: the entity dictionary; the fields read are
        ``objectSubType``, ``identifier``, ``title``, ``creator`` (through
        ``add_authors_to_agent_list``), ``publicationDate`` and ``publisher``.
    :returns: the row passed through ``normalise_unicode``, or ``{}`` when
        the entity has no identifier that could be validated.
    :raises TypeError: re-raised unchanged from ``normalise_unicode`` after
        printing the offending row.
    """
    field_names = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type',
                   'publisher', 'editor']
    row = dict.fromkeys(field_names, '')

    # row['type']: unknown or missing sub-types fall back to "other".
    row['type'] = self.types_dict.get(item.get("objectSubType")) or "other" if item.get("objectSubType") else "other"

    # row['id']: without identifiers there is nothing to validate.
    identifiers = item.get("identifier")
    if not identifiers:
        return {}
    valid_ids_list = self.to_validated_id_list(identifiers)
    if not valid_ids_list:
        return {}
    row['id'] = ' '.join(valid_ids_list)

    # DOIs (without the "doi:" prefix) drive the prefix-based lookups below.
    dois = [valid_id[len("doi:"):] for valid_id in valid_ids_list if valid_id.startswith("doi:")]

    # row['title']: strip markup, collapse whitespace, decode HTML entities.
    raw_title = item.get("title")
    if raw_title:
        text = BeautifulSoup(raw_title, 'html.parser').get_text().replace('\n', '')
        cleaned = html.unescape(' '.join(text.split()).strip())
        row['title'] = cleaned if cleaned else raw_title

    # row['author']: prefer a DOI whose prefix is not one of the specially
    # handled repository prefixes; otherwise fall back to the first DOI.
    agents_list = self.add_authors_to_agent_list(item, [])
    regular_dois = [d for d in dois if d.split("/")[0] not in self._doi_prefixes_publishers_dict]
    if regular_dois:
        best_doi = regular_dois[0]
    elif dois:
        best_doi = dois[0]
    else:
        best_doi = ""
    authors_strings_list, _ = self.get_agents_strings_list(best_doi, agents_list)
    row['author'] = '; '.join(authors_strings_list)

    # row['pub_date']
    publication_date = item.get("publicationDate")
    row['pub_date'] = str(publication_date) if publication_date else ""

    # row['publisher']: only the first listed publisher is considered.
    listed_publishers = item.get("publisher")
    first_publisher = listed_publishers[0] if listed_publishers else ""
    row['publisher'] = self.get_publisher_name(dois, first_publisher)

    try:
        return self.normalise_unicode(row)
    except TypeError:
        print(row)
        # Bare raise keeps the original message and traceback.
        raise

354 

355 def get_publisher_name(self, doi_list: list, item: dict | str) -> str: 

356 ''' 

357 This function aims to return a publisher's name and id. If a mapping was provided, 

358 it is used to find the publisher's standardized name from its id or DOI prefix. 

359 

360 :params doi: the item's DOI 

361 :type doi_list: list 

362 :params item: the item's dictionary 

363 :type item: dict 

364 :returns: str -- The output is a string in the format 'NAME [SCHEMA:ID]', for example, 'American Medical Association (AMA) [crossref:10]'. If the id does not exist, the output is only the name. Finally, if there is no publisher, the output is an empty string. 

365 ''' 

366 if not item or not isinstance(item, dict): 

367 return "" 

368 elif "name" not in item: 

369 return "" 

370 

371 name_value = item["name"] 

372 publisher: str = name_value if isinstance(name_value, str) else "" 

373 

374 if publisher and doi_list: 

375 for doi in doi_list: 

376 prefix = doi.split('/')[0] if doi else "" 

377 if prefix: 

378 if prefix in self.publisher_manager._prefix_to_data_dict: 

379 prefix_data = self.publisher_manager.extract_publishers_v(doi, enable_extraagencies=False,get_all_prefix_data=True, skip_update=True) 

380 if prefix_data: 

381 member = prefix_data.get("crossref_member") if prefix_data.get("crossref_member") not in {"not found", None} else "" 

382 retrieved_publisher_name = prefix_data.get("name") if prefix_data.get("name") not in {"unidentified", None} else "" 

383 if isinstance(retrieved_publisher_name, str): 

384 if publisher.lower().strip() == retrieved_publisher_name.lower().strip(): 

385 return f'{publisher} [crossref:{member}]' if member else publisher 

386 

387 return publisher 

388 

def manage_arxiv_single_id(self, id_dict_list):
    """Turn a lone arXiv DOI or arXiv id into a normalised arXiv id.

    Only the first element of ``id_dict_list`` is examined.

    :param id_dict_list: list whose first item is a dict with ``"schema"``
        and ``"identifier"``.
    :returns: ``[{"schema": "arxiv", "identifier": <normalised id>}]`` when
        the item is an arXiv id or a DOI with prefix ``10.48550``; None when
        such an item cannot be normalised by the arXiv manager; the input
        list itself in every other case.
    """
    entry = id_dict_list[0]
    schema = entry.get("schema")
    if not isinstance(schema, str):
        return id_dict_list
    schema = schema.strip().lower()

    if schema == "doi":
        raw_value = entry.get("identifier")
        found_prefixes = re.findall(r"(10.\d{4,9})", raw_value.split('/')[0])
        if not found_prefixes:
            return id_dict_list
        doi_prefix = found_prefixes[0]
        if doi_prefix != "10.48550":
            # Not an arXiv DOI: leave the input untouched.
            return id_dict_list
        if raw_value.startswith("doi:"):
            raw_value = raw_value[len("doi:"):]
        candidate = raw_value.replace(doi_prefix, "")
    elif schema == "arxiv":
        candidate = entry.get("identifier")
    else:
        return id_dict_list

    arxiv_id = self._id_man_dict["arxiv"].normalise(candidate, include_prefix=True)
    if not arxiv_id:
        return None
    return [{"schema": "arxiv", "identifier": arxiv_id}]

429 

def manage_doi_prefixes_priorities(self, id_dict_list):
    """Pick the id(s) to keep among DOIs with specially handled prefixes.

    Every identifier is expected to start with a prefix listed in
    ``_doi_prefixes_publishers_dict``. Selection rules, in order:

    1. arXiv/figshare DOIs (priority 1): versioned ones (``v<digits>``) are
       returned as they are; otherwise an arXiv DOI is converted through
       ``manage_arxiv_single_id`` and a figshare DOI gets ``v1`` appended.
    2. Two or more zenodo DOIs: the one with the highest numeric suffix.
    3. Otherwise, priorities are scanned from the best one present upwards
       and the first DOI found in ``_redis_values_br`` or accepted by
       ``tmp_doi_m.is_valid`` is returned.

    :param id_dict_list: list of dicts with an ``"identifier"`` key.
    :returns: the list of selected id dicts (possibly empty), or whatever
        ``manage_arxiv_single_id`` returns for the arXiv cases.
    """
    known_prefixes = self._doi_prefixes_publishers_dict

    def prefix_of(entry):
        return entry.get("identifier").split("/")[0]

    def publisher_of(entry):
        return known_prefixes[prefix_of(entry)].get("publisher")

    def is_versioned(entry):
        return search(r"v\d+", entry.get("identifier"))

    def with_first_version(entry):
        # Copy the entry, re-adding "identifier" last with "v1" appended.
        bumped = {key: value for key, value in entry.items() if key != "identifier"}
        bumped["identifier"] = entry.get("identifier") + "v1"
        return bumped

    def zenodo_number(entry):
        return entry.get("identifier").replace("doi:", "").replace("10.5281/zenodo.", "")

    def prefixes_at(priority):
        return {prefix for prefix, info in known_prefixes.items() if info["priority"] == priority}

    def first_accepted(entries, allowed_prefixes):
        # First entry with an allowed prefix that is already known to Redis
        # or, failing that, passes validation.
        for entry in entries:
            if prefix_of(entry) not in allowed_prefixes:
                continue
            normalised = self.doi_m.normalise(entry["identifier"], include_prefix=True)
            if normalised is None:
                continue
            if normalised in self._redis_values_br or self.tmp_doi_m.is_valid(normalised):
                return [entry]
        return None

    top_prefixes = [prefix for prefix, info in known_prefixes.items() if info.get("priority") == 1]
    top_group = [entry for entry in id_dict_list if prefix_of(entry) in top_prefixes]

    if len(top_group) == 1:
        only = top_group[0]
        if is_versioned(only):
            # Necessarily figshare: arXiv DOIs carry no version.
            return top_group
        if publisher_of(only) == "arxiv":
            return self.manage_arxiv_single_id([only])
        return [with_first_version(only)]

    if len(top_group) > 1:
        versioned = [entry for entry in top_group if is_versioned(entry)]
        if versioned:
            return versioned
        # Avoid several ids of the same schema for one entity: arXiv first.
        for entry in top_group:
            if publisher_of(entry) == "arxiv":
                return self.manage_arxiv_single_id([entry])
        for entry in top_group:
            if publisher_of(entry) == "figshare":
                return [with_first_version(entry)]
        return []

    zenodo_entries = [entry for entry in id_dict_list if publisher_of(entry) == "zenodo"]
    if len(zenodo_entries) >= 2:
        numbers = []
        for text in [zenodo_number(entry) for entry in zenodo_entries]:
            try:
                numbers.append(int(text))
            except ValueError:
                pass
        if numbers:
            latest = str(max(numbers))
            for entry in zenodo_entries:
                if zenodo_number(entry) == latest:
                    return [entry]
        return []

    present_prefixes = {prefix_of(entry) for entry in id_dict_list}
    current_priority = min([known_prefixes[prefix]["priority"] for prefix in present_prefixes])
    current_prefixes = prefixes_at(current_priority)

    chosen = first_accepted(id_dict_list, current_prefixes)
    if chosen is not None:
        return chosen

    remaining = id_dict_list
    while remaining and current_priority < 7:
        # Drop the ids already tried, then move to the next priority level.
        remaining = [entry for entry in remaining if entry["identifier"].split("/")[0] not in current_prefixes]
        current_priority += 1
        current_prefixes = prefixes_at(current_priority)
        chosen = first_accepted(remaining, current_prefixes)
        if chosen is not None:
            return chosen

    return []

537 

def to_validated_id_list(self, id_dict_of_list):
    """Return the valid identifiers of an entity, with prefixes.

    An id is accepted when it is listed in ``_redis_values_br`` (it is then
    also recorded as valid in the temporary storage, so later steps treat it
    as already handled) or, failing that, when the temporary manager of its
    schema reports it as valid.

    When no id is already valid, the ids to check are chosen as follows:

    * a single candidate goes through ``manage_arxiv_single_id`` (an arXiv
      id or arXiv DOI that cannot be normalised aborts with None);
    * several candidates are narrowed to the first non-empty group among:
      PMIDs and DOIs with ordinary prefixes; PMC ids; arXiv ids; DOIs with
      specially handled prefixes (filtered by
      ``manage_doi_prefixes_priorities``).

    When some id is already valid, only PMIDs and DOIs with ordinary
    prefixes are checked in addition.

    :param id_dict_of_list: dict with the keys ``"valid"`` (id dicts already
        known to be valid) and ``"to_be_val"`` (id dicts still to check).
    :returns: list of valid ids, already-valid ones first, each group in
        input order; None when the input is empty or no usable candidate
        exists.
    """
    if not id_dict_of_list:
        return None

    # A dict is used as an insertion-ordered set so the result is
    # reproducible from run to run (a plain set is not).
    valid_ids = dict.fromkeys(x["identifier"] for x in id_dict_of_list["valid"])
    pending = id_dict_of_list["to_be_val"]
    special_prefixes = self._doi_prefixes_publishers_dict

    def has_special_prefix(entry):
        return entry.get("identifier").split('/')[0] in special_prefixes

    def is_regular(entry):
        schema = entry.get("schema")
        return schema == "pmid" or (schema == "doi" and not has_special_prefix(entry))

    if valid_ids:
        candidates = [x for x in pending if is_regular(x)]
    elif len(pending) == 1:
        candidates = self.manage_arxiv_single_id(pending)
        if not candidates:
            return None
    elif len(pending) > 1:
        candidates = (
            [x for x in pending if is_regular(x)]
            or [x for x in pending if x.get("schema") == "pmc" or x.get("schema") == "pmcid"]
            or [x for x in pending if x.get("schema") == "arxiv"]
        )
        if not candidates:
            special_dois = [x for x in pending if x.get("schema") == "doi" and has_special_prefix(x)]
            if not special_dois:
                return None
            candidates = self.manage_doi_prefixes_priorities(special_dois)
    else:
        candidates = []

    for ent in candidates or ():
        schema = ent.get("schema")
        norm_id = ent.get("identifier")
        if schema is None or norm_id is None:
            continue
        tmp_id_man = self.get_id_manager(schema, self.tmp_id_man_dict)
        if tmp_id_man is None:
            continue
        if schema not in {"pmid", "pmcid", "pmc", "arxiv", "doi"}:
            continue
        if norm_id in self._redis_values_br:
            # Mirror the Redis hit into the temporary storage so the id
            # counts as already processed in the following steps.
            tmp_id_man.storage_manager.set_value(norm_id, True)
            valid_ids[norm_id] = None
        elif tmp_id_man.is_valid(norm_id):
            # Not in Redis: accepted only after validation.
            valid_ids[norm_id] = None

    return list(valid_ids)

600 

def add_authors_to_agent_list(self, item: dict, ag_list: list) -> list:
    '''
    Append one agent dictionary per creator of the item to the agents list.

    Each agent has ``role`` set to ``"author"``, ``name`` taken from the
    creator (empty string when absent), empty ``family`` and ``given``, and
    an ``orcid`` key only when ``find_openaire_orcid`` returns one.

    :params item: the item's dictionary (attributes), ag_list: the agent list
    :type item: dict, ag_list: list

    :returns: list -- the same ``ag_list`` object, extended in place.
    '''
    for creator in item.get("creator") or ():
        agent = {
            "role": "author",
            "name": creator.get("name") or "",
            "family": "",
            "given": "",
        }
        orcid_id = self.find_openaire_orcid(creator.get("identifiers"))
        if orcid_id:
            agent["orcid"] = orcid_id
        ag_list.append(agent)

    return ag_list

628 

def find_openaire_orcid(self, all_author_ids, doi=None):
    """Return a valid, prefixed ORCID among an author's identifiers.

    For every identifier whose schema is ``orcid`` (case and surrounding
    whitespace ignored) and that can be normalised:

    * a recorded validity of True (see ``validated_as``) selects it;
    * with no recorded validity, it is selected when, in this order, the
      ORCID index lists it for ``doi`` (the hit is then stored as valid in
      the temporary storage), or, provided nothing has been selected so
      far, it is in ``_redis_values_ra`` or passes
      ``tmp_orcid_m.is_valid``.

    All identifiers are scanned; a later selection replaces an earlier one.

    :param all_author_ids: iterable of dicts with ``"schema"`` and
        ``"identifier"``; may be None or empty.
    :param doi: optional DOI used to query the ORCID index.
    :returns: the selected ORCID, or an empty string.
    """
    orcid = ""
    for author_id in all_author_ids or ():
        schema = author_id.get("schema")
        identifier = author_id.get("identifier")
        if not isinstance(schema, str) or schema.lower().strip() != "orcid":
            continue
        if not isinstance(identifier, str):
            continue

        candidate = self.orcid_m.normalise(identifier, include_prefix=True)
        if candidate is None:
            continue

        # Look in memory and storage before attempting any validation.
        recorded = self.validated_as({"identifier": candidate, "schema": schema})
        if recorded is True:
            orcid = candidate
            continue
        if recorded is not None:
            continue

        # ORCID index lookup by DOI comes before Redis and the API.
        if doi:
            indexed = self.orcid_finder(doi)
            if indexed and candidate.split(':')[1] in indexed:
                self.tmp_orcid_m.storage_manager.set_value(candidate, True)
                orcid = candidate

        if not orcid and (candidate in self._redis_values_ra or self.tmp_orcid_m.is_valid(candidate)):
            orcid = candidate

    return orcid

662 

def memory_to_storage(self):
    """Move the validity data from the temporary to the persistent storage.

    The pairs returned by ``temporary_manager.get_validity_list_of_tuples``
    are handed to ``storage_manager.set_multi_value`` in one call, after
    which the temporary storage is deleted.
    """
    pending_pairs = self.temporary_manager.get_validity_list_of_tuples()
    self.storage_manager.set_multi_value(pending_pairs)
    self.temporary_manager.delete_storage()

667 

def extract_all_ids(self, citation):
    """Collect the normalised ids of both entities of a citation.

    Bibliographic-resource ids come from the ``"identifier"`` list of the
    ``"source"`` and ``"target"`` entities and are kept only when their
    schema has a manager in ``_id_man_dict`` and the value can be
    normalised. Responsible-agent ids are the ORCIDs found among the
    ``"identifiers"`` of each ``"creator"``.

    Entries with a missing or non-string schema, ORCIDs with a non-string
    value and values that cannot be normalised are skipped. The ORCID schema
    is matched ignoring case and surrounding whitespace, consistently with
    ``find_openaire_orcid``.

    :param citation: dict with a ``"source"`` and a ``"target"`` entity.
    :returns: tuple ``(all_br, all_ra)`` of two lists of normalised,
        prefixed ids without duplicates, in order of first appearance.
    """
    # Dicts act as insertion-ordered sets: unique ids, reproducible order.
    all_br = {}
    all_ra = {}

    for entity in (citation["source"], citation["target"]):
        for br_id in entity["identifier"]:
            schema = br_id.get("schema")
            if not isinstance(schema, str):
                continue
            manager = self._id_man_dict.get(schema.strip().lower())
            if manager is None:
                continue
            norm_id = manager.normalise(br_id["identifier"], include_prefix=True)
            if norm_id:
                all_br[norm_id] = None

        for creator in entity.get("creator") or ():
            for author_id in creator.get("identifiers") or ():
                schema = author_id.get("schema")
                identifier = author_id.get("identifier")
                if not isinstance(schema, str) or schema.strip().lower() != "orcid":
                    continue
                if not isinstance(identifier, str):
                    continue
                norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
                # normalise yields None for malformed ORCIDs: never keep it.
                if norm_orcid:
                    all_ra[norm_orcid] = None

    return list(all_br), list(all_ra)

703 

def get_redis_validity_list(self, id_list, redis_db):
    """Return the ids of ``id_list`` that the chosen Redis source reports.

    :param id_list: iterable of ids to check.
    :param redis_db: ``"ra"`` to query ``RA_redis``, ``"br"`` to query
        ``BR_redis``.
    :returns: list of the ids whose position holds a truthy value in the
        result of ``mexists_as_set``.
    :raises ValueError: when ``redis_db`` is neither ``"ra"`` nor ``"br"``.
    """
    ids = list(id_list)

    if redis_db == "ra":
        data_source = self.RA_redis
    elif redis_db == "br":
        data_source = self.BR_redis
    else:
        raise ValueError("redis_db must be either 'ra' or 'br'")

    # NOTE(review): the result is consumed positionally, i.e. as one flag per
    # id in the order of ``ids``, despite the "_as_set" in the method name;
    # confirm this against the data-source implementation.
    flags = data_source.mexists_as_set(ids)

    found = []
    for position, flag in enumerate(flags):
        if flag:
            found.append(ids[position])
    return found

714