Coverage for oc_ocdm/reader.py: 76%

222 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16from __future__ import annotations 

17 

18import json 

19import os 

20import time 

21from typing import TYPE_CHECKING 

22from zipfile import ZipFile 

23 

24from oc_ocdm.graph.graph_entity import GraphEntity 

25from oc_ocdm.support.reporter import Reporter 

26from oc_ocdm.support.support import build_graph_from_results 

27from rdflib import RDF, ConjunctiveGraph, Graph, URIRef 

28from SPARQLWrapper import JSON, POST, SPARQLWrapper 

29 

30if TYPE_CHECKING: 

31 from typing import Any, Dict, List, Optional, Set 

32 from oc_ocdm.graph.graph_set import GraphSet 

33 

34from pyshacl import validate 

35 

36 

class Reader(object):
    """Loads RDF data from files or SPARQL endpoints and imports it into
    OCDM entities of a ``GraphSet``."""

    def __init__(self, repok: Reporter = None, reperr: Reporter = None, context_map: Dict[str, Any] = None) -> None:
        """
        :param repok: reporter for informational messages (a default one is created if None)
        :param reperr: reporter for error messages (a default one is created if None)
        :param context_map: optional mapping from JSON-LD context URLs to either an
            already-parsed context dict or the path of a JSON file to load.
            NOTE: string file paths in the mapping are replaced in place with the
            parsed JSON content, so the caller's dict is mutated.
        """
        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}
        for context_url in self.context_map:
            ctx_file_path: Any = self.context_map[context_url]
            # isinstance is the idiomatic type check (was: type(...) == str)
            if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                # This expensive operation is done only when it's really needed
                with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                    self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Reader: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ")
        else:
            self.reperr: Reporter = reperr

62 def load(self, rdf_file_path: str) -> Optional[ConjunctiveGraph]: 

63 self.repok.new_article() 

64 self.reperr.new_article() 

65 

66 loaded_graph: Optional[ConjunctiveGraph] = None 

67 if os.path.isfile(rdf_file_path): 

68 

69 try: 

70 loaded_graph = self._load_graph(rdf_file_path) 

71 except Exception as e: 

72 self.reperr.add_sentence("[1] " 

73 "It was impossible to handle the format used for " 

74 "storing the file (stored in the temporary path) " 

75 f"'{rdf_file_path}'. Additional details: {e}") 

76 else: 

77 self.reperr.add_sentence("[2] " 

78 f"The file specified ('{rdf_file_path}') doesn't exist.") 

79 

80 return loaded_graph 

81 

    def _load_graph(self, file_path: str) -> ConjunctiveGraph:
        """Parse ``file_path`` (a plain RDF file or a zip archive of RDF files)
        into a new ConjunctiveGraph, trying each supported serialization in turn.

        :raises IOError: if the file cannot be opened/read, or if no supported
            format succeeds in parsing it.
        """
        # Formats are attempted in this order by _try_parse.
        formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
        loaded_graph = ConjunctiveGraph()

        if file_path.endswith('.zip'):
            try:
                with ZipFile(file=file_path, mode="r") as archive:
                    for zf_name in archive.namelist():
                        with archive.open(zf_name) as f:
                            # NOTE(review): this returns after the FIRST archive member
                            # that parses successfully; any remaining members are
                            # ignored — confirm this is the intended behavior.
                            if self._try_parse(loaded_graph, f, formats):
                                return loaded_graph
            except Exception as e:
                # Any zip/IO failure is re-raised uniformly as IOError.
                raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
        else:
            try:
                with open(file_path, 'rt', encoding='utf-8') as f:
                    if self._try_parse(loaded_graph, f, formats):
                        return loaded_graph
            except Exception as e:
                raise IOError(f"Error opening or reading file '{file_path}': {e}")

        # Reached only when _try_parse returned False for every format.
        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")

105 def _try_parse(self, graph: ConjunctiveGraph, file_obj, formats: List[str]) -> bool: 

106 for cur_format in formats: 

107 file_obj.seek(0) # Reset file pointer to the beginning for each new attempt 

108 try: 

109 if cur_format == "json-ld": 

110 json_ld_file = json.load(file_obj) 

111 if isinstance(json_ld_file, dict): 

112 json_ld_file = [json_ld_file] 

113 for json_ld_resource in json_ld_file: 

114 if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map: 

115 json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"] 

116 data = json.dumps(json_ld_file, ensure_ascii=False) 

117 graph.parse(data=data, format=cur_format) 

118 else: 

119 graph.parse(file=file_obj, format=cur_format) 

120 return True # Success, no need to try other formats 

121 except Exception as e: 

122 continue # Try the next format 

123 return False # None of the formats succeeded 

124 

125 @staticmethod 

