Coverage for oc_meta/lib/finder.py: 88%

656 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

1import multiprocessing 

2import os 

3from concurrent.futures import ProcessPoolExecutor 

4from typing import Dict, List, Tuple 

5 

6import yaml 

7from dateutil import parser 

8from oc_ocdm.graph import GraphEntity 

9from oc_ocdm.graph.graph_entity import GraphEntity 

10from oc_ocdm.prov.prov_entity import ProvEntity 

11from oc_ocdm.support import get_count, get_resource_number 

12from rdflib import RDF, XSD, Graph, Literal, URIRef 

13from sparqlite import SPARQLClient 

14from time_agnostic_library.agnostic_entity import AgnosticEntity 

15 

16 

17def _execute_sparql_queries(args: tuple) -> list: 

18 ts_url, queries = args 

19 results = [] 

20 with SPARQLClient(ts_url, max_retries=5, backoff_factor=5, timeout=3600) as client: 

21 for query in queries: 

22 result = client.query(query) 

23 results.append(result['results']['bindings'] if result else []) 

24 return results 

25 

26 

27class ResourceFinder: 

28 

29 def __init__(self, ts_url, base_iri: str, local_g: Graph = Graph(), settings: dict = dict(), meta_config_path: str = None): 

30 self.ts_url = ts_url 

31 self.base_iri = base_iri[:-1] if base_iri[-1] == '/' else base_iri 

32 self.local_g = local_g 

33 self.prebuilt_subgraphs = {} 

34 self.ids_in_local_g = set() 

35 self.meta_config_path = meta_config_path 

36 self.meta_settings = settings 

37 self.virtuoso_full_text_search = settings['virtuoso_full_text_search'] if settings and 'virtuoso_full_text_search' in settings else False 

38 

39 # _______________________________BR_________________________________ # 

40 

41 def retrieve_br_from_id(self, schema: str, value: str) -> List[Tuple[str, str, list]]: 

42 ''' 

43 Given an identifier, it retrieves bibliographic resources associated with that identifier, related titles and other identifiers MetaIDs and literal values. 

44 

45 :params schema: an identifier schema 

46 :type schema: str 

47 :params value: an identifier literal value 

48 :type value: str 

49 :returns List[Tuple[str, str, list]]: -- it returns a list of three elements tuples. The first element is the MetaID of a resource associated with the input ID. The second element is a title of that resource, if present. The third element is a list of MetaID-ID tuples related to identifiers associated with that resource.  

50 ''' 

51 schema_uri = URIRef(GraphEntity.DATACITE + schema) 

52 value = value.replace('\\', '\\\\') 

53 result_list = [] 

54 identifier_uri = None 

55 

56 # Search for both string-typed and untyped literals 

57 for literal_value in [Literal(value, datatype=XSD.string), Literal(value)]: 

58 for starting_triple in self.local_g.triples((None, GraphEntity.iri_has_literal_value, literal_value)): 

59 for known_id_triple in self.local_g.triples((starting_triple[0], None, None)): 

60 if known_id_triple[1] == GraphEntity.iri_uses_identifier_scheme and known_id_triple[2] == schema_uri: 

61 identifier_uri = known_id_triple[0] 

62 if identifier_uri: 

63 break 

64 if identifier_uri: 

65 metaid_id_list = [(identifier_uri.replace(f'{self.base_iri}/id/', ''), f'{schema}:{value}')] 

66 for triple in self.local_g.triples((None, GraphEntity.iri_has_identifier, identifier_uri)): 

67 title = '' 

68 res = triple[0] 

69 for res_triple in self.local_g.triples((res, None, None)): 

70 if res_triple[1] == GraphEntity.iri_title: 

71 title = str(res_triple[2]) 

72 elif res_triple[1] == GraphEntity.iri_has_identifier and res_triple[2] != identifier_uri: 

73 for id_triple in self.local_g.triples((res_triple[2], None, None)): 

74 if id_triple[1] == GraphEntity.iri_uses_identifier_scheme: 

75 id_schema = id_triple[2] 

76 elif id_triple[1] == GraphEntity.iri_has_literal_value: 

77 id_literal_value = id_triple[2] 

78 full_id = f'{id_schema.replace(GraphEntity.DATACITE, "")}:{id_literal_value}' 

79 metaid_id_tuple = (res_triple[2].replace(f'{self.base_iri}/id/', ''), full_id) 

80 metaid_id_list.append(metaid_id_tuple) 

81 result_list.append((res.replace(f'{self.base_iri}/br/', ''), title, metaid_id_list)) 

82 

83 return result_list 

84 

85 def retrieve_br_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]]]: 

86 ''' 

87 Given a MetaID, it retrieves the title of the bibliographic resource having that MetaID and other identifiers of that entity. 

88 

89 :params metaid: a MetaID 

90 :type metaid: str 

91 :returns Tuple[str, List[Tuple[str, str]]]: -- it returns a tuple of two elements. The first element is the resource's title associated with the input MetaID. The second element is a list of MetaID-ID tuples related to identifiers associated with that entity. 

92 ''' 

93 metaid_uri = f'{self.base_iri}/br/{metaid}' 

94 title = '' 

95 identifiers = [] 

96 it_exists = False 

97 

98 for triple in self.local_g.triples((URIRef(metaid_uri), None, None)): 

99 it_exists = True 

100 if triple[1] == GraphEntity.iri_title: 

101 title = str(triple[2]) 

102 elif triple[1] == GraphEntity.iri_has_identifier: 

103 id_scheme = '' 

104 literal_value = '' 

105 identifier = triple[2] 

106 for triple_inner in self.local_g.triples((identifier, None, None)): 

107 if triple_inner[1] == GraphEntity.iri_uses_identifier_scheme: 

108 id_scheme = str(triple_inner[2]).replace(GraphEntity.DATACITE, '') 

109 elif triple_inner[1] == GraphEntity.iri_has_literal_value: 

110 literal_value = str(triple_inner[2]) 

111 if id_scheme and literal_value: # Ensure both id_scheme and literal_value are found before appending 

112 full_id = f'{id_scheme}:{literal_value}' 

113 identifiers.append((str(identifier).replace(self.base_iri + '/id/', ''), full_id)) 

114 

115 if not it_exists: 

116 return "", [], False 

117 

118 return title, identifiers, True 

119 

120 # _______________________________ID_________________________________ # 

121 

122 def retrieve_metaid_from_id(self, schema: str, value: str) -> str: 

