Coverage for heritrace / routes / entity / _validation.py: 90%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from dataclasses import dataclass
7from flask_babel import gettext
8from rdflib import RDF, URIRef
10from heritrace.extensions import get_custom_filter, get_form_fields
11from heritrace.utils.datatypes import DATATYPE_MAPPING
12from heritrace.utils.filters import Filter
13from heritrace.utils.shacl_utils import find_matching_form_field
@dataclass(frozen=True, slots=True)
class PropertyValidationInput:
    """Immutable bundle of everything needed to validate one property.

    Built by ``validate_entity_data`` for each submitted property and consumed
    by ``_validate_property_cardinality`` and ``_validate_property_values``.
    """

    # Form-field definition selected for this property. It is read with
    # ``.get`` for the keys "min", "max", "mandatory_values", "datatypes"
    # and "optionalValues".
    matching_field_def: dict
    # Submitted values, always wrapped in a list even when the payload
    # carried a single bare value.
    normalized_prop_values: list
    # Predicate URI of the property being validated.
    prop_uri: str
    # Key of the entity's form-field group as returned by
    # ``find_matching_form_field``; presumably an (entity type, shape)
    # pair -- TODO confirm against that helper.
    entity_key: tuple
    # Filter used to turn URIs into human-readable labels in error messages.
    custom_filter: Filter
def _validate_property_cardinality(
    errors: list[str],
    prop_input: PropertyValidationInput,
) -> None:
    """Append cardinality and mandatory-value errors for a single property.

    Three constraints of the matching field definition are checked:
    ``"min"`` (defaults to 0), ``"max"`` (no upper bound when missing or
    ``None``) and ``"mandatory_values"`` (each listed value must be among
    the submitted ones).

    Args:
        errors: List that receives the translated error messages; it is
            mutated in place.
        prop_input: The property's field definition, submitted values and
            labelling context.
    """
    field_def = prop_input.matching_field_def
    submitted = prop_input.normalized_prop_values
    lower_bound = field_def.get("min", 0)
    upper_bound = field_def.get("max")
    submitted_count = len(submitted)

    def readable_property() -> str:
        # Resolved lazily so the label is only computed when an error is built.
        return prop_input.custom_filter.human_readable_predicate(
            prop_input.prop_uri, prop_input.entity_key
        )

    if submitted_count < lower_bound:
        if lower_bound > 1:
            noun = gettext("values")
        else:
            noun = gettext("value")
        errors.append(
            gettext(
                "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                prop_uri=readable_property(),
                min_count=lower_bound,
                value=noun,
            )
        )

    if upper_bound is not None and submitted_count > upper_bound:
        if upper_bound > 1:
            noun = gettext("values")
        else:
            noun = gettext("value")
        errors.append(
            gettext(
                "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                prop_uri=readable_property(),
                max_count=upper_bound,
                value=noun,
            )
        )

    # One error per mandatory value that was not submitted.
    for required_value in field_def.get("mandatory_values", []):
        if required_value in submitted:
            continue
        errors.append(
            gettext(
                "Property %(prop_uri)s requires the value %(mandatory_value)s",
                prop_uri=readable_property(),
                mandatory_value=required_value,
            )
        )
def _validate_property_values(
    errors: list[str],
    prop_input: PropertyValidationInput,
) -> None:
    """Validate every submitted value of one property, appending errors in place.

    Values that are dicts carrying an ``"entity_type"`` key are treated as
    nested entities and validated recursively with ``validate_entity_data``.
    Every other value is checked against the field definition's
    ``"datatypes"`` (accepted when at least one datatype's validator returns
    a truthy result) and ``"optionalValues"`` (must be one of the listed
    values whenever that list is non-empty).

    Args:
        errors: List that receives the translated error messages; it is
            mutated in place.
        prop_input: The property's field definition, submitted values and
            labelling context.
    """
    field_def = prop_input.matching_field_def
    custom_filter = prop_input.custom_filter
    entity_key = prop_input.entity_key

    # Both constraint lists depend only on the field definition, so they are
    # read once rather than once per value.
    datatypes = field_def.get("datatypes", [])
    optional_values = field_def.get("optionalValues", [])

    # Resolve each datatype's validator a single time. Each DATATYPE_MAPPING
    # entry is indexed as (datatype URI, validation callable, ...); datatypes
    # without a usable validator can never accept a value and are skipped.
    # NOTE(review): assumes the validators are pure functions of the value.
    validators = []
    for dtype in datatypes or ():
        dtype_ref = URIRef(dtype)
        validator = next(
            (entry[1] for entry in DATATYPE_MAPPING if entry[0] == dtype_ref),
            None,
        )
        if validator:
            validators.append(validator)

    for value in prop_input.normalized_prop_values:
        if isinstance(value, dict) and "entity_type" in value:
            # Nested entity: its own schema decides validity.
            errors.extend(validate_entity_data(value))
            continue

        if datatypes and not any(validator(value) for validator in validators):
            expected_types = ", ".join(
                custom_filter.human_readable_predicate(dtype, entity_key)
                for dtype in datatypes
            )
            errors.append(
                gettext(
                    'Value "%(value)s" for property'
                    " %(prop_uri)s is not of expected"
                    " type %(expected_types)s",
                    value=value,
                    prop_uri=custom_filter.human_readable_predicate(
                        prop_input.prop_uri, entity_key
                    ),
                    expected_types=expected_types,
                )
            )

        if optional_values and value not in optional_values:
            acceptable_values = ", ".join(
                custom_filter.human_readable_predicate(allowed, entity_key)
                for allowed in optional_values
            )
            errors.append(
                gettext(
                    'Value "%(value)s" is not permitted for'
                    " property %(prop_uri)s. Acceptable values"
                    " are: %(acceptable_values)s",
                    value=value,
                    prop_uri=custom_filter.human_readable_predicate(
                        prop_input.prop_uri, entity_key
                    ),
                    acceptable_values=acceptable_values,
                )
            )
