Coverage for oc_ocdm / support / query_utils.py: 77%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11from typing import TYPE_CHECKING, List, Set 

12 

13from rdflib import URIRef 

14 

15if TYPE_CHECKING: 

16 from typing import Tuple 

17 from oc_ocdm.abstract_entity import AbstractEntity 

18 

# Upper bound on the number of triples placed in a single SPARQL update
# string; get_delete_query / get_insert_query split larger sets into chunks
# of at most this many triples.
MAX_TRIPLES_PER_QUERY = 500

20 

21 

22def _serialize_triples_to_nt(triples: Set) -> str: 

23 return "".join(f"{s.n3()} {p.n3()} {o.n3()} ." for s, p, o in triples) 

24 

25 

26def _chunk_set(data: Set, chunk_size: int) -> List[Set]: 

27 data_list = list(data) 

28 return [set(data_list[i:i + chunk_size]) for i in range(0, len(data_list), chunk_size)] 

29 

30 

def get_delete_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Build the ``DELETE DATA`` statements that remove ``data`` from a graph.

    Args:
        graph_iri: IRI of the named graph the triples are removed from
        data: The triples to delete

    Returns:
        Tuple of (query strings, number of triples in ``data``). The list is
        empty when ``data`` is empty, holds one query when ``data`` has at
        most MAX_TRIPLES_PER_QUERY triples, and one query per chunk otherwise.
    """
    total = len(data)
    if not total:
        return [], 0

    # A set that fits in one query is serialized as-is, without re-chunking.
    if total <= MAX_TRIPLES_PER_QUERY:
        batches = [data]
    else:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)

    return [
        f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ], total

46 

47 

def get_insert_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Build the ``INSERT DATA`` statements that add ``data`` to a graph.

    Args:
        graph_iri: IRI of the named graph the triples are added to
        data: The triples to insert

    Returns:
        Tuple of (query strings, number of triples in ``data``). The list is
        empty when ``data`` is empty, holds one query when ``data`` has at
        most MAX_TRIPLES_PER_QUERY triples, and one query per chunk otherwise.
    """
    total = len(data)
    if not total:
        return [], 0

    # A set that fits in one query is serialized as-is, without re-chunking.
    if total <= MAX_TRIPLES_PER_QUERY:
        batches = [data]
    else:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)

    return [
        f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {_serialize_triples_to_nt(batch)} }} }}"
        for batch in batches
    ], total

63 

64 

65def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[Set, Set, int, int]: 

66 """ 

67 Computes the triples to insert and delete for an entity. 

68 

69 Args: 

70 entity: The entity to analyze 

71 entity_type: Type of entity ("graph", "prov", or "metadata") 

72 

73 Returns: 

74 Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count) 

75 """ 

76 if entity_type == "prov": 

77 triples = set(entity.g) 

78 return triples, set(), len(triples), 0 

79 

80 # Deferred import to break circular dependency: 

81 # graph_entity → abstract_entity → support.support → (support/__init__) → query_utils → graph_entity 

82 from oc_ocdm.graph.graph_entity import GraphEntity # noqa: E402 

83 

84 assert isinstance(entity, GraphEntity) 

85 to_be_deleted: bool = entity.to_be_deleted 

86 preexisting_graph = entity.preexisting_graph 

87 

88 if to_be_deleted: 

89 preexisting_triples = set(preexisting_graph) 

90 return set(), preexisting_triples, 0, len(preexisting_triples) 

91 

92 preexisting_triples = set(preexisting_graph) 

93 current_triples = set(entity.g) 

94 

95 if preexisting_triples == current_triples: 

96 return set(), set(), 0, 0 

97 

98 removed_triples = preexisting_triples - current_triples 

99 added_triples = current_triples - preexisting_triples 

100 

101 return added_triples, removed_triples, len(added_triples), len(removed_triples) 

102 

103 

def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
    """
    Build the SPARQL update strings that bring an entity's graph up to date.

    Args:
        entity: The entity whose changes are turned into queries
        entity_type: Type of entity, forwarded to _compute_graph_changes

    Returns:
        Tuple of (query strings, added_count, removed_count). Delete queries
        come before insert queries; the list is empty and both counts are 0
        when there is nothing to add or remove.
    """
    additions, deletions, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if not (added_count or removed_count):
        return [], 0, 0

    target_graph = entity.g.identifier
    assert isinstance(target_graph, URIRef)

    queries = get_delete_query(target_graph, deletions)[0]
    queries = queries + get_insert_query(target_graph, additions)[0]
    return queries, added_count, removed_count