Coverage report for rdflib_ocdm/ocdm_graph.py: 86% of 188 statements covered (coverage.py v7.13.5, created at 2026-03-21 12:35 +0000).

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9from typing import TYPE_CHECKING 

10 

11import rdflib 

12import rdflib.plugin as plugin 

13from rdflib.exceptions import ParserError 

14from rdflib.parser import InputSource, Parser, create_input_source 

15 

16if TYPE_CHECKING: 

17 _SubjectType = Node 

18 _PredicateType = Node 

19 _ObjectType = Node 

20 _TripleType = Tuple["_SubjectType", "_PredicateType", "_ObjectType"] 

21 from typing import (IO, TYPE_CHECKING, Any, BinaryIO, List, Optional, TextIO, 

22 Union, List, Tuple) 

23 

24import pathlib 

25import warnings 

26from copy import deepcopy 

27from datetime import datetime, timedelta, timezone 

28 

29from rdflib import Dataset, Graph, URIRef 

30from rdflib.term import Node 

31 

32from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

33from rdflib_ocdm.graph_utils import _extract_graph_iri, _extract_graph_iri_from_context 

34from rdflib_ocdm.prov.provenance import OCDMProvenance 

35from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity 

36 

37 

class OCDMGraphCommons():
    """
    Change-tracking mixin shared by ``OCDMGraph`` and ``OCDMDataset``.

    It maintains:

    * ``all_entities`` -- every subject ever added to the graph;
    * a private entity index mapping each subject to its metadata dict
      (``to_be_deleted``, ``is_restored``, ``resp_agent``, ``source`` and,
      for Dataset subclasses, ``graph_iri``);
    * a private merge index mapping a surviving entity to the set of
      entities merged into it;
    * ``provenance`` -- an :class:`OCDMProvenance` used to create and
      look up snapshot entities.
    """

    def __init__(self, counter_handler: CounterHandler):
        # res -> set of entities merged into res (populated by merge()).
        self.__merge_index = dict()
        # subject -> metadata dict; exposed read-only via the entity_index
        # property (the returned dict is still mutated in place by callers).
        self.__entity_index = dict()
        self.all_entities = set()
        self.provenance = OCDMProvenance(self, counter_handler)

    def preexisting_finished(self: Graph|Dataset|OCDMGraphCommons, resp_agent: Optional[str] = None, primary_source: Optional[str] = None, c_time: Optional[float] = None):
        """
        Declare that all preexisting data has been loaded: snapshot the
        current graph into ``self.preexisting_graph`` (the diff baseline for
        provenance) and register every subject in the entity index, creating
        an initial provenance snapshot for entities whose counter is still 0.

        :param resp_agent: URI of the responsible agent recorded for each entity
        :param primary_source: URI of the primary source recorded for each entity
        :param c_time: optional creation time as a POSIX timestamp (it is fed
            to ``datetime.fromtimestamp``); defaults to "now"
        """
        self.preexisting_graph = deepcopy(self)

        unique_subjects = set()
        if isinstance(self, Dataset):
            # Iterate quads so subjects from every named graph are collected.
            for s, _, _, _ in self.quads((None, None, None, None)):
                unique_subjects.add(s)
        else:
            unique_subjects = set(self.subjects(unique=True))

        for subject in unique_subjects:
            # Initialize or update entity_index, preserving graph_iri if already set
            existing_graph_iri = self.entity_index.get(subject, {}).get('graph_iri')
            self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': existing_graph_iri}

            # If graph_iri not yet set and this is a Dataset, find it
            if isinstance(self, Dataset) and existing_graph_iri is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

            self.all_entities.add(subject)
            count = self.provenance.counter_handler.read_counter(subject)
            if count == 0:
                # No snapshot yet: create the creation snapshot. The time is
                # backdated by 5 seconds -- presumably so later modification
                # snapshots sort strictly after it (TODO confirm intent).
                if c_time is None:
                    cur_time: str = (datetime.now(tz=timezone.utc).replace(microsecond=0) - timedelta(seconds=5)).isoformat(sep="T")
                else:
                    cur_time: str = (datetime.fromtimestamp(c_time, tz=timezone.utc).replace(microsecond=0) - timedelta(seconds=5)).isoformat(sep="T")
                new_snapshot: SnapshotEntity = self.provenance._create_snapshot(subject, cur_time)
                new_snapshot.has_description(f"The entity '{str(subject)}' has been created.")

    def merge(self: Graph|Dataset|OCDMGraphCommons, res: URIRef, other: URIRef):
        """
        Merge entity ``other`` into ``res``: every statement whose object is
        ``other`` is rewired to point at ``res``, every statement whose
        subject is ``other`` is removed, the merge is recorded in the merge
        index, and ``other`` is marked for deletion in the entity index.
        """
        # Preserve graph_iri before removing quads
        other_graph_iri = None
        if isinstance(self, Dataset):
            # Extract graph_iri from the entity being merged before removal
            other_graph_iri = _extract_graph_iri(self, other)

            # Rewire incoming references: (s, p, other, c) -> (s, p, res, c).
            # Materialize the list first because we mutate while iterating.
            quads_list: List[Tuple] = list(self.quads((None, None, other, None)))
            for s, p, o, c in quads_list:
                self.remove((s, p, o, c))
                self.add((s, p, res, c))
            # Drop everything `other` states about itself.
            quads_list: List[Tuple] = list(self.quads((other, None, None, None)))
            for s, p, o, c in quads_list:
                self.remove((s, p, o, c))
        else:
            triples_list: List[Tuple] = list(self.triples((None, None, other)))
            for triple in triples_list:
                self.remove(triple)
                new_triple = (triple[0], triple[1], res)
                self.add(new_triple)
            triples_list: List[Tuple] = list(self.triples((other, None, None)))
            for triple in triples_list:
                self.remove(triple)

        self.__merge_index.setdefault(res, set()).add(other)
        if other not in self.entity_index:
            self.entity_index[other] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': None, 'source': None, 'graph_iri': other_graph_iri}
        else:
            # Preserve graph_iri if it was already set
            if other_graph_iri is not None and self.entity_index[other].get('graph_iri') is None:
                self.entity_index[other]['graph_iri'] = other_graph_iri
        self.entity_index[other]['to_be_deleted'] = True

    def mark_as_deleted(self, res: URIRef) -> None:
        """
        Flag *res* for deletion in the entity index.

        Note: raises ``KeyError`` if *res* was never registered.
        """
        self.entity_index[res]['to_be_deleted'] = True

    def mark_as_restored(self, res: URIRef) -> None:
        """
        Marks an entity as being restored after deletion.
        This will:
        1. Set is_restored flag to True in the entity_index
        2. Set to_be_deleted flag to False

        :param res: The URI reference of the entity to restore
        :type res: URIRef
        :return: None
        """
        if res in self.entity_index:
            self.entity_index[res]['is_restored'] = True
            self.entity_index[res]['to_be_deleted'] = False

    @property
    def merge_index(self) -> dict:
        # Mapping: surviving entity -> set of entities merged into it.
        return self.__merge_index

    @property
    def entity_index(self) -> dict:
        # Mapping: subject -> per-entity metadata dict (mutable, shared).
        return self.__entity_index

    def generate_provenance(self, c_time: Optional[float] = None) -> None:
        """Delegate provenance generation to the OCDMProvenance instance."""
        return self.provenance.generate_provenance(c_time)

    def get_entity(self, res: str) -> Optional[SnapshotEntity]:
        """Return the provenance snapshot entity registered for *res*, if any."""
        return self.provenance.get_entity(res)

    def commit_changes(self):
        """
        Reset the merge and entity indexes and re-baseline
        ``preexisting_graph``, after pending changes have been persisted.
        """
        self.__merge_index = dict()
        self.__entity_index = dict()
        self.preexisting_graph = deepcopy(self)

    def get_provenance_graphs(self) -> Dataset:
        """
        Collect every provenance snapshot into a Dataset, placing each
        snapshot's triples in the named graph ``<prov_subject>/prov/``.
        """
        prov_g = Dataset()
        for _, prov_entity in self.provenance.res_to_entity.items():
            for triple in prov_entity.g.triples((None, None, None)):
                prov_g.add((triple[0], triple[1], triple[2], URIRef(prov_entity.prov_subject + '/prov/')))
        return prov_g