def _check_missing_required_properties(
    errors: list[str],
    entity_fields: dict,
    properties: dict,
    entity_key: tuple,
    custom_filter: Filter,
) -> None:
    """Report schema properties that are required but absent from the data.

    In RDF a triple needs a subject, a predicate and an object, so a property
    with zero values is the same thing as a property that is not there at
    all. The per-property cardinality check only sees properties present in
    the input; this pass covers the remaining case by walking the schema and
    flagging every property missing from ``properties`` whose field
    definitions declare a ``"min"`` greater than zero. At most one error is
    produced per property, based on the first such definition.

    Args:
        errors: List that receives the translated error messages; it is
            mutated in place.
        entity_fields: Mapping of property URI to its list of field
            definitions for the entity being validated.
        properties: Mapping of property URI to submitted value(s).
        entity_key: Key of the entity's form-field group, passed through to
            the label lookup.
        custom_filter: Filter used to render the property URI as a label.
    """
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri in properties:
            continue

        # First positive minimum among the definitions, if any.
        required_min = next(
            (
                declared_min
                for declared_min in (
                    definition.get("min", 0) for definition in field_definitions
                )
                if declared_min > 0
            ),
            None,
        )
        if required_min is None:
            continue

        if required_min > 1:
            noun = gettext("values")
        else:
            noun = gettext("value")
        errors.append(
            gettext(
                "Missing required property:"
                " %(prop_uri)s requires at least"
                " %(min_count)d %(value)s",
                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                min_count=required_min,
                value=noun,
            )
        )
def _find_matching_field_definition(
    field_definitions: list[dict],
    normalized_prop_values: list,
) -> dict | None:
    """Pick the field definition that applies to the submitted values.

    The shape is taken from the ``"shape"`` key of the first value when that
    value is a dict. With a shape, the first definition whose
    ``"subjectShape"`` equals it is chosen; without one, the first definition
    that has no ``"subjectShape"`` is chosen. If that search yields nothing
    (or a falsy definition), the first definition in the list is used.

    Args:
        field_definitions: Candidate definitions for the property.
        normalized_prop_values: Submitted values, already wrapped in a list.

    Returns:
        The selected definition, or ``None`` when ``field_definitions`` is
        empty.
    """
    first_value = normalized_prop_values[0] if normalized_prop_values else None
    property_shape = (
        first_value.get("shape") if isinstance(first_value, dict) else None
    )

    if property_shape:
        candidates = (
            definition
            for definition in field_definitions
            if definition.get("subjectShape") == property_shape
        )
    else:
        candidates = (
            definition
            for definition in field_definitions
            if not definition.get("subjectShape")
        )
    chosen = next(candidates, None)

    # Fall back to the first definition when nothing usable was selected.
    if not chosen and field_definitions:
        chosen = field_definitions[0]
    return chosen
def validate_entity_data(structured_data: dict) -> list[str]:
    """Validate a structured entity payload against the configured form fields.

    The payload is read through the keys ``"entity_type"``,
    ``"entity_shape"`` and ``"properties"``. Validation stops early when the
    entity type is missing or when no form-field group matches the type and
    shape. Otherwise every submitted property except ``rdf:type`` is checked
    for being known, for cardinality and mandatory values, and for value
    validity; finally, required properties absent from the payload are
    reported.

    Args:
        structured_data: The entity description to validate.

    Returns:
        A list of translated, human-readable error messages; empty when the
        payload is valid.
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors: list[str] = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        # Routed through gettext like every other message in this module so
        # the error can be translated; the untranslated text is unchanged.
        if entity_shape:
            errors.append(
                gettext(
                    "No form fields found for entity type: %(entity_type)s"
                    " and shape: %(entity_shape)s",
                    entity_type=entity_type,
                    entity_shape=entity_shape,
                )
            )
        else:
            errors.append(
                gettext(
                    "No form fields found for entity type: %(entity_type)s",
                    entity_type=entity_type,
                )
            )
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is implied by the entity type and is not a form field.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(
                        prop_uri, entity_key
                    ),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        # A single bare value is treated as a one-element list.
        normalized_prop_values = (
            prop_values if isinstance(prop_values, list) else [prop_values]
        )

        matching_field_def = _find_matching_field_definition(
            field_definitions,
            normalized_prop_values,
        )

        if matching_field_def:
            prop_input = PropertyValidationInput(
                matching_field_def=matching_field_def,
                normalized_prop_values=normalized_prop_values,
                prop_uri=prop_uri,
                entity_key=entity_key,
                custom_filter=custom_filter,
            )
            _validate_property_cardinality(errors, prop_input)
            _validate_property_values(errors, prop_input)

    # Properties present in the schema but absent from the payload are not
    # visited by the loop above, so required ones are reported separately.
    _check_missing_required_properties(
        errors,
        entity_fields,
        properties,
        entity_key,
        custom_filter,
    )

    return errors