Coverage for rdflib_ocdm/ocdm_graph.py: 84%

194 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-11-01 22:02 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING 

20 

21import rdflib 

22import rdflib.plugin as plugin 

23from rdflib.exceptions import ParserError 

24from rdflib.parser import InputSource, Parser, create_input_source 

25 

26if TYPE_CHECKING: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 _SubjectType = Node 

28 _PredicateType = Node 

29 _ObjectType = Node 

30 _TripleType = Tuple["_SubjectType", "_PredicateType", "_ObjectType"] 

31 from typing import (IO, TYPE_CHECKING, Any, BinaryIO, List, Optional, TextIO, 

32 Union, List, Tuple) 

33 

34import pathlib 

35import warnings 

36from copy import deepcopy 

37from datetime import datetime, timedelta, timezone 

38 

39from rdflib import Dataset, Graph, URIRef 

40from rdflib.term import Node 

41 

42from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

43from rdflib_ocdm.graph_utils import _extract_graph_iri, _extract_graph_iri_from_context 

44from rdflib_ocdm.prov.provenance import OCDMProvenance 

45from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity 

46 

47 

class OCDMGraphCommons:
    """Change-tracking state shared by :class:`OCDMGraph` and :class:`OCDMDataset`.

    Alongside the RDF data held by the concrete rdflib class, this mixin keeps:

    * ``entity_index``: subject -> dict with the keys ``to_be_deleted``,
      ``is_restored``, ``resp_agent``, ``source`` and ``graph_iri``;
    * ``merge_index``: surviving entity -> set of entities merged into it;
    * ``all_entities``: every subject registered so far;
    * ``provenance``: the :class:`OCDMProvenance` used to build snapshots.
    """

    def __init__(self, counter_handler: CounterHandler):
        # Name-mangled to ``_OCDMGraphCommons__merge_index`` and
        # ``_OCDMGraphCommons__entity_index``; read through the properties below.
        self.__merge_index = dict()
        self.__entity_index = dict()
        self.all_entities = set()
        self.provenance = OCDMProvenance(self, counter_handler)

    @staticmethod
    def _preexisting_snapshot_time(c_time: Optional[float] = None) -> str:
        """Return the creation time used for snapshots of pre-existing entities.

        :param c_time: POSIX timestamp of reference; the current UTC time is
            used when it is None.
        :return: ISO 8601 string in UTC, truncated to whole seconds and moved
            five seconds before the reference time.
        """
        if c_time is None:
            reference = datetime.now(tz=timezone.utc)
        else:
            reference = datetime.fromtimestamp(c_time, tz=timezone.utc)
        # NOTE(review): the 5 s offset presumably keeps the creation snapshot
        # strictly before snapshots generated afterwards — confirm.
        return (reference.replace(microsecond=0) - timedelta(seconds=5)).isoformat(sep="T")

    def preexisting_finished(self: Graph|Dataset|OCDMGraphCommons, resp_agent: str = None, primary_source: str = None, c_time: float = None):
        """Declare the current content as the pre-existing state of the graph.

        Stores a deep copy of the graph in ``preexisting_graph``, resets the
        entity index entry of every subject (keeping an already known
        ``graph_iri``), and creates a creation snapshot for every subject whose
        provenance counter is still 0.

        :param resp_agent: responsible agent stored in every entry.
        :param primary_source: primary source stored in every entry.
        :param c_time: POSIX timestamp (as accepted by
            ``datetime.fromtimestamp``) of reference for the creation
            snapshots; the current time is used when None.
        """
        self.preexisting_graph = deepcopy(self)

        if isinstance(self, Dataset):
            unique_subjects = {s for s, _, _, _ in self.quads((None, None, None, None))}
        else:
            unique_subjects = set(self.subjects(unique=True))

        # Computed at most once, and only if needed, so every entity created in
        # this call shares the same creation time.
        creation_time: Optional[str] = None
        for subject in unique_subjects:
            # Initialize or update entity_index, preserving graph_iri if already set
            existing_graph_iri = self.entity_index.get(subject, {}).get('graph_iri')
            self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': existing_graph_iri}

            # If graph_iri not yet set and this is a Dataset, find it
            if isinstance(self, Dataset) and existing_graph_iri is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

            self.all_entities.add(subject)
            if self.provenance.counter_handler.read_counter(subject) == 0:
                if creation_time is None:
                    creation_time = self._preexisting_snapshot_time(c_time)
                new_snapshot: SnapshotEntity = self.provenance._create_snapshot(subject, creation_time)
                new_snapshot.has_description(f"The entity '{str(subject)}' has been created.")

    def merge(self: Graph|Dataset|OCDMGraphCommons, res: URIRef, other: URIRef):
        """Merge the entity ``other`` into the entity ``res``.

        Every statement having ``other`` as object is rewritten to point to
        ``res``; every statement having ``other`` as subject is removed.
        ``other`` is then recorded in the merge index under ``res`` and marked
        as to be deleted.

        :param res: the surviving entity.
        :param other: the entity merged into ``res``.
        """
        other_graph_iri = None
        if isinstance(self, Dataset):
            # Must be read before the quads of ``other`` are removed below.
            other_graph_iri = _extract_graph_iri(self, other)
            # Materialize before mutating the store we are iterating over.
            for s, p, o, c in list(self.quads((None, None, other, None))):
                self.remove((s, p, o, c))
                self.add((s, p, res, c))
            for quad in list(self.quads((other, None, None, None))):
                self.remove(quad)
        else:
            for s, p, o in list(self.triples((None, None, other))):
                self.remove((s, p, o))
                self.add((s, p, res))
            for triple in list(self.triples((other, None, None))):
                self.remove(triple)

        self.__merge_index.setdefault(res, set()).add(other)
        if other not in self.entity_index:
            self.entity_index[other] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': None, 'source': None, 'graph_iri': other_graph_iri}
        elif other_graph_iri is not None and self.entity_index[other].get('graph_iri') is None:
            # Fill in graph_iri only when it was not already known.
            self.entity_index[other]['graph_iri'] = other_graph_iri
        self.entity_index[other]['to_be_deleted'] = True

    def mark_as_deleted(self, res: URIRef) -> None:
        """Flag ``res`` as to be deleted.

        :raises KeyError: if ``res`` has no entry in the entity index.
        """
        self.entity_index[res]['to_be_deleted'] = True

    def mark_as_restored(self, res: URIRef) -> None:
        """
        Marks an entity as being restored after deletion.
        This will:
        1. Set is_restored flag to True in the entity_index
        2. Set to_be_deleted flag to False

        Entities without an entry in the entity index are ignored.

        :param res: The URI reference of the entity to restore
        :type res: URIRef
        :return: None
        """
        if res in self.entity_index:
            self.entity_index[res]['is_restored'] = True
            self.entity_index[res]['to_be_deleted'] = False

    @property
    def merge_index(self) -> dict:
        """Surviving entity -> set of entities merged into it (live dict)."""
        return self.__merge_index

    @property
    def entity_index(self) -> dict:
        """Subject -> change-tracking metadata (live dict)."""
        return self.__entity_index

    def generate_provenance(self, c_time: float = None) -> None:
        """Delegate provenance generation to ``self.provenance``."""
        return self.provenance.generate_provenance(c_time)

    def get_entity(self, res: str) -> Optional[SnapshotEntity]:
        """Delegate the snapshot lookup for ``res`` to ``self.provenance``."""
        return self.provenance.get_entity(res)

    def commit_changes(self):
        """Forget tracked changes and take the current state as the new baseline.

        Replaces the merge and entity indexes with empty dicts and stores a
        fresh deep copy of the graph in ``preexisting_graph``.
        ``all_entities`` is left untouched.
        """
        self.__merge_index = dict()
        self.__entity_index = dict()
        self.preexisting_graph = deepcopy(self)

    def get_provenance_graphs(self) -> Dataset:
        """Collect every provenance entity's triples into a new ``Dataset``.

        The triples of each entity are placed in the named graph
        ``<prov_subject>/prov/``.
        """
        prov_g = Dataset()
        for prov_entity in self.provenance.res_to_entity.values():
            prov_graph_iri = URIRef(prov_entity.prov_subject + '/prov/')
            for s, p, o in prov_entity.g.triples((None, None, None)):
                prov_g.add((s, p, o, prov_graph_iri))
        return prov_g

