Coverage for oc_ocdm / reader.py: 87%
277 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11import json
12import os
13from typing import TYPE_CHECKING
14from zipfile import ZipFile
16import orjson
17from rdflib import Dataset, Graph, URIRef
18from triplelite import TripleLite, from_rdflib
20from oc_ocdm.constants import RDF_TYPE
21from oc_ocdm.graph.graph_entity import GraphEntity
22from oc_ocdm.support.reporter import Reporter
23from oc_ocdm.support.sparql import SPARQLEndpointError, sparql_query
24from oc_ocdm.support.support import build_graph_from_results, normalize_graph_literals
26if TYPE_CHECKING:
27 from collections.abc import Callable
28 from typing import Any, Dict, List, Optional
30 from oc_ocdm.graph.graph_set import GraphSet
32from pyshacl import validate
35def _transform_jsonld_value(value: dict | str, uri_fn: Callable[[str], str]) -> dict | str:
36 if isinstance(value, dict):
37 if "@id" in value:
38 return {"@id": uri_fn(value["@id"])}
39 result: dict = {}
40 if "@value" in value:
41 result["@value"] = value["@value"]
42 if "@type" in value:
43 result["@type"] = uri_fn(value["@type"])
44 if "@language" in value:
45 result["@language"] = value["@language"]
46 return result
47 return value
def _transform_jsonld_entity(entity: dict, uri_fn: Callable[[str], str]) -> dict:
    """Return a copy of one JSON-LD node object with its IRIs rewritten by ``uri_fn``.

    ``@id`` is rewritten and ``@type`` is always emitted as a list of rewritten
    IRIs. Every other ``@``-keyword is dropped. Property keys are rewritten,
    and their values (single or list) go through ``_transform_jsonld_value``.
    """
    out: dict = {}
    for key, value in entity.items():
        if key == "@id":
            out["@id"] = uri_fn(value)
            continue
        if key == "@type":
            type_list = value if isinstance(value, list) else [value]
            out["@type"] = [uri_fn(t) for t in type_list]
            continue
        if key.startswith("@"):
            # Remaining JSON-LD keywords are not carried over.
            continue
        prop = uri_fn(key)
        if isinstance(value, list):
            out[prop] = [_transform_jsonld_value(item, uri_fn) for item in value]
        else:
            out[prop] = _transform_jsonld_value(value, uri_fn)
    return out
def _transform_jsonld_graphs(data: list[dict], uri_fn: Callable[[str], str]) -> list[dict]:
    """Rewrite IRIs in a list of JSON-LD graph objects through ``uri_fn``.

    Each output graph keeps only ``@id`` (rewritten) and ``@graph`` (each node
    transformed with ``_transform_jsonld_entity``), when present in the input.
    """
    def _rewrite_graph(graph_obj: dict) -> dict:
        # Build one output graph; keys absent from the input stay absent.
        new_graph: dict = {}
        if "@id" in graph_obj:
            new_graph["@id"] = uri_fn(graph_obj["@id"])
        if "@graph" in graph_obj:
            new_graph["@graph"] = [_transform_jsonld_entity(node, uri_fn) for node in graph_obj["@graph"]]
        return new_graph

    return [_rewrite_graph(graph_obj) for graph_obj in data]
80def _expand_uri(curie: str, prefix_to_ns: dict[str, str]) -> str:
81 colon = curie.find(":")
82 if colon > 0:
83 prefix = curie[:colon]
84 ns = prefix_to_ns.get(prefix)
85 if ns is not None:
86 return ns + curie[colon + 1:]
87 return curie
def _expand_jsonld(data: list[dict], prefix_to_ns: dict[str, str]) -> list[dict]:
    """Return ``data`` with every compact IRI expanded through ``prefix_to_ns``."""
    def expand(uri: str) -> str:
        return _expand_uri(uri, prefix_to_ns)

    return _transform_jsonld_graphs(data, expand)
