Coverage for heritrace/utils/display_rules_utils.py: 100%

234 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1from collections import OrderedDict 

2from typing import Tuple, List, Optional 

3from urllib.parse import unquote 

4 

5from heritrace.extensions import get_display_rules, get_sparql 

6from rdflib import ConjunctiveGraph, Graph, URIRef 

7from rdflib.plugins.sparql.algebra import translateQuery 

8from rdflib.plugins.sparql.parser import parseQuery 

9from SPARQLWrapper import JSON 

10 

# Snapshot of the display rules taken once, when this module is imported.
# The functions below either call get_display_rules() again or receive the
# rules as a parameter, so they do not read this module-level name.
display_rules = get_display_rules()

12 

13 

def get_class_priority(class_uri):
    """Return the priority configured for a class in the display rules.

    The priority is read directly from the rules returned by
    ``get_display_rules()``.

    Args:
        class_uri: URI of the class; it is converted with ``str()`` before
            being compared with each rule's ``"class"`` entry.

    Returns:
        The ``"priority"`` value of the first rule whose class matches, or
        ``0`` when there are no rules, no rule matches, or the matching rule
        has no ``"priority"`` key.
    """
    wanted = str(class_uri)
    for candidate in get_display_rules() or ():
        if candidate["class"] == wanted:
            return candidate.get("priority", 0)
    return 0

27 

28 

def is_entity_type_visible(entity_type):
    """Tell whether entities of ``entity_type`` should be displayed.

    Args:
        entity_type: URI of the entity type. It is normalised with ``str()``
            before the comparison, the same way ``get_class_priority`` does,
            so a URI object and its plain-string form select the same rule.

    Returns:
        The ``"shouldBeDisplayed"`` value of the first rule whose ``"class"``
        matches (``True`` when the key is absent). ``True`` is also returned
        when no display rules are configured or no rule matches.
    """
    rules = get_display_rules()
    if not rules:
        # No configuration (None or empty): everything is visible by default.
        return True

    wanted = str(entity_type)
    for rule in rules:
        if rule["class"] == wanted:
            return rule.get("shouldBeDisplayed", True)
    return True

36 

37 

def get_sortable_properties(entity_type: str, display_rules, form_fields_cache) -> list:
    """Collect the sortable properties declared for an entity type.

    The first rule whose ``"class"`` equals ``entity_type`` and that has a
    ``"sortableBy"`` key is used. Each of its sort configurations is copied
    (the rules themselves are never mutated) and enriched with:

    * ``"displayName"``: taken from the display property with the same
      ``"property"`` URI, when there is one. The first ``"displayRules"``
      entry is preferred; a missing name falls back to the property URI.
      The key is left unset when no display property matches.
    * ``"sortType"``: inferred from the first field definition found in
      ``form_fields_cache[entity_type][property]``. The key is left unset
      when the cache has no (non-empty) entry for the property.

    Args:
        entity_type: URI of the entity type.
        display_rules: Iterable of display rule dictionaries (may be falsy).
        form_fields_cache: Mapping ``entity type -> property -> list of field
            definitions`` (may be falsy).

    Returns:
        A list of dictionaries describing the sortable properties, or an
        empty list when no suitable rule exists.
    """
    if not display_rules:
        return []

    rule = next(
        (
            candidate
            for candidate in display_rules
            if candidate["class"] == entity_type and "sortableBy" in candidate
        ),
        None,
    )
    if rule is None:
        return []

    # A rule may declare "sortableBy" without any "displayProperties".
    display_props = rule.get("displayProperties", [])

    entity_fields = {}
    if form_fields_cache and entity_type in form_fields_cache:
        entity_fields = form_fields_cache[entity_type]

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        prop_uri = prop["property"]

        # Only the first display property with the same URI is considered.
        for display_prop in display_props:
            if display_prop["property"] == prop_uri:
                prop["displayName"] = _sortable_display_name(display_prop, prop_uri)
                break

        if prop_uri in entity_fields:
            field_definitions = entity_fields[prop_uri]
            # An empty definition list carries no type information.
            if field_definitions:
                prop["sortType"] = _infer_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props