160 

class OCDMGraph(OCDMGraphCommons, Graph):
    """rdflib ``Graph`` that tracks entity changes for OCDM provenance.

    Every subject added through :meth:`add` or :meth:`parse` is registered in
    ``all_entities`` and receives an ``entity_index`` entry.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        Graph.__init__(self)
        self.preexisting_graph = Graph()
        OCDMGraphCommons.__init__(self, counter_handler)

    def __deepcopy__(self, memo):
        """Copy data and change-tracking indexes into a fresh ``OCDMGraph``.

        Used by ``preexisting_finished`` and ``commit_changes``. Without it the
        default deep copy would also copy ``preexisting_graph``, which nests
        one extra full copy per commit, and the counter handler. The copy
        shares the counter handler and starts with its own provenance object
        and an empty ``preexisting_graph``.
        """
        new_graph = OCDMGraph(counter_handler=self.provenance.counter_handler)
        memo[id(self)] = new_graph

        # Copy graph data
        for triple in self.triples((None, None, None)):
            new_graph.add(triple)

        # Copy entity index and metadata
        for key, value in self.entity_index.items():
            new_graph.entity_index[key] = value.copy()
        new_graph.all_entities = self.all_entities.copy()
        for key, value in self.merge_index.items():
            new_graph.merge_index[key] = value.copy()

        return new_graph

    def add(self, triple: "_TripleType", resp_agent = None, primary_source = None):
        """Add a triple with self as context and register its subject.

        The subject is added to ``all_entities`` and, the first time it is
        seen, gets an entity index entry carrying ``resp_agent`` and
        ``primary_source``.

        :param triple: ``(subject, predicate, object)`` of rdflib terms.
        :param resp_agent: responsible agent for a newly registered subject.
        :param primary_source: primary source for a newly registered subject.
        :return: self.
        """
        s, p, o = triple
        assert isinstance(s, Node), "Subject %s must be an rdflib term" % (s,)
        assert isinstance(p, Node), "Predicate %s must be an rdflib term" % (p,)
        assert isinstance(o, Node), "Object %s must be an rdflib term" % (o,)
        # ``store`` is rdflib's public accessor for the graph's private store.
        self.store.add((s, p, o), self, quoted=False)

        self.all_entities.add(s)
        if s not in self.entity_index:
            self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse an RDF source adding the resulting triples to the Graph.

        The source is specified using one of source, location, file or data.

        .. caution::

           This method can access directly or indirectly requested network or
           file resources, for example, when parsing JSON-LD documents with
           ``@context`` directives that point to a network location.

           When processing untrusted or potentially malicious documents,
           measures should be taken to restrict network and file access.

           For information on available security measures, see the RDFLib
           :doc:`Security Considerations </security_considerations>`
           documentation.

        :Parameters:

          - ``source``: An InputSource, file-like object, or string. In the case
            of a string the string is the location of the source.
          - ``location``: A string indicating the relative or absolute URL of
            the source. Graph's absolutize method is used if a relative location
            is specified.
          - ``file``: A file-like object.
          - ``data``: A string containing the data to be parsed.
          - ``format``: Used if format can not be determined from source, e.g.
            file extension or Media Type. Defaults to text/turtle. Format
            support can be extended with plugins, but "xml", "n3" (use for
            turtle), "nt" & "trix" are built in.
          - ``publicID``: the logical URI to use as the document base. If None
            specified the document location is used (at least in the case where
            there is a document location).
          - ``resp_agent``: responsible agent recorded in the entity index for
            every subject first seen during this call.
          - ``primary_source``: primary source recorded in the entity index for
            every subject first seen during this call.

        :Returns:

          - self, the graph instance.

        Examples:

        >>> my_data = '''
        ... <rdf:RDF
        ...   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        ...   xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#"
        ... >
        ...   <rdf:Description>
        ...     <rdfs:label>Example</rdfs:label>
        ...     <rdfs:comment>This is really just an example.</rdfs:comment>
        ...   </rdf:Description>
        ... </rdf:RDF>
        ... '''
        >>> import os, tempfile
        >>> fd, file_name = tempfile.mkstemp()
        >>> f = os.fdopen(fd, "w")
        >>> dummy = f.write(my_data)  # Returns num bytes written
        >>> f.close()

        >>> g = Graph()
        >>> result = g.parse(data=my_data, format="application/rdf+xml")
        >>> len(g)
        2

        >>> g = Graph()
        >>> result = g.parse(location=file_name, format="application/rdf+xml")
        >>> len(g)
        2

        >>> g = Graph()
        >>> with open(file_name, "r") as f:
        ...     result = g.parse(f, format="application/rdf+xml")
        >>> len(g)
        2

        >>> os.remove(file_name)

        >>> # default turtle parsing
        >>> result = g.parse(data="<http://example.com/a> <http://example.com/a> <http://example.com/a> .")
        >>> len(g)
        3

        """
        # Parsers insert triples through self.add(), which registers new
        # subjects with resp_agent/source set to None. Remember what was
        # already known so those entries can receive this call's metadata.
        known_subjects = set(self.entity_index)

        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )
        if format is None:
            format = source.content_type
        could_not_guess_format = False
        if format is None:
            if (
                hasattr(source, "file")
                and getattr(source.file, "name", None)
                and isinstance(source.file.name, str)
            ):
                format = rdflib.util.guess_format(source.file.name)
            if format is None:
                format = "turtle"
                could_not_guess_format = True
        parser = plugin.get(format, Parser)()
        try:
            # TODO FIXME: Parser.parse should have **kwargs argument.
            parser.parse(source, self, **args)
        except SyntaxError as se:
            if could_not_guess_format:
                raise ParserError(
                    "Could not guess RDF format for %r from file extension so tried Turtle but failed. "
                    "You can explicitly specify format using the format argument."
                    % source
                ) from se
            raise
        finally:
            if source.auto_close:
                source.close()

        for subject in self.subjects(unique=True):
            self.all_entities.add(subject)
            if subject not in known_subjects:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

        return self

