Coverage for heritrace/utils/filters.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1from __future__ import annotations 

2 

3import threading 

4from typing import Tuple 

5from urllib.parse import quote, urlparse 

6 

7import dateutil 

8import validators 

9from flask import url_for 

10from flask_babel import format_datetime, gettext, lazy_gettext 

11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

13from rdflib import ConjunctiveGraph, Graph 

14from SPARQLWrapper import JSON 

15 

16 

class Filter:
    """Formatting helpers that turn URIs, datetimes and provenance values into display text.

    The instance keeps the namespace ``context``, the ``display_rules`` and a
    SPARQL client obtained from ``heritrace.extensions.get_sparql``.  Several
    methods return HTML fragments (``<a>`` elements) rather than plain text.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the configuration and prepare the SPARQL client.

        Args:
            context: Container of known namespace URIs; membership is tested
                against the namespace part returned by :meth:`split_ns`.
            display_rules: Display rules passed to ``find_matching_rule``.
            sparql_endpoint: Accepted for interface compatibility; this
                constructor does not use it (the client comes from
                ``get_sparql()``).
        """
        # Imported lazily, as in the rest of this class, rather than at module level.
        from heritrace.extensions import get_sparql

        self.context = context
        self.display_rules = display_rules
        self.sparql = get_sparql()
        self.sparql.setReturnFormat(JSON)
        # Serialises query execution; see get_fetch_uri_display.
        self._query_lock = threading.Lock()

    def human_readable_predicate(
        self, predicate_uri: str, entity_key: tuple[str, str], is_link: bool = False
    ):
        """Get human readable label for a predicate in the context of an entity.

        Args:
            predicate_uri: URI of the predicate to get label for
            entity_key: Tuple of (class_uri, shape_uri) for the entity context
            is_link: Whether to format as a link

        Returns:
            str: Human readable label for the predicate
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayProperties" in rule:
            predicate = str(predicate_uri)
            for display_property in rule["displayProperties"]:
                if display_property["property"] != predicate:
                    continue
                # Nested display rules win over a plain displayName; the first
                # nested rule supplies the label.
                if "displayRules" in display_property:
                    return display_property["displayRules"][0]["displayName"]
                if "displayName" in display_property:
                    return display_property["displayName"]

        # No rule (or no label in the matching rule): fall back to the URI itself.
        return self._format_uri_as_readable(predicate_uri, is_link)

    def _format_uri_as_readable(self, uri, is_link: bool = False):
        """
        Format an URI in a human-readable way.

        This is a common fallback method used when no specific display rules are found.

        Args:
            uri (str): The URI to format
            is_link (bool): Whether to generate a hyperlink for the URI

        Returns:
            str: Human-readable representation of the URI. For a URI in a known
            namespace this is the local name, with camelCase split into
            lower-case words; otherwise an ``<a>`` element (valid URL and
            ``is_link``) or the URI unchanged.
        """
        first_part, last_part = self.split_ns(uri)
        if first_part in self.context:
            if last_part.islower():
                return last_part
            # Split camelCase / PascalCase on upper-case letters:
            # "hasPart" -> "has part".
            words = []
            word = ""
            for char in last_part:
                if char.isupper() and word:
                    words.append(word)
                    word = char
                else:
                    word += char
            words.append(word)
            return " ".join(words).lower()
        if validators.url(uri) and is_link:
            return f"<a href='{url_for('entity.about', subject=quote(uri))}' alt='{gettext('Link to the entity %(entity)s', entity=uri)}'>{uri}</a>"
        return str(uri)

    def human_readable_class(self, entity_key):
        """
        Converts a class URI to human-readable format.

        Args:
            entity_key (tuple): A tuple containing (class_uri, shape_uri)

        Returns:
            str: Human-readable representation of the class
        """
        from heritrace.utils.display_rules_utils import find_matching_rule
        from heritrace.utils.shacl_utils import determine_shape_for_classes

        class_uri, shape_uri = entity_key
        if shape_uri is None:
            # No shape supplied: derive one from the class before rule lookup.
            shape_uri = determine_shape_for_classes([class_uri])
        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)

        if rule and "displayName" in rule:
            return rule["displayName"]

        return self._format_uri_as_readable(class_uri)

    def human_readable_entity(
        self,
        uri: str,
        entity_key: tuple[str, str | None],
        graph: Graph | ConjunctiveGraph | None = None,
    ) -> str:
        """Convert an entity URI to human-readable format using display rules.

        Args:
            uri: The URI of the entity to format
            entity_key: A tuple containing (class_uri, shape_uri)
            graph: Optional graph to use for fetching URI display values

        Returns:
            str: Human-readable representation of the entity. Preference order:
            the ``fetchUriDisplay`` query result, the rule's ``displayName``,
            then the URI itself.
        """
        from heritrace.utils.display_rules_utils import find_matching_rule

        class_uri, shape_uri = entity_key[0], entity_key[1]

        rule = find_matching_rule(class_uri, shape_uri, self.display_rules)
        if not rule:
            return uri

        if "fetchUriDisplay" in rule:
            uri_display = self.get_fetch_uri_display(uri, rule, graph)
            if uri_display:
                return uri_display

        if "displayName" in rule:
            return rule["displayName"]

        return uri

    def get_fetch_uri_display(
        self, uri: str, rule: dict, graph: Graph | ConjunctiveGraph | None = None
    ) -> str | None:
        """Get a display value for an entity URI using fetchUriDisplay rules.

        The ``[[uri]]`` placeholder of the rule's query is replaced by ``<uri>``.
        The query runs against ``graph`` when one is given, otherwise against
        the shared SPARQL client.  Query errors are printed and reported as
        ``None`` rather than raised.

        Args:
            uri: The URI to get a display value for
            rule: The display rule containing the fetchUriDisplay query
            graph: Optional graph to use for fetching URI display values

        Returns:
            str | None: The display value if found, None otherwise
        """
        if "fetchUriDisplay" not in rule:
            return None

        query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

        if graph is not None:
            try:
                # Results are consumed while the lock is held, in case the
                # graph evaluates the query lazily during iteration.
                with self._query_lock:
                    for row in graph.query(query):
                        return str(row[0])
            except Exception as e:
                print(
                    f"Error executing fetchUriDisplay query: {e}. {query}"
                )
            return None

        # self.sparql is shared mutable state: setQuery() followed by query()
        # must not interleave with another thread's pair, otherwise a thread
        # could execute a query set by someone else.
        with self._query_lock:
            self.sparql.setQuery(query)
            try:
                results = self.sparql.query().convert()
                bindings = results["results"]["bindings"]
                if bindings:
                    first_binding = bindings[0]
                    first_key = list(first_binding.keys())[0]
                    return first_binding[first_key]["value"]
            except Exception as e:
                print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a datetime string and render it in the locale's "long" format.

        Args:
            dt_str: Datetime string accepted by ``dateutil.parser.parse``.

        Returns:
            The value produced by ``flask_babel.format_datetime(..., format="long")``.
        """
        # ``import dateutil`` alone does not guarantee that the ``parser``
        # submodule is loaded; import it explicitly so the attribute exists.
        from dateutil import parser as date_parser

        dt = date_parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def split_ns(self, ns: str) -> Tuple[str, str]:
        """Split a URI into (namespace, local name).

        With a fragment, the namespace is everything up to and including ``#``
        and the local name is the fragment.  Otherwise the namespace is
        everything up to and including the last ``/`` of the path and the local
        name is the last path segment.

        Args:
            ns: The URI to split.

        Returns:
            Tuple of (namespace, local name).
        """
        parsed = urlparse(ns)
        origin = parsed.scheme + "://" + parsed.netloc
        if parsed.fragment:
            return origin + parsed.path + "#", parsed.fragment
        parent_path, _, last_part = parsed.path.rpartition("/")
        return origin + parent_path + "/", last_part

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source as display text or an HTML link.

        Args:
            primary_source: Source identifier or URL; may be ``None``.

        Returns:
            The translated "Unknown" label for ``None``; a link to
            ``/entity-version/...`` labelled "Version <n>" when the value
            contains ``/prov/se``; an external link for any other valid URL;
            otherwise the value unchanged.
        """
        if primary_source is None:
            return lazy_gettext("Unknown")
        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )
        if validators.url(primary_source):
            # The alt attribute is closed before target so that target='_blank'
            # is parsed as its own attribute.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"
        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling various URL types including Zenodo DOIs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: Formatted HTML string representing the source
        """
        if not url:
            return "Unknown"

        # First check if it's a Zenodo DOI since this is more specific than a generic URL
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # If not Zenodo, fall back to the generic primary-source formatting
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling various URL types including ORCID and others.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: Formatted HTML string representing the agent
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, just return a simple linked version for other URLs
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # If it's not a URL at all, just return the raw value
        return url