Coverage for oc_ocdm/support/query_utils.py: 77%

74 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-05 23:58 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16from __future__ import annotations 

17 

18from typing import TYPE_CHECKING, List, Set 

19 

20if TYPE_CHECKING: 

21 from typing import Tuple 

22 from rdflib import URIRef 

23 from oc_ocdm.abstract_entity import AbstractEntity 

24 

# Largest number of triples placed in a single INSERT DATA / DELETE DATA query;
# bigger sets of triples are split into several queries of at most this size.
MAX_TRIPLES_PER_QUERY = 500

26 

27 

def _serialize_triples_to_nt(triples: Set) -> str:
    """
    Renders triples as one string of ``subject predicate object .`` statements.

    Args:
        triples: Iterable of (subject, predicate, object) terms, each exposing ``n3()``

    Returns:
        The statements concatenated back to back, with no separator
        (neither space nor newline) after each closing dot.
        An empty input yields an empty string.
    """
    statements = []
    for subject, predicate, obj in triples:
        statements.append("{} {} {} .".format(subject.n3(), predicate.n3(), obj.n3()))
    return "".join(statements)

30 

31 

def _chunk_set(data: Set, chunk_size: int) -> List[Set]:
    """
    Splits a set into consecutive chunks of at most ``chunk_size`` elements.

    Args:
        data: The elements to split
        chunk_size: Maximum number of elements per chunk; must be at least 1

    Returns:
        List of sets that together contain every element of ``data``.
        Every chunk except possibly the last holds ``chunk_size`` elements.
        An empty ``data`` yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1
    """
    # A negative step makes range() empty, which would silently drop every
    # element; reject non-positive sizes up front instead of returning no chunks.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    data_list = list(data)
    return [set(data_list[i:i + chunk_size]) for i in range(0, len(data_list), chunk_size)]

35 

36 

def get_delete_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Builds the SPARQL ``DELETE DATA`` queries that remove triples from a named graph.

    Args:
        graph_iri: IRI of the named graph the triples are removed from
        data: The triples to delete

    Returns:
        Tuple of (queries, number_of_triples). When ``data`` is empty the
        result is ``([], 0)``. When it holds more than MAX_TRIPLES_PER_QUERY
        triples, one query is produced per chunk of at most that many triples;
        otherwise a single query covers all of them.
    """
    total = len(data)
    if total == 0:
        return [], 0

    # Small inputs are serialized as they are; only oversized ones are split.
    if total > MAX_TRIPLES_PER_QUERY:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
    else:
        batches = [data]

    return [
        f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ], total

52 

53 

def get_insert_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Builds the SPARQL ``INSERT DATA`` queries that add triples to a named graph.

    Args:
        graph_iri: IRI of the named graph the triples are added to
        data: The triples to insert

    Returns:
        Tuple of (queries, number_of_triples). When ``data`` is empty the
        result is ``([], 0)``. When it holds more than MAX_TRIPLES_PER_QUERY
        triples, one query is produced per chunk of at most that many triples;
        otherwise a single query covers all of them.
    """
    total = len(data)
    if total == 0:
        return [], 0

    # Small inputs are serialized as they are; only oversized ones are split.
    if total > MAX_TRIPLES_PER_QUERY:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
    else:
        batches = [data]

    return [
        f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ], total

69 

70 

def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[Set, Set, int, int]:
    """
    Works out which triples must be inserted and which deleted for an entity.

    For ``"prov"`` the whole current graph is returned as insertions and the
    pre-existing graph is not consulted. For any other type, an entity flagged
    ``to_be_deleted`` has its whole pre-existing graph returned as deletions;
    otherwise the result is the set difference between the current graph and
    the pre-existing one.

    Args:
        entity: The entity to analyze
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count)
    """
    if entity_type == "prov":
        everything = set(entity.g)
        return everything, set(), len(everything), 0

    marked_for_deletion: bool = entity.to_be_deleted
    before = set(entity.preexisting_graph)

    if marked_for_deletion:
        return set(), before, 0, len(before)

    after = set(entity.g)

    # Nothing changed: skip computing the differences.
    if before == after:
        return set(), set(), 0, 0

    additions = after - before
    removals = before - after
    return additions, removals, len(additions), len(removals)

103 

104 

def serialize_graph_to_nquads(triples: Set, graph_iri: URIRef) -> list:
    """
    Turns RDF triples into N-Quads lines belonging to one named graph.

    Args:
        triples: Set of RDF triples, whose terms expose ``n3()``
        graph_iri: Named graph IRI, written between angle brackets as the fourth term

    Returns:
        List with one N-Quad string per triple (each ending with newline)
    """
    quads = []
    for subject, predicate, obj in triples:
        terms = (subject.n3(), predicate.n3(), obj.n3(), f"<{graph_iri}>", ".\n")
        quads.append(" ".join(terms))
    return quads

117 

118 

def get_separated_queries(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], List[str], int, int, Set]:
    """
    Produces the INSERT and DELETE queries of an entity as two separate lists.

    The target named graph is taken from ``entity.g.identifier``.

    Args:
        entity: The entity to generate queries for
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (insert_queries, delete_queries, added_count, removed_count, insert_triples)
        The insert_triples can be used for direct N-Quads serialization without parsing SPARQL.
        When nothing was added or removed, the result is ``([], [], 0, 0, set())``.
    """
    added, removed, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if not (added_count or removed_count):
        return [], [], 0, 0, set()

    # Each side is only built when it has triples; otherwise it stays empty.
    delete_queries = get_delete_query(entity.g.identifier, removed)[0] if removed_count > 0 else []
    insert_queries = get_insert_query(entity.g.identifier, added)[0] if added_count > 0 else []

    return insert_queries, delete_queries, added_count, removed_count, added

146 

147 

def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
    """
    Produces the SPARQL update queries of an entity as a single ordered list.

    The target named graph is taken from ``entity.g.identifier``.

    Args:
        entity: The entity to generate queries for
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (queries, added_count, removed_count), where the DELETE
        queries come before the INSERT queries. When nothing was added or
        removed, the result is ``([], 0, 0)``.
    """
    added, removed, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if added_count == 0 and removed_count == 0:
        return [], 0, 0

    deletions = get_delete_query(entity.g.identifier, removed)[0]
    insertions = get_insert_query(entity.g.identifier, added)[0]

    return [*deletions, *insertions], added_count, removed_count