Coverage for heritrace/routes/merge.py: 100%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import traceback 

6from collections import defaultdict 

7from typing import Any, Dict, List, Optional, Tuple 

8 

9import validators 

10from flask import (Blueprint, current_app, flash, jsonify, redirect, 

11 render_template, request, url_for) 

12from flask_babel import gettext 

13from flask_login import current_user, login_required 

14from heritrace.apis.orcid import get_responsible_agent_uri 

15from heritrace.editor import Editor 

16from heritrace.extensions import (get_counter_handler, get_custom_filter, 

17 get_dataset_endpoint, 

18 get_dataset_is_quadstore, 

19 get_provenance_endpoint, get_sparql) 

20from heritrace.utils.display_rules_utils import (get_highest_priority_class, 

21 get_similarity_properties) 

22from heritrace.utils.primary_source_utils import ( 

23 get_default_primary_source, save_user_default_primary_source) 

24from heritrace.utils.shacl_utils import determine_shape_for_classes 

25from heritrace.utils.sparql_utils import get_entity_types 

26from markupsafe import Markup 

27from rdflib import URIRef 

28from SPARQLWrapper import JSON 

29 

# Blueprint grouping the entity comparison / merge routes; registered on the app elsewhere.
merge_bp = Blueprint("merge", __name__)

31 

32 

def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) for a given entity URI,
    grouped by predicate, along with the entity's types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries (containing 'value', 'type', 'lang', 'datatype',
          'readable_label'). Returns None if an error occurs.
        - A list of entity type URIs. Returns an empty list if an error occurs
          or no types are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: Dict[str, List[Dict[str, Any]]] = {}
    entity_types: List[str] = []
    # Memoize readable labels per object URI: without this, an object URI
    # repeated across predicates triggers one type-lookup query per occurrence.
    label_cache: Dict[str, Optional[str]] = {}

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{entity_uri}> ?p ?o .
            }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        for binding in bindings:
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details: Dict[str, Any] = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": None,
            }
            if obj_details["type"] == 'uri':
                obj_uri = obj_details["value"]
                if obj_uri not in label_cache:
                    obj_types = get_entity_types(obj_uri)
                    obj_type = get_highest_priority_class(obj_types)
                    label_cache[obj_uri] = custom_filter.human_readable_entity(
                        obj_uri, (obj_type, None)
                    )
                obj_details["readable_label"] = label_cache[obj_uri]
            else:
                # Literals are already human readable.
                obj_details["readable_label"] = obj_details["value"]

            # setdefault replaces the explicit "if predicate not in ..." dance.
            grouped_properties.setdefault(predicate, []).append(obj_details)

        return grouped_properties, entity_types

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []

97 

98 

@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Handle the actual merging of two entities using the Editor class
    to ensure provenance and data model agnosticism.
    Entity 1 (keep) absorbs Entity 2 (delete).

    Form fields: entity1_uri, entity2_uri, optional primary_source (URL),
    and save_default_source ("true" to persist the source as the user's
    default). Redirects to the kept entity on success, back to the
    comparison page on failure.
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO: Implement CSRF validation if using Flask-WTF

    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    if primary_source and not validators.url(primary_source):
        flash(gettext("Invalid primary source URL provided."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    # Past the guard above, any non-empty primary_source is a valid URL,
    # so it does not need to be re-validated below.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        # Only the type lists are needed here; the property dicts are unused.
        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        entity1_type = get_highest_priority_class(entity1_types)
        entity2_type = get_highest_priority_class(entity2_types)
        entity1_shape = determine_shape_for_classes(entity1_types)
        entity2_shape = determine_shape_for_classes(entity2_types)
        entity1_label = custom_filter.human_readable_entity(entity1_uri, (entity1_type, entity1_shape)) or entity1_uri
        entity2_label = custom_filter.human_readable_entity(entity2_uri, (entity2_type, entity2_shape)) or entity2_uri

        counter_handler = get_counter_handler()
        resp_agent_uri = URIRef(get_responsible_agent_uri(current_user.orcid)) if current_user.is_authenticated and hasattr(current_user, 'orcid') else None

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            dataset_endpoint=dataset_endpoint,
            provenance_endpoint=provenance_endpoint,
            counter_handler=counter_handler,
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=dataset_is_quadstore
        )

        if primary_source:
            editor.set_primary_source(primary_source)

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        entity1_url = url_for('entity.about', subject=entity1_uri)
        entity2_url = url_for('entity.about', subject=entity2_uri)
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=entity1_label,
            entity2=entity2_label,
            entity1_url=entity1_url,
            entity2_url=entity2_url
        )

        # Markup marks the translated HTML as safe for rendering in the flash.
        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Expected validation failures surface as ValueError; show the
        # message to the user as a warning rather than a server error.
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

