Coverage for oc_ocdm / reader.py: 87%

277 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11import json 

12import os 

13from typing import TYPE_CHECKING 

14from zipfile import ZipFile 

15 

16import orjson 

17from rdflib import Dataset, Graph, URIRef 

18from triplelite import TripleLite, from_rdflib 

19 

20from oc_ocdm.constants import RDF_TYPE 

21from oc_ocdm.graph.graph_entity import GraphEntity 

22from oc_ocdm.support.reporter import Reporter 

23from oc_ocdm.support.sparql import SPARQLEndpointError, sparql_query 

24from oc_ocdm.support.support import build_graph_from_results, normalize_graph_literals 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Callable 

28 from typing import Any, Dict, List, Optional 

29 

30 from oc_ocdm.graph.graph_set import GraphSet 

31 

32from pyshacl import validate 

33 

34 

35def _transform_jsonld_value(value: dict | str, uri_fn: Callable[[str], str]) -> dict | str: 

36 if isinstance(value, dict): 

37 if "@id" in value: 

38 return {"@id": uri_fn(value["@id"])} 

39 result: dict = {} 

40 if "@value" in value: 

41 result["@value"] = value["@value"] 

42 if "@type" in value: 

43 result["@type"] = uri_fn(value["@type"]) 

44 if "@language" in value: 

45 result["@language"] = value["@language"] 

46 return result 

47 return value 

48 

49 

50def _transform_jsonld_entity(entity: dict, uri_fn: Callable[[str], str]) -> dict: 

51 transformed: dict = {} 

52 for key, value in entity.items(): 

53 if key == "@id": 

54 transformed["@id"] = uri_fn(value) 

55 elif key == "@type": 

56 transformed["@type"] = [uri_fn(t) for t in value] if isinstance(value, list) else [uri_fn(value)] 

57 elif key.startswith("@"): 

58 continue 

59 else: 

60 new_key = uri_fn(key) 

61 if isinstance(value, list): 

62 transformed[new_key] = [_transform_jsonld_value(v, uri_fn) for v in value] 

63 else: 

64 transformed[new_key] = _transform_jsonld_value(value, uri_fn) 

65 return transformed 

66 

67 

68def _transform_jsonld_graphs(data: list[dict], uri_fn: Callable[[str], str]) -> list[dict]: 

69 result = [] 

70 for graph_obj in data: 

71 new_graph: dict = {} 

72 if "@id" in graph_obj: 

73 new_graph["@id"] = uri_fn(graph_obj["@id"]) 

74 if "@graph" in graph_obj: 

75 new_graph["@graph"] = [_transform_jsonld_entity(e, uri_fn) for e in graph_obj["@graph"]] 

76 result.append(new_graph) 

77 return result 

78 

79 

80def _expand_uri(curie: str, prefix_to_ns: dict[str, str]) -> str: 

81 colon = curie.find(":") 

82 if colon > 0: 

83 prefix = curie[:colon] 

84 ns = prefix_to_ns.get(prefix) 

85 if ns is not None: 

86 return ns + curie[colon + 1:] 

87 return curie 

88 

89 

def _expand_jsonld(data: list[dict], prefix_to_ns: dict[str, str]) -> list[dict]:
    """Return a copy of the named-graph list *data* with compact IRIs expanded.

    Every IRI position handled by ``_transform_jsonld_graphs`` is passed through
    ``_expand_uri`` with the given *prefix_to_ns* mapping.
    """
    def expand(uri: str) -> str:
        return _expand_uri(uri, prefix_to_ns)

    return _transform_jsonld_graphs(data, expand)

92 

93 

