Coverage for oc_ocdm/reader.py: 76%
222 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18import json
19import os
20import time
21from typing import TYPE_CHECKING
22from zipfile import ZipFile
24from oc_ocdm.graph.graph_entity import GraphEntity
25from oc_ocdm.support.reporter import Reporter
26from oc_ocdm.support.support import build_graph_from_results
27from rdflib import RDF, ConjunctiveGraph, Graph, URIRef
28from SPARQLWrapper import JSON, POST, SPARQLWrapper
30if TYPE_CHECKING:
31 from typing import Any, Dict, List, Optional, Set
32 from oc_ocdm.graph.graph_set import GraphSet
34from pyshacl import validate
class Reader(object):

    def __init__(self, repok: Reporter = None, reperr: Reporter = None, context_map: Dict[str, Any] = None) -> None:
        """
        Helper class that loads RDF data from files or SPARQL endpoints into
        OCDM-compliant graph entities.

        :param repok: reporter used for informational messages
                      (a default one is created when None)
        :param reperr: reporter used for error messages
                       (a default one is created when None)
        :param context_map: mapping from JSON-LD context URLs to either an
                            already-parsed context dict or the path of a local
                            JSON file containing that context
        """
        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}
        # Eagerly replace every context given as a file path with its parsed
        # JSON content, so later JSON-LD parsing never has to touch the disk.
        for context_url in self.context_map:
            ctx_file_path: Any = self.context_map[context_url]
            # isinstance (instead of type() ==) also accepts str subclasses
            if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                    self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Reader: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ")
        else:
            self.reperr: Reporter = reperr
