Coverage for heritrace/utils/filters.py: 100%

113 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1from __future__ import annotations 

2 

3import threading 

4from typing import Tuple 

5from urllib.parse import quote, urlparse 

6 

7import dateutil 

8import validators 

9from flask import url_for 

10from flask_babel import format_datetime, gettext, lazy_gettext 

11from heritrace.apis.orcid import format_orcid_attribution, is_orcid_url 

12from heritrace.apis.zenodo import format_zenodo_source, is_zenodo_url 

13from rdflib import ConjunctiveGraph, Graph 

14from SPARQLWrapper import JSON, SPARQLWrapper 

15 

16 

class Filter:
    """Display helpers that turn URIs, dates and provenance values into text/HTML.

    The methods read three pieces of state set in ``__init__``: the namespace
    ``context``, the ``display_rules`` configuration and a SPARQL client bound
    to the configured endpoint.
    """

    def __init__(self, context: dict, display_rules: dict, sparql_endpoint: str):
        """Store the configuration and prepare the SPARQL client.

        Args:
            context: Mapping whose keys are namespace prefixes (as full URI
                strings); only key membership is checked by this class.
            display_rules: Display configuration. It is iterated as a sequence
                of rule dicts, each with at least a ``"class"`` key; it may be
                empty or ``None``.
            sparql_endpoint: URL handed to ``SPARQLWrapper``.
        """
        self.context = context
        self.display_rules = display_rules
        self.sparql = SPARQLWrapper(sparql_endpoint)
        self.sparql.setReturnFormat(JSON)
        # Held while running fetchUriDisplay queries against a local graph.
        self._query_lock = threading.Lock()

    def human_readable_predicate(
        self, url: str, entity_classes: list, is_link: bool = True
    ):
        """Return a display label for a predicate or class URI.

        Lookup order:
          1. the display rules of the given entity classes (class
             ``displayName``, then the matching ``displayProperties`` entry);
          2. the local name of the URI when its namespace is in ``context``
             (camelCase names are split into lower-case words);
          3. an HTML link to the entity page when ``url`` is a valid URL and
             ``is_link`` is true;
          4. the URL unchanged.

        Args:
            url: The URI to render.
            entity_classes: Classes of the subject; compared as strings with
                each rule's ``"class"``.
            is_link: Whether an unknown but valid URL may be rendered as a link.
        """
        subject_classes = [str(subject_class) for subject_class in entity_classes]

        if self.display_rules:
            for display_rule in self.display_rules:
                for subject_class in subject_classes:
                    if subject_class != display_rule["class"]:
                        continue

                    if url == subject_class:
                        return display_rule["displayName"]

                    # A rule is allowed to omit displayProperties entirely.
                    for display_property in display_rule.get("displayProperties", ()):
                        if display_property["property"] != str(url):
                            continue
                        # Prefer the first nested display rule. An empty
                        # displayRules list falls back to displayName instead
                        # of raising IndexError.
                        nested_rules = display_property.get("displayRules")
                        if nested_rules:
                            return nested_rules[0]["displayName"]
                        if "displayName" in display_property:
                            return display_property["displayName"]
                    # Nothing matched in this rule: keep scanning the other
                    # rules, then fall through to the default logic below.

        # No display name found in the display rules: derive a label from the URI.
        first_part, last_part = self.split_ns(url)
        if first_part in self.context:
            if last_part.islower():
                return last_part
            # Split a camelCase local name at each upper-case letter.
            words = []
            word = ""
            for char in last_part:
                if char.isupper() and word:
                    words.append(word)
                    word = char
                else:
                    word += char
            words.append(word)
            return " ".join(words).lower()
        if validators.url(url) and is_link:
            return f"<a href='{url_for('entity.about', subject=quote(url))}' alt='{gettext('Link to the entity %(entity)s', entity=url)}'>{url}</a>"
        return url

    def human_readable_entity(
        self, uri: str, entity_classes: list, graph: Graph | ConjunctiveGraph | None = None
    ) -> str:
        """Return a display label for an entity, or the URI itself.

        Args:
            uri: The entity URI.
            entity_classes: Classes of the entity; converted to strings before
                being matched against the display rules.
            graph: Optional local graph to query instead of the SPARQL endpoint.

        Returns:
            The value produced by the first matching ``fetchUriDisplay`` query,
            or ``uri`` when no query yields a truthy result.
        """
        subject_classes = [str(subject_class) for subject_class in entity_classes]

        # Try a configured fetchUriDisplay query first; fall back to the raw URI.
        uri_display = self.get_fetch_uri_display(uri, subject_classes, graph)
        return uri_display if uri_display else uri

    def get_fetch_uri_display(
        self, uri: str, entity_classes: list, graph: Graph | ConjunctiveGraph | None = None
    ) -> str | None:
        """Run the ``fetchUriDisplay`` query configured for the entity's classes.

        For every class, each rule whose ``"class"`` equals it and that defines
        ``"fetchUriDisplay"`` is tried in order. ``[[uri]]`` in the query text is
        replaced with ``<uri>``. The query runs against ``graph`` when given,
        otherwise against the SPARQL endpoint. Query errors are printed and the
        search continues with the next rule.

        Args:
            uri: The entity URI substituted into the query.
            entity_classes: Class identifiers compared with ``rule["class"]``.
            graph: Optional local graph exposing ``query()``.

        Returns:
            The first value of the first result row/binding, or ``None`` when
            no rule produced a result.
        """
        # display_rules may be None/empty (human_readable_predicate already
        # treats it as optional); iterate an empty list in that case.
        rules = self.display_rules or []

        for entity_class in entity_classes:
            for rule in rules:
                if rule["class"] != entity_class or "fetchUriDisplay" not in rule:
                    continue

                query = rule["fetchUriDisplay"].replace("[[uri]]", f"<{uri}>")

                if graph is not None:
                    try:
                        with self._query_lock:
                            results = graph.query(query)
                            for row in results:
                                return str(row[0])
                    except Exception as e:
                        print(
                            f"Error executing fetchUriDisplay query: {e}. {query}"
                        )
                else:
                    # NOTE(review): self.sparql is shared state and is mutated
                    # here without holding _query_lock; confirm whether this
                    # path can run concurrently.
                    self.sparql.setQuery(query)
                    try:
                        results = self.sparql.query().convert()
                        bindings = results["results"]["bindings"]
                        if bindings:
                            # Take the first binding and its first variable's value.
                            first_binding = bindings[0]
                            first_key = list(first_binding.keys())[0]
                            return first_binding[first_key]["value"]
                    except Exception as e:
                        print(f"Error executing fetchUriDisplay query: {e}")
        return None

    def human_readable_datetime(self, dt_str):
        """Parse a date/time string and format it with the ``"long"`` format.

        Args:
            dt_str: Date/time text accepted by ``dateutil.parser.parse``.

        Returns:
            The result of ``flask_babel.format_datetime(dt, format="long")``.
        """
        # A bare ``import dateutil`` does not guarantee that the ``parser``
        # submodule is loaded, so import it explicitly before use.
        import dateutil.parser

        dt = dateutil.parser.parse(dt_str)
        return format_datetime(dt, format="long")

    def split_ns(self, ns: str) -> Tuple[str, str]:
        """Split a URI into ``(namespace, local_name)``.

        When the URI has a fragment, the namespace is everything up to and
        including ``#`` and the local name is the fragment. Otherwise the
        namespace ends at the last ``/`` of the path and the local name is the
        final path segment. Query strings and params are not included.

        Args:
            ns: The URI to split.

        Returns:
            A ``(first_part, last_part)`` tuple.
        """
        parsed = urlparse(ns)
        origin = parsed.scheme + "://" + parsed.netloc

        if parsed.fragment:
            return origin + parsed.path + "#", parsed.fragment

        segments = parsed.path.split("/")
        first_part = origin + "/".join(segments[:-1]) + "/"
        return first_part, segments[-1]

    def human_readable_primary_source(self, primary_source: str | None) -> str:
        """Render a primary source value as text or as an HTML link.

        Args:
            primary_source: The source identifier, or ``None``.

        Returns:
            * the lazily translated "Unknown" when ``primary_source`` is ``None``;
            * a link to ``/entity-version/...`` labelled "Version <n>" when the
              value contains ``/prov/se``;
            * a link opening in a new tab when the value is a valid URL;
            * the value unchanged otherwise.
        """
        if primary_source is None:
            return lazy_gettext("Unknown")

        if "/prov/se" in primary_source:
            version_url = f"/entity-version/{primary_source.replace('/prov/se', '')}"
            return (
                f"<a href='{version_url}' alt='{lazy_gettext('Link to the primary source description')}'>"
                + lazy_gettext("Version")
                + " "
                + primary_source.split("/prov/se/")[-1]
                + "</a>"
            )

        if validators.url(primary_source):
            # The closing quote after the alt text was missing, which produced
            # malformed HTML and swallowed the target attribute.
            return f"<a href='{primary_source}' alt='{lazy_gettext('Link to the primary source description')}' target='_blank'>{primary_source}</a>"

        return primary_source

    def format_source_reference(self, url: str) -> str:
        """
        Format a source reference for display, handling Zenodo URLs and generic URLs.

        Args:
            url (str): The source URL or identifier to format

        Returns:
            str: "Unknown" for an empty value, the Zenodo-specific rendering
            when ``is_zenodo_url(url)`` is true, otherwise the result of
            ``human_readable_primary_source(url)``
        """
        if not url:
            return "Unknown"

        # Check Zenodo first: it is more specific than a generic URL.
        if is_zenodo_url(url):
            return format_zenodo_source(url)

        # Anything else goes through the generic primary-source handler.
        return self.human_readable_primary_source(url)

    def format_agent_reference(self, url: str) -> str:
        """
        Format an agent reference for display, handling ORCID and generic URLs.

        Args:
            url (str): The agent URL or identifier to format

        Returns:
            str: "Unknown" for an empty value, the ORCID attribution when
            ``is_orcid_url(url)`` is true, a link opening in a new tab for any
            other valid URL, otherwise the raw value
        """
        if not url:
            return "Unknown"

        if is_orcid_url(url):
            return format_orcid_attribution(url)

        # For now, other URLs just get a simple link.
        if validators.url(url):
            return f'<a href="{url}" target="_blank">{url}</a>'

        # Not a URL at all: return the raw value.
        return url

194 

195 # If it's not a URL at all, just return the raw value 

196 return url