Coverage for oc_meta/lib/finder.py: 88%
656 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
1import multiprocessing
2import os
3from concurrent.futures import ProcessPoolExecutor
4from typing import Dict, List, Tuple
6import yaml
7from dateutil import parser
8from oc_ocdm.graph import GraphEntity
9from oc_ocdm.graph.graph_entity import GraphEntity
10from oc_ocdm.prov.prov_entity import ProvEntity
11from oc_ocdm.support import get_count, get_resource_number
12from rdflib import RDF, XSD, Graph, Literal, URIRef
13from sparqlite import SPARQLClient
14from time_agnostic_library.agnostic_entity import AgnosticEntity
def _execute_sparql_queries(args: tuple) -> list:
    '''
    Run a group of SPARQL queries against one endpoint and collect their bindings.

    The endpoint URL and the queries are packed in a single tuple so that the function
    takes exactly one argument.

    :params args: a ``(ts_url, queries)`` pair, where ``queries`` is an iterable of SPARQL query strings
    :type args: tuple
    :returns list: -- one entry per query, in the same order. Each entry is the ``['results']['bindings']`` value of the response, or an empty list when the client returned a falsy result.
    '''
    endpoint, pending_queries = args
    with SPARQLClient(endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
        # map() is lazy, so each query is still sent and unpacked one at a time
        return [
            response['results']['bindings'] if response else []
            for response in map(client.query, pending_queries)
        ]
27class ResourceFinder:
def __init__(self, ts_url, base_iri: str, local_g: Graph = None, settings: dict = None, meta_config_path: str = None):
    '''
    Set up a finder bound to a triplestore endpoint and a local graph.

    :params ts_url: the URL of the SPARQL endpoint
    :params base_iri: the base IRI of the dataset; one trailing slash, if present, is removed
    :type base_iri: str
    :params local_g: the graph used for local lookups. A new empty graph is created for each instance when omitted.
    :type local_g: Graph
    :params settings: the Meta settings. A new empty dictionary is used when omitted. The key 'virtuoso_full_text_search' is read from it.
    :type settings: dict
    :params meta_config_path: the path of the Meta configuration file
    :type meta_config_path: str
    '''
    # Defaults are created here and not in the signature: a default written as
    # Graph() or dict() is evaluated once and would be shared by every instance.
    if settings is None:
        settings = {}
    self.ts_url = ts_url
    # endswith() also copes with an empty base IRI, where base_iri[-1] would raise
    self.base_iri = base_iri[:-1] if base_iri.endswith('/') else base_iri
    # "is None" and not truthiness: an empty graph supplied by the caller must be kept
    self.local_g = Graph() if local_g is None else local_g
    self.prebuilt_subgraphs = {}
    self.ids_in_local_g = set()
    self.meta_config_path = meta_config_path
    self.meta_settings = settings
    self.virtuoso_full_text_search = settings['virtuoso_full_text_search'] if settings and 'virtuoso_full_text_search' in settings else False
39 # _______________________________BR_________________________________ #
def retrieve_br_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]:
    '''
    Given an identifier, it retrieves from the local graph the bibliographic resources associated with that identifier, related titles and other identifiers MetaIDs and literal values.

    :params schema: an identifier schema
    :type schema: str
    :params value: an identifier literal value
    :type value: str
    :returns List[Tuple[str, str, list]]: -- it returns a list of three elements tuples. The first element is the MetaID of a resource associated with the input ID. The second element is a title of that resource, if present. The third element is a list of MetaID-ID tuples related to identifiers associated with that resource. The list is empty when the identifier is not found in the local graph.
    '''
    schema_uri = URIRef(GraphEntity.DATACITE + schema)
    value = value.replace('\\', '\\\\')
    result_list = []
    identifier_uri = None

    # Search for both string-typed and untyped literals
    for literal_value in [Literal(value, datatype=XSD.string), Literal(value)]:
        for starting_triple in self.local_g.triples((None, GraphEntity.iri_has_literal_value, literal_value)):
            for known_id_triple in self.local_g.triples((starting_triple[0], None, None)):
                if known_id_triple[1] == GraphEntity.iri_uses_identifier_scheme and known_id_triple[2] == schema_uri:
                    identifier_uri = known_id_triple[0]
                    # The remaining triples share the same subject: nothing more to learn
                    break
            if identifier_uri:
                break

    if identifier_uri:
        # NOTE: this single list is placed in every result tuple and collects the
        # identifiers of all the matching resources.
        metaid_id_list = [(identifier_uri.replace(f'{self.base_iri}/id/', ''), f'{schema}:{value}')]
        for triple in self.local_g.triples((None, GraphEntity.iri_has_identifier, identifier_uri)):
            title = ''
            res = triple[0]
            for res_triple in self.local_g.triples((res, None, None)):
                if res_triple[1] == GraphEntity.iri_title:
                    title = str(res_triple[2])
                elif res_triple[1] == GraphEntity.iri_has_identifier and res_triple[2] != identifier_uri:
                    # Reset for every identifier, so that an identifier lacking its scheme
                    # or literal value can neither raise nor reuse a previous identifier's data
                    id_schema = ''
                    id_literal_value = ''
                    for id_triple in self.local_g.triples((res_triple[2], None, None)):
                        if id_triple[1] == GraphEntity.iri_uses_identifier_scheme:
                            id_schema = str(id_triple[2]).replace(GraphEntity.DATACITE, '')
                        elif id_triple[1] == GraphEntity.iri_has_literal_value:
                            id_literal_value = str(id_triple[2])
                    if id_schema and id_literal_value:
                        full_id = f'{id_schema}:{id_literal_value}'
                        metaid_id_tuple = (res_triple[2].replace(f'{self.base_iri}/id/', ''), full_id)
                        metaid_id_list.append(metaid_id_tuple)
            result_list.append((res.replace(f'{self.base_iri}/br/', ''), title, metaid_id_list))

    return result_list
