Coverage for oc_meta / lib / finder.py: 89%
643 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import multiprocessing
8from concurrent.futures import ProcessPoolExecutor
9from functools import partial
10from typing import TYPE_CHECKING, Dict, List, Tuple, TypedDict
12import orjson
14if TYPE_CHECKING:
15 from rich.progress import Progress
16from dateutil import parser
17from oc_ocdm.graph.graph_entity import GraphEntity
18from oc_ocdm.prov.prov_entity import ProvEntity
19from oc_ocdm.support import get_resource_number
20from triplelite import RDFTerm, TripleLite
21from sparqlite import SPARQLClient
22from time_agnostic_library.agnostic_entity import AgnosticEntity
23from rich.console import Console
25from oc_meta.constants import QLEVER_BATCH_SIZE, QLEVER_MAX_WORKERS, QLEVER_QUERIES_PER_GROUP
26from oc_meta.lib.sparql import execute_sparql_queries
# Frequently used RDF/XSD constant IRIs.
_XSD_STRING = "http://www.w3.org/2001/XMLSchema#string"
_RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
_DATACITE = "http://purl.org/spar/datacite/"

# Predicate IRIs, aliased from oc_ocdm's GraphEntity for brevity.
_P_HAS_LITERAL_VALUE = GraphEntity.iri_has_literal_value
_P_USES_ID_SCHEME = GraphEntity.iri_uses_identifier_scheme
_P_HAS_IDENTIFIER = GraphEntity.iri_has_identifier
_P_TITLE = GraphEntity.iri_title
_P_NAME = GraphEntity.iri_name
_P_FAMILY_NAME = GraphEntity.iri_family_name
_P_GIVEN_NAME = GraphEntity.iri_given_name
_P_IS_DOC_CONTEXT_FOR = GraphEntity.iri_is_document_context_for
_P_HAS_NEXT = GraphEntity.iri_has_next
_P_IS_HELD_BY = GraphEntity.iri_is_held_by
_P_WITH_ROLE = GraphEntity.iri_with_role
_P_EMBODIMENT = GraphEntity.iri_embodiment
_P_STARTING_PAGE = GraphEntity.iri_starting_page
_P_ENDING_PAGE = GraphEntity.iri_ending_page
_P_PUB_DATE = GraphEntity.iri_has_publication_date
_P_SEQ_ID = GraphEntity.iri_has_sequence_identifier
_P_PART_OF = GraphEntity.iri_part_of
_P_TYPE = _RDF_TYPE

# Class (rdf:type) IRIs used when walking venue structures.
_T_JOURNAL_VOLUME = GraphEntity.iri_journal_volume
_T_JOURNAL_ISSUE = GraphEntity.iri_journal_issue
_T_EXPRESSION = GraphEntity.iri_expression

# Agent-role IRIs (pro:author / pro:editor / pro:publisher).
_R_AUTHOR = GraphEntity.iri_author
_R_EDITOR = GraphEntity.iri_editor
_R_PUBLISHER = GraphEntity.iri_publisher
class IssueEntry(TypedDict):
    """Structure for a journal issue in a venue map: its MetaID (without base IRI)."""
    id: str
class VolumeEntry(TypedDict):
    """Structure for a journal volume: its MetaID plus its issues keyed by sequence id."""
    id: str
    issue: Dict[str, IssueEntry]
class VenueStructure(TypedDict):
    """Volume/issue layout of a venue: standalone issues and volumes keyed by sequence id."""
    issue: Dict[str, IssueEntry]
    volume: Dict[str, VolumeEntry]
class ResourceFinder:
    """Resolves OpenCitations Meta entities (bibliographic resources, agents,
    identifiers) against an in-memory triple index populated from a SPARQL
    endpoint (see get_everything_about_res)."""

    def __init__(self, ts_url: str, base_iri: str, settings: dict | None = None, meta_config_path: str | None = None, workers: int = 1):
        """
        :param ts_url: SPARQL endpoint URL used to populate the local graph.
        :param base_iri: base IRI of the dataset; a single trailing slash is stripped.
        :param settings: Meta configuration mapping. Defaults to an empty dict.
            (The former ``settings: dict = dict()`` default was a shared mutable
            default argument; a ``None`` sentinel is used instead.)
        :param meta_config_path: path of the Meta configuration file, if any.
        :param workers: number of parallel workers for batched SPARQL queries.
        """
        self.ts_url = ts_url
        # endswith() also handles an empty base_iri safely (base_iri[-1] would raise).
        self.base_iri = base_iri[:-1] if base_iri.endswith('/') else base_iri
        self.graph = TripleLite(
            reverse_index_predicates=frozenset(self._PO_S_INDEXED_PREDICATES)
        )
        self.meta_config_path = meta_config_path
        self.meta_settings = settings if settings is not None else {}
        self.virtuoso_full_text_search = self.meta_settings.get('virtuoso_full_text_search', False)
        self.workers = workers

    # Predicates for which the local graph keeps a reverse (object -> subject) index.
    _PO_S_INDEXED_PREDICATES = {_P_HAS_LITERAL_VALUE, _P_HAS_IDENTIFIER, _P_PART_OF}
87 def add_triple(self, s: str, p: str, o: str, o_datatype: str = '') -> None:
88 if o_datatype:
89 term = RDFTerm("literal", o, o_datatype)
90 elif o.startswith('http'):
91 term = RDFTerm("uri", o)
92 else:
93 term = RDFTerm("literal", o, _XSD_STRING)
94 self.graph.add((s, p, term))
    def __contains__(self, uri: str) -> bool:
        # True when the local graph holds at least one triple with `uri` as subject.
        return self.graph.has_subject(uri)
