Coverage for heritrace/utils/display_rules_utils.py: 100%
234 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
1from collections import OrderedDict
2from typing import Tuple, List, Optional
3from urllib.parse import unquote
5from heritrace.extensions import get_display_rules, get_sparql
6from rdflib import ConjunctiveGraph, Graph, URIRef
7from rdflib.plugins.sparql.algebra import translateQuery
8from rdflib.plugins.sparql.parser import parseQuery
9from SPARQLWrapper import JSON
11display_rules = get_display_rules()
14def get_class_priority(class_uri):
15 """
16 Restituisce la priorità di una classe specifica.
17 Calcola la priorità direttamente dalle regole di visualizzazione.
18 """
19 rules = get_display_rules()
20 if not rules:
21 return 0
23 for rule in rules:
24 if rule["class"] == str(class_uri):
25 return rule.get("priority", 0)
26 return 0
29def is_entity_type_visible(entity_type):
30 display_rules = get_display_rules()
32 for rule in display_rules:
33 if rule["class"] == entity_type:
34 return rule.get("shouldBeDisplayed", True)
35 return True
38def get_sortable_properties(entity_type: str, display_rules, form_fields_cache) -> list:
39 """
40 Ottiene le proprietà ordinabili dalle regole di visualizzazione per un tipo di entità.
41 Inferisce il tipo di ordinamento dal form_fields_cache.
43 Args:
44 entity_type: L'URI del tipo di entità
46 Returns:
47 Lista di dizionari con le informazioni di ordinamento
48 """
49 if not display_rules:
50 return []
52 for rule in display_rules:
53 if rule["class"] == entity_type and "sortableBy" in rule:
54 # Aggiungiamo displayName ottenuto dalla proprietà nella classe
55 sort_props = []
56 for sort_config in rule["sortableBy"]:
57 prop = sort_config.copy()
59 # Trova la displayProperty corrispondente per ottenere il displayName
60 for display_prop in rule["displayProperties"]:
61 if display_prop["property"] == prop["property"]:
62 if "displayRules" in display_prop:
63 prop["displayName"] = display_prop["displayRules"][0][
64 "displayName"
65 ]
66 else:
67 prop["displayName"] = display_prop.get(
68 "displayName", prop["property"]
69 )
70 break
72 # Determina il tipo di ordinamento dalle form fields
73 if form_fields_cache and entity_type in form_fields_cache:
74 entity_fields = form_fields_cache[entity_type]
75 if prop["property"] in entity_fields:
76 field_info = entity_fields[prop["property"]][
77 0
78 ] # Prendi il primo field definition
80 # Se c'è una shape, è una referenza a un'entità (ordina per label)
81 if field_info.get("nodeShape"):
82 prop["sortType"] = "string"
83 # Altrimenti guarda i datatypes
84 elif field_info.get("datatypes"):
85 datatype = str(field_info["datatypes"][0]).lower()
86 if any(t in datatype for t in ["date", "time"]):
87 prop["sortType"] = "date"
88 elif any(
89 t in datatype
90 for t in ["int", "float", "decimal", "double", "number"]
91 ):
92 prop["sortType"] = "number"
93 elif "boolean" in datatype:
94 prop["sortType"] = "boolean"
95 else:
96 prop["sortType"] = "string"
97 else:
98 prop["sortType"] = "string"
100 sort_props.append(prop)
102 return sort_props
104 return []
107def get_highest_priority_class(subject_classes):
108 max_priority = None
109 highest_priority_class = None
110 for cls in subject_classes:
111 priority = get_class_priority(str(cls))
112 if max_priority is None or priority < max_priority:
113 max_priority = priority
114 highest_priority_class = cls
115 return highest_priority_class
118def get_grouped_triples(
119 subject, triples, subject_classes, valid_predicates_info, historical_snapshot=None
120):
121 display_rules = get_display_rules()
123 grouped_triples = OrderedDict()
124 relevant_properties = set()
125 fetched_values_map = (
126 dict()
127 ) # Map of original values to values returned by the query
128 primary_properties = valid_predicates_info
129 highest_priority_class = get_highest_priority_class(subject_classes)
130 highest_priority_rules = [
131 rule for rule in display_rules if rule["class"] == str(highest_priority_class)
132 ]
133 for prop_uri in primary_properties:
134 if display_rules and highest_priority_rules:
135 matched_rules = []
136 for rule in highest_priority_rules:
137 for prop in rule["displayProperties"]:
138 if prop["property"] == prop_uri:
139 matched_rules.append(rule)
140 if matched_rules:
141 rule = matched_rules[0]
142 for prop in rule["displayProperties"]:
143 if prop["property"] == prop_uri:
144 is_ordered = "orderedBy" in prop
145 order_property = prop.get("orderedBy")
147 if "displayRules" in prop:
148 for display_rule in prop["displayRules"]:
149 display_name = display_rule.get("displayName", prop_uri)
150 relevant_properties.add(prop_uri)
151 process_display_rule(
152 display_name,
153 prop_uri,
154 display_rule,
155 subject,
156 triples,
157 grouped_triples,
158 fetched_values_map,
159 historical_snapshot,
160 )
162 if is_ordered:
163 grouped_triples[display_name]["is_draggable"] = True
164 grouped_triples[display_name][
165 "ordered_by"
166 ] = order_property
167 process_ordering(
168 subject,
169 prop,
170 order_property,
171 grouped_triples,
172 display_name,
173 fetched_values_map,
174 historical_snapshot,
175 )
177 if "intermediateRelation" in prop:
178 grouped_triples[display_name][
179 "intermediateRelation"
180 ] = prop["intermediateRelation"]
181 else:
182 display_name = prop.get("displayName", prop_uri)
183 relevant_properties.add(prop_uri)
184 process_display_rule(
185 display_name,
186 prop_uri,
187 prop,
188 subject,
189 triples,
190 grouped_triples,
191 fetched_values_map,
192 historical_snapshot,
193 )
195 if is_ordered:
196 grouped_triples[display_name]["is_draggable"] = True
197 grouped_triples[display_name][
198 "ordered_by"
199 ] = order_property
200 process_ordering(
201 subject,
202 prop,
203 order_property,
204 grouped_triples,
205 display_name,
206 fetched_values_map,
207 historical_snapshot,
208 )
210 if "intermediateRelation" in prop:
211 grouped_triples[display_name][
212 "intermediateRelation"
213 ] = prop["intermediateRelation"]
214 else:
215 process_default_property(prop_uri, triples, grouped_triples)
216 else:
217 process_default_property(prop_uri, triples, grouped_triples)
219 if display_rules:
220 ordered_display_names = []
221 for rule in display_rules:
222 if URIRef(rule["class"]) in subject_classes:
223 for prop in rule["displayProperties"]:
224 if "displayRules" in prop:
225 for display_rule in prop["displayRules"]:
226 display_name = display_rule.get(
227 "displayName", prop["property"]
228 )
229 if display_name in grouped_triples:
230 ordered_display_names.append(display_name)
231 else:
232 display_name = prop.get("displayName", prop["property"])
233 if display_name in grouped_triples:
234 ordered_display_names.append(display_name)
235 for display_name in grouped_triples.keys():
236 if display_name not in ordered_display_names:
237 ordered_display_names.append(display_name)
238 else:
239 ordered_display_names = list(grouped_triples.keys())
241 grouped_triples = OrderedDict(
242 (k, grouped_triples[k]) for k in ordered_display_names
243 )
244 return grouped_triples, relevant_properties
247def process_display_rule(
248 display_name,
249 prop_uri,
250 rule,
251 subject,
252 triples,
253 grouped_triples,
254 fetched_values_map,
255 historical_snapshot=None,
256):
257 if display_name not in grouped_triples:
258 grouped_triples[display_name] = {
259 "property": prop_uri,
260 "triples": [],
261 "shape": rule.get("shape"),
262 "intermediateRelation": rule.get("intermediateRelation"),
263 }
264 for triple in triples:
265 if str(triple[1]) == prop_uri:
266 if rule.get("fetchValueFromQuery"):
267 if historical_snapshot:
268 result, external_entity = execute_historical_query(
269 rule["fetchValueFromQuery"],
270 subject,
271 triple[2],
272 historical_snapshot,
273 )
274 else:
275 result, external_entity = execute_sparql_query(
276 rule["fetchValueFromQuery"], subject, triple[2]
277 )
278 if result:
279 fetched_values_map[str(result)] = str(triple[2])
280 new_triple = (str(triple[0]), str(triple[1]), str(result))
281 new_triple_data = {
282 "triple": new_triple,
283 "external_entity": external_entity,
284 "object": str(triple[2]),
285 "shape": rule.get("shape"),
286 }
287 grouped_triples[display_name]["triples"].append(new_triple_data)
288 else:
289 new_triple_data = {
290 "triple": (str(triple[0]), str(triple[1]), str(triple[2])),
291 "object": str(triple[2]),
292 "shape": rule.get("shape"),
293 }
294 grouped_triples[display_name]["triples"].append(new_triple_data)
297def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
298 sparql = get_sparql()
300 decoded_subject = unquote(subject)
301 decoded_value = unquote(value)
302 query = query.replace("[[subject]]", f"<{decoded_subject}>")
303 query = query.replace("[[value]]", f"<{decoded_value}>")
304 sparql.setQuery(query)
305 sparql.setReturnFormat(JSON)
306 results = sparql.query().convert().get("results", {}).get("bindings", [])
307 if results:
308 parsed_query = parseQuery(query)
309 algebra_query = translateQuery(parsed_query).algebra
310 variable_order = algebra_query["PV"]
311 result = results[0]
312 values = [
313 result.get(str(var_name), {}).get("value", None)
314 for var_name in variable_order
315 ]
316 first_value = values[0] if len(values) > 0 else None
317 second_value = values[1] if len(values) > 1 else None
318 return (first_value, second_value)
319 return None, None
322def process_ordering(
323 subject,
324 prop,
325 order_property,
326 grouped_triples,
327 display_name,
328 fetched_values_map,
329 historical_snapshot: ConjunctiveGraph | Graph | None = None,
330):
331 def get_ordered_sequence(order_results):
332 order_map = {}
333 for res in order_results:
334 if isinstance(res, dict): # For live triplestore results
335 ordered_entity = res["orderedEntity"]["value"]
336 next_value = res["nextValue"]["value"]
337 else: # For historical snapshot results
338 ordered_entity = str(res[0])
339 next_value = str(res[1])
341 order_map[str(ordered_entity)] = (
342 None if str(next_value) == "NONE" else str(next_value)
343 )
345 all_sequences = []
346 start_elements = set(order_map.keys()) - set(order_map.values())
347 while start_elements:
348 sequence = []
349 current_element = start_elements.pop()
350 while current_element in order_map:
351 sequence.append(current_element)
352 current_element = order_map[current_element]
353 all_sequences.append(sequence)
354 return all_sequences
356 decoded_subject = unquote(subject)
358 sparql = get_sparql()
360 order_query = f"""
361 SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
362 WHERE {{
363 <{decoded_subject}> <{prop['property']}> ?orderedEntity.
364 OPTIONAL {{
365 ?orderedEntity <{order_property}> ?next.
366 }}
367 }}
368 """
369 if historical_snapshot:
370 order_results = list(historical_snapshot.query(order_query))
371 else:
372 sparql.setQuery(order_query)
373 sparql.setReturnFormat(JSON)
374 order_results = sparql.query().convert().get("results", {}).get("bindings", [])
376 order_sequences = get_ordered_sequence(order_results)
377 for sequence in order_sequences:
378 grouped_triples[display_name]["triples"].sort(
379 key=lambda x: (
380 sequence.index(
381 fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
382 )
383 if fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
384 in sequence
385 else float("inf")
386 )
387 )
390def process_default_property(prop_uri, triples, grouped_triples):
391 display_name = prop_uri
392 grouped_triples[display_name] = {"property": prop_uri, "triples": [], "shape": None}
393 triples_for_prop = [triple for triple in triples if str(triple[1]) == prop_uri]
394 for triple in triples_for_prop:
395 new_triple_data = {
396 "triple": (str(triple[0]), str(triple[1]), str(triple[2])),
397 "object": str(triple[2]),
398 "shape": None,
399 }
400 grouped_triples[display_name]["triples"].append(new_triple_data)
403def execute_historical_query(
404 query: str, subject: str, value: str, historical_snapshot: Graph
405) -> Tuple[str, str]:
406 decoded_subject = unquote(subject)
407 decoded_value = unquote(value)
408 query = query.replace("[[subject]]", f"<{decoded_subject}>")
409 query = query.replace("[[value]]", f"<{decoded_value}>")
410 results = historical_snapshot.query(query)
411 if results:
412 for result in results:
413 return (str(result[0]), str(result[1]))
414 return None, None
417def get_property_order_from_rules(subject_classes: list, display_rules: list) -> list:
418 """
419 Extract ordered list of properties from display rules for given entity classes.
421 Args:
422 subject_classes: List of class URIs for the entity
423 display_rules: List of display rule configurations
425 Returns:
426 List of property URIs in the order specified by display rules
427 """
428 ordered_properties = []
429 highest_priority_class = get_highest_priority_class(subject_classes)
431 if display_rules and highest_priority_class:
432 # Find matching rule for the entity's highest priority class
433 for rule in display_rules:
434 if rule["class"] == str(highest_priority_class):
435 # Extract properties in order from displayProperties
436 for prop in rule.get("displayProperties", []):
437 if isinstance(prop, dict) and "property" in prop:
438 ordered_properties.append(prop["property"])
439 break
441 return ordered_properties
444def get_similarity_properties(entity_type: str) -> Optional[List[str]]:
445 """Gets the list of property URIs specified for similarity matching for a given entity type.
447 Args:
448 entity_type: The URI of the entity type.
450 Returns:
451 A list of property URIs to use for similarity matching, or None if not specified.
452 """
453 rules = get_display_rules()
454 if not rules:
455 return None
457 for rule in rules:
458 if rule["class"] == entity_type:
459 similarity_props = rule.get("similarity_properties")
460 # Return the list only if it exists and is not empty
461 # Ensure it's a list of strings (basic validation)
462 if similarity_props and isinstance(similarity_props, list) and all(isinstance(item, str) for item in similarity_props):
463 return similarity_props
464 else:
465 # Log a warning if the format is incorrect but the key exists
466 if similarity_props:
467 print(f"Warning: Invalid format for similarity_properties in class {entity_type}. Expected list of URIs.")
468 return None # Return None if format is incorrect or list is empty
470 return None # Return None if the class or similarity_properties are not found