def retrieve_br_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]:
    '''
    Given a MetaID, it retrieves from the local graph the title of the bibliographic resource having that MetaID and other identifiers of that entity.

    :params metaid: a MetaID
    :type metaid: str
    :returns Tuple[str, List[Tuple[str, str]], bool]: -- it returns a tuple of three elements. The first element is the resource's title associated with the input MetaID. The second element is a list of MetaID-ID tuples related to identifiers associated with that entity. The third element is True if the local graph contains at least one triple about the resource, False otherwise; in the latter case the title is an empty string and the list is empty.
    '''
    metaid_uri = f'{self.base_iri}/br/{metaid}'
    title = ''
    identifiers = []
    it_exists = False

    for triple in self.local_g.triples((URIRef(metaid_uri), None, None)):
        it_exists = True
        if triple[1] == GraphEntity.iri_title:
            title = str(triple[2])
        elif triple[1] == GraphEntity.iri_has_identifier:
            id_scheme = ''
            literal_value = ''
            identifier = triple[2]
            for triple_inner in self.local_g.triples((identifier, None, None)):
                if triple_inner[1] == GraphEntity.iri_uses_identifier_scheme:
                    id_scheme = str(triple_inner[2]).replace(GraphEntity.DATACITE, '')
                elif triple_inner[1] == GraphEntity.iri_has_literal_value:
                    literal_value = str(triple_inner[2])
            # Ensure both id_scheme and literal_value are found before appending
            if id_scheme and literal_value:
                full_id = f'{id_scheme}:{literal_value}'
                identifiers.append((str(identifier).replace(self.base_iri + '/id/', ''), full_id))

    # When nothing was found, title and identifiers still hold their empty initial values
    return title, identifiers, it_exists
120 # _______________________________ID_________________________________ #
def retrieve_metaid_from_id(self, schema: str, value: str) -> str:
    '''
    Look up in the local graph the MetaID of the identifier entity having the given schema and value.

    :params schema: an identifier schema
    :type schema: str
    :params value: an identifier literal value
    :type value: str
    :returns str: -- the MetaID of the first matching identifier entity, or None when the local graph contains no match.
    '''
    wanted_scheme = URIRef(GraphEntity.DATACITE + schema)
    escaped_value = value.replace('\\', '\\\\')

    # The value may be stored either as an xsd:string literal or as an untyped one
    candidate_literals = (Literal(escaped_value, datatype=XSD.string), Literal(escaped_value))
    for candidate in candidate_literals:
        for id_node, _, _ in self.local_g.triples((None, GraphEntity.iri_has_literal_value, candidate)):
            for subject, predicate, obj in self.local_g.triples((id_node, None, None)):
                if predicate != GraphEntity.iri_uses_identifier_scheme:
                    continue
                if obj == wanted_scheme:
                    return subject.replace(f'{self.base_iri}/id/', '')

    # No identifier with that schema and value
    return None
def retrieve_metaid_from_merged_entity(self, metaid_uri:str, prov_config:str) -> str:
    '''
    It looks for MetaId in the provenance. If the input entity was deleted due to a merge, this function returns the target entity. Otherwise, it returns None.

    :params metaid_uri: a MetaId URI
    :type metaid_uri: str
    :params prov_config: the path of the configuration file required by time-agnostic-library
    :type prov_config: str
    :returns str: -- It returns the MetaID associated with the target entity after a merge. If there was no merge, it returns None.
    '''
    with open(prov_config, 'r', encoding='utf8') as f:
        prov_config_dict = yaml.safe_load(f)
    agnostic_meta = AgnosticEntity(res=metaid_uri, config=prov_config_dict, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
    agnostic_meta_history = agnostic_meta.get_history(include_prov_metadata=True)
    meta_history_data = agnostic_meta_history[0][metaid_uri]
    if not meta_history_data:
        return None

    meta_history_metadata = agnostic_meta_history[1][metaid_uri]
    snapshots_newest_first = sorted(
        meta_history_metadata.items(),
        key=lambda x: parser.parse(x[1]['generatedAtTime']).replace(tzinfo=None),
        reverse=True
    )
    # With fewer than two snapshots there is no penultimate one to inspect
    if len(snapshots_newest_first) < 2:
        return None
    penultimate_snapshot = snapshots_newest_first[1][0]

    query_if_it_was_merged = f'''
        SELECT DISTINCT ?se
        WHERE {{
            ?se a <{ProvEntity.iri_entity}>;
                <{ProvEntity.iri_was_derived_from}> <{penultimate_snapshot}>.
        }}
    '''
    with SPARQLClient(self.ts_url, max_retries=5, backoff_factor=5, timeout=3600) as client:
        results = client.query(query_if_it_was_merged)['results']['bindings']

    # A snapshot derived from the penultimate one but belonging to another entity
    # means the input entity was merged into that entity. The entity URIs are compared
    # for equality: a substring test would treat ".../br/11/prov/se/2" as belonging
    # to ".../br/1" and miss the merge.
    for se in results:
        derived_entity = se['se']['value'].split('/prov/')[0]
        if derived_entity != str(metaid_uri):
            return get_count(derived_entity)
    return None
186 # _______________________________RA_________________________________ #
def retrieve_ra_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]], bool]:
    '''
    Given a MetaID, it retrieves from the local graph the name and id of the responsible agent associated with it, whether it is an author or a publisher.
    The output has the following format:

        ('NAME', [('METAID_OF_THE_IDENTIFIER', 'LITERAL_VALUE')], IT_EXISTS)
        ('American Medical Association (ama)', [('4274', 'crossref:10')], True)

    :params metaid: a responsible agent's MetaID
    :type metaid: str
    :returns Tuple[str, List[Tuple[str, str]], bool]: -- it returns a tuple of three elements. The first element is the responsible agent's name, the second element is a list containing its identifiers' MetaIDs and literal values, and the third element is True if the local graph contains at least one triple about the agent, False otherwise.
    '''
    metaid_uri = f'{self.base_iri}/ra/{metaid}'
    family_name = ''
    given_name = ''
    name = ''
    identifiers = []
    it_exists = False

    for triple in self.local_g.triples((URIRef(metaid_uri), None, None)):
        it_exists = True
        if triple[1] == GraphEntity.iri_family_name:
            family_name = str(triple[2])
        elif triple[1] == GraphEntity.iri_given_name:
            given_name = str(triple[2])
        elif triple[1] == GraphEntity.iri_name:
            name = str(triple[2])
        elif triple[1] == GraphEntity.iri_has_identifier:
            identifier = triple[2]
            id_scheme = ''
            literal_value = ''
            for triple_inner in self.local_g.triples((identifier, None, None)):
                if triple_inner[1] == GraphEntity.iri_uses_identifier_scheme:
                    id_scheme = str(triple_inner[2]).replace(GraphEntity.DATACITE, '')
                elif triple_inner[1] == GraphEntity.iri_has_literal_value:
                    literal_value = str(triple_inner[2])
            # An identifier is reported only when both its scheme and its value are known
            if id_scheme and literal_value:
                full_id = f'{id_scheme}:{literal_value}'
                identifiers.append((str(identifier).replace(self.base_iri + '/id/', ''), full_id))

    full_name = self._construct_full_name(name, family_name, given_name)

    return full_name, identifiers, it_exists