185 

186 

@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Route to display details of two entities side-by-side for merge confirmation.
    """
    entity1_uri = request.args.get("subject")
    entity2_uri = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not entity1_uri or not entity2_uri:
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    # Fetch both entities' properties and types (entity 1 first, then 2).
    details = [get_entity_details(uri) for uri in (entity1_uri, entity2_uri)]

    if any(props is None for props, _ in details):
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    def build_view_model(uri, props, types):
        # Assemble the per-entity payload the confirmation template expects.
        primary_type = get_highest_priority_class(types)
        shape = determine_shape_for_classes(types)
        label = custom_filter.human_readable_entity(uri, (primary_type, shape)) or uri
        return {
            "uri": uri,
            "label": label,
            "type_label": custom_filter.human_readable_class((primary_type, shape)),
            "type": primary_type,
            "shape": shape,
            "properties": props,
        }

    entity1_data = build_view_model(entity1_uri, *details[0])
    entity2_data = build_view_model(entity2_uri, *details[1])

    default_primary_source = get_default_primary_source(current_user.orcid)

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=entity1_data,
        entity2=entity2_data,
        default_primary_source=default_primary_source
    )

242 

243 

def _format_rdf_term(node: Dict[str, Any]) -> Optional[str]:
    """Serialize a SPARQL JSON result node into SPARQL term syntax.

    Returns None for node types other than URIs and literals (e.g. blank
    nodes), which cannot safely appear in a FILTER ... IN (...) clause.
    """
    value = node["value"]
    value_type = node["type"]
    if value_type == 'uri':
        return f"<{value}>"
    if value_type in {'literal', 'typed-literal'}:
        datatype = node.get("datatype")
        lang = node.get("xml:lang")
        # Escape backslashes first, then quotes, for a valid SPARQL string.
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        if datatype:
            return f'"{escaped_value}"^^<{datatype}>'
        if lang:
            return f'"{escaped_value}"@{lang}'
        return f'"{escaped_value}"'
    return None


def _collect_config_properties(similarity_config: List[Any]) -> set:
    """Flatten a similarity config (a list of property-URI strings and
    {'and': [uris]} dicts) into the set of all property URIs mentioned."""
    props = set()
    for item in similarity_config:
        if isinstance(item, str):
            props.add(item)
        elif isinstance(item, dict) and "and" in item:
            props.update(item["and"])
    return props


def _build_union_blocks(similarity_config: List[Any],
                        subject_values_by_prop: Dict[str, List[str]],
                        subject_uri: str) -> List[str]:
    """Build the SPARQL UNION branches implementing the OR/AND similarity logic.

    Each plain-string config entry becomes a single-pattern branch; an
    {'and': [...]} entry becomes a multi-pattern branch and is included only
    when the subject has values for *every* property in the group.
    """
    union_blocks: List[str] = []
    var_counter = 0

    for condition in similarity_config:
        if isinstance(condition, str):
            prop_values = subject_values_by_prop.get(condition)
            if prop_values:
                var_counter += 1
                values_filter = ", ".join(prop_values)
                union_blocks.append(f" {{ ?similar <{condition}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) }}")

        elif isinstance(condition, dict) and "and" in condition:
            and_props = condition["and"]

            if not all(p in subject_values_by_prop for p in and_props):
                current_app.logger.debug(f"Skipping AND group {and_props} because subject {subject_uri} lacks values for all its properties.")
                continue

            and_patterns = []
            for prop_uri in and_props:
                prop_values = subject_values_by_prop.get(prop_uri)
                var_counter += 1
                values_filter = ", ".join(prop_values)
                and_patterns.append(f"    ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) .")

            if and_patterns:
                # Join patterns outside the f-string so newlines stay literal.
                patterns_str = '\n'.join(and_patterns)
                union_blocks.append(f" {{\n{patterns_str}\n }}")

    return union_blocks


@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """Find resources potentially similar to a given subject based on shared
    properties, respecting AND/OR logic defined in display rules.

    Query parameters:
        subject_uri: URI of the entity to match against (required).
        entity_type: primary entity type URI (required).
        shape_uri: optional shape URI refining the similarity config.
        limit / offset: pagination controls (limit > 0, offset >= 0).

    Returns JSON with 'status', 'results' ([{uri, label}, ...]) and
    'has_more'; 400 on bad parameters, 500 on unexpected errors.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify({"status": "error", "message": gettext("Invalid limit or offset parameter")}), 400

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    if limit <= 0 or offset < 0:
        return jsonify({"status": "error", "message": gettext("Limit must be positive and offset non-negative")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        entity_key = (entity_type, shape_uri)
        similarity_config = get_similarity_properties(entity_key)

        if not similarity_config or not isinstance(similarity_config, list):
            return jsonify({"status": "success", "results": [], "has_more": False})

        all_props_in_config = _collect_config_properties(similarity_config)

        if not all_props_in_config:
            current_app.logger.warning(f"Empty properties list derived from similarity config for type {entity_type}")
            return jsonify({"status": "success", "results": [], "has_more": False})

        prop_uris_formatted_for_filter = [f"<{p}>" for p in all_props_in_config]
        property_filter_for_subject = f"FILTER(?p IN ({', '.join(prop_uris_formatted_for_filter)}))"

        # First query: collect the subject's own values for the configured
        # properties; these become the comparison targets.
        fetch_comparison_values_query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{subject_uri}> ?p ?o .
                {property_filter_for_subject}
            }}
        """

        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_bindings = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_bindings:
            return jsonify({"status": "success", "results": [], "has_more": False})

        subject_values_by_prop = defaultdict(list)
        for binding in subject_bindings:
            formatted_value = _format_rdf_term(binding["o"])
            if formatted_value:
                subject_values_by_prop[binding["p"]["value"]].append(formatted_value)

        union_blocks = _build_union_blocks(similarity_config, subject_values_by_prop, subject_uri)

        if not union_blocks:
            return jsonify({"status": "success", "results": [], "has_more": False})

        similarity_query_body = " UNION ".join(union_blocks)

        # Fetch one extra row beyond the page size to detect a next page.
        query_limit = limit + 1
        final_query = f"""
            SELECT DISTINCT ?similar WHERE {{
                ?similar a <{entity_type}> .
                FILTER(?similar != <{subject_uri}>)
                {{
                    {similarity_query_body}
                }}
            }} ORDER BY ?similar OFFSET {offset} LIMIT {query_limit}
        """

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        candidate_uris = [item["similar"]["value"] for item in bindings]

        has_more = len(candidate_uris) > limit
        results_to_process = candidate_uris[:limit]

        transformed_results = []
        for uri in results_to_process:
            readable_label = custom_filter.human_readable_entity(uri, (entity_type, shape_uri)) if entity_type else uri
            transformed_results.append({
                "uri": uri,
                "label": readable_label or uri
            })

        return jsonify({
            "status": "success",
            "results": transformed_results,
            "has_more": has_more,
        })

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources for {subject_uri}: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500