class Reader(object):
    """Read RDF data from files or SPARQL endpoints and import it into a GraphSet.

    ``context_map`` maps JSON-LD ``@context`` URLs to local context documents,
    so that JSON-LD input referencing one of those URLs is parsed with the
    local copy substituted in place of the URL.
    """

    def __init__(self, repok: Optional[Reporter] = None, reperr: Optional[Reporter] = None,
                 context_map: Optional[Dict[str, Any]] = None) -> None:
        """Create a reader.

        :param repok: reporter for informational messages; when omitted a
            ``Reporter`` with prefix ``[Reader: INFO] `` is created.
        :param reperr: reporter for error messages; when omitted a
            ``Reporter`` with prefix ``[Reader: ERROR] `` is created.
        :param context_map: mapping from context URL to either the path of a
            local JSON file (parsed here) or an already-parsed context. The
            dict is used in place, so parsed files also replace their paths
            in the caller's dict.
        """
        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}
        for context_url, ctx_file_path in list(self.context_map.items()):
            if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                # This expensive operation is done only when it's really needed
                with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                    self.context_map[context_url] = json.load(ctx_f)

        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Reader: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Reader: ERROR] ")

    def load(self, rdf_file_path: str) -> Optional[Dataset]:
        """Load ``rdf_file_path`` into a Dataset.

        Starts a new article on both reporters. Returns ``None`` after adding
        an error sentence to ``reperr`` when the file does not exist or
        cannot be loaded.
        """
        self.repok.new_article()
        self.reperr.new_article()

        if not os.path.isfile(rdf_file_path):
            self.reperr.add_sentence("[2] "
                                     f"The file specified ('{rdf_file_path}') doesn't exist.")
            return None

        try:
            return self._load_graph(rdf_file_path)
        except Exception as e:
            self.reperr.add_sentence("[1] "
                                     "It was impossible to handle the format used for "
                                     "storing the file (stored in the temporary path) "
                                     f"'{rdf_file_path}'. Additional details: {e}")
            return None

    # File extension -> formats tried first for files with that extension.
    _EXT_TO_FORMATS: dict[str, list[str]] = {
        ".json": ["json-ld"],
        ".jsonld": ["json-ld"],
        ".xml": ["rdfxml"],
        ".rdf": ["rdfxml"],
        ".ttl": ["turtle"],
        ".trig": ["trig"],
        ".nt": ["nt11"],
        ".nq": ["nquads"],
    }
    _ALL_FORMATS: list[str] = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]

    @staticmethod
    def _formats_for_file(file_name: str) -> list[str]:
        """Return every supported format, ordered with the extension's preferred ones first.

        A fresh list is returned each time, so callers cannot mutate the
        class-level tables.
        """
        ext = os.path.splitext(file_name)[1].lower()
        preferred = Reader._EXT_TO_FORMATS.get(ext)
        if preferred is None:
            return list(Reader._ALL_FORMATS)
        return preferred + [f for f in Reader._ALL_FORMATS if f not in preferred]

    @staticmethod
    def _normalize_dataset(dataset: Dataset) -> Dataset:
        """Apply ``normalize_graph_literals`` to every graph of ``dataset`` and return it."""
        for graph in dataset.graphs():
            normalize_graph_literals(graph)
        return dataset

    def _load_graph(self, file_path: str) -> Dataset:
        """Parse ``file_path`` into a new Dataset and normalize its literals.

        For a ``.zip`` archive, members are tried in archive order and the
        first one that parses is returned.

        :raises IOError: if the file cannot be opened or read, or if no
            supported format parses it.
        """
        loaded_graph = Dataset()

        if file_path.endswith('.zip'):
            try:
                with ZipFile(file=file_path, mode="r") as archive:
                    for zf_name in archive.namelist():
                        formats = self._formats_for_file(zf_name)
                        with archive.open(zf_name) as f:
                            if self._try_parse(loaded_graph, f, formats):
                                return self._normalize_dataset(loaded_graph)
            except Exception as e:
                raise IOError(f"Error opening or reading zip file '{file_path}': {e}") from e
        else:
            formats = self._formats_for_file(file_path)
            try:
                with open(file_path, 'rt', encoding='utf-8') as f:
                    if self._try_parse(loaded_graph, f, formats):
                        return self._normalize_dataset(loaded_graph)
            except Exception as e:
                raise IOError(f"Error opening or reading file '{file_path}': {e}") from e

        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")

    def _resolve_context(self, context: Any) -> Any:
        """Return the local context document for ``context``.

        When ``context`` is a string key of ``context_map``, the mapped value
        is returned, unwrapped from a ``{"@context": ...}`` envelope if it has
        one. Anything else is returned unchanged. This includes inline
        dict/list contexts, which are unhashable and must not be used as dict
        keys.
        """
        if not isinstance(context, str) or context not in self.context_map:
            return context
        mapped = self.context_map[context]
        if isinstance(mapped, dict) and "@context" in mapped:
            return mapped["@context"]
        return mapped

    def _try_parse(self, graph: Dataset, file_obj, formats: List[str]) -> bool:
        """Parse ``file_obj`` into ``graph`` with the first of ``formats`` that succeeds.

        For JSON-LD, string contexts found in ``context_map`` are replaced by
        their local documents before parsing. Returns ``False`` when every
        format fails.
        """
        # NOTE(review): a format that fails part-way may already have added
        # triples to ``graph`` before the next format is tried; consider
        # parsing each attempt into a scratch Dataset and merging on success.
        for cur_format in formats:
            file_obj.seek(0)
            try:
                if cur_format == "json-ld":
                    json_ld_file = json.load(file_obj)
                    if isinstance(json_ld_file, dict):
                        json_ld_file = [json_ld_file]
                    for json_ld_resource in json_ld_file:
                        if isinstance(json_ld_resource, dict) and "@context" in json_ld_resource:
                            json_ld_resource["@context"] = self._resolve_context(json_ld_resource["@context"])
                    graph.parse(data=json.dumps(json_ld_file, ensure_ascii=False), format=cur_format)
                else:
                    graph.parse(file=file_obj, format=cur_format)
                return True
            except Exception:
                continue
        return False

    @staticmethod
    def _prefixes_from_context(ctx: Any) -> Dict[str, str]:
        """Collect ``prefix -> namespace`` string entries from a context (dict or list of dicts).

        Keywords (``@...``) and non-string term definitions are skipped. For a
        list, later dicts override earlier ones, and non-dict items are ignored.
        """
        parts = ctx if isinstance(ctx, list) else [ctx]
        prefixes: Dict[str, str] = {}
        for part in parts:
            if isinstance(part, dict):
                prefixes.update({
                    k: v for k, v in part.items()
                    if isinstance(v, str) and not k.startswith("@")
                })
        return prefixes

    def load_jsonld_dict(self, rdf_file_path: str) -> list[dict]:
        """Load a JSON-LD file (or the first JSON/JSON-LD member of a ZIP) as plain dicts.

        If a graph object references a context URL found in ``context_map``,
        the prefixes of the first such context are used to expand compact
        IRIs in all graphs (see ``_expand_jsonld``). Otherwise the parsed data
        is returned as-is, wrapped in a list if it was a single object.

        :raises IOError: if a ZIP archive contains no ``.json``/``.jsonld`` member.
        """
        if rdf_file_path.endswith('.zip'):
            with ZipFile(file=rdf_file_path, mode="r") as archive:
                json_members = [
                    name for name in archive.namelist()
                    if os.path.splitext(name)[1].lower() in ('.json', '.jsonld')
                ]
                if not json_members:
                    raise IOError(f"No JSON/JSON-LD file found inside ZIP archive '{rdf_file_path}'.")
                with archive.open(json_members[0]) as f:
                    data = orjson.loads(f.read())
        else:
            with open(rdf_file_path, 'rb') as f:
                data = orjson.loads(f.read())
        if isinstance(data, dict):
            data = [data]

        prefix_to_ns: Dict[str, str] | None = None
        for graph_obj in data:
            ctx_url = graph_obj.get("@context") if isinstance(graph_obj, dict) else None
            # Inline (dict/list) contexts are unhashable and cannot be map keys.
            if isinstance(ctx_url, str) and ctx_url and ctx_url in self.context_map:
                prefix_to_ns = self._prefixes_from_context(self._resolve_context(ctx_url))
                break
        if prefix_to_ns is not None:
            data = _expand_jsonld(data, prefix_to_ns)
        return data

    @staticmethod
    def _shacl_shapes_path(closed: bool) -> str:
        """Return the path of the SHACL shapes file (closed or open variant).

        Looks in the ``resources`` directory next to this module first, so
        validation works regardless of the working directory. Falls back to
        the historical CWD-relative ``oc_ocdm/resources`` location.
        """
        file_name = 'shacle_closed.ttl' if closed else 'shacle.ttl'
        module_relative = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'resources', file_name)
        if os.path.isfile(module_relative):
            return module_relative
        return os.path.join('oc_ocdm', 'resources', file_name)

    def graph_validation(self, graph: Graph, closed: bool = False) -> Graph:
        """Return a new graph with only the triples of valid IRI subjects of ``graph``.

        A subject is dropped when it appears as ``sh:focusNode`` in the SHACL
        validation report. Non-IRI subjects are always dropped.

        :raises TypeError: if pySHACL does not return the report as a Graph.
        """
        valid_graph: Graph = Graph(identifier=graph.identifier)
        sg = Graph()
        sg.parse(self._shacl_shapes_path(closed))
        _, report_result, _ = validate(graph,
                                       shacl_graph=sg,
                                       ont_graph=None,
                                       inference=None,
                                       abort_on_first=False,
                                       allow_infos=False,
                                       allow_warnings=False,
                                       meta_shacl=False,
                                       advanced=False,
                                       js=False,
                                       debug=False)
        if not isinstance(report_result, Graph):
            raise TypeError(f"Expected Graph from SHACL validation, got {type(report_result)}")
        focus_node = URIRef('http://www.w3.org/ns/shacl#focusNode')
        invalid_nodes = set(report_result.objects(None, focus_node))
        for s in graph.subjects(unique=True):
            if isinstance(s, URIRef) and s not in invalid_nodes:
                for valid_subject_triple in graph.triples((s, None, None)):
                    valid_graph.add(valid_subject_triple)
        return valid_graph

    @staticmethod
    def import_entities_from_graph(g_set: GraphSet, results: List[Dict] | TripleLite | Graph | Dataset, resp_agent: str,
                                   enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
        """Add to ``g_set`` one OCDM entity per recognised subject in ``results``.

        ``results`` may be SPARQL JSON bindings, a TripleLite, an rdflib Graph,
        or a Dataset (all its graphs are merged). Each subject is mapped to the
        first matching ``rdf:type`` in the dispatch order below. Subjects with
        no recognised type are skipped.

        :param enable_validation: filter the data through ``graph_validation`` first.
        :param closed: use the closed SHACL shapes when validating.
        :return: the created entities, in subject iteration order.
        """
        if isinstance(results, list):
            graph: TripleLite | Graph = build_graph_from_results(results)
        elif isinstance(results, Dataset):
            # Dataset must be checked before Graph: it is a Graph subclass.
            merged = TripleLite()
            for tl in from_rdflib(results):
                for triple in tl.triples((None, None, None)):
                    merged.add(triple)
            graph = merged
        else:
            graph = results
        if enable_validation:
            reader = Reader()
            if not isinstance(graph, Graph):
                graph = graph.to_rdflib()
            graph = reader.graph_validation(graph, closed)
        if isinstance(graph, Graph):
            graph = from_rdflib(graph)[0]

        # Ordered (type IRI, GraphSet adder) pairs: the first match wins.
        dispatch = (
            (GraphEntity.iri_note, "add_an"),
            (GraphEntity.iri_role_in_time, "add_ar"),
            (GraphEntity.iri_bibliographic_reference, "add_be"),
            (GraphEntity.iri_expression, "add_br"),
            (GraphEntity.iri_citation, "add_ci"),
            (GraphEntity.iri_discourse_element, "add_de"),
            (GraphEntity.iri_identifier, "add_id"),
            (GraphEntity.iri_singleloc_pointer_list, "add_pl"),
            (GraphEntity.iri_agent, "add_ra"),
            (GraphEntity.iri_manifestation, "add_re"),
            (GraphEntity.iri_intextref_pointer, "add_rp"),
        )
        imported_entities: List[GraphEntity] = []
        for subject in graph.subjects():
            types: List[str] = [o.value for o in graph.objects(subject, RDF_TYPE)]
            preexisting = graph.subgraph(subject)
            for type_iri, adder_name in dispatch:
                if type_iri in types:
                    add_entity = getattr(g_set, adder_name)
                    imported_entities.append(add_entity(resp_agent=resp_agent, res=subject,
                                                        preexisting_graph=preexisting))
                    break
        return imported_entities

    @staticmethod
    def _check_iri(iri: str) -> None:
        """Reject IRIs that cannot be embedded verbatim inside a SPARQL ``<...>`` IRIREF.

        ``<``, ``>`` and characters up to U+0020 (spaces/controls) are
        forbidden by the SPARQL IRIREF production. ``>`` in particular would
        terminate the IRI early and inject arbitrary query text.

        :raises ValueError: if ``iri`` contains such a character.
        """
        if any(ch in '<>' or ord(ch) <= 0x20 for ch in iri):
            raise ValueError(f"Invalid IRI {iri!r}: it cannot be safely embedded in a SPARQL query.")

    @staticmethod
    def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: str, resp_agent: str,
                                       enable_validation: bool = False) -> GraphEntity:
        """Fetch all triples of ``res`` from ``ts_url`` and import it into ``g_set``.

        :raises ValueError: if ``res`` is not a safe IRI, has no triples in the
            triplestore, or is not recognised as an OCDM entity.
        :raises SPARQLEndpointError: on communication problems (after printing a message).
        """
        Reader._check_iri(res)
        query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
        try:
            result = sparql_query(ts_url, query, max_retries=3, backoff_factor=2.5)['results']['bindings']
        except SPARQLEndpointError as e:
            print(f"[3] Could not import entity due to communication problems: {e}")
            raise

        if not result:
            raise ValueError(f"The requested entity {res} was not found in the triplestore.")

        imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation)
        if not imported_entities:
            raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
        return imported_entities[0]

    @staticmethod
    def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[str], resp_agent: str,
                                         enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]:
        """Import ``entities`` from ``ts_url`` into ``g_set``, ``batch_size`` IRIs per query.

        Each batch is fetched with one UNION query. Entities of a batch are
        imported before that batch's missing IRIs are reported.

        :raises ValueError: if ``entities`` is empty, ``batch_size`` is not
            positive, an IRI is unsafe, a batch returns no rows, some IRIs of a
            batch return no rows, or nothing was imported at all.
        :raises SPARQLEndpointError: on communication problems (after printing a message).
        """
        if not entities:
            raise ValueError("No entities provided for import")
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
        for entity in entities:
            Reader._check_iri(entity)

        imported_entities: List[GraphEntity] = []

        for start in range(0, len(entities), batch_size):
            batch = entities[start:start + batch_size]
            not_found_entities = set(batch)

            union_body = ' UNION '.join(f"{{ BIND(<{entity}> AS ?s) ?s ?p ?o }}" for entity in batch)
            query = f"SELECT ?s ?p ?o WHERE {{ {union_body} }}"

            try:
                results = sparql_query(ts_url, query, max_retries=3, backoff_factor=2.5)['results']['bindings']
            except SPARQLEndpointError as e:
                print(f"[3] Could not import batch due to communication problems: {e}")
                raise

            if not results:
                entities_str = ', '.join(sorted(not_found_entities))
                raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}")

            for binding in results:
                subject = binding.get('s')
                if subject and 'value' in subject:
                    not_found_entities.discard(subject['value'])

            batch_entities = Reader.import_entities_from_graph(
                g_set=g_set,
                results=results,
                resp_agent=resp_agent,
                enable_validation=enable_validation
            )
            imported_entities.extend(batch_entities)

            if not_found_entities:
                entities_str = ', '.join(sorted(not_found_entities))
                raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}")

        if not imported_entities:
            raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.")

        return imported_entities