Coverage for heritrace / routes / merge.py: 100%
228 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import traceback
6from collections import defaultdict
7from typing import Any, Dict, List, Optional, Tuple
9import validators
10from flask import (Blueprint, current_app, flash, jsonify, redirect,
11 render_template, request, url_for)
12from flask_babel import gettext
13from flask_login import current_user, login_required
14from heritrace.apis.orcid import get_responsible_agent_uri
15from heritrace.editor import Editor
16from heritrace.extensions import (get_counter_handler, get_custom_filter,
17 get_dataset_endpoint,
18 get_dataset_is_quadstore,
19 get_provenance_endpoint, get_sparql)
20from heritrace.utils.display_rules_utils import (get_highest_priority_class,
21 get_similarity_properties)
22from heritrace.utils.primary_source_utils import (
23 get_default_primary_source, save_user_default_primary_source)
24from heritrace.utils.shacl_utils import determine_shape_for_classes
25from heritrace.utils.sparql_utils import get_entity_types
26from markupsafe import Markup
27from rdflib import URIRef
28from SPARQLWrapper import JSON
# Blueprint grouping the entity comparison / merge routes registered below.
merge_bp = Blueprint("merge", __name__)
def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) for a given entity URI,
    grouped by predicate, along with the entity's types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries (containing 'value', 'type', 'lang', 'datatype'
          and 'readable_label'). Returns None if an error occurs.
        - A list of entity type URIs. Returns an empty list if an error occurs
          or no types are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    entity_types: List[str] = []
    # Resolving a readable label for a URI object costs an extra types query;
    # cache per call so objects appearing under several predicates are
    # resolved only once.
    uri_label_cache: Dict[str, Optional[str]] = {}

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        for binding in bindings:
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": None,
            }
            if obj_details["type"] == 'uri':
                obj_value = obj_details["value"]
                if obj_value not in uri_label_cache:
                    obj_types = get_entity_types(obj_value)
                    obj_type = get_highest_priority_class(obj_types)
                    uri_label_cache[obj_value] = custom_filter.human_readable_entity(obj_value, (obj_type, None))
                obj_details["readable_label"] = uri_label_cache[obj_value]
            else:
                # Literals are their own best label.
                obj_details["readable_label"] = obj_details["value"]

            grouped_properties[predicate].append(obj_details)

        # Return a plain dict so callers never observe defaultdict semantics.
        return dict(grouped_properties), entity_types

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []
@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Handle the actual merging of two entities using the Editor class
    to ensure provenance and data model agnosticism.
    Entity 1 (keep) absorbs Entity 2 (delete).

    Form parameters:
        entity1_uri: URI of the entity to keep.
        entity2_uri: URI of the entity to delete.
        primary_source: optional URL recorded as provenance primary source.
        save_default_source: "true" to persist primary_source as the user's default.
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO: Implement CSRF validation if using Flask-WTF

    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    if primary_source and not validators.url(primary_source):
        flash(gettext("Invalid primary source URL provided."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    # Past this point primary_source is either empty or a valid URL, so no
    # further validators.url() re-checks are needed.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        # Only the types are needed here; property details are discarded.
        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        entity1_type = get_highest_priority_class(entity1_types)
        entity2_type = get_highest_priority_class(entity2_types)
        entity1_shape = determine_shape_for_classes(entity1_types)
        entity2_shape = determine_shape_for_classes(entity2_types)
        entity1_label = custom_filter.human_readable_entity(entity1_uri, (entity1_type, entity1_shape)) or entity1_uri
        entity2_label = custom_filter.human_readable_entity(entity2_uri, (entity2_type, entity2_shape)) or entity2_uri

        counter_handler = get_counter_handler()

        resp_agent_uri = None
        if current_user.is_authenticated and hasattr(current_user, 'orcid'):
            agent_uri = get_responsible_agent_uri(current_user.orcid)
            # Guard against a missing agent: URIRef(None) would coerce to the
            # literal string "None" (URIRef is a str subclass), silently
            # recording a bogus responsible agent in provenance.
            if agent_uri:
                resp_agent_uri = URIRef(agent_uri)

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            dataset_endpoint=dataset_endpoint,
            provenance_endpoint=provenance_endpoint,
            counter_handler=counter_handler,
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=dataset_is_quadstore
        )

        if primary_source:
            editor.set_primary_source(primary_source)

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        entity1_url = url_for('entity.about', subject=entity1_uri)
        entity2_url = url_for('entity.about', subject=entity2_uri)
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=entity1_label,
            entity2=entity2_label,
            entity1_url=entity1_url,
            entity2_url=entity2_url
        )

        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Expected domain errors from the merge (e.g. incompatible entities).
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))
@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Display details of two entities side-by-side for merge confirmation.

    Query parameters:
        subject: URI of the entity that will be kept.
        other_subject: URI of the entity that will be deleted.
    """
    entity1_uri = request.args.get("subject")
    entity2_uri = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not entity1_uri or not entity2_uri:
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    entity1_props, entity1_types = get_entity_details(entity1_uri)
    entity2_props, entity2_types = get_entity_details(entity2_uri)

    if entity1_props is None or entity2_props is None:
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    def build_entity_data(uri, props, types):
        # Resolve display class/shape and assemble the template payload for
        # one entity; factored out because both entities need identical handling.
        entity_type = get_highest_priority_class(types)
        entity_shape = determine_shape_for_classes(types)
        label = custom_filter.human_readable_entity(uri, (entity_type, entity_shape)) or uri
        return {
            "uri": uri,
            "label": label,
            "type_label": custom_filter.human_readable_class((entity_type, entity_shape)),
            "type": entity_type,
            "shape": entity_shape,
            "properties": props,
        }

    entity1_data = build_entity_data(entity1_uri, entity1_props, entity1_types)
    entity2_data = build_entity_data(entity2_uri, entity2_props, entity2_types)

    default_primary_source = get_default_primary_source(current_user.orcid)

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=entity1_data,
        entity2=entity2_data,
        default_primary_source=default_primary_source
    )
@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """Find resources potentially similar to a given subject based on shared properties,
    respecting AND/OR logic defined in display rules.

    Query parameters:
        subject_uri: URI of the reference entity (required).
        entity_type: primary class of the entity (required).
        shape_uri: optional SHACL shape used to select display rules.
        limit / offset: pagination (limit > 0, offset >= 0).

    Returns JSON: {"status", "results": [{"uri", "label"}], "has_more"}.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify({"status": "error", "message": gettext("Invalid limit or offset parameter")}), 400

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    if limit <= 0 or offset < 0:
        return jsonify({"status": "error", "message": gettext("Limit must be positive and offset non-negative")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        entity_key = (entity_type, shape_uri)
        # Similarity config is a list whose items are either a property URI
        # (OR semantics) or {"and": [prop, ...]} (all must match).
        similarity_config = get_similarity_properties(entity_key)

        if not similarity_config or not isinstance(similarity_config, list):
            return jsonify({"status": "success", "results": [], "has_more": False})

        def format_rdf_term(node):
            # Serialize a SPARQL JSON binding back into SPARQL term syntax
            # for use inside a FILTER ... IN (...) list.
            value = node["value"]
            value_type = node["type"]
            if value_type == 'uri':
                return f"<{value}>"
            elif value_type in {'literal', 'typed-literal'}:
                datatype = node.get("datatype")
                lang = node.get("xml:lang")
                escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
                if datatype:
                    return f'"{escaped_value}"^^<{datatype}>'
                elif lang:
                    return f'"{escaped_value}"@{lang}'
                else:
                    return f'"{escaped_value}"'
            # Blank nodes (or unknown term types) cannot be matched by value.
            return None

        # Flatten the config to the set of every property involved, so one
        # query can fetch all of the subject's comparison values at once.
        all_props_in_config = set()
        for item in similarity_config:
            if isinstance(item, str):
                all_props_in_config.add(item)
            elif isinstance(item, dict) and "and" in item:
                all_props_in_config.update(item["and"])

        if not all_props_in_config:
            current_app.logger.warning(f"Empty properties list derived from similarity config for type {entity_type}")
            return jsonify({"status": "success", "results": [], "has_more": False})

        prop_uris_formatted_for_filter = [f"<{p}>" for p in all_props_in_config]
        property_filter_for_subject = f"FILTER(?p IN ({', '.join(prop_uris_formatted_for_filter)}))"

        fetch_comparison_values_query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{subject_uri}> ?p ?o .
            {property_filter_for_subject}
        }}
        """

        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_bindings = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_bindings:
            return jsonify({"status": "success", "results": [], "has_more": False})

        subject_values_by_prop = defaultdict(list)
        for binding in subject_bindings:
            formatted_value = format_rdf_term(binding["o"])
            if formatted_value:
                subject_values_by_prop[binding["p"]["value"]].append(formatted_value)

        # Each config condition becomes one UNION branch of the final query.
        union_blocks = []
        var_counter = 0

        for condition in similarity_config:
            if isinstance(condition, str):
                prop_uri = condition
                prop_values = subject_values_by_prop.get(prop_uri)
                if prop_values:
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    union_blocks.append(f" {{ ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) }}")

            elif isinstance(condition, dict) and "and" in condition:
                and_props = condition["and"]

                # An AND group only applies when the subject has values for
                # every property in the group.
                if not all(p in subject_values_by_prop for p in and_props):
                    current_app.logger.debug(f"Skipping AND group {and_props} because subject {subject_uri} lacks values for all its properties.")
                    continue

                and_patterns = []
                for prop_uri in and_props:
                    prop_values = subject_values_by_prop.get(prop_uri)
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    and_patterns.append(f" ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) .")

                if and_patterns:
                    # Construct the block with newlines outside the formatted expression
                    patterns_str = '\n'.join(and_patterns)
                    union_blocks.append(f" {{\n{patterns_str}\n }}")

        if not union_blocks:
            return jsonify({"status": "success", "results": [], "has_more": False})

        similarity_query_body = " UNION ".join(union_blocks)

        # Fetch one extra row to detect whether another page exists.
        query_limit = limit + 1
        final_query = f"""
        SELECT DISTINCT ?similar WHERE {{
            ?similar a <{entity_type}> .
            FILTER(?similar != <{subject_uri}>)
            {{
                {similarity_query_body}
            }}
        }} ORDER BY ?similar OFFSET {offset} LIMIT {query_limit}
        """

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        candidate_uris = [item["similar"]["value"] for item in bindings]

        has_more = len(candidate_uris) > limit
        results_to_process = candidate_uris[:limit]

        transformed_results = []
        for uri in results_to_process:
            readable_label = custom_filter.human_readable_entity(uri, (entity_type, shape_uri)) if entity_type else uri
            transformed_results.append({
                "uri": uri,
                "label": readable_label or uri
            })

        return jsonify({
            "status": "success",
            "results": transformed_results,
            "has_more": has_more,
        })

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources for {subject_uri}: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500