Coverage for heritrace/utils/filters.py: 99%
137 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
1from __future__ import annotations
3import threading
4from typing import Tuple
5from urllib.parse import quote, urlparse
7import dateutil
8import validators
9from flask import url_for
10from flask_babel import format_datetime, gettext, lazy_gettext
11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url
12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url
13from rdflib import ConjunctiveGraph, Graph
14from SPARQLWrapper import JSON
class Filter:
    """Display helpers that turn URIs and provenance values into readable text or HTML.

    The helpers consult the configured display rules first and fall back to
    formatting the URI itself when no rule applies.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the configuration and prepare the shared SPARQL client.

        Args:
            context: Mapping consulted when deciding whether a predicate
                namespace is known (see ``human_readable_predicate``)
            display_rules: Display rules handed to ``find_matching_rule``
            sparql_endpoint: Accepted for interface compatibility; the client
                is obtained from ``heritrace.extensions.get_sparql`` instead
        """
        # Imported lazily, as in the rest of this class, so the module can be
        # imported without pulling in the application extensions.
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        # Serialises queries run against in-memory graphs.
        self._query_lock = threading.Lock()

    def human_readable_predicate(self, predicate_uri: str, entity_key: tuple[str, str], is_link=False, object_shape_uri: str = None):
        """Get human readable label for a predicate in the context of an entity.

        Args:
            predicate_uri: URI of the predicate to get label for
            entity_key: Tuple of (class_uri, shape_uri) for the entity context
            is_link: Whether to format as a link
            object_shape_uri: Shape URI of the object entity (for shape-specific display rules)

        Returns:
            str: Human readable label for the predicate
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        display_properties = rule["displayProperties"] if rule and "displayProperties" in rule else []
        for display_property in display_properties:
            prop_uri = display_property.get("property") or display_property.get("virtual_property")
            if prop_uri != str(predicate_uri):
                continue
            if "displayRules" in display_property:
                # Prefer the rule written for the object's shape; otherwise
                # fall back to the first rule listed for this property.
                if object_shape_uri:
                    for display_rule in display_property["displayRules"]:
                        if display_rule.get("shape") == object_shape_uri:
                            return display_rule["displayName"]
                return display_property["displayRules"][0]["displayName"]
            elif "displayName" in display_property:
                return display_property["displayName"]

        # No display rule supplied a label: derive one from the URI itself.
        first_part, _ = split_namespace(predicate_uri)
        # NOTE(review): this tests membership among the *keys* of ``context``;
        # confirm the mapping is keyed by namespace URI rather than by prefix.
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)
        elif validators.url(predicate_uri) and is_link:
            return f"<a href='{url_for('entity.about', subject=quote(predicate_uri))}' alt='{gettext('Link to the entity %(entity)s', entity=predicate_uri)}'>{predicate_uri}</a>"
        else:
            return str(predicate_uri)

    def human_readable_class(self, entity_key):
        """
        Converts a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: Human-readable representation of the class
        """
        class_uri, shape_uri = entity_key

        # Nothing to look up: answer before touching the lazy project imports.
        if class_uri is None and shape_uri is None:
            return "Unknown"

        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        if shape_uri is None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self, uri: str, entity_key: tuple[str, str | None], graph: Graph | ConjunctiveGraph = None
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: Human-readable representation of the entity; the URI itself
            when no rule matches or the matching rule yields no label
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri = entity_key[0]
        shape_uri = entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        # A value fetched through the rule's query wins over the static name.
        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | ConjunctiveGraph = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        Args:
            uri: The URI to get a display value for
            rule: The display rule containing the fetchUriDisplay query
            graph: Optional graph to use for fetching URI display values

        Returns:
            str | None: The display value if found, None otherwise (including
            when the query raises; the error is printed, not propagated)
        """
        if "fetchUriDisplay" not in rule:
            return None

        # The rule's query is a template; ``[[uri]]`` stands for the entity.
        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                with self._query_lock:
                    results = graph.query(query)
                    # Only the first column of the first row is used.
                    for row in results:
                        return str(row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
        else:
            # NOTE(review): the shared client is configured and queried here
            # without holding ``_query_lock``; confirm this path is never hit
            # concurrently.
            self.sparql.setQuery(query)
            try:
                results = self.sparql.query().convert()
                if results["results"]["bindings"]:
                    first_binding = results["results"]["bindings"][0]
                    first_key = list(first_binding.keys())[0]
                    return first_binding[first_key]["value"]
            except Exception as e:
                print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a date/time string and format it in the "long" locale format.

        Args:
            dt_str: Date/time string understood by ``dateutil.parser.parse``

        Returns:
            The value produced by ``flask_babel.format_datetime``
        """
        # A bare ``import dateutil`` does not guarantee that the ``parser``
        # submodule is loaded, so import it explicitly before using it.
        import dateutil.parser

        dt = dateutil.parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source value as text or as an HTML link.

        Args:
            primary_source: Source identifier or URL, or None when unknown

        Returns:
            A translated "Unknown" for None, a link to the entity version page
            for values containing ``/prov/se``, an external link for other
            valid URLs, and the raw value otherwise
        """
        if primary_source is None:
            return lazy_gettext("Unknown")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )

        if validators.url(primary_source):
            # The alt attribute must be closed before target, otherwise the
            # target text becomes part of the alt value.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: Formatted HTML string representing the source
        """
        if not url:
            return "Unknown"

        # First check if it's a Zenodo DOI since this is more specific than a generic URL
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # If not Zenodo, fall back to the generic primary source formatting
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: Formatted HTML string representing the agent
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, just return a simple linked version for other URLs
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # If it's not a URL at all, just return the raw value
        return url
def split_namespace(uri: str) -> Tuple[str, str]:
    """
    Break a URI into its namespace prefix and trailing local name.

    A URI carrying a fragment is cut at the ``#``; any other URI is cut at
    the last ``/`` of its path.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    pieces = urlparse(uri)
    origin = pieces.scheme + "://" + pieces.netloc

    if pieces.fragment:
        return origin + pieces.path + "#", pieces.fragment

    # rpartition gives everything before the last "/" and whatever follows it.
    parent_path, _, local_name = pieces.path.rpartition("/")
    return origin + parent_path + "/", local_name
def format_uri_as_readable(uri: str) -> str:
    """
    Produce readable text from the local part of a URI.

    An all-lowercase local part is returned unchanged; otherwise a space is
    inserted before every uppercase character except the first character,
    and the result is lowercased.

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    local_name = split_namespace(uri)[1]

    if local_name.islower():
        return local_name

    # Break CamelCase: each uppercase character after position 0 opens a new word.
    spaced = "".join(
        " " + char if position > 0 and char.isupper() else char
        for position, char in enumerate(local_name)
    )
    return spaced.lower()