Coverage for oc_ocdm/reader.py: 88%

196 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-05 23:58 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16from __future__ import annotations 

17 

18import json 

19import os 

20from typing import TYPE_CHECKING 

21from zipfile import ZipFile 

22 

23from oc_ocdm.graph.graph_entity import GraphEntity 

24from oc_ocdm.support.reporter import Reporter 

25from oc_ocdm.support.support import build_graph_from_results 

26from rdflib import RDF, Dataset, Graph, URIRef 

27from sparqlite import SPARQLClient, EndpointError 

28 

29if TYPE_CHECKING: 

30 from typing import Any, Dict, List, Optional, Set 

31 from oc_ocdm.graph.graph_set import GraphSet 

32 

33from pyshacl import validate 

34 

35 

36class Reader(object): 

37 

38 def __init__(self, repok: Reporter = None, reperr: Reporter = None, context_map: Dict[str, Any] = None) -> None: 

39 

40 if context_map is not None: 

41 self.context_map: Dict[str, Any] = context_map 

42 else: 

43 self.context_map: Dict[str, Any] = {} 

44 for context_url in self.context_map: 

45 ctx_file_path: Any = self.context_map[context_url] 

46 if type(ctx_file_path) == str and os.path.isfile(ctx_file_path): 

47 # This expensive operation is done only when it's really needed 

48 with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f: 

49 self.context_map[context_url] = json.load(ctx_f) 

50 

51 if repok is None: 

52 self.repok: Reporter = Reporter(prefix="[Reader: INFO] ") 

53 else: 

54 self.repok: Reporter = repok 

55 

56 if reperr is None: 

57 self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ") 

58 else: 

59 self.reperr: Reporter = reperr 

60 

61 def load(self, rdf_file_path: str) -> Optional[Dataset]: 

62 self.repok.new_article() 

63 self.reperr.new_article() 

64 

65 loaded_graph: Optional[Dataset] = None 

66 if os.path.isfile(rdf_file_path): 

67 

68 try: 

69 loaded_graph = self._load_graph(rdf_file_path) 

70 except Exception as e: 

71 self.reperr.add_sentence("[1] " 

72 "It was impossible to handle the format used for " 

73 "storing the file (stored in the temporary path) " 

74 f"'{rdf_file_path}'. Additional details: {e}") 

75 else: 

76 self.reperr.add_sentence("[2] " 

77 f"The file specified ('{rdf_file_path}') doesn't exist.") 

78 

79 return loaded_graph 

80 

def _load_graph(self, file_path: str) -> Dataset:
    """Parse *file_path* into a fresh Dataset, trying every supported RDF
    serialization in turn (zip archives are walked member by member).

    :raises IOError: when the file cannot be opened/read or no format works.
    """
    formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
    dataset = Dataset()

    if file_path.endswith('.zip'):
        try:
            with ZipFile(file=file_path, mode="r") as archive:
                for member_name in archive.namelist():
                    with archive.open(member_name) as member:
                        # Stop at the first member that parses successfully.
                        if self._try_parse(dataset, member, formats):
                            return dataset
        except Exception as e:
            raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
    else:
        try:
            with open(file_path, 'rt', encoding='utf-8') as plain_file:
                if self._try_parse(dataset, plain_file, formats):
                    return dataset
        except Exception as e:
            raise IOError(f"Error opening or reading file '{file_path}': {e}")

    raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")

104 def _try_parse(self, graph: Dataset, file_obj, formats: List[str]) -> bool: 

105 for cur_format in formats: 

106 file_obj.seek(0) # Reset file pointer to the beginning for each new attempt 

107 try: 

108 if cur_format == "json-ld": 

109 json_ld_file = json.load(file_obj) 

110 if isinstance(json_ld_file, dict): 

111 json_ld_file = [json_ld_file] 

112 for json_ld_resource in json_ld_file: 

113 if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map: 

114 json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"] 

115 data = json.dumps(json_ld_file, ensure_ascii=False) 

116 graph.parse(data=data, format=cur_format) 

117 else: 

118 graph.parse(file=file_obj, format=cur_format) 

119 return True # Success, no need to try other formats 

120 except Exception as e: 

121 continue # Try the next format 

122 return False # None of the formats succeeded 

123 

@staticmethod
def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
    """Return a new Graph (same identifier as *graph*) holding every triple
    of *graph* whose subject is *subject*."""
    subgraph: Graph = Graph(identifier=graph.identifier)
    for pred, obj in graph.predicate_objects(subject, unique=True):
        subgraph.add((subject, pred, obj))
    return subgraph

131 @staticmethod 

132 def _extract_subjects(graph: Graph) -> Set[URIRef]: 

133 subjects: Set[URIRef] = set() 

134 for s in graph.subjects(unique=True): 

135 subjects.add(s) 

136 return subjects 

137 

def graph_validation(self, graph: Graph, closed: bool = False) -> Graph:
    """Validate *graph* against the OCDM SHACL shapes and return a new graph
    containing only the triples of subjects that produced no violation.

    :param closed: when True, use the closed-world shapes file.
    """
    shapes = Graph()
    shape_file = 'shacle_closed.ttl' if closed else 'shacle.ttl'
    shapes.parse(os.path.join('oc_ocdm', 'resources', shape_file))
    _, report_g, _ = validate(graph,
                              shacl_graph=shapes,
                              ont_graph=None,
                              inference=None,
                              abort_on_first=False,
                              allow_infos=False,
                              allow_warnings=False,
                              meta_shacl=False,
                              advanced=False,
                              js=False,
                              debug=False)
    # Every sh:focusNode in the validation report marks an invalid subject.
    focus_node = URIRef('http://www.w3.org/ns/shacl#focusNode')
    invalid_nodes = {obj for _, _, obj in report_g.triples((None, focus_node, None))}
    valid_graph: Graph = Graph(identifier=graph.identifier)
    for subject in self._extract_subjects(graph):
        if subject in invalid_nodes:
            continue
        for valid_subject_triple in graph.triples((subject, None, None)):
            valid_graph.add(valid_subject_triple)
    return valid_graph

