Coverage for heritrace / routes / merge.py: 99%

249 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7from collections import defaultdict 

8from typing import TYPE_CHECKING, Any 

9 

10import validators 

11from flask import ( 

12 Blueprint, 

13 Response, 

14 current_app, 

15 flash, 

16 jsonify, 

17 redirect, 

18 render_template, 

19 request, 

20 url_for, 

21) 

22from flask_babel import gettext 

23from flask_login import current_user, login_required 

24from markupsafe import Markup 

25from rdflib import URIRef 

26from SPARQLWrapper import JSON 

27 

28from heritrace.apis.orcid import get_responsible_agent_uri 

29from heritrace.editor import Editor, EndpointConfig 

30from heritrace.extensions import ( 

31 get_counter_handler, 

32 get_custom_filter, 

33 get_dataset_endpoint, 

34 get_dataset_is_quadstore, 

35 get_provenance_endpoint, 

36 get_sparql, 

37) 

38from heritrace.sparql import get_sparql_bindings 

39from heritrace.utils.display_rules_utils import ( 

40 get_highest_priority_class, 

41 get_similarity_properties, 

42) 

43from heritrace.utils.primary_source_utils import ( 

44 get_default_primary_source, 

45 save_user_default_primary_source, 

46) 

47from heritrace.utils.shacl_utils import determine_shape_for_classes 

48from heritrace.utils.sparql_utils import get_entity_types 

49 

50if TYPE_CHECKING: 

51 from werkzeug.wrappers import Response as WerkzeugResponse 

52 

53merge_bp = Blueprint("merge", __name__) 

54 

55 