def retrieve_ra_from_id(self, schema: str, value: str, publisher: bool) -> List[Tuple[str, str, list]]:
    '''
    Given an identifier, it retrieves from the local graph the responsible agents associated with that identifier, related names and other identifiers MetaIDs and literal values.
    The output has the following format: ::

        [(METAID, NAME, [(METAID_OF_THE_IDENTIFIER, LITERAL_VALUE)])]
        [('3309', 'American Medical Association (ama)', [('4274', 'crossref:10')])]

    :params schema: an identifier schema
    :type schema: str
    :params value: an identifier literal value
    :type value: str
    :params publisher: True if the identifier is associated with a publisher, False otherwise. The lookup itself does not read this flag.
    :type publisher: bool
    :returns List[Tuple[str, str, list]]: -- it returns a list of three elements tuples. The first element is the MetaID of a responsible agent associated with the input ID. The second element is the name of that responsible agent, if present. The third element is a list of MetaID-ID tuples related to identifiers associated with that responsible agent. The list is empty when the identifier is not found in the local graph.
    '''
    schema_uri = URIRef(GraphEntity.DATACITE + schema)
    value = value.replace('\\', '\\\\')
    result_list = []
    identifier_uri = None

    # Search for both string-typed and untyped literals
    for literal_value in [Literal(value, datatype=XSD.string), Literal(value)]:
        for starting_triple in self.local_g.triples((None, GraphEntity.iri_has_literal_value, literal_value)):
            for known_id_triple in self.local_g.triples((starting_triple[0], None, None)):
                if known_id_triple[1] == GraphEntity.iri_uses_identifier_scheme and known_id_triple[2] == schema_uri:
                    identifier_uri = known_id_triple[0]
                    break
            if identifier_uri:
                break

    if identifier_uri:
        # NOTE: this single list is placed in every result tuple and collects the
        # identifiers of all the matching agents.
        metaid_id_list = [(identifier_uri.replace(f'{self.base_iri}/id/', ''), f'{schema}:{value}')]
        for triple in self.local_g.triples((None, GraphEntity.iri_has_identifier, identifier_uri)):
            name = ''
            family_name = ''
            given_name = ''
            res = triple[0]
            for res_triple in self.local_g.triples((res, None, None)):
                if res_triple[1] == GraphEntity.iri_name:
                    name = str(res_triple[2])
                elif res_triple[1] == GraphEntity.iri_family_name:
                    family_name = str(res_triple[2])
                elif res_triple[1] == GraphEntity.iri_given_name:
                    given_name = str(res_triple[2])
                elif res_triple[1] == GraphEntity.iri_has_identifier and res_triple[2] != identifier_uri:
                    # Reset for every identifier, so that an identifier lacking its scheme
                    # or literal value can neither raise nor reuse a previous identifier's data
                    id_schema = ''
                    id_literal_value = ''
                    for id_triple in self.local_g.triples((res_triple[2], None, None)):
                        if id_triple[1] == GraphEntity.iri_uses_identifier_scheme:
                            id_schema = str(id_triple[2]).replace(GraphEntity.DATACITE, '')
                        elif id_triple[1] == GraphEntity.iri_has_literal_value:
                            id_literal_value = str(id_triple[2])
                    if id_schema and id_literal_value:
                        full_id = f'{id_schema}:{id_literal_value}'
                        metaid_id_tuple = (res_triple[2].replace(f'{self.base_iri}/id/', ''), full_id)
                        metaid_id_list.append(metaid_id_tuple)

            full_name = self._construct_full_name(name, family_name, given_name)
            result_list.append((res.replace(f'{self.base_iri}/ra/', ''), full_name, metaid_id_list))

    return result_list