@staticmethod
def import_entities_from_graph(g_set: GraphSet, results: List[Dict]|Graph|Dataset, resp_agent: str,
                               enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
    """Import every recognizable OCDM entity found in *results* into *g_set*.

    :param results: SPARQL SELECT bindings (list of dicts), a Graph, or a
        Dataset (whose quads are flattened into plain triples first).
    :param resp_agent: responsible agent attached to each imported entity.
    :param enable_validation: when True, drop subjects failing SHACL validation.
    :param closed: forwarded to graph_validation (closed-world shapes).
    :return: the list of GraphEntity objects added to *g_set*.
    """
    if isinstance(results, list):
        graph = build_graph_from_results(results)
    elif isinstance(results, Dataset):
        # Convert Dataset to Graph by flattening all quads into triples
        graph = Graph()
        for s, p, o, _ in results.quads((None, None, None, None)):
            graph.add((s, p, o))
    else:
        graph = results
    if enable_validation:
        graph = Reader().graph_validation(graph, closed)

    # Priority-ordered dispatch table: for a subject with several rdf:types,
    # the first matching entry wins (same order as the original elif chain).
    adders_by_type = [
        (GraphEntity.iri_note, g_set.add_an),                     # ReferenceAnnotation
        (GraphEntity.iri_role_in_time, g_set.add_ar),             # AgentRole
        (GraphEntity.iri_bibliographic_reference, g_set.add_be),  # BibliographicReference
        (GraphEntity.iri_expression, g_set.add_br),               # BibliographicResource
        (GraphEntity.iri_citation, g_set.add_ci),                 # Citation
        (GraphEntity.iri_discourse_element, g_set.add_de),        # DiscourseElement
        (GraphEntity.iri_identifier, g_set.add_id),               # Identifier
        (GraphEntity.iri_singleloc_pointer_list, g_set.add_pl),   # PointerList
        (GraphEntity.iri_agent, g_set.add_ra),                    # ResponsibleAgent
        (GraphEntity.iri_manifestation, g_set.add_re),            # ResourceEmbodiment
        (GraphEntity.iri_intextref_pointer, g_set.add_rp),        # ReferencePointer
    ]

    imported_entities: List[GraphEntity] = []
    for subject in Reader._extract_subjects(graph):
        types = set(graph.objects(subject, RDF.type))
        for entity_type, adder in adders_by_type:
            if entity_type in types:
                imported_entities.append(adder(resp_agent=resp_agent, res=subject,
                                               preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
                break
    return imported_entities

@staticmethod
def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str,
                                   enable_validation: bool = False) -> GraphEntity:
    """Fetch all triples with subject *res* from the triplestore at *ts_url*
    and import the resulting entity into *g_set*.

    :raises ValueError: when the entity is absent from the store or is not a
        recognizable OCDM entity.
    :raises EndpointError: when communicating with the endpoint fails
        (logged before re-raising).
    """
    query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
    try:
        with SPARQLClient(ts_url, max_retries=3, backoff_factor=2.5) as client:
            bindings = client.query(query)['results']['bindings']

        if not bindings:
            raise ValueError(f"The requested entity {res} was not found in the triplestore.")

        imported: List[GraphEntity] = Reader.import_entities_from_graph(g_set, bindings,
                                                                        resp_agent, enable_validation)
        if len(imported) <= 0:
            raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
        return imported[0]

    except ValueError:
        raise
    except EndpointError as e:
        print(f"[3] Could not import entity due to communication problems: {e}")
        raise

253 @staticmethod 

254 def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[URIRef], resp_agent: str, 

255 enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]: 

256 if not entities: 

257 raise ValueError("No entities provided for import") 

258 

259 imported_entities: List[GraphEntity] = [] 

260 

261 try: 

262 with SPARQLClient(ts_url, max_retries=3, backoff_factor=2.5) as client: 

263 for i in range(0, len(entities), batch_size): 

264 batch = entities[i:i + batch_size] 

265 not_found_entities = set(str(entity) for entity in batch) 

266 

267 union_patterns = [] 

268 for entity in batch: 

269 union_patterns.append(f"{{ BIND(<{str(entity)}> AS ?s) ?s ?p ?o }}") 

270 

271 query = f""" 

272 SELECT ?s ?p ?o 

273 WHERE {{ 

274 {' UNION '.join(union_patterns)} 

275 }} 

276 """ 

277 

278 results = client.query(query)['results']['bindings'] 

279 

280 if not results: 

281 entities_str = ', '.join(not_found_entities) 

282 raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}") 

283 

284 for result in results: 

285 if 's' in result and 'value' in result['s']: 

286 not_found_entities.discard(result['s']['value']) 

287 

288 batch_entities = Reader.import_entities_from_graph( 

289 g_set=g_set, 

290 results=results, 

291 resp_agent=resp_agent, 

292 enable_validation=enable_validation 

293 ) 

294 imported_entities.extend(batch_entities) 

295 

296 if not_found_entities: 

297 entities_str = ', '.join(not_found_entities) 

298 raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}") 

299 

300 except ValueError: 

301 raise 

302 except EndpointError as e: 

303 print(f"[3] Could not import batch due to communication problems: {e}") 

304 raise 

305 

306 if not imported_entities: 

307 raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.") 

308 

309 return imported_entities