Coverage for heritrace/routes/merge.py: 100%

228 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1import traceback 

2from collections import defaultdict 

3from typing import Any, Dict, List, Optional, Tuple 

4 

5import validators 

6from flask import (Blueprint, current_app, flash, jsonify, redirect, 

7 render_template, request, url_for) 

8from flask_babel import gettext 

9from flask_login import current_user, login_required 

10from heritrace.editor import Editor 

11from heritrace.extensions import (get_counter_handler, get_custom_filter, 

12 get_dataset_endpoint, 

13 get_dataset_is_quadstore, 

14 get_provenance_endpoint, get_sparql) 

15from heritrace.utils.display_rules_utils import (get_highest_priority_class, 

16 get_similarity_properties) 

17from heritrace.utils.primary_source_utils import ( 

18 get_default_primary_source, save_user_default_primary_source) 

19from heritrace.utils.shacl_utils import determine_shape_for_classes 

20from heritrace.utils.sparql_utils import get_entity_types 

21from markupsafe import Markup 

22from rdflib import URIRef 

23from SPARQLWrapper import JSON 

24 

# Blueprint collecting all entity-merge routes; registered by the app factory.
merge_bp = Blueprint("merge", __name__)

26 

27 

def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) for a given entity URI,
    grouped by predicate, along with its types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries (containing 'value', 'type', 'lang', 'datatype'
          and a human-readable 'readable_label'). Returns None if an error
          occurs.
        - A list of entity type URIs. Returns an empty list if an error occurs
          or no types are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: Dict[str, List[Dict[str, Any]]] = {}
    entity_types: List[str] = []

    try:
        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{entity_uri}> ?p ?o .
            }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        for binding in bindings:
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": None,
            }
            if obj_details["type"] == 'uri':
                # URI objects get a human-readable label resolved from their
                # highest-priority class (one extra type lookup per object).
                obj_types = get_entity_types(obj_details["value"])
                obj_type = get_highest_priority_class(obj_types)
                obj_details["readable_label"] = custom_filter.human_readable_entity(obj_details["value"], (obj_type, None))
            else:
                # Literals are their own label.
                obj_details["readable_label"] = obj_details["value"]

            # setdefault replaces the manual "if predicate not in dict" check.
            grouped_properties.setdefault(predicate, []).append(obj_details)

        return grouped_properties, entity_types

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []

92 

93 

