Coverage for oc_meta/core/curator.py: 90%
991 statements

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2019 Silvio Peroni <essepuntato@gmail.com>
# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
# Copyright 2021 Simone Persiani <iosonopersia@gmail.com>
# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import json
import os
import re
from typing import Dict, List, Tuple

from oc_meta.constants import CONTAINER_EDITOR_TYPES
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler


class Curator:
    def __init__(
        self,
        data: List[dict],
        ts: str,
        prov_config: str,
        counter_handler: RedisCounterHandler,
        base_iri: str = "https://w3id.org/oc/meta",
        prefix: str = "060",
        separator: str | None = None,
        valid_dois_cache: dict | None = None,  # None instead of a shared mutable default
        settings: dict | None = None,
        silencer: list | None = None,  # None instead of a shared mutable default
        meta_config_path: str | None = None,
    ):
        self.settings = settings or {}
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(
            ts,
            base_iri,
            self.everything_everywhere_allatonce,
            settings=settings,
            meta_config_path=meta_config_path,
        )
        self.base_iri = base_iri
        self.prov_config = prov_config
        self.separator = separator
        # Preliminary pass to clear volume and issue if id is present but venue is missing
        for row in data:
            if row["id"] and (row["volume"] or row["issue"]):
                if not row["venue"]:
                    row["volume"] = ""
                    row["issue"] = ""
                if not row["type"]:
                    row["type"] = "journal article"
        self.data = [
            {field: value.strip() for field, value in row.items()}
            for row in data
            if is_a_valid_row(row)
        ]
        self.prefix = prefix
        # Redis counter handler
        self.counter_handler = counter_handler
        self.brdict = {}
        self.radict: Dict[str, Dict[str, list]] = {}
        self.ardict: Dict[str, Dict[str, list]] = {}
        self.vvi = {}  # Venue, Volume, Issue
        self.idra = {}  # key id; value metaid of id related to ra
        self.idbr = {}  # key id; value metaid of id related to br
        self.rameta = dict()
        self.brmeta = dict()
        self.armeta = dict()
        self.remeta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        self.valid_dois_cache = (
            valid_dois_cache if valid_dois_cache is not None else {}
        )
        self.preexisting_entities = set()
        self.silencer = silencer if silencer is not None else []

    def collect_identifiers(self, valid_dois_cache):
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis

    def extract_identifiers_and_metavals(
        self, row, valid_dois_cache
    ) -> Tuple[set, set, set]:
        metavals = set()
        identifiers = set()
        vvis = set()
        venue_ids = set()
        venue_metaid = None

        if row["id"]:
            idslist, metaval = self.clean_id_list(
                self.split_identifiers(row["id"]),
                br=True,
                valid_dois_cache=valid_dois_cache,
            )
            id_metaval = f"omid:br/{metaval}" if metaval else ""
            if id_metaval:
                metavals.add(id_metaval)
            if idslist:
                identifiers.update(idslist)

        fields_with_an_id = [
            (field, re.search(name_and_ids, row[field]).group(2).split())
            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
            if re.search(name_and_ids, row[field])
        ]
        for field, field_ids in fields_with_an_id:
            br = field in ["venue", "volume", "issue"]
            field_idslist, field_metaval = self.clean_id_list(
                field_ids, br=br, valid_dois_cache=valid_dois_cache
            )
            if field_metaval:
                field_metaval = (
                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
                )
            else:
                field_metaval = ""
            if field_metaval:
                metavals.add(field_metaval)
            if field == "venue":
                venue_metaid = field_metaval
                if field_idslist:
                    venue_ids.update(field_idslist)
            else:
                if field_idslist:
                    identifiers.update(field_idslist)

        if (venue_metaid or venue_ids) and (row["volume"] or row["issue"]):
            vvi = (row["volume"], row["issue"], venue_metaid, tuple(sorted(venue_ids)))
            vvis.add(vvi)

        return metavals, identifiers, vvis

    def split_identifiers(self, field_value):
        if self.separator:
            return re.sub(colon_and_spaces, ":", field_value).split(self.separator)
        else:
            return re.split(
                one_or_more_spaces, re.sub(colon_and_spaces, ":", field_value)
            )
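
    # Editor's illustrative sketch (not part of the original module): assuming
    # the `colon_and_spaces` regex from oc_meta.lib.master_of_regex matches a
    # colon surrounded by optional spaces and `one_or_more_spaces` matches runs
    # of whitespace, "schema : value" is first normalized to "schema:value" and
    # then split on the configured separator (or on whitespace):
    #
    #   >>> curator.separator = ";"
    #   >>> curator.split_identifiers("doi: 10.1/a;pmid: 123")
    #   ['doi:10.1/a', 'pmid:123']
    #   >>> curator.separator = None
    #   >>> curator.split_identifiers("doi:10.1/a pmid:123")
    #   ['doi:10.1/a', 'pmid:123']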

    def curator(
        self,
        filename: str | None = None,
        path_csv: str | None = None,
        path_index: str | None = None,
    ):
        metavals, identifiers, vvis = self.collect_identifiers(
            valid_dois_cache=self.valid_dois_cache
        )
        self.finder.get_everything_about_res(
            metavals=metavals, identifiers=identifiers, vvis=vvis
        )
        for row in self.data:
            self.log[self.rowcnt] = {
                "id": {},
                "title": {},
                "author": {},
                "venue": {},
                "editor": {},
                "publisher": {},
                "page": {},
                "volume": {},
                "issue": {},
                "pub_date": {},
                "type": {},
            }
            self.clean_id(row)
            self.rowcnt += 1
        self.merge_duplicate_entities()
        self.clean_metadata_without_id()
        self.rowcnt = 0
        for row in self.data:
            self.clean_vvi(row)
            self.rowcnt += 1
        self.rowcnt = 0
        for row in self.data:
            self.clean_ra(row, "author")
            self.clean_ra(row, "publisher")
            self.clean_ra(row, "editor")
            self.rowcnt += 1
        self.get_preexisting_entities()
        self.meta_maker()
        self.log = self.log_update()
        self.enrich()
        # Remove duplicate rows that ended up with the same id
        self.data = list({v["id"]: v for v in self.data}.values())
        if path_index:
            path_index = os.path.join(path_index, filename)
        self.filename = filename
        self.indexer(path_index, path_csv)
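
    # A minimal end-to-end usage sketch (editor's addition, not part of the
    # original module). The row below, the endpoint URL, the provenance config
    # path and the counter handler are placeholders: consult the oc_meta
    # documentation for the real configuration of RedisCounterHandler.
    #
    #   >>> rows = [{"id": "doi:10.1/a", "title": "A Title", "author": "Doe, Jane",
    #   ...          "editor": "", "publisher": "", "venue": "", "volume": "",
    #   ...          "issue": "", "page": "", "pub_date": "2020",
    #   ...          "type": "journal article"}]
    #   >>> curator = Curator(rows, ts="http://localhost:9999/sparql",
    #   ...                   prov_config="prov_config.yaml",
    #   ...                   counter_handler=redis_counter_handler)
    #   >>> curator.curator(filename="0", path_csv="out/csv", path_index="out/index")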

    # ID
    def clean_id(self, row: Dict[str, str]) -> None:
        """
        The 'clean_id()' method is executed for each CSV row.
        In this process, any duplicates are detected by the IDs in the 'id' column.
        Each row is assigned a wannabeID or, if the bibliographic resource was found
        in the triplestore, a MetaID.
        Finally, this method enriches and cleans the fields related to the
        title, venue, volume, issue, page, publication date and type.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        if row["title"]:
            name = Cleaner(row["title"]).clean_title(
                self.settings.get("normalize_titles")
            )
        else:
            name = ""
        metaval_ids_list = []
        if row["id"]:
            if self.separator:
                idslist = re.sub(colon_and_spaces, ":", row["id"]).split(self.separator)
            else:
                idslist = re.split(
                    one_or_more_spaces, re.sub(colon_and_spaces, ":", row["id"])
                )
            idslist, metaval = self.clean_id_list(
                idslist, br=True, valid_dois_cache=self.valid_dois_cache
            )
            id_metaval = f"omid:br/{metaval}" if metaval else ""
            metaval_ids_list.append((id_metaval, idslist))
        fields_with_an_id = [
            (field, re.search(name_and_ids, row[field]).group(2).split())
            for field in ["author", "editor", "publisher", "venue", "volume", "issue"]
            if re.search(name_and_ids, row[field])
        ]
        for field, field_ids in fields_with_an_id:
            if field in ["author", "editor", "publisher"]:
                br = False
            elif field in ["venue", "volume", "issue"]:
                br = True
            field_idslist, field_metaval = self.clean_id_list(
                field_ids, br=br, valid_dois_cache=self.valid_dois_cache
            )
            if field_metaval:
                field_metaval = (
                    f"omid:br/{field_metaval}" if br else f"omid:ra/{field_metaval}"
                )
            else:
                field_metaval = ""
            metaval_ids_list.append((field_metaval, field_idslist))
        if row["id"]:
            metaval = self.id_worker(
                "id",
                name,
                idslist,
                metaval,
                ra_ent=False,
                br_ent=True,
                vvi_ent=False,
                publ_entity=False,
            )
        else:
            metaval = self.new_entity(self.brdict, name)
        row["title"] = self.brdict[metaval]["title"]
        row["id"] = metaval

    def clean_metadata_without_id(self):
        for row in self.data:
            # page
            if row["page"]:
                row["page"] = Cleaner(row["page"]).normalize_hyphens()
            # date
            if row["pub_date"]:
                date = Cleaner(row["pub_date"]).normalize_hyphens()
                date = Cleaner(date).clean_date()
                row["pub_date"] = date
            # type
            if row["type"]:
                entity_type = " ".join((row["type"].lower()).split())
                if entity_type == "edited book" or entity_type == "monograph":
                    entity_type = "book"
                elif (
                    entity_type == "report series"
                    or entity_type == "standard series"
                    or entity_type == "proceedings series"
                ):
                    entity_type = "series"
                elif entity_type == "posted content":
                    entity_type = "web content"
                if entity_type in {
                    "abstract",
                    "archival document",
                    "audio document",
                    "book",
                    "book chapter",
                    "book part",
                    "book section",
                    "book series",
                    "book set",
                    "computer program",
                    "data file",
                    "data management plan",
                    "dataset",
                    "dissertation",
                    "editorial",
                    "journal",
                    "journal article",
                    "journal editorial",
                    "journal issue",
                    "journal volume",
                    "newspaper",
                    "newspaper article",
                    "newspaper editorial",
                    "newspaper issue",
                    "peer review",
                    "preprint",
                    "presentation",
                    "proceedings",
                    "proceedings article",
                    "proceedings series",
                    "reference book",
                    "reference entry",
                    "retraction notice",
                    "series",
                    "report",
                    "standard",
                    "web content",
                }:
                    row["type"] = entity_type
                else:
                    row["type"] = ""
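
    # Editor's illustrative examples of the type normalization above: case and
    # internal whitespace are collapsed, a few synonyms are mapped onto
    # canonical types, and anything outside the accepted set is blanked out.
    #
    #   "Edited  Book"   -> "book"
    #   "posted content" -> "web content"
    #   "REPORT SERIES"  -> "series"
    #   "lecture notes"  -> ""   (not an accepted type)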

    # VVI
    def clean_vvi(self, row: Dict[str, str]) -> None:
        """
        This method performs the deduplication process for venues, volumes and issues.
        The acquired information is stored in the 'vvi' dictionary, which has the following format: ::

            {
                VENUE_IDENTIFIER: {
                    'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}},
                    'volume': {
                        SEQUENCE_IDENTIFIER: {
                            'id': META_ID,
                            'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}}
                        }
                    }
                }
            }

        For example: ::

            {
                '4416': {
                    'issue': {},
                    'volume': {
                        '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}},
                        '172': {'id': '4434',
                            'issue': {
                                '22': {'id': '4435'},
                                '20': {'id': '4436'},
                                '21': {'id': '4437'},
                                '19': {'id': '4438'}
                            }
                        }
                    }
                }
            }

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        if row["type"] not in {
            "journal article",
            "journal volume",
            "journal issue",
        } and (row["volume"] or row["issue"]):
            row["volume"] = ""
            row["issue"] = ""
        Cleaner.clean_volume_and_issue(row=row)
        vol_meta = None
        br_type = row["type"]
        volume = row["volume"]
        issue = row["issue"]
        br_id = row["id"]
        venue = row["venue"]
        # Venue
        if venue:
            # The data must be invalidated, because the resource is a journal
            # but a volume or an issue has also been specified
            if br_type == "journal" and (volume or issue):
                row["venue"] = ""
                row["volume"] = ""
                row["issue"] = ""
            venue_id = re.search(name_and_ids, venue)
            if venue_id:
                name = Cleaner(venue_id.group(1)).clean_title(
                    self.settings.get("normalize_titles")
                )
                venue_id = venue_id.group(2)
                if self.separator:
                    idslist = re.sub(colon_and_spaces, ":", venue_id).split(
                        self.separator
                    )
                else:
                    idslist = re.split(
                        one_or_more_spaces, re.sub(colon_and_spaces, ":", venue_id)
                    )
                idslist, metaval = self.clean_id_list(
                    idslist, br=True, valid_dois_cache=self.valid_dois_cache
                )

                metaval = self.id_worker(
                    "venue",
                    name,
                    idslist,
                    metaval,
                    ra_ent=False,
                    br_ent=True,
                    vvi_ent=True,
                    publ_entity=False,
                )
                if metaval not in self.vvi:
                    ts_vvi = None
                    if "wannabe" not in metaval:
                        ts_vvi = self.finder.retrieve_venue_from_local_graph(metaval)
                    if "wannabe" in metaval or not ts_vvi:
                        self.vvi[metaval] = dict()
                        self.vvi[metaval]["volume"] = dict()
                        self.vvi[metaval]["issue"] = dict()
                    elif ts_vvi:
                        self.vvi[metaval] = ts_vvi
            else:
                name = Cleaner(venue).clean_title(self.settings.get("normalize_titles"))
                metaval = self.new_entity(self.brdict, name)
                self.vvi[metaval] = dict()
                self.vvi[metaval]["volume"] = dict()
                self.vvi[metaval]["issue"] = dict()
            row["venue"] = metaval

            # Volume
            if volume and (br_type == "journal issue" or br_type == "journal article"):
                if volume in self.vvi[metaval]["volume"]:
                    vol_meta = self.vvi[metaval]["volume"][volume]["id"]
                else:
                    vol_meta = self.new_entity(self.brdict, "")
                    self.vvi[metaval]["volume"][volume] = dict()
                    self.vvi[metaval]["volume"][volume]["id"] = vol_meta
                    self.vvi[metaval]["volume"][volume]["issue"] = dict()
            elif volume and br_type == "journal volume":
                # The data must be invalidated, because the resource is a journal volume
                # but an issue has also been specified
                if issue:
                    row["volume"] = ""
                    row["issue"] = ""
                else:
                    vol_meta = br_id
                    self.volume_issue(
                        vol_meta, self.vvi[metaval]["volume"], volume, row
                    )

            # Issue
            if issue and br_type == "journal article":
                row["issue"] = issue
                if vol_meta:
                    if issue not in self.vvi[metaval]["volume"][volume]["issue"]:
                        issue_meta = self.new_entity(self.brdict, "")
                        self.vvi[metaval]["volume"][volume]["issue"][issue] = dict()
                        self.vvi[metaval]["volume"][volume]["issue"][issue][
                            "id"
                        ] = issue_meta
                else:
                    if issue not in self.vvi[metaval]["issue"]:
                        issue_meta = self.new_entity(self.brdict, "")
                        self.vvi[metaval]["issue"][issue] = dict()
                        self.vvi[metaval]["issue"][issue]["id"] = issue_meta
            elif issue and br_type == "journal issue":
                issue_meta = br_id
                if vol_meta:
                    self.volume_issue(
                        issue_meta,
                        self.vvi[metaval]["volume"][volume]["issue"],
                        issue,
                        row,
                    )
                else:
                    self.volume_issue(
                        issue_meta, self.vvi[metaval]["issue"], issue, row
                    )

        else:
            row["venue"] = ""
            row["volume"] = ""
            row["issue"] = ""

    # RA
    def clean_ra(self, row, col_name):
        """
        This method performs the deduplication process for responsible agents (authors, publishers and editors).

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.ardict, self.radict, and self.idra, and returns None.
        """

        def get_br_metaval_to_check(row, col_name):
            if col_name == "editor":
                return get_edited_br_metaid(row, row["id"], row["venue"])
            else:
                return row["id"]

        def get_br_metaval(br_metaval_to_check):
            if br_metaval_to_check in self.brdict or br_metaval_to_check in self.vvi:
                return br_metaval_to_check
            return [
                id
                for id in self.brdict
                if br_metaval_to_check in self.brdict[id]["others"]
            ][0]

        def initialize_ardict_entry(br_metaval):
            if br_metaval not in self.ardict:
                self.ardict[br_metaval] = {"author": [], "editor": [], "publisher": []}

        def initialize_sequence(br_metaval, col_name):
            sequence = []
            if "wannabe" in br_metaval:
                sequence = []
            else:
                sequence_found = self.finder.retrieve_ra_sequence_from_br_meta(
                    br_metaval, col_name
                )
                if sequence_found:
                    sequence = []
                    for agent in sequence_found:
                        for ar_metaid in agent:
                            ra_metaid = agent[ar_metaid][2]
                            sequence.append(tuple((ar_metaid, ra_metaid)))
                            if ra_metaid not in self.radict:
                                self.radict[ra_metaid] = dict()
                                self.radict[ra_metaid]["ids"] = list()
                                self.radict[ra_metaid]["others"] = list()
                                self.radict[ra_metaid]["title"] = agent[ar_metaid][0]
                            for identifier in agent[ar_metaid][1]:
                                # other ids after meta
                                id_metaid = identifier[0]
                                literal = identifier[1]
                                # self.idra is keyed by the ID literal (see __init__)
                                if literal not in self.idra:
                                    self.idra[literal] = id_metaid
                                if literal not in self.radict[ra_metaid]["ids"]:
                                    self.radict[ra_metaid]["ids"].append(literal)
                    self.ardict[br_metaval][col_name].extend(sequence)
                else:
                    sequence = []
            return sequence

        def parse_ra_list(row):
            ra_list = re.split(semicolon_in_people_field, row[col_name])
            ra_list = Cleaner.clean_ra_list(ra_list)
            return ra_list

        def process_individual_ra(ra, sequence):
            new_elem_seq = True
            ra_id = None
            ra_id_match = re.search(name_and_ids, ra)
            if ra_id_match:
                cleaner = Cleaner(ra_id_match.group(1))
                name = cleaner.clean_name()
                ra_id = ra_id_match.group(2)
            else:
                cleaner = Cleaner(ra)
                name = cleaner.clean_name()
            if not ra_id and sequence:
                for _, ra_metaid in sequence:
                    if self.radict[ra_metaid]["title"] == name:
                        ra_id = "omid:ra/" + str(ra_metaid)
                        new_elem_seq = False
                        break
            return ra_id, name, new_elem_seq

        if not row[col_name]:
            return

        br_metaval_to_check = get_br_metaval_to_check(row, col_name)
        br_metaval = get_br_metaval(br_metaval_to_check)
        initialize_ardict_entry(br_metaval)

        sequence = self.ardict[br_metaval].get(col_name, [])
        if not sequence:
            sequence = initialize_sequence(br_metaval, col_name)
        if col_name in self.silencer and sequence:
            return

        ra_list = parse_ra_list(row)
        new_sequence = list()
        change_order = False

        for pos, ra in enumerate(ra_list):
            ra_id, name, new_elem_seq = process_individual_ra(ra, sequence)
            if ra_id:
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ":", ra_id).split(
                        self.separator
                    )
                else:
                    ra_id_list = re.split(
                        one_or_more_spaces, re.sub(colon_and_spaces, ":", ra_id)
                    )
                if sequence:
                    ar_ra = None
                    for ps, el in enumerate(sequence):
                        ra_metaid = el[1]
                        for literal in ra_id_list:
                            if literal in self.radict[ra_metaid]["ids"]:
                                if ps != pos:
                                    change_order = True
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    # use a fresh loop variable so the enclosing
                                    # `pos` from enumerate(ra_list) is not clobbered
                                    for idx, literal_value in enumerate(ra_id_list):
                                        if "omid" in literal_value:
                                            ra_id_list[idx] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:ra/" + ar_ra)
                    if not ar_ra:
                        # new element
                        for ar_metaid, ra_metaid in sequence:
                            if self.radict[ra_metaid]["title"] == name:
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    for idx, i in enumerate(ra_id_list):
                                        if "omid" in i:
                                            ra_id_list[idx] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:ra/" + ar_ra)
                if col_name == "publisher":
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
                    )
                    metaval = self.id_worker(
                        "publisher",
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=True,
                    )
                else:
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False, valid_dois_cache=self.valid_dois_cache
                    )
                    metaval = self.id_worker(
                        col_name,
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=False,
                    )
                if col_name != "publisher" and metaval in self.radict:
                    full_name: str = self.radict[metaval]["title"]
                    if "," in name and "," in full_name:
                        first_name = name.split(",")[1].strip()
                        if (
                            not full_name.split(",")[1].strip() and first_name
                        ):  # first name found!
                            given_name = full_name.split(",")[0]
                            self.radict[metaval]["title"] = (
                                given_name + ", " + first_name
                            )
            else:
                metaval = self.new_entity(self.radict, name)
            if new_elem_seq:
                role = self.prefix + str(self._add_number("ar"))
                new_sequence.append(tuple((role, metaval)))
        if change_order:
            self.log[self.rowcnt][col_name][
                "Info"
            ] = "New RA sequence proposed: refused"
        sequence.extend(new_sequence)
        self.ardict[br_metaval][col_name] = sequence

    @staticmethod
    def clean_id_list(
        id_list: List[str], br: bool, valid_dois_cache: dict | None = None
    ) -> Tuple[list, str]:
        """
        Clean the IDs in the input list and check whether a MetaID is present.

        :params id_list: a list of IDs
        :type id_list: List[str]
        :params br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- it returns a two-element tuple, where the first element is the list of cleaned IDs, while the second is a MetaID if any was found.
        """
        if valid_dois_cache is None:  # avoid a shared mutable default
            valid_dois_cache = dict()
        pattern = "br/" if br else "ra/"
        metaid = ""
        id_list = list(filter(None, id_list))
        clean_list = list()

        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(":", 1)
            schema = identifier[0].lower()
            value = identifier[1]

            if schema == "omid":
                metaid = value.replace(pattern, "")
            else:
                normalized_id = Cleaner(elem).normalize_id(
                    valid_dois_cache=valid_dois_cache
                )
                if normalized_id:
                    clean_list.append(normalized_id)

        how_many_meta = [i for i in id_list if i.lower().startswith("omid")]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith("omid")]

        return clean_list, metaid
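
    # Editor's illustrative sketch: assuming "doi:10.1/abc" passes
    # Cleaner.normalize_id validation, an 'omid' ID is stripped from the cleaned
    # list and returned separately as the MetaID:
    #
    #   >>> Curator.clean_id_list(["omid:br/0601", "doi:10.1/abc"], br=True)
    #   (['doi:10.1/abc'], '0601')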

    def conflict(
        self, idslist: List[str], name: str, id_dict: dict, col_name: str
    ) -> str:
        if col_name == "id" or col_name == "venue":
            entity_dict = self.brdict
        elif col_name == "author" or col_name == "editor" or col_name == "publisher":
            entity_dict = self.radict
        metaval = self.new_entity(entity_dict, name)
        entity_dict[metaval] = {"ids": list(), "others": list(), "title": name}
        self.log[self.rowcnt][col_name]["Conflict entity"] = metaval
        for identifier in idslist:
            entity_dict[metaval]["ids"].append(identifier)
            if identifier not in id_dict:
                schema_value = identifier.split(":", maxsplit=1)
                found_metaid = self.finder.retrieve_metaid_from_id(
                    schema_value[0], schema_value[1]
                )
                if found_metaid:
                    id_dict[identifier] = found_metaid
                else:
                    self.__update_id_count(id_dict, identifier)
        return metaval

    def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False):
        match_elem = list()
        id_set = set()
        res = None
        for elem in list_to_find:
            if len(match_elem) < 2:
                identifier = elem.split(":", maxsplit=1)
                value = identifier[1]
                schema = identifier[0]
                if br:
                    res = self.finder.retrieve_br_from_id(schema, value)
                elif ra:
                    res = self.finder.retrieve_ra_from_id(schema, value, publ)
                if res:
                    for f in res:
                        if f[0] not in id_set:
                            match_elem.append(f)
                            id_set.add(f[0])
        return match_elem

    def ra_update(self, row: dict, br_key: str, col_name: str) -> None:
        if row[col_name]:
            sequence = self.armeta[br_key][col_name]
            ras_list = list()
            for _, ra_id in sequence:
                ra_name = self.rameta[ra_id]["title"]
                ra_ids = self.rameta[ra_id]["ids"]
                ra = self.build_name_ids_string(ra_name, ra_ids)
                ras_list.append(ra)
            row[col_name] = "; ".join(ras_list)

    @staticmethod
    def build_name_ids_string(name, ids):
        if name and ids:
            ra_string = f"{name} [{' '.join(ids)}]"
        elif name and not ids:
            ra_string = name
        elif ids and not name:
            ra_string = f"[{' '.join(ids)}]"
        elif not ids and not name:
            ra_string = ""
        return ra_string
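
    # Editor's illustrative examples covering the branches above:
    #
    #   >>> Curator.build_name_ids_string("Doe, Jane", ["omid:ra/0601", "orcid:0000-0001-2345-6789"])
    #   'Doe, Jane [omid:ra/0601 orcid:0000-0001-2345-6789]'
    #   >>> Curator.build_name_ids_string("Doe, Jane", [])
    #   'Doe, Jane'
    #   >>> Curator.build_name_ids_string("", ["omid:ra/0601"])
    #   '[omid:ra/0601]'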

    @staticmethod
    def __local_match(list_to_match, dict_to_match: dict):
        match_elem = dict()
        match_elem["existing"] = list()
        match_elem["wannabe"] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va["ids"]:
                    if "wannabe" in k:
                        if k not in match_elem["wannabe"]:
                            match_elem["wannabe"].append(k)
                    else:
                        if k not in match_elem["existing"]:
                            match_elem["existing"].append(k)
        return match_elem

    def __meta_ar(self, newkey, oldkey, role):
        for x, k in self.ardict[oldkey][role]:
            if "wannabe" in k:
                for m in self.rameta:
                    if k in self.rameta[m]["others"]:
                        new_v = m
                        break
            else:
                new_v = k
            self.armeta[newkey][role].append(tuple((x, new_v)))

    def __tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None:
        for k, v in tree.items():
            if k == key:
                values.append(v)
            elif isinstance(v, dict):
                found = self.__tree_traverse(v, key, values)
                if found is not None:
                    values.append(found)

    def get_preexisting_entities(self) -> None:
        for entity_type in {"br", "ra"}:
            for entity_metaid, data in getattr(self, f"{entity_type}dict").items():
                if not entity_metaid.startswith("wannabe"):
                    self.preexisting_entities.add(f"{entity_type}/{entity_metaid}")
                    for entity_id_literal in data["ids"]:
                        preexisting_entity_id_metaid = getattr(
                            self, f"id{entity_type}"
                        )[entity_id_literal]
                        self.preexisting_entities.add(
                            f"id/{preexisting_entity_id_metaid}"
                        )
        for _, roles in self.ardict.items():
            for _, ar_ras in roles.items():
                for ar_ra in ar_ras:
                    if not ar_ra[1].startswith("wannabe"):
                        self.preexisting_entities.add(f"ar/{ar_ra[0]}")
        for venue_metaid, vi in self.vvi.items():
            if not venue_metaid.startswith("wannabe"):
                wannabe_preexisting_vis = list()
                self.__tree_traverse(vi, "id", wannabe_preexisting_vis)
                self.preexisting_entities.update(
                    {
                        f"br/{vi_metaid}"
                        for vi_metaid in wannabe_preexisting_vis
                        if not vi_metaid.startswith("wannabe")
                    }
                )
        for _, re_metaid in self.remeta.items():
            self.preexisting_entities.add(f"re/{re_metaid[0]}")

    def meta_maker(self):
        """
        For each working dictionary ('brdict', 'ardict', 'radict', 'vvi') the corresponding
        MetaID dictionary is created ('brmeta', 'armeta', 'rameta', and 'VolIss').
        """
        for identifier in self.brdict:
            if "wannabe" in identifier:
                other = identifier
                count = self._add_number("br")
                meta = self.prefix + str(count)
                self.brmeta[meta] = self.brdict[identifier]
                self.brmeta[meta]["others"].append(other)
                self.brmeta[meta]["ids"].append("omid:br/" + meta)
            else:
                self.brmeta[identifier] = self.brdict[identifier]
                self.brmeta[identifier]["ids"].append("omid:br/" + identifier)
        for identifier in self.radict:
            if "wannabe" in identifier:
                other = identifier
                count = self._add_number("ra")
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]["others"].append(other)
                self.rameta[meta]["ids"].append("omid:ra/" + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.rameta[identifier]["ids"].append("omid:ra/" + identifier)
        for ar_id in self.ardict:
            if "wannabe" in ar_id:
                for br_id in self.brmeta:
                    if ar_id in self.brmeta[br_id]["others"]:
                        br_key = br_id
                        break
            else:
                br_key = ar_id
            self.armeta[br_key] = dict()
            self.armeta[br_key]["author"] = list()
            self.armeta[br_key]["editor"] = list()
            self.armeta[br_key]["publisher"] = list()
            self.__meta_ar(br_key, ar_id, "author")
            self.__meta_ar(br_key, ar_id, "editor")
            self.__meta_ar(br_key, ar_id, "publisher")
        self.VolIss = dict()
        if self.vvi:
            for venue_meta in self.vvi:
                venue_issue = self.vvi[venue_meta]["issue"]
                if venue_issue:
                    for issue in venue_issue:
                        issue_id = venue_issue[issue]["id"]
                        if "wannabe" in issue_id:
                            for br_meta in self.brmeta:
                                if issue_id in self.brmeta[br_meta]["others"]:
                                    self.vvi[venue_meta]["issue"][issue]["id"] = str(
                                        br_meta
                                    )
                                    break

                venue_volume = self.vvi[venue_meta]["volume"]
                if venue_volume:
                    for volume in venue_volume:
                        volume_id = venue_volume[volume]["id"]
                        if "wannabe" in volume_id:
                            for br_meta in self.brmeta:
                                if volume_id in self.brmeta[br_meta]["others"]:
                                    self.vvi[venue_meta]["volume"][volume]["id"] = str(
                                        br_meta
                                    )
                                    break
                        if venue_volume[volume]["issue"]:
                            volume_issue = venue_volume[volume]["issue"]
                            for issue in volume_issue:
                                volume_issue_id = volume_issue[issue]["id"]
                                if "wannabe" in volume_issue_id:
                                    for br_meta in self.brmeta:
                                        if (
                                            volume_issue_id
                                            in self.brmeta[br_meta]["others"]
                                        ):
                                            self.vvi[venue_meta]["volume"][volume][
                                                "issue"
                                            ][issue]["id"] = str(br_meta)
                                            break
                if "wannabe" in venue_meta:
                    for br_meta in self.brmeta:
                        if venue_meta in self.brmeta[br_meta]["others"]:
                            self.__merge_VolIss_with_vvi(br_meta, venue_meta)
                else:
                    self.__merge_VolIss_with_vvi(venue_meta, venue_meta)

    def enrich(self):
        """
        This method replaces the wannabeID placeholders with the
        actual data and MetaIDs as a result of the deduplication process.
        """
        for row in self.data:
            if "wannabe" in row["id"]:
                for br_metaid in self.brmeta:
                    if row["id"] in self.brmeta[br_metaid]["others"]:
                        metaid = br_metaid
            else:
                metaid = row["id"]
            if row["page"] and (metaid not in self.remeta):
                re_meta = self.finder.retrieve_re_from_br_meta(metaid)
                if re_meta:
                    self.remeta[metaid] = re_meta
                    row["page"] = re_meta[1]
                else:
                    count = self.prefix + str(self._add_number("re"))
                    page = row["page"]
                    self.remeta[metaid] = (count, page)
                    row["page"] = page
            elif metaid in self.remeta:
                row["page"] = self.remeta[metaid][1]
            row["id"] = " ".join(self.brmeta[metaid]["ids"])
            row["title"] = self.brmeta[metaid]["title"]
            venue_metaid = None
            if row["venue"]:
                venue = row["venue"]
                if "wannabe" in venue:
                    for i in self.brmeta:
                        if venue in self.brmeta[i]["others"]:
                            venue_metaid = i
                else:
                    venue_metaid = venue
                row["venue"] = self.build_name_ids_string(
                    self.brmeta[venue_metaid]["title"], self.brmeta[venue_metaid]["ids"]
                )
            br_key_for_editor = get_edited_br_metaid(row, metaid, venue_metaid)
            self.ra_update(row, metaid, "author")
            self.ra_update(row, metaid, "publisher")
            self.ra_update(row, br_key_for_editor, "editor")

    @staticmethod
    def name_check(ts_name, name):
        if "," in ts_name:
            names = ts_name.split(",")
            if names[0] and not names[1].strip():
                # there isn't a given name in ts
                if "," in name:
                    gname = name.split(", ")[1]
                    if gname.strip():
                        ts_name = names[0] + ", " + gname
        return ts_name
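
    # Editor's illustrative example: when the triplestore only knows a family
    # name ("Peroni,") and the CSV supplies a given name, the two are combined;
    # otherwise the triplestore name wins.
    #
    #   >>> Curator.name_check("Peroni,", "Peroni, Silvio")
    #   'Peroni, Silvio'
    #   >>> Curator.name_check("Peroni, Silvio", "Peroni, S.")
    #   'Peroni, Silvio'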

    def _read_number(self, entity_type: str) -> int:
        return self.counter_handler.read_counter(
            entity_type, supplier_prefix=self.prefix
        )

    def _add_number(self, entity_type: str) -> int:
        return self.counter_handler.increment_counter(
            entity_type, supplier_prefix=self.prefix
        )

    def __update_id_and_entity_dict(
        self,
        existing_ids: list,
        id_dict: dict,
        entity_dict: Dict[str, Dict[str, list]],
        metaval: str,
    ) -> None:
        for identifier in existing_ids:
            if identifier[1] not in id_dict:
                id_dict[identifier[1]] = identifier[0]
            if identifier[1] not in entity_dict[metaval]["ids"]:
                entity_dict[metaval]["ids"].append(identifier[1])

    def indexer(self, path_index: str, path_csv: str) -> None:
        """
        This method transforms idra, idbr, armeta, remeta, brmeta and vvi into a form
        that can be saved as CSV and JSON files.
        As for venues, volumes and issues, it also takes care of replacing any
        wannabe_id with a meta_id.
        Finally, it generates the enriched CSV and saves it.

        :params path_index: a directory path. It will contain the indexes
        :type path_index: str
        :params path_csv: a directory path. It will contain the output enriched CSV
        :type path_csv: str
        """
        # ID
        self.index_id_ra = list()
        self.index_id_br = list()
        for entity_type in {"ra", "br"}:
            cur_index = getattr(self, f"id{entity_type}")
            if cur_index:
                for literal in cur_index:
                    row = dict()
                    row["id"] = str(literal)
                    row["meta"] = str(cur_index[literal])
                    getattr(self, f"index_id_{entity_type}").append(row)
            else:
                row = dict()
                row["id"] = ""
                row["meta"] = ""
                getattr(self, f"index_id_{entity_type}").append(row)
        # AR
        self.ar_index = list()
        if self.armeta:
            for metaid in self.armeta:
                index = dict()
                index["meta"] = metaid
                for role in self.armeta[metaid]:
                    list_ar = list()
                    for ar, ra in self.armeta[metaid][role]:
                        list_ar.append(str(ar) + ", " + str(ra))
                    index[role] = "; ".join(list_ar)
                self.ar_index.append(index)
        else:
            row = dict()
            row["meta"] = ""
            row["author"] = ""
            row["editor"] = ""
            row["publisher"] = ""
            self.ar_index.append(row)
        # RE
        self.re_index = list()
        if self.remeta:
            for x in self.remeta:
                r = dict()
                r["br"] = x
                r["re"] = str(self.remeta[x][0])
                self.re_index.append(r)
        else:
            row = dict()
            row["br"] = ""
            row["re"] = ""
            self.re_index.append(row)
        if self.filename:
            if not os.path.exists(path_index):
                os.makedirs(path_index)
            ra_path = os.path.join(path_index, "index_id_ra.csv")
            write_csv(ra_path, self.index_id_ra)
            br_path = os.path.join(path_index, "index_id_br.csv")
            write_csv(br_path, self.index_id_br)
            ar_path = os.path.join(path_index, "index_ar.csv")
            write_csv(ar_path, self.ar_index)
            re_path = os.path.join(path_index, "index_re.csv")
            write_csv(re_path, self.re_index)
            vvi_file = os.path.join(path_index, "index_vi.json")
            with open(vvi_file, "w") as fp:
                json.dump(self.VolIss, fp)
            if self.log:
                # join path components; string concatenation here would drop the separator
                log_file = os.path.join(path_index, "log.json")
                with open(log_file, "w") as lf:
                    json.dump(self.log, lf)
            if self.data:
                name = self.filename + ".csv"
                data_file = os.path.join(path_csv, name)
                write_csv(data_file, self.data)
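
    # For orientation (editor's note): when a filename is set, `indexer` writes
    # index_id_ra.csv, index_id_br.csv, index_ar.csv, index_re.csv and
    # index_vi.json under `path_index`, plus log.json when a log exists, and
    # the enriched <filename>.csv under `path_csv`.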

    def __merge_VolIss_with_vvi(
        self, VolIss_venue_meta: str, vvi_venue_meta: str
    ) -> None:
        if VolIss_venue_meta in self.VolIss:
            for vvi_v in self.vvi[vvi_venue_meta]["volume"]:
                if vvi_v in self.VolIss[VolIss_venue_meta]["volume"]:
                    self.VolIss[VolIss_venue_meta]["volume"][vvi_v]["issue"].update(
                        self.vvi[vvi_venue_meta]["volume"][vvi_v]["issue"]
                    )
                else:
                    self.VolIss[VolIss_venue_meta]["volume"][vvi_v] = self.vvi[
                        vvi_venue_meta
                    ]["volume"][vvi_v]
            self.VolIss[VolIss_venue_meta]["issue"].update(
                self.vvi[vvi_venue_meta]["issue"]
            )
        else:
            self.VolIss[VolIss_venue_meta] = self.vvi[vvi_venue_meta]

    def __update_id_count(self, id_dict, identifier):
        # Before creating a new ID, check whether it already exists in the triplestore
        schema, value = identifier.split(":", maxsplit=1)
        existing_metaid = self.finder.retrieve_metaid_from_id(schema, value)

        if existing_metaid:
            id_dict[identifier] = existing_metaid
        else:
            count = self._add_number("id")
            id_dict[identifier] = self.prefix + str(count)

    @staticmethod
    def merge(
        dict_to_match: Dict[str, Dict[str, list]],
        metaval: str,
        old_meta: str,
        temporary_name: str,
    ) -> None:
        for x in dict_to_match[old_meta]["ids"]:
            if x not in dict_to_match[metaval]["ids"]:
                dict_to_match[metaval]["ids"].append(x)
        for x in dict_to_match[old_meta]["others"]:
            if x not in dict_to_match[metaval]["others"]:
                dict_to_match[metaval]["others"].append(x)
        dict_to_match[metaval]["others"].append(old_meta)
        if not dict_to_match[metaval]["title"]:
            if dict_to_match[old_meta]["title"]:
                dict_to_match[metaval]["title"] = dict_to_match[old_meta]["title"]
            else:
                dict_to_match[metaval]["title"] = temporary_name
        del dict_to_match[old_meta]
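
    # Editor's illustrative example of `merge`: the IDs and 'others' of the
    # absorbed entry are folded into the surviving one, which also records the
    # old key; the temporary name is only used if neither entry has a title.
    #
    #   >>> d = {"0601": {"ids": ["doi:10.1/a"], "others": [], "title": ""},
    #   ...      "wannabe_0": {"ids": ["doi:10.1/b"], "others": [], "title": "A Title"}}
    #   >>> Curator.merge(d, "0601", "wannabe_0", "Fallback")
    #   >>> d["0601"]
    #   {'ids': ['doi:10.1/a', 'doi:10.1/b'], 'others': ['wannabe_0'], 'title': 'A Title'}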

    def merge_entities_in_csv(
        self,
        idslist: list,
        metaval: str,
        name: str,
        entity_dict: Dict[str, Dict[str, list]],
        id_dict: dict,
    ) -> None:
        found_others = self.__local_match(idslist, entity_dict)
        if found_others["wannabe"]:
            for old_meta in found_others["wannabe"]:
                self.merge(entity_dict, metaval, old_meta, name)
        for identifier in idslist:
            if identifier not in entity_dict[metaval]["ids"]:
                entity_dict[metaval]["ids"].append(identifier)
            if identifier not in id_dict:
                self.__update_id_count(id_dict, identifier)
        self.__update_title(entity_dict, metaval, name)

    def __update_title(self, entity_dict: dict, metaval: str, name: str) -> None:
        if not entity_dict[metaval]["title"] and name:
            entity_dict[metaval]["title"] = name
            self.log[self.rowcnt]["title"]["status"] = "New value proposed"

    def id_worker(
        self,
        col_name,
        name,
        idslist: List[str],
        metaval: str,
        ra_ent=False,
        br_ent=False,
        vvi_ent=False,
        publ_entity=False,
    ):
        if not ra_ent:
            id_dict = self.idbr
            entity_dict = self.brdict
        else:
            id_dict = self.idra
            entity_dict = self.radict
        # there's meta
        if metaval:
            # MetaID exists among data?
            # meta already in entity_dict (no care about conflicts, we have a meta specified)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                if ra_ent:
                    found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                elif br_ent:
                    found_meta_ts = self.finder.retrieve_br_from_meta(metaval)
                # meta in triplestore
                # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                if found_meta_ts[2]:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]["ids"] = list()
                    if col_name == "author" or col_name == "editor":
                        entity_dict[metaval]["title"] = self.name_check(
                            found_meta_ts[0], name
                        )
                    else:
                        entity_dict[metaval]["title"] = found_meta_ts[0]
                    entity_dict[metaval]["others"] = list()
                    existing_ids = found_meta_ts[1]
                    self.__update_id_and_entity_dict(
                        existing_ids, id_dict, entity_dict, metaval
                    )
                    self.merge_entities_in_csv(
                        idslist, metaval, name, entity_dict, id_dict
                    )
                # Look for MetaId in the provenance
                else:
                    entity_type = "br" if br_ent or vvi_ent else "ra"
                    metaid_uri = f"{self.base_iri}/{entity_type}/{str(metaval)}"
                    # The entity MetaId after merge if it was merged, None otherwise. If None, the MetaId is considered invalid
                    metaval = self.finder.retrieve_metaid_from_merged_entity(
                        metaid_uri=metaid_uri, prov_config=self.prov_config
                    )
        # there's no meta or there was one but it didn't exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # IDs already exist among data?
            # check in entity_dict
            if local_match["existing"]:
                # ids refer to multiple existing entities
                if len(local_match["existing"]) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                # ids refer to ONE existing entity
                elif len(local_match["existing"]) == 1:
                    metaval = str(local_match["existing"][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]["ids"]:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(
                            suspect_ids,
                            br=br_ent,
                            ra=ra_ent,
                            vvi=vvi_ent,
                            publ=publ_entity,
                        )
                        if len(sparql_match) > 1:
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
            # ids refers to 1 or more wannabe entities
            elif local_match["wannabe"]:
                metaval = str(local_match["wannabe"].pop(0))
                # 5 Merge data from entityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match["wannabe"]:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]["ids"]:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(
                        suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                    )
                    if sparql_match:
                        # if 'wannabe' not in metaval or len(sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # Collect all existing IDs from all matches
                        existing_ids = []
                        for match in sparql_match:
                            existing_ids.extend(match[2])

                        # new_idslist = [x[1] for x in existing_ids]
                        # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                        # if len(new_sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                        old_metaval = metaval
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]["ids"] = list()
                        entity_dict[metaval]["others"] = list()
                        entity_dict[metaval]["title"] = (
                            sparql_match[0][1] if sparql_match[0][1] else ""
                        )
                        self.__update_id_and_entity_dict(
                            existing_ids, id_dict, entity_dict, metaval
                        )
                        self.merge(
                            entity_dict, metaval, old_metaval, sparql_match[0][1]
                        )
            else:
                sparql_match = self.finder_sparql(
                    idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                )
                # if len(sparql_match) > 1:
                #     # !
                #     return self.conflict(idslist, name, id_dict, col_name)
                # elif len(sparql_match) == 1:
                if sparql_match:
                    # Collect all existing IDs from all matches
                    existing_ids = []
                    for match in sparql_match:
                        existing_ids.extend(match[2])

                    # new_idslist = [x[1] for x in existing_ids]
                    # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                    # if len(new_sparql_match) > 1:
                    #     # Two entities previously disconnected on the triplestore now become connected
                    #     # !
                    #     return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                    # 3 CONFLICT between MetaIDs. MetaID specified in EntityA inside CSV has precedence.
                    # elif len(new_sparql_match) == 1:
                    metaval = sparql_match[0][0]
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]["ids"] = list()
                    entity_dict[metaval]["others"] = list()
                    if col_name == "author" or col_name == "editor":
                        entity_dict[metaval]["title"] = self.name_check(
                            sparql_match[0][1], name
                        )
                    else:
                        entity_dict[metaval]["title"] = sparql_match[0][1]
                    self.__update_title(entity_dict, metaval, name)
                    self.__update_id_and_entity_dict(
                        existing_ids, id_dict, entity_dict, metaval
                    )
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]["ids"]:
                    entity_dict[metaval]["ids"].append(identifier)
            self.__update_title(entity_dict, metaval, name)
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval

    def new_entity(self, entity_dict, name):
        metaval = "wannabe_" + str(self.wnb_cnt)
        self.wnb_cnt += 1
        entity_dict[metaval] = dict()
        entity_dict[metaval]["ids"] = list()
        entity_dict[metaval]["others"] = list()
        entity_dict[metaval]["title"] = name
        return metaval

    def volume_issue(
        self,
        meta: str,
        path: Dict[str, Dict[str, str]],
        value: str,
        row: Dict[str, str],
    ) -> None:
        if "wannabe" not in meta:
            if value in path:
                if "wannabe" in path[value]["id"]:
                    old_meta = path[value]["id"]
                    self.merge(self.brdict, meta, old_meta, row["title"])
                    path[value]["id"] = meta
            else:
                path[value] = dict()
                path[value]["id"] = meta
                if "issue" not in path:
                    path[value]["issue"] = dict()
        else:
            if value in path:
                if "wannabe" in path[value]["id"]:
                    old_meta = path[value]["id"]
                    if meta != old_meta:
                        self.merge(self.brdict, meta, old_meta, row["title"])
                        path[value]["id"] = meta
                else:
                    old_meta = path[value]["id"]
                    if "wannabe" not in old_meta and old_meta not in self.brdict:
                        br4dict = self.finder.retrieve_br_from_meta(old_meta)
                        self.brdict[old_meta] = dict()
                        self.brdict[old_meta]["ids"] = list()
                        self.brdict[old_meta]["others"] = list()
                        self.brdict[old_meta]["title"] = br4dict[0] if br4dict else None
                        if br4dict:
                            for x in br4dict[1]:
                                identifier = x[1]
                                self.brdict[old_meta]["ids"].append(identifier)
                                if identifier not in self.idbr:
                                    self.idbr[identifier] = x[0]
                    self.merge(self.brdict, old_meta, meta, row["title"])
            else:
                path[value] = dict()
                path[value]["id"] = meta
                if "issue" not in path:  # it's a Volume
                    path[value]["issue"] = dict()

    def log_update(self):
        new_log = dict()
        for x in self.log:
            if any(self.log[x][y].values() for y in self.log[x]):
                for y in self.log[x]:
                    if "Conflict entity" in self.log[x][y]:
                        v = self.log[x][y]["Conflict entity"]
                        if "wannabe" in v:
                            if y == "id" or y == "venue":
                                for brm in self.brmeta:
                                    if v in self.brmeta[brm]["others"]:
                                        m = "br/" + str(brm)
                            elif y == "author" or y == "editor" or y == "publisher":
                                for ram in self.rameta:
                                    if v in self.rameta[ram]["others"]:
                                        m = "ra/" + str(ram)
                        else:
                            m = v
                        self.log[x][y]["Conflict entity"] = m
                new_log[x] = self.log[x]

                if "wannabe" in self.data[x]["id"]:
                    for brm in self.brmeta:
                        if self.data[x]["id"] in self.brmeta[brm]["others"]:
                            met = "br/" + str(brm)
                else:
                    met = "br/" + str(self.data[x]["id"])
                new_log[x]["id"]["meta"] = met
        return new_log

    def merge_duplicate_entities(self) -> None:
        """
        The 'merge_duplicate_entities()' method merges duplicate entities.
        Moreover, it modifies the CSV cells, giving precedence to the first information found,
        or to the data already in the triplestore in the case of already existing entities.

        :returns: None -- This method updates the CSV rows and returns None.
        """
        self.rowcnt = 0
        for row in self.data:
            id = row["id"]
            if "wannabe" not in id:
                self.equalizer(row, id)
                other_rowcnt = 0
                for other_row in self.data:
                    if (
                        other_row["id"] in self.brdict[id]["others"]
                        or other_row["id"] == id
                    ) and self.rowcnt != other_rowcnt:
                        for field, _ in row.items():
                            if row[field] and row[field] != other_row[field]:
                                if other_row[field]:
                                    self.log[other_rowcnt][field][
                                        "status"
                                    ] = "New value proposed"
                                other_row[field] = row[field]
                    other_rowcnt += 1
            self.rowcnt += 1

    def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]:
        """
        Extracts the name and IDs from the venue string.

        :params venue_str: the venue string
        :type venue_str: str
        :returns: Tuple[str, List[str]] -- the name and list of IDs extracted from the venue string
        """
        match = re.search(name_and_ids, venue_str)
        if match:
            name = match.group(1).strip()
            ids = match.group(2).strip().split()
        else:
            name = venue_str.strip()
            ids = []
        return name, ids
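
    # Editor's illustrative example, assuming the `name_and_ids` regex captures
    # the "Name [id1 id2 ...]" convention used throughout this module:
    #
    #   >>> curator.extract_name_and_ids("BMJ [issn:0959-8138 omid:br/4416]")
    #   ('BMJ', ['issn:0959-8138', 'omid:br/4416'])
    #   >>> curator.extract_name_and_ids("BMJ")
    #   ('BMJ', [])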

    def equalizer(self, row: Dict[str, str], metaval: str) -> None:
        """
        Given a CSV row and its MetaID, this method aligns the information in the CSV with that in the triplestore.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params metaval: the MetaID identifying the bibliographic resource contained in the input CSV row
        :type metaval: str
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        self.log[self.rowcnt]["id"]["status"] = "Entity already exists"
        known_data = self.finder.retrieve_br_info_from_meta(metaval)
        try:
            known_data["author"] = self.__get_resp_agents(metaval, "author")
        except ValueError:
            print(row)
            raise  # re-raise, preserving the original traceback
        known_data["editor"] = self.__get_resp_agents(metaval, "editor")
        known_data["publisher"] = self.finder.retrieve_publisher_from_br_metaid(metaval)
        for datum in ["pub_date", "type", "volume", "issue"]:
            if known_data[datum]:
                if row[datum] and row[datum] != known_data[datum]:
                    self.log[self.rowcnt][datum]["status"] = "New value proposed"
                row[datum] = known_data[datum]
        for datum in ["author", "editor", "publisher"]:
            if known_data[datum] and not row[datum]:
                row[datum] = known_data[datum]
        if known_data["venue"]:
            current_venue = row["venue"]
            known_venue = known_data["venue"]

            if current_venue:
                # Extract the IDs from the current venue
                current_venue_name, current_venue_ids = self.extract_name_and_ids(
                    current_venue
                )
                known_venue_name, known_venue_ids = self.extract_name_and_ids(
                    known_venue
                )

                current_venue_ids_set = set(current_venue_ids)
                known_venue_ids_set = set(known_venue_ids)

                common_ids = current_venue_ids_set.intersection(known_venue_ids_set)

                if common_ids:
                    # Merge the IDs and use the title from the known venue
                    merged_ids = current_venue_ids_set.union(known_venue_ids_set)
                    row["venue"] = (
                        f"{known_venue_name} [{' '.join(sorted(merged_ids))}]"
                    )
                else:
                    # Use the known venue information entirely
                    row["venue"] = known_venue
            else:
                row["venue"] = known_venue
        if known_data["page"]:
            if row["page"] and row["page"] != known_data["page"][1]:
                self.log[self.rowcnt]["page"]["status"] = "New value proposed"
            row["page"] = known_data["page"][1]
            self.remeta[metaval] = known_data["page"]

    def __get_resp_agents(self, metaid: str, column: str) -> str:
        resp_agents = self.finder.retrieve_ra_sequence_from_br_meta(metaid, column)
        output = ""
        if resp_agents:
            full_resp_agents = list()
            for item in resp_agents:
                for _, resp_agent in item.items():
                    author_name = resp_agent[0]
                    ids = [f"omid:ra/{resp_agent[2]}"]
                    ids.extend([id[1] for id in resp_agent[1]])
                    author_ids = "[" + " ".join(ids) + "]"
                    full_resp_agent = author_name + " " + author_ids
                    full_resp_agents.append(full_resp_agent)
            output = "; ".join(full_resp_agents)
        return output


def is_a_valid_row(row: Dict[str, str]) -> bool:
    """
    This function discards invalid rows in the input CSV file.

    :params row: a dictionary representing a CSV row
    :type row: Dict[str, str]
    :returns: bool -- True if the row is valid, False otherwise.
    """
    br_type = " ".join((row["type"].lower()).split())
    br_title = row["title"]
    br_volume = row["volume"]
    br_issue = row["issue"]
    br_venue = row["venue"]
    if row["id"]:
        if (br_volume or br_issue) and (not br_type or not br_venue):
            return False
        return True
    if all(not row[value] for value in row):
        return False
    br_author = row["author"]
    br_editor = row["editor"]
    br_pub_date = row["pub_date"]
    if not br_type or br_type in {
        "book",
        "data file",
        "dataset",
        "dissertation",
        "edited book",
        "journal article",
        "monograph",
        "other",
        "peer review",
        "posted content",
        "web content",
        "proceedings article",
        "report",
        "reference book",
    }:
        is_a_valid_row = bool(br_title and br_pub_date and (br_author or br_editor))
    elif br_type in {
        "book chapter",
        "book part",
        "book section",
        "book track",
        "component",
        "reference entry",
    }:
        is_a_valid_row = bool(br_title and br_venue)
    elif br_type in {
        "book series",
        "book set",
        "journal",
        "proceedings",
        "proceedings series",
        "report series",
        "standard",
        "standard series",
    }:
        is_a_valid_row = bool(br_title)
    elif br_type == "journal volume":
        is_a_valid_row = bool(br_venue and (br_volume or br_title))
    elif br_type == "journal issue":
        is_a_valid_row = bool(br_venue and (br_issue or br_title))
    else:
        # types not covered above cannot be validated without an id
        is_a_valid_row = False
    return is_a_valid_row
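

# Editor's illustrative examples for is_a_valid_row: a row with an id is valid
# as long as any volume/issue also comes with a type and a venue; without an
# id, validity depends on the type-specific requirements above.
#
#   >>> base = {k: "" for k in ("id", "title", "author", "editor", "pub_date",
#   ...                         "venue", "volume", "issue", "page", "type", "publisher")}
#   >>> is_a_valid_row({**base, "id": "doi:10.1/a"})
#   True
#   >>> is_a_valid_row({**base, "id": "doi:10.1/a", "volume": "1"})
#   False
#   >>> is_a_valid_row({**base, "type": "journal article", "title": "T",
#   ...                 "author": "Doe, J.", "pub_date": "2020"})
#   True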


def get_edited_br_metaid(row: dict, metaid: str, venue_metaid: str) -> str:
    if row["author"] and row["venue"] and row["type"] in CONTAINER_EDITOR_TYPES:
        edited_br_metaid = venue_metaid
    else:
        edited_br_metaid = metaid
    return edited_br_metaid