Coverage for heritrace/routes/merge.py: 100%
161 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
1import traceback
2from typing import Dict, List, Optional, Any, Tuple
4from flask import (Blueprint, current_app, flash, jsonify, redirect,
5 render_template, request, url_for)
6from flask_babel import gettext
7from flask_login import login_required, current_user
8from heritrace.extensions import (
9 get_custom_filter, get_sparql, get_counter_handler,
10 get_dataset_endpoint, get_provenance_endpoint, get_dataset_is_quadstore
11)
12from heritrace.utils.display_rules_utils import get_similarity_properties
13from heritrace.utils.sparql_utils import get_entity_types
14from SPARQLWrapper import JSON, POST
16from heritrace.editor import Editor
17from rdflib import URIRef
18from markupsafe import Markup
20merge_bp = Blueprint("merge", __name__)
def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) of an entity, grouped by
    predicate, along with the entity's types.

    The URI is interpolated into a SPARQL query as ``<entity_uri>``. To keep a
    crafted value from closing the IRI and injecting extra query text, URIs
    that are empty or contain characters forbidden by SPARQL's IRIREF
    production are rejected before any query is issued.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries with the keys 'value', 'type', 'lang',
          'datatype' and 'readable_label'. None if the URI is rejected or an
          error occurs.
        - A list of entity type URIs. Empty if the URI is rejected, an error
          occurs, or no types are found.
    """
    # SPARQL IRIREF forbids control characters/space and <>"{}|^`\ between
    # the angle brackets; anything containing them cannot be a valid subject.
    if not entity_uri or any(
        ch <= " " or ch in '<>"{}|^`\\' for ch in entity_uri
    ):
        current_app.logger.warning(
            f"Refusing to query malformed entity URI: {entity_uri!r}"
        )
        return None, []

    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: Dict[str, List[Dict[str, Any]]] = {}
    entity_types: List[str] = []

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        for binding in bindings:
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": None,
            }
            if obj_details["type"] == 'uri':
                # URI objects are shown through the display rules of their own types.
                obj_types = get_entity_types(obj_details["value"])
                obj_details["readable_label"] = custom_filter.human_readable_entity(
                    obj_details["value"], obj_types
                )
            else:
                # Literals (and blank nodes) are shown as their raw value.
                obj_details["readable_label"] = obj_details["value"]

            grouped_properties.setdefault(predicate, []).append(obj_details)

        return grouped_properties, entity_types

    except Exception as e:
        # Route-level callers rely on the (None, []) failure value rather than
        # on an exception, so every error is logged and converted here.
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []
@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Merge two entities through the Editor class, which is used to ensure
    provenance and data model agnosticism.

    Entity 1 (keep) absorbs Entity 2 (delete). The URIs are read from the
    ``entity1_uri`` and ``entity2_uri`` form fields.

    Returns:
        A redirect response:
        - to the catalogue if either URI is missing;
        - to the kept entity's page after a successful merge;
        - back to the compare-and-merge page if the merge raises.
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")

    # TODO: Implement CSRF validation if using Flask-WTF

    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    try:
        custom_filter = get_custom_filter()

        # Only the types are needed here, to compute human-readable labels.
        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)
        entity1_label = custom_filter.human_readable_entity(entity1_uri, entity1_types) or entity1_uri
        entity2_label = custom_filter.human_readable_entity(entity2_uri, entity2_types) or entity2_uri

        counter_handler = get_counter_handler()
        if current_user.is_authenticated and hasattr(current_user, 'orcid'):
            resp_agent_uri = URIRef(f"https://orcid.org/{current_user.orcid}")
        else:
            resp_agent_uri = None

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            dataset_endpoint=dataset_endpoint,
            provenance_endpoint=provenance_endpoint,
            counter_handler=counter_handler,
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=dataset_is_quadstore
        )

        current_app.logger.info(f"Executing merge via Editor: Keep <{entity1_uri}>, Delete <{entity2_uri}>")

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        current_app.logger.info(f"Successfully merged <{entity2_uri}> into <{entity1_uri}> via Editor.")
        entity1_url = url_for('entity.about', subject=entity1_uri)
        entity2_url = url_for('entity.about', subject=entity2_uri)

        # Labels come from dataset content and the URLs sit inside
        # single-quoted attributes, so every interpolated value is
        # HTML-escaped before it is placed into the markup template.
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=Markup.escape(entity1_label),
            entity2=Markup.escape(entity2_label),
            entity1_url=Markup.escape(entity1_url),
            entity2_url=Markup.escape(entity2_url)
        )

        # The message is now built only from the fixed template and escaped
        # values, so it can be marked as safe HTML for the flash renderer.
        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Specific handling for merge with self
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))
@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Show two entities side by side so a merge can be confirmed.

    Reads the ``subject`` and ``other_subject`` query parameters. When either
    is missing, or the details of either entity cannot be retrieved, a message
    is flashed and the user is redirected to the catalogue. Otherwise the
    ``entity/merge_confirm.jinja`` template is rendered with one dictionary
    per entity (``uri``, ``label``, ``types``, ``properties``).
    """
    first_uri = request.args.get("subject")
    second_uri = request.args.get("other_subject")
    label_filter = get_custom_filter()

    if not (first_uri and second_uri):
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    first_props, first_types = get_entity_details(first_uri)
    second_props, second_types = get_entity_details(second_uri)

    # get_entity_details signals failure with None in place of the properties.
    if any(props is None for props in (first_props, second_props)):
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    def describe(uri, types, props):
        """Build the template context for one entity, falling back to the URI as label."""
        label = label_filter.human_readable_entity(uri, types) or uri
        return {
            "uri": uri,
            "label": label,
            "types": types,
            "properties": props,
        }

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=describe(first_uri, first_types, first_props),
        entity2=describe(second_uri, second_types, second_props),
    )
