Coverage for heritrace / routes / entity / _creation.py: 94%
224 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6from dataclasses import dataclass
8from flask import current_app, flash, jsonify, render_template, request, url_for
9from flask_babel import gettext
10from flask_login import current_user, login_required
11from rdflib import RDF, XSD, Literal, URIRef
12from werkzeug.wrappers import Response
14from heritrace.apis.orcid import get_responsible_agent_uri
15from heritrace.editor import Editor, EditorError, EndpointConfig
16from heritrace.extensions import (
17 get_dataset_endpoint,
18 get_form_fields,
19 get_provenance_endpoint,
20)
21from heritrace.routes.entity._blueprint import entity_bp
22from heritrace.routes.entity._validation import validate_entity_data
23from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options
24from heritrace.utils.display_rules_utils import (
25 get_class_priority,
26 is_entity_type_visible,
27)
28from heritrace.utils.primary_source_utils import (
29 get_default_primary_source,
30 save_user_default_primary_source,
31)
32from heritrace.utils.shacl_utils import find_matching_form_field
33from heritrace.utils.sparql_utils import import_referenced_entities
34from heritrace.utils.uri_utils import generate_unique_uri, is_valid_url
35from heritrace.utils.virtual_properties import (
36 remove_virtual_properties_from_creation_data,
37 transform_entity_creation_with_virtual_properties,
38)
def _prepare_entity_creation_data(
    structured_data: dict,
) -> tuple[dict, str, dict, URIRef]:
    """Strip virtual properties from a creation payload and allocate its URI.

    Args:
        structured_data: Creation payload as submitted by the client.

    Returns:
        A 4-tuple of the cleaned payload, its ``entity_type`` value, its
        ``properties`` mapping (an empty dict when the key is absent) and
        the URI returned by ``generate_unique_uri`` for that entity type.

    Raises:
        KeyError: If the cleaned payload has no ``entity_type`` key.
    """
    cleaned = remove_virtual_properties_from_creation_data(structured_data)
    entity_type: str = cleaned["entity_type"]
    return (
        cleaned,
        entity_type,
        cleaned.get("properties", {}),
        generate_unique_uri(entity_type),
    )
