Coverage for heritrace / routes / merge.py: 99%
249 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7from collections import defaultdict
8from typing import TYPE_CHECKING, Any
10import validators
11from flask import (
12 Blueprint,
13 Response,
14 current_app,
15 flash,
16 jsonify,
17 redirect,
18 render_template,
19 request,
20 url_for,
21)
22from flask_babel import gettext
23from flask_login import current_user, login_required
24from markupsafe import Markup
25from rdflib import URIRef
26from SPARQLWrapper import JSON
28from heritrace.apis.orcid import get_responsible_agent_uri
29from heritrace.editor import Editor, EndpointConfig
30from heritrace.extensions import (
31 get_counter_handler,
32 get_custom_filter,
33 get_dataset_endpoint,
34 get_dataset_is_quadstore,
35 get_provenance_endpoint,
36 get_sparql,
37)
38from heritrace.sparql import get_sparql_bindings
39from heritrace.utils.display_rules_utils import (
40 get_highest_priority_class,
41 get_similarity_properties,
42)
43from heritrace.utils.primary_source_utils import (
44 get_default_primary_source,
45 save_user_default_primary_source,
46)
47from heritrace.utils.shacl_utils import determine_shape_for_classes
48from heritrace.utils.sparql_utils import get_entity_types
50if TYPE_CHECKING:
51 from werkzeug.wrappers import Response as WerkzeugResponse
# Blueprint on which this module registers its merge, comparison and
# similarity-search routes.
merge_bp = Blueprint("merge", __name__)
def get_entity_details(
    entity_uri: URIRef,
) -> tuple[dict[str, list[dict[str, Any]]] | None, list[str]]:
    """
    Retrieve every predicate/object pair of an entity together with its types.

    Args:
        entity_uri: The URI of the entity to describe.

    Returns:
        A two-item tuple:
            - A mapping from predicate URI to the list of objects found for
              it. Each object is a dict with the keys 'value', 'type',
              'lang', 'datatype' and 'readable_label'. ``None`` is returned
              instead of the mapping when any exception is raised.
            - The list of type URIs of the entity; empty when no type is
              found or when an exception is raised.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    properties_by_predicate: dict[str, list[dict[str, Any]]] = {}

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning("No types found for entity: %s", entity_uri)

        sparql.setQuery(
            f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """
        )
        sparql.setReturnFormat(JSON)
        raw_results = sparql.query().convert()

        for row in get_sparql_bindings(raw_results):
            predicate = row["p"]["value"]
            node = row["o"]
            details = {
                "value": node["value"],
                "type": node["type"],
                "lang": node.get("xml:lang"),
                "datatype": node.get("datatype"),
                "readable_label": None,
            }

            # The raw value is the label unless the object is a URI whose
            # class can be resolved, in which case the display rules decide.
            label = details["value"]
            if details["type"] == "uri":
                object_types = get_entity_types(URIRef(details["value"]))
                object_class = get_highest_priority_class(object_types)
                if object_class:
                    label = custom_filter.human_readable_entity(
                        details["value"], (object_class, None)
                    )
            details["readable_label"] = label

            properties_by_predicate.setdefault(predicate, []).append(details)

    except Exception:
        current_app.logger.exception(
            "Error fetching details for %s",
            entity_uri,
        )
        return None, []

    return properties_by_predicate, entity_types