150 

class OCDMGraph(OCDMGraphCommons, Graph):
    """
    An ``rdflib.Graph`` with OCDM change tracking: ``add`` and ``parse`` also
    register subjects in the mixin's entity index so provenance can be
    generated for them later.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        Graph.__init__(self)
        # Baseline snapshot of the graph "as loaded"; refreshed by
        # preexisting_finished()/commit_changes().
        self.preexisting_graph = Graph()
        OCDMGraphCommons.__init__(self, counter_handler)

    def add(self, triple: "_TripleType", resp_agent = None, primary_source = None):
        """Add a triple with self as context"""
        s, p, o = triple
        assert isinstance(s, Node), "Subject %s must be an rdflib term" % (s,)
        assert isinstance(p, Node), "Predicate %s must be an rdflib term" % (p,)
        assert isinstance(o, Node), "Object %s must be an rdflib term" % (o,)
        # Name-mangled access to Graph.__store, mirroring Graph.add's internals.
        self._Graph__store.add((s, p, o), self, quoted=False)

        # Add the subject to all_entities if it's not already present
        if s not in self.all_entities:
            self.all_entities.add(s)

        # First writer wins: resp_agent/source are recorded only the first
        # time a subject is seen.
        if s not in self.entity_index:
            self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source}

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse an RDF source adding the resulting triples to the Graph.

        The source is specified using one of source, location, file or data.

        .. caution::

           This method can access directly or indirectly requested network or
           file resources, for example, when parsing JSON-LD documents with
           ``@context`` directives that point to a network location.

           When processing untrusted or potentially malicious documents,
           measures should be taken to restrict network and file access.

           For information on available security measures, see the RDFLib
           :doc:`Security Considerations </security_considerations>`
           documentation.

        :Parameters:

          - ``source``: An InputSource, file-like object, or string. In the case
            of a string the string is the location of the source.
          - ``location``: A string indicating the relative or absolute URL of
            the source. Graph's absolutize method is used if a relative location
            is specified.
          - ``file``: A file-like object.
          - ``data``: A string containing the data to be parsed.
          - ``format``: Used if format can not be determined from source, e.g.
            file extension or Media Type. Defaults to text/turtle. Format
            support can be extended with plugins, but "xml", "n3" (use for
            turtle), "nt" & "trix" are built in.
          - ``publicID``: the logical URI to use as the document base. If None
            specified the document location is used (at least in the case where
            there is a document location).

        :Returns:

          - self, the graph instance.

        Examples:

        >>> my_data = '''
        ... <rdf:RDF
        ...   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        ...   xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#"
        ... >
        ...   <rdf:Description>
        ...     <rdfs:label>Example</rdfs:label>
        ...     <rdfs:comment>This is really just an example.</rdfs:comment>
        ...   </rdf:Description>
        ... </rdf:RDF>
        ... '''
        >>> import os, tempfile
        >>> fd, file_name = tempfile.mkstemp()
        >>> f = os.fdopen(fd, "w")
        >>> dummy = f.write(my_data)  # Returns num bytes written
        >>> f.close()

        >>> g = Graph()
        >>> result = g.parse(data=my_data, format="application/rdf+xml")
        >>> len(g)
        2

        >>> g = Graph()
        >>> result = g.parse(location=file_name, format="application/rdf+xml")
        >>> len(g)
        2

        >>> g = Graph()
        >>> with open(file_name, "r") as f:
        ...     result = g.parse(f, format="application/rdf+xml")
        >>> len(g)
        2

        >>> os.remove(file_name)

        >>> # default turtle parsing
        >>> result = g.parse(data="<http://example.com/a> <http://example.com/a> <http://example.com/a> .")
        >>> len(g)
        3

        """

        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )
        # Format resolution order: explicit arg, source content type,
        # file-name extension guess, then fall back to Turtle.
        if format is None:
            format = source.content_type
        could_not_guess_format = False
        if format is None:
            if (
                hasattr(source, "file")
                and getattr(source.file, "name", None)
                and isinstance(source.file.name, str)
            ):
                format = rdflib.util.guess_format(source.file.name)
            if format is None:
                format = "turtle"
                could_not_guess_format = True
        parser = plugin.get(format, Parser)()
        try:
            # TODO FIXME: Parser.parse should have **kwargs argument.
            parser.parse(source, self, **args)
        except SyntaxError as se:
            if could_not_guess_format:
                raise ParserError(
                    "Could not guess RDF format for %r from file extension so tried Turtle but failed."
                    "You can explicitly specify format using the format argument."
                    % source
                )
            else:
                raise se
        finally:
            if source.auto_close:
                source.close()

        # OCDM extension over rdflib: register every parsed subject in the
        # tracking structures so provenance can be produced for it.
        for subject in self.subjects(unique=True):
            if subject not in self.all_entities:
                self.all_entities.add(subject)

            if subject not in self.entity_index:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source}

        return self

