Coverage for heritrace / routes / entity / _validation.py: 90%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from dataclasses import dataclass 

6 

7from flask_babel import gettext 

8from rdflib import RDF, URIRef 

9 

10from heritrace.extensions import get_custom_filter, get_form_fields 

11from heritrace.utils.datatypes import DATATYPE_MAPPING 

12from heritrace.utils.filters import Filter 

13from heritrace.utils.shacl_utils import find_matching_form_field 

14 

15 

@dataclass(frozen=True, slots=True)
class PropertyValidationInput:
    """Immutable bundle of everything needed to validate one property.

    Grouping these values lets the per-property validators take a single
    argument instead of five separate ones.
    """

    # Form-field definition selected for the property; the validators read
    # its "min", "max", "mandatory_values", "datatypes" and "optionalValues"
    # keys via ``dict.get``.
    matching_field_def: dict
    # Submitted values for the property, always wrapped in a list.
    normalized_prop_values: list
    # Identifier of the property being validated, as it appears in the input.
    prop_uri: str
    # Key of the entity's form-field entry; passed along when asking the
    # filter for human-readable labels.
    entity_key: tuple
    # Filter object whose ``human_readable_predicate`` is used to build the
    # labels shown in error messages.
    custom_filter: Filter

23 

24 

def _validate_property_cardinality(
    errors: list[str],
    prop_input: PropertyValidationInput,
) -> None:
    """Check value count and mandatory values for a single property.

    Appends one localized message to ``errors`` for each violated rule:
    fewer values than the field's "min" (default 0), more values than its
    "max" (when one is set), and each entry of "mandatory_values" that is
    not among the submitted values.

    Args:
        errors: List that collects error messages; mutated in place.
        prop_input: The property under validation and its field definition.
    """
    field_def = prop_input.matching_field_def
    submitted = prop_input.normalized_prop_values
    lower_bound = field_def.get("min", 0)
    upper_bound = field_def.get("max")
    submitted_count = len(submitted)

    def readable_property():
        # Resolved on demand so the filter is only consulted when an error
        # is actually reported.
        return prop_input.custom_filter.human_readable_predicate(
            prop_input.prop_uri, prop_input.entity_key
        )

    def value_noun(count):
        # Singular/plural noun matching the bound quoted in the message.
        return gettext("values") if count > 1 else gettext("value")

    if submitted_count < lower_bound:
        noun = value_noun(lower_bound)
        errors.append(
            gettext(
                "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                prop_uri=readable_property(),
                min_count=lower_bound,
                value=noun,
            )
        )

    if upper_bound is not None and submitted_count > upper_bound:
        noun = value_noun(upper_bound)
        errors.append(
            gettext(
                "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                prop_uri=readable_property(),
                max_count=upper_bound,
                value=noun,
            )
        )

    for required_value in field_def.get("mandatory_values", []):
        if required_value in submitted:
            continue
        errors.append(
            gettext(
                "Property %(prop_uri)s requires the value %(mandatory_value)s",
                prop_uri=readable_property(),
                mandatory_value=required_value,
            )
        )

70 

71 

def _validate_property_values(
    errors: list[str],
    prop_input: PropertyValidationInput,
) -> None:
    """Validate each submitted value of a single property.

    Values that are dicts carrying an "entity_type" key are treated as
    nested entities and validated recursively with ``validate_entity_data``.
    Every other value is checked against the field's "datatypes" (it must
    satisfy at least one known validator) and against its "optionalValues"
    (it must be one of them), whenever those lists are non-empty.

    Args:
        errors: List that collects error messages; mutated in place.
        prop_input: The property under validation and its field definition.
    """
    field_def = prop_input.matching_field_def
    allowed_datatypes = field_def.get("datatypes", [])
    permitted_values = field_def.get("optionalValues", [])

    def readable(term):
        return prop_input.custom_filter.human_readable_predicate(
            term, prop_input.entity_key
        )

    def satisfies(candidate, datatype_uri):
        # Look up the validator registered for this datatype; a datatype
        # without a validator can never accept a value.
        checker = next(
            (
                entry[1]
                for entry in DATATYPE_MAPPING
                if entry[0] == URIRef(datatype_uri)
            ),
            None,
        )
        return bool(checker and checker(candidate))

    for item in prop_input.normalized_prop_values:
        if isinstance(item, dict) and "entity_type" in item:
            errors.extend(validate_entity_data(item))
            continue

        if allowed_datatypes and not any(
            satisfies(item, datatype_uri) for datatype_uri in allowed_datatypes
        ):
            expected_types = ", ".join(
                readable(datatype_uri) for datatype_uri in allowed_datatypes
            )
            errors.append(
                gettext(
                    'Value "%(value)s" for property %(prop_uri)s'
                    " is not of expected type %(expected_types)s",
                    value=item,
                    prop_uri=readable(prop_input.prop_uri),
                    expected_types=expected_types,
                )
            )

        if permitted_values and item not in permitted_values:
            acceptable_values = ", ".join(
                readable(option) for option in permitted_values
            )
            errors.append(
                gettext(
                    'Value "%(value)s" is not permitted for property'
                    " %(prop_uri)s. Acceptable values are:"
                    " %(acceptable_values)s",
                    value=item,
                    prop_uri=readable(prop_input.prop_uri),
                    acceptable_values=acceptable_values,
                )
            )