def _sortable_display_name(display_prop, prop_uri):
    """Return the label of a display property, falling back to its URI."""
    nested_rules = display_prop.get("displayRules")
    if nested_rules:
        # Same fallback get_grouped_triples uses for nested display rules.
        return nested_rules[0].get("displayName", prop_uri)
    return display_prop.get("displayName", prop_uri)


def _infer_sort_type(field_info):
    """Map a form field definition to "string", "date", "number" or "boolean"."""
    # A node shape means the value references another entity (sorted by label).
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    datatype = str(datatypes[0]).lower()
    if any(token in datatype for token in ("date", "time")):
        return "date"
    if any(
        token in datatype for token in ("int", "float", "decimal", "double", "number")
    ):
        return "number"
    if "boolean" in datatype:
        return "boolean"
    return "string"

105 

106 

def get_highest_priority_class(subject_classes):
    """Pick the class whose display-rule priority value is the lowest.

    Priorities come from ``get_class_priority``; a smaller number wins, and
    on equal priorities the class met first is kept.

    Args:
        subject_classes: Iterable of class URIs.

    Returns:
        The winning class (the original object, not its string form), or
        ``None`` when ``subject_classes`` is empty.
    """
    best_class = None
    best_priority = None
    for candidate in subject_classes:
        candidate_priority = get_class_priority(str(candidate))
        improves = best_priority is None or candidate_priority < best_priority
        if improves:
            best_class, best_priority = candidate, candidate_priority
    return best_class

116 

117 

def get_grouped_triples(
    subject, triples, subject_classes, valid_predicates_info, historical_snapshot=None
):
    """Group an entity's triples by display name according to the display rules.

    For every property in ``valid_predicates_info`` the rules of the
    highest-priority class of the subject are searched. The first rule that
    lists the property decides how it is shown (one group per nested
    ``"displayRules"`` entry, or a single group otherwise); properties with no
    matching rule are grouped under their own URI by
    ``process_default_property``. Groups are finally reordered to follow the
    order in which the rules of the subject's classes list them.

    Args:
        subject: URI of the entity being displayed.
        triples: Iterable of triples; items are indexed as ``[0]``, ``[1]``, ``[2]``.
        subject_classes: Classes of the subject.
        valid_predicates_info: Iterable of property URIs to process.
        historical_snapshot: Optional graph; when given, value and ordering
            queries are run against it instead of the live endpoint.

    Returns:
        A tuple ``(grouped_triples, relevant_properties)``: an ``OrderedDict``
        mapping display names to group dictionaries, and the set of property
        URIs that were handled through a display rule.
    """
    # get_display_rules() may yield nothing; treat that as "no rules" so the
    # rule lookup below does not fail while iterating None.
    display_rules = get_display_rules() or []

    grouped_triples = OrderedDict()
    relevant_properties = set()
    # Maps values returned by fetchValueFromQuery back to the original objects.
    fetched_values_map = {}

    highest_priority_class = get_highest_priority_class(subject_classes)
    highest_priority_rules = [
        rule for rule in display_rules if rule["class"] == str(highest_priority_class)
    ]

    for prop_uri in valid_predicates_info:
        # First rule of the highest-priority class that mentions the property.
        matched_rule = next(
            (
                rule
                for rule in highest_priority_rules
                if any(p["property"] == prop_uri for p in rule["displayProperties"])
            ),
            None,
        )
        if matched_rule is None:
            process_default_property(prop_uri, triples, grouped_triples)
            continue

        for prop in matched_rule["displayProperties"]:
            if prop["property"] != prop_uri:
                continue

            # Nested display rules each produce their own group; without them
            # the property entry itself acts as the single display rule.
            variants = prop["displayRules"] if "displayRules" in prop else [prop]
            for display_rule in variants:
                display_name = display_rule.get("displayName", prop_uri)
                relevant_properties.add(prop_uri)
                process_display_rule(
                    display_name,
                    prop_uri,
                    display_rule,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                )
                # Applied per group, so every group created for the property
                # carries the ordering / intermediate-relation metadata.
                _apply_property_options(
                    subject,
                    prop,
                    grouped_triples,
                    display_name,
                    fetched_values_map,
                    historical_snapshot,
                )

    if display_rules:
        ordered_display_names = []
        for rule in display_rules:
            if URIRef(rule["class"]) not in subject_classes:
                continue
            for prop in rule["displayProperties"]:
                if "displayRules" in prop:
                    candidate_names = [
                        display_rule.get("displayName", prop["property"])
                        for display_rule in prop["displayRules"]
                    ]
                else:
                    candidate_names = [prop.get("displayName", prop["property"])]
                ordered_display_names.extend(
                    name for name in candidate_names if name in grouped_triples
                )
        # Groups not mentioned by any rule keep their insertion order, last.
        for display_name in grouped_triples:
            if display_name not in ordered_display_names:
                ordered_display_names.append(display_name)
    else:
        ordered_display_names = list(grouped_triples)

    grouped_triples = OrderedDict(
        (name, grouped_triples[name]) for name in ordered_display_names
    )
    return grouped_triples, relevant_properties