@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge() -> WerkzeugResponse:
    """
    Handles the actual merging of two entities using the Editor class
    to ensure provenance and data model agnosticism.
    Entity 1 (keep) absorbs Entity 2 (delete).

    Form fields read from the request: ``entity1_uri``, ``entity2_uri``,
    ``primary_source`` and ``save_default_source`` (the string "true"
    enables saving the primary source as the user's default).

    Returns:
        A redirect response:
            - to the catalogue when one of the entity URIs is missing;
            - back to the comparison page when the primary source is not a
              valid URL or when the merge raises;
            - to the page of the kept entity when the merge succeeds.
    """
    entity1_uri_str = request.form.get("entity1_uri")
    entity2_uri_str = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO(arcangelo): Implement CSRF validation
    # if using Flask-WTF

    if not entity1_uri_str or not entity2_uri_str:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    entity1_uri = URIRef(entity1_uri_str)
    entity2_uri = URIRef(entity2_uri_str)

    def back_to_comparison() -> WerkzeugResponse:
        # Every failure path sends the user back to the same comparison page.
        return redirect(
            url_for(
                ".compare_and_merge", subject=entity1_uri, other_subject=entity2_uri
            )
        )

    if primary_source and not validators.url(primary_source):  # type: ignore[arg-type]
        flash(gettext("Invalid primary source URL provided."), "danger")
        return back_to_comparison()

    # Past the guard above, a non-empty primary_source is known to be a valid
    # URL, so it does not need to be validated again.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        entity1_type = get_highest_priority_class(entity1_types)
        entity2_type = get_highest_priority_class(entity2_types)
        entity1_shape = determine_shape_for_classes(entity1_types)
        entity2_shape = determine_shape_for_classes(entity2_types)
        entity1_label = (
            custom_filter.human_readable_entity(
                entity1_uri, (entity1_type, entity1_shape)
            )
            if entity1_type
            else entity1_uri
        )
        entity2_label = (
            custom_filter.human_readable_entity(
                entity2_uri, (entity2_type, entity2_shape)
            )
            if entity2_type
            else entity2_uri
        )

        editor = Editor(
            EndpointConfig(
                dataset=get_dataset_endpoint(),
                provenance=get_provenance_endpoint(),
                is_quadstore=get_dataset_is_quadstore(),
            ),
            get_counter_handler(),
            get_responsible_agent_uri(current_user.orcid),
        )

        if primary_source:
            editor.set_primary_source(URIRef(primary_source))

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        # Only the (translated) template is trusted as HTML. The labels and
        # URLs are interpolated through Markup's % operator, which escapes
        # them, so markup stored in entity data cannot reach the page.
        message_template = Markup(  # noqa: S704
            gettext(
                "Entities merged successfully. "
                "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
                "has been deleted and its references now point to "
                "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>."
            )
        )
        flash(
            message_template
            % {
                "entity1": entity1_label,
                "entity2": entity2_label,
                "entity1_url": url_for("entity.about", subject=entity1_uri),
                "entity2_url": url_for("entity.about", subject=entity2_uri),
            },
            "success",
        )

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        current_app.logger.warning("Merge attempt failed: %s", ve)
        flash(str(ve), "warning")
        return back_to_comparison()

    except Exception:
        current_app.logger.exception(
            "Error executing Editor merge for <%s> and <%s>",
            entity1_uri,
            entity2_uri,
        )
        flash(
            gettext(
                "An error occurred during the merge"
                " operation. Please check the logs."
                " No changes were made."
            ),
            "danger",
        )
        return back_to_comparison()
@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge() -> str | WerkzeugResponse:
    """
    Show two entities side by side so that a merge can be confirmed.

    The entities are taken from the ``subject`` and ``other_subject`` query
    parameters.

    Returns:
        The rendered ``entity/merge_confirm.jinja`` template, or a redirect
        to the catalogue when a parameter is missing or the details of an
        entity cannot be retrieved.
    """
    subject_arg = request.args.get("subject")
    other_subject_arg = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not (subject_arg and other_subject_arg):
        flash(
            gettext("Two entities must be selected for merging/comparison."), "warning"
        )
        return redirect(url_for("main.catalogue"))

    entity1_uri = URIRef(subject_arg)
    entity2_uri = URIRef(other_subject_arg)

    entity1_props, entity1_types = get_entity_details(entity1_uri)
    entity2_props, entity2_types = get_entity_details(entity2_uri)

    if any(props is None for props in (entity1_props, entity2_props)):
        flash(
            gettext("Could not retrieve details for one or both entities. Check logs."),
            "danger",
        )
        return redirect(url_for("main.catalogue"))

    entity1_type = get_highest_priority_class(entity1_types)
    entity2_type = get_highest_priority_class(entity2_types)
    entity1_shape = determine_shape_for_classes(entity1_types)
    entity2_shape = determine_shape_for_classes(entity2_types)

    # Without a resolvable class the URI itself is shown as the label.
    entity1_label = entity1_uri
    if entity1_type:
        entity1_label = custom_filter.human_readable_entity(
            entity1_uri, (entity1_type, entity1_shape)
        )
    entity2_label = entity2_uri
    if entity2_type:
        entity2_label = custom_filter.human_readable_entity(
            entity2_uri, (entity2_type, entity2_shape)
        )

    entity1_data = dict(
        uri=entity1_uri,
        label=entity1_label,
        type_label=custom_filter.human_readable_class((entity1_type, entity1_shape)),
        type=entity1_type,
        shape=entity1_shape,
        properties=entity1_props,
    )
    entity2_data = dict(
        uri=entity2_uri,
        label=entity2_label,
        type_label=custom_filter.human_readable_class((entity2_type, entity2_shape)),
        type=entity2_type,
        shape=entity2_shape,
        properties=entity2_props,
    )

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=entity1_data,
        entity2=entity2_data,
        default_primary_source=get_default_primary_source(current_user.orcid),
    )