136 

137 

def _check_missing_required_properties(
    errors: list[str],
    entity_fields: dict,
    properties: dict,
    entity_key: tuple,
    custom_filter: Filter,
) -> None:
    """Report schema properties that are required but absent from the input.

    In RDF a property with no values is the same as a missing property,
    because a triple always needs a subject, a predicate and an object.
    Per-property cardinality validation only runs for properties that are
    present in the submitted data, so this pass covers the remaining case:
    a property known to the schema, missing from ``properties``, with at
    least one field definition whose "min" is greater than zero.

    At most one message is added per property, based on the first field
    definition that has a positive minimum.

    Args:
        errors: List that collects error messages; mutated in place.
        entity_fields: Mapping of property identifier to its field definitions.
        properties: Mapping of the properties submitted for the entity.
        entity_key: Key passed to the filter when building readable labels.
        custom_filter: Object providing ``human_readable_predicate``.
    """
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri in properties:
            continue

        minimums = (definition.get("min", 0) for definition in field_definitions)
        required_count = next((count for count in minimums if count > 0), None)
        if required_count is None:
            continue

        noun = gettext("values") if required_count > 1 else gettext("value")
        errors.append(
            gettext(
                "Missing required property: %(prop_uri)s"
                " requires at least %(min_count)d %(value)s",
                prop_uri=custom_filter.human_readable_predicate(
                    prop_uri, entity_key
                ),
                min_count=required_count,
                value=noun,
            )
        )

171 

172 

173def _find_matching_field_definition( 

174 field_definitions: list[dict], 

175 normalized_prop_values: list, 

176) -> dict | None: 

177 property_shape = None 

178 if normalized_prop_values and isinstance(normalized_prop_values[0], dict): 

179 property_shape = normalized_prop_values[0].get("shape") 

180 

181 matching_field_def = None 

182 for field_def in field_definitions: 

183 if property_shape: 

184 if field_def.get("subjectShape") == property_shape: 

185 matching_field_def = field_def 

186 break 

187 elif not field_def.get("subjectShape"): 

188 matching_field_def = field_def 

189 break 

190 

191 if not matching_field_def and field_definitions: 

192 matching_field_def = field_definitions[0] 

193 

194 return matching_field_def 

195 

196 

def validate_entity_data(structured_data: dict) -> list[str]:
    """Validate a structured entity payload against the configured form fields.

    The payload is read through its "entity_type", "entity_shape" and
    "properties" keys. Validation stops early when the entity type is
    missing or when no form-field entry matches the type/shape pair.
    Otherwise every submitted property except ``rdf:type`` is checked for
    being known, for cardinality and mandatory values, and for value
    validity; finally, required properties that were not submitted at all
    are reported.

    Args:
        structured_data: Entity description to validate.

    Returns:
        A list of localized, human-readable error messages; empty when the
        payload is valid.
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors: list[str] = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        # Routed through gettext like every other message in this module so
        # it can be localized; the untranslated English text is unchanged.
        if entity_shape:
            message = gettext(
                "No form fields found for entity type: %(entity_type)s"
                " and shape: %(entity_shape)s",
                entity_type=entity_type,
                entity_shape=entity_shape,
            )
        else:
            message = gettext(
                "No form fields found for entity type: %(entity_type)s",
                entity_type=entity_type,
            )
        errors.append(message)
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # The type triple is implied by the entity type and is not a form field.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(
                        prop_uri, entity_key
                    ),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        # Accept both a single value and a list of values.
        normalized_prop_values = (
            prop_values if isinstance(prop_values, list) else [prop_values]
        )

        matching_field_def = _find_matching_field_definition(
            field_definitions,
            normalized_prop_values,
        )

        if matching_field_def:
            prop_input = PropertyValidationInput(
                matching_field_def=matching_field_def,
                normalized_prop_values=normalized_prop_values,
                prop_uri=prop_uri,
                entity_key=entity_key,
                custom_filter=custom_filter,
            )
            _validate_property_cardinality(errors, prop_input)
            _validate_property_values(errors, prop_input)

    # Properties absent from the payload never reach the loop above, so
    # their minimum cardinality is checked separately.
    _check_missing_required_properties(
        errors,
        entity_fields,
        properties,
        entity_key,
        custom_filter,
    )

    return errors