99 def _get_objects(self, subject: str, predicate: str) -> list[str]:
100 return [t.value for t in self.graph.objects(subject, predicate)]
102 def _get_all_po(self, subject: str) -> dict[str, list[str]]:
103 result: dict[str, list[str]] = {}
104 for p, o in self.graph.predicate_objects(subject):
105 result.setdefault(p, []).append(o.value)
106 return result
108 def _get_subjects(self, predicate: str, obj: str) -> set[str]:
109 if predicate == _P_HAS_LITERAL_VALUE:
110 term = RDFTerm("literal", obj, _XSD_STRING)
111 else:
112 term = RDFTerm("uri", obj)
113 return set(self.graph.subjects(predicate, term))
115 # _______________________________BR_________________________________ #
117 def _find_id_uri(self, schema: str, value: str) -> str | None:
118 schema_uri = _DATACITE + schema
119 for id_uri in self._get_subjects(_P_HAS_LITERAL_VALUE, value):
120 schemes = self._get_objects(id_uri, _P_USES_ID_SCHEME)
121 if schemes[0] == schema_uri:
122 return id_uri
123 return None
125 def _collect_entity_ids(self, entity_uri: str, exclude_id_uri: str | None = None) -> list[tuple[str, str]]:
126 result: list[tuple[str, str]] = []
127 for id_uri in self._get_objects(entity_uri, _P_HAS_IDENTIFIER):
128 if id_uri == exclude_id_uri:
129 continue
130 po = self._get_all_po(id_uri)
131 schemes = po.get(_P_USES_ID_SCHEME, [])
132 literals = po.get(_P_HAS_LITERAL_VALUE, [])
133 if not schemes or not literals:
134 raise ValueError(f"Identifier {id_uri} missing schema or literal value")
135 full_id = f'{schemes[0].replace(_DATACITE, "")}:{literals[0]}'
136 result.append((id_uri.replace(f'{self.base_iri}/', ''), full_id))
137 return result
139 def retrieve_br_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]:
140 id_uri = self._find_id_uri(schema, value)
141 if not id_uri:
142 return []
143 metaid_id_list = [(id_uri.replace(f'{self.base_iri}/', ''), f'{schema}:{value}')]
144 result_list = []
145 for entity_uri in self._get_subjects(_P_HAS_IDENTIFIER, id_uri):
146 title = ''
147 titles = self._get_objects(entity_uri, _P_TITLE)
148 if titles:
149 title = titles[0]
150 other_ids = self._collect_entity_ids(entity_uri, exclude_id_uri=id_uri)
151 result_list.append((entity_uri.replace(f'{self.base_iri}/', ''), title, metaid_id_list + other_ids))
152 return result_list
154 def retrieve_br_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]:
155 metaid_uri = f'{self.base_iri}/{metaid}'
156 po = self._get_all_po(metaid_uri)
157 if not po:
158 return "", [], False
159 title = ''
160 titles = po.get(_P_TITLE, [])
161 if titles:
162 title = titles[0]
163 identifiers = self._collect_entity_ids(metaid_uri)
164 return title, identifiers, True
166 # _______________________________ID_________________________________ #
168 def retrieve_metaid_from_id(self, schema: str, value: str) -> str | None:
169 id_uri = self._find_id_uri(schema, value)
170 if id_uri:
171 return id_uri.replace(f'{self.base_iri}/', '')
172 return None
    def retrieve_metaid_from_merged_entity(self, metaid_uri: str, prov_config: str) -> str | None:
        '''
        It looks for MetaId in the provenance. If the input entity was deleted due to a merge, this function returns the target entity. Otherwise, it returns None.

        :params metaid_uri: a MetaId URI
        :type metaid_uri: str
        :params prov_config: the path of the configuration file required by time-agnostic-library
        :type prov_config: str
        :returns str | None: -- It returns the MetaID associated with the target entity after a merge. If there was no merge, it returns None.
        '''
        metaval: str | None = None
        with open(prov_config, 'rb') as f:
            prov_config_dict = orjson.loads(f.read())
        agnostic_meta = AgnosticEntity(res=metaid_uri, config=prov_config_dict, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
        agnostic_meta_history = agnostic_meta.get_history(include_prov_metadata=True)
        # get_history returns (history data, provenance metadata), both keyed by entity URI.
        meta_history_data = agnostic_meta_history[0][metaid_uri]
        if meta_history_data:
            meta_history_metadata = agnostic_meta_history[1][metaid_uri]
            # Snapshots sorted newest-first; [1][0] picks the penultimate
            # snapshot's URI (the state just before the latest change).
            # NOTE(review): assumes at least two snapshots exist whenever the
            # history is non-empty — TODO confirm for entities with a single snapshot.
            penultimate_snapshot = sorted(
                meta_history_metadata.items(),
                key=lambda x: parser.parse(x[1]['generatedAtTime']).replace(tzinfo=None),
                reverse=True
            )[1][0]
            # A snapshot of a *different* entity derived from our penultimate
            # snapshot signals that this entity was merged into it.
            query_if_it_was_merged = f'''
                SELECT DISTINCT ?se
                WHERE {{
                    ?se a <{ProvEntity.iri_entity}>;
                        <{ProvEntity.iri_was_derived_from}> <{penultimate_snapshot}>.
                }}
            '''
            prov_endpoint = prov_config_dict['provenance']['triplestore_urls'][0]
            with SPARQLClient(prov_endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
                results = client.query(query_if_it_was_merged)['results']['bindings']
            merged_entities = [se for se in results if metaid_uri not in se['se']['value']]
            if merged_entities:
                merged_entity_uri = merged_entities[0]['se']['value']
                # Strip the '/prov/...' suffix to recover the entity URI, then
                # keep only its trailing MetaID segment.
                merged_entity_uri = merged_entity_uri.split('/prov/')[0]
                metaval = merged_entity_uri.split('/')[-1]
        return metaval
214 # _______________________________RA_________________________________ #
215 def retrieve_ra_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]:
216 metaid_uri = f'{self.base_iri}/{metaid}'
217 po = self._get_all_po(metaid_uri)
218 if not po:
219 return '', [], False
220 family_names = po.get(_P_FAMILY_NAME, [])
221 given_names = po.get(_P_GIVEN_NAME, [])
222 names = po.get(_P_NAME, [])
223 full_name = self._construct_full_name(
224 names[0] if names else '',
225 family_names[0] if family_names else '',
226 given_names[0] if given_names else '',
227 )
228 identifiers = self._collect_entity_ids(metaid_uri)
229 return full_name, identifiers, True
231 def retrieve_ra_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]:
232 id_uri = self._find_id_uri(schema, value)
233 if not id_uri:
234 return []
235 metaid_id_list: List[Tuple[str, str]] = [(id_uri.replace(f'{self.base_iri}/', ''), f'{schema}:{value}')]
236 result_list = []
237 for entity_uri in self._get_subjects(_P_HAS_IDENTIFIER, id_uri):
238 po = self._get_all_po(entity_uri)
239 names = po.get(_P_NAME, [])
240 family_names = po.get(_P_FAMILY_NAME, [])
241 given_names = po.get(_P_GIVEN_NAME, [])
242 full_name = self._construct_full_name(
243 names[0] if names else '',
244 family_names[0] if family_names else '',
245 given_names[0] if given_names else '',
246 )
247 other_ids = self._collect_entity_ids(entity_uri, exclude_id_uri=id_uri)
248 result_list.append((entity_uri.replace(f'{self.base_iri}/', ''), full_name, metaid_id_list + other_ids))
249 return result_list
251 def _construct_full_name(self, name: str, family_name: str, given_name: str) -> str:
252 if name and not family_name and not given_name:
253 return name
254 elif not name and family_name and not given_name:
255 return f'{family_name},'
256 elif not name and not family_name and given_name:
257 return f', {given_name}'
258 elif not name and family_name and given_name:
259 return f'{family_name}, {given_name}'
260 else:
261 return ''
263 def retrieve_ra_sequence_from_br_meta(self, metaid: str, col_name: str) -> List[Dict[str, tuple]]:
264 if col_name == 'author':
265 role_str = _R_AUTHOR
266 elif col_name == 'editor':
267 role_str = _R_EDITOR
268 else:
269 role_str = _R_PUBLISHER
271 metaid_uri = f'{self.base_iri}/{metaid}'
272 dict_ar: dict[str, dict[str, str]] = {}
274 for ar_uri in self._get_objects(metaid_uri, _P_IS_DOC_CONTEXT_FOR):
275 ar_po = self._get_all_po(ar_uri)
276 roles = ar_po.get(_P_WITH_ROLE, [])
277 if role_str in roles:
278 role_value = ar_uri.replace(f'{self.base_iri}/', '')
279 next_list = ar_po.get(_P_HAS_NEXT, [])
280 next_role = next_list[0].replace(f'{self.base_iri}/', '') if next_list else ''
281 held_by = ar_po.get(_P_IS_HELD_BY, [])
282 ra = held_by[0].replace(f'{self.base_iri}/', '') if held_by else None
283 if ra is not None:
284 dict_ar[role_value] = {'next': next_role, 'ra': ra}
286 all_roles = set(dict_ar.keys())
287 roles_with_next = set(details['next'] for details in dict_ar.values() if details['next'])
288 start_role_candidates = all_roles - roles_with_next
290 MAX_ITERATIONS = 10000
292 if len(all_roles) == 0:
293 return []
295 if len(start_role_candidates) == 0:
296 sorted_ars = sorted(all_roles, key=lambda ar: get_resource_number(f'{self.base_iri}/{ar}'))
297 start_role_candidates = {sorted_ars[0]}
299 if len(start_role_candidates) != 1:
300 chains = []
301 for start_candidate in start_role_candidates:
302 current_role = start_candidate
303 chain: list[dict[str, tuple]] = []
304 visited_roles: set[str] = set()
305 iteration_count = 0
306 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS:
307 visited_roles.add(current_role)
308 if current_role in dict_ar:
309 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2]
310 ra_tuple = ra_info + (dict_ar[current_role]['ra'],)
311 chain.append({current_role: ra_tuple})
312 current_role = dict_ar[current_role]['next']
313 else:
314 break
315 iteration_count += 1
316 if iteration_count == MAX_ITERATIONS:
317 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}")
318 return []
319 chains.append(chain)
320 chains.sort(key=lambda chain: (-len(chain), get_resource_number(f'{self.base_iri}/{list(chain[0].keys())[0]}')))
321 try:
322 ordered_ar_list = chains[0]
323 except Exception as e:
324 print(f"\nWarning: Error processing BR: {metaid} for column: {col_name}")
325 print(f"dict_ar: {dict_ar}")
326 print(f"All roles: {all_roles}")
327 print(f"Start role candidates: {start_role_candidates}")
328 print(f"Roles with next: {roles_with_next}")
329 print(f"Error: {str(e)}")
330 return []
331 else:
332 start_role = start_role_candidates.pop()
333 ordered_ar_list: list[dict[str, tuple]] = []
334 current_role = start_role
335 visited_roles: set[str] = set()
336 iteration_count = 0
337 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS:
338 visited_roles.add(current_role)
339 if current_role in dict_ar:
340 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2]
341 ra_tuple = ra_info + (dict_ar[current_role]['ra'],)
342 ordered_ar_list.append({current_role: ra_tuple})
343 current_role = dict_ar[current_role]['next']
344 else:
345 break
346 iteration_count += 1
347 if iteration_count == MAX_ITERATIONS:
348 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}")
349 return []
351 return ordered_ar_list
353 def retrieve_re_from_br_meta(self, metaid: str) -> Tuple[str, str] | None:
354 metaid_uri = f'{self.base_iri}/{metaid}'
355 re_uris = self._get_objects(metaid_uri, _P_EMBODIMENT)
356 if not re_uris:
357 return None
358 re_full_uri = re_uris[0]
359 re_metaid = re_full_uri.replace(f'{self.base_iri}/', '')
360 re_po = self._get_all_po(re_full_uri)
361 starting_pages = re_po.get(_P_STARTING_PAGE, [])
362 ending_pages = re_po.get(_P_ENDING_PAGE, [])
363 starting_page = starting_pages[0] if starting_pages else None
364 ending_page = ending_pages[0] if ending_pages else None
365 pages = ''
366 if starting_page and ending_page:
367 pages = f'{starting_page}-{ending_page}'
368 elif starting_page:
369 pages = f'{starting_page}-{starting_page}'
370 elif ending_page:
371 pages = f'{ending_page}-{ending_page}'
372 return re_metaid, pages
    def retrieve_br_info_from_meta(self, metaid: str) -> dict:
        """Collect publication info (date, type, pages, volume, issue, venue)
        for a BR by walking up its part-of hierarchy in the local graph."""
        # Types that may denote a venue (journal, book, proceedings, ...).
        venue_type_strs = {
            GraphEntity.iri_archival_document,
            GraphEntity.iri_journal,
            GraphEntity.iri_book,
            GraphEntity.iri_book_series,
            GraphEntity.iri_series,
            GraphEntity.iri_academic_proceedings,
            GraphEntity.iri_proceedings_series,
            GraphEntity.iri_reference_book,
            _T_EXPRESSION,
        }

        def extract_identifiers(entity_uri: str) -> list[str]:
            # The entity's own OMID first, then every external identifier.
            identifiers = [f"omid:{entity_uri.replace(f'{self.base_iri}/', '')}"]
            for id_uri in self._get_objects(entity_uri, _P_HAS_IDENTIFIER):
                id_po = self._get_all_po(id_uri)
                schemes = id_po.get(_P_USES_ID_SCHEME, [])
                literals = id_po.get(_P_HAS_LITERAL_VALUE, [])
                if schemes and literals:
                    scheme = schemes[0].replace(_DATACITE, '')
                    identifiers.append(f"{scheme}:{literals[0]}")
            return identifiers

        def check_venue(entity_uri: str) -> str | None:
            # Return "<title> [id1 id2 ...]" when the entity is a titled venue.
            entity_types = self._get_objects(entity_uri, _P_TYPE)
            if any(t in venue_type_strs for t in entity_types):
                titles = self._get_objects(entity_uri, _P_TITLE)
                if titles:
                    venue_ids = extract_identifiers(entity_uri)
                    return f"{titles[0]} [{' '.join(venue_ids)}]"
            return None

        # Accept either a bare MetaID or an already-prefixed URI.
        metaid_uri = f'{self.base_iri}/{metaid}' if self.base_iri not in metaid else metaid
        po = self._get_all_po(metaid_uri)
        # NOTE(review): `metaid` (not metaid_uri) is passed on — assumes callers
        # pass a bare MetaID when pages are needed; confirm for URI callers.
        res_dict: dict = {
            'pub_date': '',
            'type': '',
            'page': self.retrieve_re_from_br_meta(metaid),
            'issue': '',
            'volume': '',
            'venue': ''
        }

        pub_dates = po.get(_P_PUB_DATE, [])
        if pub_dates:
            res_dict['pub_date'] = pub_dates[0]

        # First non-Expression http type wins (fabio:Expression is generic).
        types = po.get(_P_TYPE, [])
        for t in types:
            if t != _T_EXPRESSION and t.startswith('http'):
                res_dict['type'] = self._type_it(t)
                break

        # If the BR itself is a volume/issue, its sequence id is the number.
        seq_ids = po.get(_P_SEQ_ID, [])
        if seq_ids:
            entity_types = types
            if _T_JOURNAL_ISSUE in entity_types:
                res_dict['issue'] = seq_ids[0]
            elif _T_JOURNAL_VOLUME in entity_types:
                res_dict['volume'] = seq_ids[0]

        # Walk up to three part-of levels: container, inner container, venue.
        for container_uri in po.get(_P_PART_OF, []):
            container_po = self._get_all_po(container_uri)
            container_types = container_po.get(_P_TYPE, [])

            if _T_JOURNAL_ISSUE in container_types:
                container_seqs = container_po.get(_P_SEQ_ID, [])
                if container_seqs:
                    res_dict['issue'] = container_seqs[0]
            elif _T_JOURNAL_VOLUME in container_types:
                container_seqs = container_po.get(_P_SEQ_ID, [])
                if container_seqs:
                    res_dict['volume'] = container_seqs[0]
            else:
                venue_str = check_venue(container_uri)
                if venue_str:
                    res_dict['venue'] = venue_str

            for inner_uri in container_po.get(_P_PART_OF, []):
                inner_po = self._get_all_po(inner_uri)
                inner_types = inner_po.get(_P_TYPE, [])

                if _T_JOURNAL_VOLUME in inner_types:
                    inner_seqs = inner_po.get(_P_SEQ_ID, [])
                    if inner_seqs:
                        res_dict['volume'] = inner_seqs[0]
                else:
                    venue_str = check_venue(inner_uri)
                    if venue_str:
                        res_dict['venue'] = venue_str

                # Third level: anything titled above a volume is taken as venue.
                for venue_uri in inner_po.get(_P_PART_OF, []):
                    titles = self._get_objects(venue_uri, _P_TITLE)
                    if titles:
                        venue_ids = extract_identifiers(venue_uri)
                        res_dict['venue'] = f"{titles[0]} [{' '.join(venue_ids)}]"

        return res_dict
    # Maps BR type IRIs to the human-readable type labels used in Meta output.
    _IRI_TO_TYPE = {
        GraphEntity.iri_archival_document: 'archival document',
        GraphEntity.iri_book: 'book',
        GraphEntity.iri_book_chapter: 'book chapter',
        GraphEntity.iri_part: 'book part',
        GraphEntity.iri_expression_collection: 'book section',
        GraphEntity.iri_book_series: 'book series',
        GraphEntity.iri_book_set: 'book set',
        GraphEntity.iri_data_file: 'data file',
        GraphEntity.iri_thesis: 'dissertation',
        GraphEntity.iri_journal: 'journal',
        GraphEntity.iri_journal_article: 'journal article',
        GraphEntity.iri_journal_issue: 'journal issue',
        GraphEntity.iri_journal_volume: 'journal volume',
        GraphEntity.iri_proceedings_paper: 'proceedings article',
        GraphEntity.iri_academic_proceedings: 'proceedings',
        GraphEntity.iri_reference_book: 'reference book',
        GraphEntity.iri_reference_entry: 'reference entry',
        GraphEntity.iri_series: 'series',
        GraphEntity.iri_report_document: 'report',
        GraphEntity.iri_specification_document: 'standard',
    }

    @staticmethod
    def _type_it(br_type: str) -> str:
        # Human-readable label for a BR type IRI; '' for unmapped types.
        return ResourceFinder._IRI_TO_TYPE.get(br_type, '')
