Coverage for heritrace/routes/merge.py: 100%
228 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
1import traceback
2from collections import defaultdict
3from typing import Any, Dict, List, Optional, Tuple
5import validators
6from flask import (Blueprint, current_app, flash, jsonify, redirect,
7 render_template, request, url_for)
8from flask_babel import gettext
9from flask_login import current_user, login_required
10from heritrace.apis.orcid import get_responsible_agent_uri
11from heritrace.editor import Editor
12from heritrace.extensions import (get_counter_handler, get_custom_filter,
13 get_dataset_endpoint,
14 get_dataset_is_quadstore,
15 get_provenance_endpoint, get_sparql)
16from heritrace.utils.display_rules_utils import (get_highest_priority_class,
17 get_similarity_properties)
18from heritrace.utils.primary_source_utils import (
19 get_default_primary_source, save_user_default_primary_source)
20from heritrace.utils.shacl_utils import determine_shape_for_classes
21from heritrace.utils.sparql_utils import get_entity_types
22from markupsafe import Markup
23from rdflib import URIRef
24from SPARQLWrapper import JSON
# Blueprint collecting all entity-merge routes; registered by the app factory.
merge_bp = Blueprint("merge", __name__)
def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) for a given entity URI,
    grouped by predicate, along with its types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries (with 'value', 'type', 'lang', 'datatype' and
          'readable_label' keys). Returns None if an error occurs.
        - A list of entity type URIs. Returns an empty list if an error occurs
          or no types are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    # defaultdict removes the manual "if predicate not in dict" bookkeeping.
    grouped_properties: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    entity_types: List[str] = []

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        # The same object URI may appear under several predicates; each label
        # lookup triggers extra SPARQL queries, so cache labels per call.
        label_cache: Dict[str, Optional[str]] = {}
        for binding in bindings:
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": None,
            }
            if obj_details["type"] == 'uri':
                obj_uri = obj_details["value"]
                if obj_uri not in label_cache:
                    obj_types = get_entity_types(obj_uri)
                    obj_type = get_highest_priority_class(obj_types)
                    label_cache[obj_uri] = custom_filter.human_readable_entity(obj_uri, (obj_type, None))
                obj_details["readable_label"] = label_cache[obj_uri]
            else:
                # Literals are their own label.
                obj_details["readable_label"] = obj_details["value"]

            grouped_properties[predicate].append(obj_details)

        # Return a plain dict so callers see the exact type the original API exposed.
        return dict(grouped_properties), entity_types

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []
@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Handle the actual merging of two entities using the Editor class
    to ensure provenance and data model agnosticism.

    Entity 1 (keep) absorbs Entity 2 (delete): entity 2 is removed and all
    references to it are re-pointed at entity 1.
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO: Implement CSRF validation if using Flask-WTF

    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    if primary_source and not validators.url(primary_source):
        flash(gettext("Invalid primary source URL provided."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    # Past this point primary_source is either empty or a valid URL (we
    # redirected otherwise), so no re-validation is needed before persisting
    # it as the user's default.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        # Only the type lists are needed here; property details are discarded.
        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        entity1_type = get_highest_priority_class(entity1_types)
        entity2_type = get_highest_priority_class(entity2_types)
        entity1_shape = determine_shape_for_classes(entity1_types)
        entity2_shape = determine_shape_for_classes(entity2_types)
        entity1_label = custom_filter.human_readable_entity(entity1_uri, (entity1_type, entity1_shape)) or entity1_uri
        entity2_label = custom_filter.human_readable_entity(entity2_uri, (entity2_type, entity2_shape)) or entity2_uri

        counter_handler = get_counter_handler()
        # NOTE(review): assumes get_responsible_agent_uri returns a URI string
        # for authenticated users with an ORCID — URIRef(None) would raise;
        # confirm upstream behavior.
        resp_agent_uri = URIRef(get_responsible_agent_uri(current_user.orcid)) if current_user.is_authenticated and hasattr(current_user, 'orcid') else None

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            dataset_endpoint=dataset_endpoint,
            provenance_endpoint=provenance_endpoint,
            counter_handler=counter_handler,
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=dataset_is_quadstore
        )

        if primary_source and validators.url(primary_source):
            editor.set_primary_source(primary_source)

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        entity1_url = url_for('entity.about', subject=entity1_uri)
        entity2_url = url_for('entity.about', subject=entity2_uri)
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=entity1_label,
            entity2=entity2_label,
            entity1_url=entity1_url,
            entity2_url=entity2_url
        )

        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Domain-level rejection (e.g. invalid merge pair) — warn, don't error.
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))
def _entity_view(uri: str, types: List[str], props: Dict[str, List[Dict[str, Any]]], custom_filter) -> Dict[str, Any]:
    """Build the template payload for one side of the comparison page."""
    entity_type = get_highest_priority_class(types)
    shape = determine_shape_for_classes(types)
    label = custom_filter.human_readable_entity(uri, (entity_type, shape)) or uri
    return {
        "uri": uri,
        "label": label,
        "type_label": custom_filter.human_readable_class((entity_type, shape)),
        "type": entity_type,
        "shape": shape,
        "properties": props,
    }


