Coverage for heritrace / utils / filters.py: 99%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import threading 

8from typing import Tuple 

9from urllib.parse import quote, urlparse 

10 

11import dateutil 

12import validators 

13from flask import url_for 

14from flask_babel import format_datetime, gettext, lazy_gettext 

15from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

16from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

17from rdflib import Dataset, Graph 

18from SPARQLWrapper import JSON 

19 

20 

class Filter:
    """Helpers that turn URIs, datetimes and provenance references into display text.

    Several methods return HTML fragments (``<a>`` elements) rather than plain
    text. Display rules are looked up through ``find_matching_rule`` and, when a
    rule carries a ``fetchUriDisplay`` query, labels are fetched either from a
    supplied graph or from the SPARQL client stored on the instance.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the display configuration and prepare the SPARQL client.

        Args:
            context: Container tested for membership of a predicate's namespace
                (see ``human_readable_predicate``).
            display_rules: Display rules handed to ``find_matching_rule``.
            sparql_endpoint: Not read by this constructor (the client comes from
                ``get_sparql()``); kept so existing callers keep working.
        """
        # Function-scope import, as in the other methods; presumably this avoids
        # an import cycle with heritrace.extensions -- TODO confirm.
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        # Serialises queries run against a caller-supplied graph.
        self._query_lock = threading.Lock()

    def human_readable_predicate(
        self,
        predicate_uri: str,
        entity_key: tuple[str, str],
        is_link=False,
        object_shape_uri: str = None,
    ):
        """Get a human readable label for a predicate in the context of an entity.

        Lookup order:
          1. a matching entry in the rule's ``displayProperties`` (a shape-specific
             ``displayRules`` entry when ``object_shape_uri`` matches, otherwise the
             first ``displayRules`` entry, otherwise the entry's ``displayName``);
          2. the formatted local name when the predicate namespace is in
             ``self.context``;
          3. an HTML link to the entity page when ``is_link`` is true and the
             predicate is a valid URL;
          4. the predicate URI itself, as a string.

        Args:
            predicate_uri: URI of the predicate to get a label for.
            entity_key: Tuple of (class_uri, shape_uri) for the entity context.
            is_link: Whether an unlabelled predicate may be rendered as a link.
            object_shape_uri: Shape URI of the object entity (for shape-specific
                display rules).

        Returns:
            str: Human readable label (possibly an HTML anchor) for the predicate.
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        display_properties = (
            rule["displayProperties"] if rule and "displayProperties" in rule else []
        )
        for display_property in display_properties:
            prop_uri = display_property.get("property") or display_property.get("virtual_property")
            if prop_uri != str(predicate_uri):
                continue

            shape_rules = display_property.get("displayRules")
            if shape_rules:
                if object_shape_uri:
                    for shape_rule in shape_rules:
                        if shape_rule.get("shape") == object_shape_uri:
                            return shape_rule["displayName"]
                # No shape-specific match: the first rule acts as the default.
                return shape_rules[0]["displayName"]

            # Also reached when "displayRules" is present but empty, which used to
            # raise IndexError on the [0] lookup.
            if "displayName" in display_property:
                return display_property["displayName"]

        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)

        # Check the cheap flag first so URLs are only validated when a link is wanted.
        if is_link and validators.url(predicate_uri):
            return f"<a href='{url_for('entity.about', subject=quote(predicate_uri))}' alt='{gettext('Link to the entity %(entity)s', entity=predicate_uri)}'>{predicate_uri}</a>"

        return str(predicate_uri)

    def human_readable_class(self, entity_key):
        """Convert a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri).

        Returns:
            str: ``"Unknown"`` when both members are ``None``; otherwise the
            matching rule's ``displayName`` or, failing that, the formatted local
            name of the class URI.
        """
        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        class_uri, shape_uri = entity_key

        if class_uri is None and shape_uri is None:
            return "Unknown"

        if shape_uri is None:
            # Only the class is known: derive the shape from it.
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self, uri: str, entity_key: tuple[str, str | None], graph: Graph | Dataset = None
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format.
            entity_key: A tuple containing (class_uri, shape_uri).
            graph: Optional graph to use for fetching URI display values.

        Returns:
            str: The value produced by the rule's ``fetchUriDisplay`` query when it
            yields one, else the rule's ``displayName``, else ``uri`` unchanged
            (also when no rule matches).
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri = entity_key[0]
        shape_uri = entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | Dataset = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        The ``[[uri]]`` placeholder in the rule's query is replaced by ``<uri>``.
        The query runs against ``graph`` when one is given, otherwise through the
        SPARQL client stored on the instance. Query errors are printed and
        swallowed so that callers can fall back to another label.

        Args:
            uri: The URI to get a display value for.
            rule: The display rule containing the fetchUriDisplay query.
            graph: Optional graph to use for fetching URI display values.

        Returns:
            str | None: The first value of the first result, or None when the rule
            has no query, the query returns nothing, or it fails.
        """
        if "fetchUriDisplay" not in rule:
            return None

        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                # The lock is held while the result is consumed as well.
                with self._query_lock:
                    results = graph.query(query)
                    for row in results:
                        return str(row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
            return None

        # NOTE(review): this path mutates the shared client via setQuery() without
        # taking _query_lock; confirm whether get_sparql() hands out per-thread
        # clients before relying on it concurrently.
        self.sparql.setQuery(query)
        try:
            results = self.sparql.query().convert()
            bindings = results["results"]["bindings"]
            if bindings:
                # Only the first variable of the first binding is used.
                first_value = next(iter(bindings[0].values()))
                return first_value["value"]
        except Exception as e:
            print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a datetime string and format it with flask_babel's "long" format.

        Args:
            dt_str: Datetime string understood by ``dateutil.parser.parse``.

        Returns:
            The value returned by ``format_datetime(dt, format="long")``.
        """
        # A bare ``import dateutil`` does not load the ``parser`` submodule on
        # python-dateutil releases without lazy submodule loading, so import it
        # explicitly here instead of relying on another module having done so.
        from dateutil import parser as date_parser

        dt = date_parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source reference as display text or an HTML link.

        Args:
            primary_source: The primary source identifier, or None.

        Returns:
            The lazily translated "Unknown" for None; a link to
            ``/entity-version/...`` labelled "Version <n>" when the value contains
            ``/prov/se``; an external link when the value is a valid URL; otherwise
            the value unchanged.
        """
        if primary_source is None:
            return lazy_gettext("Unknown")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )

        if validators.url(primary_source):
            # The alt attribute is now closed before target; the missing quote used
            # to swallow target='_blank' into the alt text.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """Format a source reference for display.

        Zenodo URLs are delegated to ``format_zenodo_source``; everything else goes
        through ``human_readable_primary_source``.

        Args:
            url (str): The source URL or identifier to format.

        Returns:
            str: Formatted HTML string representing the source, or "Unknown" when
            ``url`` is empty or None.
        """
        if not url:
            return "Unknown"

        # Check Zenodo first since it is more specific than a generic URL.
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """Format an agent reference for display.

        ORCID URLs are delegated to ``format_orcid_attribution``; other valid URLs
        become a plain external link.

        Args:
            url (str): The agent URL or identifier to format.

        Returns:
            str: Formatted HTML string representing the agent, "Unknown" when
            ``url`` is empty or None, or the raw value when it is not a URL.
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, other URLs just get a simple linked version.
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # Not a URL at all: return the raw value.
        return url