290 def _construct_full_name(self, name: str, family_name: str, given_name: str) -> str:
291 if name and not family_name and not given_name:
292 return name
293 elif not name and family_name and not given_name:
294 return f'{family_name},'
295 elif not name and not family_name and given_name:
296 return f', {given_name}'
297 elif not name and family_name and given_name:
298 return f'{family_name}, {given_name}'
299 else:
300 return ''
def retrieve_ra_sequence_from_br_meta(self, metaid: str, col_name: str) -> List[Dict[str, tuple]]:
    '''
    Given a bibliographic resource's MetaID and a field name, it returns its agent roles and responsible agents in the correct order according to the specified field.
    The output has the following format: ::
        [
            {METAID_AR_1: (NAME_RA_1, [(METAID_ID_RA_1, LITERAL_VALUE_ID_RA_1)], METAID_RA_1)},
            {METAID_AR_2: (NAME_RA_2, [(METAID_ID_RA_2, LITERAL_VALUE_ID_RA_2)], METAID_RA_2)},
            {METAID_AR_N: (NAME_RA_N, [(METAID_ID_RA_N, LITERAL_VALUE_ID_RA_N)], METAID_RA_N)},
        ]
        [
            {'5343': ('Hodge, James G.', [], '3316')},
            {'5344': ('Anderson, Evan D.', [], '3317')},
            {'5345': ('Kirsch, Thomas D.', [], '3318')},
            {'5346': ('Kelen, Gabor D.', [('4278', 'orcid:0000-0002-3236-8286')], '3319')}
        ]
    :params metaid: a MetaID
    :type metaid: str
    :params col_name: a field name: 'author', 'editor', or any other value to select the publisher role
    :type col_name: str
    :returns: List[Dict[str, tuple]] -- the output is a list of single-key dictionaries. Each key is the MetaID of an agent role of the specified bibliographic resource; each value is a three-elements tuple containing the name of the responsible agent, the list of its identifiers as (MetaID, literal value) tuples, and the MetaID of the responsible agent. An empty list is returned when no agent role with the requested role is found or when the iteration limit is reached.
    '''
    if col_name == 'author':
        role = GraphEntity.iri_author
    elif col_name == 'editor':
        role = GraphEntity.iri_editor
    else:
        role = GraphEntity.iri_publisher

    MAX_ITERATIONS = 10000
    metaid_uri = URIRef(f'{self.base_iri}/br/{str(metaid)}')
    dict_ar = dict()

    for triple in self.local_g.triples((metaid_uri, GraphEntity.iri_is_document_context_for, None)):
        ar_uri = triple[2]
        # Only agent roles carrying the requested role are considered
        if not any(ar_triple[2] == role for ar_triple in self.local_g.triples((ar_uri, None, None))):
            continue
        role_value = str(ar_uri).replace(f'{self.base_iri}/ar/', '')
        next_role = ''
        ra = None
        for relevant_ar_triple in self.local_g.triples((ar_uri, None, None)):
            if relevant_ar_triple[1] == GraphEntity.iri_has_next:
                next_role = str(relevant_ar_triple[2]).replace(f'{self.base_iri}/ar/', '')
            elif relevant_ar_triple[1] == GraphEntity.iri_is_held_by:
                ra = str(relevant_ar_triple[2]).replace(f'{self.base_iri}/ra/', '')
        # Skip AR if it has no associated RA (malformed data)
        if ra is not None:
            dict_ar[role_value] = {'next': next_role, 'ra': ra}

    all_roles = set(dict_ar.keys())
    if len(all_roles) == 0:
        return []

    def ar_number(ar_metaid):
        # Sequential number of an agent role, used to break ties deterministically
        return get_resource_number(f'{self.base_iri}/ar/{ar_metaid}')

    def follow_chain(start_role):
        # Follow the "next" links from start_role; returns the ordered chain and
        # whether the walk stopped because the iteration limit was reached
        chain = []
        visited_roles = set()
        current_role = start_role
        iteration_count = 0
        while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS:
            visited_roles.add(current_role)
            if current_role not in dict_ar:
                break
            ra = dict_ar[current_role]['ra']
            ra_info = self.retrieve_ra_from_meta(ra)[0:2]
            chain.append({current_role: ra_info + (ra,)})
            current_role = dict_ar[current_role]['next']
            iteration_count += 1
        return chain, iteration_count == MAX_ITERATIONS

    # Find the start_role by excluding all roles that are "next" for others from the set of all roles
    roles_with_next = set(details['next'] for details in dict_ar.values() if details['next'])
    start_role_candidates = all_roles - roles_with_next

    # If no start candidates (circular loop), pick the AR with lowest number as arbitrary start
    if len(start_role_candidates) == 0:
        start_role_candidates = {min(all_roles, key=ar_number)}

    chains = []
    for start_candidate in start_role_candidates:
        chain, limit_reached = follow_chain(start_candidate)
        if limit_reached:
            print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}")
            return []
        chains.append(chain)

    if len(chains) == 1:
        return chains[0]
    # Several possible starts: keep the longest chain and, among equally long ones,
    # the one whose starting role has the lowest sequential number
    return min(chains, key=lambda chain: (-len(chain), ar_number(list(chain[0].keys())[0])))
def retrieve_re_from_br_meta(self, metaid:str) -> Tuple[str, str]:
    '''
    Look up in the local graph the resource embodiment of a bibliographic resource and its page interval.
    The output has the following format: ::

        (METAID, PAGES)
        ('2011', '391-397')

    :params metaid: a bibliographic resource's MetaID
    :type metaid: str
    :returns: Tuple[str, str] -- a two-elements tuple, where the first element is the MetaID of the resource embodiment, and the second is the pages' interval. When only one of the two pages is known it is used for both ends of the interval; when neither is known the interval is an empty string. None is returned when the resource has no embodiment in the local graph.
    '''
    br_uri = URIRef(f'{self.base_iri}/br/{str(metaid)}')
    embodiment_id = None
    first_page = None
    last_page = None

    for _, _, embodiment in self.local_g.triples((br_uri, GraphEntity.iri_embodiment, None)):
        embodiment_id = embodiment.replace(f'{self.base_iri}/re/', '')
        for _, predicate, page in self.local_g.triples((embodiment, None, None)):
            if predicate == GraphEntity.iri_starting_page:
                first_page = str(page)
            elif predicate == GraphEntity.iri_ending_page:
                last_page = str(page)

    if not embodiment_id:
        return None

    # A missing end of the interval is filled with the other end
    interval_start = first_page or last_page
    interval_end = last_page or first_page
    pages = f'{interval_start}-{interval_end}' if interval_start else ''
    return embodiment_id, pages
