Coverage for heritrace/routes/merge.py: 100%

161 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1import traceback 

2from typing import Dict, List, Optional, Any, Tuple 

3 

4from flask import (Blueprint, current_app, flash, jsonify, redirect, 

5 render_template, request, url_for) 

6from flask_babel import gettext 

7from flask_login import login_required, current_user 

8from heritrace.extensions import ( 

9 get_custom_filter, get_sparql, get_counter_handler, 

10 get_dataset_endpoint, get_provenance_endpoint, get_dataset_is_quadstore 

11) 

12from heritrace.utils.display_rules_utils import get_similarity_properties 

13from heritrace.utils.sparql_utils import get_entity_types 

14from SPARQLWrapper import JSON, POST 

15 

16from heritrace.editor import Editor 

17from rdflib import URIRef 

18from markupsafe import Markup 

19 

# Blueprint under which the merge-related routes of this module
# (execute-merge, compare-and-merge, find_similar) are registered.
merge_bp = Blueprint("merge", __name__)

21 

22 

def get_entity_details(entity_uri: str) -> Tuple[Optional[Dict[str, Any]], List[str]]:
    """
    Fetch all properties (predicates and objects) of an entity, grouped by
    predicate, along with the entity's types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary where keys are predicate URIs and values are lists of
          object dictionaries with the keys 'value', 'type', 'lang',
          'datatype' and 'readable_label'. For URI objects 'readable_label'
          is produced by the custom filter; for every other object it is
          the raw value. None is returned if an error occurs or if the URI
          contains characters that cannot appear in a SPARQL IRI.
        - A list of entity type URIs. Returns an empty list if an error
          occurs or no types are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: Dict[str, List[Dict[str, Any]]] = {}
    entity_types: List[str] = []

    try:
        # The URI is interpolated between '<' and '>' below. The SPARQL
        # IRIREF production forbids these characters (and anything up to
        # U+0020), so a value containing them is either malformed or an
        # attempt to break out of the IRI and alter the query.
        iri_forbidden = set('<>"{}|^`\\')
        if any(ch in iri_forbidden or ch <= " " for ch in entity_uri):
            current_app.logger.warning(f"Refusing to query malformed entity URI: {entity_uri!r}")
            return None, []

        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning(f"No types found for entity: {entity_uri}")

        query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{entity_uri}> ?p ?o .
            }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        for binding in results.get("results", {}).get("bindings", []):
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_value = obj_node["value"]
            obj_type = obj_node["type"]

            if obj_type == 'uri':
                # Linked entities get a human readable label based on their own types.
                obj_types = get_entity_types(obj_value)
                readable_label = custom_filter.human_readable_entity(obj_value, obj_types)
            else:
                # Literals (and any other node kind) are shown as they are.
                readable_label = obj_value

            grouped_properties.setdefault(predicate, []).append({
                "value": obj_value,
                "type": obj_type,
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                "readable_label": readable_label,
            })

        return grouped_properties, entity_types

    except Exception as e:
        # Callers rely on the (None, []) sentinel instead of an exception.
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error fetching details for {entity_uri}: {e}\n{tb_str}")
        return None, []

86 

87 

@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge():
    """
    Handles the actual merging of two entities using the Editor class
    to ensure provenance and data model agnosticism.
    Entity 1 (keep) absorbs Entity 2 (delete).

    The URIs are read from the POSTed form fields ``entity1_uri`` and
    ``entity2_uri``.

    Returns:
        A redirect response: to the catalogue when a URI is missing, to the
        page of the kept entity after a successful merge, or back to the
        comparison page when the merge raises.
    """
    entity1_uri = request.form.get("entity1_uri")
    entity2_uri = request.form.get("entity2_uri")

    # TODO: Implement CSRF validation if using Flask-WTF

    if not entity1_uri or not entity2_uri:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    try:
        custom_filter = get_custom_filter()

        # Only the types are needed here, to build the labels shown in the
        # confirmation message.
        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)
        entity1_label = custom_filter.human_readable_entity(entity1_uri, entity1_types) or entity1_uri
        entity2_label = custom_filter.human_readable_entity(entity2_uri, entity2_types) or entity2_uri

        counter_handler = get_counter_handler()

        # The responsible agent is the ORCID of the logged-in user, when available.
        resp_agent_uri = None
        if current_user.is_authenticated and hasattr(current_user, 'orcid'):
            resp_agent_uri = URIRef(f"https://orcid.org/{current_user.orcid}")

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            dataset_endpoint=dataset_endpoint,
            provenance_endpoint=provenance_endpoint,
            counter_handler=counter_handler,
            resp_agent=resp_agent_uri,
            dataset_is_quadstore=dataset_is_quadstore
        )

        current_app.logger.info(f"Executing merge via Editor: Keep <{entity1_uri}>, Delete <{entity2_uri}>")

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        current_app.logger.info(f"Successfully merged <{entity2_uri}> into <{entity1_uri}> via Editor.")
        entity1_url = url_for('entity.about', subject=entity1_uri)
        entity2_url = url_for('entity.about', subject=entity2_uri)

        # Markup() does not escape anything: it declares the text as trusted
        # HTML. Labels originate from the dataset and the URLs embed
        # user-supplied URIs, so each value is escaped before it is inserted
        # into the translated HTML template. The msgid itself is unchanged.
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=Markup.escape(entity1_label),
            entity2=Markup.escape(entity2_label),
            entity1_url=Markup.escape(entity1_url),
            entity2_url=Markup.escape(entity2_url)
        )

        # Only the static template contributes raw HTML at this point.
        flash(Markup(flash_message_html), "success")

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        # Specific handling for merge with self
        current_app.logger.warning(f"Merge attempt failed: {ve}")
        flash(str(ve), "warning")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error executing Editor merge for <{entity1_uri}> and <{entity2_uri}>: {e}\n{tb_str}")
        flash(gettext("An error occurred during the merge operation. Please check the logs. No changes were made."), "danger")
        return redirect(url_for('.compare_and_merge', subject=entity1_uri, other_subject=entity2_uri))

162 

163 

@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge():
    """
    Render the side-by-side comparison of two entities shown before a merge
    is confirmed.

    The entities are taken from the ``subject`` and ``other_subject`` query
    parameters. When either is missing, or when the details of either entity
    cannot be retrieved, a message is flashed and the user is redirected to
    the catalogue.
    """
    query_args = request.args
    first_uri = query_args.get("subject")
    second_uri = query_args.get("other_subject")
    label_filter = get_custom_filter()

    if not (first_uri and second_uri):
        flash(gettext("Two entities must be selected for merging/comparison."), "warning")
        return redirect(url_for("main.catalogue"))

    first_props, first_types = get_entity_details(first_uri)
    second_props, second_types = get_entity_details(second_uri)

    # get_entity_details signals a failure with None in place of the properties.
    if any(props is None for props in (first_props, second_props)):
        flash(gettext("Could not retrieve details for one or both entities. Check logs."), "danger")
        return redirect(url_for("main.catalogue"))

    # Fall back to the bare URI when no readable label is produced.
    first_label = label_filter.human_readable_entity(first_uri, first_types) or first_uri
    second_label = label_filter.human_readable_entity(second_uri, second_types) or second_uri

    return render_template(
        "entity/merge_confirm.jinja",
        entity1={
            "uri": first_uri,
            "label": first_label,
            "types": first_types,
            "properties": first_props,
        },
        entity2={
            "uri": second_uri,
            "label": second_label,
            "types": second_types,
            "properties": second_props,
        },
    )

208 

209 

# Characters that must be backslash-escaped inside a short ("...") SPARQL
# string literal: the quote, the backslash, and raw line breaks.
_SPARQL_LITERAL_ESCAPES = str.maketrans({
    '\\': '\\\\',
    '"': '\\"',
    '\n': '\\n',
    '\r': '\\r',
})


def _is_valid_sparql_iri(value: str) -> bool:
    """Return True when value can be written between '<' and '>' in a SPARQL query."""
    forbidden = '<>"{}|^`\\'
    return all(ch not in forbidden and ch > " " for ch in value)


def _format_sparql_term(node: Dict[str, Any]) -> Optional[str]:
    """Serialise a SPARQL JSON result node as a SPARQL term, or None for unsupported node kinds."""
    value = node["value"]
    node_type = node["type"]

    if node_type == 'uri':
        return f"<{value}>"

    if node_type in {'literal', 'typed-literal'}:
        escaped_value_literal = value.translate(_SPARQL_LITERAL_ESCAPES)
        lang = node.get("xml:lang")
        datatype = node.get("datatype")
        # A language-tagged literal can only be written with its tag, so the
        # tag wins if a store reports both a tag and a datatype.
        if lang:
            return f'"{escaped_value_literal}"@{lang}'
        if datatype:
            return f'"{escaped_value_literal}"^^<{datatype}>'
        return f'"{escaped_value_literal}"'

    # Other node kinds (e.g. blank nodes) cannot be matched by value.
    return None


@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources():
    """
    Find resources potentially similar to a given subject based on shared property values.

    Query parameters:
        subject_uri: URI of the entity to find look-alikes for (required).
        entity_type: Primary entity type; candidates must have it (required).
        limit: Maximum number of candidates, a non-negative integer (default 5).

    Returns:
        A JSON response ``{"status": "success", "results": [...]}`` where each
        result has the keys 'uri', 'label', 'types' and 'type_labels'; a 400
        JSON error when a parameter is missing or invalid; a 500 JSON error
        when the lookup itself fails.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")  # Primary entity type

    if not subject_uri or not entity_type:
        return jsonify({"status": "error", "message": gettext("Missing required parameters (subject_uri, entity_type)")}), 400

    # Both values are interpolated into SPARQL as <...>; reject anything that
    # could terminate the IRI early and change the query.
    if not (_is_valid_sparql_iri(subject_uri) and _is_valid_sparql_iri(entity_type)):
        return jsonify({"status": "error", "message": gettext("Invalid subject_uri or entity_type")}), 400

    # A malformed limit is a client error, not a server failure.
    try:
        limit = int(request.args.get("limit", 5))
    except (TypeError, ValueError):
        limit = -1
    if limit < 0:
        return jsonify({"status": "error", "message": gettext("Invalid limit parameter")}), 400

    try:
        sparql = get_sparql()
        custom_filter = get_custom_filter()

        similarity_properties = get_similarity_properties(entity_type)

        # Without configured similarity properties every property of the subject is compared.
        property_filter = ""
        if similarity_properties:
            prop_uris = [f"<{p}>" for p in similarity_properties]
            property_filter = f"FILTER(?p IN ({', '.join(prop_uris)}))"

        fetch_comparison_values_query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                <{subject_uri}> ?p ?o .
                {property_filter}
            }}
        """
        sparql.setQuery(fetch_comparison_values_query)
        sparql.setReturnFormat(JSON)
        subject_values_results = sparql.query().convert()
        subject_values = subject_values_results.get("results", {}).get("bindings", [])

        if not subject_values:
            return jsonify({"status": "success", "results": []})

        # One pattern per (property, value) pair of the subject; a candidate
        # qualifies as soon as it shares any one of them.
        similarity_conditions = []
        for binding in subject_values:
            formatted_value = _format_sparql_term(binding["o"])
            if formatted_value is None:
                continue
            prop = binding["p"]["value"]
            similarity_conditions.append(f"{{ ?similar <{prop}> {formatted_value} . }}")

        if not similarity_conditions:
            return jsonify({"status": "success", "results": []})

        query_parts = [
            "SELECT DISTINCT ?similar WHERE {",
            f"  ?similar a <{entity_type}> .",
            f"  FILTER(?similar != <{subject_uri}>)",
            "  {",
            "    " + " \n    UNION\n    ".join(similarity_conditions),
            "  }",
            f"}} LIMIT {limit}"
        ]
        final_query = "\n".join(query_parts)

        sparql.setQuery(final_query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        candidate_uris = [item["similar"]["value"] for item in results.get("results", {}).get("bindings", [])]
        transformed_results = []
        for uri in candidate_uris:
            readable_label = custom_filter.human_readable_entity(uri, [entity_type])
            sim_types = get_entity_types(uri)
            type_labels = [custom_filter.human_readable_predicate(type_uri, sim_types) for type_uri in sim_types]
            transformed_results.append({
                "uri": uri,
                "label": readable_label,
                "types": sim_types,
                "type_labels": type_labels
            })

        return jsonify({"status": "success", "results": transformed_results})

    except Exception as e:
        tb_str = traceback.format_exc()
        current_app.logger.error(f"Error finding similar resources: {str(e)}\nTraceback: {tb_str}")
        return jsonify({"status": "error", "message": gettext("An error occurred while finding similar resources")}), 500