Coverage for rdflib_ocdm / ocdm_graph.py: 86%
188 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9from typing import TYPE_CHECKING
11import rdflib
12import rdflib.plugin as plugin
13from rdflib.exceptions import ParserError
14from rdflib.parser import InputSource, Parser, create_input_source
16if TYPE_CHECKING:
17 _SubjectType = Node
18 _PredicateType = Node
19 _ObjectType = Node
20 _TripleType = Tuple["_SubjectType", "_PredicateType", "_ObjectType"]
21 from typing import (IO, TYPE_CHECKING, Any, BinaryIO, List, Optional, TextIO,
22 Union, List, Tuple)
24import pathlib
25import warnings
26from copy import deepcopy
27from datetime import datetime, timedelta, timezone
29from rdflib import Dataset, Graph, URIRef
30from rdflib.term import Node
32from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
33from rdflib_ocdm.graph_utils import _extract_graph_iri, _extract_graph_iri_from_context
34from rdflib_ocdm.prov.provenance import OCDMProvenance
35from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity
class OCDMGraphCommons():
    """Change-tracking behaviour shared by :class:`OCDMGraph` and
    :class:`OCDMDataset`.

    Keeps an index of known entities with their deletion/restoration flags
    and provenance metadata, records merges, and delegates snapshot creation
    to an :class:`OCDMProvenance` instance.
    """

    def __init__(self, counter_handler: CounterHandler):
        # res -> set of entities that were merged into res
        self.__merge_index = dict()
        # res -> {'to_be_deleted', 'is_restored', 'resp_agent', 'source', 'graph_iri'}
        self.__entity_index = dict()
        self.all_entities = set()
        self.provenance = OCDMProvenance(self, counter_handler)

    def preexisting_finished(self: Graph|Dataset|OCDMGraphCommons, resp_agent: Optional[str] = None, primary_source: Optional[str] = None, c_time: Optional[float] = None):
        """Freeze the current graph as the pre-existing state and create an
        initial provenance snapshot for every entity that has none yet.

        :param resp_agent: URI of the agent responsible for the entities
        :param primary_source: URI of the primary source of the entities
        :param c_time: optional Unix timestamp (seconds) used as creation
            time instead of "now". Either way the time is shifted back by
            5 seconds so later modifications sort strictly after creation.
        """
        self.preexisting_graph = deepcopy(self)

        unique_subjects = set()
        if isinstance(self, Dataset):
            for s, _, _, _ in self.quads((None, None, None, None)):
                unique_subjects.add(s)
        else:
            unique_subjects = set(self.subjects(unique=True))

        for subject in unique_subjects:
            # Initialize or update entity_index, preserving graph_iri if already set
            existing_graph_iri = self.entity_index.get(subject, {}).get('graph_iri')
            self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': existing_graph_iri}

            # If graph_iri is not yet known and this is a Dataset, look it up
            if isinstance(self, Dataset) and existing_graph_iri is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

            self.all_entities.add(subject)
            count = self.provenance.counter_handler.read_counter(subject)
            if count == 0:
                # No snapshot exists yet: create the creation snapshot
                if c_time is None:
                    cur_time: str = (datetime.now(tz=timezone.utc).replace(microsecond=0) - timedelta(seconds=5)).isoformat(sep="T")
                else:
                    cur_time: str = (datetime.fromtimestamp(c_time, tz=timezone.utc).replace(microsecond=0) - timedelta(seconds=5)).isoformat(sep="T")
                new_snapshot: SnapshotEntity = self.provenance._create_snapshot(subject, cur_time)
                new_snapshot.has_description(f"The entity '{str(subject)}' has been created.")

    def merge(self: Graph|Dataset|OCDMGraphCommons, res: URIRef, other: URIRef):
        """Merge entity ``other`` into ``res``.

        Every statement mentioning ``other`` as object is rewritten to point
        at ``res``; every statement with ``other`` as subject is removed, and
        ``other`` is marked for deletion.

        :param res: the surviving entity
        :param other: the entity absorbed into ``res``
        """
        # Preserve graph_iri before removing quads
        other_graph_iri = None
        if isinstance(self, Dataset):
            # Extract graph_iri from the entity being merged before removal
            other_graph_iri = _extract_graph_iri(self, other)

            quads_list: List[Tuple] = list(self.quads((None, None, other, None)))
            for s, p, o, c in quads_list:
                self.remove((s, p, o, c))
                self.add((s, p, res, c))
            quads_list: List[Tuple] = list(self.quads((other, None, None, None)))
            for s, p, o, c in quads_list:
                self.remove((s, p, o, c))
        else:
            triples_list: List[Tuple] = list(self.triples((None, None, other)))
            for triple in triples_list:
                self.remove(triple)
                new_triple = (triple[0], triple[1], res)
                self.add(new_triple)
            triples_list: List[Tuple] = list(self.triples((other, None, None)))
            for triple in triples_list:
                self.remove(triple)

        self.__merge_index.setdefault(res, set()).add(other)
        if other not in self.entity_index:
            self.entity_index[other] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': None, 'source': None, 'graph_iri': other_graph_iri}
        else:
            # Preserve graph_iri if it was already set
            if other_graph_iri is not None and self.entity_index[other].get('graph_iri') is None:
                self.entity_index[other]['graph_iri'] = other_graph_iri
        self.entity_index[other]['to_be_deleted'] = True

    def mark_as_deleted(self, res: URIRef) -> None:
        """Flag ``res`` for deletion; raises KeyError if it is unknown."""
        self.entity_index[res]['to_be_deleted'] = True

    def mark_as_restored(self, res: URIRef) -> None:
        """
        Marks an entity as being restored after deletion.
        This will:
        1. Set is_restored flag to True in the entity_index
        2. Set to_be_deleted flag to False

        :param res: The URI reference of the entity to restore
        :type res: URIRef
        :return: None
        """
        if res in self.entity_index:
            self.entity_index[res]['is_restored'] = True
            self.entity_index[res]['to_be_deleted'] = False

    @property
    def merge_index(self) -> dict:
        # res -> set of entities merged into it
        return self.__merge_index

    @property
    def entity_index(self) -> dict:
        # res -> per-entity tracking metadata
        return self.__entity_index

    def generate_provenance(self, c_time: Optional[float] = None) -> None:
        """Generate provenance snapshots for all tracked changes."""
        return self.provenance.generate_provenance(c_time)

    def get_entity(self, res: str) -> Optional[SnapshotEntity]:
        """Return the snapshot entity registered for ``res``, if any."""
        return self.provenance.get_entity(res)

    def commit_changes(self):
        """Reset the tracking indexes and take the current state as baseline."""
        self.__merge_index = dict()
        self.__entity_index = dict()
        self.preexisting_graph = deepcopy(self)

    def get_provenance_graphs(self) -> Dataset:
        """Collect all provenance triples into a Dataset, one named graph
        per provenance subject (``<subject>/prov/``)."""
        prov_g = Dataset()
        for _, prov_entity in self.provenance.res_to_entity.items():
            for triple in prov_entity.g.triples((None, None, None)):
                prov_g.add((triple[0], triple[1], triple[2], URIRef(prov_entity.prov_subject + '/prov/')))
        return prov_g