def retrieve_br_info_from_meta(self, metaid: str) -> dict:
    '''
    Collect from the local graph the descriptive information about a bibliographic resource.
    The output has the following format: ::

        {
            'pub_date': PUB_DATE,
            'type': TYPE,
            'page': (METAID, PAGES),
            'issue': ISSUE,
            'volume': VOLUME,
            'venue': VENUE
        }
        {
            'pub_date': '2006-02-27',
            'type': 'journal article',
            'page': ('2011', '391-397'),
            'issue': '4',
            'volume': '166',
            'venue': 'Archives Of Internal Medicine [omid:br/4387]'
        }

    :param metaid: a bibliographic resource's MetaID, or its full URI
    :type metaid: str
    :returns: dict -- a dictionary including the publication date, type, page, issue, volume, and venue of the specified bibliographic resource. Fields that are not found are left as empty strings; 'page' holds whatever retrieve_re_from_br_meta returns.
    '''
    graph = self.local_g
    venue_types = (
        GraphEntity.iri_archival_document,
        GraphEntity.iri_journal,
        GraphEntity.iri_book,
        GraphEntity.iri_book_series,
        GraphEntity.iri_series,
        GraphEntity.iri_academic_proceedings,
        GraphEntity.iri_proceedings_series,
        GraphEntity.iri_reference_book,
        GraphEntity.iri_expression,
    )

    def describe_identifiers(entity_uri):
        # The OMID comes first, followed by every identifier having both a scheme and a value
        labels = [f"omid:{entity_uri.replace(f'{self.base_iri}/', '')}"]
        for _, _, id_node in graph.triples((entity_uri, GraphEntity.iri_has_identifier, None)):
            scheme = literal = None
            for _, id_predicate, id_object in graph.triples((id_node, None, None)):
                if id_predicate == GraphEntity.iri_uses_identifier_scheme:
                    scheme = str(id_object)
                elif id_predicate == GraphEntity.iri_has_literal_value:
                    literal = str(id_object)
            if scheme and literal:
                scheme = scheme.replace(GraphEntity.DATACITE, '')
                labels.append(f"{scheme}:{literal}")
        return labels

    def copy_sequence_identifier(entity_uri, field):
        # Store the sequence identifier of entity_uri, if any, under the given field
        for _, entity_predicate, entity_object in graph.triples((entity_uri, None, None)):
            if entity_predicate == GraphEntity.iri_has_sequence_identifier:
                info[field] = str(entity_object)

    def record_venue(entity_uri):
        # A venue is recorded only if it has a title
        for _, entity_predicate, entity_object in graph.triples((entity_uri, None, None)):
            if entity_predicate == GraphEntity.iri_title:
                venue_title = str(entity_object)
                venue_ids = describe_identifiers(entity_uri)
                info['venue'] = f"{venue_title} [{' '.join(venue_ids)}]"

    metaid = str(metaid)
    if self.base_iri in metaid:
        br_uri = URIRef(metaid)
    else:
        br_uri = URIRef(f'{self.base_iri}/br/{metaid}')
    info = {
        'pub_date': '',
        'type': '',
        'page': self.retrieve_re_from_br_meta(metaid),
        'issue': '',
        'volume': '',
        'venue': ''
    }

    for _, predicate, obj in graph.triples((br_uri, None, None)):
        if predicate == GraphEntity.iri_has_publication_date:
            info['pub_date'] = str(obj)
        elif predicate == RDF.type and obj != GraphEntity.iri_expression:
            info['type'] = self._type_it(obj)
        elif predicate == GraphEntity.iri_has_sequence_identifier:
            # The resource itself is an issue or a volume: its own number fills the field
            for _, _, own_object in graph.triples((br_uri, None, None)):
                if own_object == GraphEntity.iri_journal_issue:
                    info['issue'] = str(obj)
                elif own_object == GraphEntity.iri_journal_volume:
                    info['volume'] = str(obj)
        elif predicate == GraphEntity.iri_part_of:
            # First container level
            for _, container_predicate, container_object in graph.triples((obj, None, None)):
                if container_object == GraphEntity.iri_journal_issue:
                    copy_sequence_identifier(obj, 'issue')
                elif container_object == GraphEntity.iri_journal_volume:
                    copy_sequence_identifier(obj, 'volume')
                elif container_object in venue_types:
                    record_venue(obj)

                if container_predicate == GraphEntity.iri_part_of:
                    # Second container level
                    for _, upper_predicate, upper_object in graph.triples((container_object, None, None)):
                        if upper_object == GraphEntity.iri_journal_volume:
                            copy_sequence_identifier(container_object, 'volume')
                        elif upper_object in venue_types:
                            record_venue(container_object)

                        if upper_predicate == GraphEntity.iri_part_of:
                            # Third container level: taken as the venue whatever its type
                            record_venue(upper_object)
    return info
@staticmethod
def _type_it(br_type: URIRef) -> str:
    '''
    Translate the class IRI of a bibliographic resource into its textual type label.

    :params br_type: the class IRI of a bibliographic resource
    :type br_type: URIRef
    :returns str: -- the label associated with the class, or an empty string when the class is not among the known ones.
    '''
    labels_by_type = (
        (GraphEntity.iri_archival_document, 'archival document'),
        (GraphEntity.iri_book, 'book'),
        (GraphEntity.iri_book_chapter, 'book chapter'),
        (GraphEntity.iri_part, 'book part'),
        (GraphEntity.iri_expression_collection, 'book section'),
        (GraphEntity.iri_book_series, 'book series'),
        (GraphEntity.iri_book_set, 'book set'),
        (GraphEntity.iri_data_file, 'data file'),
        (GraphEntity.iri_thesis, 'dissertation'),
        (GraphEntity.iri_journal, 'journal'),
        (GraphEntity.iri_journal_article, 'journal article'),
        (GraphEntity.iri_journal_issue, 'journal issue'),
        (GraphEntity.iri_journal_volume, 'journal volume'),
        (GraphEntity.iri_proceedings_paper, 'proceedings article'),
        (GraphEntity.iri_academic_proceedings, 'proceedings'),
        (GraphEntity.iri_reference_book, 'reference book'),
        (GraphEntity.iri_reference_entry, 'reference entry'),
        (GraphEntity.iri_series, 'series'),
        (GraphEntity.iri_report_document, 'report'),
        (GraphEntity.iri_specification_document, 'standard'),
    )
    output_type = ''
    # Every entry is tested, so the last matching entry decides the label
    for type_iri, label in labels_by_type:
        if br_type == type_iri:
            output_type = label
    return output_type
def retrieve_publisher_from_br_metaid(self, metaid:str):
    '''
    Build from the local graph the textual description of the publishers of a bibliographic resource.

    Publisher agent roles are searched on the resource itself and on its containers, up to two part-of levels above it.

    :params metaid: a bibliographic resource's MetaID
    :type metaid: str
    :returns str: -- the publishers joined by '; '. Each publisher is rendered as 'NAME [IDS]', or as '[IDS]' when no name is found, where IDS are the space-separated OMID and identifiers of the responsible agent. An empty string is returned when no publisher is found.
    '''
    graph = self.local_g
    publisher_roles = set()

    def gather_publisher_roles(entity_uri, levels_left):
        # Depth-first visit of the entity and of its containers
        for _, predicate, target in graph.triples((entity_uri, None, None)):
            if predicate == GraphEntity.iri_is_document_context_for:
                for _, _, role_object in graph.triples((target, None, None)):
                    if role_object == GraphEntity.iri_publisher:
                        publisher_roles.add(target)
            elif predicate == GraphEntity.iri_part_of and levels_left > 0:
                gather_publisher_roles(target, levels_left - 1)

    gather_publisher_roles(URIRef(f'{self.base_iri}/br/{metaid}'), 2)

    rendered_publishers = []
    for role_uri in publisher_roles:
        agent_labels = []
        agent_name = None
        for _, role_predicate, agent_uri in graph.triples((role_uri, None, None)):
            if role_predicate != GraphEntity.iri_is_held_by:
                continue
            agent_labels.append(agent_uri.replace(f'{self.base_iri}/', 'omid:'))
            for _, agent_predicate, agent_object in graph.triples((agent_uri, None, None)):
                if agent_predicate == GraphEntity.iri_name:
                    agent_name = agent_object
                elif agent_predicate == GraphEntity.iri_has_identifier:
                    scheme = None
                    literal = None
                    for _, id_predicate, id_object in graph.triples((agent_object, None, None)):
                        if id_predicate == GraphEntity.iri_uses_identifier_scheme:
                            scheme = id_object.replace(f'{str(GraphEntity.DATACITE)}', '')
                        elif id_predicate == GraphEntity.iri_has_literal_value:
                            literal = id_object
                    # Identifiers lacking either the scheme or the value are left out
                    if scheme is not None and literal is not None:
                        agent_labels.append(f'{scheme}:{literal}')
        joined_labels = " ".join(agent_labels)
        if agent_name is None:
            rendered_publishers.append(f'[{joined_labels}]')
        else:
            rendered_publishers.append(f'{agent_name} [{joined_labels}]')
    return '; '.join(rendered_publishers)