327 

class OCDMDataset(OCDMGraphCommons, Dataset):
    """rdflib ``Dataset`` that tracks entity changes for OCDM provenance.

    Every subject added through :meth:`add` or :meth:`parse` is registered in
    ``all_entities`` and receives an ``entity_index`` entry whose
    ``graph_iri`` records the named graph the entity was found in.
    """

    def __init__(self, counter_handler: CounterHandler = None):
        Dataset.__init__(self)
        self.preexisting_graph = Dataset()
        OCDMGraphCommons.__init__(self, counter_handler)

    def __deepcopy__(self, memo):
        """Copy data and change-tracking indexes into a fresh ``OCDMDataset``.

        The copy shares the counter handler and starts with its own provenance
        object and an empty ``preexisting_graph``.
        """
        new_graph = OCDMDataset(counter_handler=self.provenance.counter_handler)
        # Register early, as the deepcopy protocol expects.
        memo[id(self)] = new_graph

        # Copy graph data
        for quad in self.quads((None, None, None, None)):
            new_graph.add(quad)

        # Copy entity index and metadata
        for key, value in self.entity_index.items():
            new_graph.entity_index[key] = value.copy()
        new_graph.all_entities = self.all_entities.copy()
        for key, value in self.merge_index.items():
            new_graph.merge_index[key] = value.copy()

        return new_graph

    def add(
        self,
        triple_or_quad: Union[
            Tuple["_SubjectType", "_PredicateType", "_ObjectType", Optional[Any]],
            "_TripleType",
        ],
        resp_agent = None,
        primary_source = None
    ) -> "Dataset":
        """
        Add a triple or quad to the store and register its subject.

        If a triple is given it is added to the default context. The subject
        is added to ``all_entities``; the first time it is seen it gets an
        entity index entry carrying ``resp_agent`` and ``primary_source``, and
        its ``graph_iri`` is filled from the target context if still unknown.

        :return: self.
        """

        s, p, o, c = self._spoc(triple_or_quad, default=True)

        _assertnode(s, p, o)

        # type error: Argument "context" to "add" of "Store" has incompatible type "Optional[Graph]"; expected "Graph"
        self.store.add((s, p, o), context=c, quoted=False)  # type: ignore[arg-type]

        self.all_entities.add(s)

        entry = self.entity_index.get(s)
        if entry is None:
            entry = self.entity_index[s] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

        # The context resolved by _spoc is reused, so no store lookup is needed.
        if entry.get('graph_iri') is None:
            entry['graph_iri'] = _extract_graph_iri_from_context(c)

        return self

    def parse(
        self,
        source: Optional[
            Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath]
        ] = None,
        publicID: Optional[str] = None,  # noqa: N803
        format: Optional[str] = None,
        location: Optional[str] = None,
        file: Optional[Union[BinaryIO, TextIO]] = None,
        data: Optional[Union[str, bytes]] = None,
        resp_agent: URIRef = None,
        primary_source: URIRef = None,
        **args: Any,
    ) -> "Graph":
        """
        Parse source adding the resulting triples to its own context
        (sub graph of this graph).

        See :meth:`rdflib.graph.Graph.parse` for documentation on arguments.
        Any triples already stored in the named graph identified by
        ``publicID`` (or the source's public id) are removed before parsing.
        Subjects not yet in the entity index are registered with
        ``resp_agent`` and ``primary_source``.

        :Returns:

        The graph into which the source was parsed. In the case of n3
        it returns the root context.

        .. caution::

           This method can access directly or indirectly requested network or
           file resources, for example, when parsing JSON-LD documents with
           ``@context`` directives that point to a network location.

           When processing untrusted or potentially malicious documents,
           measures should be taken to restrict network and file access.

           For information on available security measures, see the RDFLib
           :doc:`Security Considerations </security_considerations>`
           documentation.
        """

        source = create_input_source(
            source=source,
            publicID=publicID,
            location=location,
            file=file,
            data=data,
            format=format,
        )

        # NOTE on type hint: `xml.sax.xmlreader.InputSource.getPublicId` has no
        # type annotations; create_input_source is expected to ensure that
        # publicId is not None (TODO/FIXME: make this guarantee explicit).
        g_id = publicID or source.getPublicId()
        if not isinstance(g_id, Node):
            g_id = URIRef(g_id)

        context = Graph(store=self.store, identifier=g_id)
        context.remove((None, None, None))  # hmm ?
        context.parse(source, publicID=publicID, format=format, **args)
        # TODO: FIXME: This should not return context, but self.

        # ``context`` is a plain Graph over the shared store, so the parsed
        # triples bypass OCDMDataset.add; register their subjects here.
        unique_subjects = {s for s, _, _, _ in self.quads((None, None, None, None))}
        for subject in unique_subjects:
            self.all_entities.add(subject)

            if subject not in self.entity_index:
                self.entity_index[subject] = {'to_be_deleted': False, 'is_restored': False, 'resp_agent': resp_agent, 'source': primary_source, 'graph_iri': None}

            # Store graph_iri for this subject by finding its context
            if self.entity_index[subject].get('graph_iri') is None:
                self.entity_index[subject]['graph_iri'] = _extract_graph_iri(self, subject)

        return context

466 

def _assertnode(*terms):
    """Assert that every positional argument is an rdflib ``Node``.

    Uses ``assert`` statements, so the checks disappear under ``python -O``.

    :return: always ``True`` when no assertion fails.
    """
    for candidate in terms:
        is_node = isinstance(candidate, Node)
        assert is_node, "Term %s must be an rdflib term" % (candidate,)
    return True

471 

472 

473# Backward compatibility alias 

474class OCDMConjunctiveGraph(OCDMDataset): 

475 """ 

476 Deprecated: Use OCDMDataset instead. 

477 

478 This class is maintained for backward compatibility only. 

479 OCDMConjunctiveGraph has been renamed to OCDMDataset to reflect 

480 the migration from the deprecated ConjunctiveGraph to Dataset. 

481 """ 

482 def __init__(self, counter_handler: CounterHandler = None): 

483 warnings.warn( 

484 "OCDMConjunctiveGraph is deprecated, use OCDMDataset instead", 

485 DeprecationWarning, 

486 stacklevel=2 

487 ) 

488 super().__init__(counter_handler)