Coverage for oc_ocdm/reader.py: 88%
196 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18import json
19import os
20from typing import TYPE_CHECKING
21from zipfile import ZipFile
23from oc_ocdm.graph.graph_entity import GraphEntity
24from oc_ocdm.support.reporter import Reporter
25from oc_ocdm.support.support import build_graph_from_results
26from rdflib import RDF, Dataset, Graph, URIRef
27from sparqlite import SPARQLClient, EndpointError
29if TYPE_CHECKING:
30 from typing import Any, Dict, List, Optional, Set
31 from oc_ocdm.graph.graph_set import GraphSet
33from pyshacl import validate
class Reader(object):
    """Load RDF data from files or SPARQL endpoints and import the resulting
    triples into a ``GraphSet`` as typed OCDM ``GraphEntity`` instances.

    Optionally validates imported graphs against the OCDM SHACL shapes.
    """

    def __init__(self, repok: Optional[Reporter] = None, reperr: Optional[Reporter] = None,
                 context_map: Optional[Dict[str, Any]] = None) -> None:
        """
        :param repok: reporter for informational messages; a default one
            (prefix ``[Reader: INFO] ``) is created when ``None``
        :param reperr: reporter for error messages; a default one
            (prefix ``[Reader: ERROR] ``) is created when ``None``
        :param context_map: mapping from JSON-LD context URL to either an
            already-parsed context ``dict`` or a path to a local JSON file
            (which is eagerly loaded and substituted in place)
        """
        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}
        for context_url in self.context_map:
            ctx_file_path: Any = self.context_map[context_url]
            # isinstance (not type == str) also accepts str subclasses
            if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                # This expensive operation is done only when it's really needed
                with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                    self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Reader: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ")
        else:
            self.reperr: Reporter = reperr

    def load(self, rdf_file_path: str) -> Optional[Dataset]:
        """Load an RDF file into a ``Dataset``.

        Returns ``None`` (after reporting the problem via ``self.reperr``)
        when the file does not exist or cannot be parsed with any of the
        supported formats; exceptions are not propagated to the caller.

        :param rdf_file_path: path to the RDF file (plain or ``.zip``)
        :return: the loaded ``Dataset``, or ``None`` on failure
        """
        self.repok.new_article()
        self.reperr.new_article()

        loaded_graph: Optional[Dataset] = None
        if os.path.isfile(rdf_file_path):
            try:
                loaded_graph = self._load_graph(rdf_file_path)
            except Exception as e:
                self.reperr.add_sentence("[1] "
                                         "It was impossible to handle the format used for "
                                         "storing the file (stored in the temporary path) "
                                         f"'{rdf_file_path}'. Additional details: {e}")
        else:
            self.reperr.add_sentence("[2] "
                                     f"The file specified ('{rdf_file_path}') doesn't exist.")

        return loaded_graph

    def _load_graph(self, file_path: str) -> Dataset:
        """Parse ``file_path`` (or every member of a ``.zip`` archive) trying
        each supported RDF serialization in turn.

        :raises IOError: when the file/archive cannot be read or no supported
            format succeeds
        """
        # NOTE(review): rdflib's registered parser name for RDF/XML is "xml";
        # "rdfxml" may raise a plugin error and be silently skipped by
        # _try_parse — confirm against the rdflib version in use.
        formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]

        loaded_graph = Dataset()

        if file_path.endswith('.zip'):
            try:
                with ZipFile(file=file_path, mode="r") as archive:
                    for zf_name in archive.namelist():
                        with archive.open(zf_name) as f:
                            if self._try_parse(loaded_graph, f, formats):
                                return loaded_graph
            except Exception as e:
                raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
        else:
            try:
                with open(file_path, 'rt', encoding='utf-8') as f:
                    if self._try_parse(loaded_graph, f, formats):
                        return loaded_graph
            except Exception as e:
                raise IOError(f"Error opening or reading file '{file_path}': {e}")

        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")

    def _try_parse(self, graph: Dataset, file_obj, formats: List[str]) -> bool:
        """Attempt to parse ``file_obj`` into ``graph`` with each format in
        ``formats``; return ``True`` on the first success, ``False`` if all
        fail. For JSON-LD, known remote ``@context`` URLs are replaced with
        their locally cached contexts from ``self.context_map``.
        """
        for cur_format in formats:
            file_obj.seek(0)  # Reset file pointer to the beginning for each new attempt
            try:
                if cur_format == "json-ld":
                    json_ld_file = json.load(file_obj)
                    if isinstance(json_ld_file, dict):
                        json_ld_file = [json_ld_file]
                    for json_ld_resource in json_ld_file:
                        if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map:
                            json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"]
                    data = json.dumps(json_ld_file, ensure_ascii=False)
                    graph.parse(data=data, format=cur_format)
                else:
                    graph.parse(file=file_obj, format=cur_format)
                return True  # Success, no need to try other formats
            except Exception:
                continue  # Try the next format
        return False  # None of the formats succeeded

    @staticmethod
    def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
        """Return a new ``Graph`` (same identifier as ``graph``) containing
        only the triples whose subject is ``subject``."""
        g: Graph = Graph(identifier=graph.identifier)
        for p, o in graph.predicate_objects(subject, unique=True):
            g.add((subject, p, o))
        return g

    @staticmethod
    def _extract_subjects(graph: Graph) -> Set[URIRef]:
        """Return the set of distinct subjects appearing in ``graph``."""
        return set(graph.subjects(unique=True))

    def graph_validation(self, graph: Graph, closed: bool = False) -> Graph:
        """Validate ``graph`` against the OCDM SHACL shapes and return a new
        graph containing only the triples of subjects that produced no
        validation failure.

        :param closed: when ``True``, use the closed-world shapes file
        """
        valid_graph: Graph = Graph(identifier=graph.identifier)
        sg = Graph()
        # NOTE(review): relative paths — assumes the process CWD is the
        # repository root; confirm before relying on this elsewhere.
        if closed:
            sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle_closed.ttl'))
        else:
            sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle.ttl'))
        _, report_g, _ = validate(graph,
                                  shacl_graph=sg,
                                  ont_graph=None,
                                  inference=None,
                                  abort_on_first=False,
                                  allow_infos=False,
                                  allow_warnings=False,
                                  meta_shacl=False,
                                  advanced=False,
                                  js=False,
                                  debug=False)
        # Every sh:focusNode in the validation report is an invalid subject
        invalid_nodes = set()
        for triple in report_g.triples((None, URIRef('http://www.w3.org/ns/shacl#focusNode'), None)):
            invalid_nodes.add(triple[2])
        for subject in self._extract_subjects(graph):
            if subject not in invalid_nodes:
                for valid_subject_triple in graph.triples((subject, None, None)):
                    valid_graph.add(valid_subject_triple)
        return valid_graph

    @staticmethod
    def import_entities_from_graph(g_set: GraphSet, results: List[Dict]|Graph|Dataset, resp_agent: str,
                                   enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
        """Import every recognized OCDM entity found in ``results`` into
        ``g_set`` and return the created ``GraphEntity`` objects.

        :param results: SPARQL SELECT bindings (list of dicts), a ``Graph``,
            or a ``Dataset`` (flattened into a single graph)
        :param resp_agent: responsible agent URI recorded on each entity
        :param enable_validation: when ``True``, SHACL-validate first and
            import only subjects that pass
        :param closed: forwarded to :meth:`graph_validation`
        """
        if isinstance(results, list):
            graph = build_graph_from_results(results)
        elif isinstance(results, Dataset):
            # Convert Dataset to Graph by flattening all quads into triples
            graph = Graph()
            for s, p, o, _ in results.quads((None, None, None, None)):
                graph.add((s, p, o))
        else:
            graph = results
        if enable_validation:
            reader = Reader()
            graph = reader.graph_validation(graph, closed)

        # Ordered dispatch table replacing the previous 11-branch if/elif
        # chain; first matching rdf:type wins, with the same priority order.
        adders = (
            (GraphEntity.iri_note, g_set.add_an),                     # ReferenceAnnotation
            (GraphEntity.iri_role_in_time, g_set.add_ar),             # AgentRole
            (GraphEntity.iri_bibliographic_reference, g_set.add_be),  # BibliographicReference
            (GraphEntity.iri_expression, g_set.add_br),               # BibliographicResource
            (GraphEntity.iri_citation, g_set.add_ci),                 # Citation
            (GraphEntity.iri_discourse_element, g_set.add_de),        # DiscourseElement
            (GraphEntity.iri_identifier, g_set.add_id),               # Identifier
            (GraphEntity.iri_singleloc_pointer_list, g_set.add_pl),   # PointerList
            (GraphEntity.iri_agent, g_set.add_ra),                    # ResponsibleAgent
            (GraphEntity.iri_manifestation, g_set.add_re),            # ResourceEmbodiment
            (GraphEntity.iri_intextref_pointer, g_set.add_rp),        # ReferencePointer
        )

        imported_entities: List[GraphEntity] = []
        for subject in Reader._extract_subjects(graph):
            types = set(graph.objects(subject, RDF.type))
            for iri_type, add_entity in adders:
                if iri_type in types:
                    imported_entities.append(add_entity(resp_agent=resp_agent, res=subject,
                                                        preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
                    break  # subjects with no recognized type are skipped silently
        return imported_entities

    @staticmethod
    def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str,
                                       enable_validation: bool = False) -> GraphEntity:
        """Fetch a single entity from the triplestore at ``ts_url`` and import
        it into ``g_set``.

        :raises ValueError: when the entity is absent or not a proper OCDM entity
        :raises EndpointError: on communication problems (after printing a notice)
        """
        query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
        try:
            with SPARQLClient(ts_url, max_retries=3, backoff_factor=2.5) as client:
                result = client.query(query)['results']['bindings']

            if not result:
                raise ValueError(f"The requested entity {res} was not found in the triplestore.")

            imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation)
            if not imported_entities:
                raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
            return imported_entities[0]

        except ValueError:
            raise
        except EndpointError as e:
            print(f"[3] Could not import entity due to communication problems: {e}")
            raise

    @staticmethod
    def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[URIRef], resp_agent: str,
                                         enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]:
        """Fetch ``entities`` from the triplestore in batches of ``batch_size``
        (one UNION pattern per entity) and import them into ``g_set``.

        :raises ValueError: when ``entities`` is empty, a batch returns no
            results, some entities are missing/unrecognized, or nothing at all
            could be imported
        :raises EndpointError: on communication problems (after printing a notice)
        """
        if not entities:
            raise ValueError("No entities provided for import")

        imported_entities: List[GraphEntity] = []

        try:
            with SPARQLClient(ts_url, max_retries=3, backoff_factor=2.5) as client:
                for i in range(0, len(entities), batch_size):
                    batch = entities[i:i + batch_size]
                    # Track which entities of this batch never appear as a
                    # subject in the results, to report them precisely below.
                    not_found_entities = set(str(entity) for entity in batch)

                    union_patterns = [f"{{ BIND(<{str(entity)}> AS ?s) ?s ?p ?o }}"
                                      for entity in batch]

                    query = f"""
                    SELECT ?s ?p ?o
                    WHERE {{
                        {' UNION '.join(union_patterns)}
                    }}
                    """

                    results = client.query(query)['results']['bindings']

                    if not results:
                        entities_str = ', '.join(not_found_entities)
                        raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}")

                    for result in results:
                        if 's' in result and 'value' in result['s']:
                            not_found_entities.discard(result['s']['value'])

                    batch_entities = Reader.import_entities_from_graph(
                        g_set=g_set,
                        results=results,
                        resp_agent=resp_agent,
                        enable_validation=enable_validation
                    )
                    imported_entities.extend(batch_entities)

                    if not_found_entities:
                        entities_str = ', '.join(not_found_entities)
                        raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}")

        except ValueError:
            raise
        except EndpointError as e:
            print(f"[3] Could not import batch due to communication problems: {e}")
            raise

        if not imported_entities:
            raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.")

        return imported_entities