def _apply_property_options(
    subject, prop, grouped_triples, display_name, fetched_values_map, historical_snapshot
):
    """Attach ordering and intermediate-relation metadata to a display group."""
    group = grouped_triples[display_name]

    if "orderedBy" in prop:
        order_property = prop.get("orderedBy")
        group["is_draggable"] = True
        group["ordered_by"] = order_property
        process_ordering(
            subject,
            prop,
            order_property,
            grouped_triples,
            display_name,
            fetched_values_map,
            historical_snapshot,
        )

    if "intermediateRelation" in prop:
        group["intermediateRelation"] = prop["intermediateRelation"]

245 

246 

def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
):
    """Add the triples of one property to the group named ``display_name``.

    The group is created on first use with the rule's ``"shape"`` and
    ``"intermediateRelation"``. Every triple whose predicate equals
    ``prop_uri`` is appended to the group's ``"triples"`` list. When the rule
    has a ``"fetchValueFromQuery"`` entry, the displayed value is obtained by
    running that query (against ``historical_snapshot`` if given, otherwise
    through ``execute_sparql_query``); triples for which the query yields no
    value are left out, and ``fetched_values_map`` records the mapping from
    the fetched value to the original object.

    Args:
        display_name: Key of the group inside ``grouped_triples``.
        prop_uri: Property URI the triples must have as predicate.
        rule: Display rule dictionary for the property.
        subject: URI of the entity, forwarded to the value query.
        triples: Iterable of triples, indexed as ``[0]``, ``[1]``, ``[2]``.
        grouped_triples: Mapping updated in place.
        fetched_values_map: Mapping updated in place.
        historical_snapshot: Optional graph used for historical queries.
    """
    shape = rule.get("shape")
    group = grouped_triples.setdefault(
        display_name,
        {
            "property": prop_uri,
            "triples": [],
            "shape": shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        },
    )
    entries = group["triples"]
    value_query = rule.get("fetchValueFromQuery")

    for triple in triples:
        predicate = str(triple[1])
        if predicate != prop_uri:
            continue

        original_object = str(triple[2])

        if not value_query:
            # Plain case: the object itself is what gets displayed.
            entries.append(
                {
                    "triple": (str(triple[0]), predicate, original_object),
                    "object": original_object,
                    "shape": shape,
                }
            )
            continue

        if historical_snapshot:
            result, external_entity = execute_historical_query(
                value_query, subject, triple[2], historical_snapshot
            )
        else:
            result, external_entity = execute_sparql_query(
                value_query, subject, triple[2]
            )

        if not result:
            # The query produced no value: the triple is not listed.
            continue

        fetched_values_map[str(result)] = original_object
        entries.append(
            {
                "triple": (str(triple[0]), predicate, str(result)),
                "external_entity": external_entity,
                "object": original_object,
                "shape": shape,
            }
        )

295 

296 