501 def retrieve_publisher_from_br_metaid(self, metaid: str):
502 metaid_uri = f'{self.base_iri}/{metaid}'
503 publisher_ar_uris: set[str] = set()
505 def find_publisher_ars(entity_uri: str) -> None:
506 for ar_uri in self._get_objects(entity_uri, _P_IS_DOC_CONTEXT_FOR):
507 roles = self._get_objects(ar_uri, _P_WITH_ROLE)
508 if _R_PUBLISHER in roles:
509 publisher_ar_uris.add(ar_uri)
511 find_publisher_ars(metaid_uri)
512 for parent_uri in self._get_objects(metaid_uri, _P_PART_OF):
513 find_publisher_ars(parent_uri)
514 for grandparent_uri in self._get_objects(parent_uri, _P_PART_OF):
515 find_publisher_ars(grandparent_uri)
517 publishers_output = []
518 for ar_uri in publisher_ar_uris:
519 pub_identifiers: List[str] = []
520 pub_name: str | None = None
521 for ra_uri in self._get_objects(ar_uri, _P_IS_HELD_BY):
522 pub_identifiers.append(ra_uri.replace(f'{self.base_iri}/', 'omid:'))
523 ra_po = self._get_all_po(ra_uri)
524 names = ra_po.get(_P_NAME, [])
525 if names:
526 pub_name = names[0]
527 for id_uri in ra_po.get(_P_HAS_IDENTIFIER, []):
528 id_po = self._get_all_po(id_uri)
529 schemes = id_po.get(_P_USES_ID_SCHEME, [])
530 literals = id_po.get(_P_HAS_LITERAL_VALUE, [])
531 if schemes and literals:
532 pub_identifiers.append(f'{schemes[0].replace(_DATACITE, "")}:{literals[0]}')
533 if pub_name is not None:
534 pub_full = f'{pub_name} [{" ".join(pub_identifiers)}]'
535 else:
536 pub_full = f'[{" ".join(pub_identifiers)}]'
537 publishers_output.append(pub_full)
538 return '; '.join(publishers_output)
    def get_everything_about_res(self, metavals: set, identifiers: set, vvis: set, max_depth: int = 10, progress: Progress | None = None) -> None:
        """Populate the local graph with everything reachable from the inputs.

        Starting subjects are resolved from OMIDs (``metavals``), external
        identifiers (``identifiers``) and venue/volume/issue tuples (``vvis``);
        the endpoint is then crawled breadth-first up to ``max_depth`` hops via
        batched SPARQL queries, optionally parallelised across processes.

        :param metavals: 'omid:...'-prefixed MetaIDs.
        :param identifiers: 'scheme:value' identifier strings.
        :param vvis: (volume, issue, venue_metaid, venue_ids_tuple) tuples.
        :param max_depth: maximum traversal depth from the initial subjects.
        :param progress: optional rich Progress for per-phase task reporting.
        """
        BATCH_SIZE = QLEVER_BATCH_SIZE
        MAX_WORKERS = min(self.workers, QLEVER_MAX_WORKERS)

        def batch_process(input_set, batch_size):
            """Generator to split input data into smaller batches if batch_size is not None."""
            if batch_size is None:
                yield input_set
            else:
                for i in range(0, len(input_set), batch_size):
                    yield input_set[i:i + batch_size]

        # Optional progress tasks, one per resolution phase.
        task_metavals = None
        task_identifiers = None
        task_vvis = None
        if progress:
            if metavals:
                task_metavals = progress.add_task(
                    " [dim]Resolving OMIDs[/dim]", total=len(metavals)
                )
            if identifiers:
                task_identifiers = progress.add_task(
                    " [dim]Resolving identifiers[/dim]", total=len(identifiers)
                )
            if vvis:
                task_vvis = progress.add_task(
                    " [dim]Resolving VVI[/dim]", total=len(vvis)
                )

        max_depth_reached = 0

        def process_batch_parallel(subjects, cur_depth, visited_subjects):
            # Recursive BFS level: fetch all (p, o) for the new subjects, store
            # them locally, and recurse on newly discovered URI objects.
            nonlocal max_depth_reached
            if not subjects or (max_depth and cur_depth > max_depth):
                return

            new_subjects = subjects - visited_subjects
            if not new_subjects:
                return

            if cur_depth > max_depth_reached:
                max_depth_reached = cur_depth

            visited_subjects.update(new_subjects)

            subject_list = list(new_subjects)
            batches = list(batch_process(subject_list, BATCH_SIZE))
            batch_queries = []
            ts_url = self.ts_url

            for batch in batches:
                query = f'''
                SELECT ?s ?p ?o
                WHERE {{
                    VALUES ?s {{ {' '.join([f"<{s}>" for s in batch])} }}
                    ?s ?p ?o.
                }}'''
                batch_queries.append(query)

            next_subjects = set()
            if len(batch_queries) > 1 and MAX_WORKERS > 1:
                # Fan the batch queries out over worker processes.
                queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS)
                query_groups = [
                    batch_queries[i:i + queries_per_worker]
                    for i in range(0, len(batch_queries), queries_per_worker)
                ]
                worker = partial(execute_sparql_queries, ts_url)
                with ProcessPoolExecutor(
                    max_workers=min(len(query_groups), MAX_WORKERS),
                    mp_context=multiprocessing.get_context('forkserver')
                ) as executor:
                    grouped_results = list(executor.map(worker, query_groups))
                results = [item for sublist in grouped_results for item in sublist]
            else:
                results = execute_sparql_queries(endpoint_url=ts_url, queries=batch_queries) if batch_queries else []

            # Objects of these predicates are terminal (types, roles, schemes)
            # and are not crawled further.
            _skip_preds = {_P_TYPE, _P_WITH_ROLE, _P_USES_ID_SCHEME}
            for result in results:
                for row in result:
                    s_str = row['s']['value']
                    p_str = row['p']['value']
                    o_binding = row['o']
                    o_str = o_binding['value']
                    o_datatype = o_binding.get('datatype', '') if o_binding['type'] in ('literal', 'typed-literal') else ''
                    self.add_triple(s_str, p_str, o_str, o_datatype=o_datatype)
                    if o_binding['type'] == 'uri' and p_str not in _skip_preds:
                        next_subjects.add(o_str)

            process_batch_parallel(next_subjects, cur_depth + 1, visited_subjects)

        def get_initial_subjects_from_metavals(metavals):
            """Convert metavals to a set of subjects."""
            return {f"{self.base_iri}/{mid.replace('omid:', '')}" for mid in metavals}

        def get_initial_subjects_from_identifiers(identifiers, progress_task=None):
            """Convert identifiers to a set of subjects based on batch queries executed in parallel.

            Returns:
                tuple: (subjects set, id_to_subjects mapping)
                - subjects: set of subject URIs found
                - id_to_subjects: dict mapping identifier string to set of subject URIs
            """
            subjects = set()
            id_to_subjects = {}
            ts_url = self.ts_url
            batches = list(batch_process(list(identifiers), BATCH_SIZE))

            if not batches:
                return subjects, id_to_subjects

            batch_queries = []
            batch_sizes = []
            for batch in batches:
                if not batch:
                    continue

                batch_sizes.append(len(batch))
                if self.virtuoso_full_text_search:
                    # Virtuoso path: one UNION branch per identifier, with BINDs
                    # echoing scheme/literal so results can be mapped back.
                    union_blocks = []
                    for identifier in batch:
                        scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1]
                        escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"')
                        union_blocks.append(f"""
                        {{
                            ?id <{_P_HAS_LITERAL_VALUE}> "{escaped_literal}"^^<{_XSD_STRING}> .
                            ?id <{_P_USES_ID_SCHEME}> <{_DATACITE}{scheme}> .
                            ?s <{_P_HAS_IDENTIFIER}> ?id .
                            BIND("{scheme}" AS ?schemeLabel)
                            BIND("{escaped_literal}" AS ?literalLabel)
                        }}
                        """)
                    union_query = " UNION ".join(union_blocks)
                    query = f'''
                    SELECT ?s ?schemeLabel ?literalLabel WHERE {{
                        {union_query}
                    }}
                    '''
                    batch_queries.append(query)
                else:
                    # Generic path: a single VALUES-driven lookup per batch.
                    identifiers_values = []
                    for identifier in batch:
                        scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1]
                        escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"')
                        identifiers_values.append(f'(<{_DATACITE}{scheme}> "{escaped_literal}"^^<{_XSD_STRING}>)')
                    identifiers_values_str = " ".join(identifiers_values)
                    query = f'''
                    SELECT DISTINCT ?s ?scheme ?literal WHERE {{
                        VALUES (?scheme ?literal) {{ {identifiers_values_str} }}
                        ?id <{_P_USES_ID_SCHEME}> ?scheme .
                        ?id <{_P_HAS_LITERAL_VALUE}> ?literal .
                        ?s <{_P_HAS_IDENTIFIER}> ?id .
                    }}
                    '''
                    batch_queries.append(query)

            if len(batch_queries) > 1 and MAX_WORKERS > 1:
                query_groups = []
                grouped_batch_sizes = []
                for i in range(0, len(batch_queries), QLEVER_QUERIES_PER_GROUP):
                    query_groups.append(batch_queries[i:i + QLEVER_QUERIES_PER_GROUP])
                    grouped_batch_sizes.append(sum(batch_sizes[i:i + QLEVER_QUERIES_PER_GROUP]))
                worker = partial(execute_sparql_queries, ts_url)
                with ProcessPoolExecutor(
                    max_workers=MAX_WORKERS,
                    mp_context=multiprocessing.get_context('forkserver')
                ) as executor:
                    results = []
                    for idx, grouped_result in enumerate(executor.map(worker, query_groups)):
                        results.extend(grouped_result)
                        if progress and progress_task is not None:
                            progress.advance(progress_task, grouped_batch_sizes[idx])
            elif batch_queries:
                results = execute_sparql_queries(endpoint_url=ts_url, queries=batch_queries)
                if progress and progress_task is not None:
                    progress.advance(progress_task, sum(batch_sizes))
            else:
                results = []

            for result in results:
                for row in result:
                    subject = str(row['s']['value'])
                    subjects.add(subject)
                    # Virtuoso rows carry schemeLabel/literalLabel BINDs;
                    # generic rows carry the raw ?scheme / ?literal bindings.
                    if 'schemeLabel' in row:
                        scheme = str(row['schemeLabel']['value'])
                        literal = str(row['literalLabel']['value'])
                    else:
                        scheme = str(row['scheme']['value']).replace(_DATACITE, '')
                        literal = str(row['literal']['value'])
                    identifier = f"{scheme}:{literal}"
                    if identifier not in id_to_subjects:
                        id_to_subjects[identifier] = set()
                    id_to_subjects[identifier].add(subject)

            return subjects, id_to_subjects

        def _build_values_queries(issue_vol_tuples, issue_no_vol_tuples, vol_only_tuples):
            # Build batched VALUES queries for the three venue/volume/issue
            # lookup patterns: issue-in-volume, issue-in-venue, volume-in-venue.
            queries = []

            for i in range(0, len(issue_vol_tuples), BATCH_SIZE):
                chunk = issue_vol_tuples[i:i + BATCH_SIZE]
                values_block = ' '.join(
                    f'(<{venue}> "{vol_seq}"^^<{_XSD_STRING}> "{issue_seq}"^^<{_XSD_STRING}>)'
                    for venue, vol_seq, issue_seq in chunk
                )
                queries.append(f'''
                SELECT ?s WHERE {{
                    VALUES (?venueUri ?volSeq ?issSeq) {{ {values_block} }}
                    ?volume a <{_T_JOURNAL_VOLUME}> ;
                        <{_P_PART_OF}> ?venueUri ;
                        <{_P_SEQ_ID}> ?volSeq .
                    ?s a <{_T_JOURNAL_ISSUE}> ;
                        <{_P_PART_OF}> ?volume ;
                        <{_P_SEQ_ID}> ?issSeq .
                }}
                ''')

            for i in range(0, len(issue_no_vol_tuples), BATCH_SIZE):
                chunk = issue_no_vol_tuples[i:i + BATCH_SIZE]
                values_block = ' '.join(
                    f'(<{venue}> "{issue_seq}"^^<{_XSD_STRING}>)'
                    for venue, issue_seq in chunk
                )
                queries.append(f'''
                SELECT ?s WHERE {{
                    VALUES (?venueUri ?issSeq) {{ {values_block} }}
                    ?s a <{_T_JOURNAL_ISSUE}> ;
                        <{_P_PART_OF}> ?venueUri ;
                        <{_P_SEQ_ID}> ?issSeq .
                }}
                ''')

            for i in range(0, len(vol_only_tuples), BATCH_SIZE):
                chunk = vol_only_tuples[i:i + BATCH_SIZE]
                values_block = ' '.join(
                    f'(<{venue}> "{vol_seq}"^^<{_XSD_STRING}>)'
                    for venue, vol_seq in chunk
                )
                queries.append(f'''
                SELECT ?s WHERE {{
                    VALUES (?venueUri ?volSeq) {{ {values_block} }}
                    ?s a <{_T_JOURNAL_VOLUME}> ;
                        <{_P_PART_OF}> ?venueUri ;
                        <{_P_SEQ_ID}> ?volSeq .
                }}
                ''')

            return queries

        def get_initial_subjects_from_vvis(vvis, progress_task=None):
            """Convert vvis to a set of subjects based on batched VALUES queries."""
            subjects = set()
            ts_url = self.ts_url
            venue_uris_to_add = set()
            vvis_list = list(vvis)
            total_vvis = len(vvis_list)

            # First pass: collect all venue IDs and prepare queries
            all_venue_ids = set()
            for volume, issue, venue_metaid, venue_ids_tuple in vvis_list:
                if venue_ids_tuple:
                    all_venue_ids.update(venue_ids_tuple)

            # Get venue subjects from identifiers with mapping
            venue_id_to_uris = {}
            if all_venue_ids:
                venue_id_subjects, venue_id_to_uris = get_initial_subjects_from_identifiers(all_venue_ids)
                subjects.update(venue_id_subjects)

            # Second pass: collect tuples grouped by query pattern
            issue_vol_tuples = []
            issue_no_vol_tuples = []
            vol_only_tuples = []

            for volume, issue, venue_metaid, venue_ids_tuple in vvis_list:
                venues_to_search = set()

                if venue_metaid:
                    venues_to_search.add(venue_metaid)

                if venue_ids_tuple:
                    for venue_id in venue_ids_tuple:
                        if venue_id in venue_id_to_uris:
                            for venue_uri in venue_id_to_uris[venue_id]:
                                # Only bibliographic-resource URIs can be venues.
                                if '/br/' in venue_uri:
                                    venues_to_search.add(venue_uri.replace(f'{self.base_iri}/', 'omid:'))

                for venue_metaid_to_search in venues_to_search:
                    venue_uri = f"{self.base_iri}/{venue_metaid_to_search.replace('omid:', '')}"
                    if not (issue or volume):
                        continue
                    escaped_issue = issue.replace('\\', '\\\\').replace('"', '\\"') if issue else None
                    escaped_volume = volume.replace('\\', '\\\\').replace('"', '\\"') if volume else None

                    if issue:
                        if volume:
                            issue_vol_tuples.append((venue_uri, escaped_volume, escaped_issue))
                        else:
                            issue_no_vol_tuples.append((venue_uri, escaped_issue))
                    else:
                        vol_only_tuples.append((venue_uri, escaped_volume))

                    venue_uris_to_add.add(venue_uri)

            vvi_queries = _build_values_queries(issue_vol_tuples, issue_no_vol_tuples, vol_only_tuples)

            # Execute batched VVI queries in parallel
            if len(vvi_queries) > 1 and MAX_WORKERS > 1:
                query_groups = []
                grouped_vvi_counts = []
                queries_per_group = max(1, len(vvi_queries) // MAX_WORKERS)
                for i in range(0, len(vvi_queries), queries_per_group):
                    group = vvi_queries[i:i + queries_per_group]
                    query_groups.append(group)
                    # Progress is advanced proportionally to the group's share.
                    vvi_count = int(total_vvis * len(group) / len(vvi_queries))
                    grouped_vvi_counts.append(max(1, vvi_count))
                worker = partial(execute_sparql_queries, ts_url)
                with ProcessPoolExecutor(
                    max_workers=MAX_WORKERS,
                    mp_context=multiprocessing.get_context('forkserver')
                ) as executor:
                    results = []
                    for idx, grouped_result in enumerate(executor.map(worker, query_groups)):
                        results.extend(grouped_result)
                        if progress and progress_task is not None:
                            progress.advance(progress_task, grouped_vvi_counts[idx])
            elif vvi_queries:
                results = execute_sparql_queries(endpoint_url=ts_url, queries=vvi_queries)
                if progress and progress_task is not None:
                    progress.advance(progress_task, total_vvis)
            else:
                results = []
                if progress and progress_task is not None:
                    progress.advance(progress_task, total_vvis)

            for result in results:
                for row in result:
                    subjects.add(str(row['s']['value']))

            subjects.update(venue_uris_to_add)

            return subjects

        # Resolve all three input kinds into the initial BFS frontier.
        initial_subjects = set()

        if metavals:
            initial_subjects.update(get_initial_subjects_from_metavals(metavals))
            if progress and task_metavals is not None:
                progress.advance(task_metavals, len(metavals))
                progress.remove_task(task_metavals)

        if identifiers:
            id_subjects, _ = get_initial_subjects_from_identifiers(identifiers, progress_task=task_identifiers)
            initial_subjects.update(id_subjects)
            if progress and task_identifiers is not None:
                progress.remove_task(task_identifiers)

        if vvis:
            initial_subjects.update(get_initial_subjects_from_vvis(vvis, progress_task=task_vvis))
            if progress and task_vvis is not None:
                progress.remove_task(task_vvis)

        visited_subjects = set()
        process_batch_parallel(initial_subjects, 0, visited_subjects)

        # Red when the depth cap was hit (traversal may be incomplete).
        console = Console()
        style = "bold red" if max_depth_reached >= max_depth else "bold green"
        console.print(f" Max traversal depth reached: {max_depth_reached}/{max_depth}", style=style)
908 def retrieve_venue_from_local_graph(self, meta_id: str) -> VenueStructure:
909 content: VenueStructure = {
910 'issue': {},
911 'volume': {}
912 }
913 venue_uri = f'{self.base_iri}/{meta_id}'
915 venue_children = self._get_subjects(_P_PART_OF, venue_uri)
916 volumes: dict[str, str] = {}
918 for child_uri in venue_children:
919 types = self._get_objects(child_uri, _P_TYPE)
920 child_id = child_uri.replace(f'{self.base_iri}/', '')
921 if _T_JOURNAL_VOLUME in types:
922 seqs = self._get_objects(child_uri, _P_SEQ_ID)
923 for seq in seqs:
924 volumes[child_uri] = seq
925 content['volume'][seq] = {'id': child_id, 'issue': {}}
926 elif _T_JOURNAL_ISSUE in types:
927 seqs = self._get_objects(child_uri, _P_SEQ_ID)
928 seq = seqs[0] if seqs else None
929 if seq:
930 content['issue'][seq] = {'id': child_id}
932 for volume_uri, volume_seq in volumes.items():
933 volume_children = self._get_subjects(_P_PART_OF, volume_uri)
934 for child_uri in volume_children:
935 types = self._get_objects(child_uri, _P_TYPE)
936 if _T_JOURNAL_ISSUE in types:
937 child_id = child_uri.replace(f'{self.base_iri}/', '')
938 seqs = self._get_objects(child_uri, _P_SEQ_ID)
939 seq = seqs[0] if seqs else None
940 if seq:
941 content['volume'][volume_seq]['issue'][seq] = {'id': child_id}
943 return content