Coverage for oc_meta/core/curator.py: 95%
962 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import multiprocessing
11import os
12from concurrent.futures import ProcessPoolExecutor
13from contextlib import nullcontext
14from typing import TYPE_CHECKING, Dict, List, Tuple
16from oc_meta.constants import CONTAINER_EDITOR_TYPES, VALID_ENTITY_TYPES
17from oc_meta.lib.cleaner import (
18 clean_date,
19 clean_name,
20 clean_ra_list,
21 clean_title,
22 clean_volume_and_issue,
23 normalize_hyphens,
24 normalize_id,
25)
26from oc_meta.lib.file_manager import *
27from oc_meta.lib.finder import *
28from oc_meta.lib.merge_registry import EntityStore
29from oc_meta.lib.master_of_regex import (
30 RE_COLON_AND_SPACES,
31 RE_MULTIPLE_SPACES,
32 RE_ONE_OR_MORE_SPACES,
33 RE_SEMICOLON_IN_PEOPLE_FIELD,
34 split_name_and_ids,
35)
36from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
38if TYPE_CHECKING:
39 from rich.progress import Progress
42def _omid(meta: str) -> str:
43 return f"omid:{meta}"
46def _extract_ids_from_chunk(rows: list) -> Tuple[set, set, set]:
47 all_metavals = set()
48 all_identifiers = set()
49 all_vvis = set()
51 for row in rows:
52 metavals = set()
53 identifiers = set()
54 vvis = set()
55 venue_ids = set()
56 venue_metaid = None
58 if row["id"]:
59 id_list = RE_ONE_OR_MORE_SPACES.split(RE_COLON_AND_SPACES.sub(":", row["id"]))
60 idslist, metaval = Curator.clean_id_list(id_list, br=True)
61 if metaval:
62 metavals.add(_omid(metaval))
63 if idslist:
64 identifiers.update(idslist)
66 fields_with_an_id = []
67 for field in ["author", "editor", "publisher", "venue", "volume", "issue"]:
68 _, ids_str = split_name_and_ids(row[field])
69 if ids_str:
70 fields_with_an_id.append((field, ids_str.split()))
71 for field, field_ids in fields_with_an_id:
72 br = field in ["venue", "volume", "issue"]
73 field_idslist, field_metaval = Curator.clean_id_list(field_ids, br=br)
74 if field_metaval:
75 field_metaval = _omid(field_metaval)
76 else:
77 field_metaval = ""
78 if field_metaval:
79 metavals.add(field_metaval)
80 if field == "venue":
81 venue_metaid = field_metaval
82 if field_idslist:
83 venue_ids.update(field_idslist)
84 else:
85 if field_idslist:
86 identifiers.update(field_idslist)
88 if (venue_metaid or venue_ids) and (row["volume"] or row["issue"]):
89 vvi = (row["volume"], row["issue"], venue_metaid, tuple(sorted(venue_ids)))
90 vvis.add(vvi)
92 all_metavals.update(metavals)
93 all_identifiers.update(identifiers)
94 all_vvis.update(vvis)
96 return all_metavals, all_identifiers, all_vvis
99class Curator:
101 def __init__(
102 self,
103 data: List[dict],
104 ts: str,
105 prov_config: str,
106 counter_handler: RedisCounterHandler,
107 base_iri: str = "https://w3id.org/oc/meta",
108 prefix: str = "060",
109 settings: dict | None = None,
110 silencer: list = [],
111 meta_config_path: str | None = None,
112 timer=None,
113 progress: Progress | None = None,
114 min_rows_parallel: int = 1000,
115 ):
116 self.timer = timer
117 self.progress = progress
118 self.settings = settings or {}
119 self.workers = self.settings.get("workers", 1)
120 self.finder = ResourceFinder(
121 ts,
122 base_iri,
123 settings=self.settings,
124 meta_config_path=meta_config_path,
125 workers=self.workers,
126 )
127 self.base_iri = base_iri
128 self.prov_config = prov_config
129 # Preliminary pass to clear volume and issue if id is present but venue is missing
130 for row in data:
131 if row["id"] and (row["volume"] or row["issue"]):
132 if not row["venue"]:
133 row["volume"] = ""
134 row["issue"] = ""
135 if not row["type"]:
136 row["type"] = "journal article"
137 self.data = [
138 {field: value.strip() for field, value in row.items()}
139 for row in data
140 if is_a_valid_row(row)
141 ]
142 self.prefix = prefix
143 self.counter_handler = counter_handler
145 self.entity_store = EntityStore()
146 self.ardict = {}
147 self.vvi = {}
148 self.remeta = dict()
149 self.wnb_cnt = 0
150 self.rowcnt = 0
151 self.preexisting_entities = set()
152 self.silencer = silencer
153 self.min_rows_parallel = min_rows_parallel
156 def _timed(self, name: str):
157 if self.timer:
158 return self.timer.timer(name)
159 return nullcontext()
161 def collect_identifiers(self):
162 return self._collect_identifiers_with_progress(task_id=None)
164 def _collect_identifiers_with_progress(self, task_id=None):
165 all_metavals = set()
166 all_idslist = set()
167 all_vvis = set()
169 total_rows = len(self.data)
170 if total_rows == 0:
171 return all_metavals, all_idslist, all_vvis
173 if total_rows > self.min_rows_parallel and self.workers > 1:
174 chunks = []
175 for i in range(0, total_rows, self.min_rows_parallel):
176 chunks.append(self.data[i:i + self.min_rows_parallel])
178 with ProcessPoolExecutor(
179 max_workers=self.workers,
180 mp_context=multiprocessing.get_context('forkserver')
181 ) as executor:
182 for chunk_metavals, chunk_ids, chunk_vvis in executor.map(_extract_ids_from_chunk, chunks):
183 all_metavals.update(chunk_metavals)
184 all_idslist.update(chunk_ids)
185 all_vvis.update(chunk_vvis)
186 if self.progress and task_id is not None:
187 self.progress.advance(task_id, min(self.min_rows_parallel, total_rows))
188 else:
189 for row in self.data:
190 metavals, idslist, vvis = self.extract_identifiers_and_metavals(row)
191 all_metavals.update(metavals)
192 all_idslist.update(idslist)
193 all_vvis.update(vvis)
194 if self.progress and task_id is not None:
195 self.progress.advance(task_id)
197 return all_metavals, all_idslist, all_vvis
199 def extract_identifiers_and_metavals(
200 self, row
201 ) -> Tuple[set, set, set]:
202 metavals = set()
203 identifiers = set()
204 vvis = set()
205 venue_ids = set()
206 venue_metaid = None
208 if row["id"]:
209 idslist, metaval = self.clean_id_list(
210 self.split_identifiers(row["id"]),
211 br=True,
212 )
213 id_metaval = _omid(metaval) if metaval else ""
214 if id_metaval:
215 metavals.add(id_metaval)
216 if idslist:
217 identifiers.update(idslist)
219 fields_with_an_id = []
220 for field in ["author", "editor", "publisher", "venue", "volume", "issue"]:
221 _, ids_str = split_name_and_ids(row[field])
222 if ids_str:
223 fields_with_an_id.append((field, ids_str.split()))
224 for field, field_ids in fields_with_an_id:
225 br = field in ["venue", "volume", "issue"]
226 field_idslist, field_metaval = self.clean_id_list(field_ids, br=br)
227 if field_metaval:
228 field_metaval = _omid(field_metaval)
229 else:
230 field_metaval = ""
231 if field_metaval:
232 metavals.add(field_metaval)
233 if field == "venue":
234 venue_metaid = field_metaval
235 if field_idslist:
236 venue_ids.update(field_idslist)
237 else:
238 if field_idslist:
239 identifiers.update(field_idslist)
241 if (venue_metaid or venue_ids) and (row["volume"] or row["issue"]):
242 vvi = (row["volume"], row["issue"], venue_metaid, tuple(sorted(venue_ids)))
243 vvis.add(vvi)
245 return metavals, identifiers, vvis
247 def split_identifiers(self, field_value):
248 return RE_ONE_OR_MORE_SPACES.split(RE_COLON_AND_SPACES.sub(":", field_value))
    def curator(self, filename: str | None = None, path_csv: str | None = None):
        """
        Run the full curation pipeline over ``self.data``.

        Phases: (1) collect identifiers and prefetch everything about them from
        the triplestore, (2) clean the 'id' column row by row, (3) merge
        duplicate entities and clean venue/volume/issue and responsible-agent
        fields, (4) assign final MetaIDs and enrich rows, (5) produce the
        curated CSV indexes/output.

        :params filename: name recorded for the output CSV (optional)
        :params path_csv: directory passed to the indexer for CSV output (optional)
        :returns: None
        """
        total_rows = len(self.data)

        # Phase 1: Collect identifiers and SPARQL prefetch
        with self._timed("curation__collect_identifiers"):
            task_collect = None
            if self.progress:
                task_collect = self.progress.add_task(
                    " [dim]Collecting identifiers[/dim]", total=total_rows
                )
            metavals, identifiers, vvis = self._collect_identifiers_with_progress(
                task_id=task_collect,
            )
            if self.progress and task_collect is not None:
                self.progress.remove_task(task_collect)
            # Bulk prefetch so the later per-row phases can work locally
            self.finder.get_everything_about_res(
                metavals=metavals, identifiers=identifiers, vvis=vvis,
                progress=self.progress
            )

        # Phase 2: Clean ID (loop over all rows)
        with self._timed("curation__clean_id"):
            task_clean_id = None
            if self.progress:
                task_clean_id = self.progress.add_task(
                    " [dim]Cleaning IDs[/dim]", total=total_rows
                )
            for row in self.data:
                self.clean_id(row)
                self.rowcnt += 1
                if self.progress and task_clean_id is not None:
                    self.progress.advance(task_clean_id)
            if self.progress and task_clean_id is not None:
                self.progress.remove_task(task_clean_id)

        # Phase 3: Merge duplicates + clean VVI/RA
        with self._timed("curation__clean_vvi_ra"):
            task_merge = None
            if self.progress:
                task_merge = self.progress.add_task(
                    " [dim]Merging duplicates[/dim]", total=total_rows
                )
            self.merge_duplicate_entities(task_id=task_merge)
            if self.progress and task_merge is not None:
                self.progress.remove_task(task_merge)
            self.clean_metadata_without_id()

            # Row counter restarts: clean_vvi/clean_ra rely on self.rowcnt
            self.rowcnt = 0
            task_vvi_ra = None
            if self.progress:
                task_vvi_ra = self.progress.add_task(
                    " [dim]Cleaning VVI and RA[/dim]", total=total_rows
                )
            for row in self.data:
                self.clean_vvi(row)
                self.clean_ra(row, "author")
                self.clean_ra(row, "publisher")
                self.clean_ra(row, "editor")
                self.rowcnt += 1
                if self.progress and task_vvi_ra is not None:
                    self.progress.advance(task_vvi_ra)
            if self.progress and task_vvi_ra is not None:
                self.progress.remove_task(task_vvi_ra)

        # Phase 4: Metamaker (preexisting + meta_maker + enrich + dedupe)
        with self._timed("curation__metamaker"):
            task_metamaker = None
            if self.progress:
                task_metamaker = self.progress.add_task(
                    " [dim]Metamaker[/dim]", total=total_rows
                )
            self.get_preexisting_entities()
            self.meta_maker()
            self.enrich(task_id=task_metamaker)
            if self.progress and task_metamaker is not None:
                self.progress.remove_task(task_metamaker)
            # Deduplicate rows that resolved to the same 'id' (last occurrence wins)
            self.data = list({v["id"]: v for v in self.data}.values())

        # Phase 5: CSV output (curated CSV indexer)
        with self._timed("curation__csv_out"):
            self.filename = filename
            self.indexer(path_csv=path_csv)
    def clean_id(self, row: Dict[str, str]) -> None:
        """
        The 'clean id()' function is executed for each CSV row.
        In this process, any duplicates are detected by the IDs in the 'id' column.
        For each line, a wannabeID or, if the bibliographic resource was found in the triplestore,
        a MetaID is assigned.
        Finally, this method enrich and clean the fields related to the
        title, venue, volume, issue, page, publication date and type.

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        if row["title"]:
            name = clean_title(
                row["title"], bool(self.settings.get("normalize_titles", False))
            )
        else:
            name = ""
        # NOTE(review): metaval_ids_list is populated below but never read in
        # this method — presumably a leftover; confirm before removing.
        metaval_ids_list = []
        idslist: list = []
        metaval = ""
        if row["id"]:
            # Normalize "schema : value" spacing, then split on whitespace
            idslist = RE_ONE_OR_MORE_SPACES.split(RE_COLON_AND_SPACES.sub(":", row["id"]))
            idslist, metaval = self.clean_id_list(
                idslist, br=True)
            id_metaval = _omid(metaval) if metaval else ""
            metaval_ids_list.append((id_metaval, idslist))
        # Collect identifiers embedded in the other fields ("Name [schema:value ...]")
        fields_with_an_id = []
        for field in ["author", "editor", "publisher", "venue", "volume", "issue"]:
            _, ids_str = split_name_and_ids(row[field])
            if ids_str:
                fields_with_an_id.append((field, ids_str.split()))
        for field, field_ids in fields_with_an_id:
            # venue/volume/issue identifiers refer to bibliographic resources
            br = field in ["venue", "volume", "issue"]
            field_idslist, field_metaval = self.clean_id_list(
                field_ids, br=br)
            if field_metaval:
                field_metaval = _omid(field_metaval)
            else:
                field_metaval = ""
            metaval_ids_list.append((field_metaval, field_idslist))
        if row["id"]:
            # Deduplicate against known entities (triplestore or earlier rows)
            metaval = self.id_worker(
                "id",
                name,
                idslist,
                metaval,
                ra_ent=False,
                br_ent=True,
                vvi_ent=False,
                publ_entity=False,
            )
        else:
            # No identifiers at all: mint a temporary wannabe entity
            metaval = self.new_entity(name, "br")
        row["title"] = self.entity_store.get_title(metaval)
        row["id"] = metaval
391 def clean_metadata_without_id(self):
392 for row in self.data:
393 if row["page"]:
394 row["page"] = normalize_hyphens(row["page"])
395 if pub_date := row["pub_date"]:
396 row["pub_date"] = clean_date(normalize_hyphens(pub_date))
397 if row["type"]:
398 entity_type = RE_MULTIPLE_SPACES.sub(" ", row["type"].lower()).strip()
399 if entity_type == "edited book" or entity_type == "monograph":
400 entity_type = "book"
401 elif (
402 entity_type == "report series"
403 or entity_type == "standard series"
404 or entity_type == "proceedings series"
405 ):
406 entity_type = "series"
407 elif entity_type == "posted content":
408 entity_type = "web content"
409 if entity_type in VALID_ENTITY_TYPES:
410 row["type"] = entity_type
411 else:
412 row["type"] = ""
    def clean_vvi(self, row: Dict[str, str]) -> None:
        """
        This method performs the deduplication process for venues, volumes and issues.
        The acquired information is stored in the 'vvi' dictionary, that has the following format: ::

            {
                VENUE_IDENTIFIER: {
                    'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}},
                    'volume': {
                        SEQUENCE_IDENTIFIER: {
                            'id': META_ID,
                            'issue': {SEQUENCE_IDENTIFIER: {'id': META_ID}}
                        }
                    }
                }
            }

            {
                '4416': {
                    'issue': {},
                    'volume': {
                        '166': {'id': '4388', 'issue': {'4': {'id': '4389'}}},
                        '172': {'id': '4434',
                            'issue': {
                                '22': {'id': '4435'},
                                '20': {'id': '4436'},
                                '21': {'id': '4437'},
                                '19': {'id': '4438'}
                            }
                        }
                    }
                }
            }

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :returns: None -- This method modifies the input CSV row without returning it.
        """
        # Volume/issue only make sense for journal-family types
        if row["type"] not in {
            "journal article",
            "journal volume",
            "journal issue",
        } and (row["volume"] or row["issue"]):
            row["volume"] = ""
            row["issue"] = ""
        clean_volume_and_issue(row=row)
        vol_meta = None
        br_type = row["type"]
        volume = row["volume"]
        issue = row["issue"]
        br_id = row["id"]
        venue = row["venue"]
        # Venue
        if venue:
            # The data must be invalidated, because the resource is journal but a volume or an issue have also been specified
            if br_type == "journal" and (volume or issue):
                row["venue"] = ""
                row["volume"] = ""
                row["issue"] = ""
            venue_name, venue_ids_str = split_name_and_ids(venue)
            if venue_ids_str:
                name = clean_title(
                    venue_name, bool(self.settings.get("normalize_titles", False))
                )
                idslist = RE_ONE_OR_MORE_SPACES.split(RE_COLON_AND_SPACES.sub(":", venue_ids_str))
                idslist, metaval = self.clean_id_list(
                    idslist, br=True)

                metaval = self.id_worker(
                    "venue",
                    name,
                    idslist,
                    metaval,
                    ra_ent=False,
                    br_ent=True,
                    vvi_ent=True,
                    publ_entity=False,
                )
                if metaval not in self.vvi:
                    # Seed the venue's volume/issue tree, from the local graph when possible
                    ts_vvi = None
                    if "wannabe" not in metaval:
                        ts_vvi = self.finder.retrieve_venue_from_local_graph(metaval)
                    if "wannabe" in metaval or not ts_vvi:
                        self.vvi[metaval] = dict()
                        self.vvi[metaval]["volume"] = dict()
                        self.vvi[metaval]["issue"] = dict()
                    elif ts_vvi:
                        self.vvi[metaval] = ts_vvi
            else:
                # Venue without identifiers: always mint a new wannabe entity
                name = clean_title(
                    venue_name or venue,
                    bool(self.settings.get("normalize_titles", False)),
                )
                metaval = self.new_entity(name, "br")
                self.vvi[metaval] = dict()
                self.vvi[metaval]["volume"] = dict()
                self.vvi[metaval]["issue"] = dict()
            row["venue"] = metaval

            # Volume
            if volume and (br_type == "journal issue" or br_type == "journal article"):
                if volume in self.vvi[metaval]["volume"]:
                    vol_meta = self.vvi[metaval]["volume"][volume]["id"]
                else:
                    vol_meta = self.new_entity("", "br")
                    self.vvi[metaval]["volume"][volume] = dict()
                    self.vvi[metaval]["volume"][volume]["id"] = vol_meta
                    self.vvi[metaval]["volume"][volume]["issue"] = dict()
            elif volume and br_type == "journal volume":
                # The data must be invalidated, because the resource is a journal volume but an issue has also been specified
                if issue:
                    row["volume"] = ""
                    row["issue"] = ""
                else:
                    vol_meta = br_id
                    self.volume_issue(
                        vol_meta, self.vvi[metaval]["volume"], volume, row
                    )

            # Issue
            if issue and br_type == "journal article":
                row["issue"] = issue
                if vol_meta:
                    # Issue nested under its volume
                    if issue not in self.vvi[metaval]["volume"][volume]["issue"]:
                        issue_meta = self.new_entity("", "br")
                        self.vvi[metaval]["volume"][volume]["issue"][issue] = dict()
                        self.vvi[metaval]["volume"][volume]["issue"][issue][
                            "id"
                        ] = issue_meta
                else:
                    # Issue attached directly to the venue
                    if issue not in self.vvi[metaval]["issue"]:
                        issue_meta = self.new_entity("", "br")
                        self.vvi[metaval]["issue"][issue] = dict()
                        self.vvi[metaval]["issue"][issue]["id"] = issue_meta
            elif issue and br_type == "journal issue":
                issue_meta = br_id
                if vol_meta:
                    self.volume_issue(
                        issue_meta,
                        self.vvi[metaval]["volume"][volume]["issue"],
                        issue,
                        row,
                    )
                else:
                    self.volume_issue(
                        issue_meta, self.vvi[metaval]["issue"], issue, row
                    )

        else:
            # No venue: volume/issue cannot be anchored anywhere
            row["venue"] = ""
            row["volume"] = ""
            row["issue"] = ""
    def clean_ra(self, row, col_name):
        """
        This method performs the deduplication process for responsible agents (authors, publishers and editors).

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.ardict and self.entity_store, and returns None.
        """

        def get_br_metaval_to_check(row, col_name):
            # Editors may belong to a container BR (e.g. an edited book) rather than the row's BR
            if col_name == "editor":
                return get_edited_br_metaid(row, row["id"], row["venue"])
            else:
                return row["id"]

        def get_br_metaval(br_metaval_to_check):
            # Prefer a key already known to the store or the vvi tree;
            # otherwise resolve through the entity store
            if br_metaval_to_check in self.entity_store or br_metaval_to_check in self.vvi:
                return br_metaval_to_check
            return self.entity_store.find(br_metaval_to_check)

        def initialize_ardict_entry(br_metaval):
            if br_metaval not in self.ardict:
                self.ardict[br_metaval] = {"author": [], "editor": [], "publisher": []}

        def initialize_sequence(br_metaval, col_name):
            # For preexisting BRs, seed the agent sequence from the triplestore
            sequence = []
            if "wannabe" in br_metaval:
                sequence = []
            else:
                sequence_found = self.finder.retrieve_ra_sequence_from_br_meta(
                    br_metaval, col_name
                )
                if sequence_found:
                    sequence = []
                    for agent in sequence_found:
                        for ar_metaid in agent:
                            ra_metaid = agent[ar_metaid][2]
                            sequence.append(tuple((ar_metaid, ra_metaid)))
                            if ra_metaid not in self.entity_store:
                                self.entity_store.add_entity(ra_metaid, agent[ar_metaid][0])
                            # Register each of the agent's identifiers exactly once
                            for identifier in agent[ar_metaid][1]:
                                id_metaid = identifier[0]
                                literal = identifier[1]
                                if self.entity_store.get_id_metaid(literal) is None:
                                    self.entity_store.set_id_metaid(literal, id_metaid)
                                if literal not in self.entity_store.get_ids(ra_metaid):
                                    self.entity_store.add_id(ra_metaid, literal)
                    self.ardict[br_metaval][col_name].extend(sequence)
                else:
                    sequence = []
            return sequence

        def parse_ra_list(row):
            ra_list = RE_SEMICOLON_IN_PEOPLE_FIELD.split(row[col_name])
            ra_list = clean_ra_list(ra_list)
            return ra_list

        def process_individual_ra(ra, sequence):
            # Agents without an explicit ID may still match a known agent by exact name
            new_elem_seq = True
            raw_name, ra_id = split_name_and_ids(ra)
            name = clean_name(raw_name)
            if not ra_id and sequence:
                for _, ra_metaid in sequence:
                    if self.entity_store.get_title(ra_metaid) == name:
                        ra_id = "omid:" + str(ra_metaid)
                        new_elem_seq = False
                        break
            return ra_id, name, new_elem_seq

        if not row[col_name]:
            return

        br_metaval_to_check = get_br_metaval_to_check(row, col_name)
        br_metaval = get_br_metaval(br_metaval_to_check)
        initialize_ardict_entry(br_metaval)

        sequence = self.ardict[br_metaval].get(col_name, [])
        if not sequence:
            sequence = initialize_sequence(br_metaval, col_name)
        # "Silenced" columns keep their preexisting sequence untouched
        if col_name in self.silencer and sequence:
            return

        ra_list = parse_ra_list(row)
        new_sequence = list()
        change_order = False

        for pos, ra in enumerate(ra_list):
            ra_id, name, new_elem_seq = process_individual_ra(ra, sequence)
            if ra_id:
                ra_id_list = RE_ONE_OR_MORE_SPACES.split(RE_COLON_AND_SPACES.sub(":", ra_id))
                if sequence:
                    ar_ra = None
                    for ps, el in enumerate(sequence):
                        ra_metaid = el[1]
                        for literal in ra_id_list:
                            if literal in self.entity_store.get_ids(ra_metaid):
                                if ps != pos:
                                    change_order = True
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    # NOTE(review): this inner enumerate rebinds `pos`,
                                    # shadowing the row-position index of the outer loop;
                                    # subsequent `ps != pos` comparisons may therefore use
                                    # the clobbered value — confirm whether intended.
                                    for pos, literal_value in enumerate(ra_id_list):
                                        if "omid" in literal_value:
                                            ra_id_list[pos] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:" + ar_ra)
                    if not ar_ra:
                        # new element
                        for ar_metaid, ra_metaid in sequence:
                            if self.entity_store.get_title(ra_metaid) == name:
                                new_elem_seq = False
                                if "wannabe" not in ra_metaid:
                                    ar_ra = ra_metaid
                                    # NOTE(review): `pos` is shadowed here as well
                                    for pos, i in enumerate(ra_id_list):
                                        if "omid" in i:
                                            ra_id_list[pos] = ""
                                            break
                                    ra_id_list = list(filter(None, ra_id_list))
                                    ra_id_list.append("omid:" + ar_ra)
                if col_name == "publisher":
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False)
                    metaval = self.id_worker(
                        "publisher",
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=True,
                    )
                else:
                    ra_id_list, metaval = self.clean_id_list(
                        ra_id_list, br=False)
                    metaval = self.id_worker(
                        col_name,
                        name,
                        ra_id_list,
                        metaval,
                        ra_ent=True,
                        br_ent=False,
                        vvi_ent=False,
                        publ_entity=False,
                    )
                # Complete a stored "Family," title with the CSV's given name
                if col_name != "publisher" and metaval in self.entity_store:
                    full_name: str = self.entity_store.get_title(metaval)
                    if "," in name and "," in full_name:
                        first_name = name.split(",")[1].strip()
                        if (
                            not full_name.split(",")[1].strip() and first_name
                        ):  # first name found!
                            given_name = full_name.split(",")[0]
                            self.entity_store.set_title(metaval, given_name + ", " + first_name)
            else:
                metaval = self.new_entity(name, "ra")
            if new_elem_seq:
                # Mint a new agent-role MetaID for agents not already in the sequence
                role = "ar/" + self.prefix + str(self._add_number("ar"))
                new_sequence.append(tuple((role, metaval)))
        sequence.extend(new_sequence)
        self.ardict[br_metaval][col_name] = sequence
732 @staticmethod
733 def clean_id_list(
734 id_list: List[str], br: bool
735 ) -> Tuple[list, str]:
736 """
737 Clean IDs in the input list and check if there is a MetaID.
739 :params: id_list: a list of IDs
740 :type: id_list: List[str]
741 :params: br: True if the IDs in id_list refer to bibliographic resources, False otherwise
742 :type: br: bool
743 :returns: Tuple[list, str]: -- it returns a two-elements tuple, where the first element is the list of cleaned IDs, while the second is a MetaID (with prefix like "br/0601") if any was found.
744 """
745 metaid = ""
746 id_list = list(filter(None, id_list))
747 clean_set = set()
748 clean_list = []
750 for elem in id_list:
751 if elem in clean_set:
752 continue
753 clean_set.add(elem)
754 elem = normalize_hyphens(elem)
755 identifier = elem.split(":", 1)
756 schema = identifier[0].lower()
757 value = identifier[1]
759 if schema == "omid":
760 metaid = value
761 else:
762 normalized_id = normalize_id(elem)
763 if normalized_id:
764 clean_list.append(normalized_id)
766 meta_count = sum(1 for i in id_list if i.lower().startswith("omid"))
767 if meta_count > 1:
768 clean_list = [i for i in clean_list if not i.lower().startswith("omid")]
770 return clean_list, metaid
772 def conflict(
773 self, idslist: List[str], name: str, entity_type: str, col_name: str
774 ) -> str:
775 metaval = self.new_entity(name, entity_type)
776 for identifier in idslist:
777 self.entity_store.add_id(metaval, identifier)
778 if self.entity_store.get_id_metaid(identifier) is None:
779 schema_value = identifier.split(":", maxsplit=1)
780 found_metaid = self.finder.retrieve_metaid_from_id(
781 schema_value[0], schema_value[1]
782 )
783 if found_metaid:
784 self.entity_store.set_id_metaid(identifier, found_metaid)
785 else:
786 self.__update_id_count(identifier)
787 return metaval
789 def finder_sparql(self, list_to_find, br=True, ra=False, vvi=False, publ=False):
790 match_elem = list()
791 id_set = set()
792 res = None
793 for elem in list_to_find:
794 if len(match_elem) < 2:
795 identifier = elem.split(":", maxsplit=1)
796 value = identifier[1]
797 schema = identifier[0]
798 if br:
799 res = self.finder.retrieve_br_from_id(schema, value)
800 elif ra:
801 res = self.finder.retrieve_ra_from_id(schema, value)
802 if res:
803 for f in res:
804 if f[0] not in id_set:
805 match_elem.append(f)
806 id_set.add(f[0])
807 return match_elem
809 def ra_update(self, row: dict, br_key: str, col_name: str) -> None:
810 if row[col_name]:
811 sequence = self.ardict[br_key][col_name] if br_key in self.ardict else []
812 ras_list = list()
813 for _, ra_id in sequence:
814 ra_name = self.entity_store.get_title(ra_id)
815 ra_ids_with_omid = self.entity_store.get_ids(ra_id) | {_omid(ra_id)}
816 ra = self.build_name_ids_string(ra_name, ra_ids_with_omid)
817 ras_list.append(ra)
818 row[col_name] = "; ".join(ras_list)
820 @staticmethod
821 def build_name_ids_string(name: str, ids: set) -> str:
822 if name and ids:
823 return f"{name} [{' '.join(ids)}]"
824 elif name:
825 return name
826 elif ids:
827 return f"[{' '.join(ids)}]"
828 return ""
830 def _local_match(self, list_to_match: list, entity_type: str = "") -> dict:
831 def sort_key(entity_key: str) -> tuple:
832 if "wannabe_" in entity_key:
833 parts = entity_key.split("wannabe_")
834 return (0, int(parts[1]))
835 return (1, entity_key)
837 entity_prefix = f"{entity_type}/" if entity_type else ""
838 match_elem: dict[str, list] = {"existing": [], "wannabe": []}
839 seen: set[str] = set()
840 for identifier in list_to_match:
841 entities = self.entity_store.find_entities(identifier)
842 for entity_key in sorted(entities, key=sort_key):
843 if entity_prefix and not entity_key.startswith(entity_prefix):
844 continue
845 if entity_key not in seen:
846 seen.add(entity_key)
847 if "wannabe" in entity_key:
848 match_elem["wannabe"].append(entity_key)
849 else:
850 match_elem["existing"].append(entity_key)
851 return match_elem
853 def __tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None:
854 for k, v in tree.items():
855 if k == key:
856 values.append(v)
857 elif isinstance(v, dict):
858 found = self.__tree_traverse(v, key, values)
859 if found is not None:
860 values.append(found)
862 def get_preexisting_entities(self) -> None:
863 for entity_metaid in self.entity_store:
864 if "wannabe" not in entity_metaid:
865 self.preexisting_entities.add(entity_metaid)
866 for entity_id_literal in self.entity_store.get_ids(entity_metaid):
867 preexisting_entity_id_metaid = self.entity_store.get_id_metaid(entity_id_literal)
868 if preexisting_entity_id_metaid:
869 self.preexisting_entities.add(preexisting_entity_id_metaid)
870 for _, roles in self.ardict.items():
871 for _, ar_ras in roles.items():
872 for ar_ra in ar_ras:
873 if "wannabe" not in ar_ra[1]:
874 self.preexisting_entities.add(ar_ra[0])
875 for venue_metaid, vi in self.vvi.items():
876 if "wannabe" not in venue_metaid:
877 wannabe_preexisting_vis = list()
878 self.__tree_traverse(vi, "id", wannabe_preexisting_vis)
879 self.preexisting_entities.update(
880 {
881 vi_metaid
882 for vi_metaid in wannabe_preexisting_vis
883 if "wannabe" not in vi_metaid
884 }
885 )
886 for _, re_metaid in self.remeta.items():
887 re_id = re_metaid[0]
888 if not re_id.startswith("re/"):
889 re_id = f"re/{re_id}"
890 self.preexisting_entities.add(re_id)
    def meta_maker(self):
        """
        Converts temporary wannabe identifiers to final MetaIDs.
        Assigns final MetaIDs to wannabe entities and resolves ardict.
        VolIss is also resolved from vvi.
        """
        # Mint a definitive MetaID for every wannabe entity in the store
        for identifier in list(self.entity_store):
            if identifier.startswith("br/") and "wannabe" in identifier:
                count = self._add_number("br")
                target_meta = f"br/{self.prefix}{count}"
                self.entity_store.assign_meta(identifier, target_meta)
            elif identifier.startswith("ra/") and "wannabe" in identifier:
                count = self._add_number("ra")
                target_meta = f"ra/{self.prefix}{count}"
                self.entity_store.assign_meta(identifier, target_meta)

        # Re-key ardict so both the BR keys and the agent values use resolved MetaIDs
        resolved_ardict: dict[str, dict[str, list]] = {}
        for ar_id in self.ardict:
            br_key = self.entity_store.find(ar_id)
            if br_key not in resolved_ardict:
                resolved_ardict[br_key] = {"author": [], "editor": [], "publisher": []}
            for role_type in ["author", "editor", "publisher"]:
                for ar_metaid, agent_id in self.ardict[ar_id][role_type]:
                    resolved_ra_metaid = self.entity_store.find(agent_id)
                    resolved_ardict[br_key][role_type].append((ar_metaid, resolved_ra_metaid))
        self.ardict = resolved_ardict

        # Resolve wannabe IDs inside the venue/volume/issue tree, then fold it into VolIss
        self.VolIss = dict()
        if self.vvi:
            for venue_meta in self.vvi:
                # Issues attached directly to the venue
                venue_issue = self.vvi[venue_meta]["issue"]
                if venue_issue:
                    for issue in venue_issue:
                        issue_id = venue_issue[issue]["id"]
                        if "wannabe" in issue_id:
                            self.vvi[venue_meta]["issue"][issue]["id"] = str(
                                self.entity_store.find(issue_id)
                            )

                # Volumes, and the issues nested under each volume
                venue_volume = self.vvi[venue_meta]["volume"]
                if venue_volume:
                    for volume in venue_volume:
                        volume_id = venue_volume[volume]["id"]
                        if "wannabe" in volume_id:
                            self.vvi[venue_meta]["volume"][volume]["id"] = str(
                                self.entity_store.find(volume_id)
                            )
                        if venue_volume[volume]["issue"]:
                            volume_issue = venue_volume[volume]["issue"]
                            for issue in volume_issue:
                                volume_issue_id = volume_issue[issue]["id"]
                                if "wannabe" in volume_issue_id:
                                    self.vvi[venue_meta]["volume"][volume][
                                        "issue"
                                    ][issue]["id"] = str(
                                        self.entity_store.find(volume_issue_id)
                                    )
                # Merge under the resolved venue MetaID
                if "wannabe" in venue_meta:
                    br_meta = self.entity_store.find(venue_meta)
                    self._merge_VolIss_with_vvi(br_meta, venue_meta)
                else:
                    self._merge_VolIss_with_vvi(venue_meta, venue_meta)
    def enrich(self, task_id=None):
        """
        This method replaces the wannabeID placeholders with the
        actual data and MetaIDs as a result of the deduplication process.

        :params task_id: optional progress task advanced once per row
        :returns: None -- rows in self.data are modified in place.
        """
        for row in self.data:
            metaid = row["id"]
            if "wannabe" in row["id"]:
                metaid = self.entity_store.find(row["id"])
            # Resource embodiment (page info): reuse a known 're' or mint a new one
            if row["page"] and (metaid not in self.remeta):
                re_meta = self.finder.retrieve_re_from_br_meta(metaid)
                if re_meta:
                    self.remeta[metaid] = re_meta
                    row["page"] = re_meta[1]
                else:
                    count = "re/" + self.prefix + str(self._add_number("re"))
                    page = row["page"]
                    self.remeta[metaid] = (count, page)
                    row["page"] = page
            elif metaid in self.remeta:
                row["page"] = self.remeta[metaid][1]
            # Rewrite 'id' and 'title' from the deduplicated entity
            ids_with_omid = self.entity_store.get_ids(metaid) | {_omid(metaid)}
            row["id"] = " ".join(ids_with_omid)
            row["title"] = self.entity_store.get_title(metaid)
            venue_metaid = None
            if row["venue"]:
                venue = row["venue"]
                if "wannabe" in venue:
                    venue_metaid = self.entity_store.find(venue)
                else:
                    venue_metaid = venue
                venue_ids_with_omid = self.entity_store.get_ids(venue_metaid) | {_omid(venue_metaid)}
                row["venue"] = self.build_name_ids_string(
                    self.entity_store.get_title(venue_metaid), venue_ids_with_omid
                )
            # Editors may be attached to a container BR rather than this row's BR
            br_key_for_editor = get_edited_br_metaid(row, metaid, venue_metaid)
            self.ra_update(row, metaid, "author")
            self.ra_update(row, metaid, "publisher")
            self.ra_update(row, br_key_for_editor, "editor")
            if self.progress and task_id is not None:
                self.progress.advance(task_id)
997 @staticmethod
998 def name_check(ts_name, name):
999 if "," in ts_name:
1000 names = ts_name.split(",")
1001 if names[0] and not names[1].strip():
1002 if "," in name:
1003 gname = name.split(", ")[1]
1004 if gname.strip():
1005 ts_name = names[0] + ", " + gname
1006 return ts_name
1008 def _read_number(self, entity_type: str) -> int:
1009 return self.counter_handler.read_counter(
1010 entity_type, supplier_prefix=self.prefix
1011 )
1013 def _add_number(self, entity_type: str) -> int:
1014 return self.counter_handler.increment_counter(
1015 entity_type, supplier_prefix=self.prefix
1016 )
1018 def __update_id_and_entity_store(
1019 self,
1020 existing_ids: list,
1021 metaval: str,
1022 ) -> None:
1023 for identifier in existing_ids:
1024 if self.entity_store.get_id_metaid(identifier[1]) is None:
1025 self.entity_store.set_id_metaid(identifier[1], identifier[0])
1026 if identifier[1] not in self.entity_store.get_ids(metaval):
1027 self.entity_store.add_id(metaval, identifier[1])
1029 def indexer(self, path_csv: str | None = None) -> None:
1030 """
1031 Transform internal dicts (idra, idbr, ardict, remeta) to list-of-dicts format
1032 for Creator consumption. Optionally saves the enriched CSV file.
1034 :params path_csv: Directory path for the enriched CSV output (optional)
1035 :type path_csv: str
1036 """
1037 self.index_id_ra = list()
1038 self.index_id_br = list()
1039 id_metaids = self.entity_store.get_id_metaids()
1040 for literal, metaid in id_metaids.items():
1041 entities = self.entity_store.find_entities(literal)
1042 has_br = any(e.startswith("br/") for e in entities)
1043 has_ra = any(e.startswith("ra/") for e in entities)
1044 if has_br:
1045 self.index_id_br.append({"id": str(literal), "meta": str(metaid)})
1046 if has_ra:
1047 self.index_id_ra.append({"id": str(literal), "meta": str(metaid)})
1048 if not self.index_id_br:
1049 self.index_id_br.append({"id": "", "meta": ""})
1050 if not self.index_id_ra:
1051 self.index_id_ra.append({"id": "", "meta": ""})
1052 self.ar_index = list()
1053 if self.ardict:
1054 for metaid in self.ardict:
1055 index = dict()
1056 index["meta"] = metaid
1057 for role in self.ardict[metaid]:
1058 list_ar = list()
1059 for ar, ra in self.ardict[metaid][role]:
1060 list_ar.append(str(ar) + ", " + str(ra))
1061 index[role] = "; ".join(list_ar)
1062 self.ar_index.append(index)
1063 else:
1064 row = dict()
1065 row["meta"] = ""
1066 row["author"] = ""
1067 row["editor"] = ""
1068 row["publisher"] = ""
1069 self.ar_index.append(row)
1070 self.re_index = list()
1071 if self.remeta:
1072 for x in self.remeta:
1073 r = dict()
1074 r["br"] = x
1075 r["re"] = str(self.remeta[x][0])
1076 self.re_index.append(r)
1077 else:
1078 row = dict()
1079 row["br"] = ""
1080 row["re"] = ""
1081 self.re_index.append(row)
1082 if self.filename and path_csv and self.data:
1083 name = self.filename + ".csv"
1084 data_file = os.path.join(path_csv, name)
1085 write_csv(data_file, self.data)
1087 def _merge_VolIss_with_vvi(
1088 self, VolIss_venue_meta: str, vvi_venue_meta: str
1089 ) -> None:
1090 if VolIss_venue_meta in self.VolIss:
1091 for vvi_v in self.vvi[vvi_venue_meta]["volume"]:
1092 if vvi_v in self.VolIss[VolIss_venue_meta]["volume"]:
1093 self.VolIss[VolIss_venue_meta]["volume"][vvi_v]["issue"].update(
1094 self.vvi[vvi_venue_meta]["volume"][vvi_v]["issue"]
1095 )
1096 else:
1097 self.VolIss[VolIss_venue_meta]["volume"][vvi_v] = self.vvi[
1098 vvi_venue_meta
1099 ]["volume"][vvi_v]
1100 self.VolIss[VolIss_venue_meta]["issue"].update(
1101 self.vvi[vvi_venue_meta]["issue"]
1102 )
1103 else:
1104 self.VolIss[VolIss_venue_meta] = self.vvi[vvi_venue_meta]
1106 def __update_id_count(self, identifier):
1107 schema, value = identifier.split(":", maxsplit=1)
1108 existing_metaid = self.finder.retrieve_metaid_from_id(schema, value)
1110 if existing_metaid:
1111 self.entity_store.set_id_metaid(identifier, existing_metaid)
1112 else:
1113 count = self._add_number("id")
1114 self.entity_store.set_id_metaid(identifier, f"id/{self.prefix}{count}")
1116 def merge(
1117 self,
1118 metaval: str,
1119 old_meta: str,
1120 temporary_name: str,
1121 ) -> None:
1122 for sid in self.entity_store.get_ids(old_meta):
1123 self.entity_store.add_id(metaval, sid)
1124 if not self.entity_store.get_title(metaval):
1125 title = self.entity_store.get_title(old_meta) or temporary_name
1126 self.entity_store.set_title(metaval, title)
1127 self.entity_store.update_id_entity(old_meta, metaval)
1128 self.entity_store.merge(metaval, old_meta)
1129 self.entity_store.remove_entity(old_meta)
1131 def merge_entities_in_csv(
1132 self,
1133 idslist: list,
1134 metaval: str,
1135 name: str,
1136 ) -> None:
1137 entity_type = metaval.split("/")[0] if "/" in metaval else ""
1138 found_others = self._local_match(idslist, entity_type)
1139 if found_others["wannabe"]:
1140 for old_meta in found_others["wannabe"]:
1141 self.merge(metaval, old_meta, name)
1142 entry_ids = self.entity_store.get_ids(metaval)
1143 for identifier in idslist:
1144 if identifier not in entry_ids:
1145 self.entity_store.add_id(metaval, identifier)
1146 if self.entity_store.get_id_metaid(identifier) is None:
1147 self.__update_id_count(identifier)
1148 if not self.entity_store.get_title(metaval) and name:
1149 self.entity_store.set_title(metaval, name)
    def id_worker(
        self,
        col_name: str,
        name: str,
        idslist: List[str],
        metaval: str,
        ra_ent: bool = False,
        br_ent: bool = False,
        vvi_ent: bool = False,
        publ_entity: bool = False,
    ):
        """
        Resolve the entity described by *name* and *idslist* to a single MetaID,
        deduplicating against the local entity store and the triplestore.

        :params col_name: the CSV column being processed (e.g. "author", "editor")
        :params name: the (possibly temporary) name of the entity
        :params idslist: the external identifiers provided for the entity
        :params metaval: an explicit MetaID from the CSV, or "" if none was given
        :params ra_ent: True when the entity is a responsible agent
        :params br_ent: True when the entity is a bibliographic resource
        :params vvi_ent: True when the entity is a volume/issue
        :params publ_entity: True when the entity is a publisher
        :returns: the resolved MetaID (possibly a fresh "wannabe" placeholder),
            or the result of self.conflict() when the identifiers point to more
            than one existing entity
        """
        entity_type = "ra" if ra_ent else "br"
        if metaval:
            if metaval in self.entity_store:
                # MetaID already known locally: just merge ids and name into it.
                self.merge_entities_in_csv(idslist, metaval, name)
            else:
                found_meta_ts: tuple[str, list[tuple[str, str]], bool] = ("", [], False)
                if ra_ent:
                    found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                elif br_ent:
                    found_meta_ts = self.finder.retrieve_br_from_meta(metaval)
                if found_meta_ts[2]:
                    # MetaID exists on the triplestore: register it locally,
                    # import its identifiers, then merge the CSV data.
                    title = self.name_check(found_meta_ts[0], name) if col_name in ("author", "editor") else found_meta_ts[0]
                    self.entity_store.add_entity(metaval, title)
                    existing_ids = found_meta_ts[1]
                    self.__update_id_and_entity_store(existing_ids, metaval)
                    self.merge_entities_in_csv(idslist, metaval, name)
                else:
                    # Unknown MetaID: it may have been merged into another entity;
                    # follow the provenance trail, otherwise discard it.
                    metaid_uri = f"{self.base_iri}/{metaval}"
                    merged_metaval = self.finder.retrieve_metaid_from_merged_entity(
                        metaid_uri=metaid_uri, prov_config=self.prov_config
                    )
                    metaval = f"{entity_type}/{merged_metaval}" if merged_metaval else ""
        if idslist and not metaval:
            # No usable MetaID: try to resolve via the external identifiers.
            local_match = self._local_match(idslist, entity_type)
            if local_match["existing"]:
                if len(local_match["existing"]) > 1:
                    # The ids point to several known entities: report a conflict.
                    return self.conflict(idslist, name, entity_type, col_name)
                elif len(local_match["existing"]) == 1:
                    metaval = str(local_match["existing"][0])
                    entry_ids = self.entity_store.get_ids(metaval)
                    suspect_ids = [i for i in idslist if i not in entry_ids]
                    if suspect_ids:
                        sparql_match = self.finder_sparql(
                            suspect_ids,
                            br=br_ent,
                            ra=ra_ent,
                            vvi=vvi_ent,
                            publ=publ_entity,
                        )
                        if len(sparql_match) > 1:
                            return self.conflict(idslist, name, entity_type, col_name)
            elif local_match["wannabe"]:
                # Adopt the first placeholder and fold the others into it.
                metaval = str(local_match["wannabe"].pop(0))
                for old_meta in local_match["wannabe"]:
                    self.merge(metaval, old_meta, name)
                entry_ids = self.entity_store.get_ids(metaval)
                suspect_ids = [i for i in idslist if i not in entry_ids]
                if suspect_ids:
                    sparql_match = self.finder_sparql(
                        suspect_ids, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                    )
                    if sparql_match:
                        # if 'wannabe' not in metaval or len(sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # Collect all existing IDs from all matches
                        existing_ids = []
                        for match in sparql_match:
                            existing_ids.extend(match[2])

                        # new_idslist = [x[1] for x in existing_ids]
                        # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                        # if len(new_sparql_match) > 1:
                        #     # Two entities previously disconnected on the triplestore now become connected
                        #     # !
                        #     return self.conflict(idslist, name, id_dict, col_name)
                        # else:
                        # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                        old_metaval = metaval
                        metaval = sparql_match[0][0]
                        self.entity_store.add_entity(metaval, sparql_match[0][1] or "")
                        self.__update_id_and_entity_store(existing_ids, metaval)
                        self.merge(metaval, old_metaval, sparql_match[0][1])
            else:
                # Nothing known locally: ask the triplestore directly.
                sparql_match = self.finder_sparql(
                    idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity
                )
                # if len(sparql_match) > 1:
                #     # !
                #     return self.conflict(idslist, name, id_dict, col_name)
                # elif len(sparql_match) == 1:
                if sparql_match:
                    # Collect all existing IDs from all matches
                    existing_ids = []
                    for match in sparql_match:
                        existing_ids.extend(match[2])

                    # new_idslist = [x[1] for x in existing_ids]
                    # new_sparql_match = self.finder_sparql(new_idslist, br=br_ent, ra=ra_ent, vvi=vvi_ent, publ=publ_entity)
                    # if len(new_sparql_match) > 1:
                    #     # Two entities previously disconnected on the triplestore now become connected
                    #     # !
                    #     return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                    # 3 CONFLICT beteen MetaIDs. MetaID specified in EntityA inside CSV has precedence.
                    # elif len(new_sparql_match) == 1:
                    metaval = sparql_match[0][0]
                    title = self.name_check(sparql_match[0][1], name) if col_name in ("author", "editor") else sparql_match[0][1]
                    self.entity_store.add_entity(metaval, title or name)
                    self.__update_id_and_entity_store(existing_ids, metaval)
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(name, entity_type)
            # Attach every CSV identifier to the resolved entity, minting
            # id/ MetaIDs for literals never seen before.
            entry_ids = self.entity_store.get_ids(metaval)
            for identifier in idslist:
                if self.entity_store.get_id_metaid(identifier) is None:
                    self.__update_id_count(identifier)
                if identifier not in entry_ids:
                    self.entity_store.add_id(metaval, identifier)
            if not self.entity_store.get_title(metaval) and name:
                self.entity_store.set_title(metaval, name)
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(name, entity_type)
        return metaval
1281 def new_entity(self, name: str, entity_type: str) -> str:
1282 metaval = f"{entity_type}/wannabe_{self.wnb_cnt}"
1283 self.wnb_cnt += 1
1284 self.entity_store.add_entity(metaval, name)
1285 return metaval
    def volume_issue(
        self,
        meta: str,
        path: dict,
        value: str,
        row: Dict[str, str],
    ) -> None:
        """
        Register the volume/issue labelled *value* under *path* (a venue's
        "volume" or "issue" dict), reconciling *meta* with any id already stored.

        :params meta: the MetaID (or wannabe placeholder) of the volume/issue
        :params path: the dict in which the volume/issue is recorded, keyed by *value*
        :params value: the volume or issue designation
        :params row: the CSV row being processed (its title is used when merging)
        """
        if "wannabe" not in meta:
            # *meta* is a concrete MetaID.
            if value in path:
                if "wannabe" in path[value]["id"]:
                    # A placeholder was stored earlier: absorb it into the real id.
                    old_meta = path[value]["id"]
                    self.merge(meta, old_meta, row["title"])
                    path[value]["id"] = meta
            else:
                path[value] = {"id": meta}
                # NOTE(review): this tests "issue" against *path* (the whole dict
                # of volumes/issues), not against path[value]; since path[value]
                # was just created, it looks like `"issue" not in path[value]`
                # was intended — confirm before changing.
                if "issue" not in path:
                    path[value]["issue"] = {}
        else:
            # *meta* is still a wannabe placeholder.
            if value in path:
                if "wannabe" in path[value]["id"]:
                    old_meta = path[value]["id"]
                    if meta != old_meta:
                        # Two distinct placeholders for the same volume/issue.
                        self.merge(meta, old_meta, row["title"])
                        path[value]["id"] = meta
                else:
                    # A concrete id is already recorded: make sure it exists in
                    # the entity store (importing it from the triplestore if
                    # needed), then absorb the placeholder into it.
                    old_meta = path[value]["id"]
                    if "wannabe" not in old_meta and old_meta not in self.entity_store:
                        br4dict = self.finder.retrieve_br_from_meta(old_meta)
                        self.entity_store.add_entity(old_meta, br4dict[0] if br4dict else "")
                        if br4dict:
                            for x in br4dict[1]:
                                identifier = x[1]
                                self.entity_store.add_id(old_meta, identifier)
                                if self.entity_store.get_id_metaid(identifier) is None:
                                    self.entity_store.set_id_metaid(identifier, x[0])
                    self.merge(old_meta, meta, row["title"])
            else:
                path[value] = {"id": meta}
                # NOTE(review): same suspicious `"issue" not in path` check as above.
                if "issue" not in path:
                    path[value]["issue"] = {}
1328 def merge_duplicate_entities(self, task_id=None) -> None:
1329 """
1330 Merges duplicate entities and propagates data from existing entities to related rows.
1331 For rows referencing existing entities, retrieves data from triplestore (via equalizer)
1332 and updates all related rows (those with matching IDs or merged entity references).
1333 """
1334 # Build index mapping row IDs to row indices for O(1) lookup
1335 id_to_indices: dict[str, list[int]] = {}
1336 for idx, row in enumerate(self.data):
1337 row_id = row["id"]
1338 if row_id not in id_to_indices:
1339 id_to_indices[row_id] = []
1340 id_to_indices[row_id].append(idx)
1342 self.rowcnt = 0
1343 for row in self.data:
1344 row_id = row["id"]
1345 if "wannabe" not in row_id:
1346 self.equalizer(row, row_id)
1347 related_indices: set[int] = set()
1348 if row_id in id_to_indices:
1349 related_indices.update(id_to_indices[row_id])
1350 for other_id in self.entity_store.get_merged(row_id):
1351 if other_id in id_to_indices:
1352 related_indices.update(id_to_indices[other_id])
1353 related_indices.discard(self.rowcnt)
1354 for other_idx in related_indices:
1355 other_row = self.data[other_idx]
1356 for field in row:
1357 if row[field] and row[field] != other_row[field]:
1358 other_row[field] = row[field]
1359 if self.progress and task_id is not None:
1360 self.progress.advance(task_id)
1361 self.rowcnt += 1
1363 def extract_name_and_ids(self, venue_str: str) -> Tuple[str, List[str]]:
1364 """
1365 Extracts the name and IDs from the venue string.
1367 :params venue_str: the venue string
1368 :type venue_str: str
1369 :returns: Tuple[str, List[str]] -- the name and list of IDs extracted from the venue string
1370 """
1371 name, ids_str = split_name_and_ids(venue_str)
1372 return name.strip(), ids_str.split()
1374 def equalizer(self, row: Dict[str, str], metaval: str) -> None:
1375 """
1376 Given a CSV row and its MetaID, equates the information present in the CSV
1377 with that present on the triplestore. Triplestore data takes precedence.
1378 """
1379 known_data = self.finder.retrieve_br_info_from_meta(metaval)
1380 try:
1381 known_data["author"] = self.__get_resp_agents(metaval, "author")
1382 except ValueError:
1383 print(row)
1384 raise (ValueError)
1385 known_data["editor"] = self.__get_resp_agents(metaval, "editor")
1386 known_data["publisher"] = self.finder.retrieve_publisher_from_br_metaid(metaval)
1387 for datum in ["pub_date", "type", "volume", "issue"]:
1388 if known_data[datum]:
1389 row[datum] = known_data[datum]
1390 for datum in ["author", "editor", "publisher"]:
1391 if known_data[datum] and not row[datum]:
1392 row[datum] = known_data[datum]
1393 if known_data["venue"]:
1394 current_venue = row["venue"]
1395 known_venue = known_data["venue"]
1397 if current_venue:
1398 current_venue_name, current_venue_ids = self.extract_name_and_ids(
1399 current_venue
1400 )
1401 known_venue_name, known_venue_ids = self.extract_name_and_ids(
1402 known_venue
1403 )
1405 current_venue_ids_set = set(current_venue_ids)
1406 known_venue_ids_set = set(known_venue_ids)
1408 common_ids = current_venue_ids_set.intersection(known_venue_ids_set)
1410 if common_ids:
1411 merged_ids = current_venue_ids_set.union(known_venue_ids_set)
1412 row["venue"] = (
1413 f"{known_venue_name} [{' '.join(sorted(merged_ids))}]"
1414 )
1415 else:
1416 row["venue"] = known_venue
1417 else:
1418 row["venue"] = known_venue
1419 if known_data["page"]:
1420 row["page"] = known_data["page"][1]
1421 self.remeta[metaval] = known_data["page"]
1423 def __get_resp_agents(self, metaid: str, column: str) -> str:
1424 resp_agents = self.finder.retrieve_ra_sequence_from_br_meta(metaid, column)
1425 output = ""
1426 if resp_agents:
1427 full_resp_agents = list()
1428 for item in resp_agents:
1429 for _, resp_agent in item.items():
1430 author_name = resp_agent[0]
1431 ids = [_omid(resp_agent[2])]
1432 ids.extend([id[1] for id in resp_agent[1]])
1433 author_ids = "[" + " ".join(ids) + "]"
1434 full_resp_agent = author_name + " " + author_ids
1435 full_resp_agents.append(full_resp_agent)
1436 output = "; ".join(full_resp_agents)
1437 return output
def is_a_valid_row(row: Dict[str, str]) -> bool:
    """
    This method discards invalid rows in the input CSV file.

    :params row: a dictionary representing a CSV row
    :type row: Dict[str, str]
    :returns: bool -- This method returns True if the row is valid, False if it is invalid.
    """
    # Normalize the type: lowercase and collapse runs of whitespace.
    br_type = " ".join((row["type"].lower()).split())
    br_title = row["title"]
    br_volume = row["volume"]
    br_issue = row["issue"]
    br_venue = row["venue"]
    if row["id"]:
        # With an explicit id only volume/issue coherence is checked:
        # a volume or issue number requires both a type and a venue.
        if (br_volume or br_issue) and (not br_type or not br_venue):
            return False
        return True
    if all(not row[value] for value in row):
        # Completely empty row.
        return False
    br_author = row["author"]
    br_editor = row["editor"]
    br_pub_date = row["pub_date"]
    # Note: returning bool(...) directly avoids the previous pattern of a
    # local variable shadowing the function name and `True if X else False`.
    if not br_type or br_type in {
        "book",
        "data file",
        "dataset",
        "dissertation",
        "edited book",
        "journal article",
        "monograph",
        "other",
        "peer review",
        "posted content",
        "web content",
        "proceedings article",
        "report",
        "reference book",
    }:
        # Stand-alone works need a title, a date, and a responsible agent.
        return bool(br_title and br_pub_date and (br_author or br_editor))
    if br_type in {
        "book chapter",
        "book part",
        "book section",
        "book track",
        "component",
        "reference entry",
    }:
        # Container parts need a title and their container venue.
        return bool(br_title and br_venue)
    if br_type in {
        "book series",
        "book set",
        "journal",
        "proceedings",
        "proceedings series",
        "report series",
        "standard",
        "standard series",
    }:
        # Container works only need a title.
        return bool(br_title)
    if br_type == "journal volume":
        return bool(br_venue and (br_volume or br_title))
    if br_type == "journal issue":
        return bool(br_venue and (br_issue or br_title))
    # Unrecognized type without an id: invalid.
    return False
1510def get_edited_br_metaid(row: dict, metaid: str, venue_metaid: str | None) -> str:
1511 if row["author"] and row["venue"] and row["type"] in CONTAINER_EDITOR_TYPES and venue_metaid:
1512 return venue_metaid
1513 return metaid