Coverage for oc_meta / core / creator.py: 100%
400 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10from typing import TYPE_CHECKING, List
12from oc_meta.core.curator import get_edited_br_metaid
13from oc_meta.lib.finder import ResourceFinder
14from oc_meta.lib.master_of_regex import (
15 RE_COMMA_AND_SPACES,
16 RE_ONE_OR_MORE_SPACES,
17 RE_SEMICOLON_IN_PEOPLE_FIELD,
18 split_name_and_ids,
19)
20from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
21from oc_ocdm.graph import GraphSet
22from oc_ocdm.graph.entities.bibliographic import BibliographicResource
23from oc_ocdm.graph.entities.bibliographic_entity import BibliographicEntity
24from oc_ocdm.support import create_date
26if TYPE_CHECKING:
27 from rich.progress import Progress
class Creator(object):
    """Turn a list of metadata rows into OCDM entities inside a ``GraphSet``.

    Each row is a mapping with the keys ``id``, ``title``, ``author``,
    ``pub_date``, ``venue``, ``volume``, ``issue``, ``page``, ``type``,
    ``publisher`` and ``editor``.  Entities are addressed by the ``omid:``
    token found in the relevant field; the MetaIDs of identifiers, agent
    roles and resource embodiments are looked up in the indexes handed to
    the constructor.
    """

    # Prefix of the token that carries an entity's own MetaID.
    _OMID_PREFIX = "omid:"

    def __init__(
        self,
        data: list,
        finder: ResourceFinder,
        base_iri: str,
        counter_handler: RedisCounterHandler,
        supplier_prefix: str,
        resp_agent: str,
        ra_index: list,
        br_index: list,
        re_index_csv: list,
        ar_index_csv: list,
        vi_index: dict,
        silencer: list | None = None,
        progress: Progress | None = None,
    ):
        """Store the collaborators and pre-compute the lookup indexes.

        Args:
            data: rows to process (see the class docstring for the keys).
            finder: supports ``url in finder`` and ``finder.graph.subgraph``.
            base_iri: prefix prepended to every MetaID to build an entity URL.
            counter_handler: forwarded to ``GraphSet``.
            supplier_prefix: forwarded to ``GraphSet``.
            resp_agent: responsible agent passed to every ``add_*`` call.
            ra_index: rows with ``id`` and ``meta`` keys (agent identifiers).
            br_index: rows with ``id`` and ``meta`` keys (resource identifiers).
            re_index_csv: rows with ``br`` and ``re`` keys.
            ar_index_csv: rows with ``meta``, ``author``, ``editor`` and
                ``publisher`` keys.
            vi_index: nested mapping venue -> ``volume``/``issue`` -> ``id``.
            silencer: role names (``author``, ``editor``, ``publisher``) that
                must not be re-processed when the resource already has them.
            progress: optional progress reporter with ``add_task``,
                ``advance`` and ``remove_task``.
        """
        self.url = base_iri
        self.progress = progress
        self.setgraph = GraphSet(
            self.url,
            supplier_prefix=supplier_prefix,
            wanted_label=False,
            custom_counter_handler=counter_handler,
        )
        self.resp_agent = resp_agent
        self.finder = finder

        self.ra_id_schemas = {"crossref", "orcid", "viaf", "wikidata", "ror"}
        self.br_id_schemas = {
            "arxiv",
            "doi",
            "issn",
            "isbn",
            "jid",
            "openalex",
            "pmid",
            "pmcid",
            "url",
            "wikidata",
            "wikipedia",
        }
        # indexer_id() reads self.schemas, so it must exist before indexing.
        self.schemas = self.ra_id_schemas.union(self.br_id_schemas)

        self.ra_index = self.indexer_id(ra_index)
        self.br_index = self.indexer_id(br_index)
        self.re_index = self.index_re(re_index_csv)
        self.ar_index = self.index_ar(ar_index_csv)
        self.vi_index = vi_index
        self.data = data
        self.counter_handler = counter_handler
        self.silencer = silencer or []

    _PRO_IS_DOC_CONTEXT_FOR = "http://purl.org/spar/pro/isDocumentContextFor"
    _PRO_WITH_ROLE = "http://purl.org/spar/pro/withRole"
    _PRO_AUTHOR = "http://purl.org/spar/pro/author"
    _PRO_EDITOR = "http://purl.org/spar/pro/editor"
    _PRO_PUBLISHER = "http://purl.org/spar/pro/publisher"

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @classmethod
    def _omid_value(cls, identifier):
        """Return the MetaID carried by an ``omid:`` token, else ``None``.

        Only a leading ``omid:`` counts, which is the same test
        ``get_venue_type`` applies; a value that merely contains the
        substring (for instance inside a URL) is not an OMID token.
        """
        if identifier.startswith(cls._OMID_PREFIX):
            return identifier[len(cls._OMID_PREFIX):]
        return None

    def _load_or_create(self, factory, url):
        """Call a ``GraphSet.add_*`` factory for ``url``.

        When ``url`` is already known to the finder, its subgraph is passed
        as ``preexisting_graph``; otherwise ``None`` is passed.
        """
        preexisting_graph = (
            self.finder.graph.subgraph(url) if url in self.finder else None
        )
        return factory(
            self.resp_agent,
            source=self.src,
            res=url,
            preexisting_graph=preexisting_graph,
        )

    def _build_agent(self, id_list, name, is_person):
        """Create the responsible agent named by the OMID token in ``id_list``.

        Args:
            id_list: identifier tokens of one agent.
            name: display name of the agent.
            is_person: when true, a name containing a comma is split into
                family name (before the comma) and given name (after it).

        Returns:
            ``(agent, meta)`` where ``meta`` is the agent's MetaID.

        Raises:
            AssertionError: if no token in ``id_list`` is an OMID.
        """
        agent = None
        agent_meta = ""
        for identifier in id_list:
            meta = self._omid_value(identifier)
            if meta is None:
                continue
            agent_meta = meta
            agent = self._load_or_create(self.setgraph.add_ra, self.url + meta)
            if is_person and "," in name:
                name_parts = RE_COMMA_AND_SPACES.split(name)
                given_name = name_parts[1]
                family_name = name_parts[0]
                # A blank given name is skipped; the family name is always set.
                if given_name.strip():
                    agent.has_given_name(given_name)
                agent.has_family_name(family_name)
            else:
                agent.has_name(name)
        assert agent is not None
        return agent, agent_meta

    def _has_existing_roles(self, br_uri: str) -> dict:
        """Report which of author/editor/publisher roles ``br_uri`` already has.

        Walks ``pro:isDocumentContextFor`` and then ``pro:withRole`` through
        the finder and returns a dict of three booleans.
        """
        has_roles = {
            "author": False,
            "editor": False,
            "publisher": False
        }

        for ar_uri in self.finder._get_objects(br_uri, self._PRO_IS_DOC_CONTEXT_FOR):
            for role in self.finder._get_objects(ar_uri, self._PRO_WITH_ROLE):
                if role == self._PRO_AUTHOR:
                    has_roles["author"] = True
                elif role == self._PRO_EDITOR:
                    has_roles["editor"] = True
                elif role == self._PRO_PUBLISHER:
                    has_roles["publisher"] = True

        return has_roles

    # ------------------------------------------------------------------ #
    # Entry point
    # ------------------------------------------------------------------ #

    def creator(self, source=None):
        """Process every row of ``self.data`` and return the filled GraphSet.

        Args:
            source: value passed as ``source`` to every ``add_*`` call.

        Returns:
            ``self.setgraph``.
        """
        self.src = source
        task_id = None
        if self.progress:
            task_id = self.progress.add_task(
                " [dim]Creating RDF entities[/dim]", total=len(self.data)
            )
        for row in self.data:
            self.row_meta = ""
            self.venue_meta = ""
            # vvi_action() reads self.type, so set it before calling it.
            self.type = row["type"]
            self.venue_graph = None
            self.vol_graph = None
            self.issue_graph = None

            self.id_action(row["id"])
            self.vvi_action(row["venue"], row["volume"], row["issue"])
            self.title_action(row["title"])

            br_uri = f"{self.url}{self.row_meta}"
            skip_author = False
            skip_publisher = False
            skip_editor = False
            # A silenced role is skipped only for a resource the finder
            # already knows AND that already holds that role.
            if br_uri in self.finder:
                existing_roles = self._has_existing_roles(br_uri)
                skip_author = "author" in self.silencer and existing_roles["author"]
                skip_publisher = "publisher" in self.silencer and existing_roles["publisher"]
                skip_editor = "editor" in self.silencer and existing_roles["editor"]

            if not skip_author:
                self.author_action(row["author"])

            self.pub_date_action(row["pub_date"])
            self.page_action(row["page"])
            self.type_action(self.type)

            if not skip_publisher:
                self.publisher_action(row["publisher"])

            if not skip_editor:
                self.editor_action(row["editor"], row)

            if self.progress and task_id is not None:
                self.progress.advance(task_id)

        if self.progress and task_id is not None:
            self.progress.remove_task(task_id)
        return self.setgraph

    # ------------------------------------------------------------------ #
    # Index builders
    # ------------------------------------------------------------------ #

    @staticmethod
    def index_re(id_index):
        """Map each row's ``br`` value to its ``re`` value."""
        return {row["br"]: row["re"] for row in id_index}

    @staticmethod
    def index_ar(id_index):
        """Map each row's ``meta`` to its parsed author/editor/publisher roles.

        Every role field is parsed by ``__ar_worker``.
        """
        index = dict()
        for row in id_index:
            index[row["meta"]] = {
                "author": Creator.__ar_worker(row["author"]),
                "editor": Creator.__ar_worker(row["editor"]),
                "publisher": Creator.__ar_worker(row["publisher"]),
            }
        return index

    @staticmethod
    def __ar_worker(s: str) -> dict:
        """Parse ``"a1, b1; a2, b2"`` into ``{"b1": "a1", "b2": "a2"}``.

        An empty or falsy input yields an empty dict.
        """
        ar_dict = dict()
        if s:
            for couple in s.split("; "):
                parts = couple.split(", ")
                # The second element is the key, the first one the value.
                ar_dict[parts[1]] = parts[0]
        return ar_dict

    def indexer_id(self, csv_index):
        """Build ``{schema: {value: meta}}`` from rows with ``id``/``meta`` keys.

        Rows with an empty ``id`` and rows whose schema is not in
        ``self.schemas`` are ignored.  An id without any colon has no
        recognisable schema and is ignored as well instead of raising.
        """
        index = {}
        for row in csv_index:
            if row_id := row["id"]:
                # Split on the first colon only: values may contain colons.
                schema, _, value = row_id.partition(":")
                if schema in self.schemas:
                    index.setdefault(schema, {})[value] = row["meta"]
        return index

    # ------------------------------------------------------------------ #
    # Per-field actions
    # ------------------------------------------------------------------ #

    def id_action(self, ids):
        """Create the row's bibliographic resource and attach its identifiers.

        The ``omid:`` token sets ``self.row_meta`` and ``self.br_graph``;
        every token is then offered to ``id_creator``.
        """
        idslist = RE_ONE_OR_MORE_SPACES.split(ids)
        # publication id
        for identifier in idslist:
            meta = self._omid_value(identifier)
            if meta is not None:
                self.row_meta = meta
                self.br_graph = self._load_or_create(
                    self.setgraph.add_br, self.url + meta
                )
        for identifier in idslist:
            self.id_creator(self.br_graph, identifier, ra=False)

    def title_action(self, title):
        """Set the resource title when one is given."""
        if title:
            self.br_graph.has_title(title)

    def author_action(self, authors):
        """Create agents and author roles, chained in field order.

        Raises:
            AssertionError: if an author carries no OMID token.
            KeyError: if a role MetaID is missing from ``self.ar_index``.
        """
        if not authors:
            return
        previous_role = None
        for aut in RE_SEMICOLON_IN_PEOPLE_FIELD.split(authors):
            author_name, aut_id = split_name_and_ids(aut)
            aut_id_list = aut_id.split(" ")
            author_ra, aut_meta = self._build_agent(
                aut_id_list, author_name, is_person=True
            )
            for identifier in aut_id_list:
                self.id_creator(author_ra, identifier, ra=True)
            ar_meta = self.ar_index[self.row_meta]["author"][aut_meta]
            author_ra_role = self._load_or_create(
                self.setgraph.add_ar, self.url + ar_meta
            )
            author_ra_role.create_author()
            self.br_graph.has_contributor(author_ra_role)
            author_ra_role.is_held_by(author_ra)
            # Link the previous role of this field to the current one.
            if previous_role is not None:
                previous_role.has_next(author_ra_role)
            previous_role = author_ra_role

    def pub_date_action(self, pub_date):
        """Set the publication date from a ``-``-separated numeric string.

        Raises:
            ValueError: if a component is not an integer literal.
        """
        if pub_date:
            datelist: list[int | None] = [int(x) for x in pub_date.split("-")]
            str_date = create_date(datelist)
            if str_date:
                self.br_graph.has_pub_date(str_date)

    def vvi_action(self, venue, vol, issue):
        """Create venue, volume and issue and nest the resource inside them.

        Nothing happens without a venue.  Volume and issue entities are
        created only when ``self.type`` is a journal article, volume or
        issue; their MetaIDs come from ``self.vi_index``.

        Raises:
            AssertionError: if a container needed for linking was not
                created (e.g. the venue has no OMID token).
        """
        if not venue:
            return

        venue_title, venue_ids = split_name_and_ids(venue)
        venue_ids_list = venue_ids.split()
        for identifier in venue_ids_list:
            ven_id = self._omid_value(identifier)
            if ven_id is None:
                continue
            self.venue_meta = ven_id
            self.venue_graph = self._load_or_create(
                self.setgraph.add_br, self.url + ven_id
            )
            venue_type = self.get_venue_type(self.type, venue_ids_list)
            if venue_type:
                # e.g. "book series" -> create_book_series()
                venue_type = venue_type.replace(" ", "_")
                getattr(self.venue_graph, f"create_{venue_type}")()
            self.venue_graph.has_title(venue_title)
        assert self.venue_graph is not None
        for identifier in venue_ids_list:
            self.id_creator(self.venue_graph, identifier, ra=False)

        if self.type in {"journal article", "journal volume", "journal issue"}:
            if vol:
                vol_meta = self.vi_index[self.venue_meta]["volume"][vol]["id"]
                self.vol_graph = self._load_or_create(
                    self.setgraph.add_br, self.url + vol_meta
                )
                self.vol_graph.create_volume()
                self.vol_graph.has_number(vol)
            if issue:
                venue_entry = self.vi_index[self.venue_meta]
                # An issue is indexed under its volume when there is one,
                # directly under the venue otherwise.
                if vol:
                    issue_meta = venue_entry["volume"][vol]["issue"][issue]["id"]
                else:
                    issue_meta = venue_entry["issue"][issue]["id"]
                self.issue_graph = self._load_or_create(
                    self.setgraph.add_br, self.url + issue_meta
                )
                self.issue_graph.create_issue()
                self.issue_graph.has_number(issue)

        # Build the containment chain from the innermost available container.
        if vol and issue:
            assert self.issue_graph is not None
            assert self.vol_graph is not None
            assert self.venue_graph is not None
            self.br_graph.is_part_of(self.issue_graph)
            self.issue_graph.is_part_of(self.vol_graph)
            self.vol_graph.is_part_of(self.venue_graph)
        elif vol:
            assert self.vol_graph is not None
            assert self.venue_graph is not None
            self.br_graph.is_part_of(self.vol_graph)
            self.vol_graph.is_part_of(self.venue_graph)
        elif issue:
            assert self.issue_graph is not None
            assert self.venue_graph is not None
            self.br_graph.is_part_of(self.issue_graph)
            self.issue_graph.is_part_of(self.venue_graph)
        else:
            assert self.venue_graph is not None
            self.br_graph.is_part_of(self.venue_graph)

    # Venue type implied by the type of the contained resource.
    _VENUE_TYPE_BY_BR_TYPE = {
        "journal article": "journal",
        "journal volume": "journal",
        "journal issue": "journal",
        "book chapter": "book",
        "book part": "book",
        "book section": "book",
        "book track": "book",
        "book": "book series",
        "edited book": "book series",
        "monograph": "book series",
        "reference book": "book series",
        "proceedings article": "proceedings",
        "proceedings": "series",
        "report": "series",
        "standard": "series",
        "series": "series",
        "reference entry": "reference book",
        "report series": "report series",
    }

    @classmethod
    def get_venue_type(cls, br_type: str, venue_ids: list) -> str:
        """Infer the venue type from the resource type and the venue ids.

        The resource type proposes a venue type.  If the venue has at least
        one non-OMID identifier, the proposal is checked against the ISSN /
        ISBN schemes that are present and is blanked when they disagree.

        Returns:
            The venue type, or ``""`` when it is unknown or undecidable.
        """
        schemas = {venue_id.split(":", maxsplit=1)[0] for venue_id in venue_ids}
        venue_type = cls._VENUE_TYPE_BY_BR_TYPE.get(br_type, "")

        # Check the type based on the identifier scheme
        has_external_id = any(
            identifier for identifier in venue_ids if not identifier.startswith("omid:")
        )
        if not has_external_id:
            return venue_type

        has_isbn = "isbn" in schemas
        has_issn = "issn" in schemas
        if venue_type in {"journal", "book series", "series", "report series"}:
            # Serial venues need an ISSN and no ISBN, else it is undecidable.
            if has_isbn or not has_issn:
                venue_type = ""
        elif venue_type in {"book", "proceedings"}:
            # Monographic venues need an ISBN and no ISSN.
            if has_issn or not has_isbn:
                venue_type = ""
        elif venue_type == "reference book":
            if has_issn and has_isbn:
                venue_type = ""
            elif has_issn:
                venue_type = "journal"
            # ISBN only, or neither scheme: it stays a reference book.
        return venue_type

    def page_action(self, page):
        """Create the resource embodiment carrying the page information.

        Raises:
            KeyError: if ``self.row_meta`` is missing from ``self.re_index``.
        """
        if page:
            re_meta = self.re_index[self.row_meta]
            form = self._load_or_create(self.setgraph.add_re, self.url + re_meta)
            # The same page string goes to both setters; presumably oc_ocdm
            # extracts the respective bound of a range - TODO confirm.
            form.has_starting_page(page)
            form.has_ending_page(page)
            self.br_graph.has_format(form)

    _TYPE_TO_METHOD = {
        "abstract": "create_abstract",
        "archival document": "create_archival_document",
        "audio document": "create_audio_document",
        "book": "create_book",
        "book chapter": "create_book_chapter",
        "book part": "create_book_part",
        "book section": "create_book_section",
        "book series": "create_book_series",
        "book set": "create_book_set",
        "computer program": "create_computer_program",
        "data file": "create_dataset",
        "dataset": "create_dataset",
        "data management plan": "create_data_management_plan",
        "dissertation": "create_dissertation",
        "editorial": "create_editorial",
        "journal": "create_journal",
        "journal article": "create_journal_article",
        "journal editorial": "create_journal_editorial",
        "journal issue": "create_issue",
        "journal volume": "create_volume",
        "newspaper": "create_newspaper",
        "newspaper article": "create_newspaper_article",
        "newspaper issue": "create_newspaper_issue",
        "peer review": "create_peer_review",
        "preprint": "create_preprint",
        "presentation": "create_presentation",
        "proceedings": "create_proceedings",
        "proceedings article": "create_proceedings_article",
        "reference book": "create_reference_book",
        "reference entry": "create_reference_entry",
        "report": "create_report",
        "report series": "create_report_series",
        "retraction notice": "create_retraction_notice",
        "standard": "create_standard",
        "series": "create_series",
        "web content": "create_web_content",
    }

    def type_action(self, entity_type):
        """Call the ``create_*`` method matching ``entity_type``, if any."""
        method_name = self._TYPE_TO_METHOD.get(entity_type)
        if method_name:
            getattr(self.br_graph, method_name)()

    def publisher_action(self, publisher):
        """Create agents and publisher roles, chained in field order.

        Publisher names are never split into given/family name.

        Raises:
            AssertionError: if a publisher carries no OMID token.
            KeyError: if a role MetaID is missing from ``self.ar_index``.
        """
        if not publisher:
            return
        previous_role = None
        for pub in RE_SEMICOLON_IN_PEOPLE_FIELD.split(publisher):
            publ_name, publ_id = split_name_and_ids(pub)
            publ_id_list = publ_id.split()
            publisher_ra, pub_meta = self._build_agent(
                publ_id_list, publ_name, is_person=False
            )
            for identifier in publ_id_list:
                self.id_creator(publisher_ra, identifier, ra=True)
            ar_meta = self.ar_index[self.row_meta]["publisher"][pub_meta]
            publ_role = self._load_or_create(
                self.setgraph.add_ar, self.url + ar_meta
            )
            publ_role.create_publisher()
            self.br_graph.has_contributor(publ_role)
            publ_role.is_held_by(publisher_ra)
            if previous_role is not None:
                previous_role.has_next(publ_role)
            previous_role = publ_role

    def editor_action(self, editor, row):
        """Create agents and editor roles, chained in field order.

        The role is attached to whichever of the row's resource, issue,
        volume or venue has the MetaID returned by ``get_edited_br_metaid``.

        Raises:
            AssertionError: if an editor carries no OMID token.
            KeyError: if a role MetaID is missing from ``self.ar_index``.
        """
        if not editor:
            return
        previous_role = None
        for ed in RE_SEMICOLON_IN_PEOPLE_FIELD.split(editor):
            editor_name, ed_id = split_name_and_ids(ed)
            ed_id_list = ed_id.split(" ")
            editor_ra, ed_meta = self._build_agent(
                ed_id_list, editor_name, is_person=True
            )
            for identifier in ed_id_list:
                self.id_creator(editor_ra, identifier, ra=True)
            br_key = get_edited_br_metaid(row, self.row_meta, self.venue_meta)
            ar_meta = self.ar_index[br_key]["editor"][ed_meta]
            editor_ra_role = self._load_or_create(
                self.setgraph.add_ar, self.url + ar_meta
            )
            editor_ra_role.create_editor()
            candidates = (
                self.br_graph,
                self.issue_graph,
                self.vol_graph,
                self.venue_graph,
            )
            for graph in candidates:
                if graph is not None and br_key == self.__res_metaid(graph):
                    graph.has_contributor(editor_ra_role)
            editor_ra_role.is_held_by(editor_ra)
            if previous_role is not None:
                previous_role.has_next(editor_ra_role)
            previous_role = editor_ra_role

    def __res_metaid(self, graph: BibliographicResource):
        """Return ``graph.res`` with the base IRI removed (``None`` if falsy)."""
        if graph:
            return graph.res.replace(self.url, "")

    def id_creator(self, graph: BibliographicEntity, identifier: str, ra: bool) -> None:
        """Attach one ``schema:value`` identifier to ``graph``.

        Tokens are ignored when they start with ``temp:``, have no colon, or
        use a schema outside ``ra_id_schemas`` (``ra`` true) or
        ``br_id_schemas`` (``ra`` false).  The schema is parsed once, so a
        value that itself begins with another schema prefix cannot trigger a
        second match.

        Raises:
            KeyError: if the identifier is missing from the relevant index.
        """
        # Skip temporary identifiers - they should not be saved in the final dataset
        if identifier.startswith("temp:"):
            return

        schema, separator, value = identifier.partition(":")
        allowed_schemas = self.ra_id_schemas if ra else self.br_id_schemas
        if not separator or schema not in allowed_schemas:
            return

        id_index = self.ra_index if ra else self.br_index
        url = self.url + id_index[schema][value]
        new_id = self._load_or_create(self.setgraph.add_id, url)
        getattr(new_id, f"create_{schema}")(value)
        graph.has_identifier(new_id)