@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Route to display details of two entities side-by-side for merge confirmation.
    """
    entity1_uri = request.args.get("subject")
    entity2_uri = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not entity1_uri or not entity2_uri:
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    entity1_props, entity1_types = get_entity_details(entity1_uri)
    entity2_props, entity2_types = get_entity_details(entity2_uri)

    # None signals a fetch error (distinct from an entity with no properties).
    if entity1_props is None or entity2_props is None:
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    # Both payloads are built identically — share the helper above.
    entity1_data = _entity_view(entity1_uri, entity1_types, entity1_props, custom_filter)
    entity2_data = _entity_view(entity2_uri, entity2_types, entity2_props, custom_filter)

    default_primary_source = get_default_primary_source(current_user.orcid)

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=entity1_data,
        entity2=entity2_data,
        default_primary_source=default_primary_source
    )
@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """Find resources potentially similar to a given subject based on shared properties,
    respecting AND/OR logic defined in display rules.

    Query parameters:
        subject_uri: URI of the entity to find matches for (required).
        entity_type: primary entity type used for candidate filtering (required).
        shape_uri:   optional SHACL shape qualifying the similarity config.
        limit/offset: pagination (limit > 0, offset >= 0).

    Returns a JSON payload with 'status', 'results' (uri/label pairs) and
    'has_more' for pagination.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify({"status": "error", "message": gettext("Invalid limit or offset parameter")}), 400

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    if limit <= 0 or offset < 0:
        return jsonify({"status": "error", "message": gettext("Limit must be positive and offset non-negative")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        entity_key = (entity_type, shape_uri)
        similarity_config = get_similarity_properties(entity_key)

        # No (or malformed) similarity rules → nothing can be similar.
        if not similarity_config or not isinstance(similarity_config, list):
            return jsonify({"status": "success", "results": [], "has_more": False})

        def format_rdf_term(node):
            """Serialize a SPARQL JSON result node back into SPARQL term syntax."""
            value = node["value"]
            value_type = node["type"]
            if value_type == 'uri':
                return f"<{value}>"
            elif value_type in {'literal', 'typed-literal'}:
                datatype = node.get("datatype")
                lang = node.get("xml:lang")
                # NOTE(review): only backslash and double-quote are escaped;
                # literals containing raw newlines would break the generated
                # query — confirm whether such values can occur.
                escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
                if datatype:
                    return f'"{escaped_value}"^^<{datatype}>'
                elif lang:
                    return f'"{escaped_value}"@{lang}'
                else:
                    return f'"{escaped_value}"'
            return None  # bnodes and unknown node types are not comparable

        # Flatten the config: plain strings are OR'd properties, dicts with
        # an "and" key contribute every property of their AND group.
        all_props_in_config = set()
        for item in similarity_config:
            if isinstance(item, str):
                all_props_in_config.add(item)
            elif isinstance(item, dict) and "and" in item:
                all_props_in_config.update(item["and"])

        if not all_props_in_config:
            current_app.logger.warning(f"Empty properties list derived from similarity config for type {entity_type}")
            return jsonify({"status": "success", "results": [], "has_more": False})

        prop_uris_formatted_for_filter = [f"<{p}>" for p in all_props_in_config]
        property_filter_for_subject = f"FILTER(?p IN ({', '.join(prop_uris_formatted_for_filter)}))"

        # First query: collect the subject's own values for the configured
        # properties; these become the comparison targets.
        fetch_comparison_values_query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{subject_uri}> ?p ?o .
            {property_filter_for_subject}
        }}
        """

        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_bindings = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_bindings:
            # Subject has no values for any similarity property.
            return jsonify({"status": "success", "results": [], "has_more": False})

        subject_values_by_prop = defaultdict(list)
        for binding in subject_bindings:
            formatted_value = format_rdf_term(binding["o"])
            if formatted_value:
                subject_values_by_prop[binding["p"]["value"]].append(formatted_value)

        # Build one UNION branch per satisfiable OR condition.
        union_blocks = []
        var_counter = 0

        for condition in similarity_config:
            if isinstance(condition, str):
                prop_uri = condition
                prop_values = subject_values_by_prop.get(prop_uri)
                if prop_values:
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    union_blocks.append(f"  {{ ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) }}")

            elif isinstance(condition, dict) and "and" in condition:
                and_props = condition["and"]

                # An AND group can only match if the subject has values for
                # every property in the group; otherwise skip it entirely.
                # (The original kept a can_match_and_group flag here, but the
                # only False assignment was immediately followed by continue,
                # so the flag was dead — removed.)
                if not all(p in subject_values_by_prop for p in and_props):
                    current_app.logger.debug(f"Skipping AND group {and_props} because subject {subject_uri} lacks values for all its properties.")
                    continue

                and_patterns = []
                for prop_uri in and_props:
                    prop_values = subject_values_by_prop.get(prop_uri)
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    and_patterns.append(f"    ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) .")

                if and_patterns:
                    # Construct the block with newlines outside the formatted expression
                    patterns_str = '\n'.join(and_patterns)
                    union_blocks.append(f"  {{\n{patterns_str}\n  }}")

        if not union_blocks:
            return jsonify({"status": "success", "results": [], "has_more": False})

        similarity_query_body = " UNION ".join(union_blocks)

        # Fetch one extra row to detect whether another page exists.
        query_limit = limit + 1
        final_query = f"""
        SELECT DISTINCT ?similar WHERE {{
            ?similar a <{entity_type}> .
            FILTER(?similar != <{subject_uri}>)
            {{
                {similarity_query_body}
            }}
        }} ORDER BY ?similar OFFSET {offset} LIMIT {query_limit}
        """

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        candidate_uris = [item["similar"]["value"] for item in bindings]

        has_more = len(candidate_uris) > limit
        results_to_process = candidate_uris[:limit]

        transformed_results = []
        for uri in results_to_process:
            readable_label = custom_filter.human_readable_entity(uri, (entity_type, shape_uri)) if entity_type else uri
            transformed_results.append({
                "uri": uri,
                "label": readable_label or uri
            })

        return jsonify({
            "status": "success",
            "results": transformed_results,
            "has_more": has_more,
        })

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources for {subject_uri}: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500