@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Merge two entities through the Editor class so that provenance is
    recorded and the operation stays agnostic of the underlying data model.
    Entity 1 (keep) absorbs Entity 2 (delete).
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO: Implement CSRF validation if using Flask-WTF

    # Guard clause: both entity URIs are mandatory.
    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    # A primary source, when supplied, must be a well-formed URL.
    source_is_valid_url = bool(primary_source) and bool(validators.url(primary_source))
    if primary_source and not source_is_valid_url:
        flash(gettext("Invalid primary source URL provided."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    if save_default_source and source_is_valid_url:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        # Resolve human-readable labels for the success message.
        type1 = get_highest_priority_class(entity1_types)
        type2 = get_highest_priority_class(entity2_types)
        shape1 = determine_shape_for_classes(entity1_types)
        shape2 = determine_shape_for_classes(entity2_types)
        label1 = custom_filter.human_readable_entity(entity1_uri, (type1, shape1)) or entity1_uri
        label2 = custom_filter.human_readable_entity(entity2_uri, (type2, shape2)) or entity2_uri

        # Responsible agent is the current user's ORCID, when available.
        resp_agent_uri = None
        if current_user.is_authenticated and hasattr(current_user, 'orcid'):
            resp_agent_uri = URIRef(f"https://orcid.org/{current_user.orcid}")

        editor = Editor(
            dataset_endpoint=get_dataset_endpoint(),
            provenance_endpoint=get_provenance_endpoint(),
            counter_handler=get_counter_handler(),
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=get_dataset_is_quadstore()
        )

        if source_is_valid_url:
            editor.set_primary_source(primary_source)

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=label1,
            entity2=label2,
            entity1_url=url_for('entity.about', subject=entity1_uri),
            entity2_url=url_for('entity.about', subject=entity2_uri)
        )
        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Editor raises ValueError for expected merge failures.
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

180 

181 

@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Display two entities side-by-side so the user can confirm a merge.
    """
    entity1_uri = request.args.get("subject")
    entity2_uri = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not entity1_uri or not entity2_uri:
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    entity1_props, entity1_types = get_entity_details(entity1_uri)
    entity2_props, entity2_types = get_entity_details(entity2_uri)

    if entity1_props is None or entity2_props is None:
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    def build_entity_payload(uri, types, props):
        # Assemble the template context for one side of the comparison.
        top_class = get_highest_priority_class(types)
        shape = determine_shape_for_classes(types)
        return {
            "uri": uri,
            "label": custom_filter.human_readable_entity(uri, (top_class, shape)) or uri,
            "type_label": custom_filter.human_readable_class((top_class, shape)),
            "type": top_class,
            "shape": shape,
            "properties": props
        }

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=build_entity_payload(entity1_uri, entity1_types, entity1_props),
        entity2=build_entity_payload(entity2_uri, entity2_types, entity2_props),
        default_primary_source=get_default_primary_source(current_user.orcid)
    )

237 

238 

@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """Find resources potentially similar to a given subject based on shared properties,
    respecting AND/OR logic defined in display rules.

    Query parameters:
        subject_uri: URI of the entity to find matches for (required).
        entity_type: Primary class URI of the entity (required).
        shape_uri: Optional shape URI refining the similarity configuration.
        limit / offset: Pagination; limit must be > 0, offset >= 0.

    Returns:
        JSON with "status", "results" (list of {"uri", "label"}) and
        "has_more"; 400 on invalid parameters, 500 on unexpected errors.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify({"status": "error", "message": gettext("Invalid limit or offset parameter")}), 400

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    if limit <= 0 or offset < 0:
        return jsonify({"status": "error", "message": gettext("Limit must be positive and offset non-negative")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        # The similarity config is a list mixing bare property URIs (OR-ed
        # together) and {"and": [...]} groups whose properties must all match.
        entity_key = (entity_type, shape_uri)
        similarity_config = get_similarity_properties(entity_key)

        if not similarity_config or not isinstance(similarity_config, list):
            current_app.logger.warning(f"No valid similarity properties found or configured for type {entity_type}")
            return jsonify({"status": "success", "results": [], "has_more": False})

        def format_rdf_term(node):
            """Serialize a SPARQL JSON result node back into SPARQL term syntax."""
            value = node["value"]
            value_type = node["type"]
            if value_type == 'uri':
                return f"<{value}>"
            elif value_type in {'literal', 'typed-literal'}:
                datatype = node.get("datatype")
                lang = node.get("xml:lang")
                # Escape backslashes before quotes to produce a valid literal.
                escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
                if datatype:
                    return f'"{escaped_value}"^^<{datatype}>'
                elif lang:
                    return f'"{escaped_value}"@{lang}'
                else:
                    return f'"{escaped_value}"'
            return None  # other node kinds (e.g. bnodes) are not comparable

        # Flatten the config into the full set of properties we need values for.
        all_props_in_config = set()
        for item in similarity_config:
            if isinstance(item, str):
                all_props_in_config.add(item)
            elif isinstance(item, dict) and "and" in item:
                all_props_in_config.update(item["and"])

        if not all_props_in_config:
            current_app.logger.warning(f"Empty properties list derived from similarity config for type {entity_type}")
            return jsonify({"status": "success", "results": [], "has_more": False})

        prop_uris_formatted_for_filter = [f"<{p}>" for p in all_props_in_config]
        property_filter_for_subject = f"FILTER(?p IN ({', '.join(prop_uris_formatted_for_filter)}))"

        # First query: collect the subject's own values for those properties.
        fetch_comparison_values_query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{subject_uri}> ?p ?o .
                {property_filter_for_subject}
            }}
        """

        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_bindings = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_bindings:
            return jsonify({"status": "success", "results": [], "has_more": False})

        subject_values_by_prop = defaultdict(list)
        for binding in subject_bindings:
            formatted_value = format_rdf_term(binding["o"])
            if formatted_value:
                subject_values_by_prop[binding["p"]["value"]].append(formatted_value)

        # Build one UNION block per OR-condition in the config.
        union_blocks = []
        var_counter = 0

        for condition in similarity_config:
            if isinstance(condition, str):
                prop_uri = condition
                prop_values = subject_values_by_prop.get(prop_uri)
                if prop_values:
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    union_blocks.append(f" {{ ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) }}")

            elif isinstance(condition, dict) and "and" in condition:
                and_props = condition["and"]

                # An AND group can only match when the subject has values for
                # every property in it; otherwise skip the whole group.
                # (The original `can_match_and_group` flag was dead code: the
                # False branch always `continue`d before the flag was tested.)
                if not all(p in subject_values_by_prop for p in and_props):
                    current_app.logger.debug(f"Skipping AND group {and_props} because subject {subject_uri} lacks values for all its properties.")
                    continue

                and_patterns = []
                for prop_uri in and_props:
                    prop_values = subject_values_by_prop.get(prop_uri)
                    var_counter += 1
                    values_filter = ", ".join(prop_values)
                    and_patterns.append(f" ?similar <{prop_uri}> ?o_{var_counter} . FILTER(?o_{var_counter} IN ({values_filter})) .")

                if and_patterns:
                    # Construct the block with newlines outside the formatted expression
                    patterns_str = '\n'.join(and_patterns)
                    union_blocks.append(f" {{\n{patterns_str}\n }}")

        if not union_blocks:
            return jsonify({"status": "success", "results": [], "has_more": False})

        similarity_query_body = " UNION ".join(union_blocks)

        # Fetch one extra row beyond the page size to detect "has_more".
        query_limit = limit + 1
        final_query = f"""
            SELECT DISTINCT ?similar WHERE {{
                ?similar a <{entity_type}> .
                FILTER(?similar != <{subject_uri}>)
                {{
                    {similarity_query_body}
                }}
            }} ORDER BY ?similar OFFSET {offset} LIMIT {query_limit}
        """

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        bindings = results.get("results", {}).get("bindings", [])
        candidate_uris = [item["similar"]["value"] for item in bindings]

        has_more = len(candidate_uris) > limit
        results_to_process = candidate_uris[:limit]

        transformed_results = []
        for uri in results_to_process:
            readable_label = custom_filter.human_readable_entity(uri, (entity_type, shape_uri)) if entity_type else uri
            transformed_results.append({
                "uri": uri,
                "label": readable_label or uri
            })

        return jsonify({
            "status": "success",
            "results": transformed_results,
            "has_more": has_more,
        })

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources for {subject_uri}: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500