126 def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph: 

127 g: Graph = Graph(identifier=graph.identifier) 

128 for p, o in graph.predicate_objects(subject, unique=True): 

129 g.add((subject, p, o)) 

130 return g 

131 

132 @staticmethod 

133 def _extract_subjects(graph: Graph) -> Set[URIRef]: 

134 subjects: Set[URIRef] = set() 

135 for s in graph.subjects(unique=True): 

136 subjects.add(s) 

137 return subjects 

138 

139 def graph_validation(self, graph: Graph, closed: bool = False) -> Graph: 

140 valid_graph: Graph = Graph(identifier=graph.identifier) 

141 sg = Graph() 

142 if closed: 

143 sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle_closed.ttl')) 

144 else: 

145 sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle.ttl')) 

146 _, report_g, _ = validate(graph, 

147 shacl_graph=sg, 

148 ont_graph=None, 

149 inference=None, 

150 abort_on_first=False, 

151 allow_infos=False, 

152 allow_warnings=False, 

153 meta_shacl=False, 

154 advanced=False, 

155 js=False, 

156 debug=False) 

157 invalid_nodes = set() 

158 for triple in report_g.triples((None, URIRef('http://www.w3.org/ns/shacl#focusNode'), None)): 

159 invalid_nodes.add(triple[2]) 

160 for subject in self._extract_subjects(graph): 

161 if subject not in invalid_nodes: 

162 for valid_subject_triple in graph.triples((subject, None, None)): 

163 valid_graph.add(valid_subject_triple) 

164 return valid_graph 

165 

166 @staticmethod 

167 def import_entities_from_graph(g_set: GraphSet, results: List[Dict]|Graph, resp_agent: str, 

168 enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]: 

169 graph = build_graph_from_results(results) if isinstance(results, list) else results 

170 if enable_validation: 

171 reader = Reader() 

172 graph = reader.graph_validation(graph, closed) 

173 imported_entities: List[GraphEntity] = [] 

174 for subject in Reader._extract_subjects(graph): 

175 types = [] 

176 for o in graph.objects(subject, RDF.type): 

177 types.append(o) 

178 # ReferenceAnnotation 

179 if GraphEntity.iri_note in types: 

