Coverage for oc_meta / lib / finder.py: 89%

643 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import multiprocessing 

8from concurrent.futures import ProcessPoolExecutor 

9from functools import partial 

10from typing import TYPE_CHECKING, Dict, List, Tuple, TypedDict 

11 

12import orjson 

13 

14if TYPE_CHECKING: 

15 from rich.progress import Progress 

16from dateutil import parser 

17from oc_ocdm.graph.graph_entity import GraphEntity 

18from oc_ocdm.prov.prov_entity import ProvEntity 

19from oc_ocdm.support import get_resource_number 

20from triplelite import RDFTerm, TripleLite 

21from sparqlite import SPARQLClient 

22from time_agnostic_library.agnostic_entity import AgnosticEntity 

23from rich.console import Console 

24 

25from oc_meta.constants import QLEVER_BATCH_SIZE, QLEVER_MAX_WORKERS, QLEVER_QUERIES_PER_GROUP 

26from oc_meta.lib.sparql import execute_sparql_queries 

27 

# Frequently used namespace/term IRIs.
_XSD_STRING = "http://www.w3.org/2001/XMLSchema#string"
_RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
_DATACITE = "http://purl.org/spar/datacite/"

# Predicate IRIs, aliased at module level to avoid repeated attribute lookups
# on GraphEntity in the hot lookup paths below.
_P_HAS_LITERAL_VALUE = GraphEntity.iri_has_literal_value
_P_USES_ID_SCHEME = GraphEntity.iri_uses_identifier_scheme
_P_HAS_IDENTIFIER = GraphEntity.iri_has_identifier
_P_TITLE = GraphEntity.iri_title
_P_NAME = GraphEntity.iri_name
_P_FAMILY_NAME = GraphEntity.iri_family_name
_P_GIVEN_NAME = GraphEntity.iri_given_name
_P_IS_DOC_CONTEXT_FOR = GraphEntity.iri_is_document_context_for
_P_HAS_NEXT = GraphEntity.iri_has_next
_P_IS_HELD_BY = GraphEntity.iri_is_held_by
_P_WITH_ROLE = GraphEntity.iri_with_role
_P_EMBODIMENT = GraphEntity.iri_embodiment
_P_STARTING_PAGE = GraphEntity.iri_starting_page
_P_ENDING_PAGE = GraphEntity.iri_ending_page
_P_PUB_DATE = GraphEntity.iri_has_publication_date
_P_SEQ_ID = GraphEntity.iri_has_sequence_identifier
_P_PART_OF = GraphEntity.iri_part_of
_P_TYPE = _RDF_TYPE
# Class (rdf:type) IRIs.
_T_JOURNAL_VOLUME = GraphEntity.iri_journal_volume
_T_JOURNAL_ISSUE = GraphEntity.iri_journal_issue
_T_EXPRESSION = GraphEntity.iri_expression
# Agent-role IRIs.
_R_AUTHOR = GraphEntity.iri_author
_R_EDITOR = GraphEntity.iri_editor
_R_PUBLISHER = GraphEntity.iri_publisher

56 

57 

class IssueEntry(TypedDict):
    """Typed mapping describing a journal issue within a venue structure."""

    id: str  # OMID of the issue — presumably relative to the base IRI; TODO confirm against callers

60 

61 

class VolumeEntry(TypedDict):
    """Typed mapping describing a journal volume and the issues it contains."""

    id: str  # OMID of the volume — presumably relative to the base IRI; TODO confirm against callers
    issue: Dict[str, IssueEntry]  # issues keyed by their sequence identifier — TODO confirm key semantics

65 

66 

class VenueStructure(TypedDict):
    """Typed mapping describing a venue: direct issues plus volumes with their issues."""

    issue: Dict[str, IssueEntry]  # issues attached directly to the venue (no volume)
    volume: Dict[str, VolumeEntry]  # volumes keyed by their sequence identifier — TODO confirm key semantics

70 

71 

72class ResourceFinder: 

73 

74 def __init__(self, ts_url: str, base_iri: str, settings: dict = dict(), meta_config_path: str | None = None, workers: int = 1): 

75 self.ts_url = ts_url 

76 self.base_iri = base_iri[:-1] if base_iri[-1] == '/' else base_iri 

77 self.graph = TripleLite( 

78 reverse_index_predicates=frozenset(self._PO_S_INDEXED_PREDICATES) 

79 ) 

80 self.meta_config_path = meta_config_path 

81 self.meta_settings = settings 

82 self.virtuoso_full_text_search = settings['virtuoso_full_text_search'] if settings and 'virtuoso_full_text_search' in settings else False 

83 self.workers = workers 

84 

    # Predicates for which TripleLite keeps a (predicate, object) -> subject
    # reverse index; these are exactly the predicates _get_subjects queries.
    _PO_S_INDEXED_PREDICATES = {_P_HAS_LITERAL_VALUE, _P_HAS_IDENTIFIER, _P_PART_OF}

86 

87 def add_triple(self, s: str, p: str, o: str, o_datatype: str = '') -> None: 

88 if o_datatype: 

89 term = RDFTerm("literal", o, o_datatype) 

90 elif o.startswith('http'): 

91 term = RDFTerm("uri", o) 

92 else: 

93 term = RDFTerm("literal", o, _XSD_STRING) 

94 self.graph.add((s, p, term)) 

95 

    def __contains__(self, uri: str) -> bool:
        # Membership test: True when *uri* occurs as a subject in the cache graph.
        return self.graph.has_subject(uri)

98 

99 def _get_objects(self, subject: str, predicate: str) -> list[str]: 

100 return [t.value for t in self.graph.objects(subject, predicate)] 

101 

102 def _get_all_po(self, subject: str) -> dict[str, list[str]]: 

103 result: dict[str, list[str]] = {} 

104 for p, o in self.graph.predicate_objects(subject): 