class OCDMGraph(OCDMGraphCommons, Graph):
    """An rdflib :class:`~rdflib.Graph` with OCDM change tracking: every
    added or parsed subject is registered in the entity index so provenance
    can be generated for it."""

    def __init__(self, counter_handler: CounterHandler = None):
        Graph.__init__(self)
        self.preexisting_graph = Graph()
        OCDMGraphCommons.__init__(self, counter_handler)

    def add(self, triple: "_TripleType", resp_agent=None, primary_source=None):
        """Add a triple with self as context, registering its subject.

        :param triple: the (s, p, o) triple to add
        :param resp_agent: URI of the agent recorded for newly seen subjects
        :param primary_source: URI of the primary source for newly seen subjects
        :return: self, allowing call chaining
        """
        s, p, o = triple
        assert isinstance(s, Node), "Subject %s must be an rdflib term" % (s,)
        assert isinstance(p, Node), "Predicate %s must be an rdflib term" % (p,)
        assert isinstance(o, Node), "Object %s must be an rdflib term" % (o,)
        self._Graph__store.add((s, p, o), self, quoted=False)

        # Track the subject so provenance can later be generated for it
        if s not in self.all_entities:
            self.all_entities.add(s)

        if s not in self.entity_index:
            self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source}

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse an RDF source adding the resulting triples to the Graph.

        The source is specified using one of ``source``, ``location``,
        ``file`` or ``data``. After parsing, every subject found in the
        graph is registered in the entity index with ``resp_agent`` and
        ``primary_source``.

        .. caution::

            This method can access directly or indirectly requested network
            or file resources, for example, when parsing JSON-LD documents
            with ``@context`` directives that point to a network location.
            When processing untrusted or potentially malicious documents,
            measures should be taken to restrict network and file access.

        :param source: An InputSource, file-like object, or string (a string
            is interpreted as the location of the source).
        :param publicID: the logical URI to use as the document base; if None
            the document location is used.
        :param format: used when the format cannot be determined from the
            source (e.g. from the file extension or Media Type); defaults
            to turtle.
        :param location: relative or absolute URL of the source.
        :param file: a file-like object.
        :param data: a string containing the data to be parsed.
        :param resp_agent: URI of the responsible agent recorded for parsed subjects.
        :param primary_source: URI of the primary source recorded for parsed subjects.
        :return: self, the graph instance.
        :raises ParserError: when no format was given, none could be guessed,
            and parsing as turtle failed.
        """
        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )
        if format is None:
            format = source.content_type
        could_not_guess_format = False
        if format is None:
            # Fall back to guessing from the file name, then to turtle
            if (
                hasattr(source, "file")
                and getattr(source.file, "name", None)
                and isinstance(source.file.name, str)
            ):
                format = rdflib.util.guess_format(source.file.name)
            if format is None:
                format = "turtle"
                could_not_guess_format = True
        parser = plugin.get(format, Parser)()
        try:
            parser.parse(source, self, **args)
        except SyntaxError as se:
            if could_not_guess_format:
                raise ParserError(
                    "Could not guess RDF format for %r from file extension so tried Turtle but failed. "
                    "You can explicitly specify format using the format argument."
                    % source
                ) from se
            else:
                raise
        finally:
            if source.auto_close:
                source.close()

        # Register every parsed subject for provenance tracking
        for subject in self.subjects(unique=True):
            if subject not in self.all_entities:
                self.all_entities.add(subject)

            if subject not in self.entity_index:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source}

        return self
class OCDMDataset(OCDMGraphCommons, Dataset):
    """An rdflib :class:`~rdflib.Dataset` with OCDM change tracking: every
    added or parsed subject is registered in the entity index, together with
    the IRI of the named graph it belongs to."""

    def __init__(self, counter_handler: CounterHandler = None):
        Dataset.__init__(self)
        self.preexisting_graph = Dataset()
        OCDMGraphCommons.__init__(self, counter_handler)

    def __deepcopy__(self, memo):
        """Deep-copy quads and tracking indexes; the counter handler is
        shared with the original (it is an external counter store)."""
        new_graph = OCDMDataset(counter_handler=self.provenance.counter_handler)

        # Copy graph data
        for quad in self.quads((None, None, None, None)):
            new_graph.add(quad)

        # Copy entity index and metadata
        for key, value in self.entity_index.items():
            new_graph.entity_index[key] = value.copy()
        new_graph.all_entities = self.all_entities.copy()
        for key, value in self.merge_index.items():
            new_graph._OCDMGraphCommons__merge_index[key] = value.copy()

        return new_graph

    def add(
        self,
        triple_or_quad: Union[
            Tuple["_SubjectType", "_PredicateType", "_ObjectType", Optional[Any]],
            "_TripleType",
        ],
        resp_agent=None,
        primary_source=None
    ) -> "Dataset":
        """
        Add a triple or quad to the store, registering its subject.

        If a triple is given it is added to the default context.

        :param triple_or_quad: the (s, p, o) triple or (s, p, o, c) quad to add
        :param resp_agent: URI of the agent recorded for newly seen subjects
        :param primary_source: URI of the primary source for newly seen subjects
        :return: self, allowing call chaining
        """
        s, p, o, c = self._spoc(triple_or_quad, default=True)

        _assertnode(s, p, o)

        # type error: Argument "context" to "add" of "Store" has incompatible type "Optional[Graph]"; expected "Graph"
        self.store.add((s, p, o), context=c, quoted=False)  # type: ignore[arg-type]

        # Track the subject so provenance can later be generated for it
        if s not in self.all_entities:
            self.all_entities.add(s)

        if s not in self.entity_index:
            self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

        # Store graph_iri in entity_index for later retrieval.
        # We already have the context from _spoc, use it directly for efficiency
        if self.entity_index[s]['graph_iri'] is None:
            self.entity_index[s]['graph_iri'] = _extract_graph_iri_from_context(c)

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse a source adding the resulting triples to its own context
        (sub graph of this graph), then register every subject in the
        entity index together with its named-graph IRI.

        See :meth:`rdflib.graph.Graph.parse` for documentation on arguments.

        .. caution::

            This method can access directly or indirectly requested network
            or file resources; restrict network and file access when
            processing untrusted documents.

        :return: the context graph into which the source was parsed.
        """
        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )

        # publicID wins over the source's own public id; create_input_source
        # guarantees getPublicId() is not None (see rdflib for details)
        g_id: str = publicID or source.getPublicId()
        if not isinstance(g_id, Node):
            g_id = URIRef(g_id)

        context = Graph(store=self.store, identifier=g_id)
        context.remove((None, None, None))  # hmm ?
        context.parse(source, publicID=publicID, format=format, **args)
        # NOTE: rdflib's Dataset.parse has the same quirk of returning the
        # context rather than self; kept for compatibility.

        unique_subjects = {s for s, _, _, _ in self.quads((None, None, None, None))}

        for subject in unique_subjects:
            if subject not in self.all_entities:
                self.all_entities.add(subject)

            if subject not in self.entity_index:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

            # Store graph_iri for this subject by finding its context
            if 'graph_iri' not in self.entity_index[subject] or self.entity_index[subject]['graph_iri'] is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

        return context
def _assertnode(*terms):
    """Check that every argument is an rdflib ``Node``.

    Raises :class:`AssertionError` naming the first offending term; this is
    raised explicitly (not via ``assert``) so the validation is not stripped
    when Python runs with ``-O``. Returns ``True`` when all terms pass.
    """
    for t in terms:
        if not isinstance(t, Node):
            raise AssertionError("Term %s must be an rdflib term" % (t,))
    return True
463# Backward compatibility alias
class OCDMConjunctiveGraph(OCDMDataset):
    """Deprecated alias for :class:`OCDMDataset`.

    Kept for backward compatibility only: the class was renamed to
    ``OCDMDataset`` to reflect the migration from the deprecated rdflib
    ``ConjunctiveGraph`` to ``Dataset``. Instantiating it emits a
    ``DeprecationWarning`` and otherwise behaves exactly like its parent.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        # Warn at the caller's frame (stacklevel=2) before delegating
        warnings.warn(
            "OCDMConjunctiveGraph is deprecated, use OCDMDataset instead",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(counter_handler)