62 def load(self, rdf_file_path: str) -> Optional[ConjunctiveGraph]:
63 self.repok.new_article()
64 self.reperr.new_article()
66 loaded_graph: Optional[ConjunctiveGraph] = None
67 if os.path.isfile(rdf_file_path):
69 try:
70 loaded_graph = self._load_graph(rdf_file_path)
71 except Exception as e:
72 self.reperr.add_sentence("[1] "
73 "It was impossible to handle the format used for "
74 "storing the file (stored in the temporary path) "
75 f"'{rdf_file_path}'. Additional details: {e}")
76 else:
77 self.reperr.add_sentence("[2] "
78 f"The file specified ('{rdf_file_path}') doesn't exist.")
80 return loaded_graph
82 def _load_graph(self, file_path: str) -> ConjunctiveGraph:
83 formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
84 loaded_graph = ConjunctiveGraph()
86 if file_path.endswith('.zip'):
87 try:
88 with ZipFile(file=file_path, mode="r") as archive:
89 for zf_name in archive.namelist():
90 with archive.open(zf_name) as f:
91 if self._try_parse(loaded_graph, f, formats):
92 return loaded_graph
93 except Exception as e:
94 raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
95 else:
96 try:
97 with open(file_path, 'rt', encoding='utf-8') as f:
98 if self._try_parse(loaded_graph, f, formats):
99 return loaded_graph
100 except Exception as e:
101 raise IOError(f"Error opening or reading file '{file_path}': {e}")
103 raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")
105 def _try_parse(self, graph: ConjunctiveGraph, file_obj, formats: List[str]) -> bool:
106 for cur_format in formats:
107 file_obj.seek(0) # Reset file pointer to the beginning for each new attempt
108 try:
109 if cur_format == "json-ld":
110 json_ld_file = json.load(file_obj)
111 if isinstance(json_ld_file, dict):
112 json_ld_file = [json_ld_file]
113 for json_ld_resource in json_ld_file:
114 if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map:
115 json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"]
116 data = json.dumps(json_ld_file, ensure_ascii=False)
117 graph.parse(data=data, format=cur_format)
118 else:
119 graph.parse(file=file_obj, format=cur_format)
120 return True # Success, no need to try other formats
121 except Exception as e:
122 continue # Try the next format
123 return False # None of the formats succeeded
125 @staticmethod
126 def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
127 g: Graph = Graph(identifier=graph.identifier)
128 for p, o in graph.predicate_objects(subject, unique=True):
129 g.add((subject, p, o))
130 return g
132 @staticmethod
133 def _extract_subjects(graph: Graph) -> Set[URIRef]:
134 subjects: Set[URIRef] = set()
135 for s in graph.subjects(unique=True):
136 subjects.add(s)
137 return subjects
139 def graph_validation(self, graph: Graph, closed: bool = False) -> Graph:
140 valid_graph: Graph = Graph(identifier=graph.identifier)
141 sg = Graph()
142 if closed:
143 sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle_closed.ttl'))
144 else:
145 sg.parse(os.path.join('oc_ocdm', 'resources', 'shacle.ttl'))
146 _, report_g, _ = validate(graph,
147 shacl_graph=sg,
148 ont_graph=None,
149 inference=None,
150 abort_on_first=False,
151 allow_infos=False,
152 allow_warnings=False,
153 meta_shacl=False,
154 advanced=False,
155 js=False,
156 debug=False)
157 invalid_nodes = set()
158 for triple in report_g.triples((None, URIRef('http://www.w3.org/ns/shacl#focusNode'), None)):
159 invalid_nodes.add(triple[2])
160 for subject in self._extract_subjects(graph):
161 if subject not in invalid_nodes:
162 for valid_subject_triple in graph.triples((subject, None, None)):
163 valid_graph.add(valid_subject_triple)
164 return valid_graph
166 @staticmethod
167 def import_entities_from_graph(g_set: GraphSet, results: List[Dict]|Graph, resp_agent: str,
168 enable_validation: bool = False, closed: bool = False) -> List[GraphEntity]:
169 graph = build_graph_from_results(results) if isinstance(results, list) else results
170 if enable_validation:
171 reader = Reader()
172 graph = reader.graph_validation(graph, closed)
173 imported_entities: List[GraphEntity] = []
174 for subject in Reader._extract_subjects(graph):
175 types = []
176 for o in graph.objects(subject, RDF.type):
177 types.append(o)
178 # ReferenceAnnotation
179 if GraphEntity.iri_note in types:
180 imported_entities.append(g_set.add_an(resp_agent=resp_agent, res=subject,
181 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
182 # AgentRole
183 elif GraphEntity.iri_role_in_time in types:
184 imported_entities.append(g_set.add_ar(resp_agent=resp_agent, res=subject,
185 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
186 # BibliographicReference
187 elif GraphEntity.iri_bibliographic_reference in types:
188 imported_entities.append(g_set.add_be(resp_agent=resp_agent, res=subject,
189 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
190 # BibliographicResource
191 elif GraphEntity.iri_expression in types:
192 imported_entities.append(g_set.add_br(resp_agent=resp_agent, res=subject,
193 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
194 # Citation
195 elif GraphEntity.iri_citation in types:
196 imported_entities.append(g_set.add_ci(resp_agent=resp_agent, res=subject,
197 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
198 # DiscourseElement
199 elif GraphEntity.iri_discourse_element in types:
200 imported_entities.append(g_set.add_de(resp_agent=resp_agent, res=subject,
201 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
202 # Identifier
203 elif GraphEntity.iri_identifier in types:
204 imported_entities.append(g_set.add_id(resp_agent=resp_agent, res=subject,
205 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
206 # PointerList
207 elif GraphEntity.iri_singleloc_pointer_list in types:
208 imported_entities.append(g_set.add_pl(resp_agent=resp_agent, res=subject,
209 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
210 # ResponsibleAgent
211 elif GraphEntity.iri_agent in types:
212 imported_entities.append(g_set.add_ra(resp_agent=resp_agent, res=subject,
213 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
214 # ResourceEmbodiment
215 elif GraphEntity.iri_manifestation in types:
216 imported_entities.append(g_set.add_re(resp_agent=resp_agent, res=subject,
217 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
218 # ReferencePointer
219 elif GraphEntity.iri_intextref_pointer in types:
220 imported_entities.append(g_set.add_rp(resp_agent=resp_agent, res=subject,
221 preexisting_graph=Reader.get_graph_from_subject(graph, subject)))
222 return imported_entities
224 @staticmethod
225 def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str,
226 enable_validation: bool = False) -> GraphEntity:
227 query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
228 attempt = 0
229 max_attempts = 3
230 wait_time = 5 # Initial wait time in seconds
232 while attempt < max_attempts:
233 try:
234 sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
235 sparql.setQuery(query)
236 sparql.setMethod('GET')
237 sparql.setReturnFormat(JSON)
238 result = sparql.queryAndConvert()['results']['bindings']
240 if not result: # Se non ci sono risultati, l'entità non esiste
241 raise ValueError(f"The requested entity {res} was not found in the triplestore.")
243 imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation)
244 if len(imported_entities) <= 0:
245 raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
246 return imported_entities[0]
248 except ValueError as ve: # Non facciamo retry per errori di validazione
249 raise ve
250 except Exception as e:
251 attempt += 1
252 if attempt < max_attempts:
253 print(f"[3] Attempt {attempt} failed. Could not import entity due to communication problems: {e}")
254 print(f"Retrying in {wait_time} seconds...")
255 time.sleep(wait_time)
256 wait_time *= 2
257 else:
258 print(f"[3] All {max_attempts} attempts failed. Could not import entity due to communication problems: {e}")
259 raise
261 raise Exception("Max attempts reached. Failed to import entity from triplestore.")
263 @staticmethod
264 def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[URIRef], resp_agent: str,
265 enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]:
266 """
267 Import multiple entities from a triplestore in batches using a single SPARQL query per batch.
269 Args:
270 g_set: The GraphSet to import entities into
271 ts_url: The triplestore URL endpoint
272 entities: List of URIRef entities to import
273 resp_agent: The responsible agent string
274 enable_validation: Whether to validate the imported graphs
275 batch_size: Number of entities to import in each batch
277 Returns:
278 List of imported GraphEntity objects
279 """
280 if not entities:
281 raise ValueError("No entities provided for import")
283 imported_entities: List[GraphEntity] = []
284 max_attempts = 3
285 wait_time = 5 # Initial wait time in seconds
287 # Process entities in batches
288 for i in range(0, len(entities), batch_size):
289 batch = entities[i:i + batch_size]
290 not_found_entities = set(str(entity) for entity in batch)
292 # Construct SPARQL query for batch using UNION pattern for Virtuoso
293 union_patterns = []
294 for entity in batch:
295 union_patterns.append(f"{{ BIND(<{str(entity)}> AS ?s) ?s ?p ?o }}")
297 query = f"""
298 SELECT ?s ?p ?o
299 WHERE {{
300 {' UNION '.join(union_patterns)}
301 }}
302 """
304 # Execute query with retry logic
305 attempt = 0
306 while attempt < max_attempts:
307 try:
308 sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
309 sparql.setQuery(query)
310 sparql.setMethod('GET')
311 sparql.setReturnFormat(JSON)
312 results = sparql.queryAndConvert()['results']['bindings']
314 if not results: # Se non ci sono risultati per questo batch
315 entities_str = ', '.join(not_found_entities)
316 raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}")
318 # Teniamo traccia delle entità trovate
319 for result in results:
320 if 's' in result and 'value' in result['s']:
321 not_found_entities.discard(result['s']['value'])
323 # Import entities from results
324 try:
325 batch_entities = Reader.import_entities_from_graph(
326 g_set=g_set,
327 results=results,
328 resp_agent=resp_agent,
329 enable_validation=enable_validation
330 )
331 imported_entities.extend(batch_entities)
333 # Se alcune entità non sono state trovate, lo segnaliamo
334 if not_found_entities:
335 entities_str = ', '.join(not_found_entities)
336 raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}")
338 break # Usciamo dal ciclo di retry se tutto è andato bene
340 except ValueError as ve: # Errori di validazione non richiedono retry
341 raise ve
343 except ValueError as ve: # Non facciamo retry per errori di validazione o entità non trovate
344 raise ve
345 except Exception as e:
346 attempt += 1
347 if attempt < max_attempts:
348 print(f"[3] Attempt {attempt} failed. Could not import batch due to communication problems: {e}")
349 print(f"Retrying in {wait_time} seconds...")
350 time.sleep(wait_time)
351 wait_time *= 2
352 else:
353 print(f"[3] All {max_attempts} attempts failed. Could not import batch due to communication problems: {e}")
354 raise
356 if not imported_entities:
357 raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.")
359 return imported_entities