105 result.setdefault(p, []).append(o.value) 

106 return result 

107 

108 def _get_subjects(self, predicate: str, obj: str) -> set[str]: 

109 if predicate == _P_HAS_LITERAL_VALUE: 

110 term = RDFTerm("literal", obj, _XSD_STRING) 

111 else: 

112 term = RDFTerm("uri", obj) 

113 return set(self.graph.subjects(predicate, term)) 

114 

115 # _______________________________BR_________________________________ # 

116 

117 def _find_id_uri(self, schema: str, value: str) -> str | None: 

118 schema_uri = _DATACITE + schema 

119 for id_uri in self._get_subjects(_P_HAS_LITERAL_VALUE, value): 

120 schemes = self._get_objects(id_uri, _P_USES_ID_SCHEME) 

121 if schemes[0] == schema_uri: 

122 return id_uri 

123 return None 

124 

125 def _collect_entity_ids(self, entity_uri: str, exclude_id_uri: str | None = None) -> list[tuple[str, str]]: 

126 result: list[tuple[str, str]] = [] 

127 for id_uri in self._get_objects(entity_uri, _P_HAS_IDENTIFIER): 

128 if id_uri == exclude_id_uri: 

129 continue 

130 po = self._get_all_po(id_uri) 

131 schemes = po.get(_P_USES_ID_SCHEME, []) 

132 literals = po.get(_P_HAS_LITERAL_VALUE, []) 

133 if not schemes or not literals: 

134 raise ValueError(f"Identifier {id_uri} missing schema or literal value") 

135 full_id = f'{schemes[0].replace(_DATACITE, "")}:{literals[0]}' 

136 result.append((id_uri.replace(f'{self.base_iri}/', ''), full_id)) 

137 return result 

138 

139 def retrieve_br_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]: 

140 id_uri = self._find_id_uri(schema, value) 

141 if not id_uri: 

142 return [] 

143 metaid_id_list = [(id_uri.replace(f'{self.base_iri}/', ''), f'{schema}:{value}')] 

144 result_list = [] 

145 for entity_uri in self._get_subjects(_P_HAS_IDENTIFIER, id_uri): 

146 title = '' 

147 titles = self._get_objects(entity_uri, _P_TITLE) 

148 if titles: 

149 title = titles[0] 

150 other_ids = self._collect_entity_ids(entity_uri, exclude_id_uri=id_uri) 

151 result_list.append((entity_uri.replace(f'{self.base_iri}/', ''), title, metaid_id_list + other_ids)) 

152 return result_list 

153 

154 def retrieve_br_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]: 

155 metaid_uri = f'{self.base_iri}/{metaid}' 

156 po = self._get_all_po(metaid_uri) 

157 if not po: 

158 return "", [], False 

159 title = '' 

160 titles = po.get(_P_TITLE, []) 

161 if titles: 

162 title = titles[0] 

163 identifiers = self._collect_entity_ids(metaid_uri) 

164 return title, identifiers, True 

165 

166 # _______________________________ID_________________________________ # 

167 

168 def retrieve_metaid_from_id(self, schema: str, value: str) -> str | None: 

169 id_uri = self._find_id_uri(schema, value) 

170 if id_uri: 

171 return id_uri.replace(f'{self.base_iri}/', '') 

172 return None 

173 

174 def retrieve_metaid_from_merged_entity(self, metaid_uri: str, prov_config: str) -> str | None: 

175 ''' 

176 It looks for MetaId in the provenance. If the input entity was deleted due to a merge, this function returns the target entity. Otherwise, it returns None. 

177 

178 :params metaid_uri: a MetaId URI 

179 :type metaid_uri: str 

180 :params prov_config: the path of the configuration file required by time-agnostic-library 

181 :type prov_config: str 

182 :returns str | None: -- It returns the MetaID associated with the target entity after a merge. If there was no merge, it returns None. 

183 ''' 

184 metaval: str | None = None 

185 with open(prov_config, 'rb') as f: 

186 prov_config_dict = orjson.loads(f.read()) 

187 agnostic_meta = AgnosticEntity(res=metaid_uri, config=prov_config_dict, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

188 agnostic_meta_history = agnostic_meta.get_history(include_prov_metadata=True) 

189 meta_history_data = agnostic_meta_history[0][metaid_uri] 

190 if meta_history_data: 

191 meta_history_metadata = agnostic_meta_history[1][metaid_uri] 

192 penultimate_snapshot = sorted( 

193 meta_history_metadata.items(), 

194 key=lambda x: parser.parse(x[1]['generatedAtTime']).replace(tzinfo=None), 

195 reverse=True 

196 )[1][0] 

197 query_if_it_was_merged = f''' 

198 SELECT DISTINCT ?se 

199 WHERE {{ 

200 ?se a <{ProvEntity.iri_entity}>; 

201 <{ProvEntity.iri_was_derived_from}> <{penultimate_snapshot}>. 

202 }} 

203 ''' 

204 prov_endpoint = prov_config_dict['provenance']['triplestore_urls'][0] 

205 with SPARQLClient(prov_endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client: 

206 results = client.query(query_if_it_was_merged)['results']['bindings'] 

207 merged_entities = [se for se in results if metaid_uri not in se['se']['value']] 

208 if merged_entities: 

209 merged_entity_uri = merged_entities[0]['se']['value'] 

210 merged_entity_uri = merged_entity_uri.split('/prov/')[0] 

211 metaval = merged_entity_uri.split('/')[-1] 

212 return metaval 

213 

214 # _______________________________RA_________________________________ # 

215 def retrieve_ra_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]: 

216 metaid_uri = f'{self.base_iri}/{metaid}' 

217 po = self._get_all_po(metaid_uri) 

218 if not po: 

219 return '', [], False 

220 family_names = po.get(_P_FAMILY_NAME, []) 

221 given_names = po.get(_P_GIVEN_NAME, []) 

222 names = po.get(_P_NAME, []) 

223 full_name = self._construct_full_name( 

224 names[0] if names else '', 

225 family_names[0] if family_names else '', 

226 given_names[0] if given_names else '', 

227 ) 

228 identifiers = self._collect_entity_ids(metaid_uri) 

229 return full_name, identifiers, True 

230 

231 def retrieve_ra_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]: 