def _setup_editor_for_creation(editor: Editor, cleaned_structured_data: dict) -> None:
    """Load the entities referenced by the payload into ``editor``.

    Args:
        editor: Editor that will receive the subsequent ``create`` calls.
        cleaned_structured_data: Creation payload with virtual properties
            already removed.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    # NOTE(review): presumably marks everything imported so far as data that
    # existed before this edit -- confirm against the Editor documentation.
    editor.preexisting_finished()
def _process_virtual_properties_after_creation(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
) -> None:
    """Create the entities derived from the payload's virtual properties.

    The payload is expanded by
    ``transform_entity_creation_with_virtual_properties``; every entity it
    yields gets a fresh URI and is created through ``create_nested_entity``.
    The editor is saved once afterwards. Nothing happens when the
    transformation yields no entities.

    Args:
        editor: Editor used to create and save the derived entities.
        structured_data: Original (uncleaned) creation payload.
        entity_uri: URI of the entity that has just been created.
        default_graph_uri: Graph forwarded to ``create_nested_entity``.
    """
    derived_entities = transform_entity_creation_with_virtual_properties(
        structured_data, str(entity_uri)
    )
    if not derived_entities:
        return

    for derived in derived_entities:
        derived_uri = generate_unique_uri(derived["entity_type"])
        create_nested_entity(editor, derived_uri, derived, default_graph_uri)

    editor.save()
def _select_field_definition(
    field_definitions: list[dict], property_shape: str | None
) -> dict | None:
    """Pick the field definition that applies to a submitted value.

    With a ``property_shape`` the first definition whose ``subjectShape``
    equals it wins; without one, the first definition that has no
    ``subjectShape``. When nothing matches, the first definition is used as
    a fallback. Returns ``None`` only when there are no definitions.
    """
    matched = None
    for field_def in field_definitions:
        subject_shape = field_def.get("subjectShape")
        if property_shape:
            if subject_shape == property_shape:
                matched = field_def
                break
        elif not subject_shape:
            matched = field_def
            break

    if not matched and field_definitions:
        return field_definitions[0]
    return matched


def _create_entity_with_form_fields(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
    form_fields: dict,
) -> None:
    """Queue the triples of a new entity, guided by the form-field definitions.

    Virtual properties are stripped from the payload, referenced entities
    are imported into the editor, and each predicate's values are then
    created either as an ordered chain (when the selected field definition
    has ``orderedBy``) or independently.

    Args:
        editor: Editor receiving the ``create`` calls; it is not saved here.
        structured_data: Creation payload as submitted by the client.
        entity_uri: URI of the entity being created.
        default_graph_uri: Graph passed to every ``create`` call.
        form_fields: Form-field definitions keyed by the value that
            ``find_matching_form_field`` returns, then by predicate.

    Raises:
        KeyError: If the cleaned payload has no ``entity_type`` key.
    """
    cleaned_structured_data = remove_virtual_properties_from_creation_data(
        structured_data
    )
    entity_type = cleaned_structured_data["entity_type"]
    properties = cleaned_structured_data.get("properties", {})

    _setup_editor_for_creation(editor, cleaned_structured_data)

    if not properties:
        return

    # The form-field key depends only on the entity's type and shape, so it
    # is resolved once instead of once per predicate.
    entity_shape = cleaned_structured_data.get("entity_shape")
    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    definitions_by_predicate = (
        form_fields.get(matching_key, {}) if matching_key else {}
    )

    for predicate, raw_values in properties.items():
        predicate_uri = URIRef(predicate)
        values = raw_values if isinstance(raw_values, list) else [raw_values]
        field_definitions = definitions_by_predicate.get(predicate, [])

        # Only the first value's shape is consulted to choose the definition.
        property_shape = None
        if values and isinstance(values[0], dict):
            property_shape = values[0].get("shape")

        matching_field_def = _select_field_definition(
            field_definitions, property_shape
        )
        ordered_by = matching_field_def.get("orderedBy") if matching_field_def else None

        ctx = CreationContext(
            editor=editor,
            entity_uri=entity_uri,
            predicate=predicate_uri,
            default_graph_uri=default_graph_uri,
        )

        if ordered_by:
            process_ordered_properties(ctx, values, URIRef(ordered_by))
        else:
            process_unordered_properties(ctx, values, matching_field_def)
def _create_entity_without_form_fields(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
) -> None:
    """Queue the triples of a new entity when no form-field definitions exist.

    The entity's ``rdf:type`` triple is created first, followed by one
    triple per submitted value. Values whose ``type`` is ``"uri"`` become
    URI objects; values whose ``type`` is ``"literal"`` become literals
    typed with their ``datatype`` (``xsd:string`` when none is given).
    Values of any other ``type`` are skipped.

    Args:
        editor: Editor receiving the ``create`` calls; it is not saved here.
        structured_data: Creation payload as submitted by the client.
        entity_uri: URI of the entity being created.
        default_graph_uri: Graph passed to every ``create`` call.

    Raises:
        KeyError: If the cleaned payload has no ``entity_type`` key, or a
            value lacks its ``type`` or ``value`` key.
    """
    payload = remove_virtual_properties_from_creation_data(structured_data)
    entity_type = payload["entity_type"]
    properties = payload.get("properties", {})

    editor.import_entity(entity_uri)
    _setup_editor_for_creation(editor, payload)

    editor.create(entity_uri, RDF.type, URIRef(entity_type), default_graph_uri)

    for predicate, values in properties.items():
        predicate_uri = URIRef(predicate)
        for entry in values:
            kind = entry["type"]
            if kind == "uri":
                obj: URIRef | Literal = URIRef(entry["value"])
            elif kind == "literal":
                if "datatype" in entry:
                    datatype = URIRef(entry["datatype"])
                else:
                    datatype = XSD.string
                obj = Literal(entry["value"], datatype=datatype)
            else:
                continue
            editor.create(entity_uri, predicate_uri, obj, default_graph_uri)
def _handle_create_entity_post(
    form_fields: dict,
    structured_data: dict,
    primary_source: str | None,
    *,
    save_default_source: bool,
) -> tuple[Response, int]:
    """Validate a creation request, queue the entity's triples and save them.

    Args:
        form_fields: Form-field definitions. When non-empty the payload is
            checked with ``validate_entity_data`` and created through the
            form-field-aware path; otherwise the plain path is used.
        structured_data: Decoded creation payload.
        primary_source: Optional primary source URL handed to the Editor.
        save_default_source: When true and a primary source is given, the
            source is stored as the current user's default.

    Returns:
        A ``(JSON response, HTTP status)`` pair: 400 for invalid input,
        500 when saving fails, 200 with a ``redirect_url`` on success.
    """
    if primary_source and not is_valid_url(primary_source):
        return jsonify(
            {
                "status": "error",
                "errors": [gettext("Invalid primary source URL provided")],
            }
        ), 400

    # Reject the request before any side effect (persisting the default
    # source, building the Editor) takes place.
    if not structured_data.get("entity_type"):
        return jsonify(
            {"status": "error", "errors": [gettext("Entity type is required")]}
        ), 400

    # The URL was already validated above, so no second check is needed.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    resp_agent = get_responsible_agent_uri(current_user.orcid)
    editor = Editor(
        EndpointConfig(
            dataset=get_dataset_endpoint(),
            provenance=get_provenance_endpoint(),
            is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        ),
        current_app.config["COUNTER_HANDLER"],
        resp_agent,
        URIRef(primary_source) if primary_source else None,
        current_app.config["DATASET_GENERATION_TIME"],
    )

    cleaned_structured_data, _entity_type, _properties, entity_uri = (
        _prepare_entity_creation_data(structured_data)
    )

    default_graph_uri = (
        URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
    )

    if form_fields:
        validation_errors = validate_entity_data(cleaned_structured_data)
        if validation_errors:
            return jsonify({"status": "error", "errors": validation_errors}), 400

    try:
        if form_fields:
            _create_entity_with_form_fields(
                editor,
                structured_data,
                entity_uri,
                default_graph_uri,
                form_fields,
            )
        else:
            _create_entity_without_form_fields(
                editor,
                structured_data,
                entity_uri,
                default_graph_uri,
            )
    except ValueError as e:
        # The value-processing helpers raise ValueError for malformed values
        # (e.g. an existing-entity reference without a URI). Nothing has
        # been saved at this point, so report it as a client error.
        error_message = gettext(
            "An error occurred while creating the entity: %(error)s", error=str(e)
        )
        return jsonify({"status": "error", "errors": [error_message]}), 400

    try:
        editor.save()
        _process_virtual_properties_after_creation(
            editor, structured_data, entity_uri, default_graph_uri
        )
    except (EditorError, OSError) as e:
        error_message = gettext(
            "An error occurred while creating the entity: %(error)s", error=str(e)
        )
        return jsonify({"status": "error", "errors": [error_message]}), 500
    else:
        response = jsonify(
            {
                "status": "success",
                "redirect_url": url_for("entity.about", subject=str(entity_uri)),
            }
        )
        flash(gettext("Entity created successfully"), "success")
        return response, 200
@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity() -> str | tuple[Response, int]:
    """Serve the entity creation form (GET) or create an entity (POST).

    On POST the ``structured_data`` form field is decoded from JSON and
    handed to ``_handle_create_entity_post`` together with the optional
    ``primary_source`` and ``save_default_source`` form fields.

    Returns:
        The rendered ``create_entity.jinja`` template for GET, or a
        ``(JSON response, HTTP status)`` pair for POST.
    """
    form_fields = get_form_fields()

    if request.method == "POST":
        try:
            structured_data = json.loads(request.form.get("structured_data", "{}"))
        except json.JSONDecodeError:
            structured_data = None
        # The payload must be a JSON object; anything else (malformed JSON,
        # a list, ``null``...) is answered with a JSON error rather than an
        # unhandled exception.
        if not isinstance(structured_data, dict):
            return jsonify(
                {
                    "status": "error",
                    "errors": [gettext("Invalid structured data provided")],
                }
            ), 400

        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == "true"
        return _handle_create_entity_post(
            form_fields,
            structured_data,
            primary_source,
            save_default_source=save_default_source,
        )

    # Everything below is only needed to render the form, so it is computed
    # for GET requests only.
    default_primary_source = get_default_primary_source(current_user.orcid)

    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields
            if is_entity_type_visible(entity_key)
        ],
        key=get_class_priority,
        reverse=True,
    )

    datatype_options = get_datatype_options()

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs,
    )
def create_nested_entity(
    editor: Editor,
    entity_uri: URIRef,
    entity_data: dict,
    graph_uri: URIRef | None = None,
) -> None:
    """Recursively queue the triples of an entity nested in a creation payload.

    The entity's ``rdf:type`` triple is always created. Its properties are
    processed only when ``find_matching_form_field`` finds form-field
    definitions for its type and shape. Each value is then handled as one
    of:

    * a dict with ``entity_type``: a new entity is created recursively,
      optionally linked through an intermediate node described by the
      value's ``intermediateRelation`` (``class`` and ``property``);
    * a dict flagged ``is_existing_entity``: linked by its ``entity_uri``
      (silently skipped when that URI is missing or empty);
    * anything else: stringified and stored as a URI when it is a valid
      URL, otherwise as a literal typed via ``determine_datatype``.

    Args:
        editor: Editor receiving the ``create`` calls; it is not saved here.
        entity_uri: URI of the entity described by ``entity_data``.
        entity_data: Payload with ``entity_type`` and optional
            ``entity_shape`` and ``properties``.
        graph_uri: Graph passed to every ``create`` call.

    Raises:
        KeyError: If ``entity_data`` has no ``entity_type`` key.
    """
    form_fields = get_form_fields()

    entity_type = entity_data["entity_type"]
    editor.create(entity_uri, RDF.type, URIRef(entity_type), graph_uri)

    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    if not matching_key:
        return

    for predicate, raw_values in properties.items():
        predicate_uri = URIRef(predicate)
        values = raw_values if isinstance(raw_values, list) else [raw_values]
        field_definitions = form_fields[matching_key].get(predicate, [])

        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    relation = value["intermediateRelation"]
                    intermediate_uri = generate_unique_uri(relation["class"])
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, predicate_uri, intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(relation["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(editor, target_uri, value, graph_uri)
                else:
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, predicate_uri, nested_uri, graph_uri)
                    create_nested_entity(editor, nested_uri, value, graph_uri)
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(
                        entity_uri,
                        predicate_uri,
                        URIRef(existing_entity_uri),
                        graph_uri,
                    )
            else:
                str_value = str(value)
                if is_valid_url(str_value):
                    object_value: URIRef | Literal = URIRef(str_value)
                else:
                    # Only the first field definition's datatypes are
                    # consulted. With no candidates, determine_datatype
                    # falls back to xsd:string on its own, so no default
                    # needs to be assigned beforehand.
                    datatype_uris = (
                        field_definitions[0].get("datatypes", [])
                        if field_definitions
                        else []
                    )
                    datatype = determine_datatype(str_value, datatype_uris)
                    object_value = Literal(str_value, datatype=datatype)
                editor.create(entity_uri, predicate_uri, object_value, graph_uri)
@dataclass(frozen=True, slots=True)
class CreationContext:
    """Immutable bundle of the arguments shared by the value-processing helpers.

    The helpers use it to call
    ``editor.create(entity_uri, predicate, <object>, default_graph_uri)``.
    """

    # Editor that receives the ``create`` calls.
    editor: Editor
    # Subject of the triples being created.
    entity_uri: URIRef
    # Predicate of the triples being created.
    predicate: URIRef
    # Graph passed as the last argument of ``create``; may be ``None``.
    default_graph_uri: URIRef | None
def process_entity_value(
    ctx: CreationContext,
    value: dict | str,
    matching_field_def: dict | None,
) -> URIRef | Literal:
    """Create the triple for one submitted value and return its object.

    Args:
        ctx: Editor, subject, predicate and graph to create the triple with.
        value: Either a dict describing a new nested entity (it has an
            ``entity_type`` key), a dict referencing an existing entity
            (``is_existing_entity`` is truthy), or any other value, which
            is stringified.
        matching_field_def: Field definition whose ``datatypes`` list is
            used to type literals; may be ``None``.

    Returns:
        The URI of the nested or referenced entity, or the URI/literal
        built from the stringified value.

    Raises:
        ValueError: If an existing-entity reference has no ``entity_uri``.
    """
    if isinstance(value, dict):
        if "entity_type" in value:
            new_entity_uri = generate_unique_uri(value["entity_type"])
            ctx.editor.create(
                ctx.entity_uri, ctx.predicate, new_entity_uri, ctx.default_graph_uri
            )
            create_nested_entity(
                ctx.editor, new_entity_uri, value, ctx.default_graph_uri
            )
            return new_entity_uri

        if value.get("is_existing_entity", False):
            referenced = value.get("entity_uri")
            if not referenced:
                msg = "Missing entity_uri in existing entity reference"
                raise ValueError(msg)
            referenced_uri = URIRef(referenced)
            ctx.editor.create(
                ctx.entity_uri, ctx.predicate, referenced_uri, ctx.default_graph_uri
            )
            return referenced_uri

    # Plain values, and dicts matching neither case above, are stringified.
    text = str(value)
    result: URIRef | Literal
    if is_valid_url(text):
        result = URIRef(text)
    else:
        candidates = matching_field_def.get("datatypes", []) if matching_field_def else []
        result = Literal(text, datatype=determine_datatype(text, candidates))

    ctx.editor.create(ctx.entity_uri, ctx.predicate, result, ctx.default_graph_uri)
    return result
def _process_ordered_entity_value(
    ctx: CreationContext,
    value: dict,
) -> URIRef:
    """Link one element of an ordered property and return its URI.

    Args:
        ctx: Editor, subject, predicate and graph to create the triple with.
        value: Dict describing a new nested entity (it has an
            ``entity_type`` key) or referencing an existing one
            (``is_existing_entity`` is truthy, URI under ``entity_uri``).

    Returns:
        The URI of the newly created or referenced entity.

    Raises:
        ValueError: If ``value`` is neither kind of dict.
        KeyError: If an existing-entity reference has no ``entity_uri`` key.
    """
    is_dict = isinstance(value, dict)
    is_new_entity = is_dict and "entity_type" in value

    if is_new_entity:
        linked_uri = generate_unique_uri(value["entity_type"])
    elif is_dict and value.get("is_existing_entity", False):
        linked_uri = URIRef(value["entity_uri"])
    else:
        msg = "Unexpected value type for ordered property"
        raise ValueError(msg)

    ctx.editor.create(
        ctx.entity_uri, ctx.predicate, linked_uri, ctx.default_graph_uri
    )
    # Only new entities have a payload of their own to expand.
    if is_new_entity:
        create_nested_entity(ctx.editor, linked_uri, value, ctx.default_graph_uri)
    return linked_uri
def process_ordered_properties(
    ctx: CreationContext,
    values: list[dict],
    ordered_by: URIRef,
) -> None:
    """Create the values of an ordered property and chain them together.

    Values are grouped by their ``entity_shape`` (values without one share
    the ``"default_shape"`` group). Within each group every value is linked
    to the subject, and each element is connected to the next one through
    the ``ordered_by`` predicate, in submission order.

    Args:
        ctx: Editor, subject, predicate and graph to create the triples with.
        values: Submitted values of the property.
        ordered_by: Predicate linking an element to its successor.
    """
    groups: dict = {}
    for item in values:
        group_key = item.get("entity_shape") or "default_shape"
        groups.setdefault(group_key, []).append(item)

    for group in groups.values():
        predecessor = None
        for item in group:
            current_uri = _process_ordered_entity_value(ctx, item)
            if predecessor:
                ctx.editor.create(
                    predecessor, ordered_by, current_uri, ctx.default_graph_uri
                )
            predecessor = current_uri
def process_unordered_properties(
    ctx: CreationContext,
    values: list[dict | str],
    matching_field_def: dict | None,
) -> None:
    """Create one triple per value, with no ordering links between them.

    Args:
        ctx: Editor, subject, predicate and graph to create the triples with.
        values: Submitted values of the property.
        matching_field_def: Field definition forwarded to
            ``process_entity_value``; may be ``None``.
    """
    for value in values:
        process_entity_value(ctx, value, matching_field_def)
def determine_datatype(value: str, datatype_uris: list[str]) -> URIRef:
    """Return the first candidate datatype whose validator accepts ``value``.

    Each candidate is looked up in ``DATATYPE_MAPPING`` by comparing the
    string form of the entry's first item with the candidate; the entry's
    second item is then called as the validator.

    Args:
        value: Lexical value to classify.
        datatype_uris: Candidate datatype URIs, tried in order.

    Returns:
        The first accepted candidate as a ``URIRef``, or ``XSD.string``
        when no candidate is accepted (or none is given).
    """
    for candidate in datatype_uris:
        candidate_key = str(candidate)
        validator = None
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == candidate_key:
                validator = entry[1]
                break
        if validator and validator(value):
            return URIRef(candidate)
    return XSD.string