123 ''' 

124 Given the schema and value of an ID, it returns the MetaID associated with that identifier. 

125 

126 :params schema: an identifier schema 

127 :type schema: str 

128 :params value: an identifier literal value 

129 :type value: str 

130 :returns str: -- it returns the MetaID associated with the input ID. 

131 ''' 

132 schema_uri = URIRef(GraphEntity.DATACITE + schema) 

133 value = value.replace('\\', '\\\\') 

134 

135 # Create both untyped and string-typed literals  

136 for literal in [Literal(value, datatype=XSD.string), Literal(value)]: 

137 for starting_triple in self.local_g.triples((None, GraphEntity.iri_has_literal_value, literal)): 

138 for known_id_triple in self.local_g.triples((starting_triple[0], None, None)): 

139 if known_id_triple[1] == GraphEntity.iri_uses_identifier_scheme and known_id_triple[2] == schema_uri: 

140 return known_id_triple[0].replace(f'{self.base_iri}/id/', '') 

141 

142 # If no match is found, return None or an appropriate value 

143 return None 

144 

145 def retrieve_metaid_from_merged_entity(self, metaid_uri:str, prov_config:str) -> str: 

146 ''' 

147 It looks for MetaId in the provenance. If the input entity was deleted due to a merge, this function returns the target entity. Otherwise, it returns None. 

148 

149 :params metaid_uri: a MetaId URI 

150 :type metaid_uri: str 

151 :params prov_config: the path of the configuration file required by time-agnostic-library 

152 :type prov_config: str 

153 :returns str: -- It returns the MetaID associated with the target entity after a merge. If there was no merge, it returns None. 

154 ''' 

155 metaval = None 

156 with open(prov_config, 'r', encoding='utf8') as f: 

157 prov_config_dict = yaml.safe_load(f) 