232 id_uri = self._find_id_uri(schema, value) 

233 if not id_uri: 

234 return [] 

235 metaid_id_list: List[Tuple[str, str]] = [(id_uri.replace(f'{self.base_iri}/', ''), f'{schema}:{value}')] 

236 result_list = [] 

237 for entity_uri in self._get_subjects(_P_HAS_IDENTIFIER, id_uri): 

238 po = self._get_all_po(entity_uri) 

239 names = po.get(_P_NAME, []) 

240 family_names = po.get(_P_FAMILY_NAME, []) 

241 given_names = po.get(_P_GIVEN_NAME, []) 

242 full_name = self._construct_full_name( 

243 names[0] if names else '', 

244 family_names[0] if family_names else '', 

245 given_names[0] if given_names else '', 

246 ) 

247 other_ids = self._collect_entity_ids(entity_uri, exclude_id_uri=id_uri) 

248 result_list.append((entity_uri.replace(f'{self.base_iri}/', ''), full_name, metaid_id_list + other_ids)) 

249 return result_list 

250 

251 def _construct_full_name(self, name: str, family_name: str, given_name: str) -> str: 

252 if name and not family_name and not given_name: 

253 return name 

254 elif not name and family_name and not given_name: 

255 return f'{family_name},' 

256 elif not name and not family_name and given_name: 

257 return f', {given_name}' 

258 elif not name and family_name and given_name: 

259 return f'{family_name}, {given_name}' 

260 else: 

261 return '' 

262 

263 def retrieve_ra_sequence_from_br_meta(self, metaid: str, col_name: str) -> List[Dict[str, tuple]]: 

264 if col_name == 'author': 

265 role_str = _R_AUTHOR 

266 elif col_name == 'editor': 

267 role_str = _R_EDITOR 

268 else: 

269 role_str = _R_PUBLISHER 

270 

271 metaid_uri = f'{self.base_iri}/{metaid}' 

272 dict_ar: dict[str, dict[str, str]] = {} 

273 

274 for ar_uri in self._get_objects(metaid_uri, _P_IS_DOC_CONTEXT_FOR): 

275 ar_po = self._get_all_po(ar_uri) 

276 roles = ar_po.get(_P_WITH_ROLE, []) 

277 if role_str in roles: 

278 role_value = ar_uri.replace(f'{self.base_iri}/', '') 

279 next_list = ar_po.get(_P_HAS_NEXT, []) 

280 next_role = next_list[0].replace(f'{self.base_iri}/', '') if next_list else '' 

281 held_by = ar_po.get(_P_IS_HELD_BY, []) 

282 ra = held_by[0].replace(f'{self.base_iri}/', '') if held_by else None 

283 if ra is not None: 

284 dict_ar[role_value] = {'next': next_role, 'ra': ra} 

285 

286 all_roles = set(dict_ar.keys()) 

287 roles_with_next = set(details['next'] for details in dict_ar.values() if details['next']) 

288 start_role_candidates = all_roles - roles_with_next 

289 

290 MAX_ITERATIONS = 10000 

291 

292 if len(all_roles) == 0: 

293 return [] 

294 

295 if len(start_role_candidates) == 0: 

296 sorted_ars = sorted(all_roles, key=lambda ar: get_resource_number(f'{self.base_iri}/{ar}')) 

297 start_role_candidates = {sorted_ars[0]} 

298 

299 if len(start_role_candidates) != 1: 

300 chains = [] 

301 for start_candidate in start_role_candidates: 

302 current_role = start_candidate 

303 chain: list[dict[str, tuple]] = [] 

304 visited_roles: set[str] = set() 

305 iteration_count = 0 

306 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS: 

307 visited_roles.add(current_role) 

308 if current_role in dict_ar: 

309 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2] 

310 ra_tuple = ra_info + (dict_ar[current_role]['ra'],) 

311 chain.append({current_role: ra_tuple}) 

312 current_role = dict_ar[current_role]['next'] 

313 else: 

314 break 

315 iteration_count += 1 

316 if iteration_count == MAX_ITERATIONS: 

317 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}") 

318 return [] 

319 chains.append(chain) 

320 chains.sort(key=lambda chain: (-len(chain), get_resource_number(f'{self.base_iri}/{list(chain[0].keys())[0]}'))) 

321 try: 

322 ordered_ar_list = chains[0] 

323 except Exception as e: 

324 print(f"\nWarning: Error processing BR: {metaid} for column: {col_name}") 

325 print(f"dict_ar: {dict_ar}") 

326 print(f"All roles: {all_roles}") 

327 print(f"Start role candidates: {start_role_candidates}") 

328 print(f"Roles with next: {roles_with_next}") 

329 print(f"Error: {str(e)}") 

330 return [] 

331 else: 

332 start_role = start_role_candidates.pop() 

333 ordered_ar_list: list[dict[str, tuple]] = [] 

334 current_role = start_role 

335 visited_roles: set[str] = set() 

336 iteration_count = 0 

337 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS: 

338 visited_roles.add(current_role) 

339 if current_role in dict_ar: 

340 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2] 

341 ra_tuple = ra_info + (dict_ar[current_role]['ra'],) 

342 ordered_ar_list.append({current_role: ra_tuple}) 

343 current_role = dict_ar[current_role]['next'] 

344 else: 

345 break 

346 iteration_count += 1 

347 if iteration_count == MAX_ITERATIONS: 

348 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}") 

349 return [] 

350 

351 return ordered_ar_list 

352 

