Coverage for heritrace / utils / filters.py: 99%
137 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import threading
8from typing import Tuple
9from urllib.parse import quote, urlparse
11import dateutil
12import validators
13from flask import url_for
14from flask_babel import format_datetime, gettext, lazy_gettext
15from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url
16from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url
17from rdflib import Dataset, Graph
18from SPARQLWrapper import JSON
class Filter:
    """Formatting helpers that render URIs, datetimes and references for display.

    Labels are resolved through the configured display rules first and fall
    back to a readable form of the URI itself. Several methods return HTML
    anchor markup; every value interpolated into that markup is HTML-escaped
    so that a quote or ampersand inside a URI or a translation cannot break
    out of the attribute it is placed in.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the configuration and prepare the shared SPARQL client.

        Args:
            context: Collection of known namespaces; a predicate whose
                namespace is found in it is rendered from its local name.
            display_rules: Display rules handed to ``find_matching_rule``.
            sparql_endpoint: Not used by this class (the client comes from
                ``get_sparql()``); kept so existing callers keep working.
        """
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        # Serializes in-memory graph queries issued by get_fetch_uri_display.
        self._query_lock = threading.Lock()

    def human_readable_predicate(self, predicate_uri: str, entity_key: tuple[str, str], is_link=False, object_shape_uri: str = None):
        """Get human readable label for a predicate in the context of an entity.

        Args:
            predicate_uri: URI of the predicate to get label for
            entity_key: Tuple of (class_uri, shape_uri) for the entity context
            is_link: Whether to format as a link
            object_shape_uri: Shape URI of the object entity (for shape-specific display rules)

        Returns:
            str: The display name from the matching rule when there is one;
            otherwise the readable local name (known namespace), an HTML
            anchor (valid URL and ``is_link``), or the URI unchanged.
        """
        from html import escape

        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        predicate = str(predicate_uri)

        if rule and "displayProperties" in rule:
            for display_property in rule["displayProperties"]:
                prop_uri = display_property.get("property") or display_property.get("virtual_property")
                if prop_uri != predicate:
                    continue

                shape_rules = display_property.get("displayRules")
                if shape_rules:
                    if object_shape_uri:
                        for display_rule in shape_rules:
                            if display_rule.get("shape") == object_shape_uri:
                                return display_rule["displayName"]
                    # No shape-specific match: the first rule is the default.
                    return shape_rules[0]["displayName"]

                # Also reached when "displayRules" is present but empty, which
                # previously raised IndexError on the [0] lookup.
                if "displayName" in display_property:
                    return display_property["displayName"]

        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)

        if validators.url(predicate_uri) and is_link:
            href = escape(url_for("entity.about", subject=quote(predicate_uri)))
            alt = escape(str(gettext("Link to the entity %(entity)s", entity=predicate_uri)))
            return f"<a href='{href}' alt='{alt}'>{escape(predicate)}</a>"

        return predicate

    def human_readable_class(self, entity_key):
        """
        Converts a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: The rule's display name when a matching rule defines one,
            ``"Unknown"`` when both tuple members are ``None``, otherwise the
            readable local name of the class URI.
        """
        class_uri, shape_uri = entity_key

        # Nothing to look up: answer before touching the rule machinery.
        if class_uri is None and shape_uri is None:
            return "Unknown"

        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        if shape_uri is None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self, uri: str, entity_key: tuple[str, str | None], graph: Graph | Dataset = None
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: The fetched display value if the rule's ``fetchUriDisplay``
            query yields one, else the rule's ``displayName``, else ``uri``.
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key[0], entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | Dataset = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        The ``[[uri]]`` placeholder in the rule's query is replaced with
        ``<uri>``. The query runs against ``graph`` when one is given and
        against the shared SPARQL client otherwise. Failures are reported on
        stdout and treated as "no value" (best effort).

        Args:
            uri: The URI to get a display value for
            rule: The display rule containing the fetchUriDisplay query
            graph: Optional graph to use for fetching URI display values

        Returns:
            str | None: The first value of the first result, None otherwise
        """
        if "fetchUriDisplay" not in rule:
            return None

        # NOTE(review): uri is substituted into the query text verbatim;
        # confirm callers only pass well-formed IRIs.
        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                with self._query_lock:
                    results = graph.query(query)
                for row in results:
                    return str(row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
            return None

        # NOTE(review): self.sparql is shared mutable state (setQuery followed
        # by query) and this branch is not covered by _query_lock - confirm it
        # is never reached concurrently.
        self.sparql.setQuery(query)
        try:
            results = self.sparql.query().convert()
            bindings = results["results"]["bindings"]
            if bindings:
                first_binding = bindings[0]
                first_key = list(first_binding.keys())[0]
                return first_binding[first_key]["value"]
        except Exception as e:
            print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a datetime string and format it with the ``"long"`` format.

        Args:
            dt_str: Datetime string in any form ``dateutil`` can parse

        Returns:
            The value produced by ``format_datetime(..., format="long")``
        """
        # Import the submodule explicitly: a bare ``import dateutil`` does not
        # guarantee that ``dateutil.parser`` has been loaded.
        from dateutil import parser as date_parser

        return format_datetime(date_parser.parse(dt_str), format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source reference as text or as an HTML link.

        Args:
            primary_source: Source identifier, or None when it is not known

        Returns:
            The translated "Unknown" label for None; a link to
            ``/entity-version/...`` labelled "Version <n>" when the value
            contains ``/prov/se``; an external link opening in a new tab when
            the value is a valid URL; otherwise the value unchanged.
        """
        from html import escape

        if primary_source is None:
            return lazy_gettext("Unknown")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            version_number = primary_source.split("/prov/se/")[-1]
            link_alt = escape(str(lazy_gettext("Link to the primary source description")))
            return (
                f"<a href='{escape(version_url)}' alt='{link_alt}'>"
                + lazy_gettext("Version")
                + " "
                + escape(version_number)
                + "</a>"
            )

        if validators.url(primary_source):
            safe_url = escape(str(primary_source))
            link_alt = escape(str(lazy_gettext("Link to the primary source description")))
            # The alt value is now closed before target; the missing quote
            # used to swallow target='_blank' into the alt attribute.
            return f"<a href='{safe_url}' alt='{link_alt}' target='_blank'>{safe_url}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: ``"Unknown"`` for an empty value, the Zenodo-specific
            rendering for Zenodo URLs, otherwise the result of
            ``human_readable_primary_source``
        """
        if not url:
            return "Unknown"

        # Zenodo is checked first because it is more specific than a generic URL.
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: ``"Unknown"`` for an empty value, the ORCID attribution for
            ORCID URLs, an HTML link for any other valid URL, otherwise the
            raw value
        """
        from html import escape

        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # Any other URL simply becomes a link that opens in a new tab.
        if validators.url(url):
            safe_url = escape(str(url))
            return f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

        # Not a URL at all: return the raw value.
        return url
def split_namespace(uri: str) -> Tuple[str, str]:
    """
    Break a URI into its namespace and its local name.

    When the URI has a fragment, the namespace is everything up to and
    including ``#`` and the local name is the fragment. Otherwise the URI is
    cut at the last ``/`` of its path; the namespace always ends with ``/``.
    Query strings and parameters are not carried over.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    pieces = urlparse(uri)
    origin = "".join((pieces.scheme, "://", pieces.netloc))

    if pieces.fragment:
        return origin + pieces.path + "#", pieces.fragment

    # rpartition yields an empty parent when the path has no "/" at all,
    # which matches joining an empty list of leading segments.
    parent, _, leaf = pieces.path.rpartition("/")
    return origin + parent + "/", leaf
def format_uri_as_readable(uri: str) -> str:
    """
    Turn the local name of a URI into lower-case, space-separated words.

    An all-lowercase local name is returned untouched. Otherwise a space is
    inserted in front of every upper-case character except the first
    character, and the result is lower-cased (``hasPart`` -> ``has part``).

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    local_name = split_namespace(uri)[1]

    if local_name.islower():
        return local_name

    # Each capital after position 0 starts a new word.
    spaced = "".join(
        " " + letter if position and letter.isupper() else letter
        for position, letter in enumerate(local_name)
    )
    return spaced.lower()