158 agnostic_meta = AgnosticEntity(res=metaid_uri, config=prov_config_dict, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

159 agnostic_meta_history = agnostic_meta.get_history(include_prov_metadata=True) 

160 meta_history_data = agnostic_meta_history[0][metaid_uri] 

161 if meta_history_data: 

162 meta_history_metadata = agnostic_meta_history[1][metaid_uri] 

163 penultimate_snapshot = sorted( 

164 meta_history_metadata.items(), 

165 key=lambda x: parser.parse(x[1]['generatedAtTime']).replace(tzinfo=None), 

166 reverse=True 

167 )[1][0] 

168 query_if_it_was_merged = f''' 

169 SELECT DISTINCT ?se 

170 WHERE {{ 

171 ?se a <{ProvEntity.iri_entity}>; 

172 <{ProvEntity.iri_was_derived_from}> <{penultimate_snapshot}>. 

173 }} 

174 ''' 

175 with SPARQLClient(self.ts_url, max_retries=5, backoff_factor=5, timeout=3600) as client: 

176 results = client.query(query_if_it_was_merged)['results']['bindings'] 

177 # The entity was merged to another 

178 merged_entity = [se for se in results if metaid_uri not in se['se']['value']] 

179 if merged_entity: 

180 merged_entity:str = merged_entity[0]['se']['value'] 

181 merged_entity = merged_entity.split('/prov/')[0] 

182 merged_entity = get_count(merged_entity) 

183 metaval = merged_entity 

184 return metaval 

185 

186 # _______________________________RA_________________________________ # 

187 def retrieve_ra_from_meta(self, metaid: str) -> Tuple[str, List[Tuple[str, str]]]: 

188 ''' 

189 Given a MetaID, it retrieves the name and id of the responsible agent associated with it, whether it is an author or a publisher. 

190 The output has the following format: 

191 

192 ('NAME', [('METAID_OF_THE_IDENTIFIER', 'LITERAL_VALUE')]) 

193 ('American Medical Association (ama)', [('4274', 'crossref:10')]) 

194 

195 :params metaid: a responsible agent's MetaID 

196 :type metaid: str 

197 :returns str: -- it returns a tuple, where the first element is the responsible agent's name, and the second element is a list containing its identifier's MetaID and literal value 

198 ''' 

199 metaid_uri = f'{self.base_iri}/ra/{metaid}' 

200 family_name = '' 

201 given_name = '' 

202 name = '' 

203 identifiers = [] 

204 it_exists = False 

205 

206 for triple in self.local_g.triples((URIRef(metaid_uri), None, None)): 

207 it_exists = True 

208 if triple[1] == GraphEntity.iri_family_name: 

209 family_name = str(triple[2]) 

210 elif triple[1] == GraphEntity.iri_given_name: 

211 given_name = str(triple[2]) 

212 elif triple[1] == GraphEntity.iri_name: 

213 name = str(triple[2]) 

214 elif triple[1] == GraphEntity.iri_has_identifier: 

215 identifier = triple[2] 

216 id_scheme = '' 

217 literal_value = '' 

218 for triple_inner in self.local_g.triples((identifier, None, None)): 

219 if triple_inner[1] == GraphEntity.iri_uses_identifier_scheme: 

220 id_scheme = str(triple_inner[2]).replace(GraphEntity.DATACITE, '') 

221 elif triple_inner[1] == GraphEntity.iri_has_literal_value: 

222 literal_value = str(triple_inner[2]) 

223 if id_scheme and literal_value: 

224 full_id = f'{id_scheme}:{literal_value}' 

225 identifiers.append((str(identifier).replace(self.base_iri + '/id/', ''), full_id)) 

226 

227 full_name = self._construct_full_name(name, family_name, given_name) 

228 

229 return full_name, identifiers, it_exists 

230 

231 def retrieve_ra_from_id(self, schema: str, value: str, publisher: bool) -> List[Tuple[str, str, list]]: 

232 ''' 

233 Given an identifier, it retrieves responsible agents associated with that identifier, related names and other identifiers MetaIDs and literal values. 

234 The output has the following format: :: 

235 

236 [(METAID, NAME, [(METAID_OF_THE_IDENTIFIER, LITERAL_VALUE)])] 

237 [('3309', 'American Medical Association (ama)', [('4274', 'crossref:10')])] 

238 

239 :params schema: an identifier schema 

240 :type schema: str 

241 :params value: an identifier literal value 

242 :type value: str 

243 :params publisher: True if the identifier is associated with a publisher, False otherwise. 

244 :type publisher: bool 

245 :returns List[Tuple[str, str, list]]: -- it returns a list of three elements tuples. The first element is the MetaID of a responsible agent associated with the input ID. The second element is the name of that responsible agent, if present. The third element is a list of MetaID-ID tuples related to identifiers associated with that responsible agent.  

246 ''' 

247 schema_uri = URIRef(GraphEntity.DATACITE + schema) 

248 value = value.replace('\\', '\\\\') 

249 result_list = [] 

250 identifier_uri = None 

251 

252 # Search for both string-typed and untyped literals 

253 for literal_value in [Literal(value, datatype=XSD.string), Literal(value)]: 

254 for starting_triple in self.local_g.triples((None, GraphEntity.iri_has_literal_value, literal_value)): 

255 for known_id_triple in self.local_g.triples((starting_triple[0], None, None)): 

256 if known_id_triple[1] == GraphEntity.iri_uses_identifier_scheme and known_id_triple[2] == schema_uri: 

257 identifier_uri = known_id_triple[0] 

258 break 

259 if identifier_uri: 

260 break 

261 if identifier_uri: 

262 metaid_id_list = [(identifier_uri.replace(f'{self.base_iri}/id/', ''), f'{schema}:{value}')] 

263 for triple in self.local_g.triples((None, GraphEntity.iri_has_identifier, identifier_uri)): 

264 name = '' 

265 family_name = '' 

266 given_name = '' 

267 res = triple[0] 

268 for res_triple in self.local_g.triples((res, None, None)): 

269 if res_triple[1] == GraphEntity.iri_name: 

270 name = str(res_triple[2]) 

271 elif res_triple[1] == GraphEntity.iri_family_name: 

272 family_name = str(res_triple[2]) 

273 elif res_triple[1] == GraphEntity.iri_given_name: 

274 given_name = str(res_triple[2]) 

275 elif res_triple[1] == GraphEntity.iri_has_identifier and res_triple[2] != identifier_uri: 

276 for id_triple in self.local_g.triples((res_triple[2], None, None)): 

277 if id_triple[1] == GraphEntity.iri_uses_identifier_scheme: 

278 id_schema = id_triple[2] 

279 elif id_triple[1] == GraphEntity.iri_has_literal_value: 

280 id_literal_value = id_triple[2] 

281 full_id = f'{id_schema.replace(GraphEntity.DATACITE, "")}:{id_literal_value}' 

282 metaid_id_tuple = (res_triple[2].replace(f'{self.base_iri}/id/', ''), full_id) 

283 metaid_id_list.append(metaid_id_tuple) 

284 

285 full_name = self._construct_full_name(name, family_name, given_name) 

286 result_list.append((res.replace(f'{self.base_iri}/ra/', ''), full_name, metaid_id_list)) 

287 

288 return result_list 

289 

290 def _construct_full_name(self, name: str, family_name: str, given_name: str) -> str: 

291 if name and not family_name and not given_name: 

292 return name 

293 elif not name and family_name and not given_name: 

294 return f'{family_name},' 

295 elif not name and not family_name and given_name: 

296 return f', {given_name}' 

297 elif not name and family_name and given_name: 

298 return f'{family_name}, {given_name}' 

299 else: 

300 return '' 

301 

302 def retrieve_ra_sequence_from_br_meta(self, metaid: str, col_name: str) -> List[Dict[str, tuple]]: 

303 ''' 

304 Given a bibliographic resource's MetaID and a field name, it returns its agent roles and responsible agents in the correct order according to the specified field. 

305 The output has the following format: :: 

306 [ 

307 {METAID_AR_1: (NAME_RA_1, [(METAID_ID_RA_1, LITERAL_VALUE_ID_RA_1)], METAID_RA_1)}, 

308 {METAID_AR_2: (NAME_RA_2, [(METAID_ID_RA_2, LITERAL_VALUE_ID_RA_2)], METAID_RA_2)}, 

309 {METAID_AR_N: (NAME_RA_N, [(METAID_ID_RA_N, LITERAL_VALUE_ID_RA_N)], METAID_RA_N)}, 

310 ] 

311 [ 

312 {'5343': ('Hodge, James G.', [], '3316')}, 

313 {'5344': ('Anderson, Evan D.', [], '3317')}, 

314 {'5345': ('Kirsch, Thomas D.', [], '3318')}, 

315 {'5346': ('Kelen, Gabor D.', [('4278', 'orcid:0000-0002-3236-8286')], '3319')} 

316 ] 

317 :params metaid: a MetaID 

318 :type meta_id: str 

319 :params col_name: a MetaID 

320 :type col_name: str 

321 :returns: List[Dict[str, tuple]] -- the output is a list of three-elements tuples. Each tuple's first and third elements are the MetaIDs of an agent role and responsible agent related to the specified bibliographic resource. The second element is a two-elements tuple, where the first element is the MetaID of the identifier of the responsible agent. In contrast, the second one is the literal value of that id. 

322 ''' 

323 if col_name == 'author': 

324 role = GraphEntity.iri_author 

325 elif col_name == 'editor': 

326 role = GraphEntity.iri_editor 

327 else: 

328 role = GraphEntity.iri_publisher 

329 

330 metaid_uri = URIRef(f'{self.base_iri}/br/{str(metaid)}') 

331 dict_ar = dict() 

332 

333 for triple in self.local_g.triples((metaid_uri, GraphEntity.iri_is_document_context_for, None)): 

334 for ar_triple in self.local_g.triples((triple[2], None, None)): 

335 if ar_triple[2] == role: 

336 role_value = str(triple[2]).replace(f'{self.base_iri}/ar/', '') 

337 next_role = '' 

338 ra = None 

339 for relevant_ar_triple in self.local_g.triples((triple[2], None, None)): 

340 if relevant_ar_triple[1] == GraphEntity.iri_has_next: 

341 next_role = str(relevant_ar_triple[2]).replace(f'{self.base_iri}/ar/', '') 

342 elif relevant_ar_triple[1] == GraphEntity.iri_is_held_by: 

343 ra = str(relevant_ar_triple[2]).replace(f'{self.base_iri}/ra/', '') 

344 # Skip AR if it has no associated RA (malformed data) 

345 if ra is not None: 

346 dict_ar[role_value] = {'next': next_role, 'ra': ra} 

347 

348 # Find the start_role by excluding all roles that are "next" for others from the set of all roles 

349 all_roles = set(dict_ar.keys()) 

350 roles_with_next = set(details['next'] for details in dict_ar.values() if details['next']) 

351 start_role_candidates = all_roles - roles_with_next 

352 

353 MAX_ITERATIONS = 10000 

354 

355 if len(all_roles) == 0: 

356 return [] 

357 

358 # If no start candidates (circular loop), pick the AR with lowest number as arbitrary start 

359 if len(start_role_candidates) == 0: 

360 sorted_ars = sorted(all_roles, key=lambda ar: get_resource_number(f'{self.base_iri}/ar/{ar}')) 

361 start_role_candidates = {sorted_ars[0]} 

362 

363 if len(start_role_candidates) != 1: 

364 # If more than one start candidate exists or none exist, build all chains and return the best one 

365 chains = [] 

366 for start_candidate in start_role_candidates: 

367 current_role = start_candidate 

368 chain = [] 

369 visited_roles = set() 

370 iteration_count = 0 

371 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS: 

372 visited_roles.add(current_role) 

373 if current_role in dict_ar: 

374 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2] 