180 imported_entities.append(g_set.add_an(resp_agent=resp_agent, res=subject, 

181 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

182 # AgentRole 

183 elif GraphEntity.iri_role_in_time in types: 

184 imported_entities.append(g_set.add_ar(resp_agent=resp_agent, res=subject, 

185 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

186 # BibliographicReference 

187 elif GraphEntity.iri_bibliographic_reference in types: 

188 imported_entities.append(g_set.add_be(resp_agent=resp_agent, res=subject, 

189 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

190 # BibliographicResource 

191 elif GraphEntity.iri_expression in types: 

192 imported_entities.append(g_set.add_br(resp_agent=resp_agent, res=subject, 

193 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

194 # Citation 

195 elif GraphEntity.iri_citation in types: 

196 imported_entities.append(g_set.add_ci(resp_agent=resp_agent, res=subject, 

197 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

198 # DiscourseElement 

199 elif GraphEntity.iri_discourse_element in types: 

200 imported_entities.append(g_set.add_de(resp_agent=resp_agent, res=subject, 

201 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

202 # Identifier 

203 elif GraphEntity.iri_identifier in types: 

204 imported_entities.append(g_set.add_id(resp_agent=resp_agent, res=subject, 

205 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

206 # PointerList 

207 elif GraphEntity.iri_singleloc_pointer_list in types: 

208 imported_entities.append(g_set.add_pl(resp_agent=resp_agent, res=subject, 

209 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

210 # ResponsibleAgent 

211 elif GraphEntity.iri_agent in types: 

212 imported_entities.append(g_set.add_ra(resp_agent=resp_agent, res=subject, 

213 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

214 # ResourceEmbodiment 

215 elif GraphEntity.iri_manifestation in types: 

216 imported_entities.append(g_set.add_re(resp_agent=resp_agent, res=subject, 

217 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

218 # ReferencePointer 

219 elif GraphEntity.iri_intextref_pointer in types: 

220 imported_entities.append(g_set.add_rp(resp_agent=resp_agent, res=subject, 

221 preexisting_graph=Reader.get_graph_from_subject(graph, subject))) 

222 return imported_entities 

223 

224 @staticmethod 

225 def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str, 

226 enable_validation: bool = False) -> GraphEntity: 

227 query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}" 

228 attempt = 0 

229 max_attempts = 3 

230 wait_time = 5 # Initial wait time in seconds 

231 

232 while attempt < max_attempts: 

233 try: 

234 sparql: SPARQLWrapper = SPARQLWrapper(ts_url) 

235 sparql.setQuery(query) 

236 sparql.setMethod('GET') 

237 sparql.setReturnFormat(JSON) 

238 result = sparql.queryAndConvert()['results']['bindings'] 

239 

240 if not result: # Se non ci sono risultati, l'entità non esiste 

241 raise ValueError(f"The requested entity {res} was not found in the triplestore.") 

242 

243 imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation) 

244 if len(imported_entities) <= 0: 

245 raise ValueError("The requested entity was not recognized as a proper OCDM entity.") 

246 return imported_entities[0] 

247 

248 except ValueError as ve: # Non facciamo retry per errori di validazione 

249 raise ve 

250 except Exception as e: 

251 attempt += 1 

252 if attempt < max_attempts: 

253 print(f"[3] Attempt {attempt} failed. Could not import entity due to communication problems: {e}") 

254 print(f"Retrying in {wait_time} seconds...") 

255 time.sleep(wait_time) 

256 wait_time *= 2 

257 else: 

258 print(f"[3] All {max_attempts} attempts failed. Could not import entity due to communication problems: {e}") 

259 raise 

260 

261 raise Exception("Max attempts reached. Failed to import entity from triplestore.") 

262 

263 @staticmethod 

264 def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[URIRef], resp_agent: str, 

265 enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]: 

266 """ 

267 Import multiple entities from a triplestore in batches using a single SPARQL query per batch. 

268  

269 Args: 

270 g_set: The GraphSet to import entities into 

271 ts_url: The triplestore URL endpoint 

272 entities: List of URIRef entities to import 

273 resp_agent: The responsible agent string 

274 enable_validation: Whether to validate the imported graphs 

275 batch_size: Number of entities to import in each batch 

276  

277 Returns: 

278 List of imported GraphEntity objects 

279 """ 

280 if not entities: 

281 raise ValueError("No entities provided for import") 

282 

283 imported_entities: List[GraphEntity] = [] 

284 max_attempts = 3 

285 wait_time = 5 # Initial wait time in seconds 

286 

287 # Process entities in batches 

288 for i in range(0, len(entities), batch_size): 

289 batch = entities[i:i + batch_size] 

290 not_found_entities = set(str(entity) for entity in batch) 

291 

292 # Construct SPARQL query for batch using UNION pattern for Virtuoso 

293 union_patterns = [] 

294 for entity in batch: 

295 union_patterns.append(f"{{ BIND(<{str(entity)}> AS ?s) ?s ?p ?o }}") 

296 

297 query = f""" 

298 SELECT ?s ?p ?o 

299 WHERE {{ 

300 {' UNION '.join(union_patterns)} 

301 }} 

302 """ 

303 

304 # Execute query with retry logic 

305 attempt = 0 

306 while attempt < max_attempts: 

307 try: 

308 sparql: SPARQLWrapper = SPARQLWrapper(ts_url) 

309 sparql.setQuery(query) 

310 sparql.setMethod('GET') 

311 sparql.setReturnFormat(JSON) 

312 results = sparql.queryAndConvert()['results']['bindings'] 

313 

314 if not results: # Se non ci sono risultati per questo batch 

315 entities_str = ', '.join(not_found_entities) 

316 raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}") 

317 

318 # Teniamo traccia delle entità trovate 

319 for result in results: 

320 if 's' in result and 'value' in result['s']: 

321 not_found_entities.discard(result['s']['value']) 

322 

323 # Import entities from results 

324 try: 

325 batch_entities = Reader.import_entities_from_graph( 

326 g_set=g_set, 

327 results=results, 

328 resp_agent=resp_agent, 

329 enable_validation=enable_validation 

330 ) 

331 imported_entities.extend(batch_entities) 

332 

333 # Se alcune entità non sono state trovate, lo segnaliamo 

334 if not_found_entities: 

335 entities_str = ', '.join(not_found_entities) 

336 raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}") 

337 

338 break # Usciamo dal ciclo di retry se tutto è andato bene 

339 

340 except ValueError as ve: # Errori di validazione non richiedono retry 

341 raise ve 

342 

343 except ValueError as ve: # Non facciamo retry per errori di validazione o entità non trovate 

344 raise ve 

345 except Exception as e: 

346 attempt += 1 

347 if attempt < max_attempts: 

348 print(f"[3] Attempt {attempt} failed. Could not import batch due to communication problems: {e}") 

349 print(f"Retrying in {wait_time} seconds...") 

350 time.sleep(wait_time) 

351 wait_time *= 2 

352 else: 

353 print(f"[3] All {max_attempts} attempts failed. Could not import batch due to communication problems: {e}") 

354 raise 

355 

356 if not imported_entities: 

357 raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.") 

358 

359 return imported_entities