Coverage for oc_ocdm / support / query_utils.py: 76%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11from typing import TYPE_CHECKING, AbstractSet, List, Set 

12 

13from triplelite import RDFTerm 

14 

15if TYPE_CHECKING: 

16 from typing import Tuple 

17 

18 from oc_ocdm.abstract_entity import AbstractEntity 

19 

# Upper bound on the number of triples serialized into a single DELETE DATA /
# INSERT DATA request; larger triple sets are split across several requests.
MAX_TRIPLES_PER_QUERY: int = 500

21 

22 

# Single-pass escape table for the characters that may not appear raw inside a
# double-quoted SPARQL / N-Triples string literal. Built once at import time.
_NT_LITERAL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
})


def _term_to_nt(term) -> str:
    """Serialize one RDF term into N-Triples / SPARQL term syntax.

    Args:
        term: A triplelite ``RDFTerm``, a plain ``str`` (treated as an IRI), or
            any other object, which must then provide an ``n3()`` method.

    Returns:
        ``<iri>``, ``"lexical"@lang`` or ``"lexical"^^<datatype>``. A literal
        carrying neither a language tag nor a datatype is rendered as a plain
        ``"lexical"`` string.
    """
    if isinstance(term, RDFTerm):
        if term.type == "literal":
            escaped = term.value.translate(_NT_LITERAL_ESCAPES)
            if term.lang:
                return f'"{escaped}"@{term.lang}'
            # Guard against a missing datatype: emitting ``^^<None>`` or ``^^<>``
            # would yield a bogus (or base-relative) datatype IRI. In RDF 1.1 a
            # bare quoted string is an xsd:string literal.
            if term.datatype:
                return f'"{escaped}"^^<{term.datatype}>'
            return f'"{escaped}"'
        # NOTE(review): every non-literal RDFTerm is rendered as an IRI; if
        # triplelite can hand us blank nodes here they would need ``_:label``
        # syntax instead — confirm.
        return f"<{term.value}>"
    if isinstance(term, str):
        return f"<{term}>"
    return term.n3()

34 

35 

36def _serialize_triples_to_nt(triples: AbstractSet) -> str: 

37 return "".join(f"{_term_to_nt(s)} {_term_to_nt(p)} {_term_to_nt(o)} ." for s, p, o in triples) 

38 

39 

40def _chunk_set(data: AbstractSet, chunk_size: int) -> List[Set]: 

41 data_list = list(data) 

42 return [set(data_list[i:i + chunk_size]) for i in range(0, len(data_list), chunk_size)] 

43 

44 

def get_delete_query(graph_iri: str, data: AbstractSet) -> Tuple[List[str], int]:
    """Build SPARQL ``DELETE DATA`` updates removing *data* from *graph_iri*.

    A triple set no larger than ``MAX_TRIPLES_PER_QUERY`` becomes a single
    query; a larger one is split into one query per chunk.

    Returns:
        ``(queries, count)`` where *count* is ``len(data)``; ``([], 0)`` when
        *data* is empty.
    """
    total = len(data)
    if not total:
        return [], 0

    # Small inputs are serialized as-is (keeping data's own iteration order);
    # only oversized inputs are re-partitioned into chunks.
    if total <= MAX_TRIPLES_PER_QUERY:
        batches = [data]
    else:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)

    queries = [
        f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ]
    return queries, total

60 

61 

def get_insert_query(graph_iri: str, data: AbstractSet) -> Tuple[List[str], int]:
    """Build SPARQL ``INSERT DATA`` updates adding *data* to *graph_iri*.

    A triple set no larger than ``MAX_TRIPLES_PER_QUERY`` becomes a single
    query; a larger one is split into one query per chunk.

    Returns:
        ``(queries, count)`` where *count* is ``len(data)``; ``([], 0)`` when
        *data* is empty.
    """
    total = len(data)
    if not total:
        return [], 0

    # Small inputs are serialized as-is (keeping data's own iteration order);
    # only oversized inputs are re-partitioned into chunks.
    if total <= MAX_TRIPLES_PER_QUERY:
        batches = [data]
    else:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)

    queries = [
        f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ]
    return queries, total

77 

78 

79def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[AbstractSet, AbstractSet, int, int]: 

80 """ 

81 Computes the triples to insert and delete for an entity. 

82 

83 Args: 

84 entity: The entity to analyze 

85 entity_type: Type of entity ("graph", "prov", or "metadata") 

86 

87 Returns: 

88 Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count) 

89 """ 

90 if entity_type == "prov": 

91 triples = set(entity.g) 

92 return triples, set(), len(triples), 0 

93 

94 # Deferred import to break circular dependency: 

95 # graph_entity → abstract_entity → support.support → (support/__init__) → query_utils → graph_entity 

96 from oc_ocdm.graph.graph_entity import GraphEntity # noqa: E402 

97 

98 assert isinstance(entity, GraphEntity) 

99 to_be_deleted: bool = entity.to_be_deleted 

100 preexisting_triples = entity._preexisting_triples 

101 

102 if to_be_deleted: 

103 return set(), set(preexisting_triples), 0, len(preexisting_triples) 

104 

105 current_triples = set(entity.g) 

106 

107 if len(preexisting_triples) == len(current_triples) and preexisting_triples == current_triples: 

108 return set(), set(), 0, 0 

109 

110 removed_triples = preexisting_triples - current_triples 

111 added_triples = current_triples - preexisting_triples 

112 

113 return added_triples, removed_triples, len(added_triples), len(removed_triples) 

114 

115 

def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
    """Build the SPARQL updates that persist *entity*'s pending graph changes.

    Args:
        entity: Entity whose changes are computed by ``_compute_graph_changes``.
        entity_type: Forwarded unchanged to ``_compute_graph_changes``.

    Returns:
        ``(queries, added_count, removed_count)``. All ``DELETE DATA`` queries
        come before all ``INSERT DATA`` queries. Returns ``([], 0, 0)`` when
        there is nothing to add or remove.

    Raises:
        ValueError: if there are changes but ``entity.g.identifier`` is ``None``.
    """
    additions, deletions, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if not (added_count or removed_count):
        return [], 0, 0

    target_graph = entity.g.identifier
    if target_graph is None:
        raise ValueError("Entity graph has no identifier")

    # Deletions are generated (and therefore executed) before insertions.
    removal_queries = get_delete_query(target_graph, deletions)[0]
    addition_queries = get_insert_query(target_graph, additions)[0]
    return [*removal_queries, *addition_queries], added_count, removed_count