375 ra_tuple = ra_info + (dict_ar[current_role]['ra'],) 

376 chain.append({current_role: ra_tuple}) 

377 current_role = dict_ar[current_role]['next'] 

378 else: 

379 break 

380 iteration_count += 1 

381 

382 if iteration_count == MAX_ITERATIONS: 

383 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}") 

384 return [] 

385 

386 chains.append(chain) 

387 # Sort chains by length, then by the lowest sequential number of the starting role 

388 chains.sort(key=lambda chain: (-len(chain), get_resource_number(f'{self.base_iri}/ar/{list(chain[0].keys())[0]}'))) 

389 try: 

390 ordered_ar_list = chains[0] 

391 except Exception as e: 

392 print(f"\nWarning: Error processing BR: {metaid} for column: {col_name}") 

393 print(f"dict_ar: {dict_ar}") 

394 print(f"All roles: {all_roles}") 

395 print(f"Start role candidates: {start_role_candidates}") 

396 print(f"Roles with next: {roles_with_next}") 

397 print(f"Error: {str(e)}") 

398 return [] 

399 else: 

400 start_role = start_role_candidates.pop() 

401 # Follow the "next" chain from the start_role to construct an ordered list 

402 ordered_ar_list = [] 

403 current_role = start_role 

404 visited_roles = set() 

405 iteration_count = 0 

406 while current_role and current_role not in visited_roles and iteration_count < MAX_ITERATIONS: 

407 visited_roles.add(current_role) 

408 if current_role in dict_ar: 

409 ra_info = self.retrieve_ra_from_meta(dict_ar[current_role]['ra'])[0:2] 

410 ra_tuple = ra_info + (dict_ar[current_role]['ra'],) 

411 ordered_ar_list.append({current_role: ra_tuple}) 

412 current_role = dict_ar[current_role]['next'] 

413 else: 

414 break 

415 iteration_count += 1 

416 

417 if iteration_count == MAX_ITERATIONS: 

418 print(f"Warning: Possible infinite loop detected for BR: {metaid}, column: {col_name}") 

419 return [] 

420 

421 return ordered_ar_list 

422 

423 def retrieve_re_from_br_meta(self, metaid:str) -> Tuple[str, str]: 

424 ''' 

425 Given a bibliographic resource's MetaID, it returns its resource embodiment's MetaID and pages. 

426 The output has the following format: :: 

427 

428 (METAID, PAGES) 

429 ('2011', '391-397') 

430 

431 :params metaid: a bibliographic resource's MetaID 

432 :type meta_id: str 

433 :returns: Tuple[str, str] -- the output is a two-elements tuple, where the first element is the MetaID of the resource embodiment, and the second is a pages' interval.  

434 ''' 

435 metaid_uri = URIRef(f'{self.base_iri}/br/{str(metaid)}') 

436 re_uri = None 

437 starting_page = None 

438 ending_page = None 

439 for triple in self.local_g.triples((metaid_uri, GraphEntity.iri_embodiment, None)): 

440 re_uri = triple[2].replace(f'{self.base_iri}/re/', '') 

441 for re_triple in self.local_g.triples((triple[2], None, None)): 

442 if re_triple[1] == GraphEntity.iri_starting_page: 

443 starting_page = str(re_triple[2]) 

444 elif re_triple[1] == GraphEntity.iri_ending_page: 

445 ending_page = str(re_triple[2]) 

446 if re_uri: 

447 if starting_page and ending_page: 

448 pages = f'{starting_page}-{ending_page}' 

449 elif starting_page and not ending_page: 

450 pages = f'{starting_page}-{starting_page}' 

451 elif not starting_page and ending_page: 

452 pages = f'{ending_page}-{ending_page}' 

453 elif not starting_page and not ending_page: 

454 pages = '' 

455 return re_uri, pages 

456 

457 def retrieve_br_info_from_meta(self, metaid: str) -> dict: 

458 ''' 

459 Given a bibliographic resource's MetaID, it returns all the information about that resource. 

460 The output has the following format: :: 

461 

462 { 

463 'pub_date': PUB_DATE,  

464 'type': TYPE,  

465 'page': (METAID, PAGES),  

466 'issue': ISSUE,  

467 'volume': VOLUME,  

468 'venue': VENUE 

469 } 

470 { 

471 'pub_date': '2006-02-27',  

472 'type': 'journal article',  

473 'page': ('2011', '391-397'),  

474 'issue': '4',  

475 'volume': '166',  

476 'venue': 'Archives Of Internal Medicine [omid:br/4387]' 

477 } 

478 

479 :param metaid: a bibliographic resource's MetaID 

480 :type metaid: str 

481 :returns: dict -- the output is a dictionary including the publication date, type, page, issue, volume, and venue of the specified bibliographic resource. 

482 ''' 

483 

484 venue_iris = [ 

485 GraphEntity.iri_archival_document, 

486 GraphEntity.iri_journal, 

487 GraphEntity.iri_book, 

488 GraphEntity.iri_book_series, 

489 GraphEntity.iri_series, 

490 GraphEntity.iri_academic_proceedings, 

491 GraphEntity.iri_proceedings_series, 

492 GraphEntity.iri_reference_book, 

493 GraphEntity.iri_series, 

494 

495 GraphEntity.iri_expression 

496 ] 

497 

498 def extract_identifiers(entity_uri): 

499 identifiers = [f"omid:{entity_uri.replace(f'{self.base_iri}/', '')}"] 

500 for id_triple in self.local_g.triples((entity_uri, GraphEntity.iri_has_identifier, None)): 

501 id_obj = id_triple[2] 

502 scheme = value = None 

503 for detail_triple in self.local_g.triples((id_obj, None, None)): 

504 if detail_triple[1] == GraphEntity.iri_uses_identifier_scheme: 

505 scheme = str(detail_triple[2]) 

506 elif detail_triple[1] == GraphEntity.iri_has_literal_value: 