353 def retrieve_re_from_br_meta(self, metaid: str) -> Tuple[str, str] | None: 

354 metaid_uri = f'{self.base_iri}/{metaid}' 

355 re_uris = self._get_objects(metaid_uri, _P_EMBODIMENT) 

356 if not re_uris: 

357 return None 

358 re_full_uri = re_uris[0] 

359 re_metaid = re_full_uri.replace(f'{self.base_iri}/', '') 

360 re_po = self._get_all_po(re_full_uri) 

361 starting_pages = re_po.get(_P_STARTING_PAGE, []) 

362 ending_pages = re_po.get(_P_ENDING_PAGE, []) 

363 starting_page = starting_pages[0] if starting_pages else None 

364 ending_page = ending_pages[0] if ending_pages else None 

365 pages = '' 

366 if starting_page and ending_page: 

367 pages = f'{starting_page}-{ending_page}' 

368 elif starting_page: 

369 pages = f'{starting_page}-{starting_page}' 

370 elif ending_page: 

371 pages = f'{ending_page}-{ending_page}' 

372 return re_metaid, pages 

373 

    def retrieve_br_info_from_meta(self, metaid: str) -> dict:
        """Collect bibliographic information about a BR from the local cache graph.

        :param metaid: OMID of the bibliographic resource (a full URI is also
            accepted: the base IRI is prefixed only when absent from *metaid*)
        :returns: dict with keys 'pub_date', 'type', 'page', 'issue', 'volume'
            and 'venue'; unknown values stay '' ('page' comes from
            retrieve_re_from_br_meta and may therefore be None)
        """
        # rdf:type IRIs that qualify a container entity as a venue.
        venue_type_strs = {
            GraphEntity.iri_archival_document,
            GraphEntity.iri_journal,
            GraphEntity.iri_book,
            GraphEntity.iri_book_series,
            GraphEntity.iri_series,
            GraphEntity.iri_academic_proceedings,
            GraphEntity.iri_proceedings_series,
            GraphEntity.iri_reference_book,
            _T_EXPRESSION,
        }

        def extract_identifiers(entity_uri: str) -> list[str]:
            # The entity's OMID first, then each well-formed 'scheme:value' id.
            identifiers = [f"omid:{entity_uri.replace(f'{self.base_iri}/', '')}"]
            for id_uri in self._get_objects(entity_uri, _P_HAS_IDENTIFIER):
                id_po = self._get_all_po(id_uri)
                schemes = id_po.get(_P_USES_ID_SCHEME, [])
                literals = id_po.get(_P_HAS_LITERAL_VALUE, [])
                if schemes and literals:
                    scheme = schemes[0].replace(_DATACITE, '')
                    identifiers.append(f"{scheme}:{literals[0]}")
            return identifiers

        def check_venue(entity_uri: str) -> str | None:
            # 'Title [ids...]' when the entity is a titled venue, else None.
            entity_types = self._get_objects(entity_uri, _P_TYPE)
            if any(t in venue_type_strs for t in entity_types):
                titles = self._get_objects(entity_uri, _P_TITLE)
                if titles:
                    venue_ids = extract_identifiers(entity_uri)
                    return f"{titles[0]} [{' '.join(venue_ids)}]"
            return None

        metaid_uri = f'{self.base_iri}/{metaid}' if self.base_iri not in metaid else metaid
        po = self._get_all_po(metaid_uri)
        res_dict: dict = {
            'pub_date': '',
            'type': '',
            'page': self.retrieve_re_from_br_meta(metaid),
            'issue': '',
            'volume': '',
            'venue': ''
        }

        pub_dates = po.get(_P_PUB_DATE, [])
        if pub_dates:
            res_dict['pub_date'] = pub_dates[0]

        # First non-Expression http type wins as the BR's textual type.
        types = po.get(_P_TYPE, [])
        for t in types:
            if t != _T_EXPRESSION and t.startswith('http'):
                res_dict['type'] = self._type_it(t)
                break

        # If the BR itself is an issue/volume, its sequence identifier is the
        # issue/volume label.
        seq_ids = po.get(_P_SEQ_ID, [])
        if seq_ids:
            entity_types = types
            if _T_JOURNAL_ISSUE in entity_types:
                res_dict['issue'] = seq_ids[0]
            elif _T_JOURNAL_VOLUME in entity_types:
                res_dict['volume'] = seq_ids[0]

        # Walk up to three container levels (container -> inner -> venue) to
        # fill in issue, volume and venue.
        for container_uri in po.get(_P_PART_OF, []):
            container_po = self._get_all_po(container_uri)
            container_types = container_po.get(_P_TYPE, [])

            if _T_JOURNAL_ISSUE in container_types:
                container_seqs = container_po.get(_P_SEQ_ID, [])
                if container_seqs:
                    res_dict['issue'] = container_seqs[0]
            elif _T_JOURNAL_VOLUME in container_types:
                container_seqs = container_po.get(_P_SEQ_ID, [])
                if container_seqs:
                    res_dict['volume'] = container_seqs[0]
            else:
                venue_str = check_venue(container_uri)
                if venue_str:
                    res_dict['venue'] = venue_str

            for inner_uri in container_po.get(_P_PART_OF, []):
                inner_po = self._get_all_po(inner_uri)
                inner_types = inner_po.get(_P_TYPE, [])

                if _T_JOURNAL_VOLUME in inner_types:
                    inner_seqs = inner_po.get(_P_SEQ_ID, [])
                    if inner_seqs:
                        res_dict['volume'] = inner_seqs[0]
                else:
                    venue_str = check_venue(inner_uri)
                    if venue_str:
                        res_dict['venue'] = venue_str

                # Third level: any titled container is taken as the venue,
                # without the venue-type check applied above.
                for venue_uri in inner_po.get(_P_PART_OF, []):
                    titles = self._get_objects(venue_uri, _P_TITLE)
                    if titles:
                        venue_ids = extract_identifiers(venue_uri)
                        res_dict['venue'] = f"{titles[0]} [{' '.join(venue_ids)}]"

        return res_dict