672 def get_everything_about_res(self, metavals: set, identifiers: set, vvis: set, max_depth: int = 10) -> None:
673 BATCH_SIZE = 10
674 MAX_WORKERS = 1
676 def batch_process(input_set, batch_size):
677 """Generator to split input data into smaller batches if batch_size is not None."""
678 if batch_size is None:
679 yield input_set
680 else:
681 for i in range(0, len(input_set), batch_size):
682 yield input_set[i:i + batch_size]
def process_batch_parallel(subjects, cur_depth, visited_subjects):
    """Load the triples of the given subjects and of the resources they link to, level by level.

    At each level the subjects not visited yet are queried in batches of BATCH_SIZE; when
    there is more than one query and MAX_WORKERS > 1, the queries are spread over spawned
    worker processes. Every returned triple is added both to self.local_g and to the
    per-subject graph stored in self.prebuilt_subgraphs. Object URIs become the subjects of
    the next level, unless they are reached through RDF.type, GraphEntity.iri_with_role or
    GraphEntity.iri_uses_identifier_scheme.

    The traversal stops when a level has no subjects, when all of them were already
    visited, or when max_depth is truthy and the current depth exceeds it (a falsy
    max_depth means no depth limit). max_depth, BATCH_SIZE and MAX_WORKERS are defined
    outside this function.

    :params subjects: the set of subject URIs (as strings) of the first level
    :params cur_depth: the depth assigned to ``subjects``
    :params visited_subjects: the set of subjects already processed; it is updated in place
    :returns: None
    """
    ts_url = self.ts_url
    # Objects reached through these predicates are not expanded any further.
    predicates_not_followed = {RDF.type, GraphEntity.iri_with_role, GraphEntity.iri_uses_identifier_scheme}

    # One loop iteration per depth level: a loop instead of recursion, so that very deep
    # chains of linked resources cannot exhaust the interpreter's recursion limit.
    while subjects and not (max_depth and cur_depth > max_depth):
        new_subjects = subjects - visited_subjects
        if not new_subjects:
            return

        visited_subjects.update(new_subjects)

        batch_queries = []
        for batch in batch_process(list(new_subjects), BATCH_SIZE):
            query = f'''
                SELECT ?s ?p ?o
                WHERE {{
                    VALUES ?s {{ {' '.join([f"<{s}>" for s in batch])} }}
                    ?s ?p ?o.
                }}'''
            batch_queries.append(query)

        if len(batch_queries) > 1 and MAX_WORKERS > 1:
            queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS)
            grouped_queries = [
                (ts_url, batch_queries[i:i + queries_per_worker])
                for i in range(0, len(batch_queries), queries_per_worker)
            ]
            with ProcessPoolExecutor(
                max_workers=min(len(grouped_queries), MAX_WORKERS),
                mp_context=multiprocessing.get_context('spawn')
            ) as executor:
                grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries))
            results = [item for sublist in grouped_results for item in sublist]
        else:
            results = _execute_sparql_queries((ts_url, batch_queries)) if batch_queries else []

        next_subjects = set()
        for result in results:
            for row in result:
                s = URIRef(row['s']['value'])
                p = URIRef(row['p']['value'])
                o_binding = row['o']
                if o_binding['type'] == 'uri':
                    o = URIRef(o_binding['value'])
                else:
                    # NOTE: every non-URI binding (blank nodes included) is stored as a Literal.
                    o_datatype = URIRef(o_binding['datatype']) if 'datatype' in o_binding else None
                    # Keep the language tag of language-tagged literals. It is passed only
                    # when no datatype is given, since a Literal cannot carry both.
                    o_lang = None
                    if o_datatype is None and 'xml:lang' in o_binding:
                        o_lang = o_binding['xml:lang'] or None
                    o = Literal(lexical_or_value=o_binding['value'], datatype=o_datatype, lang=o_lang)
                triple = (s, p, o)
                self.local_g.add(triple)
                if s not in self.prebuilt_subgraphs:
                    self.prebuilt_subgraphs[s] = Graph()
                self.prebuilt_subgraphs[s].add(triple)
                if isinstance(o, URIRef) and p not in predicates_not_followed:
                    next_subjects.add(str(o))

        subjects = next_subjects
        cur_depth += 1
def get_initial_subjects_from_metavals(metavals):
    """Build the set of subject IRIs corresponding to the given MetaIDs.

    Every occurrence of 'omid:' is removed from each MetaID and what remains is appended
    to self.base_iri after a slash.
    """
    subject_iris = set()
    for metaval in metavals:
        local_name = metaval.replace('omid:', '')
        subject_iris.add(f"{self.base_iri}/{local_name}")
    return subject_iris