507 value = str(detail_triple[2]) 

508 if scheme and value: 

509 scheme = scheme.replace(GraphEntity.DATACITE, '') 

510 identifiers.append(f"{scheme}:{value}") 

511 return identifiers 

512 

513 metaid = str(metaid) 

514 metaid_uri = URIRef(f'{self.base_iri}/br/{metaid}') if self.base_iri not in metaid else URIRef(metaid) 

515 res_dict = { 

516 'pub_date': '', 

517 'type': '', 

518 'page': self.retrieve_re_from_br_meta(metaid), 

519 'issue': '', 

520 'volume': '', 

521 'venue': '' 

522 } 

523 

524 for triple in self.local_g.triples((metaid_uri, None, None)): 

525 predicate, obj = triple[1], triple[2] 

526 

527 if predicate == GraphEntity.iri_has_publication_date: 

528 res_dict['pub_date'] = str(obj) 

529 elif predicate == RDF.type and obj != GraphEntity.iri_expression: 

530 res_dict['type'] = self._type_it(obj) 

531 elif predicate == GraphEntity.iri_has_sequence_identifier: 

532 for inner_triple in self.local_g.triples((metaid_uri, None, None)): 

533 inner_obj = inner_triple[2] 

534 if inner_obj == GraphEntity.iri_journal_issue: 

535 res_dict['issue'] = str(triple[2]) 

536 elif inner_obj == GraphEntity.iri_journal_volume: 

537 res_dict['volume'] = str(triple[2]) 

538 elif predicate == GraphEntity.iri_part_of: 

539 for vvi_triple in self.local_g.triples((obj, None, None)): 

540 vvi_obj = vvi_triple[2] 

541 if vvi_obj == GraphEntity.iri_journal_issue: 

542 for inner_vvi_triple in self.local_g.triples((obj, None, None)): 

543 if inner_vvi_triple[1] == GraphEntity.iri_has_sequence_identifier: 

544 res_dict['issue'] = str(inner_vvi_triple[2]) 

545 elif vvi_obj == GraphEntity.iri_journal_volume: 

546 for inner_vvi_triple in self.local_g.triples((obj, None, None)): 

547 if inner_vvi_triple[1] == GraphEntity.iri_has_sequence_identifier: 

548 res_dict['volume'] = str(inner_vvi_triple[2]) 

549 elif vvi_obj in venue_iris: 

550 for inner_vvi_triple in self.local_g.triples((obj, None, None)): 

551 if inner_vvi_triple[1] == GraphEntity.iri_title: 

552 venue_title = str(inner_vvi_triple[2]) 

553 venue_ids = extract_identifiers(obj) 

554 res_dict['venue'] = f"{venue_title} [{' '.join(venue_ids)}]" 

555 

556 if vvi_triple[1] == GraphEntity.iri_part_of: 

557 for vi_triple in self.local_g.triples((vvi_obj, None, None)): 

558 vi_obj = vi_triple[2] 

559 if vi_obj == GraphEntity.iri_journal_volume: 

560 for inner_vvi_triple in self.local_g.triples((vvi_obj, None, None)): 

561 if inner_vvi_triple[1] == GraphEntity.iri_has_sequence_identifier: 

562 res_dict['volume'] = str(inner_vvi_triple[2]) 

563 elif vi_obj in venue_iris: 

564 for inner_vvi_triple in self.local_g.triples((vvi_obj, None, None)): 

565 if inner_vvi_triple[1] == GraphEntity.iri_title: 

566 venue_title = str(inner_vvi_triple[2]) 

567 venue_ids = extract_identifiers(vvi_obj) 

568 res_dict['venue'] = f"{venue_title} [{' '.join(venue_ids)}]" 

569 

570 if vi_triple[1] == GraphEntity.iri_part_of: 

571 for venue_triple in self.local_g.triples((vi_obj, None, None)): 

572 if venue_triple[1] == GraphEntity.iri_title: 

573 venue_title = str(venue_triple[2]) 

574 venue_ids = extract_identifiers(vi_obj) 

575 res_dict['venue'] = f"{venue_title} [{' '.join(venue_ids)}]" 

576 return res_dict 

577 

578 @staticmethod 

579 def _type_it(br_type: URIRef) -> str: 

580 output_type = '' 

581 if br_type == GraphEntity.iri_archival_document: 

582 output_type = 'archival document' 

583 if br_type == GraphEntity.iri_book: 

584 output_type = 'book' 

585 if br_type == GraphEntity.iri_book_chapter: 

586 output_type = 'book chapter' 

587 if br_type == GraphEntity.iri_part: 

588 output_type = 'book part' 

589 if br_type == GraphEntity.iri_expression_collection: 

590 output_type = 'book section' 

591 if br_type == GraphEntity.iri_book_series: 

592 output_type = 'book series' 

593 if br_type == GraphEntity.iri_book_set: 

594 output_type = 'book set' 

595 if br_type == GraphEntity.iri_data_file: 

596 output_type = 'data file' 

597 if br_type == GraphEntity.iri_thesis: 

598 output_type = 'dissertation' 

599 if br_type == GraphEntity.iri_journal: 

600 output_type = 'journal' 

601 if br_type == GraphEntity.iri_journal_article: 

602 output_type = 'journal article' 

603 if br_type == GraphEntity.iri_journal_issue: 

604 output_type = 'journal issue' 

605 if br_type == GraphEntity.iri_journal_volume: 

606 output_type = 'journal volume' 

607 if br_type == GraphEntity.iri_proceedings_paper: 

608 output_type = 'proceedings article' 

609 if br_type == GraphEntity.iri_academic_proceedings: 

610 output_type = 'proceedings' 

611 if br_type == GraphEntity.iri_reference_book: 

612 output_type = 'reference book' 

613 if br_type == GraphEntity.iri_reference_entry: 

614 output_type = 'reference entry' 

615 if br_type == GraphEntity.iri_series: 

616 output_type = 'series' 

617 if br_type == GraphEntity.iri_report_document: 

618 output_type = 'report' 

619 if br_type == GraphEntity.iri_specification_document: 

620 output_type = 'standard' 

621 return output_type 

622 

623 def retrieve_publisher_from_br_metaid(self, metaid:str): 

624 metaid_uri = URIRef(f'{self.base_iri}/br/{metaid}') 

625 publishers = set() 

626 for triple in self.local_g.triples((metaid_uri, None, None)): 

627 if triple[1] == GraphEntity.iri_is_document_context_for: 

628 for document_triple in self.local_g.triples((triple[2], None, None)): 

