Coverage for rdflib_ocdm / query_utils.py: 92%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 12:35 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2016 Silvio Peroni <essepuntato@gmail.com> 

4# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10from typing import TYPE_CHECKING 

11 

12if TYPE_CHECKING: 

13 from typing import Tuple 

14 from rdflib.compare import IsomorphicGraph 

15 from rdflib_ocdm.ocdm_graph import OCDMGraphCommons 

16 

17from rdflib import Dataset, Graph, URIRef 

18from rdflib.compare import graph_diff, to_isomorphic 

19 

20from rdflib_ocdm.graph_utils import _extract_graph_iri 

21from rdflib_ocdm.support import get_entity_subgraph 

22 

23 

24def get_delete_query(data: Dataset|Graph, graph_iri: URIRef = None) -> Tuple[str, int]: 

25 num_of_statements: int = len(data) 

26 if num_of_statements <= 0: 

27 return "", 0 

28 else: 

29 statements: str = data.serialize(format="nt11").replace('\n', '') 

30 if graph_iri: 

31 delete_string: str = f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}" 

32 else: 

33 delete_string: str = f"DELETE DATA {{ {statements} }}" 

34 return delete_string, num_of_statements 

35 

36def get_insert_query(data: Dataset|Graph, graph_iri: URIRef = None) -> Tuple[str, int]: 

37 num_of_statements: int = len(data) 

38 if num_of_statements <= 0: 

39 return "", 0 

40 else: 

41 statements: str = data.serialize(format="nt11").replace('\n', '') 

42 if graph_iri: 

43 insert_string: str = f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}" 

44 else: 

45 insert_string: str = f"INSERT DATA {{ {statements} }}" 

46 return insert_string, num_of_statements 

47 

48def get_update_query(a_set: OCDMGraphCommons|Dataset|Graph, entity: URIRef, entity_type = 'graph') -> Tuple[str, int, int]: 

49 if entity_type == 'graph': 

50 to_be_deleted: bool = a_set.entity_index[entity]['to_be_deleted'] if entity in a_set.entity_index else False 

51 preexisting_graph = get_entity_subgraph(a_set.preexisting_graph, entity) 

52 graph_iri = None 

53 elif entity_type == 'prov': 53 ↛ 58line 53 didn't jump to line 58 because the condition on line 53 was always true

54 to_be_deleted = False 

55 preexisting_graph = Dataset() 

56 

57 # Extract graph_iri: prefer entity_index (OCDMDataset), fallback to helper function (regular Dataset) 

58 if isinstance(a_set, Dataset): 

59 if hasattr(a_set, 'entity_index') and entity in a_set.entity_index: 

60 # Clean architectural solution: use stored graph_iri from entity_index 

61 graph_iri = a_set.entity_index[entity].get('graph_iri') 

62 else: 

63 # Fallback for regular Dataset (e.g., provenance graphs): use DRY helper function 

64 graph_iri = _extract_graph_iri(a_set, entity) 

65 elif isinstance(a_set, Graph): 65 ↛ 67line 65 didn't jump to line 67 because the condition on line 65 was always true

66 graph_iri = None 

67 if to_be_deleted: 

68 delete_string, removed_triples = get_delete_query(preexisting_graph, graph_iri) 

69 if delete_string != "": 69 ↛ 72line 69 didn't jump to line 72 because the condition on line 69 was always true

70 return delete_string, 0, removed_triples 

71 else: 

72 return "", 0, 0 

73 else: 

74 current_graph = get_entity_subgraph(a_set, entity) 

75 

76 # Convert Dataset to Graph for isomorphic comparison if needed 

77 if isinstance(preexisting_graph, Dataset): 

78 preexisting_for_comparison = Graph() 

79 for s, p, o, _ in preexisting_graph.quads((None, None, None, None)): 

80 preexisting_for_comparison.add((s, p, o)) 

81 else: 

82 preexisting_for_comparison = preexisting_graph 

83 

84 if isinstance(current_graph, Dataset): 

85 current_for_comparison = Graph() 

86 for s, p, o, _ in current_graph.quads((None, None, None, None)): 

87 current_for_comparison.add((s, p, o)) 

88 else: 

89 current_for_comparison = current_graph 

90 

91 preexisting_iso: IsomorphicGraph = to_isomorphic(preexisting_for_comparison) 

92 current_iso: IsomorphicGraph = to_isomorphic(current_for_comparison) 

93 if preexisting_iso == current_iso: 

94 # Both graphs have exactly the same content! 

95 return "", 0, 0 

96 _, in_first, in_second = graph_diff(preexisting_iso, current_iso) 

97 delete_string, removed_triples = get_delete_query(in_first, graph_iri) 

98 insert_string, added_triples = get_insert_query(in_second, graph_iri) 

99 if delete_string != "" and insert_string != "": 

100 return delete_string + '; ' + insert_string, added_triples, removed_triples 

101 elif delete_string != "": 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true

102 return delete_string, 0, removed_triples 

103 elif insert_string != "": 103 ↛ 106line 103 didn't jump to line 106 because the condition on line 103 was always true

104 return insert_string, added_triples, 0 

105 else: 

106 return "", 0, 0