def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """Run a templated query on the live endpoint and return its first row.

    ``[[subject]]`` and ``[[value]]`` in ``query`` are replaced by the
    URL-decoded ``subject`` and ``value`` wrapped in angle brackets. The
    query is sent through the endpoint returned by ``get_sparql()`` and only
    the first binding is used; its values are read in the order of the
    query's projected variables.

    Args:
        query: SPARQL query template.
        subject: URI substituted for ``[[subject]]``.
        value: Value substituted for ``[[value]]``.

    Returns:
        The values of the first and second projected variables of the first
        result row (``None`` for a variable that is absent or unbound), or
        ``(None, None)`` when the query returns no rows.
    """
    endpoint = get_sparql()

    substitutions = {
        "[[subject]]": f"<{unquote(subject)}>",
        "[[value]]": f"<{unquote(value)}>",
    }
    for placeholder, replacement in substitutions.items():
        query = query.replace(placeholder, replacement)

    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    bindings = endpoint.query().convert().get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The query is parsed locally only to learn the order of its projected
    # variables ("PV" in the translated algebra).
    projected_variables = translateQuery(parseQuery(query)).algebra["PV"]
    first_row = bindings[0]
    values = [
        first_row.get(str(variable), {}).get("value", None)
        for variable in projected_variables
    ]
    # Pad so that queries projecting fewer than two variables still unpack.
    values.extend([None, None])
    return (values[0], values[1])

320 

321 

def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: ConjunctiveGraph | Graph | None = None,
):
    """Sort a group's triples by following the ``order_property`` links.

    A query retrieves, for every entity linked to ``subject`` through
    ``prop["property"]``, the entity that follows it via ``order_property``
    (``"NONE"`` when there is none). The links are turned into chains and the
    ``"triples"`` list of ``grouped_triples[display_name]`` is sorted in place
    so that chained entities appear in chain order; entities outside a chain
    keep their relative order after the chained ones.

    Args:
        subject: URI of the entity owning the ordered values (URL-decoded
            before being placed in the query).
        prop: Display property dictionary; only ``prop["property"]`` is read.
        order_property: URI of the property pointing to the next element.
        grouped_triples: Mapping of display groups, modified in place.
        display_name: Key of the group to sort.
        fetched_values_map: Maps displayed values back to the original
            objects, which are the ones appearing in the chains.
        historical_snapshot: When truthy, the query is run on this graph;
            otherwise the endpoint returned by ``get_sparql()`` is used.
    """
    decoded_subject = unquote(subject)

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    if historical_snapshot:
        order_results = list(historical_snapshot.query(order_query))
    else:
        # The live endpoint is only needed on this branch.
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    order_map = {}
    for res in order_results:
        if isinstance(res, dict):  # Live triplestore results (JSON bindings)
            ordered_entity = res["orderedEntity"]["value"]
            next_value = res["nextValue"]["value"]
        else:  # Historical snapshot results (positional rows)
            ordered_entity = str(res[0])
            next_value = str(res[1])
        order_map[str(ordered_entity)] = (
            None if str(next_value) == "NONE" else str(next_value)
        )

    def original_value(item):
        shown = str(item["triple"][2])
        return fetched_values_map.get(shown, shown)

    entries = grouped_triples[display_name]["triples"]
    for sequence in _build_order_sequences(order_map):
        rank = {entity: index for index, entity in enumerate(sequence)}
        # list.sort is stable: entries outside this chain keep their order.
        entries.sort(
            key=lambda item, rank=rank: rank.get(original_value(item), float("inf"))
        )


def _build_order_sequences(order_map):
    """Turn an ``entity -> next entity`` map into a list of ordered chains."""
    successors = set(order_map.values())
    # Chain heads are entities nobody points to, taken in first-seen order so
    # that the outcome does not depend on set iteration order.
    heads = [entity for entity in order_map if entity not in successors]

    sequences = []
    for head in heads:
        sequence = []
        seen = set()
        current = head
        # Stop at the end of the chain, or when a link leads back to an
        # entity already visited (malformed, cyclic data would never end).
        while current in order_map and current not in seen:
            seen.add(current)
            sequence.append(current)
            current = order_map[current]
        sequences.append(sequence)
    return sequences

