Coverage for oc_meta/core/curator.py: 92%

982 statements  

coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2019 Silvio Peroni <essepuntato@gmail.com> 

4# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

5# Copyright 2021 Simone Persiani <iosonopersia@gmail.com> 

6# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# Permission to use, copy, modify, and/or distribute this software for any purpose 

9# with or without fee is hereby granted, provided that the above copyright notice 

10# and this permission notice appear in all copies. 

11# 

12# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

13# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

14# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

15# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

16# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

18# SOFTWARE. 

19 

20from __future__ import annotations 

21 

22import os 

23import re 

24from contextlib import nullcontext 

25from typing import Dict, List, Tuple 

26 

27from oc_meta.constants import CONTAINER_EDITOR_TYPES 

28from oc_meta.lib.cleaner import Cleaner 

29from oc_meta.lib.file_manager import * 

30from oc_meta.lib.finder import * 

31from oc_meta.lib.master_of_regex import * 

32from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

33 

34 

35class Curator: 

36 

37 def __init__( 

38 self, 

39 data: List[dict], 

40 ts: str, 

41 prov_config: str, 

42 counter_handler: RedisCounterHandler, 

43 base_iri: str = "https://w3id.org/oc/meta", 

44 prefix: str = "060", 

45 separator: str | None = None, 

46 valid_dois_cache: dict | None = None, 

47 settings: dict | None = None, 

48 silencer: list | None = None, 

49 meta_config_path: str | None = None, 

50 timer=None, 

51 ): 

52 self.timer = timer 

53 self.settings = settings or {} 

54 self.everything_everywhere_allatonce = Graph() 

55 self.finder = ResourceFinder( 

56 ts, 

57 base_iri, 

58 self.everything_everywhere_allatonce, 

59 settings=settings, 

60 meta_config_path=meta_config_path, 

61 ) 

62 self.base_iri = base_iri 

63 self.prov_config = prov_config 

64 self.separator = separator 

65 # Preliminary pass to clear volume and issue if id is present but venue is missing 

66 for row in data: 

67 if row["id"] and (row["volume"] or row["issue"]): 

68 if not row["venue"]: 

69 row["volume"] = "" 

70 row["issue"] = "" 

71 if not row["type"]: 

72 row["type"] = "journal article" 

73 self.data = [ 

74 {field: value.strip() for field, value in row.items()} 

75 for row in data 

76 if is_a_valid_row(row) 

77 ] 

78 self.prefix = prefix 

79 # Redis counter handler 

80 self.counter_handler = counter_handler 

81 self.brdict = {} 

82 self.radict: Dict[str, Dict[str, list]] = {} 

83 self.ardict: Dict[str, Dict[str, list]] = {} 

84 self.vvi = {} # Venue, Volume, Issue 

85 self.idra = {}  # key: identifier literal; value: MetaID of the ID linked to an RA 

86 self.idbr = {}  # key: identifier literal; value: MetaID of the ID linked to a BR 

87 self.rameta = dict() 

88 self.brmeta = dict() 

89 self.armeta = dict() 

90 self.remeta = dict() 

91 self.wnb_cnt = 0 # wannabe counter 

92 self.rowcnt = 0 

93 self.log = dict() 

94 self.valid_dois_cache = valid_dois_cache if valid_dois_cache is not None else {} 

95 self.preexisting_entities = set() 

96 self.silencer = silencer if silencer is not None else [] 

97 

98 def _timed(self, name: str): 

99 if self.timer: 

100 return self.timer.timer(name) 

101 return nullcontext() 

102 
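
# Editor's note: a minimal usage sketch (not part of the module). The endpoint
# URL, config path, and Redis connection arguments are hypothetical
# placeholders; check the RedisCounterHandler signature before reuse.
#
#     counter_handler = RedisCounterHandler(host="localhost", port=6379, db=0)
#     curator = Curator(
#         data=rows,                          # CSV rows as a list of dicts
#         ts="http://localhost:8890/sparql",  # hypothetical triplestore endpoint
#         prov_config="prov_config.json",     # hypothetical provenance config
#         counter_handler=counter_handler,
#     )
#     curator.curator(filename="batch_01", path_csv="out")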

103 def collect_identifiers(self, valid_dois_cache): 

104 all_metavals = set() 

105 all_idslist = set() 

106 all_vvis = set() 

107 for row in self.data: 

108 metavals, idslist, vvis = self.extract_identifiers_and_metavals( 

109 row, valid_dois_cache=valid_dois_cache 

110 ) 

111 all_metavals.update(metavals) 

112 all_idslist.update(idslist) 

113 all_vvis.update(vvis) 

114 return all_metavals, all_idslist, all_vvis 

115 
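
# Editor's note: a sketch of the return shapes, inferred from
# extract_identifiers_and_metavals() below; the concrete values are invented.
#
#     metavals  # e.g. {"omid:br/0601", "omid:ra/0602"}
#     idslist   # e.g. {"doi:10.1000/abc", "orcid:0000-0001-2345-6789"}
#     vvis      # e.g. {("15", "2", "omid:br/0603", ("issn:0028-0836",))}
#               # i.e. (volume, issue, venue MetaID, sorted venue IDs)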

116 def extract_identifiers_and_metavals( 

117 self, row, valid_dois_cache 

118 ) -> Tuple[set, set, set]: 

119 metavals = set() 

120 identifiers = set() 

121 vvis = set() 

122 venue_ids = set() 

123 venue_metaid = None 

124 

125 if row["id"]: 

126 idslist, metaval = self.clean_id_list( 

127 self.split_identifiers(row["id"]), 

128 br=True, 

129 valid_dois_cache=valid_dois_cache, 

130 ) 

131 id_metaval = f"omid:br/{metaval}" if metaval else "" 

132 if id_metaval: 

133 metavals.add(id_metaval) 

134 if idslist: 

135 identifiers.update(idslist) 

136 

137 fields_with_an_id = [ 

138 (field, re.search(name_and_ids, row[field]).group(2).split()) 

139 for field in ["author", "editor", "publisher", "venue", "volume", "issue"] 

140 if re.search(name_and_ids, row[field]) 

141 ] 

142 for field, field_ids in fields_with_an_id: 

143 br = field in ["venue", "volume", "issue"] 

144 field_idslist, field_metaval = self.clean_id_list( 

145 field_ids, br=br, valid_dois_cache=valid_dois_cache 

146 ) 

147 if field_metaval: 

148 field_metaval = ( 

149 f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}" 

150 ) 

151 else: 

152 field_metaval = "" 

153 if field_metaval: 

154 metavals.add(field_metaval) 

155 if field == "venue": 

156 venue_metaid = field_metaval 

157 if field_idslist: 

158 venue_ids.update(field_idslist) 

159 else: 

160 if field_idslist: 

161 identifiers.update(field_idslist) 

162 

163 if (venue_metaid or venue_ids) and (row["volume"] or row["issue"]): 

164 vvi = (row["volume"], row["issue"], venue_metaid, tuple(sorted(venue_ids))) 

165 vvis.add(vvi) 

166 

167 return metavals, identifiers, vvis 

168 

169 def split_identifiers(self, field_value): 

170 if self.separator: 

171 return re.sub(colon_and_spaces, ":", field_value).split(self.separator) 

172 else: 

173 return re.split( 

174 one_or_more_spaces, re.sub(colon_and_spaces, ":", field_value) 

175 ) 

176 
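
# Editor's note: illustrative runs of split_identifiers(), assuming
# colon_and_spaces matches a colon with surrounding spaces and
# one_or_more_spaces matches whitespace runs (as their usage here suggests).
#
#     # with self.separator = None:
#     self.split_identifiers("doi: 10.1000/abc pmid:12345")
#     # -> ["doi:10.1000/abc", "pmid:12345"]
#     # with self.separator = ";":
#     self.split_identifiers("doi:10.1000/abc;pmid:12345")
#     # -> ["doi:10.1000/abc", "pmid:12345"]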

177 def curator(self, filename: str = None, path_csv: str = None): 

178 # Phase 1: Collect identifiers and SPARQL prefetch 

179 with self._timed("curation__collect_identifiers"): 

180 metavals, identifiers, vvis = self.collect_identifiers( 

181 valid_dois_cache=self.valid_dois_cache 

182 ) 

183 self.finder.get_everything_about_res( 

184 metavals=metavals, identifiers=identifiers, vvis=vvis 

185 ) 

186 

187 # Phase 2: Clean ID (loop over all rows) 

188 with self._timed("curation__clean_id"): 

189 for row in self.data: 

190 self.log[self.rowcnt] = { 

191 "id": {}, 

192 "title": {}, 

193 "author": {}, 

194 "venue": {}, 

195 "editor": {}, 

196 "publisher": {}, 

197 "page": {}, 

198 "volume": {}, 

199 "issue": {}, 

200 "pub_date": {}, 

201 "type": {}, 

202 } 

203 self.clean_id(row) 

204 self.rowcnt += 1 

205 

206 # Phase 3: Merge duplicate entities 

207 with self._timed("curation__merge_duplicates"): 

208 self.merge_duplicate_entities() 

209 self.clean_metadata_without_id() 

210 

211 # Phase 4: Clean VVI (venue/volume/issue) 

212 with self._timed("curation__clean_vvi"): 

213 self.rowcnt = 0 

214 for row in self.data: 

215 self.clean_vvi(row) 

216 self.rowcnt += 1 

217 

218 # Phase 5: Clean RA (author + publisher + editor aggregated) 

219 with self._timed("curation__clean_ra"): 

220 self.rowcnt = 0 

221 for row in self.data: 

222 self.clean_ra(row, "author") 

223 self.clean_ra(row, "publisher") 

224 self.clean_ra(row, "editor") 

225 self.rowcnt += 1 

226 

227 # Phase 6: Finalize (preexisting + meta_maker + enrich + indexer) 

228 with self._timed("curation__finalize"): 

229 self.get_preexisting_entities() 

230 self.meta_maker() 

231 self.log = self.log_update() 

232 self.enrich() 

233 # Remove duplicates 

234 self.data = list({v["id"]: v for v in self.data}.values()) 

235 self.filename = filename 

236 self.indexer(path_csv=path_csv) 

237 

238 # ID 

239 def clean_id(self, row: Dict[str, str]) -> None: 

240 """ 

241 The 'clean_id()' method is executed for each CSV row. 

242 In this process, duplicates are detected via the IDs in the 'id' column. 

243 Each row is assigned a wannabe ID or, if the bibliographic resource was found in the triplestore, 

244 a MetaID. 

245 Finally, this method enriches and cleans the fields related to the 

246 title, venue, volume, issue, page, publication date and type. 

247 

248 :params row: a dictionary representing a CSV row 

249 :type row: Dict[str, str] 

250 :returns: None -- This method modifies the input CSV row without returning it. 

251 """ 

252 if row["title"]: 

253 name = Cleaner(row["title"]).clean_title( 

254 self.settings.get("normalize_titles") 

255 ) 

256 else: 

257 name = "" 

258 metaval_ids_list = [] 

259 if row["id"]: 

260 if self.separator: 

261 idslist = re.sub(colon_and_spaces, ":", row["id"]).split(self.separator) 

262 else: 

263 idslist = re.split( 

264 one_or_more_spaces, re.sub(colon_and_spaces, ":", row["id"]) 

265 ) 

266 idslist, metaval = self.clean_id_list( 

267 idslist, br=True, valid_dois_cache=self.valid_dois_cache 

268 ) 

269 id_metaval = f"omid:br/{metaval}" if metaval else "" 

270 metaval_ids_list.append((id_metaval, idslist)) 

271 fields_with_an_id = [ 

272 (field, re.search(name_and_ids, row[field]).group(2).split()) 

273 for field in ["author", "editor", "publisher", "venue", "volume", "issue"] 

274 if re.search(name_and_ids, row[field]) 

275 ] 

276 for field, field_ids in fields_with_an_id: 

277 if field in ["author", "editor", "publisher"]: 

278 br = False 

279 elif field in ["venue", "volume", "issue"]: 

280 br = True 

281 field_idslist, field_metaval = self.clean_id_list( 

282 field_ids, br=br, valid_dois_cache=self.valid_dois_cache 

283 ) 

284 if field_metaval: 

285 field_metaval = ( 

286 f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}" 

287 ) 

288 else: 

289 field_metaval = "" 

290 metaval_ids_list.append((field_metaval, field_idslist)) 

291 if row["id"]: 

292 metaval = self.id_worker( 

293 "id", 

294 name, 

295 idslist, 

296 metaval, 

297 ra_ent=False, 

298 br_ent=True, 

299 vvi_ent=False, 

300 publ_entity=False, 

301 ) 

302 else: 

303 metaval = self.new_entity(self.brdict, name) 

304 row["title"] = self.brdict[metaval]["title"] 

305 row["id"] = metaval 

306 

307 def clean_metadata_without_id(self): 

308 for row in self.data: 

309 # page 

310 if row["page"]: 

311 row["page"] = Cleaner(row["page"]).normalize_hyphens() 

312 # date 

313 if row["pub_date"]: 

314 date = Cleaner(row["pub_date"]).normalize_hyphens() 

315 date = Cleaner(date).clean_date() 

316 row["pub_date"] = date 

317 # type 

318 if row["type"]: 

319 entity_type = " ".join((row["type"].lower()).split()) 

320 if entity_type == "edited book" or entity_type == "monograph": 

321 entity_type = "book" 

322 elif ( 

323 entity_type == "report series" 

324 or entity_type == "standard series" 

325 or entity_type == "proceedings series" 

326 ): 

327 entity_type = "series" 

328 elif entity_type == "posted content": 

329 entity_type = "web content" 

330 if entity_type in { 

331 "abstract", 

332 "archival document", 

333 "audio document", 

334 "book", 

335 "book chapter", 

336 "book part", 

337 "book section", 

338 "book series", 

339 "book set", 

340 "computer program", 

341 "data file", 

342 "data management plan", 

343 "dataset", 

344 "dissertation", 

345 "editorial", 

346 "journal", 

347 "journal article", 

348 "journal editorial", 

349 "journal issue", 

350 "journal volume", 

351 "newspaper", 

352 "newspaper article", 

353 "newspaper editorial", 

354 "newspaper issue", 

355 "peer review", 

356 "preprint", 

357 "presentation", 

358 "proceedings", 

359 "proceedings article", 

360 "proceedings series", 

361 "reference book", 

362 "reference entry", 

363 "retraction notice", 

364 "series", 

365 "report", 

366 "standard", 

367 "web content", 

368 }: 

369 row["type"] = entity_type 

370 else: 

371 row["type"] = "" 

372 

373 # VVI 

374 def clean_vvi(self, row: Dict[str, str]) -> None: 

375 """ 

376 This method performs the deduplication process for venues, volumes and issues. 

377 The acquired information is stored in the 'vvi' dictionary, which has the following format: :: 

378 

379 { 

380 VENUE_IDENTIFIER: { 

381 'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}}, 

382 'volume': { 

383 SEQUENCE_IDENTIFIER: { 

384 'id': META_ID, 

385 'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}} 

386 } 

387 } 

388 } 

389 } 

390 

391 { 

392 '4416': { 

393 'issue': {}, 

394 'volume': { 

395 '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}}, 

396 '172': {'id': '4434', 

397 'issue': { 

398 '22': {'id': '4435'}, 

399 '20': {'id': '4436'}, 

400 '21': {'id': '4437'}, 

401 '19': {'id': '4438'} 

402 } 

403 } 

404 } 

405 } 

406 } 

407 

408 :params row: a dictionary representing a CSV row 

409 :type row: Dict[str, str] 

410 :returns: None -- This method modifies the input CSV row without returning it. 

411 """ 

412 if row["type"] not in { 

413 "journal article", 

414 "journal volume", 

415 "journal issue", 

416 } and (row["volume"] or row["issue"]): 

417 row["volume"] = "" 

418 row["issue"] = "" 

419 Cleaner.clean_volume_and_issue(row=row) 

420 vol_meta = None 

421 br_type = row["type"] 

422 volume = row["volume"] 

423 issue = row["issue"] 

424 br_id = row["id"] 

425 venue = row["venue"] 

426 # Venue 

427 if venue: 

428 # The data must be invalidated, because the resource is a journal but a volume or an issue has also been specified 

429 if br_type == "journal" and (volume or issue): 

430 row["venue"] = "" 

431 row["volume"] = "" 

432 row["issue"] = "" 

433 venue_id = re.search(name_and_ids, venue) 

434 if venue_id: 

435 name = Cleaner(venue_id.group(1)).clean_title( 

436 self.settings.get("normalize_titles") 

437 ) 

438 venue_id = venue_id.group(2) 

439 if self.separator: 

440 idslist = re.sub(colon_and_spaces, ":", venue_id).split( 

441 self.separator 

442 ) 

443 else: 

444 idslist = re.split( 

445 one_or_more_spaces, re.sub(colon_and_spaces, ":", venue_id) 

446 ) 

447 idslist, metaval = self.clean_id_list( 

448 idslist, br=True, valid_dois_cache=self.valid_dois_cache 

449 ) 

450 

451 metaval = self.id_worker( 

452 "venue", 

453 name, 

454 idslist, 

455 metaval, 

456 ra_ent=False, 

457 br_ent=True, 

458 vvi_ent=True, 

459 publ_entity=False, 

460 ) 

461 if metaval not in self.vvi: 

462 ts_vvi = None 

463 if "wannabe" not in metaval: 

464 ts_vvi = self.finder.retrieve_venue_from_local_graph(metaval) 

465 if "wannabe" in metaval or not ts_vvi: 

466 self.vvi[metaval] = dict() 

467 self.vvi[metaval]["volume"] = dict() 

468 self.vvi[metaval]["issue"] = dict() 

469 elif ts_vvi: 

470 self.vvi[metaval] = ts_vvi 

471 else: 

472 name = Cleaner(venue).clean_title(self.settings.get("normalize_titles")) 

473 metaval = self.new_entity(self.brdict, name) 

474 self.vvi[metaval] = dict() 

475 self.vvi[metaval]["volume"] = dict() 

476 self.vvi[metaval]["issue"] = dict() 

477 row["venue"] = metaval 

478 

479 # Volume 

480 if volume and (br_type == "journal issue" or br_type == "journal article"): 

481 if volume in self.vvi[metaval]["volume"]: 

482 vol_meta = self.vvi[metaval]["volume"][volume]["id"] 

483 else: 

484 vol_meta = self.new_entity(self.brdict, "") 

485 self.vvi[metaval]["volume"][volume] = dict() 

486 self.vvi[metaval]["volume"][volume]["id"] = vol_meta 

487 self.vvi[metaval]["volume"][volume]["issue"] = dict() 

488 elif volume and br_type == "journal volume": 

489 # The data must be invalidated, because the resource is a journal volume but an issue has also been specified 

490 if issue: 

491 row["volume"] = "" 

492 row["issue"] = "" 

493 else: 

494 vol_meta = br_id 

495 self.volume_issue( 

496 vol_meta, self.vvi[metaval]["volume"], volume, row 

497 ) 

498 

499 # Issue 

500 if issue and br_type == "journal article": 

501 row["issue"] = issue 

502 if vol_meta: 

503 if issue not in self.vvi[metaval]["volume"][volume]["issue"]: 

504 issue_meta = self.new_entity(self.brdict, "") 

505 self.vvi[metaval]["volume"][volume]["issue"][issue] = dict() 

506 self.vvi[metaval]["volume"][volume]["issue"][issue][ 

507 "id" 

508 ] = issue_meta 

509 else: 

510 if issue not in self.vvi[metaval]["issue"]: 

511 issue_meta = self.new_entity(self.brdict, "") 

512 self.vvi[metaval]["issue"][issue] = dict() 

513 self.vvi[metaval]["issue"][issue]["id"] = issue_meta 

514 elif issue and br_type == "journal issue": 

515 issue_meta = br_id 

516 if vol_meta: 

517 self.volume_issue( 

518 issue_meta, 

519 self.vvi[metaval]["volume"][volume]["issue"], 

520 issue, 

521 row, 

522 ) 

523 else: 

524 self.volume_issue( 

525 issue_meta, self.vvi[metaval]["issue"], issue, row 

526 ) 

527 

528 else: 

529 row["venue"] = "" 

530 row["volume"] = "" 

531 row["issue"] = "" 

532 

533 # RA 

534 def clean_ra(self, row, col_name): 

535 """ 

536 This method performs the deduplication process for responsible agents (authors, publishers and editors). 

537 

538 :params row: a dictionary representing a CSV row 

539 :type row: Dict[str, str] 

540 :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor' 

541 :type col_name: str 

542 :returns: None -- This method modifies self.ardict, self.radict, and self.idra, and returns None. 

543 """ 

544 

545 def get_br_metaval_to_check(row, col_name): 

546 if col_name == "editor": 

547 return get_edited_br_metaid(row, row["id"], row["venue"]) 

548 else: 

549 return row["id"] 

550 

551 def get_br_metaval(br_metaval_to_check): 

552 if br_metaval_to_check in self.brdict or br_metaval_to_check in self.vvi: 

553 return br_metaval_to_check 

554 return [ 

555 id 

556 for id in self.brdict 

557 if br_metaval_to_check in self.brdict[id]["others"] 

558 ][0] 

559 

560 def initialize_ardict_entry(br_metaval): 

561 if br_metaval not in self.ardict: 

562 self.ardict[br_metaval] = {"author": [], "editor": [], "publisher": []} 

563 

564 def initialize_sequence(br_metaval, col_name): 

565 sequence = [] 

566 if "wannabe" in br_metaval: 

567 sequence = [] 

568 else: 

569 sequence_found = self.finder.retrieve_ra_sequence_from_br_meta( 

570 br_metaval, col_name 

571 ) 

572 if sequence_found: 

573 sequence = [] 

574 for agent in sequence_found: 

575 for ar_metaid in agent: 

576 ra_metaid = agent[ar_metaid][2] 

577 sequence.append(tuple((ar_metaid, ra_metaid))) 

578 if ra_metaid not in self.radict: 

579 self.radict[ra_metaid] = dict() 

580 self.radict[ra_metaid]["ids"] = list() 

581 self.radict[ra_metaid]["others"] = list() 

582 self.radict[ra_metaid]["title"] = agent[ar_metaid][0] 

583 for identifier in agent[ar_metaid][1]: 

584 # register the entity's other identifiers alongside its MetaID 

585 id_metaid = identifier[0] 

586 literal = identifier[1] 

587 if literal not in self.idra: 

588 self.idra[literal] = id_metaid 

589 if literal not in self.radict[ra_metaid]["ids"]: 

590 self.radict[ra_metaid]["ids"].append(literal) 

591 self.ardict[br_metaval][col_name].extend(sequence) 

592 else: 

593 sequence = [] 

594 return sequence 

595 

596 def parse_ra_list(row): 

597 ra_list = re.split(semicolon_in_people_field, row[col_name]) 

598 ra_list = Cleaner.clean_ra_list(ra_list) 

599 return ra_list 

600 

601 def process_individual_ra(ra, sequence): 

602 new_elem_seq = True 

603 ra_id = None 

604 ra_id_match = re.search(name_and_ids, ra) 

605 if ra_id_match: 

606 cleaner = Cleaner(ra_id_match.group(1)) 

607 name = cleaner.clean_name() 

608 ra_id = ra_id_match.group(2) 

609 else: 

610 cleaner = Cleaner(ra) 

611 name = cleaner.clean_name() 

612 if not ra_id and sequence: 

613 for _, ra_metaid in sequence: 

614 if self.radict[ra_metaid]["title"] == name: 

615 ra_id = "omid:ra/" + str(ra_metaid) 

616 new_elem_seq = False 

617 break 

618 return ra_id, name, new_elem_seq 

619 

620 if not row[col_name]: 

621 return 

622 

623 br_metaval_to_check = get_br_metaval_to_check(row, col_name) 

624 br_metaval = get_br_metaval(br_metaval_to_check) 

625 initialize_ardict_entry(br_metaval) 

626 

627 sequence = self.ardict[br_metaval].get(col_name, []) 

628 if not sequence: 

629 sequence = initialize_sequence(br_metaval, col_name) 

630 if col_name in self.silencer and sequence: 

631 return 

632 

633 ra_list = parse_ra_list(row) 

634 new_sequence = list() 

635 change_order = False 

636 

637 for pos, ra in enumerate(ra_list): 

638 ra_id, name, new_elem_seq = process_individual_ra(ra, sequence) 

639 if ra_id: 

640 if self.separator: 

641 ra_id_list = re.sub(colon_and_spaces, ":", ra_id).split( 

642 self.separator 

643 ) 

644 else: 

645 ra_id_list = re.split( 

646 one_or_more_spaces, re.sub(colon_and_spaces, ":", ra_id) 

647 ) 

648 if sequence: 

649 ar_ra = None 

650 for ps, el in enumerate(sequence): 

651 ra_metaid = el[1] 

652 for literal in ra_id_list: 

653 if literal in self.radict[ra_metaid]["ids"]: 

654 if ps != pos: 

655 change_order = True 

656 new_elem_seq = False 

657 if "wannabe" not in ra_metaid: 

658 ar_ra = ra_metaid 

659 for pos, literal_value in enumerate(ra_id_list): 

660 if "omid" in literal_value: 

661 ra_id_list[pos] = "" 

662 break 

663 ra_id_list = list(filter(None, ra_id_list)) 

664 ra_id_list.append("omid:ra/" + ar_ra) 

665 if not ar_ra: 

666 # new element 

667 for ar_metaid, ra_metaid in sequence: 

668 if self.radict[ra_metaid]["title"] == name: 

669 new_elem_seq = False 

670 if "wannabe" not in ra_metaid: 

671 ar_ra = ra_metaid 

672 for pos, i in enumerate(ra_id_list): 

673 if "omid" in i: 

674 ra_id_list[pos] = "" 

675 break 

676 ra_id_list = list(filter(None, ra_id_list)) 

677 ra_id_list.append("omid:ra/" + ar_ra) 

678 if col_name == "publisher": 

679 ra_id_list, metaval = self.clean_id_list( 

680 ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache 

681 ) 

682 metaval = self.id_worker( 

683 "publisher", 

684 name, 

685 ra_id_list, 

686 metaval, 

687 ra_ent=True, 

688 br_ent=False, 

689 vvi_ent=False, 

690 publ_entity=True, 

691 ) 

692 else: 

693 ra_id_list, metaval = self.clean_id_list( 

694 ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache 

695 ) 

696 metaval = self.id_worker( 

697 col_name, 

698 name, 

699 ra_id_list, 

700 metaval, 

701 ra_ent=True, 

702 br_ent=False, 

703 vvi_ent=False, 

704 publ_entity=False, 

705 ) 

706 if col_name != "publisher" and metaval in self.radict: 

707 full_name: str = self.radict[metaval]["title"] 

708 if "," in name and "," in full_name: 

709 first_name = name.split(",")[1].strip() 

710 if ( 

711 not full_name.split(",")[1].strip() and first_name 

712 ): # first name found! 

713 family_name = full_name.split(",")[0] 

714 self.radict[metaval]["title"] = ( 

715 family_name + ", " + first_name 

716 ) 

717 else: 

718 metaval = self.new_entity(self.radict, name) 

719 if new_elem_seq: 

720 role = self.prefix + str(self._add_number("ar")) 

721 new_sequence.append(tuple((role, metaval))) 

722 if change_order: 

723 self.log[self.rowcnt][col_name][ 

724 "Info" 

725 ] = "New RA sequence proposed: refused" 

726 sequence.extend(new_sequence) 

727 self.ardict[br_metaval][col_name] = sequence 

728 

729 @staticmethod 

730 def clean_id_list( 

731 id_list: List[str], br: bool, valid_dois_cache: dict | None = None 

732 ) -> Tuple[list, str]: 

733 """ 

734 Clean IDs in the input list and check if there is a MetaID. 

735 

736 :params id_list: a list of IDs 

737 :type id_list: List[str] 

738 :params br: True if the IDs in id_list refer to bibliographic resources, False otherwise 

739 :type br: bool 

740 :returns: Tuple[list, str] -- a two-element tuple, where the first element is the list of cleaned IDs and the second is a MetaID, if any was found. 

741 """ 

742 pattern = "br/" if br else "ra/" 

743 metaid = "" 

744 id_list = list(filter(None, id_list)) 

745 clean_list = list() 

746 

747 for elem in id_list: 

748 if elem in clean_list: 

749 continue 

750 elem = Cleaner(elem).normalize_hyphens() 

751 identifier = elem.split(":", 1) 

752 schema = identifier[0].lower() 

753 value = identifier[1] 

754 

755 if schema == "omid": 

756 metaid = value.replace(pattern, "") 

757 else: 

758 normalized_id = Cleaner(elem).normalize_id( 

759 valid_dois_cache=valid_dois_cache if valid_dois_cache is not None else {} 

760 ) 

761 if normalized_id: 

762 clean_list.append(normalized_id) 

763 

764 how_many_meta = [i for i in id_list if i.lower().startswith("omid")] 

765 if len(how_many_meta) > 1: 

766 clean_list = [i for i in clean_list if not i.lower().startswith("omid")] 

767 

768 return clean_list, metaid 

769 
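
# Editor's note: a worked example of clean_id_list(). The normalized DOI is an
# assumption, since the exact output depends on Cleaner.normalize_id().
#
#     ids, metaid = Curator.clean_id_list(
#         ["doi:10.1000/ABC", "omid:br/0601", ""], br=True
#     )
#     # metaid == "0601"               (the "br/" prefix is stripped)
#     # ids    == ["doi:10.1000/abc"]  (empty strings dropped; the OMID is
#     #                                 moved to the MetaID slot)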

770 def conflict( 

771 self, idslist: List[str], name: str, id_dict: dict, col_name: str 

772 ) -> str: 

773 if col_name == "id" or col_name == "venue": 

774 entity_dict = self.brdict 

775 elif col_name == "author" or col_name == "editor" or col_name == "publisher": 

776 entity_dict = self.radict 

777 metaval = self.new_entity(entity_dict, name) 

778 entity_dict[metaval] = {"ids": list(), "others": list(), "title": name} 

779 self.log[self.rowcnt][col_name]["Conflict entity"] = metaval 

780 for identifier in idslist: 

781 entity_dict[metaval]["ids"].append(identifier) 

782 if identifier not in id_dict: 

783 schema_value = identifier.split(":", maxsplit=1) 

784 found_metaid = self.finder.retrieve_metaid_from_id( 

785 schema_value[0], schema_value[1] 

786 ) 

787 if found_metaid: 

788 id_dict[identifier] = found_metaid 

789 else: 

790 self.__update_id_count(id_dict, identifier) 

791 return metaval 

792 

793 def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False): 

794 match_elem = list() 

795 id_set = set() 

796 res = None 

797 for elem in list_to_find: 

798 if len(match_elem) < 2: 

799 identifier = elem.split(":", maxsplit=1) 

800 value = identifier[1] 

801 schema = identifier[0] 

802 if br: 

803 res = self.finder.retrieve_br_from_id(schema, value) 

804 elif ra: 

805 res = self.finder.retrieve_ra_from_id(schema, value, publ) 

806 if res: 

807 for f in res: 

808 if f[0] not in id_set: 

809 match_elem.append(f) 

810 id_set.add(f[0]) 

811 return match_elem 

812 

813 def ra_update(self, row: dict, br_key: str, col_name: str) -> None: 

814 if row[col_name]: 

815 sequence = self.armeta[br_key][col_name] 

816 ras_list = list() 

817 for _, ra_id in sequence: 

818 ra_name = self.rameta[ra_id]["title"] 

819 ra_ids = self.rameta[ra_id]["ids"] 

820 ra = self.build_name_ids_string(ra_name, ra_ids) 

821 ras_list.append(ra) 

822 row[col_name] = "; ".join(ras_list) 

823 

824 @staticmethod 

825 def build_name_ids_string(name, ids): 

826 if name and ids: 

827 ra_string = f"{name} [{' '.join(ids)}]" 

828 elif name and not ids: 

829 ra_string = name 

830 elif ids and not name: 

831 ra_string = f"[{' '.join(ids)}]" 

832 elif not ids and not name: 

833 ra_string = "" 

834 return ra_string 

835 
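
# Editor's note: build_name_ids_string() examples, grounded in the branches above:
#
#     Curator.build_name_ids_string("Doe, Jane", ["orcid:0000-0001-2345-6789"])
#     # -> "Doe, Jane [orcid:0000-0001-2345-6789]"
#     Curator.build_name_ids_string("Doe, Jane", [])
#     # -> "Doe, Jane"
#     Curator.build_name_ids_string("", ["omid:ra/0601"])
#     # -> "[omid:ra/0601]"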

836 @staticmethod 

837 def __local_match(list_to_match, dict_to_match: dict): 

838 match_elem = dict() 

839 match_elem["existing"] = list() 

840 match_elem["wannabe"] = list() 

841 for elem in list_to_match: 

842 for k, va in dict_to_match.items(): 

843 if elem in va["ids"]: 

844 if "wannabe" in k: 

845 if k not in match_elem["wannabe"]: 

846 match_elem["wannabe"].append(k) 

847 else: 

848 if k not in match_elem["existing"]: 

849 match_elem["existing"].append(k) 

850 return match_elem 

851 

852 def __meta_ar(self, target_br_metaid: str, source_br_key: str, role_type: str) -> None: 

853 """ 

854 Transfer agent role assignments from working dictionary to finalized dictionary. 

855 

856 Resolves any remaining placeholder ("wannabe") agent identifiers to their 

857 final MetaIDs by looking up which finalized agent absorbed them. 

858 

859 Args: 

860 target_br_metaid: The final, deduplicated bibliographic resource MetaID 

861 source_br_key: The intermediate key in ardict (may contain "wannabe") 

862 role_type: Type of role ("author", "editor", or "publisher") 

863 """ 

864 for ar_metaid, agent_id in self.ardict[source_br_key][role_type]: 

865 if "wannabe" in agent_id: 

866 for candidate_ra_metaid in self.rameta: 

867 if agent_id in self.rameta[candidate_ra_metaid]["others"]: 

868 resolved_ra_metaid = candidate_ra_metaid 

869 break 

870 else: 

871 resolved_ra_metaid = agent_id 

872 self.armeta[target_br_metaid][role_type].append((ar_metaid, resolved_ra_metaid)) 

873 

874 def __tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None: 

875 for k, v in tree.items(): 

876 if k == key: 

877 values.append(v) 

878 elif isinstance(v, dict): 

879 found = self.__tree_traverse(v, key, values) 

880 if found is not None: 

881 values.append(found) 

882 

883 def get_preexisting_entities(self) -> None: 

884 for entity_type in {"br", "ra"}: 

885 for entity_metaid, data in getattr(self, f"{entity_type}dict").items(): 

886 if not entity_metaid.startswith("wannabe"): 

887 self.preexisting_entities.add(f"{entity_type}/{entity_metaid}") 

888 for entity_id_literal in data["ids"]: 

889 preexisting_entity_id_metaid = getattr( 

890 self, f"id{entity_type}" 

891 )[entity_id_literal] 

892 self.preexisting_entities.add( 

893 f"id/{preexisting_entity_id_metaid}" 

894 ) 

895 for _, roles in self.ardict.items(): 

896 for _, ar_ras in roles.items(): 

897 for ar_ra in ar_ras: 

898 if not ar_ra[1].startswith("wannabe"): 

899 self.preexisting_entities.add(f"ar/{ar_ra[0]}") 

900 for venue_metaid, vi in self.vvi.items(): 

901 if not venue_metaid.startswith("wannabe"): 

902 wannabe_preexisting_vis = list() 

903 self.__tree_traverse(vi, "id", wannabe_preexisting_vis) 

904 self.preexisting_entities.update( 

905 { 

906 f"br/{vi_metaid}" 

907 for vi_metaid in wannabe_preexisting_vis 

908 if not vi_metaid.startswith("wannabe") 

909 } 

910 ) 

911 for _, re_metaid in self.remeta.items(): 

912 self.preexisting_entities.add(f"re/{re_metaid[0]}") 

913 

914 def meta_maker(self): 

915 """ 

916 For each dictionary ('brdict', 'ardict', 'radict', 'vvi') the corresponding MetaID dictionary is created 

917 ('brmeta', 'armeta', 'rameta', and 'VolIss'). 

918 """ 

919 for identifier in self.brdict: 

920 if "wannabe" in identifier: 

921 other = identifier 

922 count = self._add_number("br") 

923 meta = self.prefix + str(count) 

924 self.brmeta[meta] = self.brdict[identifier] 

925 self.brmeta[meta]["others"].append(other) 

926 self.brmeta[meta]["ids"].append("omid:br/" + meta) 

927 else: 

928 self.brmeta[identifier] = self.brdict[identifier] 

929 self.brmeta[identifier]["ids"].append("omid:br/" + identifier) 

930 for identifier in self.radict: 

931 if "wannabe" in identifier: 

932 other = identifier 

933 count = self._add_number("ra") 

934 meta = self.prefix + str(count) 

935 self.rameta[meta] = self.radict[identifier] 

936 self.rameta[meta]["others"].append(other) 

937 self.rameta[meta]["ids"].append("omid:ra/" + meta) 

938 else: 

939 self.rameta[identifier] = self.radict[identifier] 

940 self.rameta[identifier]["ids"].append("omid:ra/" + identifier) 

941 for ar_id in self.ardict: 

942 if "wannabe" in ar_id: 

943 for br_id in self.brmeta: 

944 if ar_id in self.brmeta[br_id]["others"]: 

945 br_key = br_id 

946 break 

947 else: 

948 br_key = ar_id 

949 self.armeta[br_key] = dict() 

950 self.armeta[br_key]["author"] = list() 

951 self.armeta[br_key]["editor"] = list() 

952 self.armeta[br_key]["publisher"] = list() 

953 self.__meta_ar(br_key, ar_id, "author") 

954 self.__meta_ar(br_key, ar_id, "editor") 

955 self.__meta_ar(br_key, ar_id, "publisher") 

956 self.VolIss = dict() 

957 if self.vvi: 

958 for venue_meta in self.vvi: 

959 venue_issue = self.vvi[venue_meta]["issue"] 

960 if venue_issue: 

961 for issue in venue_issue: 

962 issue_id = venue_issue[issue]["id"] 

963 if "wannabe" in issue_id: 

964 for br_meta in self.brmeta: 

965 if issue_id in self.brmeta[br_meta]["others"]: 

966 self.vvi[venue_meta]["issue"][issue]["id"] = str( 

967 br_meta 

968 ) 

969 break 

970 

971 venue_volume = self.vvi[venue_meta]["volume"] 

972 if venue_volume: 

973 for volume in venue_volume: 

974 volume_id = venue_volume[volume]["id"] 

975 if "wannabe" in volume_id: 

976 for br_meta in self.brmeta: 

977 if volume_id in self.brmeta[br_meta]["others"]: 

978 self.vvi[venue_meta]["volume"][volume]["id"] = str( 

979 br_meta 

980 ) 

981 break 

982 if venue_volume[volume]["issue"]: 

983 volume_issue = venue_volume[volume]["issue"] 

984 for issue in volume_issue: 

985 volume_issue_id = volume_issue[issue]["id"] 

986 if "wannabe" in volume_issue_id: 

987 for br_meta in self.brmeta: 

988 if ( 

989 volume_issue_id 

990 in self.brmeta[br_meta]["others"] 

991 ): 

992 self.vvi[venue_meta]["volume"][volume][ 

993 "issue" 

994 ][issue]["id"] = str(br_meta) 

995 break 

996 if "wannabe" in venue_meta: 

997 for br_meta in self.brmeta: 

998 if venue_meta in self.brmeta[br_meta]["others"]: 

999 self.__merge_VolIss_with_vvi(br_meta, venue_meta) 

1000 else: 

1001 self.__merge_VolIss_with_vvi(venue_meta, venue_meta) 

1002 

1003 def enrich(self): 

1004 """ 

1005 This method replaces the wannabeID placeholders with the 

1006 actual data and MetaIDs as a result of the deduplication process. 

1007 """ 

1008 for row in self.data: 

1009 if "wannabe" in row["id"]: 

1010 for br_metaid in self.brmeta: 

1011 if row["id"] in self.brmeta[br_metaid]["others"]: 

1012 metaid = br_metaid 

1013 else: 

1014 metaid = row["id"] 

1015 if row["page"] and (metaid not in self.remeta): 

1016 re_meta = self.finder.retrieve_re_from_br_meta(metaid) 

1017 if re_meta: 

1018 self.remeta[metaid] = re_meta 

1019 row["page"] = re_meta[1] 

1020 else: 

1021 count = self.prefix + str(self._add_number("re")) 

1022 page = row["page"] 

1023 self.remeta[metaid] = (count, page) 

1024 row["page"] = page 

1025 elif metaid in self.remeta: 

1026 row["page"] = self.remeta[metaid][1] 

1027 row["id"] = " ".join(self.brmeta[metaid]["ids"]) 

1028 row["title"] = self.brmeta[metaid]["title"] 

1029 venue_metaid = None 

1030 if row["venue"]: 

1031 venue = row["venue"] 

1032 if "wannabe" in venue: 

1033 for i in self.brmeta: 

1034 if venue in self.brmeta[i]["others"]: 

1035 venue_metaid = i 

1036 else: 

1037 venue_metaid = venue 

1038 row["venue"] = self.build_name_ids_string( 

1039 self.brmeta[venue_metaid]["title"], self.brmeta[venue_metaid]["ids"] 

1040 ) 

1041 br_key_for_editor = get_edited_br_metaid(row, metaid, venue_metaid) 

1042 self.ra_update(row, metaid, "author") 

1043 self.ra_update(row, metaid, "publisher") 

1044 self.ra_update(row, br_key_for_editor, "editor") 

1045 

1046 @staticmethod 

1047 def name_check(ts_name, name): 

1048 if "," in ts_name: 

1049 names = ts_name.split(",") 

1050 if names[0] and not names[1].strip(): 

1051 # no given name in the triplestore record 

1052 if "," in name: 

1053 gname = name.split(",", 1)[1].strip() 

1054 if gname: 

1055 ts_name = names[0] + ", " + gname 

1056 return ts_name 

1057 
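
# Editor's note: name_check() completes a family-name-only triplestore record
# with the given name supplied by the CSV:
#
#     Curator.name_check("Doe,", "Doe, Jane")    # -> "Doe, Jane"
#     Curator.name_check("Doe, J.", "Doe, Jane") # -> "Doe, J." (already complete)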

1058 def _read_number(self, entity_type: str) -> int: 

1059 return self.counter_handler.read_counter( 

1060 entity_type, supplier_prefix=self.prefix 

1061 ) 

1062 

1063 def _add_number(self, entity_type: str) -> int: 

1064 return self.counter_handler.increment_counter( 

1065 entity_type, supplier_prefix=self.prefix 

1066 ) 

1067 
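
# Editor's note: both helpers delegate to the Redis-backed counter handler.
# Assuming increment_counter() returns the post-increment value (as the use of
# self.prefix + str(count) in meta_maker() suggests):
#
#     count = self._add_number("br")   # e.g. 42
#     meta = self.prefix + str(count)  # e.g. "06042" with the default prefix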

1068 def __update_id_and_entity_dict( 

1069 self, 

1070 existing_ids: list, 

1071 id_dict: dict, 

1072 entity_dict: Dict[str, Dict[str, list]], 

1073 metaval: str, 

1074 ) -> None: 

1075 for identifier in existing_ids: 

1076 if identifier[1] not in id_dict: 

1077 id_dict[identifier[1]] = identifier[0] 

1078 if identifier[1] not in entity_dict[metaval]["ids"]: 

1079 entity_dict[metaval]["ids"].append(identifier[1]) 

1080 

1081 def indexer(self, path_csv: str = None) -> None: 

1082 """ 

1083 Transforms the internal dicts (idra, idbr, armeta, remeta) into the list-of-dicts format 

1084 consumed by Creator. Optionally, it saves the enriched CSV file. 

1085 

1086 :params path_csv: Directory path for the enriched CSV output (optional) 

1087 :type path_csv: str 

1088 """ 

1089 # ID 

1090 self.index_id_ra = list() 

1091 self.index_id_br = list() 

1092 for entity_type in {"ra", "br"}: 

1093 cur_index = getattr(self, f"id{entity_type}") 

1094 if cur_index: 

1095 for literal in cur_index: 

1096 row = dict() 

1097 row["id"] = str(literal) 

1098 row["meta"] = str(cur_index[literal]) 

1099 getattr(self, f"index_id_{entity_type}").append(row) 

1100 else: 

1101 row = dict() 

1102 row["id"] = "" 

1103 row["meta"] = "" 

1104 getattr(self, f"index_id_{entity_type}").append(row) 

1105 # AR 

1106 self.ar_index = list() 

1107 if self.armeta: 

1108 for metaid in self.armeta: 

1109 index = dict() 

1110 index["meta"] = metaid 

1111 for role in self.armeta[metaid]: 

1112 list_ar = list() 

1113 for ar, ra in self.armeta[metaid][role]: 

1114 list_ar.append(str(ar) + ", " + str(ra)) 

1115 index[role] = "; ".join(list_ar) 

1116 self.ar_index.append(index) 

1117 else: 

1118 row = dict() 

1119 row["meta"] = "" 

1120 row["author"] = "" 

1121 row["editor"] = "" 

1122 row["publisher"] = "" 

1123 self.ar_index.append(row) 

1124 # RE 

1125 self.re_index = list() 

1126 if self.remeta: 

1127 for x in self.remeta: 

1128 r = dict() 

1129 r["br"] = x 

1130 r["re"] = str(self.remeta[x][0]) 

1131 self.re_index.append(r) 

1132 else: 

1133 row = dict() 

1134 row["br"] = "" 

1135 row["re"] = "" 

1136 self.re_index.append(row) 

1137 # Save enriched CSV if path provided 

1138 if self.filename and path_csv and self.data: 

1139 name = self.filename + ".csv" 

1140 data_file = os.path.join(path_csv, name) 

1141 write_csv(data_file, self.data) 

1142 
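
# Editor's note: sketches of the index structures built by indexer(), with
# invented MetaIDs:
#
#     self.index_id_br  # [{"id": "doi:10.1000/abc", "meta": "0601"}, ...]
#     self.ar_index     # [{"meta": "0602", "author": "0603, 0604; 0605, 0606",
#                       #   "editor": "", "publisher": "0607, 0608"}, ...]
#     self.re_index     # [{"br": "0602", "re": "0609"}, ...]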

1143 def __merge_VolIss_with_vvi( 

1144 self, VolIss_venue_meta: str, vvi_venue_meta: str 

1145 ) -> None: 

1146 if VolIss_venue_meta in self.VolIss: 

1147 for vvi_v in self.vvi[vvi_venue_meta]["volume"]: 

1148 if vvi_v in self.VolIss[VolIss_venue_meta]["volume"]: 

1149 self.VolIss[VolIss_venue_meta]["volume"][vvi_v]["issue"].update( 

1150 self.vvi[vvi_venue_meta]["volume"][vvi_v]["issue"] 

1151 ) 

1152 else: 

1153 self.VolIss[VolIss_venue_meta]["volume"][vvi_v] = self.vvi[ 

1154 vvi_venue_meta 

1155 ]["volume"][vvi_v] 

1156 self.VolIss[VolIss_venue_meta]["issue"].update( 

1157 self.vvi[vvi_venue_meta]["issue"] 

1158 ) 

1159 else: 

1160 self.VolIss[VolIss_venue_meta] = self.vvi[vvi_venue_meta] 

1161 

1162 def __update_id_count(self, id_dict, identifier): 

1163 

1164 # Before creating a new ID, check whether it already exists in the triplestore 

1165 schema, value = identifier.split(":", maxsplit=1) 

1166 existing_metaid = self.finder.retrieve_metaid_from_id(schema, value) 

1167 

1168 if existing_metaid: 

1169 id_dict[identifier] = existing_metaid 

1170 else: 

1171 count = self._add_number("id") 

1172 id_dict[identifier] = self.prefix + str(count) 

1173 

1174 @staticmethod 

1175 def merge( 

1176 dict_to_match: Dict[str, Dict[str, list]], 

1177 metaval: str, 

1178 old_meta: str, 

1179 temporary_name: str, 

1180 ) -> None: 

1181 for x in dict_to_match[old_meta]["ids"]: 

1182 if x not in dict_to_match[metaval]["ids"]: 

1183 dict_to_match[metaval]["ids"].append(x) 

1184 for x in dict_to_match[old_meta]["others"]: 

1185 if x not in dict_to_match[metaval]["others"]: 

1186 dict_to_match[metaval]["others"].append(x) 

1187 dict_to_match[metaval]["others"].append(old_meta) 

1188 if not dict_to_match[metaval]["title"]: 

1189 if dict_to_match[old_meta]["title"]: 

1190 dict_to_match[metaval]["title"] = dict_to_match[old_meta]["title"] 

1191 else: 

1192 dict_to_match[metaval]["title"] = temporary_name 

1193 del dict_to_match[old_meta] 

1194 
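
# Editor's note: a small merge() example. The surviving entry absorbs the other
# entry's IDs, records it under "others", and the absorbed entry is deleted:
#
#     d = {
#         "0601": {"ids": ["doi:10.1/a"], "others": [], "title": ""},
#         "wannabe_3": {"ids": ["pmid:1"], "others": [], "title": "A title"},
#     }
#     Curator.merge(d, "0601", "wannabe_3", "Temporary name")
#     # d == {"0601": {"ids": ["doi:10.1/a", "pmid:1"],
#     #                "others": ["wannabe_3"], "title": "A title"}}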

1195 def merge_entities_in_csv( 

1196 self, 

1197 idslist: list, 

1198 metaval: str, 

1199 name: str, 

1200 entity_dict: Dict[str, Dict[str, list]], 

1201 id_dict: dict, 

1202 ) -> None: 

1203 found_others = self.__local_match(idslist, entity_dict) 

1204 if found_others["wannabe"]: 

1205 for old_meta in found_others["wannabe"]: 

1206 self.merge(entity_dict, metaval, old_meta, name) 

1207 for identifier in idslist: 

1208 if identifier not in entity_dict[metaval]["ids"]: 

1209 entity_dict[metaval]["ids"].append(identifier) 

1210 if identifier not in id_dict: 

1211 self.__update_id_count(id_dict, identifier) 

1212 self.__update_title(entity_dict, metaval, name) 

1213 

1214 def __update_title(self, entity_dict: dict, metaval: str, name: str) -> None: 

1215 if not entity_dict[metaval]["title"] and name: 

1216 entity_dict[metaval]["title"] = name 

1217 self.log[self.rowcnt]["title"]["status"] = "New value proposed" 

1218 

1219 def id_worker( 

1220 self, 

1221 col_name, 

1222 name, 

1223 idslist: List[str], 

1224 metaval: str, 

1225 ra_ent=False, 

1226 br_ent=False, 

1227 vvi_ent=False, 

1228 publ_entity=False, 

1229 ): 

1230 if not ra_ent: 

1231 id_dict = self.idbr 

1232 entity_dict = self.brdict 

1233 else: 

1234 id_dict = self.idra 

1235 entity_dict = self.radict 

1236 # there's meta 

1237 if metaval: 

1238 # MetaID exists among data? 

1239 # the MetaID is already in entity_dict (conflicts do not matter here, since a MetaID was explicitly specified) 

1240 if metaval in entity_dict: 

1241 self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict) 

1242 else: 

1243 if ra_ent: 

1244 found_meta_ts = self.finder.retrieve_ra_from_meta(metaval) 

1245 elif br_ent: 

1246 found_meta_ts = self.finder.retrieve_br_from_meta(metaval) 

1247 # meta in triplestore 

1248 # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV 

1249 if found_meta_ts[2]: 

1250 entity_dict[metaval] = dict() 

1251 entity_dict[metaval]["ids"] = list() 

1252 if col_name == "author" or col_name == "editor": 

1253 entity_dict[metaval]["title"] = self.name_check( 

1254 found_meta_ts[0], name 

1255 ) 

1256 else: 

1257 entity_dict[metaval]["title"] = found_meta_ts[0] 

1258 entity_dict[metaval]["others"] = list() 

1259 existing_ids = found_meta_ts[1] 

1260 self.__update_id_and_entity_dict( 

1261 existing_ids, id_dict, entity_dict, metaval 

1262 ) 

1263 self.merge_entities_in_csv( 

1264 idslist, metaval, name, entity_dict, id_dict 

1265 ) 

1266 # Look for MetaId in the provenance 

1267 else: 

1268 entity_type = "br" if br_ent or vvi_ent else "ra" 

1269 metaid_uri = f"{self.base_iri}/{entity_type}/{str(metaval)}" 

1270 # The MetaID the entity has after a merge, if it was merged; None otherwise. If None, the MetaID is considered invalid 

1271 metaval = self.finder.retrieve_metaid_from_merged_entity( 

1272 metaid_uri=metaid_uri, prov_config=self.prov_config 

1273 ) 

1274 # there's no meta or there was one but it didn't exist 

1275 # Are there other IDs? 

1276 if idslist and not metaval: 

1277 local_match = self.__local_match(idslist, entity_dict) 

1278 # IDs already exist among data? 

1279 # check in entity_dict 

1280 if local_match["existing"]: 

1281 # ids refer to multiple existing entities 

1282 if len(local_match["existing"]) > 1: 

1283 # ! 

1284 return self.conflict(idslist, name, id_dict, col_name) 

1285 # ids refer to ONE existing entity 

1286 elif len(local_match["existing"]) == 1: 

1287 metaval = str(local_match["existing"][0]) 

1288 suspect_ids = list() 

1289 for identifier in idslist: 

1290 if identifier not in entity_dict[metaval]["ids"]: 

1291 suspect_ids.append(identifier) 

1292 if suspect_ids: 

1293 sparql_match = self.finder_sparql( 

1294 suspect_ids, 

1295 br=br_ent, 

1296 ra=ra_ent, 

1297 vvi=vvi_ent, 

1298 publ=publ_entity, 

1299 ) 

1300 if len(sparql_match) > 1: 

1301 # ! 

1302 return self.conflict(idslist, name, id_dict, col_name) 

1303 # ids refer to one or more wannabe entities 

1304 elif local_match["wannabe"]: 

1305 metaval = str(local_match["wannabe"].pop(0)) 

1306 # 5 Merge data from entityA (CSV) with data from EntityX (CSV) 

1307 for old_meta in local_match["wannabe"]: 

1308 self.merge(entity_dict, metaval, old_meta, name) 

1309 suspect_ids = list() 

1310 for identifier in idslist: 

1311 if identifier not in entity_dict[metaval]["ids"]: 

1312 suspect_ids.append(identifier) 

1313 if suspect_ids: 

1314 sparql_match = self.finder_sparql( 

1315 suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity 

1316 ) 

1317 if sparql_match: 

1318 # if 'wannabe' not in metaval or len(sparql_match) > 1: 

1319 # # Two entities previously disconnected on the triplestore now become connected 

1320 # # ! 

1321 # return self.conflict(idslist, name, id_dict, col_name) 

1322 # else: 

1323 # Collect all existing IDs from all matches 

1324 existing_ids = [] 

1325 for match in sparql_match: 

1326 existing_ids.extend(match[2]) 

1327 

1328 # new_idslist = [x[1] for x in existing_ids] 

1329 # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) 

1330 # if len(new_sparql_match) > 1: 

1331 # # Two entities previously disconnected on the triplestore now become connected 

1332 # # ! 

1333 # return self.conflict(idslist, name, id_dict, col_name) 

1334 # else: 

1335 # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF) 

1336 old_metaval = metaval 

1337 metaval = sparql_match[0][0] 

1338 entity_dict[metaval] = dict() 

1339 entity_dict[metaval]["ids"] = list() 

1340 entity_dict[metaval]["others"] = list() 

1341 entity_dict[metaval]["title"] = ( 

1342 sparql_match[0][1] if sparql_match[0][1] else "" 

1343 ) 

1344 self.__update_id_and_entity_dict( 

1345 existing_ids, id_dict, entity_dict, metaval 

1346 ) 

1347 self.merge( 

1348 entity_dict, metaval, old_metaval, sparql_match[0][1] 

1349 ) 

1350 else: 

1351 sparql_match = self.finder_sparql( 

1352 idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity 

1353 ) 

1354 # if len(sparql_match) > 1: 

1355 # # ! 

1356 # return self.conflict(idslist, name, id_dict, col_name) 

1357 # elif len(sparql_match) == 1: 

1358 if sparql_match: 

1359 # Collect all existing IDs from all matches 

1360 existing_ids = [] 

1361 for match in sparql_match: 

1362 existing_ids.extend(match[2]) 

1363 

1364 # new_idslist = [x[1] for x in existing_ids] 

1365 # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity) 

1366 # if len(new_sparql_match) > 1: 

1367 # # Two entities previously disconnected on the triplestore now become connected 

1368 # # ! 

1369 # return self.conflict(idslist, name, id_dict, col_name) 

1370 # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV 

1371 # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV has precedence. 

1372 # elif len(new_sparql_match) == 1: 

1373 metaval = sparql_match[0][0] 

1374 entity_dict[metaval] = dict() 

1375 entity_dict[metaval]["ids"] = list() 

1376 entity_dict[metaval]["others"] = list() 

1377 if col_name == "author" or col_name == "editor": 

1378 entity_dict[metaval]["title"] = self.name_check( 

1379 sparql_match[0][1], name 

1380 ) 

1381 else: 

1382 entity_dict[metaval]["title"] = sparql_match[0][1] 

1383 self.__update_title(entity_dict, metaval, name) 

1384 self.__update_id_and_entity_dict( 

1385 existing_ids, id_dict, entity_dict, metaval 

1386 ) 

1387 else: 

1388 # 1 EntityA is a new one 

1389 metaval = self.new_entity(entity_dict, name) 

1390 for identifier in idslist: 

1391 if identifier not in id_dict: 

1392 self.__update_id_count(id_dict, identifier) 

1393 if identifier not in entity_dict[metaval]["ids"]: 

1394 entity_dict[metaval]["ids"].append(identifier) 

1395 self.__update_title(entity_dict, metaval, name) 

1396 # 1 EntityA is a new one 

1397 if not idslist and not metaval: 

1398 metaval = self.new_entity(entity_dict, name) 

1399 return metaval 

1400 

1401 def new_entity(self, entity_dict, name): 

1402 metaval = "wannabe_" + str(self.wnb_cnt) 

1403 self.wnb_cnt += 1 

1404 entity_dict[metaval] = dict() 

1405 entity_dict[metaval]["ids"] = list() 

1406 entity_dict[metaval]["others"] = list() 

1407 entity_dict[metaval]["title"] = name 

1408 return metaval 

1409 
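
# Editor's note: new_entity() hands out sequential placeholder IDs, which
# meta_maker() later replaces with real MetaIDs:
#
#     metaval = self.new_entity(self.brdict, "Some title")
#     # -> "wannabe_0" on the first call, "wannabe_1" on the next, ...
#     # self.brdict[metaval] == {"ids": [], "others": [], "title": "Some title"}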

1410 def volume_issue( 

1411 self, 

1412 meta: str, 

1413 path: Dict[str, Dict[str, str]], 

1414 value: str, 

1415 row: Dict[str, str], 

1416 ) -> None: 

1417 if "wannabe" not in meta: 

1418 if value in path: 

1419 if "wannabe" in path[value]["id"]: 

1420 old_meta = path[value]["id"] 

1421 self.merge(self.brdict, meta, old_meta, row["title"]) 

1422 path[value]["id"] = meta 

1423 else: 

1424 path[value] = dict() 

1425 path[value]["id"] = meta 

1426 if "issue" not in path: 

1427 path[value]["issue"] = dict() 

1428 else: 

1429 if value in path: 

1430 if "wannabe" in path[value]["id"]: 

1431 old_meta = path[value]["id"] 

1432 if meta != old_meta: 

1433 self.merge(self.brdict, meta, old_meta, row["title"]) 

1434 path[value]["id"] = meta 

1435 else: 

1436 old_meta = path[value]["id"] 

1437 if "wannabe" not in old_meta and old_meta not in self.brdict: 

1438 br4dict = self.finder.retrieve_br_from_meta(old_meta) 

1439 self.brdict[old_meta] = dict() 

1440 self.brdict[old_meta]["ids"] = list() 

1441 self.brdict[old_meta]["others"] = list() 

1442 self.brdict[old_meta]["title"] = br4dict[0] if br4dict else None 

1443 if br4dict: 

1444 for x in br4dict[1]: 

1445 identifier = x[1] 

1446 self.brdict[old_meta]["ids"].append(identifier) 

1447 if identifier not in self.idbr: 

1448 self.idbr[identifier] = x[0] 

1449 self.merge(self.brdict, old_meta, meta, row["title"]) 

1450 else: 

1451 path[value] = dict() 

1452 path[value]["id"] = meta 

1453 if "issue" not in path: # it's a Volume 

1454 path[value]["issue"] = dict() 

1455 

1456 def log_update(self): 

1457 new_log = dict() 

1458 for x in self.log: 

1459 if any(self.log[x][y].values() for y in self.log[x]): 

1460 for y in self.log[x]: 

1461 if "Conflict entity" in self.log[x][y]: 

1462 v = self.log[x][y]["Conflict entity"] 

1463 if "wannabe" in v: 

1464 if y == "id" or y == "venue": 

1465 for brm in self.brmeta: 

1466 if v in self.brmeta[brm]["others"]: 

1467 m = "br/" + str(brm) 

1468 elif y == "author" or y == "editor" or y == "publisher": 

1469 for ram in self.rameta: 

1470 if v in self.rameta[ram]["others"]: 

1471 m = "ra/" + str(ram) 

1472 else: 

1473 m = v 

1474 self.log[x][y]["Conflict entity"] = m 

1475 new_log[x] = self.log[x] 

1476 

1477 if "wannabe" in self.data[x]["id"]: 

1478 for brm in self.brmeta: 

1479 if self.data[x]["id"] in self.brmeta[brm]["others"]: 

1480 met = "br/" + str(brm) 

1481 else: 

1482 met = "br/" + str(self.data[x]["id"]) 

1483 new_log[x]["id"]["meta"] = met 

1484 return new_log 

1485 

1486 def merge_duplicate_entities(self) -> None: 

1487 """ 

1488 The 'merge_duplicate_entities()' method merges duplicate entities. 

1489 Moreover, it modifies the CSV cells, giving precedence to the information found first, 

1490 or to the data in the triplestore in the case of already existing entities. 

1491 

1492 :returns: None -- This method updates the CSV rows and returns None. 

1493 """ 

1494 self.rowcnt = 0 

1495 for row in self.data: 

1496 id = row["id"] 

1497 if "wannabe" not in id: 

1498 self.equalizer(row, id) 

1499 other_rowcnt = 0 

1500 for other_row in self.data: 

1501 if ( 

1502 other_row["id"] in self.brdict[id]["others"] 

1503 or other_row["id"] == id 

1504 ) and self.rowcnt != other_rowcnt: 

1505 for field, _ in row.items(): 

1506 if row[field] and row[field] != other_row[field]: 

1507 if other_row[field]: 

1508 self.log[other_rowcnt][field][ 

1509 "status" 

1510 ] = "New value proposed" 

1511 other_row[field] = row[field] 

1512 other_rowcnt += 1 

1513 self.rowcnt += 1 

1514 

1515 def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]: 

1516 """ 

1517 Extracts the name and IDs from the venue string. 

1518 

1519 :params venue_str: the venue string 

1520 :type venue_str: str 

1521 :returns: Tuple[str, List[str]] -- the name and list of IDs extracted from the venue string 

1522 """ 

1523 match = re.search(name_and_ids, venue_str) 

1524 if match: 

1525 name = match.group(1).strip() 

1526 ids = match.group(2).strip().split() 

1527 else: 

1528 name = venue_str.strip() 

1529 ids = [] 

1530 return name, ids 

1531 
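
# Editor's note: assuming the name_and_ids pattern captures "NAME [IDS]" with
# the name in group 1 and the bracketed IDs in group 2 (as its other uses in
# this module imply):
#
#     self.extract_name_and_ids("Nature [issn:0028-0836 omid:br/0604]")
#     # -> ("Nature", ["issn:0028-0836", "omid:br/0604"])
#     self.extract_name_and_ids("Nature")  # -> ("Nature", [])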

1532 def equalizer(self, row: Dict[str, str], metaval: str) -> None: 

1533 """ 

1534 Given a CSV row and its MetaID, this method aligns the information in the CSV with that in the triplestore. 

1535 

1536 :params row: a dictionary representing a CSV row 

1537 :type row: Dict[str, str] 

1538 :params metaval: the MetaID identifying the bibliographic resource contained in the input CSV row 

1539 :type metaval: str 

1540 :returns: None -- This method modifies the input CSV row without returning it. 

1541 """ 

1542 self.log[self.rowcnt]["id"]["status"] = "Entity already exists" 

1543 known_data = self.finder.retrieve_br_info_from_meta(metaval) 

1544 try: 

1545 known_data["author"] = self.__get_resp_agents(metaval, "author") 

1546 except ValueError: 

1547 print(row) 

1548 raise 

1549 known_data["editor"] = self.__get_resp_agents(metaval, "editor") 

1550 known_data["publisher"] = self.finder.retrieve_publisher_from_br_metaid(metaval) 

1551 for datum in ["pub_date", "type", "volume", "issue"]: 

1552 if known_data[datum]: 

1553 if row[datum] and row[datum] != known_data[datum]: 

1554 self.log[self.rowcnt][datum]["status"] = "New value proposed" 

1555 row[datum] = known_data[datum] 

1556 for datum in ["author", "editor", "publisher"]: 

1557 if known_data[datum] and not row[datum]: 

1558 row[datum] = known_data[datum] 

1559 if known_data["venue"]: 

1560 current_venue = row["venue"] 

1561 known_venue = known_data["venue"] 

1562 

1563 if current_venue: 

1564 # Extract the IDs from the current venue 

1565 current_venue_name, current_venue_ids = self.extract_name_and_ids( 

1566 current_venue 

1567 ) 

1568 known_venue_name, known_venue_ids = self.extract_name_and_ids( 

1569 known_venue 

1570 ) 

1571 

1572 current_venue_ids_set = set(current_venue_ids) 

1573 known_venue_ids_set = set(known_venue_ids) 

1574 

1575 common_ids = current_venue_ids_set.intersection(known_venue_ids_set) 

1576 

1577 if common_ids: 

1578 # Merge the IDs and use the title from the known venue 

1579 merged_ids = current_venue_ids_set.union(known_venue_ids_set) 

1580 row["venue"] = ( 

1581 f"{known_venue_name} [{' '.join(sorted(merged_ids))}]" 

1582 ) 

1583 else: 

1584 # Use the known venue information entirely 

1585 row["venue"] = known_venue 

1586 else: 

1587 row["venue"] = known_venue 

1588 if known_data["page"]: 

1589 if row["page"] and row["page"] != known_data["page"][1]: 

1590 self.log[self.rowcnt]["page"]["status"] = "New value proposed" 

1591 row["page"] = known_data["page"][1] 

1592 self.remeta[metaval] = known_data["page"] 

1593 

1594 def __get_resp_agents(self, metaid: str, column: str) -> str: 

1595 resp_agents = self.finder.retrieve_ra_sequence_from_br_meta(metaid, column) 

1596 output = "" 

1597 if resp_agents: 

1598 full_resp_agents = list() 

1599 for item in resp_agents: 

1600 for _, resp_agent in item.items(): 

1601 author_name = resp_agent[0] 

1602 ids = [f"omid:ra/{resp_agent[2]}"] 

1603 ids.extend([id[1] for id in resp_agent[1]]) 

1604 author_ids = "[" + " ".join(ids) + "]" 

1605 full_resp_agent = author_name + " " + author_ids 

1606 full_resp_agents.append(full_resp_agent) 

1607 output = "; ".join(full_resp_agents) 

1608 return output 

1609 

1610 

1611def is_a_valid_row(row: Dict[str, str]) -> bool: 

1612 """ 

1613 This function discards invalid rows in the input CSV file. 

1614 

1615 :params row: a dictionary representing a CSV row 

1616 :type row: Dict[str, str] 

1617 :returns: bool -- True if the row is valid, False otherwise. 

1618 """ 

1619 br_type = " ".join((row["type"].lower()).split()) 

1620 br_title = row["title"] 

1621 br_volume = row["volume"] 

1622 br_issue = row["issue"] 

1623 br_venue = row["venue"] 

1624 if row["id"]: 

1625 if (br_volume or br_issue) and (not br_type or not br_venue): 

1626 return False 

1627 return True 

1628 if all(not row[value] for value in row): 

1629 return False 

1630 br_author = row["author"] 

1631 br_editor = row["editor"] 

1632 br_pub_date = row["pub_date"] 

1633 if not br_type or br_type in { 

1634 "book", 

1635 "data file", 

1636 "dataset", 

1637 "dissertation", 

1638 "edited book", 

1639 "journal article", 

1640 "monograph", 

1641 "other", 

1642 "peer review", 

1643 "posted content", 

1644 "web content", 

1645 "proceedings article", 

1646 "report", 

1647 "reference book", 

1648 }: 

1649 is_a_valid_row = ( 

1650 True if br_title and br_pub_date and (br_author or br_editor) else False 

1651 ) 

1652 elif br_type in { 

1653 "book chapter", 

1654 "book part", 

1655 "book section", 

1656 "book track", 

1657 "component", 

1658 "reference entry", 

1659 }: 

1660 is_a_valid_row = True if br_title and br_venue else False 

1661 elif br_type in { 

1662 "book series", 

1663 "book set", 

1664 "journal", 

1665 "proceedings", 

1666 "proceedings series", 

1667 "report series", 

1668 "standard", 

1669 "standard series", 

1670 }: 

1671 is_a_valid_row = True if br_title else False 

1672 elif br_type == "journal volume": 

1673 is_a_valid_row = True if br_venue and (br_volume or br_title) else False 

1674 elif br_type == "journal issue": 

1675 is_a_valid_row = True if br_venue and (br_issue or br_title) else False 

1676 return is_a_valid_row 

1677 
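
# Editor's note: two illustrative calls; every CSV column must be present as a key:
#
#     base = {k: "" for k in ("id", "title", "author", "editor", "publisher",
#                             "venue", "volume", "issue", "page", "pub_date", "type")}
#     is_a_valid_row({**base, "id": "doi:10.1/a"})                 # True
#     is_a_valid_row({**base, "id": "doi:10.1/a", "volume": "5"})  # False:
#     # a volume is given, but type and venue are both missing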

1678 

1679def get_edited_br_metaid(row: dict, metaid: str, venue_metaid: str) -> str: 

1680 if row["author"] and row["venue"] and row["type"] in CONTAINER_EDITOR_TYPES: 

1681 edited_br_metaid = venue_metaid 

1682 else: 

1683 edited_br_metaid = metaid 

1684 return edited_br_metaid
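

# Editor's note: a guarded demo of get_edited_br_metaid(). It assumes
# "book chapter" is among CONTAINER_EDITOR_TYPES (i.e. editors of such rows
# belong to the container); if so, the venue MetaID is returned.
if __name__ == "__main__":
    demo_row = {
        "author": "Doe, Jane",
        "venue": "A Book [omid:br/0604]",
        "type": "book chapter",
    }
    print(get_edited_br_metaid(demo_row, metaid="0601", venue_metaid="0604"))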