class Reader(object):
    """Load RDF data from files and import OCDM entities into a ``GraphSet``.

    JSON-LD ``@context`` URLs found in the input can be redirected to local context
    documents through the ``context_map`` given to ``__init__``.
    """

    # File extension (lower-cased) -> rdflib parser format(s) to try first.
    # ``xml`` and ``nt`` are rdflib's registered parser names for RDF/XML and N-Triples.
    _EXT_TO_FORMATS: dict[str, list[str]] = {
        ".json": ["json-ld"],
        ".jsonld": ["json-ld"],
        ".xml": ["xml"],
        ".rdf": ["xml"],
        ".ttl": ["turtle"],
        ".trig": ["trig"],
        ".nt": ["nt"],
        ".nq": ["nquads"],
    }
    # Every parser format the reader attempts, in fallback order.
    _ALL_FORMATS: list[str] = ["json-ld", "xml", "turtle", "trig", "nt", "nquads"]
    # The SHACL shapes used by graph_validation live in the ``resources`` folder next to
    # this module. Resolving the path from ``__file__`` (rather than the current working
    # directory) makes validation independent of where the process was started.
    _RESOURCES_DIR: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'resources')

    def __init__(self, repok: Optional[Reporter] = None, reperr: Optional[Reporter] = None,
                 context_map: Optional[Dict[str, Any]] = None) -> None:
        """Create a reader.

        :param repok: reporter for informational messages; a default one is created if None.
        :param reperr: reporter for error messages; a default one is created if None.
        :param context_map: maps JSON-LD ``@context`` URLs to either a parsed context
            document or the path of a local JSON file holding it. Entries whose value is
            the path of an existing file are replaced in place (the dict passed by the
            caller is mutated) with the parsed JSON. None means an empty mapping.
        """
        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}
        # Iterate over a snapshot because values are replaced while looping.
        for context_url, ctx_value in list(self.context_map.items()):
            if isinstance(ctx_value, str) and os.path.isfile(ctx_value):
                # This expensive operation is done only when it's really needed
                with open(ctx_value, 'rt', encoding='utf-8') as ctx_f:
                    self.context_map[context_url] = json.load(ctx_f)

        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Reader: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Reader: ERROR] ")

    def load(self, rdf_file_path: str) -> Optional[Dataset]:
        """Parse *rdf_file_path* (plain file or ``.zip`` archive) into a ``Dataset``.

        Failures are not propagated. When the file does not exist, or loading raises,
        an error sentence is added to ``self.reperr`` and None is returned.
        """
        self.repok.new_article()
        self.reperr.new_article()

        loaded_graph: Optional[Dataset] = None
        if os.path.isfile(rdf_file_path):
            try:
                loaded_graph = self._load_graph(rdf_file_path)
            except Exception as e:
                self.reperr.add_sentence("[1] "
                                         "It was impossible to handle the format used for "
                                         "storing the file (stored in the temporary path) "
                                         f"'{rdf_file_path}'. Additional details: {e}")
        else:
            self.reperr.add_sentence("[2] "
                                     f"The file specified ('{rdf_file_path}') doesn't exist.")

        return loaded_graph

    @staticmethod
    def _formats_for_file(file_name: str) -> list[str]:
        """Return the parser formats to try for *file_name*, best guess first.

        The formats mapped to the file's extension come first, followed by every other
        supported format. A new list is returned on every call, so callers may modify it.
        """
        ext = os.path.splitext(file_name)[1].lower()
        preferred = Reader._EXT_TO_FORMATS.get(ext)
        if preferred is None:
            # Copy so callers cannot mutate the shared class-level list.
            return list(Reader._ALL_FORMATS)
        return preferred + [f for f in Reader._ALL_FORMATS if f not in preferred]

    @staticmethod
    def _normalize_literals(dataset: Dataset) -> Dataset:
        """Apply ``normalize_graph_literals`` to every graph of *dataset* and return it."""
        for graph in dataset.graphs():
            normalize_graph_literals(graph)
        return dataset

    def _load_graph(self, file_path: str) -> Dataset:
        """Parse *file_path* into a new ``Dataset`` with normalized literals.

        For ``.zip`` archives, the members are tried in order and the first one that
        parses is used.

        :raises IOError: if the file cannot be opened or read, or no supported format
            parses it.
        """
        loaded_graph = Dataset()

        if file_path.endswith('.zip'):
            try:
                with ZipFile(file=file_path, mode="r") as archive:
                    for zf_name in archive.namelist():
                        formats = self._formats_for_file(zf_name)
                        with archive.open(zf_name) as f:
                            if self._try_parse(loaded_graph, f, formats):
                                return self._normalize_literals(loaded_graph)
            except Exception as e:
                raise IOError(f"Error opening or reading zip file '{file_path}': {e}") from e
        else:
            formats = self._formats_for_file(file_path)
            try:
                with open(file_path, 'rt', encoding='utf-8') as f:
                    if self._try_parse(loaded_graph, f, formats):
                        return self._normalize_literals(loaded_graph)
            except Exception as e:
                raise IOError(f"Error opening or reading file '{file_path}': {e}") from e

        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")

    def _lookup_context(self, ctx_ref: Any) -> Any:
        """Return the local context mapped to the ``@context`` reference *ctx_ref*, or None.

        Only non-empty string references (URLs) can be looked up. Inline contexts (dicts)
        or context arrays yield None instead of raising ``TypeError`` (unhashable key).
        A mapped document is unwrapped from its top-level ``@context`` key when present;
        otherwise the mapped dict itself is returned. Mapped values that are not dicts,
        such as a context file path that did not exist, also yield None.
        """
        if not isinstance(ctx_ref, str) or not ctx_ref or ctx_ref not in self.context_map:
            return None
        ctx = self.context_map[ctx_ref]
        if not isinstance(ctx, dict):
            return None
        return ctx.get("@context", ctx)

    def _parse_jsonld(self, graph: Dataset, file_obj) -> None:
        """Parse a JSON-LD document read from *file_obj* into *graph*.

        When a top-level object's ``@context`` is a URL mapped in ``context_map``, that
        reference is replaced by the local context before parsing. Any other
        ``@context`` value is left untouched.
        """
        json_ld_file = json.load(file_obj)
        if isinstance(json_ld_file, dict):
            json_ld_file = [json_ld_file]
        for json_ld_resource in json_ld_file:
            if not isinstance(json_ld_resource, dict):
                continue
            local_ctx = self._lookup_context(json_ld_resource.get("@context"))
            if local_ctx is not None:
                json_ld_resource["@context"] = local_ctx
        graph.parse(data=json.dumps(json_ld_file, ensure_ascii=False), format="json-ld")

    def _try_parse(self, graph: Dataset, file_obj, formats: List[str]) -> bool:
        """Try each format in *formats* on *file_obj* and stop at the first success.

        *file_obj* is rewound before every attempt, so it must be seekable.

        :return: True if some format parsed the content into *graph*, False otherwise.
        """
        for cur_format in formats:
            file_obj.seek(0)
            try:
                if cur_format == "json-ld":
                    self._parse_jsonld(graph, file_obj)
                else:
                    graph.parse(file=file_obj, format=cur_format)
                return True
            except Exception:
                # Wrong format or malformed content: fall through to the next candidate.
                continue
        return False

    @staticmethod
    def _read_json(rdf_file_path: str) -> Any:
        """Decode the JSON in *rdf_file_path* with orjson.

        For ``.zip`` archives, the first member with a ``.json``/``.jsonld`` extension
        is read.

        :raises IOError: if a ``.zip`` archive contains no such member.
        """
        if not rdf_file_path.endswith('.zip'):
            with open(rdf_file_path, 'rb') as f:
                return orjson.loads(f.read())
        with ZipFile(file=rdf_file_path, mode="r") as archive:
            for zf_name in archive.namelist():
                if os.path.splitext(zf_name)[1].lower() in ('.json', '.jsonld'):
                    with archive.open(zf_name) as f:
                        return orjson.loads(f.read())
        raise IOError(f"No JSON/JSON-LD file found inside ZIP archive '{rdf_file_path}'.")

    def load_jsonld_dict(self, rdf_file_path: str) -> list[dict]:
        """Load a JSON-LD file (or ``.zip``) as a list of plain dicts.

        A single top-level object is wrapped in a list. If some top-level object's
        ``@context`` URL maps to a local context dict, the string-valued, non-keyword
        entries of the first such context are used to expand compact IRIs throughout
        the data. Otherwise the data is returned as decoded.
        """
        data = self._read_json(rdf_file_path)
        if isinstance(data, dict):
            data = [data]
        prefix_to_ns: dict[str, str] | None = None
        for graph_obj in data:
            if not isinstance(graph_obj, dict):
                continue
            ctx = self._lookup_context(graph_obj.get("@context"))
            if isinstance(ctx, dict):
                prefix_to_ns = {
                    k: v for k, v in ctx.items()
                    if isinstance(v, str) and not k.startswith("@")
                }
                break
        if prefix_to_ns is not None:
            data = _expand_jsonld(data, prefix_to_ns)
        return data

    def graph_validation(self, graph: Graph, closed: bool = False) -> Graph:
        """Return a new graph with only the triples of IRI subjects that pass SHACL validation.

        Subjects reported as ``sh:focusNode`` in the pySHACL report are dropped together
        with all of their triples. Non-IRI subjects (e.g. blank nodes) are dropped too.
        The result keeps *graph*'s identifier.

        :param closed: validate against ``shacle_closed.ttl`` instead of ``shacle.ttl``.
        :raises TypeError: if pySHACL does not return the report as an rdflib ``Graph``.
        """
        shapes_file = 'shacle_closed.ttl' if closed else 'shacle.ttl'
        sg = Graph()
        sg.parse(os.path.join(self._RESOURCES_DIR, shapes_file))
        _, report_result, _ = validate(graph,
                                       shacl_graph=sg,
                                       ont_graph=None,
                                       inference=None,
                                       abort_on_first=False,
                                       allow_infos=False,
                                       allow_warnings=False,
                                       meta_shacl=False,
                                       advanced=False,
                                       js=False,
                                       debug=False)
        if not isinstance(report_result, Graph):
            raise TypeError(f"Expected Graph from SHACL validation, got {type(report_result)}")
        invalid_nodes = set(report_result.objects(None, URIRef('http://www.w3.org/ns/shacl#focusNode')))
        valid_graph: Graph = Graph(identifier=graph.identifier)
        for s in graph.subjects(unique=True):
            if isinstance(s, URIRef) and s not in invalid_nodes:
                for valid_subject_triple in graph.triples((s, None, None)):
                    valid_graph.add(valid_subject_triple)
        return valid_graph

    @staticmethod
    def import_entities_from_graph(g_set: GraphSet, results: List[Dict] | TripleLite | Graph | Dataset, resp_agent: str,
                                   enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
        """Create an OCDM entity in *g_set* for every recognized subject in *results*.

        :param results: SPARQL JSON result bindings (converted with
            ``build_graph_from_results``), a ``TripleLite`` graph, an rdflib ``Graph``, or
            an rdflib ``Dataset`` (all its graphs are merged into one ``TripleLite``).
        :param resp_agent: responsible agent passed to every ``g_set.add_*`` call.
        :param enable_validation: first filter the data through ``graph_validation``.
        :param closed: use the closed SHACL shapes when validating.
        :return: the imported entities, in subject-visit order. Subjects whose
            ``rdf:type`` values match none of the supported classes are skipped. When
            several match, the first one in the table below wins.
        """
        if isinstance(results, list):
            graph: TripleLite | Graph = build_graph_from_results(results)
        elif isinstance(results, Dataset):
            merged = TripleLite()
            for tl in from_rdflib(results):
                for triple in tl.triples((None, None, None)):
                    merged.add(triple)
            graph = merged
        else:
            # Already a TripleLite or an rdflib Graph.
            graph = results
        if enable_validation:
            reader = Reader()
            if not isinstance(graph, Graph):
                graph = graph.to_rdflib()
            graph = reader.graph_validation(graph, closed)
        if isinstance(graph, Graph):
            graph = from_rdflib(graph)[0]

        # Ordered (rdf:type IRI, factory) pairs; the first matching type decides the class.
        adders = (
            (GraphEntity.iri_note, g_set.add_an),
            (GraphEntity.iri_role_in_time, g_set.add_ar),
            (GraphEntity.iri_bibliographic_reference, g_set.add_be),
            (GraphEntity.iri_expression, g_set.add_br),
            (GraphEntity.iri_citation, g_set.add_ci),
            (GraphEntity.iri_discourse_element, g_set.add_de),
            (GraphEntity.iri_identifier, g_set.add_id),
            (GraphEntity.iri_singleloc_pointer_list, g_set.add_pl),
            (GraphEntity.iri_agent, g_set.add_ra),
            (GraphEntity.iri_manifestation, g_set.add_re),
            (GraphEntity.iri_intextref_pointer, g_set.add_rp),
        )
        imported_entities: List[GraphEntity] = []
        for subject in graph.subjects():
            types: List[str] = [o.value for o in graph.objects(subject, RDF_TYPE)]
            preexisting = graph.subgraph(subject)
            for type_iri, add_entity in adders:
                if type_iri in types:
                    imported_entities.append(add_entity(resp_agent=resp_agent, res=subject,
                                                        preexisting_graph=preexisting))
                    break
        return imported_entities

    @staticmethod
    def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: str, resp_agent: str,
                                       enable_validation: bool = False, closed: bool = False) -> GraphEntity:
        """Fetch every triple of *res* from the SPARQL endpoint *ts_url* and import it.

        :param closed: use the closed SHACL shapes when *enable_validation* is True.
        :return: the first entity imported from the fetched triples.
        :raises ValueError: if no triples are found for *res*, or none of them yields a
            recognized OCDM entity.
        :raises SPARQLEndpointError: re-raised from ``sparql_query`` after printing a message.
        """
        # NOTE(review): *res* is interpolated unescaped; a '>' in it would alter the query.
        query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
        try:
            result = sparql_query(ts_url, query, max_retries=3, backoff_factor=2.5)['results']['bindings']

            if not result:
                raise ValueError(f"The requested entity {res} was not found in the triplestore.")

            imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(
                g_set, result, resp_agent, enable_validation, closed)
            if not imported_entities:
                raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
            return imported_entities[0]

        # Listed first so ValueErrors are re-raised as-is, without the communication message.
        except ValueError:
            raise
        except SPARQLEndpointError as e:
            print(f"[3] Could not import entity due to communication problems: {e}")
            raise

    @staticmethod
    def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[str], resp_agent: str,
                                         enable_validation: bool = False, batch_size: int = 1000,
                                         closed: bool = False) -> List[GraphEntity]:
        """Import *entities* from the SPARQL endpoint *ts_url*, querying *batch_size* at a time.

        Each batch is fetched with one UNION query.

        :param closed: use the closed SHACL shapes when *enable_validation* is True.
        :return: all imported entities, batch by batch.
        :raises ValueError: if *entities* is empty or *batch_size* is not positive; if a
            batch returns no triples at all; if some entity of a batch has no triples; or
            if nothing was imported. Entity lists in messages are sorted.
        :raises SPARQLEndpointError: re-raised from ``sparql_query`` after printing a message.
        """
        if not entities:
            raise ValueError("No entities provided for import")
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        imported_entities: List[GraphEntity] = []

        try:
            for i in range(0, len(entities), batch_size):
                batch = entities[i:i + batch_size]
                not_found_entities = set(batch)

                # NOTE(review): entity IRIs are interpolated unescaped into the query.
                union_patterns = [f"{{ BIND(<{entity}> AS ?s) ?s ?p ?o }}" for entity in batch]
                query = f"""
                SELECT ?s ?p ?o
                WHERE {{
                    {' UNION '.join(union_patterns)}
                }}
                """

                results = sparql_query(ts_url, query, max_retries=3, backoff_factor=2.5)['results']['bindings']

                if not results:
                    entities_str = ', '.join(sorted(not_found_entities))
                    raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}")

                for result in results:
                    if 's' in result and 'value' in result['s']:
                        not_found_entities.discard(result['s']['value'])

                batch_entities = Reader.import_entities_from_graph(
                    g_set=g_set,
                    results=results,
                    resp_agent=resp_agent,
                    enable_validation=enable_validation,
                    closed=closed,
                )
                imported_entities.extend(batch_entities)

                if not_found_entities:
                    entities_str = ', '.join(sorted(not_found_entities))
                    raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}")

        # Listed first so ValueErrors are re-raised as-is, without the communication message.
        except ValueError:
            raise
        except SPARQLEndpointError as e:
            print(f"[3] Could not import batch due to communication problems: {e}")
            raise

        if not imported_entities:
            raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.")

        return imported_entities
400 

401 if not imported_entities: 

402 raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.") 

403 

404 return imported_entities