def get_entity_details(
    entity_uri: URIRef,
) -> tuple[dict[str, list[dict[str, Any]]] | None, list[str]]:
    """
    Fetch every predicate/object pair of an entity, grouped by predicate,
    together with the entity's types.

    Args:
        entity_uri: The URI of the entity to fetch details for.

    Returns:
        A tuple containing:
        - A dictionary mapping each predicate URI to a list of object
          dictionaries with the keys 'value', 'type', 'lang', 'datatype'
          and 'readable_label'. ``None`` if an error occurs.
        - A list of entity type URIs. Empty if an error occurs or no types
          are found.
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()
    grouped_properties: dict[str, list[dict[str, Any]]] = {}
    entity_types: list[str] = []

    try:
        # The URI ultimately comes from request data. Serialising it through
        # rdflib instead of pasting it between "<" and ">" lets rdflib refuse
        # IRIs containing characters such as ">" or "{" that could otherwise
        # alter the query; the refusal is an exception handled below.
        subject_term = URIRef(entity_uri).n3()

        entity_types = get_entity_types(entity_uri)
        if not entity_types:
            current_app.logger.warning("No types found for entity: %s", entity_uri)

        query = f"""
            SELECT DISTINCT ?p ?o WHERE {{
                {subject_term} ?p ?o .
            }}
        """
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()

        for binding in get_sparql_bindings(results):
            predicate = binding["p"]["value"]
            obj_node = binding["o"]
            obj_details = {
                "value": obj_node["value"],
                "type": obj_node["type"],
                "lang": obj_node.get("xml:lang"),
                "datatype": obj_node.get("datatype"),
                # Fall back to the raw value; only typed URI objects get a
                # friendlier label below.
                "readable_label": obj_node["value"],
            }
            if obj_details["type"] == "uri":
                obj_types = get_entity_types(URIRef(obj_details["value"]))
                obj_type = get_highest_priority_class(obj_types)
                if obj_type:
                    obj_details["readable_label"] = (
                        custom_filter.human_readable_entity(
                            obj_details["value"], (obj_type, None)
                        )
                    )

            grouped_properties.setdefault(predicate, []).append(obj_details)

    except Exception:
        # Best-effort lookup: callers test for ``None`` instead of catching.
        current_app.logger.exception(
            "Error fetching details for %s",
            entity_uri,
        )
        return None, []
    else:
        return grouped_properties, entity_types

128 

129 

@merge_bp.route("/execute-merge", methods=["POST"])
@login_required
def execute_merge() -> WerkzeugResponse:
    """
    Merge two entities through the Editor class so that provenance is
    recorded and the operation stays data model agnostic.
    Entity 1 (keep) absorbs Entity 2 (delete).

    Reads ``entity1_uri``, ``entity2_uri``, ``primary_source`` and
    ``save_default_source`` from the submitted form.

    Returns:
        A redirect: to the kept entity's page on success, to the catalogue
        when a URI is missing, and back to the comparison page when the
        primary source is invalid or the merge fails.
    """
    entity1_uri_str = request.form.get("entity1_uri")
    entity2_uri_str = request.form.get("entity2_uri")
    primary_source = request.form.get("primary_source")
    save_default_source = request.form.get("save_default_source") == "true"

    # TODO(arcangelo): Implement CSRF validation
    # if using Flask-WTF

    if not entity1_uri_str or not entity2_uri_str:
        flash(gettext("Missing entity URIs for merge."), "danger")
        return redirect(url_for("main.catalogue"))

    entity1_uri = URIRef(entity1_uri_str)
    entity2_uri = URIRef(entity2_uri_str)

    if primary_source and not validators.url(primary_source):  # type: ignore[arg-type]
        flash(gettext("Invalid primary source URL provided."), "danger")
        return redirect(
            url_for(
                ".compare_and_merge", subject=entity1_uri, other_subject=entity2_uri
            )
        )

    # Past this point a non-empty primary_source has already been validated,
    # so it does not need to be checked again.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    try:
        custom_filter = get_custom_filter()

        _, entity1_types = get_entity_details(entity1_uri)
        _, entity2_types = get_entity_details(entity2_uri)

        entity1_type = get_highest_priority_class(entity1_types)
        entity2_type = get_highest_priority_class(entity2_types)
        entity1_shape = determine_shape_for_classes(entity1_types)
        entity2_shape = determine_shape_for_classes(entity2_types)
        entity1_label = (
            custom_filter.human_readable_entity(
                entity1_uri, (entity1_type, entity1_shape)
            )
            if entity1_type
            else entity1_uri
        )
        entity2_label = (
            custom_filter.human_readable_entity(
                entity2_uri, (entity2_type, entity2_shape)
            )
            if entity2_type
            else entity2_uri
        )

        counter_handler = get_counter_handler()
        resp_agent = get_responsible_agent_uri(current_user.orcid)

        dataset_endpoint = get_dataset_endpoint()
        provenance_endpoint = get_provenance_endpoint()
        dataset_is_quadstore = get_dataset_is_quadstore()

        editor = Editor(
            EndpointConfig(
                dataset=dataset_endpoint,
                provenance=provenance_endpoint,
                is_quadstore=dataset_is_quadstore,
            ),
            counter_handler,
            resp_agent,
        )

        if primary_source:
            editor.set_primary_source(URIRef(primary_source))

        editor.merge(keep_entity_uri=entity1_uri, delete_entity_uri=entity2_uri)

        entity1_url = url_for("entity.about", subject=entity1_uri)
        entity2_url = url_for("entity.about", subject=entity2_uri)
        # The message is flashed as trusted markup, so every interpolated
        # value is HTML-escaped first: labels come from the dataset or
        # straight from the form and must not be able to inject tags or
        # break out of the href attribute.
        flash_message_html = gettext(
            "Entities merged successfully. "
            "<a href='%(entity2_url)s' target='_blank'>%(entity2)s</a> "
            "has been deleted and its references now point to "
            "<a href='%(entity1_url)s' target='_blank'>%(entity1)s</a>.",
            entity1=Markup.escape(entity1_label),
            entity2=Markup.escape(entity2_label),
            entity1_url=Markup.escape(entity1_url),
            entity2_url=Markup.escape(entity2_url),
        )

        flash(Markup(flash_message_html), "success")  # noqa: S704

        return redirect(url_for("entity.about", subject=entity1_uri))

    except ValueError as ve:
        current_app.logger.warning("Merge attempt failed: %s", ve)
        flash(str(ve), "warning")
        return redirect(
            url_for(
                ".compare_and_merge", subject=entity1_uri, other_subject=entity2_uri
            )
        )

    except Exception:
        current_app.logger.exception(
            "Error executing Editor merge for <%s> and <%s>",
            entity1_uri,
            entity2_uri,
        )
        flash(
            gettext(
                "An error occurred during the merge"
                " operation. Please check the logs."
                " No changes were made."
            ),
            "danger",
        )
        return redirect(
            url_for(
                ".compare_and_merge", subject=entity1_uri, other_subject=entity2_uri
            )
        )

256 

257 

@merge_bp.route("/compare-and-merge")
@login_required
def compare_and_merge() -> str | WerkzeugResponse:
    """
    Render the merge confirmation page showing two entities side by side.

    The entities are taken from the ``subject`` and ``other_subject`` query
    parameters. When either is missing, or the details of either cannot be
    fetched, a message is flashed and the user is sent to the catalogue.
    """
    first_raw = request.args.get("subject")
    second_raw = request.args.get("other_subject")
    custom_filter = get_custom_filter()

    if not (first_raw and second_raw):
        flash(
            gettext("Two entities must be selected for merging/comparison."), "warning"
        )
        return redirect(url_for("main.catalogue"))

    first_uri = URIRef(first_raw)
    second_uri = URIRef(second_raw)

    first_props, first_types = get_entity_details(first_uri)
    second_props, second_types = get_entity_details(second_uri)

    if any(props is None for props in (first_props, second_props)):
        flash(
            gettext("Could not retrieve details for one or both entities. Check logs."),
            "danger",
        )
        return redirect(url_for("main.catalogue"))

    def describe(uri: URIRef, props: Any, types: list[str]) -> dict[str, Any]:
        # Collect everything the template needs for one side of the comparison.
        top_class = get_highest_priority_class(types)
        shape = determine_shape_for_classes(types)
        if top_class:
            label = custom_filter.human_readable_entity(uri, (top_class, shape))
        else:
            # Untyped entities are shown by their URI.
            label = uri
        return {
            "uri": uri,
            "label": label,
            "type_label": custom_filter.human_readable_class((top_class, shape)),
            "type": top_class,
            "shape": shape,
            "properties": props,
        }

    return render_template(
        "entity/merge_confirm.jinja",
        entity1=describe(first_uri, first_props, first_types),
        entity2=describe(second_uri, second_props, second_types),
        default_primary_source=get_default_primary_source(current_user.orcid),
    )

327 

328 

def _format_rdf_term(node: dict[str, str]) -> str | None:
    """
    Render one SPARQL JSON result node as a term usable inside a query.

    Args:
        node: A binding value with at least the keys "value" and "type",
            and optionally "datatype" and "xml:lang".

    Returns:
        ``<value>`` for URIs; a double-quoted string for literals, followed
        by ``^^<datatype>`` or ``@lang`` when present (datatype wins if both
        are given); ``None`` for any other node type.
    """
    value = node["value"]
    value_type = node["type"]
    if value_type == "uri":
        return f"<{value}>"
    if value_type in {"literal", "typed-literal"}:
        datatype = node.get("datatype")
        lang = node.get("xml:lang")
        # A short "..." string in SPARQL may not contain a raw backslash,
        # double quote, line feed or carriage return. The backslash is
        # escaped first so the escapes added afterwards are not doubled.
        escaped_value = (
            value.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )
        if datatype:
            return f'"{escaped_value}"^^<{datatype}>'
        if lang:
            return f'"{escaped_value}"@{lang}'
        return f'"{escaped_value}"'
    return None

344 

345 

def _fetch_subject_values(
    subject_uri: str,
    similarity_config: list,
) -> defaultdict[str, list[str]] | None:
    """
    Load the subject's values for every property named in the similarity config.

    Args:
        subject_uri: URI of the entity whose values are looked up.
        similarity_config: Similarity rules; each item is either a property
            URI string or a dict whose "and" key holds property URIs. Items
            of any other shape are ignored.

    Returns:
        A mapping from property URI to the subject's values for it, each
        rendered by ``_format_rdf_term`` (values it cannot render are
        dropped). ``None`` when the config names no properties or the query
        returns no bindings.
    """
    sparql = get_sparql()

    all_props_in_config: set[str] = set()
    for item in similarity_config:
        if isinstance(item, str):
            all_props_in_config.add(item)
        elif isinstance(item, dict) and "and" in item:
            all_props_in_config.update(item["and"])

    if not all_props_in_config:
        # The value logged here is the subject URI, so the message says so.
        current_app.logger.warning(
            "Empty properties list derived from similarity config for subject %s",
            subject_uri,
        )
        return None

    prop_uris_formatted_for_filter = [f"<{p}>" for p in all_props_in_config]
    property_filter_for_subject = (
        f"FILTER(?p IN ({', '.join(prop_uris_formatted_for_filter)}))"
    )

    fetch_comparison_values_query = f"""
        SELECT DISTINCT ?p ?o WHERE {{
            <{subject_uri}> ?p ?o .
            {property_filter_for_subject}
        }}
    """

    sparql.setQuery(fetch_comparison_values_query)
    sparql.setReturnFormat(JSON)
    subject_values_results = sparql.query().convert()
    subject_bindings = get_sparql_bindings(subject_values_results)

    if not subject_bindings:
        return None

    subject_values_by_prop: defaultdict[str, list[str]] = defaultdict(list)
    for binding in subject_bindings:
        formatted_value = _format_rdf_term(binding["o"])
        if formatted_value:
            subject_values_by_prop[binding["p"]["value"]].append(formatted_value)

    return subject_values_by_prop

393 

394 

def _build_union_blocks(
    similarity_config: list,
    subject_values_by_prop: defaultdict[str, list[str]],
    subject_uri: str,
) -> list[str]:
    """
    Turn the similarity rules into SPARQL group patterns, one per usable rule.

    A plain property rule yields a pattern only when the subject has values
    for it. An ``{"and": [...]}`` rule is delegated to ``_build_and_block``.
    Rules of any other shape are ignored. Object variables are numbered
    ``?o_1``, ``?o_2``, ... across all patterns.
    """
    blocks: list[str] = []
    next_index = 0

    for rule in similarity_config:
        if isinstance(rule, str):
            candidates = subject_values_by_prop.get(rule)
            if not candidates:
                continue
            next_index += 1
            obj_var = f"?o_{next_index}"
            joined = ", ".join(candidates)
            blocks.append(
                f" {{ ?similar <{rule}> {obj_var} . FILTER({obj_var} IN ({joined})) }}"
            )
            continue

        if not (isinstance(rule, dict) and "and" in rule):
            continue

        and_result = _build_and_block(
            rule["and"], subject_values_by_prop, subject_uri, next_index
        )
        if and_result is None:
            # Skipped groups still consume their variable numbers.
            next_index += len(rule["and"])
        else:
            group_text, next_index = and_result
            blocks.append(group_text)

    return blocks

426 

427 

def _build_and_block(
    and_props: list[str],
    subject_values_by_prop: defaultdict[str, list[str]],
    subject_uri: str,
    var_counter: int,
) -> tuple[str, int] | None:
    """
    Build one SPARQL group requiring a match on every property of an AND rule.

    Args:
        and_props: Property URIs that must all match.
        subject_values_by_prop: The subject's values per property, already
            rendered as SPARQL terms.
        subject_uri: URI of the subject; used only in the debug log message.
        var_counter: Last object-variable number already used.

    Returns:
        ``(group_text, new_var_counter)``, or ``None`` when the group is
        empty or the subject lacks values for any of its properties.
    """
    if not and_props:
        # An empty group would render as "{ }", which places no constraint
        # on ?similar and would report every entity of the type as similar.
        return None

    if not all(p in subject_values_by_prop for p in and_props):
        current_app.logger.debug(
            "Skipping AND group %s because"
            " subject %s lacks values for"
            " all its properties.",
            and_props,
            subject_uri,
        )
        return None

    and_patterns = []
    for prop_uri in and_props:
        prop_values = subject_values_by_prop[prop_uri]
        var_counter += 1
        values_filter = ", ".join(prop_values)
        and_patterns.append(
            f" ?similar <{prop_uri}>"
            f" ?o_{var_counter} ."
            f" FILTER(?o_{var_counter}"
            f" IN ({values_filter})) ."
        )

    patterns_str = "\n".join(and_patterns)
    return f" {{\n{patterns_str}\n }}", var_counter

458 

459 

def _execute_similarity_query(
    union_blocks: list[str],
    entity_type: str,
    subject_uri: str,
    limit: int,
    offset: int,
) -> tuple[list[str], bool]:
    """
    Run the similarity search and return one page of candidate URIs.

    The query selects entities of ``entity_type`` other than the subject
    that match at least one of ``union_blocks``, ordered by URI.

    Returns:
        ``(uris, has_more)``: at most ``limit`` URIs starting at ``offset``,
        and whether at least one further result exists.
    """
    client = get_sparql()
    alternatives = " UNION ".join(union_blocks)

    # Ask for one row beyond the page size to detect a following page.
    fetch_size = limit + 1
    query_text = f"""
        SELECT DISTINCT ?similar WHERE {{
            ?similar a <{entity_type}> .
            FILTER(?similar != <{subject_uri}>)
            {{
                {alternatives}
            }}
        }} ORDER BY ?similar OFFSET {offset} LIMIT {fetch_size}
    """

    client.setQuery(query_text)
    client.setReturnFormat(JSON)
    rows = get_sparql_bindings(client.query().convert())

    found = [row["similar"]["value"] for row in rows]
    return found[:limit], len(found) > limit

490 

491 

def _transform_results(
    uris: list[str],
    entity_type: str,
    shape_uri: str | None,
) -> list[dict[str, str]]:
    """
    Pair each URI with a display label.

    Returns:
        One ``{"uri": ..., "label": ...}`` dict per input URI, in order. The
        label comes from the custom filter's ``human_readable_entity`` and
        falls back to the URI when ``entity_type`` is empty or no label is
        produced.
    """
    custom_filter = get_custom_filter()
    entity_key = (entity_type, shape_uri)

    def label_for(uri: str) -> str:
        if not entity_type:
            return uri
        return custom_filter.human_readable_entity(uri, entity_key) or uri

    return [{"uri": uri, "label": label_for(uri)} for uri in uris]

507 

508 

# Characters that may not appear inside a SPARQL IRI reference (<...>).
_IRI_FORBIDDEN_CHARS = frozenset('<>"{}|^`\\')


def _is_safe_iri(value: str) -> bool:
    """Return True when *value* can be embedded between "<" and ">" in SPARQL."""
    # Besides the listed punctuation, space and control characters are excluded.
    return not any(ch in _IRI_FORBIDDEN_CHARS or ch <= " " for ch in value)


@merge_bp.route("/find_similar", methods=["GET"])
@login_required
def find_similar_resources() -> Response | tuple[Response, int]:  # noqa: PLR0911
    """
    Return entities similar to a subject, as JSON.

    Query parameters: ``subject_uri`` and ``entity_type`` (required),
    ``shape_uri`` (optional), ``limit`` (default 5, must be positive) and
    ``offset`` (default 0, must be non-negative).

    Returns:
        On success ``{"status": "success", "results": [...], "has_more": bool}``,
        with empty results when no similarity rule applies. Invalid
        parameters give an error payload with status 400; any failure while
        searching gives an error payload with status 500.
    """
    subject_uri = request.args.get("subject_uri")
    entity_type = request.args.get("entity_type")
    shape_uri = request.args.get("shape_uri")
    try:
        limit = int(request.args.get("limit", 5))
        offset = int(request.args.get("offset", 0))
    except ValueError:
        return jsonify(
            {"status": "error", "message": gettext("Invalid limit or offset parameter")}
        ), 400

    if not subject_uri or not entity_type:
        return jsonify(
            {
                "status": "error",
                "message": gettext(
                    "Missing required parameters (subject_uri, entity_type)"
                ),
            }
        ), 400

    # Both values are later written between "<" and ">" in SPARQL queries;
    # reject anything that could close the IRI and alter the query.
    if not (_is_safe_iri(subject_uri) and _is_safe_iri(entity_type)):
        return jsonify(
            {
                "status": "error",
                "message": gettext("Invalid subject_uri or entity_type"),
            }
        ), 400

    if limit <= 0 or offset < 0:
        return jsonify(
            {
                "status": "error",
                "message": gettext("Limit must be positive and offset non-negative"),
            }
        ), 400

    try:
        entity_key = (entity_type, shape_uri)
        similarity_config = get_similarity_properties(entity_key)

        if not similarity_config or not isinstance(similarity_config, list):
            return jsonify({"status": "success", "results": [], "has_more": False})

        subject_values_by_prop = _fetch_subject_values(subject_uri, similarity_config)
        if subject_values_by_prop is None:
            return jsonify({"status": "success", "results": [], "has_more": False})

        union_blocks = _build_union_blocks(
            similarity_config, subject_values_by_prop, subject_uri
        )
        if not union_blocks:
            return jsonify({"status": "success", "results": [], "has_more": False})

        result_uris, has_more = _execute_similarity_query(
            union_blocks, entity_type, subject_uri, limit, offset
        )
        transformed_results = _transform_results(result_uris, entity_type, shape_uri)

        return jsonify(
            {
                "status": "success",
                "results": transformed_results,
                "has_more": has_more,
            }
        )

    except Exception:
        current_app.logger.exception(
            "Error finding similar resources for %s", subject_uri
        )
        return jsonify(
            {
                "status": "error",
                "message": gettext("An error occurred while finding similar resources"),
            }
        ), 500