Coverage for heritrace / utils / filters.py: 97%
163 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import logging
8import threading
9from typing import TYPE_CHECKING
10from urllib.parse import quote, urlparse
12from dateutil import parser as dateutil_parser
13from flask import url_for
14from flask_babel import format_datetime, gettext, lazy_gettext
15from pyparsing.exceptions import ParseBaseException
16from SPARQLWrapper import JSON
17from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException
19from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url
20from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url
21from heritrace.sparql import (
22 SPARQLWrapperWithRetry,
23 get_sparql_bindings,
24 select_results,
25)
26from heritrace.utils.uri_utils import is_valid_url
28if TYPE_CHECKING:
29 from rdflib import Dataset, Graph
class Filter:
    """Formatting helpers that turn URIs and provenance values into display strings.

    Several methods return HTML fragments (anchor tags) rather than plain text.
    """

    def __init__(
        self, context: dict, display_rules: list[dict] | None, sparql_endpoint: str
    ) -> None:
        """Store the configuration used by the formatting methods.

        Args:
            context: Container tested for namespace membership in
                ``human_readable_predicate`` (``namespace in context``).
            display_rules: Display rules handed to ``find_matching_rule``; may be
                None.
            sparql_endpoint: Endpoint URL used to build the per-thread SPARQL
                client.
        """
        self.context = context
        self.display_rules = display_rules
        self.sparql_endpoint = sparql_endpoint
        # Each thread lazily gets its own SPARQL client (see _get_sparql).
        self._thread_local = threading.local()
        # Held while running fetchUriDisplay queries against a local graph.
        self._query_lock = threading.Lock()

    def _get_sparql(self) -> SPARQLWrapperWithRetry:
        """Return the calling thread's SPARQL client, creating it on first use.

        The client is built with ``timeout=30.0`` and configured to return JSON.
        """
        if not hasattr(self._thread_local, "sparql"):
            sparql = SPARQLWrapperWithRetry(self.sparql_endpoint, timeout=30.0)
            sparql.setReturnFormat(JSON)
            self._thread_local.sparql = sparql
        return self._thread_local.sparql

    @staticmethod
    def _find_display_name_from_rule(
        rule: dict,
        predicate_uri: str,
        object_shape_uri: str | None,
    ) -> str | None:
        """Look up the display name configured for a predicate inside one rule.

        Args:
            rule: A display rule; only its ``displayProperties`` list is read.
            predicate_uri: Predicate to look up; compared via ``str()`` against
                each entry's ``property`` (or ``virtual_property``).
            object_shape_uri: Optional shape used to pick among the nested
                ``displayRules`` of the matching property.

        Returns:
            The matching display name, or None when the rule does not name the
            predicate.
        """
        if "displayProperties" not in rule:
            return None
        for display_property in rule["displayProperties"]:
            prop_uri = display_property.get("property") or display_property.get(
                "virtual_property"
            )
            if prop_uri != str(predicate_uri):
                continue
            nested_rules = display_property.get("displayRules")
            # An empty (or null) displayRules list used to raise IndexError on
            # the [0] lookup; treat it as absent and fall back to displayName.
            if nested_rules:
                if object_shape_uri:
                    for display_rule in nested_rules:
                        if display_rule.get("shape") == object_shape_uri:
                            return display_rule["displayName"]
                # No shape given, or no nested rule matched it: use the first.
                return nested_rules[0]["displayName"]
            if "displayName" in display_property:
                return display_property["displayName"]
        return None

    def human_readable_predicate(
        self,
        predicate_uri: str,
        entity_key: tuple[str | None, str | None],
        *,
        is_link: bool = False,
        object_shape_uri: str | None = None,
    ) -> str:
        """Convert a predicate URI to a display label.

        Resolution order: the display rule matching ``entity_key``, then a
        readable form of the local name when the predicate's namespace is in
        ``self.context``, then (if ``is_link``) an anchor to the entity page,
        and finally the URI itself.

        Args:
            predicate_uri: The predicate URI to format.
            entity_key: A tuple containing (class_uri, shape_uri).
            is_link: When True, a valid URL with no other label is rendered as
                an HTML link.
            object_shape_uri: Optional shape of the object, used to choose among
                nested display rules.

        Returns:
            str: Label text, or an HTML anchor in the ``is_link`` case.
        """
        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule:
            display_name = self._find_display_name_from_rule(
                rule, predicate_uri, object_shape_uri
            )
            if display_name is not None:
                return display_name

        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)
        if is_valid_url(predicate_uri) and is_link:
            href = url_for("entity.about", subject=quote(predicate_uri))
            alt = gettext(
                "Link to the entity %(entity)s",
                entity=predicate_uri,
            )
            return f"<a href='{href}' alt='{alt}'>{predicate_uri}</a>"
        return str(predicate_uri)

    def human_readable_class(
        self, entity_key: tuple[str | None, str | None] | None
    ) -> str:
        """
        Converts a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: Human-readable representation of the class, or "Unknown" when
                no class can be determined
        """
        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )
        from heritrace.utils.shacl_utils import (  # noqa: PLC0415
            determine_shape_for_classes,
        )

        if entity_key is None:
            return "Unknown"

        class_uri, shape_uri = entity_key

        if class_uri is None and shape_uri is None:
            return "Unknown"

        # Only a class was supplied: derive the shape before matching rules.
        if shape_uri is None and class_uri is not None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        if class_uri is None:
            return "Unknown"
        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self,
        uri: str,
        entity_key: tuple[str | None, str | None],
        graph: Graph | Dataset | None = None,
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: Human-readable representation of the entity; the URI itself
                when no rule or label applies
        """
        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )

        class_uri = entity_key[0]
        shape_uri = entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | Dataset | None = None
    ) -> str | None:
        """Run the rule's ``fetchUriDisplay`` query and return its first value.

        The ``[[uri]]`` placeholder in the query is replaced with ``<uri>``.
        When ``graph`` is given the query runs against it (under the query
        lock); otherwise it is sent to the SPARQL endpoint.

        Args:
            uri: The entity URI substituted into the query.
            rule: A display rule; only its ``fetchUriDisplay`` entry is read.
            graph: Optional local graph to query instead of the endpoint.

        Returns:
            The first column of the first result as a string, or None when the
            rule has no query, the query yields nothing, or it fails with one
            of the handled exceptions (logged at debug level).
        """
        logger = logging.getLogger(__name__)
        if "fetchUriDisplay" in rule:
            query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")
            if graph is not None:
                try:
                    with self._query_lock:
                        results = graph.query(query)
                    # Only the first row is used.
                    for row in select_results(results):
                        return str(row[0])
                except (ParseBaseException, ValueError, TypeError):
                    logger.debug(
                        "Failed to execute fetchUriDisplay query on graph for URI %s",
                        uri,
                    )
            else:
                sparql = self._get_sparql()
                sparql.setQuery(query)
                try:
                    bindings = get_sparql_bindings(sparql.query().convert())
                    if bindings:
                        first_binding = bindings[0]
                        first_key = next(iter(first_binding.keys()))
                        return first_binding[first_key]["value"]
                except (SPARQLWrapperException, OSError, KeyError, StopIteration):
                    logger.debug(
                        "Failed to execute fetchUriDisplay SPARQL query for URI %s", uri
                    )
        return None

    def human_readable_datetime(self, dt_str: str) -> str:
        """Parse a datetime string and format it with the "long" format.

        Args:
            dt_str: Datetime text accepted by ``dateutil``'s parser.

        Returns:
            str: The value formatted by ``flask_babel.format_datetime``.

        Note:
            Parsing errors are not caught here and propagate to the caller.
        """
        dt = dateutil_parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Format a primary source for display.

        Args:
            primary_source: Source identifier or URL; may be None.

        Returns:
            str: The translated "Unknown" for None; a link to the
                ``/entity-version/`` page for values containing ``/prov/se``;
                an external link for any other valid URL; otherwise the value
                unchanged.
        """
        if primary_source is None:
            return str(lazy_gettext("Unknown"))
        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}'"
                f" alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )
        if is_valid_url(primary_source):
            alt = lazy_gettext("Link to the primary source description")
            # The alt value must be closed before target, otherwise the
            # target attribute is swallowed into the alt text.
            return (
                f"<a href='{primary_source}'"
                f" alt='{alt}'"
                f" target='_blank'>"
                f"{primary_source}</a>"
            )
        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including
        Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: Formatted HTML string representing the source, or "Unknown"
                for an empty value
        """
        if not url:
            return "Unknown"

        # First check if it's a Zenodo DOI since this is more specific than a generic
        # URL
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # If not Zenodo, fall back to the generic primary-source formatting
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including
        ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: Formatted HTML string representing the agent, or "Unknown"
                for an empty value
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, just return a simple linked version for other URLs
        if is_valid_url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # If it's not a URL at all, just return the raw value
        return url
def split_namespace(uri: str) -> tuple[str, str]:
    """
    Break a URI into its namespace prefix and trailing local name.

    A URI with a fragment is cut at the ``#`` (which stays on the namespace
    side). Otherwise it is cut at the last ``/`` of the path. Query strings
    and params are not carried into either part.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    pieces = urlparse(uri)
    authority = pieces.scheme + "://" + pieces.netloc

    if pieces.fragment:
        return authority + pieces.path + "#", pieces.fragment

    # Everything before the final path segment belongs to the namespace.
    *parent_segments, leaf = pieces.path.split("/")
    return authority + "/".join(parent_segments) + "/", leaf
def format_uri_as_readable(uri: str) -> str:
    """
    Turn the local part of a URI into lower-case, space-separated words.

    An all-lowercase local part is returned untouched. Otherwise a space is
    inserted before every uppercase character except the first character, and
    the result is lower-cased (``hasTitle`` becomes ``has title``).

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    local_name = split_namespace(uri)[1]
    if local_name.islower():
        return local_name

    # A new word starts at each uppercase character past position 0.
    spaced = "".join(
        " " + char if position and char.isupper() else char
        for position, char in enumerate(local_name)
    )
    return spaced.lower()