Coverage for heritrace / utils / filters.py: 97%

163 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import logging 

8import threading 

9from typing import TYPE_CHECKING 

10from urllib.parse import quote, urlparse 

11 

12from dateutil import parser as dateutil_parser 

13from flask import url_for 

14from flask_babel import format_datetime, gettext, lazy_gettext 

15from pyparsing.exceptions import ParseBaseException 

16from SPARQLWrapper import JSON 

17from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException 

18 

19from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

20from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

21from heritrace.sparql import ( 

22 SPARQLWrapperWithRetry, 

23 get_sparql_bindings, 

24 select_results, 

25) 

26from heritrace.utils.uri_utils import is_valid_url 

27 

28if TYPE_CHECKING: 

29 from rdflib import Dataset, Graph 

30 

31 

class Filter:
    """Render URIs, classes, dates and provenance values as readable text/HTML.

    The instance keeps the namespace ``context``, the configured
    ``display_rules`` and the SPARQL endpoint used to resolve
    ``fetchUriDisplay`` queries. One SPARQL client is created lazily per
    thread, and queries against an in-memory graph are serialized with a lock.
    """

    def __init__(
        self, context: dict, display_rules: list[dict] | None, sparql_endpoint: str
    ) -> None:
        """Store the configuration and prepare the per-thread/locking state.

        Args:
            context: Mapping whose keys are checked against the namespace part
                of a predicate URI in :meth:`human_readable_predicate`.
            display_rules: Display rules passed to ``find_matching_rule``.
            sparql_endpoint: URL of the SPARQL endpoint queried when no graph
                is supplied to :meth:`get_fetch_uri_display`.
        """
        self.context = context
        self.display_rules = display_rules
        self.sparql_endpoint = sparql_endpoint
        # Holds one SPARQL client per thread (see _get_sparql).
        self._thread_local = threading.local()
        # Serializes graph.query() calls in get_fetch_uri_display.
        self._query_lock = threading.Lock()

    def _get_sparql(self) -> SPARQLWrapperWithRetry:
        """Return this thread's SPARQL client, creating it on first use."""
        if not hasattr(self._thread_local, "sparql"):
            sparql = SPARQLWrapperWithRetry(self.sparql_endpoint, timeout=30.0)
            sparql.setReturnFormat(JSON)
            self._thread_local.sparql = sparql
        return self._thread_local.sparql

    @staticmethod
    def _escape_html(value: object) -> str:
        """Return ``str(value)`` escaped for HTML text and quoted attributes.

        Both single and double quotes are escaped because the anchors built by
        this class use single- as well as double-quoted attributes.
        """
        from html import escape  # noqa: PLC0415

        return escape(str(value), quote=True)

    @staticmethod
    def _find_display_name_from_rule(
        rule: dict,
        predicate_uri: str,
        object_shape_uri: str | None,
    ) -> str | None:
        """Look up the display name configured for a predicate in a rule.

        Args:
            rule: A display rule; only its ``displayProperties`` list is read.
            predicate_uri: Predicate to look for (compared as ``str``).
            object_shape_uri: Shape of the object, used to pick one of the
                nested ``displayRules`` of the matching property.

        Returns:
            The display name, or ``None`` when the rule has no matching
            property or the matching property defines no name.
        """
        predicate = str(predicate_uri)
        for display_property in rule.get("displayProperties") or ():
            prop_uri = display_property.get("property") or display_property.get(
                "virtual_property"
            )
            if prop_uri != predicate:
                continue
            nested_rules = display_property.get("displayRules")
            # An empty ``displayRules`` list used to raise IndexError; it now
            # falls through to the property's own ``displayName``.
            if nested_rules:
                if object_shape_uri:
                    for nested_rule in nested_rules:
                        if nested_rule.get("shape") == object_shape_uri:
                            return nested_rule["displayName"]
                # No shape given or no shape matched: use the first nested rule.
                return nested_rules[0]["displayName"]
            if "displayName" in display_property:
                return display_property["displayName"]
        return None

    def human_readable_predicate(
        self,
        predicate_uri: str,
        entity_key: tuple[str | None, str | None],
        *,
        is_link: bool = False,
        object_shape_uri: str | None = None,
    ) -> str:
        """Convert a predicate URI to a display name, readable text or a link.

        Args:
            predicate_uri: The predicate to format.
            entity_key: ``(class_uri, shape_uri)`` of the subject entity, used
                to select the display rule.
            is_link: When true and the predicate is a valid URL that is not
                covered by a rule or by ``context``, return an HTML anchor.
            object_shape_uri: Shape of the object, forwarded to the rule lookup.

        Returns:
            The configured display name, a readable form of the local name, an
            HTML anchor, or the predicate itself as a string.
        """
        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule:
            display_name = self._find_display_name_from_rule(
                rule, predicate_uri, object_shape_uri
            )
            if display_name is not None:
                return display_name

        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)
        if is_valid_url(predicate_uri) and is_link:
            href = url_for("entity.about", subject=quote(predicate_uri))
            alt = gettext(
                "Link to the entity %(entity)s",
                entity=predicate_uri,
            )
            # Escape every interpolated value: an apostrophe in the URI or in
            # the translated text would otherwise close the attribute early.
            return (
                f"<a href='{self._escape_html(href)}'"
                f" alt='{self._escape_html(alt)}'>"
                f"{self._escape_html(predicate_uri)}</a>"
            )
        return str(predicate_uri)

    def human_readable_class(
        self, entity_key: tuple[str | None, str | None] | None
    ) -> str:
        """Convert a class URI to human-readable format.

        Args:
            entity_key: A ``(class_uri, shape_uri)`` tuple, or ``None``.

        Returns:
            The rule's ``displayName`` when a matching rule defines one,
            otherwise a readable form of the class URI, or ``"Unknown"`` when
            no class is available.
        """
        # Trivial cases first, so they need neither the imports nor a lookup.
        if entity_key is None:
            return "Unknown"

        class_uri, shape_uri = entity_key
        if class_uri is None and shape_uri is None:
            return "Unknown"

        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )
        from heritrace.utils.shacl_utils import (  # noqa: PLC0415
            determine_shape_for_classes,
        )

        if shape_uri is None and class_uri is not None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        if class_uri is None:
            return "Unknown"
        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self,
        uri: str,
        entity_key: tuple[str | None, str | None],
        graph: Graph | Dataset | None = None,
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format.
            entity_key: A tuple containing ``(class_uri, shape_uri)``.
            graph: Optional graph to use for fetching URI display values.

        Returns:
            The value produced by the rule's ``fetchUriDisplay`` query when it
            yields one, else the rule's ``displayName``, else ``uri`` itself.
        """
        from heritrace.utils.display_rules_utils import (  # noqa: PLC0415
            find_matching_rule,
        )

        class_uri, shape_uri = entity_key[0], entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | Dataset | None = None
    ) -> str | None:
        """Run the rule's ``fetchUriDisplay`` query and return its first value.

        Args:
            uri: Entity URI substituted for the ``[[uri]]`` placeholder.
            rule: Display rule that may contain a ``fetchUriDisplay`` query.
            graph: When given, the query runs on this graph; otherwise it is
                sent to the configured SPARQL endpoint.

        Returns:
            The first value of the first result, or ``None`` when the rule has
            no query, the query yields nothing, or one of the handled errors
            occurs (the failure is logged at debug level).
        """
        logger = logging.getLogger(__name__)
        if "fetchUriDisplay" not in rule:
            return None

        # NOTE(review): ``uri`` is inserted verbatim between angle brackets;
        # callers are expected to pass a well-formed URI.
        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                with self._query_lock:
                    results = graph.query(query)
                for row in select_results(results):
                    # Only the first row is relevant.
                    return str(row[0])
            except (ParseBaseException, ValueError, TypeError):
                logger.debug(
                    "Failed to execute fetchUriDisplay query on graph for URI %s",
                    uri,
                )
            return None

        sparql = self._get_sparql()
        sparql.setQuery(query)
        try:
            bindings = get_sparql_bindings(sparql.query().convert())
            if bindings:
                first_binding = bindings[0]
                first_key = next(iter(first_binding))
                return first_binding[first_key]["value"]
        except (SPARQLWrapperException, OSError, KeyError, StopIteration):
            logger.debug(
                "Failed to execute fetchUriDisplay SPARQL query for URI %s", uri
            )
        return None

    def human_readable_datetime(self, dt_str: str) -> str:
        """Parse ``dt_str`` with dateutil and format it in the "long" style.

        Errors raised by the dateutil parser for unparseable input propagate
        to the caller.
        """
        dt = dateutil_parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Format a primary source as text or as an HTML anchor.

        Args:
            primary_source: Source identifier, or ``None`` when unknown.

        Returns:
            The translated "Unknown" for ``None``; a link to the local
            entity-version page when the value contains ``/prov/se``; an
            external link (opened in a new tab) for any other valid URL;
            otherwise the value unchanged.
        """
        if primary_source is None:
            return str(lazy_gettext("Unknown"))

        if "/prov/se" in primary_source:
            alt = self._escape_html(
                lazy_gettext("Link to the primary source description")
            )
            version_path = primary_source.replace("/prov/se", "")
            version_url = self._escape_html(f"/entity-version/{version_path}")
            label = self._escape_html(lazy_gettext("Version"))
            version_number = self._escape_html(primary_source.split("/prov/se/")[-1])
            return f"<a href='{version_url}' alt='{alt}'>{label} {version_number}</a>"

        if is_valid_url(primary_source):
            alt = self._escape_html(
                lazy_gettext("Link to the primary source description")
            )
            target = self._escape_html(primary_source)
            # The closing quote after ``alt`` was previously missing, which
            # swallowed the ``target`` attribute into the ``alt`` value.
            return f"<a href='{target}' alt='{alt}' target='_blank'>{target}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """Format a source reference for display.

        Zenodo URLs are handled by ``format_zenodo_source``; everything else is
        delegated to :meth:`human_readable_primary_source`.

        Args:
            url: The source URL or identifier to format.

        Returns:
            Formatted HTML string representing the source, or ``"Unknown"``
            when ``url`` is empty.
        """
        if not url:
            return "Unknown"

        # Zenodo is checked first because it is more specific than a generic URL.
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """Format an agent reference for display.

        ORCID URLs are handled by ``format_orcid_attribution``; other valid
        URLs become a plain external link.

        Args:
            url: The agent URL or identifier to format.

        Returns:
            Formatted HTML string representing the agent, ``"Unknown"`` when
            ``url`` is empty, or the raw value when it is not a URL.
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        if is_valid_url(url):
            safe_url = self._escape_html(url)
            return f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

        # Not a URL at all: return the raw value.
        return url

