Coverage for oc_meta/core/curator.py: 92%
982 statements
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2019 Silvio Peroni <essepuntato@gmail.com>
# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
# Copyright 2021 Simone Persiani <iosonopersia@gmail.com>
# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from contextlib import nullcontext
from typing import Dict, List, Tuple

from oc_meta.constants import CONTAINER_EDITOR_TYPES
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler


class Curator:
    def __init__(
        self,
        data: List[dict],
        ts: str,
        prov_config: str,
        counter_handler: RedisCounterHandler,
        base_iri: str = "https://w3id.org/oc/meta",
        prefix: str = "060",
        separator: str | None = None,
        valid_dois_cache: dict | None = None,
        settings: dict | None = None,
        silencer: list | None = None,
        meta_config_path: str | None = None,
        timer=None,
    ):
        self.timer = timer
        self.settings = settings or {}
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(
            ts,
            base_iri,
            self.everything_everywhere_allatonce,
            settings=settings,
            meta_config_path=meta_config_path,
        )
        self.base_iri = base_iri
        self.prov_config = prov_config
        self.separator = separator
        # Preliminary pass: clear volume and issue if an id is present but the venue is missing
        for row in data:
            if row["id"] and (row["volume"] or row["issue"]):
                if not row["venue"]:
                    row["volume"] = ""
                    row["issue"] = ""
                if not row["type"]:
                    row["type"] = "journal article"
        self.data = [
            {field: value.strip() for field, value in row.items()}
            for row in data
            if is_a_valid_row(row)
        ]
        self.prefix = prefix
        # Redis counter handler
        self.counter_handler = counter_handler
        self.brdict = {}
        self.radict: Dict[str, Dict[str, list]] = {}
        self.ardict: Dict[str, Dict[str, list]] = {}
        self.vvi = {}  # Venue, Volume, Issue
        self.idra = {}  # key: ID literal; value: MetaID of the ID related to an RA
        self.idbr = {}  # key: ID literal; value: MetaID of the ID related to a BR
        self.rameta = dict()
        self.brmeta = dict()
        self.armeta = dict()
        self.remeta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        # Avoid shared mutable defaults: fall back to fresh containers per instance
        self.valid_dois_cache = valid_dois_cache if valid_dois_cache is not None else {}
        self.preexisting_entities = set()
        self.silencer = silencer if silencer is not None else []
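    # Illustrative usage sketch (not part of the original source). The endpoint
    # URL, Redis parameters, file names and CSV rows below are hypothetical
    # placeholders:
    #
    #   counter_handler = RedisCounterHandler(host="localhost", port=6379, db=0)
    #   curator = Curator(
    #       data=get_csv_data("input.csv"),        # list of CSV row dicts
    #       ts="http://localhost:8890/sparql",     # triplestore endpoint
    #       prov_config="prov_config.json",
    #       counter_handler=counter_handler,
    #   )
    #   curator.curator(filename="output", path_csv="out_dir")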
    def _timed(self, name: str):
        if self.timer:
            return self.timer.timer(name)
        return nullcontext()

    def collect_identifiers(self, valid_dois_cache):
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis
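    # Shape of the returned values, for orientation (hypothetical example data):
    #   all_metavals: {"omid:br/0601"}                      -- OMIDs named in the CSV
    #   all_idslist:  {"doi:10.1000/xyz", "issn:0000-0000"} -- external IDs to prefetch
    #   all_vvis:     {("172", "22", "omid:br/0601", ("issn:0000-0000",))}
    #                 -- (volume, issue, venue MetaID, sorted venue IDs) tuples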
    def extract_identifiers_and_metavals(
        self, row, valid_dois_cache
    ) -> Tuple[set, set, set]:
        metavals = set()
        identifiers = set()
        vvis = set()
        venue_ids = set()
        venue_metaid = None

        if row["id"]:
            idslist, metaval = self.clean_id_list(
                self.split_identifiers(row["id"]),
                br=True,
                valid_dois_cache=valid_dois_cache,
            )
            id_metaval = f"omid:br/{metaval}" if metaval else ""
            if id_metaval:
                metavals.add(id_metaval)
            if idslist:
                identifiers.update(idslist)

        fields_with_an_id = [
            (field, re.search(name_and_ids, row[field]).group(2).split())
            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
            if re.search(name_and_ids, row[field])
        ]
        for field, field_ids in fields_with_an_id:
            br = field in ["venue", "volume", "issue"]
            field_idslist, field_metaval = self.clean_id_list(
                field_ids, br=br, valid_dois_cache=valid_dois_cache
            )
            if field_metaval:
                field_metaval = (
                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
                )
            else:
                field_metaval = ""
            if field_metaval:
                metavals.add(field_metaval)
            if field == "venue":
                venue_metaid = field_metaval
                if field_idslist:
                    venue_ids.update(field_idslist)
            else:
                if field_idslist:
                    identifiers.update(field_idslist)

        if (venue_metaid or venue_ids) and (row["volume"] or row["issue"]):
            vvi = (row["volume"], row["issue"], venue_metaid, tuple(sorted(venue_ids)))
            vvis.add(vvi)

        return metavals, identifiers, vvis
    def split_identifiers(self, field_value):
        if self.separator:
            return re.sub(colon_and_spaces, ":", field_value).split(self.separator)
        else:
            return re.split(
                one_or_more_spaces, re.sub(colon_and_spaces, ":", field_value)
            )
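    # Behaviour sketch (hypothetical values): the colon_and_spaces pattern
    # normalizes "doi: 10.1/a" to "doi:10.1/a"; then the string is split either
    # on the configured separator or on whitespace:
    #   separator=";"  -> "doi:10.1/a;pmid:123" becomes ["doi:10.1/a", "pmid:123"]
    #   separator=None -> split on one_or_more_spaces, i.e. on whitespace runs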
    def curator(self, filename: str | None = None, path_csv: str | None = None):
        # Phase 1: Collect identifiers and SPARQL prefetch
        with self._timed("curation__collect_identifiers"):
            metavals, identifiers, vvis = self.collect_identifiers(
                valid_dois_cache=self.valid_dois_cache
            )
            self.finder.get_everything_about_res(
                metavals=metavals, identifiers=identifiers, vvis=vvis
            )

        # Phase 2: Clean IDs (loop over all rows)
        with self._timed("curation__clean_id"):
            for row in self.data:
                self.log[self.rowcnt] = {
                    "id": {},
                    "title": {},
                    "author": {},
                    "venue": {},
                    "editor": {},
                    "publisher": {},
                    "page": {},
                    "volume": {},
                    "issue": {},
                    "pub_date": {},
                    "type": {},
                }
                self.clean_id(row)
                self.rowcnt += 1

        # Phase 3: Merge duplicate entities
        with self._timed("curation__merge_duplicates"):
            self.merge_duplicate_entities()
            self.clean_metadata_without_id()

        # Phase 4: Clean VVI (venue/volume/issue)
        with self._timed("curation__clean_vvi"):
            self.rowcnt = 0
            for row in self.data:
                self.clean_vvi(row)
                self.rowcnt += 1

        # Phase 5: Clean RA (author + publisher + editor aggregated)
        with self._timed("curation__clean_ra"):
            self.rowcnt = 0
            for row in self.data:
                self.clean_ra(row, "author")
                self.clean_ra(row, "publisher")
                self.clean_ra(row, "editor")
                self.rowcnt += 1

        # Phase 6: Finalize (preexisting + meta_maker + enrich + indexer)
        with self._timed("curation__finalize"):
            self.get_preexisting_entities()
            self.meta_maker()
            self.log = self.log_update()
            self.enrich()
            # Remove duplicate rows, keeping one row per id
            self.data = list({v["id"]: v for v in self.data}.values())
            self.filename = filename
            self.indexer(path_csv=path_csv)
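    # The six phases above run in order and mutate self.data in place; after
    # curator() returns, the instance exposes the indexes consumed by Creator
    # (index_id_br, index_id_ra, ar_index, re_index) plus brmeta/rameta/armeta.
    # A minimal sketch, assuming a Curator instance built as in __init__ above
    # (file names are hypothetical):
    #   curator.curator(filename="7-100", path_csv="csv_output")
    #   rows_ready_for_creator = curator.data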
    # ID
    def clean_id(self, row: Dict[str, str]) -> None:
        """
        The 'clean_id()' method is executed for each CSV row.
        In this process, duplicates are detected through the IDs in the 'id' column.
        Each row is assigned a wannabe ID or, if the bibliographic resource was
        found in the triplestore, a MetaID.
        Finally, this method enriches and cleans the fields related to the
        title, venue, volume, issue, page, publication date and type.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        if row["title"]:
            name = Cleaner(row["title"]).clean_title(
                self.settings.get("normalize_titles")
            )
        else:
            name = ""
        metaval_ids_list = []
        if row["id"]:
            if self.separator:
                idslist = re.sub(colon_and_spaces, ":", row["id"]).split(self.separator)
            else:
                idslist = re.split(
                    one_or_more_spaces, re.sub(colon_and_spaces, ":", row["id"])
                )
            idslist, metaval = self.clean_id_list(
                idslist, br=True, valid_dois_cache=self.valid_dois_cache
            )
            id_metaval = f"omid:br/{metaval}" if metaval else ""
            metaval_ids_list.append((id_metaval, idslist))
        fields_with_an_id = [
            (field, re.search(name_and_ids, row[field]).group(2).split())
            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
            if re.search(name_and_ids, row[field])
        ]
        for field, field_ids in fields_with_an_id:
            if field in ["author", "editor", "publisher"]:
                br = False
            elif field in ["venue", "volume", "issue"]:
                br = True
            field_idslist, field_metaval = self.clean_id_list(
                field_ids, br=br, valid_dois_cache=self.valid_dois_cache
            )
            if field_metaval:
                field_metaval = (
                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
                )
            else:
                field_metaval = ""
            metaval_ids_list.append((field_metaval, field_idslist))
        if row["id"]:
            metaval = self.id_worker(
                "id",
                name,
                idslist,
                metaval,
                ra_ent=False,
                br_ent=True,
                vvi_ent=False,
                publ_entity=False,
            )
        else:
            metaval = self.new_entity(self.brdict, name)
        row["title"] = self.brdict[metaval]["title"]
        row["id"] = metaval
    def clean_metadata_without_id(self):
        for row in self.data:
            # page
            if row["page"]:
                row["page"] = Cleaner(row["page"]).normalize_hyphens()
            # date
            if row["pub_date"]:
                date = Cleaner(row["pub_date"]).normalize_hyphens()
                date = Cleaner(date).clean_date()
                row["pub_date"] = date
            # type
            if row["type"]:
                entity_type = " ".join((row["type"].lower()).split())
                if entity_type == "edited book" or entity_type == "monograph":
                    entity_type = "book"
                elif (
                    entity_type == "report series"
                    or entity_type == "standard series"
                    or entity_type == "proceedings series"
                ):
                    entity_type = "series"
                elif entity_type == "posted content":
                    entity_type = "web content"
                if entity_type in {
                    "abstract",
                    "archival document",
                    "audio document",
                    "book",
                    "book chapter",
                    "book part",
                    "book section",
                    "book series",
                    "book set",
                    "computer program",
                    "data file",
                    "data management plan",
                    "dataset",
                    "dissertation",
                    "editorial",
                    "journal",
                    "journal article",
                    "journal editorial",
                    "journal issue",
                    "journal volume",
                    "newspaper",
                    "newspaper article",
                    "newspaper editorial",
                    "newspaper issue",
                    "peer review",
                    "preprint",
                    "presentation",
                    "proceedings",
                    "proceedings article",
                    "proceedings series",
                    "reference book",
                    "reference entry",
                    "retraction notice",
                    "series",
                    "report",
                    "standard",
                    "web content",
                }:
                    row["type"] = entity_type
                else:
                    row["type"] = ""
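    # Normalization sketch (values from the mapping above): "Edited Book" and
    # "monograph" both become "book"; "posted content" becomes "web content";
    # any type outside the accepted set is blanked:
    #   "Posted   Content" -> "web content"
    #   "blog post"        -> ""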
    # VVI
    def clean_vvi(self, row: Dict[str, str]) -> None:
        """
        This method performs the deduplication process for venues, volumes and issues.
        The acquired information is stored in the 'vvi' dictionary, which has the following format: ::

            {
                VENUE_IDENTIFIER: {
                    'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}},
                    'volume': {
                        SEQUENCE_IDENTIFIER: {
                            'id': META_ID,
                            'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}}
                        }
                    }
                }
            }

        For example: ::

            {
                '4416': {
                    'issue': {},
                    'volume': {
                        '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}},
                        '172': {'id': '4434',
                            'issue': {
                                '22': {'id': '4435'},
                                '20': {'id': '4436'},
                                '21': {'id': '4437'},
                                '19': {'id': '4438'}
                            }
                        }
                    }
                }
            }

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        if row["type"] not in {
            "journal article",
            "journal volume",
            "journal issue",
        } and (row["volume"] or row["issue"]):
            row["volume"] = ""
            row["issue"] = ""
        Cleaner.clean_volume_and_issue(row=row)
        vol_meta = None
        br_type = row["type"]
        volume = row["volume"]
        issue = row["issue"]
        br_id = row["id"]
        venue = row["venue"]
        # Venue
        if venue:
            # The data must be invalidated, because the resource is a journal
            # but a volume or an issue has also been specified
            if br_type == "journal" and (volume or issue):
                row["venue"] = ""
                row["volume"] = ""
                row["issue"] = ""
            venue_id = re.search(name_and_ids, venue)
            if venue_id:
                name = Cleaner(venue_id.group(1)).clean_title(
                    self.settings.get("normalize_titles")
                )
                venue_id = venue_id.group(2)
                if self.separator:
                    idslist = re.sub(colon_and_spaces, ":", venue_id).split(
                        self.separator
                    )
                else:
                    idslist = re.split(
                        one_or_more_spaces, re.sub(colon_and_spaces, ":", venue_id)
                    )
                idslist, metaval = self.clean_id_list(
                    idslist, br=True, valid_dois_cache=self.valid_dois_cache
                )

                metaval = self.id_worker(
                    "venue",
                    name,
                    idslist,
                    metaval,
                    ra_ent=False,
                    br_ent=True,
                    vvi_ent=True,
                    publ_entity=False,
                )
                if metaval not in self.vvi:
                    ts_vvi = None
                    if "wannabe" not in metaval:
                        ts_vvi = self.finder.retrieve_venue_from_local_graph(metaval)
                    if "wannabe" in metaval or not ts_vvi:
                        self.vvi[metaval] = dict()
                        self.vvi[metaval]["volume"] = dict()
                        self.vvi[metaval]["issue"] = dict()
                    elif ts_vvi:
                        self.vvi[metaval] = ts_vvi
            else:
                name = Cleaner(venue).clean_title(self.settings.get("normalize_titles"))
                metaval = self.new_entity(self.brdict, name)
                self.vvi[metaval] = dict()
                self.vvi[metaval]["volume"] = dict()
                self.vvi[metaval]["issue"] = dict()
            row["venue"] = metaval

            # Volume
            if volume and (br_type == "journal issue" or br_type == "journal article"):
                if volume in self.vvi[metaval]["volume"]:
                    vol_meta = self.vvi[metaval]["volume"][volume]["id"]
                else:
                    vol_meta = self.new_entity(self.brdict, "")
                    self.vvi[metaval]["volume"][volume] = dict()
                    self.vvi[metaval]["volume"][volume]["id"] = vol_meta
                    self.vvi[metaval]["volume"][volume]["issue"] = dict()
            elif volume and br_type == "journal volume":
                # The data must be invalidated, because the resource is a journal
                # volume but an issue has also been specified
                if issue:
                    row["volume"] = ""
                    row["issue"] = ""
                else:
                    vol_meta = br_id
                    self.volume_issue(
                        vol_meta, self.vvi[metaval]["volume"], volume, row
                    )

            # Issue
            if issue and br_type == "journal article":
                row["issue"] = issue
                if vol_meta:
                    if issue not in self.vvi[metaval]["volume"][volume]["issue"]:
                        issue_meta = self.new_entity(self.brdict, "")
                        self.vvi[metaval]["volume"][volume]["issue"][issue] = dict()
                        self.vvi[metaval]["volume"][volume]["issue"][issue][
                            "id"
                        ] = issue_meta
                else:
                    if issue not in self.vvi[metaval]["issue"]:
                        issue_meta = self.new_entity(self.brdict, "")
                        self.vvi[metaval]["issue"][issue] = dict()
                        self.vvi[metaval]["issue"][issue]["id"] = issue_meta
            elif issue and br_type == "journal issue":
                issue_meta = br_id
                if vol_meta:
                    self.volume_issue(
                        issue_meta,
                        self.vvi[metaval]["volume"][volume]["issue"],
                        issue,
                        row,
                    )
                else:
                    self.volume_issue(
                        issue_meta, self.vvi[metaval]["issue"], issue, row
                    )

        else:
            row["venue"] = ""
            row["volume"] = ""
            row["issue"] = ""
    # RA
    def clean_ra(self, row, col_name):
        """
        This method performs the deduplication process for responsible agents (authors, publishers and editors).

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.ardict, self.radict, and self.idra, and returns None.
        """

        def get_br_metaval_to_check(row, col_name):
            if col_name == "editor":
                return get_edited_br_metaid(row, row["id"], row["venue"])
            else:
                return row["id"]

        def get_br_metaval(br_metaval_to_check):
            if br_metaval_to_check in self.brdict or br_metaval_to_check in self.vvi:
                return br_metaval_to_check
            return [
                id
                for id in self.brdict
                if br_metaval_to_check in self.brdict[id]["others"]
            ][0]

        def initialize_ardict_entry(br_metaval):
            if br_metaval not in self.ardict:
                self.ardict[br_metaval] = {"author": [], "editor": [], "publisher": []}

        def initialize_sequence(br_metaval, col_name):
            sequence = []
            if "wannabe" in br_metaval:
                sequence = []
            else:
                sequence_found = self.finder.retrieve_ra_sequence_from_br_meta(
                    br_metaval, col_name
                )
                if sequence_found:
                    sequence = []
                    for agent in sequence_found:
                        for ar_metaid in agent:
                            ra_metaid = agent[ar_metaid][2]
                            sequence.append(tuple((ar_metaid, ra_metaid)))
                            if ra_metaid not in self.radict:
                                self.radict[ra_metaid] = dict()
                                self.radict[ra_metaid]["ids"] = list()
                                self.radict[ra_metaid]["others"] = list()
                                self.radict[ra_metaid]["title"] = agent[ar_metaid][0]
                            for identifier in agent[ar_metaid][1]:
                                # other IDs that come after the MetaID
                                id_metaid = identifier[0]
                                literal = identifier[1]
                                if literal not in self.idra:
                                    self.idra[literal] = id_metaid
                                if literal not in self.radict[ra_metaid]["ids"]:
                                    self.radict[ra_metaid]["ids"].append(literal)
                    self.ardict[br_metaval][col_name].extend(sequence)
                else:
                    sequence = []
            return sequence

        def parse_ra_list(row):
            ra_list = re.split(semicolon_in_people_field, row[col_name])
            ra_list = Cleaner.clean_ra_list(ra_list)
            return ra_list

        def process_individual_ra(ra, sequence):
            new_elem_seq = True
            ra_id = None
            ra_id_match = re.search(name_and_ids, ra)
            if ra_id_match:
                cleaner = Cleaner(ra_id_match.group(1))
                name = cleaner.clean_name()
                ra_id = ra_id_match.group(2)
            else:
                cleaner = Cleaner(ra)
                name = cleaner.clean_name()
            if not ra_id and sequence:
                for _, ra_metaid in sequence:
                    if self.radict[ra_metaid]["title"] == name:
                        ra_id = "omid:ra/" + str(ra_metaid)
                        new_elem_seq = False
                        break
            return ra_id, name, new_elem_seq

        if not row[col_name]:
            return

        br_metaval_to_check = get_br_metaval_to_check(row, col_name)
        br_metaval = get_br_metaval(br_metaval_to_check)
        initialize_ardict_entry(br_metaval)

        sequence = self.ardict[br_metaval].get(col_name, [])
        if not sequence:
            sequence = initialize_sequence(br_metaval, col_name)
        if col_name in self.silencer and sequence:
            return

        ra_list = parse_ra_list(row)
        new_sequence = list()
        change_order = False

        for pos, ra in enumerate(ra_list):
            ra_id, name, new_elem_seq = process_individual_ra(ra, sequence)
            if ra_id:
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ":", ra_id).split(
                        self.separator
                    )
                else:
                    ra_id_list = re.split(
                        one_or_more_spaces, re.sub(colon_and_spaces, ":", ra_id)
                    )
                if sequence:
                    ar_ra = None
                    for ps, el in enumerate(sequence):
                        ra_metaid = el[1]
                        for literal in ra_id_list:
                            if literal in self.radict[ra_metaid]["ids"]:
                                if ps != pos:
                                    change_order = True
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    # use a distinct index variable to avoid
                                    # shadowing the outer loop's 'pos'
                                    for idx, literal_value in enumerate(ra_id_list):
                                        if "omid" in literal_value:
                                            ra_id_list[idx] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:ra/" + ar_ra)
                    if not ar_ra:
                        # new element
                        for ar_metaid, ra_metaid in sequence:
                            if self.radict[ra_metaid]["title"] == name:
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    for idx, i in enumerate(ra_id_list):
                                        if "omid" in i:
                                            ra_id_list[idx] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:ra/" + ar_ra)
                if col_name == "publisher":
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
                    )
                    metaval = self.id_worker(
                        "publisher",
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=True,
                    )
                else:
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
                    )
                    metaval = self.id_worker(
                        col_name,
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=False,
                    )
                if col_name != "publisher" and metaval in self.radict:
                    full_name: str = self.radict[metaval]["title"]
                    if "," in name and "," in full_name:
                        first_name = name.split(",")[1].strip()
                        if (
                            not full_name.split(",")[1].strip() and first_name
                        ):  # first name found!
                            surname = full_name.split(",")[0]
                            self.radict[metaval]["title"] = (
                                surname + ", " + first_name
                            )
            else:
                metaval = self.new_entity(self.radict, name)
            if new_elem_seq:
                role = self.prefix + str(self._add_number("ar"))
                new_sequence.append(tuple((role, metaval)))
        if change_order:
            self.log[self.rowcnt][col_name][
                "Info"
            ] = "New RA sequence proposed: refused"
        sequence.extend(new_sequence)
        self.ardict[br_metaval][col_name] = sequence
    @staticmethod
    def clean_id_list(
        id_list: List[str], br: bool, valid_dois_cache: dict = dict()
    ) -> Tuple[list, str]:
        """
        Clean the IDs in the input list and check whether a MetaID is present.

        :params id_list: a list of IDs
        :type id_list: List[str]
        :params br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- a two-element tuple, where the first element is the list of cleaned IDs and the second is the MetaID, if any was found.
        """
        pattern = "br/" if br else "ra/"
        metaid = ""
        id_list = list(filter(None, id_list))
        clean_list = list()

        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(":", 1)
            schema = identifier[0].lower()
            value = identifier[1]

            if schema == "omid":
                metaid = value.replace(pattern, "")
            else:
                normalized_id = Cleaner(elem).normalize_id(
                    valid_dois_cache=valid_dois_cache
                )
                if normalized_id:
                    clean_list.append(normalized_id)

        how_many_meta = [i for i in id_list if i.lower().startswith("omid")]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith("omid")]

        return clean_list, metaid
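    # Behaviour sketch with hypothetical inputs: an "omid:" ID is extracted as
    # the MetaID, the others are normalized and deduplicated:
    #   clean_id_list(["omid:br/0601", "doi:10.1000/ABC"], br=True)
    #   -> (["doi:10.1000/abc"], "0601")
    # (the exact normalization of each scheme is delegated to Cleaner.normalize_id)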
    def conflict(
        self, idslist: List[str], name: str, id_dict: dict, col_name: str
    ) -> str:
        if col_name == "id" or col_name == "venue":
            entity_dict = self.brdict
        elif col_name == "author" or col_name == "editor" or col_name == "publisher":
            entity_dict = self.radict
        metaval = self.new_entity(entity_dict, name)
        entity_dict[metaval] = {"ids": list(), "others": list(), "title": name}
        self.log[self.rowcnt][col_name]["Conflict entity"] = metaval
        for identifier in idslist:
            entity_dict[metaval]["ids"].append(identifier)
            if identifier not in id_dict:
                schema_value = identifier.split(":", maxsplit=1)
                found_metaid = self.finder.retrieve_metaid_from_id(
                    schema_value[0], schema_value[1]
                )
                if found_metaid:
                    id_dict[identifier] = found_metaid
                else:
                    self.__update_id_count(id_dict, identifier)
        return metaval
    def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False):
        match_elem = list()
        id_set = set()
        res = None
        for elem in list_to_find:
            if len(match_elem) < 2:
                identifier = elem.split(":", maxsplit=1)
                value = identifier[1]
                schema = identifier[0]
                if br:
                    res = self.finder.retrieve_br_from_id(schema, value)
                elif ra:
                    res = self.finder.retrieve_ra_from_id(schema, value, publ)
                if res:
                    for f in res:
                        if f[0] not in id_set:
                            match_elem.append(f)
                            id_set.add(f[0])
        return match_elem
    def ra_update(self, row: dict, br_key: str, col_name: str) -> None:
        if row[col_name]:
            sequence = self.armeta[br_key][col_name]
            ras_list = list()
            for _, ra_id in sequence:
                ra_name = self.rameta[ra_id]["title"]
                ra_ids = self.rameta[ra_id]["ids"]
                ra = self.build_name_ids_string(ra_name, ra_ids)
                ras_list.append(ra)
            row[col_name] = "; ".join(ras_list)

    @staticmethod
    def build_name_ids_string(name, ids):
        if name and ids:
            ra_string = f"{name} [{' '.join(ids)}]"
        elif name and not ids:
            ra_string = name
        elif ids and not name:
            ra_string = f"[{' '.join(ids)}]"
        else:
            ra_string = ""
        return ra_string
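    # Formatting sketch (hypothetical values):
    #   build_name_ids_string("Doe, Jane", ["omid:ra/0602", "orcid:0000-0000-0000-0000"])
    #   -> "Doe, Jane [omid:ra/0602 orcid:0000-0000-0000-0000]"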
    @staticmethod
    def __local_match(list_to_match, dict_to_match: dict):
        match_elem = dict()
        match_elem["existing"] = list()
        match_elem["wannabe"] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va["ids"]:
                    if "wannabe" in k:
                        if k not in match_elem["wannabe"]:
                            match_elem["wannabe"].append(k)
                    else:
                        if k not in match_elem["existing"]:
                            match_elem["existing"].append(k)
        return match_elem

    def __meta_ar(self, target_br_metaid: str, source_br_key: str, role_type: str) -> None:
        """
        Transfer agent role assignments from the working dictionary to the finalized dictionary.

        Resolves any remaining placeholder ("wannabe") agent identifiers to their
        final MetaIDs by looking up which finalized agent absorbed them.

        Args:
            target_br_metaid: The final, deduplicated bibliographic resource MetaID
            source_br_key: The intermediate key in ardict (may contain "wannabe")
            role_type: Type of role ("author", "editor", or "publisher")
        """
        for ar_metaid, agent_id in self.ardict[source_br_key][role_type]:
            if "wannabe" in agent_id:
                for candidate_ra_metaid in self.rameta:
                    if agent_id in self.rameta[candidate_ra_metaid]["others"]:
                        resolved_ra_metaid = candidate_ra_metaid
                        break
            else:
                resolved_ra_metaid = agent_id
            self.armeta[target_br_metaid][role_type].append((ar_metaid, resolved_ra_metaid))
    def __tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None:
        for k, v in tree.items():
            if k == key:
                values.append(v)
            elif isinstance(v, dict):
                found = self.__tree_traverse(v, key, values)
                if found is not None:
                    values.append(found)
    def get_preexisting_entities(self) -> None:
        for entity_type in {"br", "ra"}:
            for entity_metaid, data in getattr(self, f"{entity_type}dict").items():
                if not entity_metaid.startswith("wannabe"):
                    self.preexisting_entities.add(f"{entity_type}/{entity_metaid}")
                    for entity_id_literal in data["ids"]:
                        preexisting_entity_id_metaid = getattr(
                            self, f"id{entity_type}"
                        )[entity_id_literal]
                        self.preexisting_entities.add(
                            f"id/{preexisting_entity_id_metaid}"
                        )
        for _, roles in self.ardict.items():
            for _, ar_ras in roles.items():
                for ar_ra in ar_ras:
                    if not ar_ra[1].startswith("wannabe"):
                        self.preexisting_entities.add(f"ar/{ar_ra[0]}")
        for venue_metaid, vi in self.vvi.items():
            if not venue_metaid.startswith("wannabe"):
                wannabe_preexisting_vis = list()
                self.__tree_traverse(vi, "id", wannabe_preexisting_vis)
                self.preexisting_entities.update(
                    {
                        f"br/{vi_metaid}"
                        for vi_metaid in wannabe_preexisting_vis
                        if not vi_metaid.startswith("wannabe")
                    }
                )
        for _, re_metaid in self.remeta.items():
            self.preexisting_entities.add(f"re/{re_metaid[0]}")
    def meta_maker(self):
        """
        For each working dictionary ('brdict', 'ardict', 'radict', 'vvi'), the
        corresponding MetaID dictionary ('brmeta', 'armeta', 'rameta', and 'VolIss') is created.
        """
        for identifier in self.brdict:
            if "wannabe" in identifier:
                other = identifier
                count = self._add_number("br")
                meta = self.prefix + str(count)
                self.brmeta[meta] = self.brdict[identifier]
                self.brmeta[meta]["others"].append(other)
                self.brmeta[meta]["ids"].append("omid:br/" + meta)
            else:
                self.brmeta[identifier] = self.brdict[identifier]
                self.brmeta[identifier]["ids"].append("omid:br/" + identifier)
        for identifier in self.radict:
            if "wannabe" in identifier:
                other = identifier
                count = self._add_number("ra")
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]["others"].append(other)
                self.rameta[meta]["ids"].append("omid:ra/" + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.rameta[identifier]["ids"].append("omid:ra/" + identifier)
        for ar_id in self.ardict:
            if "wannabe" in ar_id:
                for br_id in self.brmeta:
                    if ar_id in self.brmeta[br_id]["others"]:
                        br_key = br_id
                        break
            else:
                br_key = ar_id
            self.armeta[br_key] = dict()
            self.armeta[br_key]["author"] = list()
            self.armeta[br_key]["editor"] = list()
            self.armeta[br_key]["publisher"] = list()
            self.__meta_ar(br_key, ar_id, "author")
            self.__meta_ar(br_key, ar_id, "editor")
            self.__meta_ar(br_key, ar_id, "publisher")
        self.VolIss = dict()
        if self.vvi:
            for venue_meta in self.vvi:
                venue_issue = self.vvi[venue_meta]["issue"]
                if venue_issue:
                    for issue in venue_issue:
                        issue_id = venue_issue[issue]["id"]
                        if "wannabe" in issue_id:
                            for br_meta in self.brmeta:
                                if issue_id in self.brmeta[br_meta]["others"]:
                                    self.vvi[venue_meta]["issue"][issue]["id"] = str(
                                        br_meta
                                    )
                                    break

                venue_volume = self.vvi[venue_meta]["volume"]
                if venue_volume:
                    for volume in venue_volume:
                        volume_id = venue_volume[volume]["id"]
                        if "wannabe" in volume_id:
                            for br_meta in self.brmeta:
                                if volume_id in self.brmeta[br_meta]["others"]:
                                    self.vvi[venue_meta]["volume"][volume]["id"] = str(
                                        br_meta
                                    )
                                    break
                        if venue_volume[volume]["issue"]:
                            volume_issue = venue_volume[volume]["issue"]
                            for issue in volume_issue:
                                volume_issue_id = volume_issue[issue]["id"]
                                if "wannabe" in volume_issue_id:
                                    for br_meta in self.brmeta:
                                        if (
                                            volume_issue_id
                                            in self.brmeta[br_meta]["others"]
                                        ):
                                            self.vvi[venue_meta]["volume"][volume][
                                                "issue"
                                            ][issue]["id"] = str(br_meta)
                                            break
                if "wannabe" in venue_meta:
                    for br_meta in self.brmeta:
                        if venue_meta in self.brmeta[br_meta]["others"]:
                            self.__merge_VolIss_with_vvi(br_meta, venue_meta)
                else:
                    self.__merge_VolIss_with_vvi(venue_meta, venue_meta)
    def enrich(self):
        """
        This method replaces the wannabe ID placeholders with the
        actual data and MetaIDs resulting from the deduplication process.
        """
        for row in self.data:
            if "wannabe" in row["id"]:
                for br_metaid in self.brmeta:
                    if row["id"] in self.brmeta[br_metaid]["others"]:
                        metaid = br_metaid
            else:
                metaid = row["id"]
            if row["page"] and (metaid not in self.remeta):
                re_meta = self.finder.retrieve_re_from_br_meta(metaid)
                if re_meta:
                    self.remeta[metaid] = re_meta
                    row["page"] = re_meta[1]
                else:
                    count = self.prefix + str(self._add_number("re"))
                    page = row["page"]
                    self.remeta[metaid] = (count, page)
                    row["page"] = page
            elif metaid in self.remeta:
                row["page"] = self.remeta[metaid][1]
            row["id"] = " ".join(self.brmeta[metaid]["ids"])
            row["title"] = self.brmeta[metaid]["title"]
            venue_metaid = None
            if row["venue"]:
                venue = row["venue"]
                if "wannabe" in venue:
                    for i in self.brmeta:
                        if venue in self.brmeta[i]["others"]:
                            venue_metaid = i
                else:
                    venue_metaid = venue
                row["venue"] = self.build_name_ids_string(
                    self.brmeta[venue_metaid]["title"], self.brmeta[venue_metaid]["ids"]
                )
            br_key_for_editor = get_edited_br_metaid(row, metaid, venue_metaid)
            self.ra_update(row, metaid, "author")
            self.ra_update(row, metaid, "publisher")
            self.ra_update(row, br_key_for_editor, "editor")
    @staticmethod
    def name_check(ts_name, name):
        if "," in ts_name:
            names = ts_name.split(",")
            if names[0] and not names[1].strip():
                # there is no given name in the triplestore
                if "," in name:
                    gname = name.split(",", 1)[1].strip()
                    if gname:
                        ts_name = names[0] + ", " + gname
        return ts_name
    def _read_number(self, entity_type: str) -> int:
        return self.counter_handler.read_counter(
            entity_type, supplier_prefix=self.prefix
        )

    def _add_number(self, entity_type: str) -> int:
        return self.counter_handler.increment_counter(
            entity_type, supplier_prefix=self.prefix
        )
    def __update_id_and_entity_dict(
        self,
        existing_ids: list,
        id_dict: dict,
        entity_dict: Dict[str, Dict[str, list]],
        metaval: str,
    ) -> None:
        for identifier in existing_ids:
            if identifier[1] not in id_dict:
                id_dict[identifier[1]] = identifier[0]
            if identifier[1] not in entity_dict[metaval]["ids"]:
                entity_dict[metaval]["ids"].append(identifier[1])
    def indexer(self, path_csv: str | None = None) -> None:
        """
        Transform the internal dictionaries (idra, idbr, armeta, remeta) into the
        list-of-dicts format consumed by Creator. Optionally save the enriched CSV file.

        :params path_csv: Directory path for the enriched CSV output (optional)
        :type path_csv: str
        """
        # ID
        self.index_id_ra = list()
        self.index_id_br = list()
        for entity_type in {"ra", "br"}:
            cur_index = getattr(self, f"id{entity_type}")
            if cur_index:
                for literal in cur_index:
                    row = dict()
                    row["id"] = str(literal)
                    row["meta"] = str(cur_index[literal])
                    getattr(self, f"index_id_{entity_type}").append(row)
            else:
                row = dict()
                row["id"] = ""
                row["meta"] = ""
                getattr(self, f"index_id_{entity_type}").append(row)
        # AR
        self.ar_index = list()
        if self.armeta:
            for metaid in self.armeta:
                index = dict()
                index["meta"] = metaid
                for role in self.armeta[metaid]:
                    list_ar = list()
                    for ar, ra in self.armeta[metaid][role]:
                        list_ar.append(str(ar) + ", " + str(ra))
                    index[role] = "; ".join(list_ar)
                self.ar_index.append(index)
        else:
            row = dict()
            row["meta"] = ""
            row["author"] = ""
            row["editor"] = ""
            row["publisher"] = ""
            self.ar_index.append(row)
        # RE
        self.re_index = list()
        if self.remeta:
            for x in self.remeta:
                r = dict()
                r["br"] = x
                r["re"] = str(self.remeta[x][0])
                self.re_index.append(r)
        else:
            row = dict()
            row["br"] = ""
            row["re"] = ""
            self.re_index.append(row)
        # Save the enriched CSV if a path is provided
        if self.filename and path_csv and self.data:
            name = self.filename + ".csv"
            data_file = os.path.join(path_csv, name)
            write_csv(data_file, self.data)
    def __merge_VolIss_with_vvi(
        self, VolIss_venue_meta: str, vvi_venue_meta: str
    ) -> None:
        if VolIss_venue_meta in self.VolIss:
            for vvi_v in self.vvi[vvi_venue_meta]["volume"]:
                if vvi_v in self.VolIss[VolIss_venue_meta]["volume"]:
                    self.VolIss[VolIss_venue_meta]["volume"][vvi_v]["issue"].update(
                        self.vvi[vvi_venue_meta]["volume"][vvi_v]["issue"]
                    )
                else:
                    self.VolIss[VolIss_venue_meta]["volume"][vvi_v] = self.vvi[
                        vvi_venue_meta
                    ]["volume"][vvi_v]
            self.VolIss[VolIss_venue_meta]["issue"].update(
                self.vvi[vvi_venue_meta]["issue"]
            )
        else:
            self.VolIss[VolIss_venue_meta] = self.vvi[vvi_venue_meta]
    def __update_id_count(self, id_dict, identifier):
        # Before creating a new ID, check whether it already exists in the triplestore
        schema, value = identifier.split(":", maxsplit=1)
        existing_metaid = self.finder.retrieve_metaid_from_id(schema, value)

        if existing_metaid:
            id_dict[identifier] = existing_metaid
        else:
            count = self._add_number("id")
            id_dict[identifier] = self.prefix + str(count)
    @staticmethod
    def merge(
        dict_to_match: Dict[str, Dict[str, list]],
        metaval: str,
        old_meta: str,
        temporary_name: str,
    ) -> None:
        for x in dict_to_match[old_meta]["ids"]:
            if x not in dict_to_match[metaval]["ids"]:
                dict_to_match[metaval]["ids"].append(x)
        for x in dict_to_match[old_meta]["others"]:
            if x not in dict_to_match[metaval]["others"]:
                dict_to_match[metaval]["others"].append(x)
        dict_to_match[metaval]["others"].append(old_meta)
        if not dict_to_match[metaval]["title"]:
            if dict_to_match[old_meta]["title"]:
                dict_to_match[metaval]["title"] = dict_to_match[old_meta]["title"]
            else:
                dict_to_match[metaval]["title"] = temporary_name
        del dict_to_match[old_meta]
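    # Merge sketch (hypothetical state): absorbing "wannabe_3" into "0601" moves
    # its ids and aliases, records the absorbed key in "others", and deletes it:
    #   before: {"0601": {...}, "wannabe_3": {"ids": ["doi:10.1/x"], ...}}
    #   after:  {"0601": {"ids": [..., "doi:10.1/x"], "others": [..., "wannabe_3"], ...}}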
    def merge_entities_in_csv(
        self,
        idslist: list,
        metaval: str,
        name: str,
        entity_dict: Dict[str, Dict[str, list]],
        id_dict: dict,
    ) -> None:
        found_others = self.__local_match(idslist, entity_dict)
        if found_others["wannabe"]:
            for old_meta in found_others["wannabe"]:
                self.merge(entity_dict, metaval, old_meta, name)
        for identifier in idslist:
            if identifier not in entity_dict[metaval]["ids"]:
                entity_dict[metaval]["ids"].append(identifier)
            if identifier not in id_dict:
                self.__update_id_count(id_dict, identifier)
        self.__update_title(entity_dict, metaval, name)

    def __update_title(self, entity_dict: dict, metaval: str, name: str) -> None:
        if not entity_dict[metaval]["title"] and name:
            entity_dict[metaval]["title"] = name
            self.log[self.rowcnt]["title"]["status"] = "New value proposed"
    def id_worker(
        self,
        col_name,
        name,
        idslist: List[str],
        metaval: str,
        ra_ent=False,
        br_ent=False,
        vvi_ent=False,
        publ_entity=False,
    ):
        if not ra_ent:
            id_dict = self.idbr
            entity_dict = self.brdict
        else:
            id_dict = self.idra
            entity_dict = self.radict
        # there's a MetaID
        if metaval:
            # Does the MetaID exist among the data?
            # meta already in entity_dict (no need to care about conflicts: a MetaID was specified)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                if ra_ent:
                    found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                elif br_ent:
                    found_meta_ts = self.finder.retrieve_br_from_meta(metaval)
                # the MetaID is in the triplestore
                # 2 Retrieve EntityA data from the triplestore to update EntityA inside the CSV
                if found_meta_ts[2]:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]["ids"] = list()
                    if col_name == "author" or col_name == "editor":
                        entity_dict[metaval]["title"] = self.name_check(
                            found_meta_ts[0], name
                        )
                    else:
                        entity_dict[metaval]["title"] = found_meta_ts[0]
                    entity_dict[metaval]["others"] = list()
                    existing_ids = found_meta_ts[1]
                    self.__update_id_and_entity_dict(
                        existing_ids, id_dict, entity_dict, metaval
                    )
                    self.merge_entities_in_csv(
                        idslist, metaval, name, entity_dict, id_dict
                    )
                # Look for the MetaID in the provenance
                else:
                    entity_type = "br" if br_ent or vvi_ent else "ra"
                    metaid_uri = f"{self.base_iri}/{entity_type}/{str(metaval)}"
                    # The entity's MetaID after the merge, if it was merged; None otherwise. If None, the MetaID is considered invalid
                    metaval = self.finder.retrieve_metaid_from_merged_entity(
                        metaid_uri=metaid_uri, prov_config=self.prov_config
                    )
        # there's no MetaID, or there was one but it didn't exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # Do the IDs already exist among the data?
            # check in entity_dict
            if local_match["existing"]:
                # the IDs refer to multiple existing entities
                if len(local_match["existing"]) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                # the IDs refer to ONE existing entity
                elif len(local_match["existing"]) == 1:
                    metaval = str(local_match["existing"][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]["ids"]:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(
                            suspect_ids,
                            br=br_ent,
                            ra=ra_ent,
                            vvi=vvi_ent,
                            publ=publ_entity,
                        )
                        if len(sparql_match) > 1:
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
            # the IDs refer to one or more wannabe entities
            elif local_match["wannabe"]:
                metaval = str(local_match["wannabe"].pop(0))
                # 5 Merge data from EntityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match["wannabe"]:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]["ids"]:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(
                        suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                    )
                    if sparql_match:
                        # if 'wannabe' not in metaval or len(sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # Collect all existing IDs from all matches
                        existing_ids = []
                        for match in sparql_match:
                            existing_ids.extend(match[2])

                        # new_idslist = [x[1] for x in existing_ids]
                        # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                        # if len(new_sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                        old_metaval = metaval
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]["ids"] = list()
                        entity_dict[metaval]["others"] = list()
                        entity_dict[metaval]["title"] = (
                            sparql_match[0][1] if sparql_match[0][1] else ""
                        )
                        self.__update_id_and_entity_dict(
                            existing_ids, id_dict, entity_dict, metaval
                        )
                        self.merge(
                            entity_dict, metaval, old_metaval, sparql_match[0][1]
                        )
            else:
                sparql_match = self.finder_sparql(
                    idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                )
                # if len(sparql_match) > 1:
                #     # !
                #     return self.conflict(idslist, name, id_dict, col_name)
                # elif len(sparql_match) == 1:
                if sparql_match:
                    # Collect all existing IDs from all matches
                    existing_ids = []
                    for match in sparql_match:
                        existing_ids.extend(match[2])

                    # new_idslist = [x[1] for x in existing_ids]
                    # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                    # if len(new_sparql_match) > 1:
                    #     # Two entities previously disconnected on the triplestore now become connected
                    #     # !
                    #     return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data from the triplestore to update EntityA inside the CSV
                    # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV has precedence.
                    # elif len(new_sparql_match) == 1:
                    metaval = sparql_match[0][0]
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]["ids"] = list()
                    entity_dict[metaval]["others"] = list()
                    if col_name == "author" or col_name == "editor":
                        entity_dict[metaval]["title"] = self.name_check(
                            sparql_match[0][1], name
                        )
                    else:
                        entity_dict[metaval]["title"] = sparql_match[0][1]
                    self.__update_title(entity_dict, metaval, name)
                    self.__update_id_and_entity_dict(
                        existing_ids, id_dict, entity_dict, metaval
                    )
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]["ids"]:
                    entity_dict[metaval]["ids"].append(identifier)
            self.__update_title(entity_dict, metaval, name)
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval
    def new_entity(self, entity_dict, name):
        metaval = "wannabe_" + str(self.wnb_cnt)
        self.wnb_cnt += 1
        entity_dict[metaval] = dict()
        entity_dict[metaval]["ids"] = list()
        entity_dict[metaval]["others"] = list()
        entity_dict[metaval]["title"] = name
        return metaval
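    # Placeholder sketch: each call mints the next "wannabe_N" key, e.g.
    # "wannabe_0", "wannabe_1", ...; these placeholders are later replaced by
    # real MetaIDs in meta_maker() and enrich().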
    def volume_issue(
        self,
        meta: str,
        path: Dict[str, Dict[str, str]],
        value: str,
        row: Dict[str, str],
    ) -> None:
        if "wannabe" not in meta:
            if value in path:
                if "wannabe" in path[value]["id"]:
                    old_meta = path[value]["id"]
                    self.merge(self.brdict, meta, old_meta, row["title"])
                    path[value]["id"] = meta
            else:
                path[value] = dict()
                path[value]["id"] = meta
                if "issue" not in path:
                    path[value]["issue"] = dict()
        else:
            if value in path:
                if "wannabe" in path[value]["id"]:
                    old_meta = path[value]["id"]
                    if meta != old_meta:
                        self.merge(self.brdict, meta, old_meta, row["title"])
                        path[value]["id"] = meta
                else:
                    old_meta = path[value]["id"]
                    if "wannabe" not in old_meta and old_meta not in self.brdict:
                        br4dict = self.finder.retrieve_br_from_meta(old_meta)
                        self.brdict[old_meta] = dict()
                        self.brdict[old_meta]["ids"] = list()
                        self.brdict[old_meta]["others"] = list()
                        self.brdict[old_meta]["title"] = br4dict[0] if br4dict else None
                        if br4dict:
                            for x in br4dict[1]:
                                identifier = x[1]
                                self.brdict[old_meta]["ids"].append(identifier)
                                if identifier not in self.idbr:
                                    self.idbr[identifier] = x[0]
                    self.merge(self.brdict, old_meta, meta, row["title"])
            else:
                path[value] = dict()
                path[value]["id"] = meta
                if "issue" not in path:  # it's a volume
                    path[value]["issue"] = dict()
    def log_update(self):
        new_log = dict()
        for x in self.log:
            if any(self.log[x][y].values() for y in self.log[x]):
                for y in self.log[x]:
                    if "Conflict entity" in self.log[x][y]:
                        v = self.log[x][y]["Conflict entity"]
                        if "wannabe" in v:
                            if y == "id" or y == "venue":
                                for brm in self.brmeta:
                                    if v in self.brmeta[brm]["others"]:
                                        m = "br/" + str(brm)
                            elif y == "author" or y == "editor" or y == "publisher":
                                for ram in self.rameta:
                                    if v in self.rameta[ram]["others"]:
                                        m = "ra/" + str(ram)
                        else:
                            m = v
                        self.log[x][y]["Conflict entity"] = m
                new_log[x] = self.log[x]

                if "wannabe" in self.data[x]["id"]:
                    for brm in self.brmeta:
                        if self.data[x]["id"] in self.brmeta[brm]["others"]:
                            met = "br/" + str(brm)
                else:
                    met = "br/" + str(self.data[x]["id"])
                new_log[x]["id"]["meta"] = met
        return new_log
    def merge_duplicate_entities(self) -> None:
        """
        The 'merge_duplicate_entities()' method merges duplicate entities.
        Moreover, it modifies the CSV cells, giving precedence to the information
        found first, or to the data in the triplestore for already existing entities.

        :returns: None -- This method updates the CSV rows and returns None.
        """
        self.rowcnt = 0
        for row in self.data:
            id = row["id"]
            if "wannabe" not in id:
                self.equalizer(row, id)
            other_rowcnt = 0
            for other_row in self.data:
                if (
                    other_row["id"] in self.brdict[id]["others"]
                    or other_row["id"] == id
                ) and self.rowcnt != other_rowcnt:
                    for field, _ in row.items():
                        if row[field] and row[field] != other_row[field]:
                            if other_row[field]:
                                self.log[other_rowcnt][field][
                                    "status"
                                ] = "New value proposed"
                            other_row[field] = row[field]
                other_rowcnt += 1
            self.rowcnt += 1
    def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]:
        """
        Extracts the name and IDs from the venue string.

        :params venue_str: the venue string
        :type venue_str: str
        :returns: Tuple[str, List[str]] -- the name and the list of IDs extracted from the venue string
        """
        match = re.search(name_and_ids, venue_str)
        if match:
            name = match.group(1).strip()
            ids = match.group(2).strip().split()
        else:
            name = venue_str.strip()
            ids = []
        return name, ids
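    # Parsing sketch (hypothetical venue string):
    #   extract_name_and_ids("The Journal [issn:0000-0000 omid:br/0601]")
    #   -> ("The Journal", ["issn:0000-0000", "omid:br/0601"])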
    def equalizer(self, row: Dict[str, str], metaval: str) -> None:
        """
        Given a CSV row and its MetaID, this method aligns the information in the CSV with that in the triplestore.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params metaval: the MetaID identifying the bibliographic resource contained in the input CSV row
        :type metaval: str
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        self.log[self.rowcnt]["id"]["status"] = "Entity already exists"
        known_data = self.finder.retrieve_br_info_from_meta(metaval)
        try:
            known_data["author"] = self.__get_resp_agents(metaval, "author")
        except ValueError:
            print(row)
            raise
        known_data["editor"] = self.__get_resp_agents(metaval, "editor")
        known_data["publisher"] = self.finder.retrieve_publisher_from_br_metaid(metaval)
        for datum in ["pub_date", "type", "volume", "issue"]:
            if known_data[datum]:
                if row[datum] and row[datum] != known_data[datum]:
                    self.log[self.rowcnt][datum]["status"] = "New value proposed"
                row[datum] = known_data[datum]
        for datum in ["author", "editor", "publisher"]:
            if known_data[datum] and not row[datum]:
                row[datum] = known_data[datum]
        if known_data["venue"]:
            current_venue = row["venue"]
            known_venue = known_data["venue"]

            if current_venue:
                # Extract the names and IDs from the current and known venues
                current_venue_name, current_venue_ids = self.extract_name_and_ids(
                    current_venue
                )
                known_venue_name, known_venue_ids = self.extract_name_and_ids(
                    known_venue
                )

                current_venue_ids_set = set(current_venue_ids)
                known_venue_ids_set = set(known_venue_ids)

                common_ids = current_venue_ids_set.intersection(known_venue_ids_set)

                if common_ids:
                    # Merge the IDs and use the title from the known venue
                    merged_ids = current_venue_ids_set.union(known_venue_ids_set)
                    row["venue"] = (
                        f"{known_venue_name} [{' '.join(sorted(merged_ids))}]"
                    )
                else:
                    # Use the known venue information entirely
                    row["venue"] = known_venue
            else:
                row["venue"] = known_venue
        if known_data["page"]:
            if row["page"] and row["page"] != known_data["page"][1]:
                self.log[self.rowcnt]["page"]["status"] = "New value proposed"
            row["page"] = known_data["page"][1]
            self.remeta[metaval] = known_data["page"]
    def __get_resp_agents(self, metaid: str, column: str) -> str:
        resp_agents = self.finder.retrieve_ra_sequence_from_br_meta(metaid, column)
        output = ""
        if resp_agents:
            full_resp_agents = list()
            for item in resp_agents:
                for _, resp_agent in item.items():
                    author_name = resp_agent[0]
                    ids = [f"omid:ra/{resp_agent[2]}"]
                    ids.extend([id[1] for id in resp_agent[1]])
                    author_ids = "[" + " ".join(ids) + "]"
                    full_resp_agent = author_name + " " + author_ids
                    full_resp_agents.append(full_resp_agent)
            output = "; ".join(full_resp_agents)
        return output
def is_a_valid_row(row: Dict[str, str]) -> bool:
    """
    This function checks whether a row of the input CSV file is valid; invalid rows are discarded.

    :params row: a dictionary representing a CSV row
    :type row: Dict[str, str]
    :returns: bool -- True if the row is valid, False if it is invalid.
    """
    br_type = " ".join((row["type"].lower()).split())
    br_title = row["title"]
    br_volume = row["volume"]
    br_issue = row["issue"]
    br_venue = row["venue"]
    if row["id"]:
        if (br_volume or br_issue) and (not br_type or not br_venue):
            return False
        return True
    if all(not row[value] for value in row):
        return False
    br_author = row["author"]
    br_editor = row["editor"]
    br_pub_date = row["pub_date"]
    if not br_type or br_type in {
        "book",
        "data file",
        "dataset",
        "dissertation",
        "edited book",
        "journal article",
        "monograph",
        "other",
        "peer review",
        "posted content",
        "web content",
        "proceedings article",
        "report",
        "reference book",
    }:
        valid = bool(br_title and br_pub_date and (br_author or br_editor))
    elif br_type in {
        "book chapter",
        "book part",
        "book section",
        "book track",
        "component",
        "reference entry",
    }:
        valid = bool(br_title and br_venue)
    elif br_type in {
        "book series",
        "book set",
        "journal",
        "proceedings",
        "proceedings series",
        "report series",
        "standard",
        "standard series",
    }:
        valid = bool(br_title)
    elif br_type == "journal volume":
        valid = bool(br_venue and (br_volume or br_title))
    elif br_type == "journal issue":
        valid = bool(br_venue and (br_issue or br_title))
    else:
        # unrecognized types are discarded instead of raising an UnboundLocalError
        valid = False
    return valid
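# Validity sketch (hypothetical rows, other fields assumed empty): a row with an
# id is valid unless volume/issue appear without type or venue; without an id,
# the requirements depend on the type:
#   {"id": "doi:10.1/x", "volume": "1", "type": "", "venue": "", ...} -> False
#   {"id": "", "type": "journal article", "title": "T", "pub_date": "2020",
#    "author": "Doe, J.", ...} -> True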
def get_edited_br_metaid(row: dict, metaid: str, venue_metaid: str) -> str:
    if row["author"] and row["venue"] and row["type"] in CONTAINER_EDITOR_TYPES:
        edited_br_metaid = venue_metaid
    else:
        edited_br_metaid = metaid
    return edited_br_metaid