473 

    # Mapping from bibliographic-resource type IRIs to the textual labels
    # returned by _type_it (unknown IRIs map to '').
    _IRI_TO_TYPE = {
        GraphEntity.iri_archival_document: 'archival document',
        GraphEntity.iri_book: 'book',
        GraphEntity.iri_book_chapter: 'book chapter',
        GraphEntity.iri_part: 'book part',
        GraphEntity.iri_expression_collection: 'book section',
        GraphEntity.iri_book_series: 'book series',
        GraphEntity.iri_book_set: 'book set',
        GraphEntity.iri_data_file: 'data file',
        GraphEntity.iri_thesis: 'dissertation',
        GraphEntity.iri_journal: 'journal',
        GraphEntity.iri_journal_article: 'journal article',
        GraphEntity.iri_journal_issue: 'journal issue',
        GraphEntity.iri_journal_volume: 'journal volume',
        GraphEntity.iri_proceedings_paper: 'proceedings article',
        GraphEntity.iri_academic_proceedings: 'proceedings',
        GraphEntity.iri_reference_book: 'reference book',
        GraphEntity.iri_reference_entry: 'reference entry',
        GraphEntity.iri_series: 'series',
        GraphEntity.iri_report_document: 'report',
        GraphEntity.iri_specification_document: 'standard',
    }

496 

497 @staticmethod 

498 def _type_it(br_type: str) -> str: 

499 return ResourceFinder._IRI_TO_TYPE.get(br_type, '') 

500 

501 def retrieve_publisher_from_br_metaid(self, metaid: str): 

502 metaid_uri = f'{self.base_iri}/{metaid}' 

503 publisher_ar_uris: set[str] = set() 

504 

505 def find_publisher_ars(entity_uri: str) -> None: 

506 for ar_uri in self._get_objects(entity_uri, _P_IS_DOC_CONTEXT_FOR): 

507 roles = self._get_objects(ar_uri, _P_WITH_ROLE) 

508 if _R_PUBLISHER in roles: 

509 publisher_ar_uris.add(ar_uri) 

510 

511 find_publisher_ars(metaid_uri) 

512 for parent_uri in self._get_objects(metaid_uri, _P_PART_OF): 

513 find_publisher_ars(parent_uri) 

514 for grandparent_uri in self._get_objects(parent_uri, _P_PART_OF): 

515 find_publisher_ars(grandparent_uri) 

516 

517 publishers_output = [] 

518 for ar_uri in publisher_ar_uris: 

519 pub_identifiers: List[str] = [] 

520 pub_name: str | None = None 

521 for ra_uri in self._get_objects(ar_uri, _P_IS_HELD_BY): 

522 pub_identifiers.append(ra_uri.replace(f'{self.base_iri}/', 'omid:')) 

523 ra_po = self._get_all_po(ra_uri) 

524 names = ra_po.get(_P_NAME, []) 

525 if names: 

526 pub_name = names[0] 

527 for id_uri in ra_po.get(_P_HAS_IDENTIFIER, []): 

528 id_po = self._get_all_po(id_uri) 

529 schemes = id_po.get(_P_USES_ID_SCHEME, []) 

530 literals = id_po.get(_P_HAS_LITERAL_VALUE, []) 

531 if schemes and literals: 

532 pub_identifiers.append(f'{schemes[0].replace(_DATACITE, "")}:{literals[0]}') 

533 if pub_name is not None: 

534 pub_full = f'{pub_name} [{" ".join(pub_identifiers)}]' 

535 else: 

536 pub_full = f'[{" ".join(pub_identifiers)}]' 

537 publishers_output.append(pub_full) 

538 return '; '.join(publishers_output) 

539 

540 def get_everything_about_res(self, metavals: set, identifiers: set, vvis: set, max_depth: int = 10, progress: Progress | None = None) -> None: 

541 BATCH_SIZE = QLEVER_BATCH_SIZE 

542 MAX_WORKERS = min(self.workers, QLEVER_MAX_WORKERS) 

543 

544 def batch_process(input_set, batch_size): 

545 """Generator to split input data into smaller batches if batch_size is not None.""" 

546 if batch_size is None: 

547 yield input_set 

548 else: 

549 for i in range(0, len(input_set), batch_size): 

550 yield input_set[i:i + batch_size] 

551 

552 task_metavals = None 

553 task_identifiers = None 

554 task_vvis = None 

555 if progress: 

556 if metavals: 

557 task_metavals = progress.add_task( 

558 " [dim]Resolving OMIDs[/dim]", total=len(metavals) 

559 ) 

560 if identifiers: 

561 task_identifiers = progress.add_task( 

562 " [dim]Resolving identifiers[/dim]", total=len(identifiers) 

563 ) 

564 if vvis: 

565 task_vvis = progress.add_task( 

566 " [dim]Resolving VVI[/dim]", total=len(vvis) 

567 ) 

568 

569 max_depth_reached = 0 

570 

571 def process_batch_parallel(subjects, cur_depth, visited_subjects): 

572 nonlocal max_depth_reached 

573 if not subjects or (max_depth and cur_depth > max_depth): 

574 return 

575 

576 new_subjects = subjects - visited_subjects 

577 if not new_subjects: 

578 return 

579 

580 if cur_depth > max_depth_reached: 

581 max_depth_reached = cur_depth 

582 

583 visited_subjects.update(new_subjects) 

584 

585 subject_list = list(new_subjects) 

586 batches = list(batch_process(subject_list, BATCH_SIZE)) 

587 batch_queries = [] 

588 ts_url = self.ts_url 

589 

590 for batch in batches: 

