Coverage for heritrace/utils/filters.py: 99%

137 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-10-13 17:12 +0000

1from __future__ import annotations 

2 

3import threading 

4from typing import Tuple 

5from urllib.parse import quote, urlparse 

6 

7import dateutil 

8import validators 

9from flask import url_for 

10from flask_babel import format_datetime, gettext, lazy_gettext 

11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

13from rdflib import ConjunctiveGraph, Graph 

14from SPARQLWrapper import JSON 

15 

16 

class Filter:
    """Render RDF URIs, classes, dates and provenance references for display.

    The methods return either plain text or small HTML snippets. Labels come
    from the configured display rules when a matching rule exists and fall
    back to values derived from the URI itself otherwise.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the display configuration and prepare the SPARQL client.

        Args:
            context: Mapping whose keys are compared against a predicate's
                namespace in ``human_readable_predicate``.
            display_rules: Display rules handed to ``find_matching_rule``.
            sparql_endpoint: Not used by this class (the client is obtained
                from ``get_sparql()``); kept so existing callers keep working.
        """
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        # Serializes queries run against a caller-supplied graph
        # (see get_fetch_uri_display).
        self._query_lock = threading.Lock()

    def human_readable_predicate(
        self,
        predicate_uri: str,
        entity_key: tuple[str, str],
        is_link=False,
        object_shape_uri: str | None = None,
    ):
        """Get human readable label for a predicate in the context of an entity.

        Args:
            predicate_uri: URI of the predicate to get label for
            entity_key: Tuple of (class_uri, shape_uri) for the entity context
            is_link: Whether to format as a link
            object_shape_uri: Shape URI of the object entity (for shape-specific display rules)

        Returns:
            str: Human readable label for the predicate
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayProperties" in rule:
            for display_property in rule["displayProperties"]:
                prop_uri = display_property.get("property") or display_property.get("virtual_property")
                if prop_uri != str(predicate_uri):
                    continue

                shape_rules = display_property.get("displayRules")
                # An empty "displayRules" list is treated like a missing one so
                # that indexing its first element cannot raise IndexError.
                if shape_rules:
                    if object_shape_uri:
                        for shape_rule in shape_rules:
                            if shape_rule.get("shape") == object_shape_uri:
                                return shape_rule["displayName"]
                    # No shape-specific match: the first rule is the default.
                    return shape_rules[0]["displayName"]
                if "displayName" in display_property:
                    return display_property["displayName"]

        # No display rule applied: derive a label from the URI itself.
        first_part, _ = split_namespace(predicate_uri)
        if first_part in self.context:
            return format_uri_as_readable(predicate_uri)
        if validators.url(predicate_uri) and is_link:
            href = url_for("entity.about", subject=quote(predicate_uri))
            alt_text = gettext("Link to the entity %(entity)s", entity=predicate_uri)
            return f"<a href='{href}' alt='{alt_text}'>{predicate_uri}</a>"
        return str(predicate_uri)

    def human_readable_class(self, entity_key: tuple[str | None, str | None]) -> str:
        """
        Converts a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: "Unknown" when both elements are None, the matching rule's
            ``displayName`` when there is one, otherwise a label derived from
            the class URI
        """
        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        class_uri, shape_uri = entity_key

        if class_uri is None and shape_uri is None:
            return "Unknown"

        if shape_uri is None:
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return format_uri_as_readable(class_uri)

    def human_readable_entity(
        self, uri: str, entity_key: tuple[str, str | None], graph: Graph | ConjunctiveGraph = None
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: The value produced by the rule's ``fetchUriDisplay`` query if
            it yields one, else the rule's ``displayName``, else ``uri``
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key[0], entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | ConjunctiveGraph = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        The rule's query has its ``[[uri]]`` placeholder replaced by ``<uri>``
        and is run against ``graph`` when one is given, otherwise through the
        SPARQL client created in ``__init__``. Query errors are printed and
        result in ``None``.

        Args:
            uri: The URI to get a display value for
            rule: The display rule containing the fetchUriDisplay query
            graph: Optional graph to use for fetching URI display values

        Returns:
            str | None: The first value of the first result if any, None otherwise
        """
        if "fetchUriDisplay" not in rule:
            return None

        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                with self._query_lock:
                    first_row = next(iter(graph.query(query)), None)
                    if first_row is not None:
                        return str(first_row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
            return None

        # NOTE(review): self.sparql is shared instance state and is used here
        # without self._query_lock - confirm whether get_sparql() returns a
        # client that is safe to share between threads.
        self.sparql.setQuery(query)
        try:
            bindings = self.sparql.query().convert()["results"]["bindings"]
            if bindings:
                first_binding = bindings[0]
                first_key = next(iter(first_binding))
                return first_binding[first_key]["value"]
        except Exception as e:
            print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str: str) -> str:
        """Parse a date/time string and format it with the "long" format.

        Args:
            dt_str: Date/time string understood by ``dateutil.parser.parse``

        Returns:
            str: The value formatted by ``format_datetime(..., format="long")``
        """
        # ``import dateutil`` alone does not guarantee that the ``parser``
        # submodule has been loaded, so import it explicitly here.
        from dateutil import parser as date_parser

        return format_datetime(date_parser.parse(dt_str), format="long")

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source reference as text or as an HTML link.

        Args:
            primary_source: The primary source value, or None

        Returns:
            str: A translated "Unknown" for None; a link to the entity-version
            page labelled "Version <n>" when the value contains ``/prov/se``;
            an external link when the value is a valid URL; otherwise the
            value unchanged
        """
        if primary_source is None:
            return lazy_gettext("Unknown")

        link_description = lazy_gettext("Link to the primary source description")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            version_number = primary_source.split("/prov/se/")[-1]
            return (
                f"<a href='{version_url}' alt='{link_description}'>"
                f"{lazy_gettext('Version')} {version_number}</a>"
            )

        if validators.url(primary_source):
            # The alt attribute is closed before target, so target='_blank'
            # is emitted as its own attribute.
            return f"<a href='{primary_source}' alt='{link_description}' target='_blank'>{primary_source}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: Formatted HTML string representing the source, or "Unknown"
            when ``url`` is empty
        """
        if not url:
            return "Unknown"

        # First check if it's a Zenodo DOI since this is more specific than a generic URL
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # Anything else goes through the generic primary-source formatter
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: Formatted HTML string representing the agent, or "Unknown"
            when ``url`` is empty
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, just return a simple linked version for other URLs
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # If it's not a URL at all, just return the raw value
        return url

