Coverage for heritrace / routes / entity / _creation.py: 94%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6from dataclasses import dataclass 

7 

8from flask import current_app, flash, jsonify, render_template, request, url_for 

9from flask_babel import gettext 

10from flask_login import current_user, login_required 

11from rdflib import RDF, XSD, Literal, URIRef 

12from werkzeug.wrappers import Response 

13 

14from heritrace.apis.orcid import get_responsible_agent_uri 

15from heritrace.editor import Editor, EditorError, EndpointConfig 

16from heritrace.extensions import ( 

17 get_dataset_endpoint, 

18 get_form_fields, 

19 get_provenance_endpoint, 

20) 

21from heritrace.routes.entity._blueprint import entity_bp 

22from heritrace.routes.entity._validation import validate_entity_data 

23from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options 

24from heritrace.utils.display_rules_utils import ( 

25 get_class_priority, 

26 is_entity_type_visible, 

27) 

28from heritrace.utils.primary_source_utils import ( 

29 get_default_primary_source, 

30 save_user_default_primary_source, 

31) 

32from heritrace.utils.shacl_utils import find_matching_form_field 

33from heritrace.utils.sparql_utils import import_referenced_entities 

34from heritrace.utils.uri_utils import generate_unique_uri, is_valid_url 

35from heritrace.utils.virtual_properties import ( 

36 remove_virtual_properties_from_creation_data, 

37 transform_entity_creation_with_virtual_properties, 

38) 

39 

40 

def _prepare_entity_creation_data(
    structured_data: dict,
) -> tuple[dict, str, dict, URIRef]:
    """Strip virtual properties from the payload and mint a URI for the entity.

    Args:
        structured_data: Entity creation payload as received from the client.

    Returns:
        A ``(cleaned_data, entity_type, properties, entity_uri)`` tuple.
        ``properties`` is an empty dict when the cleaned payload has none.

    Raises:
        KeyError: If the cleaned payload has no ``entity_type`` key.
    """
    payload = remove_virtual_properties_from_creation_data(structured_data)
    type_uri: str = payload["entity_type"]
    return (
        payload,
        type_uri,
        payload.get("properties", {}),
        generate_unique_uri(type_uri),
    )

52 

53 