591 query = f''' 

592 SELECT ?s ?p ?o 

593 WHERE {{ 

594 VALUES ?s {{ {' '.join([f"<{s}>" for s in batch])} }} 

595 ?s ?p ?o. 

596 }}''' 

597 batch_queries.append(query) 

598 

599 next_subjects = set() 

600 if len(batch_queries) > 1 and MAX_WORKERS > 1: 

601 queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS) 

602 query_groups = [ 

603 batch_queries[i:i + queries_per_worker] 

604 for i in range(0, len(batch_queries), queries_per_worker) 

605 ] 

606 worker = partial(execute_sparql_queries, ts_url) 

607 with ProcessPoolExecutor( 

608 max_workers=min(len(query_groups), MAX_WORKERS), 

609 mp_context=multiprocessing.get_context('forkserver') 

610 ) as executor: 

611 grouped_results = list(executor.map(worker, query_groups)) 

612 results = [item for sublist in grouped_results for item in sublist] 

613 else: 

614 results = execute_sparql_queries(endpoint_url=ts_url, queries=batch_queries) if batch_queries else [] 

615 

616 _skip_preds = {_P_TYPE, _P_WITH_ROLE, _P_USES_ID_SCHEME} 

617 for result in results: 

618 for row in result: 

619 s_str = row['s']['value'] 

620 p_str = row['p']['value'] 

621 o_binding = row['o'] 

622 o_str = o_binding['value'] 

623 o_datatype = o_binding.get('datatype', '') if o_binding['type'] in ('literal', 'typed-literal') else '' 

624 self.add_triple(s_str, p_str, o_str, o_datatype=o_datatype) 

625 if o_binding['type'] == 'uri' and p_str not in _skip_preds: 

626 next_subjects.add(o_str) 

627 

628 process_batch_parallel(next_subjects, cur_depth + 1, visited_subjects) 

629 

630 def get_initial_subjects_from_metavals(metavals): 

631 """Convert metavals to a set of subjects.""" 

632 return {f"{self.base_iri}/{mid.replace('omid:', '')}" for mid in metavals} 

633 

634 def get_initial_subjects_from_identifiers(identifiers, progress_task=None): 

635 """Convert identifiers to a set of subjects based on batch queries executed in parallel. 

636 

637 Returns: 

638 tuple: (subjects set, id_to_subjects mapping) 

639 - subjects: set of subject URIs found 

640 - id_to_subjects: dict mapping identifier string to set of subject URIs 

641 """ 

642 subjects = set() 

643 id_to_subjects = {} 

644 ts_url = self.ts_url 

645 batches = list(batch_process(list(identifiers), BATCH_SIZE)) 

646 

647 if not batches: 

648 return subjects, id_to_subjects 

649 

650 batch_queries = [] 

651 batch_sizes = [] 

652 for batch in batches: 

653 if not batch: 

654 continue 

655 

656 batch_sizes.append(len(batch)) 

657 if self.virtuoso_full_text_search: 

658 union_blocks = [] 

659 for identifier in batch: 

660 scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1] 

661 escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"') 

662 union_blocks.append(f""" 

663 {{ 

664 ?id <{_P_HAS_LITERAL_VALUE}> "{escaped_literal}"^^<{_XSD_STRING}> . 

665 ?id <{_P_USES_ID_SCHEME}> <{_DATACITE}{scheme}> . 

666 ?s <{_P_HAS_IDENTIFIER}> ?id . 

667 BIND("{scheme}" AS ?schemeLabel) 

668 BIND("{escaped_literal}" AS ?literalLabel) 

669 }} 

670 """) 

671 union_query = " UNION ".join(union_blocks) 

672 query = f''' 

673 SELECT ?s ?schemeLabel ?literalLabel WHERE {{ 

674 {union_query} 

675 }} 

676 ''' 

677 batch_queries.append(query) 

678 else: 

679 identifiers_values = [] 

680 for identifier in batch: 

681 scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1] 

682 escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"') 

683 identifiers_values.append(f'(<{_DATACITE}{scheme}> "{escaped_literal}"^^<{_XSD_STRING}>)') 

684 identifiers_values_str = " ".join(identifiers_values) 

685 query = f''' 

686 SELECT DISTINCT ?s ?scheme ?literal WHERE {{ 

687 VALUES (?scheme ?literal) {{ {identifiers_values_str} }} 

688 ?id <{_P_USES_ID_SCHEME}> ?scheme . 

689 ?id <{_P_HAS_LITERAL_VALUE}> ?literal . 

690 ?s <{_P_HAS_IDENTIFIER}> ?id . 

691 }} 

692 ''' 

693 batch_queries.append(query) 

694 

695 if len(batch_queries) > 1 and MAX_WORKERS > 1: 

696 query_groups = [] 

697 grouped_batch_sizes = [] 

698 for i in range(0, len(batch_queries), QLEVER_QUERIES_PER_GROUP): 

699 query_groups.append(batch_queries[i:i + QLEVER_QUERIES_PER_GROUP]) 

700 grouped_batch_sizes.append(sum(batch_sizes[i:i + QLEVER_QUERIES_PER_GROUP])) 

701 worker = partial(execute_sparql_queries, ts_url) 

702 with ProcessPoolExecutor( 

703 max_workers=MAX_WORKERS, 

704 mp_context=multiprocessing.get_context('forkserver') 

705 ) as executor: 

706 results = [] 

707 for idx, grouped_result in enumerate(executor.map(worker, query_groups)): 

708 results.extend(grouped_result) 

709 if progress and progress_task is not None: 

710 progress.advance(progress_task, grouped_batch_sizes[idx]) 

711 elif batch_queries: 

712 results = execute_sparql_queries(endpoint_url=ts_url, queries=batch_queries) 

713 if progress and progress_task is not None: 

714 progress.advance(progress_task, sum(batch_sizes)) 

715 else: 

716 results = [] 

717 

718 for result in results: 

719 for row in result: 