329def _format_rdf_term(node: dict[str, str]) -> str | None:
330 value = node["value"]
331 value_type = node["type"]
332 if value_type == "uri":
333 return f"<{value}>"
334 if value_type in {"literal", "typed-literal"}:
335 datatype = node.get("datatype")
336 lang = node.get("xml:lang")
337 escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
338 if datatype:
339 return f'"{escaped_value}"^^<{datatype}>'
340 if lang:
341 return f'"{escaped_value}"@{lang}'
342 return f'"{escaped_value}"'
343 return None
def _fetch_subject_values(
    subject_uri: str,
    similarity_config: list,
) -> defaultdict[str, list[str]] | None:
    """
    Load the subject's values for the properties named in the similarity config.

    Args:
        subject_uri: URI of the entity whose values are fetched.
        similarity_config: List whose items are either a property URI string
            or a dict with an 'and' key holding property URIs; other items
            are ignored.

    Returns:
        A mapping from property URI to the subject's values, each already
        formatted as a SPARQL term. ``None`` when the config names no
        property or the query yields no binding.
    """
    sparql = get_sparql()

    configured_props: set[str] = set()
    for entry in similarity_config:
        if isinstance(entry, str):
            configured_props.add(entry)
        elif isinstance(entry, dict) and "and" in entry:
            configured_props.update(entry["and"])

    if not configured_props:
        current_app.logger.warning(
            "Empty properties list derived from similarity config for type %s",
            subject_uri,
        )
        return None

    # Restrict the lookup to the configured properties only.
    iri_list = ", ".join([f"<{prop}>" for prop in configured_props])
    sparql.setQuery(
        f"""
    SELECT DISTINCT ?p ?o WHERE {{
        <{subject_uri}> ?p ?o .
        FILTER(?p IN ({iri_list}))
    }}
    """
    )
    sparql.setReturnFormat(JSON)
    rows = get_sparql_bindings(sparql.query().convert())

    if not rows:
        return None

    values_by_prop: defaultdict[str, list[str]] = defaultdict(list)
    for row in rows:
        # Nodes that cannot be rendered as a term (e.g. blank nodes) are skipped.
        if term := _format_rdf_term(row["o"]):
            values_by_prop[row["p"]["value"]].append(term)

    return values_by_prop
395def _build_union_blocks(
396 similarity_config: list,
397 subject_values_by_prop: defaultdict[str, list[str]],
398 subject_uri: str,
399) -> list[str]:
400 union_blocks: list[str] = []
401 var_counter = 0
403 for condition in similarity_config:
404 if isinstance(condition, str):
405 prop_values = subject_values_by_prop.get(condition)
406 if prop_values:
407 var_counter += 1
408 values_filter = ", ".join(prop_values)
409 union_blocks.append(
410 f" {{ ?similar <{condition}>"
411 f" ?o_{var_counter} ."
412 f" FILTER(?o_{var_counter}"
413 f" IN ({values_filter})) }}"
414 )
415 elif isinstance(condition, dict) and "and" in condition:
416 block = _build_and_block(
417 condition["and"], subject_values_by_prop, subject_uri, var_counter
418 )
419 if block is not None:
420 text, var_counter = block
421 union_blocks.append(text)
422 else:
423 var_counter += len(condition["and"])
425 return union_blocks
428def _build_and_block(
429 and_props: list[str],
430 subject_values_by_prop: defaultdict[str, list[str]],
431 subject_uri: str,
432 var_counter: int,
433) -> tuple[str, int] | None:
434 if not all(p in subject_values_by_prop for p in and_props):
435 current_app.logger.debug(
436 "Skipping AND group %s because"
437 " subject %s lacks values for"
438 " all its properties.",
439 and_props,
440 subject_uri,
441 )
442 return None
444 and_patterns = []
445 for prop_uri in and_props:
446 prop_values = subject_values_by_prop[prop_uri]
447 var_counter += 1
448 values_filter = ", ".join(prop_values)
449 and_patterns.append(
450 f" ?similar <{prop_uri}>"
451 f" ?o_{var_counter} ."
452 f" FILTER(?o_{var_counter}"
453 f" IN ({values_filter})) ."
454 )
456 patterns_str = "\n".join(and_patterns)
457 return f" {{\n{patterns_str}\n }}", var_counter
def _execute_similarity_query(
    union_blocks: list[str],
    entity_type: str,
    subject_uri: str,
    limit: int,
    offset: int,
) -> tuple[list[str], bool]:
    """
    Run the paginated similarity query and report whether more results exist.

    Args:
        union_blocks: Group patterns to be combined with UNION.
        entity_type: Class URI that every candidate must be an instance of.
        subject_uri: URI of the subject, excluded from the candidates.
        limit: Maximum number of candidates to return.
        offset: Number of candidates to skip.

    Returns:
        A tuple with at most ``limit`` candidate URIs and a flag telling
        whether at least one further candidate exists after them.
    """
    sparql = get_sparql()
    union_body = " UNION ".join(union_blocks)

    # One row more than requested is fetched: its presence reveals that
    # another page exists.
    sparql.setQuery(
        f"""
    SELECT DISTINCT ?similar WHERE {{
        ?similar a <{entity_type}> .
        FILTER(?similar != <{subject_uri}>)
        {{
            {union_body}
        }}
    }} ORDER BY ?similar OFFSET {offset} LIMIT {limit + 1}
    """
    )
    sparql.setReturnFormat(JSON)
    rows = get_sparql_bindings(sparql.query().convert())

    found: list[str] = []
    for row in rows:
        found.append(row["similar"]["value"])

    return found[:limit], len(found) > limit