388 

389 

def process_default_property(prop_uri, triples, grouped_triples):
    """Group the triples of a property that has no display rule.

    The group is stored under the property URI itself, replacing any group
    already present under that key, and has no shape.

    Args:
        prop_uri: Property URI; used both as group key and predicate filter.
        triples: Iterable of triples, indexed as ``[0]``, ``[1]``, ``[2]``.
        grouped_triples: Mapping updated in place.
    """
    group = {"property": prop_uri, "triples": [], "shape": None}
    grouped_triples[prop_uri] = group

    matching = [item for item in triples if str(item[1]) == prop_uri]
    group["triples"].extend(
        {
            "triple": (str(item[0]), str(item[1]), str(item[2])),
            "object": str(item[2]),
            "shape": None,
        }
        for item in matching
    )

401 

402 

def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """Run a templated query on a historical snapshot and return its first row.

    ``[[subject]]`` and ``[[value]]`` in ``query`` are replaced by the
    URL-decoded ``subject`` and ``value`` wrapped in angle brackets, then the
    query is executed with ``historical_snapshot.query``.

    Args:
        query: SPARQL query template.
        subject: URI substituted for ``[[subject]]``.
        value: Value substituted for ``[[value]]``.
        historical_snapshot: Object exposing ``query(str)``; its result is
            iterated and each row is read positionally.

    Returns:
        The first two columns of the first row, converted with ``str()``.
        A column that is missing (single-column query) or unbound (``None``)
        is returned as ``None``, matching what ``execute_sparql_query`` does
        for the live endpoint. ``(None, None)`` when there are no rows.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")

    results = historical_snapshot.query(query)
    if not results:
        return None, None

    first_row = next(iter(results), None)
    if first_row is None:
        return None, None

    # Keep None as None: str(None) would yield the truthy text "None".
    cells = [None if cell is None else str(cell) for cell in list(first_row)[:2]]
    cells.extend([None] * (2 - len(cells)))
    return cells[0], cells[1]

415 

416 

def get_property_order_from_rules(subject_classes: list, display_rules: list) -> list:
    """List the property URIs of an entity in display-rule order.

    Only the first rule whose ``"class"`` equals the highest-priority class
    of the entity (as chosen by ``get_highest_priority_class``) is used.

    Args:
        subject_classes: Class URIs of the entity.
        display_rules: Display rule dictionaries.

    Returns:
        The ``"property"`` values of that rule's ``"displayProperties"``, in
        the order they are declared. Entries that are not dictionaries or
        lack a ``"property"`` key are skipped. The list is empty when there
        are no rules, no highest-priority class, or no matching rule.
    """
    top_class = get_highest_priority_class(subject_classes)
    if not (display_rules and top_class):
        return []

    wanted = str(top_class)
    for rule in display_rules:
        if rule["class"] != wanted:
            continue
        # Only the first matching rule contributes.
        return [
            entry["property"]
            for entry in rule.get("displayProperties", [])
            if isinstance(entry, dict) and "property" in entry
        ]
    return []

442 

443 

def get_similarity_properties(entity_type: str) -> Optional[List[str]]:
    """Return the property URIs configured for similarity matching.

    Only the first display rule whose ``"class"`` equals ``entity_type`` is
    inspected.

    Args:
        entity_type: The URI of the entity type.

    Returns:
        The rule's ``"similarity_properties"`` when it is a non-empty list of
        strings. ``None`` when there are no rules, no rule matches, the key
        is missing or empty, or the value is malformed (a warning is printed
        in the malformed case).
    """
    for rule in get_display_rules() or ():
        if rule["class"] != entity_type:
            continue

        configured = rule.get("similarity_properties")
        if not configured:
            # Key absent or empty: nothing configured for this class.
            return None

        well_formed = isinstance(configured, list) and all(
            isinstance(uri, str) for uri in configured
        )
        if well_formed:
            return configured

        # The key exists but does not hold a list of strings.
        print(f"Warning: Invalid format for similarity_properties in class {entity_type}. Expected list of URIs.")
        return None

    return None