232 

233 

def split_namespace(uri: str) -> Tuple[str, str]:
    """
    Break a URI into its namespace prefix and its trailing local name.

    When the URI has a fragment, the namespace is everything up to and including
    ``#`` and the local name is the fragment. Otherwise the split happens at the
    last ``/`` of the path. Only scheme, netloc, path and fragment are used, so a
    query string does not appear in either part.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    parts = urlparse(uri)
    base = parts.scheme + "://" + parts.netloc

    if parts.fragment:
        return base + parts.path + "#", parts.fragment

    # rpartition yields ("", "", path) when the path has no slash at all, which
    # matches splitting on "/" and dropping the last segment.
    parent, _, local_name = parts.path.rpartition("/")
    return base + parent + "/", local_name

258 

259 

def format_uri_as_readable(uri: str) -> str:
    """
    Turn a URI into readable text based on its local name.

    The local name (as returned by ``split_namespace``) is returned untouched when
    it is already all lowercase. Otherwise a space is inserted before every
    uppercase character except the first character, and the result is lowercased,
    e.g. a CamelCase name becomes space-separated lowercase words.

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    _, local_name = split_namespace(uri)

    if local_name.islower():
        return local_name

    # An uppercase character starts a new word unless it opens the name.
    spaced = "".join(
        " " + char if index > 0 and char.isupper() else char
        for index, char in enumerate(local_name)
    )
    return spaced.lower()