Coverage for oc_ocdm / support / query_utils.py: 77%
62 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11from typing import TYPE_CHECKING, List, Set
13from rdflib import URIRef
15if TYPE_CHECKING:
16 from typing import Tuple
17 from oc_ocdm.abstract_entity import AbstractEntity
19MAX_TRIPLES_PER_QUERY = 500
22def _serialize_triples_to_nt(triples: Set) -> str:
23 return "".join(f"{s.n3()} {p.n3()} {o.n3()} ." for s, p, o in triples)
26def _chunk_set(data: Set, chunk_size: int) -> List[Set]:
27 data_list = list(data)
28 return [set(data_list[i:i + chunk_size]) for i in range(0, len(data_list), chunk_size)]
31def get_delete_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
32 num_of_statements: int = len(data)
33 if num_of_statements <= 0:
34 return [], 0
36 if num_of_statements <= MAX_TRIPLES_PER_QUERY:
37 statements: str = _serialize_triples_to_nt(data)
38 return [f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"], num_of_statements
40 chunks = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
41 queries = []
42 for chunk in chunks:
43 statements = _serialize_triples_to_nt(chunk)
44 queries.append(f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}")
45 return queries, num_of_statements
48def get_insert_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
49 num_of_statements: int = len(data)
50 if num_of_statements <= 0:
51 return [], 0
53 if num_of_statements <= MAX_TRIPLES_PER_QUERY:
54 statements: str = _serialize_triples_to_nt(data)
55 return [f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"], num_of_statements
57 chunks = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
58 queries = []
59 for chunk in chunks:
60 statements = _serialize_triples_to_nt(chunk)
61 queries.append(f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}")
62 return queries, num_of_statements
65def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[Set, Set, int, int]:
66 """
67 Computes the triples to insert and delete for an entity.
69 Args:
70 entity: The entity to analyze
71 entity_type: Type of entity ("graph", "prov", or "metadata")
73 Returns:
74 Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count)
75 """
76 if entity_type == "prov":
77 triples = set(entity.g)
78 return triples, set(), len(triples), 0
80 # Deferred import to break circular dependency:
81 # graph_entity → abstract_entity → support.support → (support/__init__) → query_utils → graph_entity
82 from oc_ocdm.graph.graph_entity import GraphEntity # noqa: E402
84 assert isinstance(entity, GraphEntity)
85 to_be_deleted: bool = entity.to_be_deleted
86 preexisting_graph = entity.preexisting_graph
88 if to_be_deleted:
89 preexisting_triples = set(preexisting_graph)
90 return set(), preexisting_triples, 0, len(preexisting_triples)
92 preexisting_triples = set(preexisting_graph)
93 current_triples = set(entity.g)
95 if preexisting_triples == current_triples:
96 return set(), set(), 0, 0
98 removed_triples = preexisting_triples - current_triples
99 added_triples = current_triples - preexisting_triples
101 return added_triples, removed_triples, len(added_triples), len(removed_triples)
104def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
105 to_insert, to_delete, n_added, n_removed = _compute_graph_changes(entity, entity_type)
107 if n_added == 0 and n_removed == 0:
108 return [], 0, 0
110 graph_iri = entity.g.identifier
111 assert isinstance(graph_iri, URIRef)
113 delete_queries, _ = get_delete_query(graph_iri, to_delete)
114 insert_queries, _ = get_insert_query(graph_iri, to_insert)
116 return delete_queries + insert_queries, n_added, n_removed