Coverage for heritrace/utils/filters.py: 100%
113 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
1from __future__ import annotations
3import threading
4from typing import Tuple
5from urllib.parse import quote, urlparse
7import dateutil
8import validators
9from flask import url_for
10from flask_babel import format_datetime, gettext, lazy_gettext
11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url
12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url
13from rdflib import ConjunctiveGraph, Graph
14from SPARQLWrapper import JSON, SPARQLWrapper
class Filter:
    """Formatting helpers that turn URIs and provenance values into display strings.

    The helpers rely on three pieces of state set in ``__init__``:

    * ``context``: a mapping whose keys are compared against the namespace
      part of a URI (see ``human_readable_predicate``).
    * ``display_rules``: annotated as ``dict`` but iterated as a sequence of
      rule mappings, each carrying at least a ``"class"`` key and optionally
      ``"displayName"``, ``"displayProperties"`` and ``"fetchUriDisplay"``.
      A falsy value (``None`` or empty) means "no rules".
    * ``sparql``: a ``SPARQLWrapper`` bound to the endpoint, shared by every
      call and therefore only used while holding ``_query_lock``.

    NOTE(review): several methods interpolate their input into HTML without
    escaping it; callers presumably pass trusted or pre-validated values --
    confirm before exposing these helpers to arbitrary input.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the configuration and prepare the shared SPARQL client.

        Args:
            context: Mapping whose keys are known namespace prefixes URIs.
            display_rules: Display rule definitions (may be falsy).
            sparql_endpoint: URL of the SPARQL endpoint queried when no local
                graph is supplied to ``get_fetch_uri_display``.
        """
        self.context = context
        self.display_rules = display_rules
        self.sparql = SPARQLWrapper(sparql_endpoint)
        self.sparql.setReturnFormat(JSON)
        # Serializes query execution; the SPARQLWrapper instance is stateful
        # (setQuery followed by query) and is shared across callers.
        self._query_lock = threading.Lock()

    def human_readable_predicate(
        self, url: str, entity_classes: list, is_link: bool = True
    ):
        """Return a human-readable label for a predicate or class URI.

        Resolution order:

        1. A display rule whose ``"class"`` matches one of ``entity_classes``:
           the rule's ``"displayName"`` when ``url`` is the class itself,
           otherwise the display name configured for the matching property.
        2. If the namespace part of ``url`` is a key of ``self.context``, the
           local name, with camelCase split into lower-case words.
        3. If ``url`` validates as a URL and ``is_link`` is true, an HTML
           anchor pointing to the ``entity.about`` endpoint.
        4. ``url`` unchanged.

        Args:
            url: The URI to label.
            entity_classes: Classes of the subject; each is converted to ``str``.
            is_link: Whether an unknown but valid URL may be rendered as a link.

        Returns:
            The label, an HTML anchor string, or ``url`` itself.
        """
        # Compare on the plain string form throughout: str subclasses with
        # type-strict equality would otherwise never match the class URI,
        # while the property comparison already used str(url).
        url_text = str(url)
        subject_classes = [str(subject_class) for subject_class in entity_classes]

        for display_rule in self.display_rules or []:
            for subject_class in subject_classes:
                if subject_class != display_rule["class"]:
                    continue
                if url_text == subject_class:
                    return display_rule["displayName"]

                # "displayProperties" is optional on a rule.
                for display_property in display_rule.get("displayProperties") or []:
                    if display_property["property"] != url_text:
                        continue
                    nested_rules = display_property.get("displayRules")
                    if nested_rules:
                        # With nested displayRules, the first displayName wins.
                        return nested_rules[0]["displayName"]
                    if "displayName" in display_property:
                        # No (or empty) displayRules: use the property's own name.
                        return display_property["displayName"]
                # Property not found for this rule: keep scanning, then fall
                # through to the namespace-based default below.

        # No display name configured: derive a label from the URI itself.
        first_part, last_part = self.split_ns(url_text)
        if first_part in self.context:
            if last_part.islower():
                return last_part
            # Split camelCase at every upper-case letter ("hasTitle" -> "has title").
            words = []
            word = ""
            for char in last_part:
                if char.isupper() and word:
                    words.append(word)
                    word = char
                else:
                    word += char
            words.append(word)
            return " ".join(words).lower()
        elif validators.url(url) and is_link:
            return f"<a href='{url_for('entity.about', subject=quote(url))}' alt='{gettext('Link to the entity %(entity)s', entity=url)}'>{url}</a>"
        else:
            return url

    def human_readable_entity(
        self, uri: str, entity_classes: list, graph: Graph | ConjunctiveGraph = None
    ) -> str:
        """Return a display label for an entity, falling back to its URI.

        Args:
            uri: The entity URI.
            entity_classes: Classes of the entity; each is converted to ``str``.
            graph: Optional local graph to query instead of the SPARQL endpoint.

        Returns:
            The value produced by a matching ``fetchUriDisplay`` rule, or
            ``uri`` when no rule yields a (truthy) value.
        """
        subject_classes = [str(subject_class) for subject_class in entity_classes]

        # Look for a fetchUriDisplay configuration first.
        uri_display = self.get_fetch_uri_display(uri, subject_classes, graph)
        if uri_display:
            return uri_display

        # Nothing found: return the original URI.
        return uri

    def get_fetch_uri_display(
        self, uri: str, entity_classes: list, graph: Graph | ConjunctiveGraph = None
    ) -> str | None:
        """Run the ``fetchUriDisplay`` query configured for the entity's class.

        For every class (in order) and every rule whose ``"class"`` equals it
        and that defines ``"fetchUriDisplay"``, the ``[[uri]]`` placeholder is
        replaced with ``<uri>`` and the query is executed against ``graph``
        when given, otherwise against the SPARQL endpoint. The first value of
        the first result is returned. Query errors are printed and the search
        continues with the next rule.

        Args:
            uri: The entity URI substituted into the query.
            entity_classes: Class identifiers compared with ``rule["class"]``.
            graph: Optional local graph to query.

        Returns:
            The first result value as a string, or ``None`` if no rule matched
            or no query returned a row.
        """
        for entity_class in entity_classes:
            # `or []` keeps a missing rule set (None) from raising, matching
            # the tolerance of human_readable_predicate.
            for rule in self.display_rules or []:
                if rule["class"] != entity_class or "fetchUriDisplay" not in rule:
                    continue
                query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")
                if graph is not None:
                    try:
                        with self._query_lock:
                            # Consume the result while holding the lock, in
                            # case the graph evaluates rows lazily.
                            results = graph.query(query)
                            for row in results:
                                return str(row[0])
                    except Exception as e:
                        print(
                            f"Error executing fetchUriDisplay query: {e}. {query}"
                        )
                else:
                    # setQuery + query mutate and read the shared wrapper, so
                    # the pair must not interleave with another thread's call.
                    with self._query_lock:
                        self.sparql.setQuery(query)
                        try:
                            results = self.sparql.query().convert()
                            bindings = results["results"]["bindings"]
                            if bindings:
                                # Take the first binding and its first (and
                                # only expected) variable.
                                first_binding = bindings[0]
                                first_key = next(iter(first_binding))
                                return first_binding[first_key]["value"]
                        except Exception as e:
                            print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a date/time string and format it with Babel's ``long`` format.

        Args:
            dt_str: A date/time string accepted by ``dateutil.parser.parse``.

        Returns:
            The localized string returned by ``format_datetime``.
        """
        # A bare `import dateutil` does not guarantee that the `parser`
        # submodule is loaded; import it explicitly where it is used.
        import dateutil.parser

        dt = dateutil.parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def split_ns(self, ns: str) -> Tuple[str, str]:
        """Split a URI into its namespace part and its local name.

        With a fragment, the namespace is everything up to and including
        ``#`` and the local name is the fragment. Otherwise the namespace is
        everything up to and including the last ``/`` of the path and the
        local name is the final path segment. Query strings are dropped.

        Args:
            ns: The URI to split.

        Returns:
            A ``(namespace, local_name)`` tuple.
        """
        parsed = urlparse(ns)
        origin = parsed.scheme + "://" + parsed.netloc
        if parsed.fragment:
            return origin + parsed.path + "#", parsed.fragment
        parent_path, _, last_segment = parsed.path.rpartition("/")
        return origin + parent_path + "/", last_segment

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source value as text or as an HTML link.

        Args:
            primary_source: The primary source identifier, or ``None``.

        Returns:
            * the lazy translation of "Unknown" when ``primary_source`` is ``None``;
            * a link to ``/entity-version/...`` labelled "Version <n>" when the
              value contains ``/prov/se``;
            * an external link opening in a new tab when the value validates
              as a URL;
            * the value unchanged otherwise.
        """
        if primary_source is None:
            return lazy_gettext("Unknown")
        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )
        if validators.url(primary_source):
            # The alt attribute must be closed before target, otherwise
            # target='_blank' is swallowed into the attribute value.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"
        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: "Unknown" for an empty value, the Zenodo-specific rendering
            for Zenodo URLs, otherwise the result of
            ``human_readable_primary_source``
        """
        if not url:
            return "Unknown"

        # Check Zenodo first since it is more specific than a generic URL.
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # Not Zenodo: fall back to the generic primary-source rendering.
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling ORCID and generic URLs.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: "Unknown" for an empty value, the ORCID attribution for ORCID
            URLs, an HTML link for any other valid URL, otherwise the raw value
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, other URLs are simply rendered as a link.
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # Not a URL at all: return the raw value.
        return url