Coverage for oc_ocdm / support / query_utils.py: 76%
72 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11from typing import TYPE_CHECKING, AbstractSet, List, Set
13from triplelite import RDFTerm
15if TYPE_CHECKING:
16 from typing import Tuple
18 from oc_ocdm.abstract_entity import AbstractEntity
20MAX_TRIPLES_PER_QUERY = 500
def _term_to_nt(term) -> str:
    """Render a single RDF term as an N-Triples / SPARQL token.

    Args:
        term: An ``RDFTerm``, a plain ``str`` (treated as an IRI), or any
            other object exposing an ``n3()`` method.

    Returns:
        The serialized term: ``<iri>`` for non-literal ``RDFTerm`` values and
        plain strings, a quoted and escaped string for literals (followed by
        ``@lang`` or ``^^<datatype>`` when available), or the result of
        ``term.n3()`` for anything else.
    """
    if isinstance(term, RDFTerm):
        if term.type == "literal":
            # Backslashes must be escaped first, otherwise the backslashes
            # introduced by the following replacements would be doubled.
            escaped = (
                term.value
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n')
                .replace('\r', '\\r')
            )
            if term.lang:
                return f'"{escaped}"@{term.lang}'
            if term.datatype:
                return f'"{escaped}"^^<{term.datatype}>'
            # No language tag and no datatype: emit a plain literal rather
            # than an invalid '^^<>' / '^^<None>' suffix.
            return f'"{escaped}"'
        return f'<{term.value}>'
    if isinstance(term, str):
        return f'<{term}>'
    return term.n3()
36def _serialize_triples_to_nt(triples: AbstractSet) -> str:
37 return "".join(f"{_term_to_nt(s)} {_term_to_nt(p)} {_term_to_nt(o)} ." for s, p, o in triples)
40def _chunk_set(data: AbstractSet, chunk_size: int) -> List[Set]:
41 data_list = list(data)
42 return [set(data_list[i:i + chunk_size]) for i in range(0, len(data_list), chunk_size)]
def get_delete_query(graph_iri: str, data: AbstractSet) -> Tuple[List[str], int]:
    """Build the ``DELETE DATA`` queries that remove ``data`` from a graph.

    Args:
        graph_iri: IRI of the named graph, placed inside ``GRAPH <...>``.
        data: Triples to delete.

    Returns:
        A tuple ``(queries, count)``. ``count`` is ``len(data)``. ``queries``
        is empty when there is nothing to delete, holds one query when the
        triples fit within ``MAX_TRIPLES_PER_QUERY``, and otherwise holds one
        query per chunk of at most ``MAX_TRIPLES_PER_QUERY`` triples.
    """
    total: int = len(data)
    if total <= 0:
        return [], 0

    # Small payloads are serialized as they are; only larger ones are
    # copied into chunks.
    if total <= MAX_TRIPLES_PER_QUERY:
        batches = [data]
    else:
        batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)

    queries = []
    for batch in batches:
        statements = _serialize_triples_to_nt(batch)
        queries.append(f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}")
    return queries, total
def get_insert_query(graph_iri: str, data: AbstractSet) -> Tuple[List[str], int]:
    """Build the ``INSERT DATA`` queries that add ``data`` to a graph.

    Args:
        graph_iri: IRI of the named graph, placed inside ``GRAPH <...>``.
        data: Triples to insert.

    Returns:
        A tuple ``(queries, count)``. ``count`` is ``len(data)``. ``queries``
        is empty when there is nothing to insert, holds one query when the
        triples fit within ``MAX_TRIPLES_PER_QUERY``, and otherwise holds one
        query per chunk of at most ``MAX_TRIPLES_PER_QUERY`` triples.
    """
    total: int = len(data)
    if total <= 0:
        return [], 0

    def _as_query(batch: AbstractSet) -> str:
        # Wrap one batch of triples in a single INSERT DATA statement.
        statements = _serialize_triples_to_nt(batch)
        return f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"

    if total <= MAX_TRIPLES_PER_QUERY:
        return [_as_query(data)], total

    batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
    return [_as_query(batch) for batch in batches], total
79def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[AbstractSet, AbstractSet, int, int]:
80 """
81 Computes the triples to insert and delete for an entity.
83 Args:
84 entity: The entity to analyze
85 entity_type: Type of entity ("graph", "prov", or "metadata")
87 Returns:
88 Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count)
89 """
90 if entity_type == "prov":
91 triples = set(entity.g)
92 return triples, set(), len(triples), 0
94 # Deferred import to break circular dependency:
95 # graph_entity → abstract_entity → support.support → (support/__init__) → query_utils → graph_entity
96 from oc_ocdm.graph.graph_entity import GraphEntity # noqa: E402
98 assert isinstance(entity, GraphEntity)
99 to_be_deleted: bool = entity.to_be_deleted
100 preexisting_triples = entity._preexisting_triples
102 if to_be_deleted:
103 return set(), set(preexisting_triples), 0, len(preexisting_triples)
105 current_triples = set(entity.g)
107 if len(preexisting_triples) == len(current_triples) and preexisting_triples == current_triples:
108 return set(), set(), 0, 0
110 removed_triples = preexisting_triples - current_triples
111 added_triples = current_triples - preexisting_triples
113 return added_triples, removed_triples, len(added_triples), len(removed_triples)
def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
    """Build the SPARQL update queries that bring the store in line with an entity.

    Args:
        entity: The entity whose changes must be persisted.
        entity_type: Forwarded to ``_compute_graph_changes``.

    Returns:
        A tuple ``(queries, added_count, removed_count)``. ``queries`` lists
        the delete queries followed by the insert queries, and is empty when
        there are no changes.

    Raises:
        ValueError: If there are changes but ``entity.g.identifier`` is ``None``.
    """
    additions, removals, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if not (added_count or removed_count):
        return [], 0, 0

    target_graph = entity.g.identifier
    if target_graph is None:
        raise ValueError("Entity graph has no identifier")

    # Deletions are listed before insertions in the returned query list.
    removal_queries, _ = get_delete_query(target_graph, removals)
    addition_queries, _ = get_insert_query(target_graph, additions)

    return removal_queries + addition_queries, added_count, removed_count