def get_initial_subjects_from_identifiers(identifiers):
    """Convert identifiers to a set of subjects based on batch queries executed in parallel.

    Each identifier is a 'scheme:literal' string, split on the first colon. One query is
    built per batch of BATCH_SIZE identifiers: a UNION of exact-literal patterns (plain
    and xsd:string-typed) when self.virtuoso_full_text_search is set, otherwise a VALUES
    block compared through str(). When there is more than one query and MAX_WORKERS > 1,
    the queries are spread over spawned worker processes.

    :params identifiers: an iterable of 'scheme:literal' identifier strings
    :returns: tuple -- (subjects set, id_to_subjects mapping)
        - subjects: set of subject URIs found
        - id_to_subjects: dict mapping identifier string to set of subject URIs
    :raises IndexError: if an identifier contains no colon
    """
    def escape_literal(text):
        """Escape a value for use inside a double-quoted SPARQL string literal."""
        # Raw line feeds and carriage returns are not allowed inside "..." literals,
        # so they are escaped as well as backslashes and double quotes.
        return (
            text.replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
            .replace('\r', '\\r')
        )

    subjects = set()
    id_to_subjects = {}
    ts_url = self.ts_url
    batches = list(batch_process(list(identifiers), BATCH_SIZE))

    if not batches:
        return subjects, id_to_subjects

    batch_queries = []
    for batch in batches:
        if not batch:
            continue

        # Split every identifier once into (scheme, escaped literal value).
        scheme_literal_pairs = []
        for identifier in batch:
            parts = identifier.split(':', maxsplit=1)
            # parts[1] raises IndexError when the identifier has no colon.
            scheme_literal_pairs.append((parts[0], escape_literal(parts[1])))

        if self.virtuoso_full_text_search:
            union_blocks = [
                f"""
                {{
                    {{
                        ?id <{GraphEntity.iri_has_literal_value}> "{escaped_literal}" .
                    }}
                    UNION
                    {{
                        ?id <{GraphEntity.iri_has_literal_value}> "{escaped_literal}"^^<{XSD.string}> .
                    }}
                    ?id <{GraphEntity.iri_uses_identifier_scheme}> <{GraphEntity.DATACITE + scheme}> .
                    ?s <{GraphEntity.iri_has_identifier}> ?id .
                    BIND("{scheme}" AS ?schemeLabel)
                    BIND("{escaped_literal}" AS ?literalLabel)
                }}
                """
                for scheme, escaped_literal in scheme_literal_pairs
            ]
            union_query = " UNION ".join(union_blocks)
            query = f'''
                SELECT ?s ?schemeLabel ?literalLabel WHERE {{
                    {union_query}
                }}
            '''
        else:
            identifiers_values_str = " ".join(
                f'(<{GraphEntity.DATACITE + scheme}> "{escaped_literal}")'
                for scheme, escaped_literal in scheme_literal_pairs
            )
            query = f'''
                SELECT DISTINCT ?s ?scheme ?literal WHERE {{
                    VALUES (?scheme ?literal) {{ {identifiers_values_str} }}
                    ?id <{GraphEntity.iri_uses_identifier_scheme}> ?scheme .
                    ?id <{GraphEntity.iri_has_literal_value}> ?literalValue .
                    FILTER(str(?literalValue) = str(?literal))
                    ?s <{GraphEntity.iri_has_identifier}> ?id .
                }}
            '''
        batch_queries.append(query)

    if len(batch_queries) > 1 and MAX_WORKERS > 1:
        queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS)
        grouped_queries = [
            (ts_url, batch_queries[i:i + queries_per_worker])
            for i in range(0, len(batch_queries), queries_per_worker)
        ]
        with ProcessPoolExecutor(
            max_workers=min(len(grouped_queries), MAX_WORKERS),
            mp_context=multiprocessing.get_context('spawn')
        ) as executor:
            grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries))
        results = [item for sublist in grouped_results for item in sublist]
    elif batch_queries:
        results = _execute_sparql_queries((ts_url, batch_queries))
    else:
        results = []

    for result in results:
        for row in result:
            subject = str(row['s']['value'])
            subjects.add(subject)
            if 'schemeLabel' in row:
                # Full-text variant: scheme and literal come back through the BIND labels.
                scheme = str(row['schemeLabel']['value'])
                literal = str(row['literalLabel']['value'])
            else:
                # VALUES variant: the scheme is a full IRI, reduced here to its short name.
                scheme = str(row['scheme']['value']).replace(str(GraphEntity.DATACITE), '')
                literal = str(row['literal']['value'])
            id_to_subjects.setdefault(f"{scheme}:{literal}", set()).add(subject)

    return subjects, id_to_subjects