def _is_safe_iri(value: str) -> bool:
    """Return True if *value* may be placed between '<' and '>' in a SPARQL query."""
    # SPARQL IRIREF forbids control characters/space and <>"{}|^`\ .
    return bool(value) and not any(ch <= " " or ch in '<>"{}|^`\\' for ch in value)


def _sparql_term(node: Dict[str, Any]) -> Optional[str]:
    """Serialize a SPARQL JSON result node as a query term; None for unsupported node types."""
    value = node["value"]
    node_type = node["type"]

    if node_type == 'uri':
        return f"<{value}>"
    if node_type not in {'literal', 'typed-literal'}:
        # Blank nodes (and anything unexpected) cannot be matched by value.
        return None

    # A double-quoted SPARQL string may not contain a raw backslash, quote,
    # line feed or carriage return; the backslash must be escaped first.
    escaped = (
        value.replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
    datatype = node.get("datatype")
    lang = node.get("xml:lang")
    if datatype:
        return f'"{escaped}"^^<{datatype}>'
    if lang:
        return f'"{escaped}"@{lang}'
    return f'"{escaped}"'


@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """
    Find resources potentially similar to a given subject.

    A candidate is any other resource of type ``entity_type`` that shares at
    least one property value with the subject. When similarity properties are
    configured for the type, only those properties are compared; otherwise
    every property of the subject is used.

    Query parameters:
        subject_uri: URI of the resource to compare against (required).
        entity_type: Primary entity type URI (required).
        limit: Maximum number of candidates, a non-negative integer
            (default 5).

    Returns:
        A JSON response ``{"status": "success", "results": [...]}`` where each
        result has 'uri', 'label', 'types' and 'type_labels'. Missing or
        malformed parameters give a 400 error response, and an unexpected
        failure gives a 500 error response.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    # A malformed limit is a client error, not a server failure.
    try:
        limit = int(request.args.get("limit", 5))
    except (TypeError, ValueError):
        limit = -1

    # Both URIs are interpolated into SPARQL below, so reject anything that
    # could break out of the IRI brackets.
    if limit < 0 or not _is_safe_iri(subject_uri) or not _is_safe_iri(entity_type):
        return jsonify({"status": "error", "message": gettext("Invalid request parameters")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        similarity_properties = get_similarity_properties(entity_type)

        property_filter = ""
        if similarity_properties:
            prop_uris = [f"<{p}>" for p in similarity_properties]
            property_filter = f"FILTER(?p IN ({', '.join(prop_uris)}))"

        fetch_comparison_values_query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{subject_uri}> ?p ?o .
            {property_filter}
        }}
        """
        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_values = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_values:
            return jsonify({"status": "success", "results": []})

        # One UNION branch per (property, value) pair of the subject.
        similarity_conditions = []
        for binding in subject_values:
            prop = binding["p"]["value"]
            formatted_value = _sparql_term(binding["o"])
            if formatted_value is None:
                continue
            similarity_conditions.append(f"{{ ?similar <{prop}> {formatted_value} . }}")

        if not similarity_conditions:
            return jsonify({"status": "success", "results": []})

        query_parts = [
            "SELECT DISTINCT ?similar WHERE {",
            f" ?similar a <{entity_type}> .",
            f" FILTER(?similar != <{subject_uri}>)",
            " {",
            " " + " \n UNION\n ".join(similarity_conditions),
            " }",
            f"}} LIMIT {limit}"
        ]
        final_query = "\n".join(query_parts)

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        candidate_uris = [item["similar"]["value"] for item in results.get("results", {}).get("bindings", [])]
        transformed_results = []
        for uri in candidate_uris:
            readable_label = custom_filter.human_readable_entity(uri, [entity_type])
            sim_types = get_entity_types(uri)
            type_labels = [custom_filter.human_readable_predicate(type_uri, sim_types) for type_uri in sim_types]
            transformed_results.append({
                "uri": uri,
                "label": readable_label,
                "types": sim_types,
                "type_labels": type_labels
            })

        return jsonify({"status": "success", "results": transformed_results})

    except Exception as e:
        # Endpoint boundary: log the full traceback and return a generic error.
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500