287 

288 

def split_namespace(uri: str) -> tuple[str, str]:
    """
    Break a URI into its namespace and its local name.

    When the URI has a fragment, the namespace is everything up to and
    including ``#`` and the local name is the fragment. Otherwise the URI is
    cut at the last ``/`` of its path; any query string is ignored.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    parts = urlparse(uri)
    base = f"{parts.scheme}://{parts.netloc}"

    if parts.fragment:
        return f"{base}{parts.path}#", parts.fragment

    # Everything before the last slash is the parent path; the rest is the
    # local name (the whole path when it contains no slash).
    parent_path, _, local_name = parts.path.rpartition("/")
    return f"{base}{parent_path}/", local_name

313 

314 

def format_uri_as_readable(uri: str) -> str:
    """
    Turn the local name of a URI into lower-case, space-separated words.

    A local name that is already entirely lower-case is returned unchanged.
    Otherwise a space is inserted before every upper-case character except the
    first one, and the result is lower-cased.

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    local_name = split_namespace(uri)[1]
    if local_name.islower():
        return local_name

    # Split CamelCase: each upper-case character after the first starts a word.
    pieces = []
    for position, char in enumerate(local_name):
        if position and char.isupper():
            pieces.append(" ")
        pieces.append(char)
    return "".join(pieces).lower()