317 

class OCDMDataset(OCDMGraphCommons, Dataset):
    """
    An ``rdflib.Dataset`` with OCDM change tracking: ``add`` and ``parse``
    also register subjects (and the named graph they live in, as
    ``graph_iri``) in the mixin's entity index.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        Dataset.__init__(self)
        # Baseline snapshot of the dataset "as loaded"; refreshed by
        # preexisting_finished()/commit_changes().
        self.preexisting_graph = Dataset()
        OCDMGraphCommons.__init__(self, counter_handler)

    def __deepcopy__(self, memo):
        """Deep-copy quads and tracking metadata; the counter handler is shared."""
        new_graph = OCDMDataset(counter_handler=self.provenance.counter_handler)

        # Copy graph data
        for quad in self.quads((None, None, None, None)):
            new_graph.add(quad)

        # Copy entity index and metadata
        for key, value in self.entity_index.items():
            new_graph.entity_index[key] = value.copy()
        new_graph.all_entities = self.all_entities.copy()
        # Name-mangled attribute of the OCDMGraphCommons mixin.
        for key, value in self.merge_index.items():
            new_graph._OCDMGraphCommons__merge_index[key] = value.copy()

        return new_graph

    def add(
        self,
        triple_or_quad: Union[
            Tuple["_SubjectType", "_PredicateType", "_ObjectType", Optional[Any]],
            "_TripleType",
        ],
        resp_agent = None,
        primary_source = None
    ) -> "Dataset":
        """
        Add a triple or quad to the store.

        if a triple is given it is added to the default context
        """

        s, p, o, c = self._spoc(triple_or_quad, default=True)

        _assertnode(s, p, o)

        # type error: Argument "context" to "add" of "Store" has incompatible type "Optional[Graph]"; expected "Graph"
        self.store.add((s, p, o), context=c, quoted=False)  # type: ignore[arg-type]

        # Add the subject to all_entities if it's not already present
        if s not in self.all_entities:
            self.all_entities.add(s)

        if s not in self.entity_index:
            self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

        # Store graph_iri in entity_index for later retrieval
        # We already have the context from _spoc, use it directly for efficiency
        if self.entity_index[s]['graph_iri'] is None:
            self.entity_index[s]['graph_iri'] = _extract_graph_iri_from_context(c)

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse source adding the resulting triples to its own context
        (sub graph of this graph).

        See :meth:`rdflib.graph.Graph.parse` for documentation on arguments.

        :Returns:

        The graph into which the source was parsed. In the case of n3
        it returns the root context.

        .. caution::

           This method can access directly or indirectly requested network or
           file resources, for example, when parsing JSON-LD documents with
           ``@context`` directives that point to a network location.

           When processing untrusted or potentially malicious documents,
           measures should be taken to restrict network and file access.

           For information on available security measures, see the RDFLib
           :doc:`Security Considerations </security_considerations>`
           documentation.
        """

        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )

        # NOTE on type hint: `xml.sax.xmlreader.InputSource.getPublicId` has no
        # type annotations but given that systemId should be a string, and
        # given that there is no specific mention of type for publicId, it
        # seems reasonable to assume it should also be a string. Furthermore,
        # create_input_source will ensure that publicId is not None, though it
        # would be good if this guarantee was made more explicit i.e. by type
        # hint on InputSource (TODO/FIXME).
        g_id: str = publicID and publicID or source.getPublicId()
        if not isinstance(g_id, Node):
            g_id = URIRef(g_id)

        # Parse into a named graph identified by g_id, backed by this store.
        context = Graph(store=self.store, identifier=g_id)
        context.remove((None, None, None))  # hmm ?
        context.parse(source, publicID=publicID, format=format, **args)
        # TODO: FIXME: This should not return context, but self.

        # OCDM extension over rdflib: register every subject now present in
        # the dataset (any named graph) in the tracking structures.
        unique_subjects = set()
        for s, _, _, _ in self.quads((None, None, None, None)):
            unique_subjects.add(s)

        for subject in unique_subjects:
            if subject not in self.all_entities:
                self.all_entities.add(subject)

            if subject not in self.entity_index:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

            # Store graph_iri for this subject by finding its context
            if 'graph_iri' not in self.entity_index[subject] or self.entity_index[subject]['graph_iri'] is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

        return context

456 

def _assertnode(*terms):
    """Assert that every argument is an rdflib ``Node``; always returns True.

    Raises AssertionError naming the first offending term otherwise.
    """
    for term in terms:
        # Same message format rdflib itself uses for term checks.
        assert isinstance(term, Node), "Term %s must be an rdflib term" % (term,)
    return True

461 

462 

463# Backward compatibility alias 

# Backward compatibility alias
class OCDMConjunctiveGraph(OCDMDataset):
    """
    Deprecated backward-compatibility alias for :class:`OCDMDataset`.

    The class was renamed when the underlying (deprecated) rdflib
    ``ConjunctiveGraph`` was replaced by ``Dataset``. Instantiating this
    alias emits a :class:`DeprecationWarning` and otherwise behaves exactly
    like ``OCDMDataset``.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        # stacklevel=2 points the warning at the caller, not this __init__.
        warnings.warn(
            message="OCDMConjunctiveGraph is deprecated, use OCDMDataset instead",
            category=DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(counter_handler)