629 if document_triple[2] == GraphEntity.iri_publisher: 

630 publishers.add(triple[2]) 

631 elif triple[1] == GraphEntity.iri_part_of: 

632 for inner_triple in self.local_g.triples((triple[2], None, None)): 

633 if inner_triple[1] == GraphEntity.iri_is_document_context_for: 

634 for document_triple in self.local_g.triples((inner_triple[2], None, None)): 

635 if document_triple[2] == GraphEntity.iri_publisher: 

636 publishers.add(inner_triple[2]) 

637 elif inner_triple[1] == GraphEntity.iri_part_of: 

638 for inner_inner_triple in self.local_g.triples((inner_triple[2], None, None)): 

639 if inner_inner_triple[1] == GraphEntity.iri_is_document_context_for: 

640 for document_triple in self.local_g.triples((inner_inner_triple[2], None, None)): 

641 if document_triple[2] == GraphEntity.iri_publisher: 

642 publishers.add(inner_inner_triple[2]) 

643 publishers_output = [] 

644 for publisher_uri in publishers: 

645 pub_identifiers = [] 

646 pub_name = None 

647 for triple in self.local_g.triples((publisher_uri, None, None)): 

648 if triple[1] == GraphEntity.iri_is_held_by: 

649 pub_metaid = triple[2].replace(f'{self.base_iri}/', 'omid:') 

650 pub_identifiers.append(pub_metaid) 

651 for ra_triple in self.local_g.triples((triple[2], None, None)): 

652 pub_schema = None 

653 pub_literal = None 

654 if ra_triple[1] == GraphEntity.iri_name: 

655 pub_name = ra_triple[2] 

656 elif ra_triple[1] == GraphEntity.iri_has_identifier: 

657 for id_triple in self.local_g.triples((ra_triple[2], None, None)): 

658 if id_triple[1] == GraphEntity.iri_uses_identifier_scheme: 

659 pub_schema = id_triple[2].replace(f'{str(GraphEntity.DATACITE)}', '') 

660 elif id_triple[1] == GraphEntity.iri_has_literal_value: 

661 pub_literal = id_triple[2] 

662 if pub_schema is not None and pub_literal is not None: 

663 pub_id = f'{pub_schema}:{pub_literal}' 

664 pub_identifiers.append(pub_id) 

665 if pub_name is not None: 

666 pub_full = f'{pub_name} [{" ".join(pub_identifiers)}]' 

667 else: 

668 pub_full = f'[{" ".join(pub_identifiers)}]' 

669 publishers_output.append(pub_full) 

670 return '; '.join(publishers_output) 

671 

672 def get_everything_about_res(self, metavals: set, identifiers: set, vvis: set, max_depth: int = 10) -> None: 

673 BATCH_SIZE = 10 

674 MAX_WORKERS = 1 

675 

676 def batch_process(input_set, batch_size): 

677 """Generator to split input data into smaller batches if batch_size is not None.""" 

678 if batch_size is None: 

679 yield input_set 

680 else: 

681 for i in range(0, len(input_set), batch_size): 

682 yield input_set[i:i + batch_size] 

683 

684 def process_batch_parallel(subjects, cur_depth, visited_subjects): 

685 """Process batches of subjects in parallel up to the specified depth.""" 

686 if not subjects or (max_depth and cur_depth > max_depth): 

687 return 

688 

689 new_subjects = subjects - visited_subjects 

690 if not new_subjects: 

691 return 

692 

693 visited_subjects.update(new_subjects) 

694 

695 subject_list = list(new_subjects) 

696 batches = list(batch_process(subject_list, BATCH_SIZE)) 

697 batch_queries = [] 

698 ts_url = self.ts_url 

699 

700 for batch in batches: 

701 query = f''' 

702 SELECT ?s ?p ?o 

703 WHERE {{ 

704 VALUES ?s {{ {' '.join([f"<{s}>" for s in batch])} }} 

705 ?s ?p ?o. 

706 }}''' 

707 batch_queries.append(query) 

708 

709 next_subjects = set() 

710 if len(batch_queries) > 1 and MAX_WORKERS > 1: 

711 queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS) 

712 grouped_queries = [] 

713 for i in range(0, len(batch_queries), queries_per_worker): 

714 grouped_queries.append((ts_url, batch_queries[i:i + queries_per_worker])) 

715 with ProcessPoolExecutor( 

716 max_workers=min(len(grouped_queries), MAX_WORKERS), 

717 mp_context=multiprocessing.get_context('spawn') 

718 ) as executor: 

719 grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries)) 

720 results = [item for sublist in grouped_results for item in sublist] 

721 else: 

722 results = _execute_sparql_queries((ts_url, batch_queries)) if batch_queries else [] 

723 

724 for result in results: 

725 for row in result: 

726 s = URIRef(row['s']['value']) 

727 p = URIRef(row['p']['value']) 

728 o = row['o']['value'] 

729 o_type = row['o']['type'] 

730 o_datatype = URIRef(row['o']['datatype']) if 'datatype' in row['o'] else None 

731 o = URIRef(o) if o_type == 'uri' else Literal(lexical_or_value=o, datatype=o_datatype) 

732 self.local_g.add((s, p, o)) 

733 if s not in self.prebuilt_subgraphs: 

734 self.prebuilt_subgraphs[s] = Graph() 

735 self.prebuilt_subgraphs[s].add((s, p, o)) 

736 if isinstance(o, URIRef) and p not in {RDF.type, GraphEntity.iri_with_role, GraphEntity.iri_uses_identifier_scheme}: 

737 next_subjects.add(str(o)) 

738 

739 process_batch_parallel(next_subjects, cur_depth + 1, visited_subjects) 

740 

741 def get_initial_subjects_from_metavals(metavals): 

742 """Convert metavals to a set of subjects.""" 

743 return {f"{self.base_iri}/{mid.replace('omid:', '')}" for mid in metavals} 

744 

745 def get_initial_subjects_from_identifiers(identifiers): 

746 """Convert identifiers to a set of subjects based on batch queries executed in parallel. 

747 

748 Returns: 

749 tuple: (subjects set, id_to_subjects mapping) 

750 - subjects: set of subject URIs found 

751 - id_to_subjects: dict mapping identifier string to set of subject URIs 

752 """ 

753 subjects = set() 

754 id_to_subjects = {} 

755 ts_url = self.ts_url 

756 batches = list(batch_process(list(identifiers), BATCH_SIZE)) 

757 

758 if not batches: 

759 return subjects, id_to_subjects 

760 

