Coverage for oc_ocdm/support/query_utils.py: 77%
74 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18from typing import TYPE_CHECKING, List, Set
20if TYPE_CHECKING:
21 from typing import Tuple
22 from rdflib import URIRef
23 from oc_ocdm.abstract_entity import AbstractEntity
# Largest number of triples placed in a single INSERT DATA / DELETE DATA
# query; bigger sets are split into chunks of this size.
MAX_TRIPLES_PER_QUERY: int = 500
def _serialize_triples_to_nt(triples: Set) -> str:
    """
    Renders triples as a single string of ``<s> <p> <o> .`` statements.

    Each term is rendered through its ``n3()`` method. The statements are
    concatenated with no separator between them, in the iteration order of
    ``triples``.

    Args:
        triples: Iterable of (subject, predicate, object) tuples

    Returns:
        The concatenated statements, or an empty string when there are none
    """
    rendered = []
    for subj, pred, obj in triples:
        rendered.append(f"{subj.n3()} {pred.n3()} {obj.n3()} .")
    return "".join(rendered)
def _chunk_set(data: Set, chunk_size: int) -> List[Set]:
    """
    Splits a set into consecutive sub-sets of at most ``chunk_size`` items.

    The set is first materialized as a list, then sliced in steps of
    ``chunk_size``; only the last chunk may be smaller.

    Args:
        data: The set to split
        chunk_size: Maximum number of items per chunk

    Returns:
        List of chunks (empty when ``data`` is empty)
    """
    items = list(data)
    chunks: List[Set] = []
    for start in range(0, len(items), chunk_size):
        chunks.append(set(items[start:start + chunk_size]))
    return chunks
def get_delete_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Builds the SPARQL ``DELETE DATA`` queries removing triples from a named graph.

    Args:
        graph_iri: IRI of the named graph, written between angle brackets
        data: Set of (subject, predicate, object) triples to delete

    Returns:
        Tuple of (queries, number_of_triples). The list is empty when ``data``
        is empty, holds one query when ``data`` has at most
        ``MAX_TRIPLES_PER_QUERY`` triples, and otherwise holds one query per
        chunk of that size.
    """
    total: int = len(data)
    if total <= 0:
        return [], 0

    def _wrap(statements: str) -> str:
        return f"DELETE DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"

    # Small sets are serialized directly, without going through chunking.
    if total <= MAX_TRIPLES_PER_QUERY:
        return [_wrap(_serialize_triples_to_nt(data))], total

    batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
    return [_wrap(_serialize_triples_to_nt(batch)) for batch in batches], total
def get_insert_query(graph_iri: URIRef, data: Set) -> Tuple[List[str], int]:
    """
    Builds the SPARQL ``INSERT DATA`` queries adding triples to a named graph.

    Args:
        graph_iri: IRI of the named graph, written between angle brackets
        data: Set of (subject, predicate, object) triples to insert

    Returns:
        Tuple of (queries, number_of_triples). The list is empty when ``data``
        is empty, holds one query when ``data`` has at most
        ``MAX_TRIPLES_PER_QUERY`` triples, and otherwise holds one query per
        chunk of that size.
    """
    total: int = len(data)
    if total <= 0:
        return [], 0

    def _wrap(statements: str) -> str:
        return f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"

    # Small sets are serialized directly, without going through chunking.
    if total <= MAX_TRIPLES_PER_QUERY:
        return [_wrap(_serialize_triples_to_nt(data))], total

    batches = _chunk_set(data, MAX_TRIPLES_PER_QUERY)
    return [_wrap(_serialize_triples_to_nt(batch)) for batch in batches], total
def _compute_graph_changes(entity: AbstractEntity, entity_type: str) -> Tuple[Set, Set, int, int]:
    """
    Works out which triples must be inserted and deleted for an entity.

    For ``entity_type == "prov"`` every triple of ``entity.g`` is reported as
    an insertion. Otherwise, an entity flagged ``to_be_deleted`` reports all
    its preexisting triples as deletions; any other entity reports the set
    difference between ``entity.g`` and ``entity.preexisting_graph``.

    Args:
        entity: The entity to analyze
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (triples_to_insert, triples_to_delete, added_count, removed_count)
    """
    if entity_type == "prov":
        prov_triples = set(entity.g)
        return prov_triples, set(), len(prov_triples), 0

    marked_for_deletion: bool = entity.to_be_deleted
    before = set(entity.preexisting_graph)

    if marked_for_deletion:
        return set(), before, 0, len(before)

    after = set(entity.g)
    if after == before:
        return set(), set(), 0, 0

    added = after - before
    removed = before - after
    return added, removed, len(added), len(removed)
def serialize_graph_to_nquads(triples: Set, graph_iri: URIRef) -> list:
    """
    Turns RDF triples into N-Quads lines bound to one named graph.

    Each term is rendered through its ``n3()`` method and the graph IRI is
    written between angle brackets as the fourth element.

    Args:
        triples: Set of RDF triples
        graph_iri: Named graph IRI

    Returns:
        List of N-Quad strings (each ending with newline)
    """
    quads = []
    for subj, pred, obj in triples:
        quads.append(f"{subj.n3()} {pred.n3()} {obj.n3()} <{graph_iri}> .\n")
    return quads
def get_separated_queries(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], List[str], int, int, Set]:
    """
    Produces the INSERT and DELETE queries of an entity as two separate lists.

    The graph IRI used in the queries is ``entity.g.identifier``.

    Args:
        entity: The entity to generate queries for
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (insert_queries, delete_queries, added_count, removed_count, insert_triples)
        The insert_triples are the raw triples behind the insert queries, so
        callers can serialize them (e.g. to N-Quads) without parsing SPARQL.
    """
    added, removed, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if added_count == 0 and removed_count == 0:
        return [], [], 0, 0, set()

    # Deletions are built before insertions; a side with no triples stays empty.
    delete_queries = get_delete_query(entity.g.identifier, removed)[0] if removed_count > 0 else []
    insert_queries = get_insert_query(entity.g.identifier, added)[0] if added_count > 0 else []

    return insert_queries, delete_queries, added_count, removed_count, added
def get_update_query(entity: AbstractEntity, entity_type: str = "graph") -> Tuple[List[str], int, int]:
    """
    Produces the full list of update queries for an entity, deletions first.

    The graph IRI used in the queries is ``entity.g.identifier``.

    Args:
        entity: The entity to generate queries for
        entity_type: Type of entity ("graph", "prov", or "metadata")

    Returns:
        Tuple of (queries, added_count, removed_count); the list is empty and
        both counts are zero when nothing changed.
    """
    added, removed, added_count, removed_count = _compute_graph_changes(entity, entity_type)

    if added_count == 0 and removed_count == 0:
        return [], 0, 0

    graph_deletes, _ = get_delete_query(entity.g.identifier, removed)
    graph_inserts, _ = get_insert_query(entity.g.identifier, added)

    return [*graph_deletes, *graph_inserts], added_count, removed_count