def _transform_results(
    uris: list[str],
    entity_type: str,
    shape_uri: str | None,
) -> list[dict[str, str]]:
    """
    Pair each candidate URI with a label suitable for display.

    Args:
        uris: Candidate entity URIs.
        entity_type: Class URI used to resolve the label; when empty the URI
            itself is used as the label.
        shape_uri: Optional shape URI passed along with the class.

    Returns:
        One dict per URI with the keys 'uri' and 'label'; the label falls
        back to the URI when no label is produced.
    """
    custom_filter = get_custom_filter()

    def label_for(uri: str) -> str:
        if not entity_type:
            return uri
        return custom_filter.human_readable_entity(uri, (entity_type, shape_uri)) or uri

    return [{"uri": uri, "label": label_for(uri)} for uri in uris]
# Characters that the SPARQL IRIREF production forbids between "<" and ">"
# (code points up to U+0020 are rejected separately).
_IRIREF_FORBIDDEN_CHARS = frozenset('<>"{}|^`\\')


def _is_safe_iri(value: str) -> bool:
    """Tell whether *value* can be wrapped verbatim in ``<...>`` in a SPARQL query."""
    return all(ch > " " and ch not in _IRIREF_FORBIDDEN_CHARS for ch in value)


@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources() -> Response | tuple[Response, int]:  # noqa: PLR0911
    """
    Return, as JSON, entities similar to a subject according to the
    similarity properties configured for its class and shape.

    Query parameters: ``subject_uri`` and ``entity_type`` (required),
    ``shape_uri`` (optional), ``limit`` (default 5, must be positive) and
    ``offset`` (default 0, must be non-negative).

    Returns:
        A JSON response with 'status', 'results' (dicts with 'uri' and
        'label') and 'has_more'. Invalid parameters produce a 400 error
        payload, an unexpected exception a 500 one.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify(
            {"status": "error", "message": gettext("Invalid limit or offset parameter")}
        ), 400

    if not subject_uri or not entity_type:
        return jsonify(
            {
                "status": "error",
                "message": gettext(
                    "Missing required parameters (subject_uri, entity_type)"
                ),
            }
        ), 400

    if limit <= 0 or offset < 0:
        return jsonify(
            {
                "status": "error",
                "message": gettext("Limit must be positive and offset non-negative"),
            }
        ), 400

    # Both values are spliced between "<" and ">" in the SPARQL queries built
    # below; refuse anything that could break out of the IRI and alter them.
    if not (_is_safe_iri(subject_uri) and _is_safe_iri(entity_type)):
        return jsonify(
            {
                "status": "error",
                "message": gettext("Invalid subject_uri or entity_type parameter"),
            }
        ), 400

    # Payload shared by every "nothing to compare against" outcome.
    no_results = {"status": "success", "results": [], "has_more": False}

    try:
        similarity_config = get_similarity_properties((entity_type, shape_uri))

        if not similarity_config or not isinstance(similarity_config, list):
            return jsonify(no_results)

        subject_values_by_prop = _fetch_subject_values(subject_uri, similarity_config)
        if subject_values_by_prop is None:
            return jsonify(no_results)

        union_blocks = _build_union_blocks(
            similarity_config, subject_values_by_prop, subject_uri
        )
        if not union_blocks:
            return jsonify(no_results)

        result_uris, has_more = _execute_similarity_query(
            union_blocks, entity_type, subject_uri, limit, offset
        )
        transformed_results = _transform_results(result_uris, entity_type, shape_uri)

        return jsonify(
            {
                "status": "success",
                "results": transformed_results,
                "has_more": has_more,
            }
        )

    except Exception:
        current_app.logger.exception(
            "Error finding similar resources for %s", subject_uri
        )
        return jsonify(
            {
                "status": "error",
                "message": gettext("An error occurred while finding similar resources"),
            }
        ), 500