720 subject = str(row['s']['value']) 

721 subjects.add(subject) 

722 if 'schemeLabel' in row: 

723 scheme = str(row['schemeLabel']['value']) 

724 literal = str(row['literalLabel']['value']) 

725 else: 

726 scheme = str(row['scheme']['value']).replace(_DATACITE, '') 

727 literal = str(row['literal']['value']) 

728 identifier = f"{scheme}:{literal}" 

729 if identifier not in id_to_subjects: 

730 id_to_subjects[identifier] = set() 

731 id_to_subjects[identifier].add(subject) 

732 

733 return subjects, id_to_subjects 

734 

735 def _build_values_queries(issue_vol_tuples, issue_no_vol_tuples, vol_only_tuples): 

736 queries = [] 

737 

738 for i in range(0, len(issue_vol_tuples), BATCH_SIZE): 

739 chunk = issue_vol_tuples[i:i + BATCH_SIZE] 

740 values_block = ' '.join( 

741 f'(<{venue}> "{vol_seq}"^^<{_XSD_STRING}> "{issue_seq}"^^<{_XSD_STRING}>)' 

742 for venue, vol_seq, issue_seq in chunk 

743 ) 

744 queries.append(f''' 

745 SELECT ?s WHERE {{ 

746 VALUES (?venueUri ?volSeq ?issSeq) {{ {values_block} }} 

747 ?volume a <{_T_JOURNAL_VOLUME}> ; 

748 <{_P_PART_OF}> ?venueUri ; 

749 <{_P_SEQ_ID}> ?volSeq . 

750 ?s a <{_T_JOURNAL_ISSUE}> ; 

751 <{_P_PART_OF}> ?volume ; 

752 <{_P_SEQ_ID}> ?issSeq . 

753 }} 

754 ''') 

755 

756 for i in range(0, len(issue_no_vol_tuples), BATCH_SIZE): 

757 chunk = issue_no_vol_tuples[i:i + BATCH_SIZE] 

758 values_block = ' '.join( 

759 f'(<{venue}> "{issue_seq}"^^<{_XSD_STRING}>)' 

760 for venue, issue_seq in chunk 

761 ) 

762 queries.append(f''' 

763 SELECT ?s WHERE {{ 

764 VALUES (?venueUri ?issSeq) {{ {values_block} }} 

765 ?s a <{_T_JOURNAL_ISSUE}> ; 

766 <{_P_PART_OF}> ?venueUri ; 

767 <{_P_SEQ_ID}> ?issSeq . 

768 }} 

769 ''') 

770 

771 for i in range(0, len(vol_only_tuples), BATCH_SIZE): 

772 chunk = vol_only_tuples[i:i + BATCH_SIZE] 

773 values_block = ' '.join( 

774 f'(<{venue}> "{vol_seq}"^^<{_XSD_STRING}>)' 

775 for venue, vol_seq in chunk 

776 ) 

777 queries.append(f''' 

778 SELECT ?s WHERE {{ 

779 VALUES (?venueUri ?volSeq) {{ {values_block} }} 

780 ?s a <{_T_JOURNAL_VOLUME}> ; 

781 <{_P_PART_OF}> ?venueUri ; 

782 <{_P_SEQ_ID}> ?volSeq . 

783 }} 

784 ''') 

785 

786 return queries 