def _setup_editor_for_creation(editor: Editor, cleaned_structured_data: dict) -> None:
    """Prepare ``editor`` before any triples of the new entity are created.

    Runs ``import_referenced_entities`` for the payload and then calls
    ``editor.preexisting_finished()``, in that order.

    Args:
        editor: Editor that will receive the new entity.
        cleaned_structured_data: Creation payload with virtual properties
            already removed.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    editor.preexisting_finished()

57 

58 

def _process_virtual_properties_after_creation(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
) -> None:
    """Create the entities derived from virtual properties, then save.

    Args:
        editor: Editor used to create and persist the derived entities.
        structured_data: Original creation payload, virtual properties included.
        entity_uri: URI of the entity that was just created.
        default_graph_uri: Graph passed on to ``create_nested_entity``.
    """
    derived_entities = transform_entity_creation_with_virtual_properties(
        structured_data, str(entity_uri)
    )
    if not derived_entities:
        return

    for derived in derived_entities:
        create_nested_entity(
            editor,
            generate_unique_uri(derived["entity_type"]),
            derived,
            default_graph_uri,
        )

    # NOTE(review): the save is assumed to belong to the "virtual entities
    # exist" branch (the caller has already saved the main entity) - confirm
    # against the original indentation.
    editor.save()

77 

78 

def _select_field_definition(
    field_definitions: list[dict],
    property_shape: str | None,
) -> dict | None:
    """Pick the field definition that applies to a property's values.

    With a ``property_shape``, the first definition whose ``subjectShape``
    equals it is chosen; without one, the first definition that has no
    ``subjectShape``. When neither rule yields a (truthy) definition the first
    definition is used, and ``None`` is returned when there are none at all.
    """

    def _applies(field_def: dict) -> bool:
        subject_shape = field_def.get("subjectShape")
        if property_shape:
            return subject_shape == property_shape
        return not subject_shape

    chosen = next((fd for fd in field_definitions if _applies(fd)), None)
    if not chosen and field_definitions:
        chosen = field_definitions[0]
    return chosen


def _create_entity_with_form_fields(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
    form_fields: dict,
) -> None:
    """Create the triples of a new entity using the form field definitions.

    Each predicate of the payload is dispatched to
    ``process_ordered_properties`` when its field definition declares
    ``orderedBy``, and to ``process_unordered_properties`` otherwise.

    Args:
        editor: Editor that receives the new triples.
        structured_data: Creation payload; virtual properties are removed
            before processing.
        entity_uri: URI of the entity being created.
        default_graph_uri: Graph passed to every creation call.
        form_fields: Form field definitions keyed by entity key, then by
            predicate.

    Raises:
        KeyError: If the cleaned payload has no ``entity_type`` key.
    """
    cleaned_structured_data = remove_virtual_properties_from_creation_data(
        structured_data
    )
    entity_type = cleaned_structured_data["entity_type"]
    properties = cleaned_structured_data.get("properties", {})

    _setup_editor_for_creation(editor, cleaned_structured_data)

    # The matching form-field key depends only on the entity, not on the
    # predicate, so it is resolved once instead of once per property.
    matching_key = find_matching_form_field(
        entity_type, cleaned_structured_data.get("entity_shape"), form_fields
    )
    entity_fields = form_fields.get(matching_key, {}) if matching_key else {}

    for predicate, raw_values in properties.items():
        values = raw_values if isinstance(raw_values, list) else [raw_values]

        # The shape carried by the first value decides which definition applies.
        property_shape = None
        if values and isinstance(values[0], dict):
            property_shape = values[0].get("shape")

        matching_field_def = _select_field_definition(
            entity_fields.get(predicate, []), property_shape
        )
        ordered_by = matching_field_def.get("orderedBy") if matching_field_def else None

        ctx = CreationContext(
            editor=editor,
            entity_uri=entity_uri,
            predicate=URIRef(predicate),
            default_graph_uri=default_graph_uri,
        )

        if ordered_by:
            process_ordered_properties(ctx, values, URIRef(ordered_by))
        else:
            process_unordered_properties(ctx, values, matching_field_def)

135 

136 

def _create_entity_without_form_fields(
    editor: Editor,
    structured_data: dict,
    entity_uri: URIRef,
    default_graph_uri: URIRef | None,
) -> None:
    """Create the triples of a new entity when no form fields are configured.

    Writes the ``rdf:type`` triple, then one triple per property value. Values
    typed ``"uri"`` become URI objects and values typed ``"literal"`` become
    literals (``xsd:string`` unless a ``datatype`` is given); values of any
    other type are skipped.

    Args:
        editor: Editor that receives the new triples.
        structured_data: Creation payload; virtual properties are removed
            before processing.
        entity_uri: URI of the entity being created.
        default_graph_uri: Graph passed to every creation call.

    Raises:
        KeyError: If ``entity_type`` is missing from the cleaned payload, or a
            value lacks its ``type`` (or, for handled types, ``value``) key.
    """
    payload = remove_virtual_properties_from_creation_data(structured_data)
    type_uri = payload["entity_type"]
    property_map = payload.get("properties", {})

    editor.import_entity(entity_uri)
    _setup_editor_for_creation(editor, payload)

    editor.create(entity_uri, RDF.type, URIRef(type_uri), default_graph_uri)

    for predicate, entries in property_map.items():
        predicate_uri = URIRef(predicate)
        for entry in entries:
            kind = entry["type"]
            if kind == "uri":
                obj = URIRef(entry["value"])
            elif kind == "literal":
                if "datatype" in entry:
                    literal_type = URIRef(entry["datatype"])
                else:
                    literal_type = XSD.string
                obj = Literal(entry["value"], datatype=literal_type)
            else:
                # Unknown value kinds are ignored.
                continue
            editor.create(entity_uri, predicate_uri, obj, default_graph_uri)

181 

182 

def _handle_create_entity_post(
    form_fields: dict,
    structured_data: dict,
    primary_source: str | None,
    *,
    save_default_source: bool,
) -> tuple[Response, int]:
    """Create an entity from a submitted payload and build the JSON reply.

    Args:
        form_fields: Form field definitions; when falsy the payload is stored
            without validation against them.
        structured_data: Decoded creation payload.
        primary_source: Primary source URL, or ``None`` when not provided.
        save_default_source: Whether to store ``primary_source`` as the
            current user's default primary source.

    Returns:
        A ``(response, status)`` pair: 400 for an invalid primary source URL,
        a missing entity type or validation errors; 500 when saving raises
        ``EditorError`` or ``OSError``; 200 with a ``redirect_url`` on success.
    """
    if primary_source and not is_valid_url(primary_source):
        return jsonify(
            {
                "status": "error",
                "errors": [gettext("Invalid primary source URL provided")],
            }
        ), 400

    # A truthy primary_source reaching this point already passed is_valid_url
    # above, so it is not validated a second time.
    if save_default_source and primary_source:
        save_user_default_primary_source(current_user.orcid, primary_source)

    # Reject a payload without an entity type before looking up the
    # responsible agent and constructing the Editor.
    if not structured_data.get("entity_type"):
        return jsonify(
            {"status": "error", "errors": [gettext("Entity type is required")]}
        ), 400

    resp_agent = get_responsible_agent_uri(current_user.orcid)
    editor = Editor(
        EndpointConfig(
            dataset=get_dataset_endpoint(),
            provenance=get_provenance_endpoint(),
            is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        ),
        current_app.config["COUNTER_HANDLER"],
        resp_agent,
        URIRef(primary_source) if primary_source else None,
        current_app.config["DATASET_GENERATION_TIME"],
    )

    cleaned_structured_data, _entity_type, _properties, entity_uri = (
        _prepare_entity_creation_data(structured_data)
    )

    # A per-entity graph is only used when the dataset is a quadstore.
    default_graph_uri = (
        URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
    )

    if form_fields:
        validation_errors = validate_entity_data(cleaned_structured_data)
        if validation_errors:
            return jsonify({"status": "error", "errors": validation_errors}), 400
        _create_entity_with_form_fields(
            editor,
            structured_data,
            entity_uri,
            default_graph_uri,
            form_fields,
        )
    else:
        _create_entity_without_form_fields(
            editor,
            structured_data,
            entity_uri,
            default_graph_uri,
        )

    try:
        editor.save()
        _process_virtual_properties_after_creation(
            editor, structured_data, entity_uri, default_graph_uri
        )
    except (EditorError, OSError) as e:
        error_message = gettext(
            "An error occurred while creating the entity: %(error)s", error=str(e)
        )
        return jsonify({"status": "error", "errors": [error_message]}), 500
    else:
        response = jsonify(
            {
                "status": "success",
                "redirect_url": url_for("entity.about", subject=str(entity_uri)),
            }
        )
        flash(gettext("Entity created successfully"), "success")
        return response, 200

265 

266 

@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity() -> str | tuple[Response, int]:
    """Render the entity creation page (GET) or create an entity (POST).

    On POST the form fields ``structured_data`` (JSON), ``primary_source`` and
    ``save_default_source`` are read and handed to
    ``_handle_create_entity_post``.

    Returns:
        The rendered ``create_entity.jinja`` template for GET, or a
        ``(JSON response, status)`` pair for POST. A ``structured_data`` value
        that is not valid JSON, or does not decode to an object, yields a 400
        JSON error.
    """
    form_fields = get_form_fields()

    # Handle POST first: none of the template data below is needed for it.
    if request.method == "POST":
        try:
            structured_data = json.loads(request.form.get("structured_data", "{}"))
        except json.JSONDecodeError:
            structured_data = None
        # The handler calls dict methods on the payload, so anything other
        # than a JSON object is rejected here with a JSON error.
        if not isinstance(structured_data, dict):
            return jsonify(
                {"status": "error", "errors": [gettext("Invalid structured data")]}
            ), 400

        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == "true"
        return _handle_create_entity_post(
            form_fields,
            structured_data,
            primary_source,
            save_default_source=save_default_source,
        )

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Visible entity keys, sorted by get_class_priority in descending order.
    entity_class_shape_pairs = sorted(
        (entity_key for entity_key in form_fields if is_entity_type_visible(entity_key)),
        key=get_class_priority,
        reverse=True,
    )

    datatype_options = get_datatype_options()

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs,
    )

308 

309 

def create_nested_entity(
    editor: Editor,
    entity_uri: URIRef,
    entity_data: dict,
    graph_uri: URIRef | None = None,
) -> None:
    """Create ``entity_uri`` with its type and, recursively, its properties.

    The ``rdf:type`` triple is always written. Properties are only processed
    when a form field entry matches the entity's type and shape.

    Args:
        editor: Editor that receives the new triples.
        entity_uri: URI of the entity to create.
        entity_data: Payload with ``entity_type`` and optional
            ``entity_shape`` and ``properties``.
        graph_uri: Graph passed to every creation call.

    Raises:
        KeyError: If ``entity_data`` has no ``entity_type`` key.
    """
    form_fields = get_form_fields()

    editor.create(entity_uri, RDF.type, URIRef(entity_data["entity_type"]), graph_uri)

    properties = entity_data.get("properties", {})
    matching_key = find_matching_form_field(
        entity_data.get("entity_type"), entity_data.get("entity_shape"), form_fields
    )
    if not matching_key:
        return

    for predicate, raw_values in properties.items():
        predicate_uri = URIRef(predicate)
        values = raw_values if isinstance(raw_values, list) else [raw_values]
        field_definitions = form_fields[matching_key].get(predicate, [])

        for value in values:
            is_mapping = isinstance(value, dict)

            # A brand-new nested entity, optionally reached through an
            # intermediate node described by "intermediateRelation".
            if is_mapping and "entity_type" in value:
                if "intermediateRelation" in value:
                    relation = value["intermediateRelation"]
                    intermediate_uri = generate_unique_uri(relation["class"])
                    child_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, predicate_uri, intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(relation["property"]),
                        child_uri,
                        graph_uri,
                    )
                else:
                    child_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, predicate_uri, child_uri, graph_uri)
                create_nested_entity(editor, child_uri, value, graph_uri)
                continue

            # A reference to an existing entity; skipped when it has no URI.
            if is_mapping and value.get("is_existing_entity", False):
                referenced_uri = value.get("entity_uri")
                if referenced_uri:
                    editor.create(
                        entity_uri, predicate_uri, URIRef(referenced_uri), graph_uri
                    )
                continue

            # Anything else is stringified: a URI when it is a valid URL,
            # otherwise a literal typed from the first field definition.
            text = str(value)
            if is_valid_url(text):
                object_value: URIRef | Literal = URIRef(text)
            else:
                datatype_uris = (
                    field_definitions[0].get("datatypes", [])
                    if field_definitions
                    else []
                )
                object_value = Literal(
                    text, datatype=determine_datatype(text, datatype_uris)
                )
            editor.create(entity_uri, predicate_uri, object_value, graph_uri)

381 

382 

383@dataclass(frozen=True, slots=True) 

384class CreationContext: 

385 editor: Editor 

386 entity_uri: URIRef 

387 predicate: URIRef 

388 default_graph_uri: URIRef | None 

389 

390 

def process_entity_value(
    ctx: CreationContext,
    value: dict | str,
    matching_field_def: dict | None,
) -> URIRef | Literal:
    """Create the triple for one property value and return its object.

    A dict with ``entity_type`` produces a new nested entity; a dict flagged
    ``is_existing_entity`` links to its ``entity_uri``; any other value is
    stringified and stored as a URI when it is a valid URL, otherwise as a
    literal typed from the field definition's ``datatypes``.

    Args:
        ctx: Editor, subject, predicate and graph for the triple.
        value: Property value from the creation payload.
        matching_field_def: Field definition for the predicate, if any.

    Returns:
        The object of the created triple.

    Raises:
        ValueError: If an existing-entity reference has no ``entity_uri``.
    """
    is_mapping = isinstance(value, dict)

    if is_mapping and "entity_type" in value:
        nested_uri = generate_unique_uri(value["entity_type"])
        ctx.editor.create(
            ctx.entity_uri, ctx.predicate, nested_uri, ctx.default_graph_uri
        )
        create_nested_entity(ctx.editor, nested_uri, value, ctx.default_graph_uri)
        return nested_uri

    target: URIRef | Literal
    if is_mapping and value.get("is_existing_entity", False):
        referenced_uri = value.get("entity_uri")
        if not referenced_uri:
            msg = "Missing entity_uri in existing entity reference"
            raise ValueError(msg)
        target = URIRef(referenced_uri)
    else:
        text = str(value)
        if is_valid_url(text):
            target = URIRef(text)
        else:
            datatype_uris = (
                matching_field_def.get("datatypes", []) if matching_field_def else []
            )
            target = Literal(text, datatype=determine_datatype(text, datatype_uris))

    ctx.editor.create(ctx.entity_uri, ctx.predicate, target, ctx.default_graph_uri)
    return target

435 

436 

def _process_ordered_entity_value(
    ctx: CreationContext,
    value: dict,
) -> URIRef:
    """Link one element of an ordered property and return its URI.

    A dict with ``entity_type`` yields a newly created nested entity; a dict
    flagged ``is_existing_entity`` is linked through its ``entity_uri``.

    Args:
        ctx: Editor, subject, predicate and graph for the triple.
        value: Element of the ordered property.

    Returns:
        URI of the linked entity.

    Raises:
        ValueError: If ``value`` is neither of the two supported shapes.
        KeyError: If an existing-entity reference has no ``entity_uri``.
    """
    is_mapping = isinstance(value, dict)
    is_new = is_mapping and "entity_type" in value
    is_existing = (
        not is_new and is_mapping and value.get("is_existing_entity", False)
    )

    if not (is_new or is_existing):
        msg = "Unexpected value type for ordered property"
        raise ValueError(msg)

    if is_new:
        item_uri = generate_unique_uri(value["entity_type"])
    else:
        item_uri = URIRef(value["entity_uri"])

    ctx.editor.create(ctx.entity_uri, ctx.predicate, item_uri, ctx.default_graph_uri)

    # Only new entities have a description to create.
    if is_new:
        create_nested_entity(ctx.editor, item_uri, value, ctx.default_graph_uri)
    return item_uri

462 

463 

def process_ordered_properties(
    ctx: CreationContext,
    values: list[dict],
    ordered_by: URIRef,
) -> None:
    """Create the values of an ordered property and chain them together.

    Values are grouped by their ``entity_shape`` (``"default_shape"`` when it
    is missing or empty). Within each group, every entity is linked to the one
    that follows it through the ``ordered_by`` predicate.

    Args:
        ctx: Editor, subject, predicate and graph for the triples.
        values: Elements of the ordered property, in order.
        ordered_by: Predicate linking an element to its successor.
    """
    grouped: dict = {}
    for item in values:
        group_key = item.get("entity_shape") or "default_shape"
        grouped.setdefault(group_key, []).append(item)

    for chain in grouped.values():
        predecessor = None
        for item in chain:
            current = _process_ordered_entity_value(ctx, item)
            if predecessor:
                ctx.editor.create(
                    predecessor, ordered_by, current, ctx.default_graph_uri
                )
            predecessor = current

491 

492 

def process_unordered_properties(
    ctx: CreationContext,
    values: list[dict | str],
    matching_field_def: dict | None,
) -> None:
    """Create one triple per value by delegating to ``process_entity_value``.

    Args:
        ctx: Editor, subject, predicate and graph for the triples.
        values: Property values from the creation payload.
        matching_field_def: Field definition for the predicate, if any.
    """
    for item in values:
        process_entity_value(ctx, item, matching_field_def)

500 

501 

def determine_datatype(value: str, datatype_uris: list[str]) -> URIRef:
    """Return the first candidate datatype whose validator accepts ``value``.

    Each candidate is looked up in ``DATATYPE_MAPPING`` by comparing string
    forms with the entry's first item; the entry's second item is called as
    the validator.

    Args:
        value: Lexical value to classify.
        datatype_uris: Candidate datatype URIs, in priority order.

    Returns:
        The first accepted candidate as a ``URIRef``, or ``XSD.string`` when
        no candidate is accepted.
    """
    for candidate in datatype_uris:
        candidate_key = str(candidate)
        validator = None
        for mapping in DATATYPE_MAPPING:
            if str(mapping[0]) == candidate_key:
                validator = mapping[1]
                break
        if validator and validator(value):
            return URIRef(candidate)
    return XSD.string