def get_initial_subjects_from_vvis(vvis):
    """Convert vvis to a set of subjects based on batch queries executed in parallel.

    Each item of ``vvis`` is a (volume, issue, venue_metaid, venue_ids_tuple) tuple. The
    venues to inspect are the given venue MetaID plus the '/br/' resources found through
    the venue identifiers of that same tuple. For every such venue one query looks for:
    the issue inside the volume of the venue (issue and volume given), the issue directly
    inside the venue (issue only), or the volume inside the venue (volume only). Sequence
    identifiers are matched both as plain and as xsd:string-typed literals. Tuples with
    neither volume nor issue produce no query.

    :params vvis: an iterable of (volume, issue, venue_metaid, venue_ids_tuple) tuples
    :returns: set -- the subjects found through the venue identifiers, the matching
        volumes/issues, and the URI of every venue a query was built for
    """
    def escape_literal(text):
        """Escape a value for use inside a double-quoted SPARQL string literal."""
        # Raw line feeds and carriage returns are not allowed inside "..." literals,
        # so they are escaped as well as backslashes and double quotes.
        return (
            text.replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
            .replace('\r', '\\r')
        )

    def literal_forms(text):
        """Return the plain and the xsd:string-typed SPARQL spelling of a value."""
        escaped = escape_literal(text)
        return (f'"{escaped}"', f'"{escaped}"^^<{XSD.string}>')

    def entity_pattern(variable, rdf_type, container, sequence_form):
        """Return the triple patterns typing a resource, placing it in a container and fixing its sequence identifier."""
        return (
            f'{variable} a <{rdf_type}> ; '
            f'<{GraphEntity.iri_part_of}> {container} ; '
            f'<{GraphEntity.iri_has_sequence_identifier}> {sequence_form} .'
        )

    def build_vvi_query(venue_uri, volume, issue):
        """Return the SELECT query for one venue; at least one of volume and issue must be truthy."""
        venue_ref = f'<{venue_uri}>'
        if issue and volume:
            # The volume spelling varies fastest: plain/plain, typed/plain, plain/typed, typed/typed.
            alternatives = [
                entity_pattern('?volume', GraphEntity.iri_journal_volume, venue_ref, volume_form)
                + ' '
                + entity_pattern('?s', GraphEntity.iri_journal_issue, '?volume', issue_form)
                for issue_form in literal_forms(issue)
                for volume_form in literal_forms(volume)
            ]
        elif issue:
            alternatives = [
                entity_pattern('?s', GraphEntity.iri_journal_issue, venue_ref, issue_form)
                for issue_form in literal_forms(issue)
            ]
        else:
            alternatives = [
                entity_pattern('?s', GraphEntity.iri_journal_volume, venue_ref, volume_form)
                for volume_form in literal_forms(volume)
            ]
        union = ' UNION '.join(f'{{ {alternative} }}' for alternative in alternatives)
        return f'SELECT ?s WHERE {{ {union} }}'

    subjects = set()
    ts_url = self.ts_url
    vvi_queries = []
    venue_uris_to_add = set()

    # First pass: collect all venue IDs
    all_venue_ids = set()
    for _volume, _issue, _venue_metaid, venue_ids_tuple in vvis:
        if venue_ids_tuple:
            all_venue_ids.update(venue_ids_tuple)

    # Get venue subjects from identifiers with mapping
    venue_id_to_uris = {}
    if all_venue_ids:
        venue_id_subjects, venue_id_to_uris = get_initial_subjects_from_identifiers(all_venue_ids)
        subjects.update(venue_id_subjects)

    # Second pass: prepare VVI queries
    for volume, issue, venue_metaid, venue_ids_tuple in vvis:
        if not (issue or volume):
            # Nothing to look for inside the venue.
            continue

        venues_to_search = set()
        if venue_metaid:
            venues_to_search.add(venue_metaid)

        if venue_ids_tuple:
            # Convert venue URIs to metaid format for VVI search
            # Only use venues matching THIS tuple's identifiers
            for venue_id in venue_ids_tuple:
                if venue_id in venue_id_to_uris:
                    for venue_uri in venue_id_to_uris[venue_id]:
                        if '/br/' in venue_uri:
                            metaid = venue_uri.replace(f'{self.base_iri}/br/', '')
                            venues_to_search.add(f"omid:br/{metaid}")

        # Prepare one VVI query for each venue
        for venue_metaid_to_search in venues_to_search:
            venue_uri = f"{self.base_iri}/{venue_metaid_to_search.replace('omid:', '')}"
            vvi_queries.append(build_vvi_query(venue_uri, volume, issue))
            venue_uris_to_add.add(venue_uri)

    # Execute VVI queries in parallel
    if len(vvi_queries) > 1 and MAX_WORKERS > 1:
        queries_per_worker = max(1, len(vvi_queries) // MAX_WORKERS)
        grouped_queries = [
            (ts_url, vvi_queries[i:i + queries_per_worker])
            for i in range(0, len(vvi_queries), queries_per_worker)
        ]
        with ProcessPoolExecutor(
            max_workers=min(len(grouped_queries), MAX_WORKERS),
            mp_context=multiprocessing.get_context('spawn')
        ) as executor:
            grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries))
        results = [item for sublist in grouped_results for item in sublist]
    elif vvi_queries:
        results = _execute_sparql_queries((ts_url, vvi_queries))
    else:
        results = []

    for result in results:
        for row in result:
            subjects.add(str(row['s']['value']))

    subjects.update(venue_uris_to_add)

    return subjects
994 initial_subjects = set()
996 if metavals:
997 initial_subjects.update(get_initial_subjects_from_metavals(metavals))
999 if identifiers:
1000 id_subjects, _ = get_initial_subjects_from_identifiers(identifiers)
1001 initial_subjects.update(id_subjects)
1003 if vvis:
1004 initial_subjects.update(get_initial_subjects_from_vvis(vvis))
1006 visited_subjects = set()
1007 process_batch_parallel(initial_subjects, 0, visited_subjects)
def get_subgraph(self, res: str) -> Graph|None:
    '''
    Return the graph stored in self.prebuilt_subgraphs for the given resource.

    :params res: the key of the resource in self.prebuilt_subgraphs
    :type res: str
    :returns: Graph|None -- the stored graph, or None if there is no entry for ``res``
    '''
    return self.prebuilt_subgraphs.get(res)
def retrieve_venue_from_local_graph(self, meta_id: str) -> Dict[str, Dict[str, str]]:
    """
    Rebuild the volume/issue structure of a venue by reading self.local_g only.

    :params meta_id: a MetaID
    :type meta_id: str
    :returns: Dict[str, Dict[str, str]] -- the venue structure with volumes and issues
    """
    graph = self.local_g
    br_prefix = f'{self.base_iri}/br/'
    venue_uri = URIRef(f'{br_prefix}{meta_id}')

    venue_issues = {}
    venue_volumes = {}
    # Volume id -> sequence identifier, used below to attach issues to their volume
    volume_seq_by_id = {}

    # Volumes: keep only those that are part of this venue
    for volume, _, _ in graph.triples((None, RDF.type, GraphEntity.iri_journal_volume)):
        if (volume, GraphEntity.iri_part_of, venue_uri) not in graph:
            continue
        volume_id = str(volume).replace(br_prefix, '')
        for _, _, seq_node in graph.triples((volume, GraphEntity.iri_has_sequence_identifier, None)):
            volume_seq = str(seq_node)
            volume_seq_by_id[volume_id] = volume_seq
            venue_volumes[volume_seq] = {'id': volume_id, 'issue': {}}

    # Issues: attach each one either to a volume of this venue or to the venue itself
    for issue, _, _ in graph.triples((None, RDF.type, GraphEntity.iri_journal_issue)):
        issue_id = str(issue).replace(br_prefix, '')

        # When several values exist, the last one yielded by the graph is kept
        issue_seq = None
        for _, _, seq_node in graph.triples((issue, GraphEntity.iri_has_sequence_identifier, None)):
            issue_seq = str(seq_node)

        # The container could be the venue or a volume
        container = None
        for _, _, container_node in graph.triples((issue, GraphEntity.iri_part_of, None)):
            container = str(container_node)

        if not issue_seq or not container:
            continue

        container_id = container.replace(br_prefix, '')
        if container_id in volume_seq_by_id:
            # The container is a volume of our venue
            parent_seq = volume_seq_by_id[container_id]
            venue_volumes[parent_seq]['issue'][issue_seq] = {'id': issue_id}
        elif container == str(venue_uri):
            # The container is directly our venue
            venue_issues[issue_seq] = {'id': issue_id}

    return {'issue': venue_issues, 'volume': venue_volumes}