787 

        def get_initial_subjects_from_vvis(vvis, progress_task=None):
            """Convert vvis to a set of subjects based on batched VALUES queries.

            Each element of ``vvis`` is unpacked as a 4-tuple
            ``(volume, issue, venue_metaid, venue_ids_tuple)``; ``volume`` and
            ``issue`` are sequence-identifier strings (either may be falsy) and
            ``venue_ids_tuple`` is an optional collection of external venue
            identifiers.  Returns a set of subject URIs: venue URIs resolved
            from identifiers, the venue URIs themselves, and every volume/issue
            subject matched by the batched SPARQL queries.

            NOTE(review): relies on closure variables from the enclosing scope
            (``self``, ``progress``, ``MAX_WORKERS``, ``_build_values_queries``,
            ``get_initial_subjects_from_identifiers``) — presumably defined in
            the surrounding method; verify against the full file.
            """
            subjects = set()
            ts_url = self.ts_url
            venue_uris_to_add = set()
            # Materialise once: ``vvis`` may be any iterable and is traversed twice.
            vvis_list = list(vvis)
            total_vvis = len(vvis_list)

            # First pass: collect all venue IDs and prepare queries
            all_venue_ids = set()
            for volume, issue, venue_metaid, venue_ids_tuple in vvis_list:
                if venue_ids_tuple:
                    all_venue_ids.update(venue_ids_tuple)

            # Get venue subjects from identifiers with mapping
            venue_id_to_uris = {}
            if all_venue_ids:
                venue_id_subjects, venue_id_to_uris = get_initial_subjects_from_identifiers(all_venue_ids)
                subjects.update(venue_id_subjects)

            # Second pass: collect tuples grouped by query pattern
            # (issue+volume, issue-only, volume-only) so each group can be
            # resolved with one batched VALUES query shape.
            issue_vol_tuples = []
            issue_no_vol_tuples = []
            vol_only_tuples = []

            for volume, issue, venue_metaid, venue_ids_tuple in vvis_list:
                venues_to_search = set()

                if venue_metaid:
                    venues_to_search.add(venue_metaid)

                if venue_ids_tuple:
                    for venue_id in venue_ids_tuple:
                        if venue_id in venue_id_to_uris:
                            for venue_uri in venue_id_to_uris[venue_id]:
                                # Only bibliographic-resource URIs can be venues;
                                # normalise them back to the omid: form used above.
                                if '/br/' in venue_uri:
                                    venues_to_search.add(venue_uri.replace(f'{self.base_iri}/', 'omid:'))

                for venue_metaid_to_search in venues_to_search:
                    venue_uri = f"{self.base_iri}/{venue_metaid_to_search.replace('omid:', '')}"
                    # A vvi without volume and issue contributes no VVI query.
                    if not (issue or volume):
                        continue
                    # Escape backslashes and double quotes for safe embedding in
                    # SPARQL string literals.
                    escaped_issue = issue.replace('\\', '\\\\').replace('"', '\\"') if issue else None
                    escaped_volume = volume.replace('\\', '\\\\').replace('"', '\\"') if volume else None

                    if issue:
                        if volume:
                            issue_vol_tuples.append((venue_uri, escaped_volume, escaped_issue))
                        else:
                            issue_no_vol_tuples.append((venue_uri, escaped_issue))
                    else:
                        vol_only_tuples.append((venue_uri, escaped_volume))

                    # The venue itself is always part of the result set.
                    venue_uris_to_add.add(venue_uri)

            vvi_queries = _build_values_queries(issue_vol_tuples, issue_no_vol_tuples, vol_only_tuples)

            # Execute batched VVI queries in parallel
            if len(vvi_queries) > 1 and MAX_WORKERS > 1:
                query_groups = []
                # Per-group vvi counts are only used to advance the progress bar
                # proportionally; they are an estimate, not an exact mapping.
                grouped_vvi_counts = []
                queries_per_group = max(1, len(vvi_queries) // MAX_WORKERS)
                for i in range(0, len(vvi_queries), queries_per_group):
                    group = vvi_queries[i:i + queries_per_group]
                    query_groups.append(group)
                    vvi_count = int(total_vvis * len(group) / len(vvi_queries))
                    grouped_vvi_counts.append(max(1, vvi_count))
                worker = partial(execute_sparql_queries, ts_url)
                # forkserver avoids inheriting the parent's full state (safer
                # than fork with open connections/threads).
                with ProcessPoolExecutor(
                    max_workers=MAX_WORKERS,
                    mp_context=multiprocessing.get_context('forkserver')
                ) as executor:
                    results = []
                    # executor.map preserves group order, so grouped_vvi_counts[idx]
                    # matches the group that produced grouped_result.
                    for idx, grouped_result in enumerate(executor.map(worker, query_groups)):
                        results.extend(grouped_result)
                        if progress and progress_task is not None:
                            progress.advance(progress_task, grouped_vvi_counts[idx])
            elif vvi_queries:
                # Single query (or single worker): run sequentially.
                results = execute_sparql_queries(endpoint_url=ts_url, queries=vvi_queries)
                if progress and progress_task is not None:
                    progress.advance(progress_task, total_vvis)
            else:
                # No VVI queries at all; still advance the bar to completion.
                results = []
                if progress and progress_task is not None:
                    progress.advance(progress_task, total_vvis)

            # Each result is a list of SPARQL JSON bindings; collect ?s values.
            for result in results:
                for row in result:
                    subjects.add(str(row['s']['value']))

            subjects.update(venue_uris_to_add)

            return subjects

881 

882 initial_subjects = set() 

883 

884 if metavals: 

885 initial_subjects.update(get_initial_subjects_from_metavals(metavals)) 

886 if progress and task_metavals is not None: 

887 progress.advance(task_metavals, len(metavals)) 

888 progress.remove_task(task_metavals) 

889 

890 if identifiers: 

891 id_subjects, _ = get_initial_subjects_from_identifiers(identifiers, progress_task=task_identifiers) 

892 initial_subjects.update(id_subjects) 

893 if progress and task_identifiers is not None: 

894 progress.remove_task(task_identifiers) 

895 

896 if vvis: 

897 initial_subjects.update(get_initial_subjects_from_vvis(vvis, progress_task=task_vvis)) 

898 if progress and task_vvis is not None: 

899 progress.remove_task(task_vvis) 

900 

901 visited_subjects = set() 

902 process_batch_parallel(initial_subjects, 0, visited_subjects) 

903 

904 console = Console() 

905 style = "bold red" if max_depth_reached >= max_depth else "bold green" 

906 console.print(f" Max traversal depth reached: {max_depth_reached}/{max_depth}", style=style) 

907 

908 def retrieve_venue_from_local_graph(self, meta_id: str) -> VenueStructure: 

909 content: VenueStructure = { 

910 'issue': {}, 

911 'volume': {} 

912 } 

913 venue_uri = f'{self.base_iri}/{meta_id}' 

914 

915 venue_children = self._get_subjects(_P_PART_OF, venue_uri) 

916 volumes: dict[str, str] = {} 

917 

918 for child_uri in venue_children: 

919 types = self._get_objects(child_uri, _P_TYPE) 

920 child_id = child_uri.replace(f'{self.base_iri}/', '') 

921 if _T_JOURNAL_VOLUME in types: 

922 seqs = self._get_objects(child_uri, _P_SEQ_ID) 

923 for seq in seqs: 

924 volumes[child_uri] = seq 

925 content['volume'][seq] = {'id': child_id, 'issue': {}} 

926 elif _T_JOURNAL_ISSUE in types: 

927 seqs = self._get_objects(child_uri, _P_SEQ_ID) 

928 seq = seqs[0] if seqs else None 

929 if seq: 

930 content['issue'][seq] = {'id': child_id} 

931 

932 for volume_uri, volume_seq in volumes.items(): 

933 volume_children = self._get_subjects(_P_PART_OF, volume_uri) 

934 for child_uri in volume_children: 

935 types = self._get_objects(child_uri, _P_TYPE) 

936 if _T_JOURNAL_ISSUE in types: 

937 child_id = child_uri.replace(f'{self.base_iri}/', '') 

938 seqs = self._get_objects(child_uri, _P_SEQ_ID) 

939 seq = seqs[0] if seqs else None 

940 if seq: 

941 content['volume'][volume_seq]['issue'][seq] = {'id': child_id} 

942 

943 return content