Coverage for heritrace/utils/filters.py: 100%

134 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-01 22:12 +0000

1from __future__ import annotations 

2 

3import threading 

4from typing import Tuple 

5from urllib.parse import quote, urlparse 

6 

7import dateutil 

8import validators 

9from flask import url_for 

10from flask_babel import format_datetime, gettext, lazy_gettext 

11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

13from rdflib import ConjunctiveGraph, Graph 

14from SPARQLWrapper import JSON 

15 

16 

class Filter:
    """Helpers that render predicates, classes, entities, dates, sources and agents for display.

    The methods return either plain text or small HTML snippets (``<a>`` tags).
    Display rules are looked up through ``find_matching_rule``; labels that
    depend on stored data are fetched with the rule's ``fetchUriDisplay``
    SPARQL query.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the display configuration and prepare the SPARQL client.

        Args:
            context: Container checked by ``human_readable_predicate`` to see
                whether a predicate's namespace is known.
            display_rules: Display rules handed to ``find_matching_rule``.
            sparql_endpoint: Not used by this constructor; the client comes
                from ``get_sparql()``. Kept so existing callers keep working.
        """
        # Function-scope import — presumably avoids a circular import; verify.
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        self._query_lock = threading.Lock()

    def human_readable_predicate(self, predicate_uri: str, entity_key: tuple[str, str], is_link=False, object_shape_uri: str | None = None):
        """Get human readable label for a predicate in the context of an entity.

        Lookup order:
          1. A ``displayProperties`` entry of the matching rule whose
             ``property`` equals the predicate. If that entry has
             ``displayRules``, the one whose ``shape`` equals
             ``object_shape_uri`` wins, otherwise the first one is used;
             if it has no ``displayRules``, its own ``displayName`` is used.
          2. A label derived from the URI's local part, when the URI's
             namespace is found in ``self.context``.
          3. An HTML link to the entity page, when ``is_link`` is true and the
             predicate is a valid URL.
          4. The predicate URI itself.

        Args:
            predicate_uri: URI of the predicate to get label for
            entity_key: Tuple of (class_uri, shape_uri) for the entity context
            is_link: Whether to format as a link
            object_shape_uri: Shape URI of the object entity (for shape-specific display rules)

        Returns:
            str: Human readable label for the predicate
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayProperties" in rule:
            for display_property in rule["displayProperties"]:
                if display_property["property"] != str(predicate_uri):
                    continue
                if "displayRules" in display_property:
                    if object_shape_uri:
                        for display_rule in display_property["displayRules"]:
                            if display_rule.get("shape") == object_shape_uri:
                                return display_rule["displayName"]
                    # No shape-specific match: fall back to the first rule.
                    return display_property["displayRules"][0]["displayName"]
                elif "displayName" in display_property:
                    return display_property["displayName"]
                # An entry with neither key does not decide the label;
                # keep scanning the remaining entries.

        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)
        elif validators.url(predicate_uri) and is_link:
            return f"<a href='{url_for('entity.about', subject=quote(predicate_uri))}' alt='{gettext('Link to the entity %(entity)s', entity=predicate_uri)}'>{predicate_uri}</a>"
        else:
            return str(predicate_uri)

    def human_readable_class(self, entity_key):
        """
        Converts a class URI to human-readable format.

        When the key carries no shape, one is determined from the class via
        ``determine_shape_for_classes`` before looking up the display rule.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: The rule's ``displayName`` if a matching rule defines one,
            otherwise a label derived from the class URI's local part
        """
        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        class_uri, shape_uri = entity_key
        if shape_uri is None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self, uri: str, entity_key: tuple[str, str | None], graph: Graph | ConjunctiveGraph | None = None
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: The value produced by the rule's ``fetchUriDisplay`` query if
            any, else the rule's ``displayName``, else the URI unchanged
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri = entity_key[0]
        shape_uri = entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | ConjunctiveGraph | None = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        The ``[[uri]]`` placeholder of the rule's query is replaced with
        ``<uri>``. The query runs against ``graph`` when one is given,
        otherwise against the SPARQL client held by this instance. Query
        errors are printed and swallowed (best effort).

        Args:
            uri: The URI to get a display value for
            rule: The display rule containing the fetchUriDisplay query
            graph: Optional graph to use for fetching URI display values

        Returns:
            str | None: The first value of the first result row if found,
            None otherwise
        """
        if "fetchUriDisplay" not in rule:
            return None

        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")
        if graph is not None:
            try:
                with self._query_lock:
                    results = graph.query(query)
                    # Consume the result while still holding the lock, in
                    # case the result object is evaluated lazily.
                    for row in results:
                        return str(row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
        else:
            # NOTE(review): self.sparql is shared by all calls on this
            # instance and is used here without self._query_lock; confirm
            # whether concurrent callers can interleave setQuery()/query().
            self.sparql.setQuery(query)
            try:
                results = self.sparql.query().convert()
                if results["results"]["bindings"]:
                    first_binding = results["results"]["bindings"][0]
                    first_key = list(first_binding.keys())[0]
                    return first_binding[first_key]["value"]
            except Exception as e:
                print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a date/time string and format it with the "long" format.

        Args:
            dt_str: Date/time string understood by ``dateutil.parser.parse``

        Returns:
            The value returned by ``format_datetime(dt, format="long")``
        """
        # Import the submodule explicitly: a bare ``import dateutil`` does not
        # guarantee that ``dateutil.parser`` is loaded on every release.
        from dateutil import parser as dateutil_parser

        dt = dateutil_parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source value as text or as an HTML link.

        Args:
            primary_source: Source identifier or URL, or None when unknown

        Returns:
            The translated "Unknown" label for None; a link to
            ``/entity-version/...`` for values containing ``/prov/se``;
            a link opening in a new tab for other valid URLs; otherwise the
            value unchanged
        """
        if primary_source is None:
            return lazy_gettext("Unknown")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )

        if validators.url(primary_source):
            # The alt value must be closed before target, otherwise target
            # becomes part of the alt text and the link opens in place.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: "Unknown" for an empty value; the Zenodo formatting for
            Zenodo URLs; otherwise the result of
            ``human_readable_primary_source``
        """
        if not url:
            return "Unknown"

        # First check if it's a Zenodo DOI since this is more specific than a generic URL
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # If not Zenodo, fall back to the generic primary-source rendering
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: "Unknown" for an empty value; the ORCID attribution for
            ORCID URLs; a link opening in a new tab for other valid URLs;
            otherwise the value unchanged
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, just return a simple linked version for other URLs
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # If it's not a URL at all, just return the raw value
        return url

223 

224 

def split_namespace(uri: str) -> Tuple[str, str]:
    """
    Break a URI into its namespace and its local name.

    If the URI has a fragment, the namespace is everything up to and
    including ``#`` and the local name is the fragment. Otherwise the
    namespace ends with the last ``/`` of the path and the local name is the
    final path segment. Query string and parameters are not carried over.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    parts = urlparse(uri)
    origin = f"{parts.scheme}://{parts.netloc}"

    if parts.fragment:
        return f"{origin}{parts.path}#", parts.fragment

    # rpartition gives (everything before the last "/", "/", last segment);
    # with no "/" in the path the head is empty and the whole path is the leaf.
    parent_path, _, leaf = parts.path.rpartition("/")
    return f"{origin}{parent_path}/", leaf

249 

250 

def format_uri_as_readable(uri: str) -> str:
    """
    Turn a URI into a readable label based on its local name.

    An all-lowercase local name is returned as is. Otherwise the name is cut
    before each uppercase letter (except a leading one), the pieces are
    joined with spaces and the result is lowercased, e.g. ``hasTitle``
    becomes ``has title``.

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    local_name = split_namespace(uri)[1]
    if local_name.islower():
        return local_name

    # The last element is the word currently being built.
    pieces = [""]
    for ch in local_name:
        if ch.isupper() and pieces[-1]:
            # An uppercase letter closes the current word and opens a new one.
            pieces.append(ch)
        else:
            pieces[-1] += ch
    return " ".join(pieces).lower()