761 batch_queries = [] 

762 for batch in batches: 

763 if not batch: 

764 continue 

765 

766 if self.virtuoso_full_text_search: 

767 union_blocks = [] 

768 for identifier in batch: 

769 scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1] 

770 escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"') 

771 union_blocks.append(f""" 

772 {{ 

773 {{ 

774 ?id <{GraphEntity.iri_has_literal_value}> "{escaped_literal}" . 

775 }} 

776 UNION 

777 {{ 

778 ?id <{GraphEntity.iri_has_literal_value}> "{escaped_literal}"^^<{XSD.string}> . 

779 }} 

780 ?id <{GraphEntity.iri_uses_identifier_scheme}> <{GraphEntity.DATACITE + scheme}> . 

781 ?s <{GraphEntity.iri_has_identifier}> ?id . 

782 BIND("{scheme}" AS ?schemeLabel) 

783 BIND("{escaped_literal}" AS ?literalLabel) 

784 }} 

785 """) 

786 union_query = " UNION ".join(union_blocks) 

787 query = f''' 

788 SELECT ?s ?schemeLabel ?literalLabel WHERE {{ 

789 {union_query} 

790 }} 

791 ''' 

792 batch_queries.append(query) 

793 else: 

794 identifiers_values = [] 

795 for identifier in batch: 

796 scheme, literal = identifier.split(':', maxsplit=1)[0], identifier.split(':', maxsplit=1)[1] 

797 escaped_literal = literal.replace('\\', '\\\\').replace('"', '\\"') 

798 identifiers_values.append(f"(<{GraphEntity.DATACITE + scheme}> \"{escaped_literal}\")") 

799 identifiers_values_str = " ".join(identifiers_values) 

800 query = f''' 

801 SELECT DISTINCT ?s ?scheme ?literal WHERE {{ 

802 VALUES (?scheme ?literal) {{ {identifiers_values_str} }} 

803 ?id <{GraphEntity.iri_uses_identifier_scheme}> ?scheme . 

804 ?id <{GraphEntity.iri_has_literal_value}> ?literalValue . 

805 FILTER(str(?literalValue) = str(?literal)) 

806 ?s <{GraphEntity.iri_has_identifier}> ?id . 

807 }} 

808 ''' 

809 batch_queries.append(query) 

810 

811 if len(batch_queries) > 1 and MAX_WORKERS > 1: 

812 queries_per_worker = max(1, len(batch_queries) // MAX_WORKERS) 

813 grouped_queries = [] 

814 for i in range(0, len(batch_queries), queries_per_worker): 

815 grouped_queries.append((ts_url, batch_queries[i:i + queries_per_worker])) 

816 with ProcessPoolExecutor( 

817 max_workers=min(len(grouped_queries), MAX_WORKERS), 

818 mp_context=multiprocessing.get_context('spawn') 

819 ) as executor: 

820 grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries)) 

821 results = [item for sublist in grouped_results for item in sublist] 

822 elif batch_queries: 

823 results = _execute_sparql_queries((ts_url, batch_queries)) 

824 else: 

825 results = [] 

826 

827 for result in results: 

828 for row in result: 

829 subject = str(row['s']['value']) 

830 subjects.add(subject) 

831 if 'schemeLabel' in row: 

832 scheme = str(row['schemeLabel']['value']) 

833 literal = str(row['literalLabel']['value']) 

834 else: 

835 scheme = str(row['scheme']['value']).replace(str(GraphEntity.DATACITE), '') 

836 literal = str(row['literal']['value']) 

837 identifier = f"{scheme}:{literal}" 

838 if identifier not in id_to_subjects: 

839 id_to_subjects[identifier] = set() 

840 id_to_subjects[identifier].add(subject) 

841 

842 return subjects, id_to_subjects 

843 

844 def get_initial_subjects_from_vvis(vvis): 

845 """Convert vvis to a set of subjects based on batch queries executed in parallel.""" 

846 subjects = set() 

847 ts_url = self.ts_url 

848 vvi_queries = [] 

849 venue_uris_to_add = set() 

850 

851 # First pass: collect all venue IDs and prepare queries 

852 all_venue_ids = set() 

853 for volume, issue, venue_metaid, venue_ids_tuple in vvis: 

854 if venue_ids_tuple: 

855 all_venue_ids.update(venue_ids_tuple) 

856 

857 # Get venue subjects from identifiers with mapping 

858 venue_id_to_uris = {} 

859 if all_venue_ids: 

860 venue_id_subjects, venue_id_to_uris = get_initial_subjects_from_identifiers(all_venue_ids) 

861 subjects.update(venue_id_subjects) 

862 

863 # Second pass: prepare VVI queries 

864 for volume, issue, venue_metaid, venue_ids_tuple in vvis: 

865 venues_to_search = set() 

866 

867 if venue_metaid: 

868 venues_to_search.add(venue_metaid) 

869 

870 if venue_ids_tuple: 

871 # Convert venue URIs to metaid format for VVI search 

872 # Only use venues matching THIS tuple's identifiers 

873 for venue_id in venue_ids_tuple: 

874 if venue_id in venue_id_to_uris: 

875 for venue_uri in venue_id_to_uris[venue_id]: 

876 if '/br/' in venue_uri: 

877 metaid = venue_uri.replace(f'{self.base_iri}/br/', '') 

878 venues_to_search.add(f"omid:br/{metaid}") 

879 

880 # Prepare VVI queries for each venue 

881 for venue_metaid_to_search in venues_to_search: 

882 venue_uri = f"{self.base_iri}/{venue_metaid_to_search.replace('omid:', '')}" 

883 sequence_value = issue if issue else volume 

884 if not sequence_value: 

885 continue 

886 escaped_sequence = sequence_value.replace('\\', '\\\\').replace('"', '\\"') 

887 

888 if issue: 

889 if volume: 

890 escaped_volume = volume.replace('\\', '\\\\').replace('"', '\\"') 