228 

229 

def split_namespace(uri: str) -> Tuple[str, str]:
    """
    Break a URI into its namespace prefix and trailing local name.

    When the URI has a fragment, the namespace is everything up to and
    including ``#`` and the local name is the fragment. Otherwise the
    namespace ends with the last ``/`` of the path and the local name is the
    final path segment. Any query string is not part of either value.

    Args:
        uri: The URI to split

    Returns:
        Tuple of (namespace, local_part)
    """
    parts = urlparse(uri)
    origin = parts.scheme + "://" + parts.netloc

    if parts.fragment:
        return origin + parts.path + "#", parts.fragment

    # rpartition yields ("", "", path) when the path has no "/", which gives
    # the same namespace as joining all-but-the-last "/"-separated segments.
    parent_path, _, local_name = parts.path.rpartition("/")
    return origin + parent_path + "/", local_name

254 

255 

def format_uri_as_readable(uri: str) -> str:
    """
    Format a URI as human-readable text by extracting and formatting the local part.

    The local part (as returned by ``split_namespace``) is returned unchanged
    when it is already lowercase. Otherwise it is split into CamelCase words,
    joined with spaces and lowercased. Runs of capitals are kept together as
    one word, e.g. ``hasORCID`` -> ``has orcid`` and ``HTTPServer`` ->
    ``http server``.

    Args:
        uri: The URI to format

    Returns:
        Human-readable string
    """
    _, local_name = split_namespace(uri)

    if local_name.islower():
        return local_name

    words = []
    word_start = 0
    for idx in range(1, len(local_name)):
        if not local_name[idx].isupper():
            continue
        follows_upper = local_name[idx - 1].isupper()
        precedes_lower = idx + 1 < len(local_name) and local_name[idx + 1].islower()
        # Start a new word at a lower->Upper boundary, or at the last capital
        # of an acronym when it begins the next word ("HTTPServer" -> "Server").
        if not follows_upper or precedes_lower:
            words.append(local_name[word_start:idx])
            word_start = idx
    words.append(local_name[word_start:])
    return " ".join(words).lower()