891 query = f''' 

892 SELECT ?s WHERE {{ 

893 {{ 

894 ?volume a <{GraphEntity.iri_journal_volume}> ; 

895 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

896 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_volume}" . 

897 ?s a <{GraphEntity.iri_journal_issue}> ; 

898 <{GraphEntity.iri_part_of}> ?volume ; 

899 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}" . 

900 }} 

901 UNION 

902 {{ 

903 ?volume a <{GraphEntity.iri_journal_volume}> ; 

904 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

905 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_volume}"^^<{XSD.string}> . 

906 ?s a <{GraphEntity.iri_journal_issue}> ; 

907 <{GraphEntity.iri_part_of}> ?volume ; 

908 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}" . 

909 }} 

910 UNION 

911 {{ 

912 ?volume a <{GraphEntity.iri_journal_volume}> ; 

913 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

914 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_volume}" . 

915 ?s a <{GraphEntity.iri_journal_issue}> ; 

916 <{GraphEntity.iri_part_of}> ?volume ; 

917 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}"^^<{XSD.string}> . 

918 }} 

919 UNION 

920 {{ 

921 ?volume a <{GraphEntity.iri_journal_volume}> ; 

922 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

923 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_volume}"^^<{XSD.string}> . 

924 ?s a <{GraphEntity.iri_journal_issue}> ; 

925 <{GraphEntity.iri_part_of}> ?volume ; 

926 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}"^^<{XSD.string}> . 

927 }} 

928 }} 

929 ''' 

930 else: 

931 query = f''' 

932 SELECT ?s WHERE {{ 

933 {{ 

934 ?s a <{GraphEntity.iri_journal_issue}> ; 

935 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

936 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}" . 

937 }} 

938 UNION 

939 {{ 

940 ?s a <{GraphEntity.iri_journal_issue}> ; 

941 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

942 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}"^^<{XSD.string}> . 

943 }} 

944 }} 

945 ''' 

946 else: 

947 if volume: 

948 query = f''' 

949 SELECT ?s WHERE {{ 

950 {{ 

951 ?s a <{GraphEntity.iri_journal_volume}> ; 

952 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

953 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}" . 

954 }} 

955 UNION 

956 {{ 

957 ?s a <{GraphEntity.iri_journal_volume}> ; 

958 <{GraphEntity.iri_part_of}> <{venue_uri}> ; 

959 <{GraphEntity.iri_has_sequence_identifier}> "{escaped_sequence}"^^<{XSD.string}> . 

960 }} 

961 }} 

962 ''' 

963 else: 

964 continue 

965 

966 vvi_queries.append(query) 

967 venue_uris_to_add.add(venue_uri) 

968 

969 # Execute VVI queries in parallel 

970 if len(vvi_queries) > 1 and MAX_WORKERS > 1: 

971 queries_per_worker = max(1, len(vvi_queries) // MAX_WORKERS) 

972 grouped_queries = [] 

973 for i in range(0, len(vvi_queries), queries_per_worker): 

974 grouped_queries.append((ts_url, vvi_queries[i:i + queries_per_worker])) 

975 with ProcessPoolExecutor( 

976 max_workers=min(len(grouped_queries), MAX_WORKERS), 

977 mp_context=multiprocessing.get_context('spawn') 

978 ) as executor: 

979 grouped_results = list(executor.map(_execute_sparql_queries, grouped_queries)) 

980 results = [item for sublist in grouped_results for item in sublist] 

981 elif vvi_queries: 

982 results = _execute_sparql_queries((ts_url, vvi_queries)) 

983 else: 

984 results = [] 

985 

986 for result in results: 

987 for row in result: 

988 subjects.add(str(row['s']['value'])) 

989 

990 subjects.update(venue_uris_to_add) 

991 

992 return subjects 

993 

994 initial_subjects = set() 

995 

996 if metavals: 

997 initial_subjects.update(get_initial_subjects_from_metavals(metavals)) 

998 

999 if identifiers: 

1000 id_subjects, _ = get_initial_subjects_from_identifiers(identifiers) 

1001 initial_subjects.update(id_subjects) 

1002 

1003 if vvis: 

1004 initial_subjects.update(get_initial_subjects_from_vvis(vvis)) 

1005 

1006 visited_subjects = set() 

1007 process_batch_parallel(initial_subjects, 0, visited_subjects) 

1008 

1009 def get_subgraph(self, res: str) -> Graph|None: 

1010 if res in self.prebuilt_subgraphs: 

1011 return self.prebuilt_subgraphs[res] 

1012 return None 

1013 

1014 def retrieve_venue_from_local_graph(self, meta_id: str) -> Dict[str, Dict[str, str]]: 

1015 """ 

1016 Retrieve venue VVI structure from local graph instead of querying triplestore. 

1017  

1018 :params meta_id: a MetaID 

1019 :type meta_id: str 

1020 :returns: Dict[str, Dict[str, str]] -- the venue structure with volumes and issues 

1021 """ 

1022 content = { 

1023 'issue': {}, 

1024 'volume': {} 

1025 } 

1026 

1027 volumes = {} 

1028 venue_uri = URIRef(f'{self.base_iri}/br/{meta_id}') 

1029 

1030 # Find all volumes directly part of this venue 

1031 for triple in self.local_g.triples((None, RDF.type, GraphEntity.iri_journal_volume)): 

1032 entity = triple[0] 

1033 # Check if this volume is part of our venue 

1034 for part_triple in self.local_g.triples((entity, GraphEntity.iri_part_of, venue_uri)): 

1035 entity_id = str(entity).replace(f'{self.base_iri}/br/', '') 

1036 for seq_triple in self.local_g.triples((entity, GraphEntity.iri_has_sequence_identifier, None)): 

1037 seq = str(seq_triple[2]) 

1038 volumes[entity_id] = seq 

1039 content['volume'][seq] = { 

1040 'id': entity_id, 

1041 'issue': {} 

1042 } 

1043 

1044 # Find all issues 

1045 for triple in self.local_g.triples((None, RDF.type, GraphEntity.iri_journal_issue)): 

1046 entity = triple[0] 

1047 entity_id = str(entity).replace(f'{self.base_iri}/br/', '') 

1048 seq = None 

1049 container = None 

1050 

1051 # Get sequence identifier 

1052 for seq_triple in self.local_g.triples((entity, GraphEntity.iri_has_sequence_identifier, None)): 

1053 seq = str(seq_triple[2]) 

1054 

1055 # Get container (could be venue or volume) 

1056 for container_triple in self.local_g.triples((entity, GraphEntity.iri_part_of, None)): 

1057 container = str(container_triple[2]) 

1058 

1059 if seq: 

1060 if container: 

1061 container_id = container.replace(f'{self.base_iri}/br/', '') 

1062 # Check if container is a volume of our venue 

1063 if container_id in volumes: 

1064 volume_seq = volumes[container_id] 

1065 content['volume'][volume_seq]['issue'][seq] = {'id': entity_id} 

1066 # Check if container is directly our venue 

1067 elif container == str(venue_uri): 

1068 content['issue'][seq] = {'id': entity_id} 

1069 

1070 return content