Coverage for lode / reader / config_manager.py: 79%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-25 15:05 +0000

1# config_magarer.py - STRATEGIE CON CONFIG UNIFICATA 

2 

3import yaml 

4from pathlib import Path 

5from abc import ABC, abstractmethod 

6from rdflib import URIRef, Graph, Node 

7from lode.models import * 

8 

9class ConfigManager(ABC): 

10 """Strategia basata su config YAML unico""" 

11 

12 def __init__(self): 

13 self.config = self._load_config() 

14 self._type_mapping_cache = None 

15 self._property_mapping_cache = None 

16 

17 @abstractmethod 

18 def create_logic(self, graph: Graph, cache: dict): 

19 """Factory method per creare Logic specifica""" 

20 pass 

21 

22 @property 

23 @abstractmethod 

24 def config_name(self) -> str: 

25 """Nome del file config specifico: 'owl' | 'skos' | 'rdfs'""" 

26 pass 

27 

28 def _load_config(self) -> dict: 

29 config_dir = Path(__file__).parent / 'config' 

30 

31 with open(config_dir / 'base.yaml') as f: 

32 base = yaml.safe_load(f) 

33 

34 with open(config_dir / f'{self.config_name}.yaml') as f: 

35 specific = yaml.safe_load(f) 

36 

37 return self._deep_merge(base, specific) 

38 

39 def _deep_merge(self, base: dict, override: dict) -> dict: 

40 result = base.copy() 

41 for key, value in override.items(): 

42 if key in ('name', 'inherits'): 

43 continue 

44 if key in result and isinstance(result[key], dict) and isinstance(value, dict): 

45 result[key] = self._deep_merge(result[key], value) 

46 else: 

47 result[key] = value 

48 return result 

49 

50 # def _load_config(self) -> dict: 

51 # """Carica config da file YAML unico""" 

52 # config_path = Path(__file__).parent / 'config' / 'base.yaml' 

53 

54 # with open(config_path, 'r') as f: 

55 # config = yaml.safe_load(f) 

56 

57 # return config 

58 

59 @abstractmethod 

60 def create_viewer(self, reader): 

61 """Factory method per creare Viewer specifico""" 

62 pass 

63 

64 # ========== NAMESPACE MAP ========== 

65 NAMESPACES = { 

66 'owl': 'http://www.w3.org/2002/07/owl#', 

67 'rdfs': 'http://www.w3.org/2000/01/rdf-schema#', 

68 'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', 

69 'skos': 'http://www.w3.org/2004/02/skos/core#' 

70 } 

71 

72 # ========== CLASS MAP ========== 

73 CLASSES = { 

74 'Concept': Concept, 

75 'Property': Property, 

76 'Relation': Relation, 

77 'Attribute': Attribute, 

78 'Annotation': Annotation, 

79 'Restriction': Restriction, 

80 'Quantifier': Quantifier, 

81 'Cardinality': Cardinality, 

82 'TruthFunction': TruthFunction, 

83 'PropertyConceptRestriction': PropertyConceptRestriction, 

84 'DatatypeRestriction': DatatypeRestriction, 

85 'Container': Container, 

86 'OneOf': OneOf, 

87 'Value': Value, 

88 'Model': Model, 

89 'Collection': Collection, 

90 'Statement': Statement, 

91 'Datatype': Datatype, 

92 'Literal': Literal, 

93 'Resource': Resource, 

94 'Individual': Individual 

95 } 

96 

97 def _parse_uri(self, uri_str: str) -> URIRef: 

98 """owl:Class -> URIRef""" 

99 if ':' not in uri_str: 

100 return URIRef(uri_str) 

101 

102 prefix, local = uri_str.split(':', 1) 

103 return URIRef(self.NAMESPACES[prefix] + local) 

104 

105 def _parse_class(self, class_name: str): 

106 """'Concept' -> Concept class""" 

107 return self.CLASSES[class_name] 

108 

109 def _parse_value(self, value): 

110 """Parse valore config""" 

111 if value == 'Literal': 

112 return 'Literal' 

113 elif value is True or value is False: 

114 return value 

115 elif isinstance(value, str) and value in self.CLASSES: 

116 return self._parse_class(value) 

117 return value 

118 

119 # ========== CONFIG ACCESSORS ========== 

120 

121 def get_type_mapping(self) -> dict[URIRef, dict]: 

122 """rdf:type -> {target_class, setters, ...}""" 

123 if self._type_mapping_cache is None: 

124 self._type_mapping_cache = { 

125 self._parse_uri(uri): self._parse_config(cfg) 

126 for uri, cfg in self.config['mapper'].items() 

127 if cfg.get('is') == 'class' 

128 } 

129 return self._type_mapping_cache 

130 

131 def get_property_mapping(self) -> dict[URIRef, dict]: 

132 """predicate -> {target_classes, setters, handler, ...}""" 

133 if self._property_mapping_cache is None: 

134 self._property_mapping_cache = { 

135 self._parse_uri(uri): self._parse_config(cfg) 

136 for uri, cfg in self.config['mapper'].items() 

137 if cfg.get('is') == 'predicate' 

138 } 

139 return self._property_mapping_cache 

140 

141 def _parse_config(self, cfg: dict) -> dict: 

142 """Parse un blocco di config""" 

143 parsed = cfg.copy() 

144 

145 if 'target_class' in parsed: 

146 parsed['target_class'] = self._parse_class(parsed['target_class']) 

147 

148 if 'inferred_class' in parsed: 

149 parsed['inferred_class'] = self._parse_class(parsed['inferred_class']) 

150 

151 if 'target_classes' in parsed: 

152 parsed['target_classes'] = [ 

153 self._parse_class(tc) for tc in parsed['target_classes'] 

154 ] 

155 

156 if 'setters' in parsed: 

157 parsed['setters'] = [ 

158 {k: self._parse_value(v)} if isinstance(item, dict) else item 

159 for item in parsed['setters'] 

160 for k, v in (item.items() if isinstance(item, dict) else [(item, None)]) 

161 ] 

162 

163 return parsed 

164 

165 def get_group_axioms(self) -> dict[URIRef, str]: 

166 """Group axioms: URI -> handler_name""" 

167 return { 

168 self._parse_uri(uri): handler 

169 for uri, handler in self.config.get('enricher', {}).items() 

170 } 

171 

172 def get_fallback_class(self) -> type | None: 

173 """Classe fallback per risorse non categorizzate""" 

174 fallback = self.config.get('mapper', {}).get('fallback_class') 

175 if fallback: 

176 return self.CLASSES.get(fallback, Statement) 

177 return None 

178 

179 # ========== HELPER METHODS ========== 

180 

181 def get_classifier_predicates(self) -> set[URIRef]: 

182 """Predicati con 'inferred_class'""" 

183 return { 

184 pred for pred, cfg in self.get_property_mapping().items() 

185 if 'inferred_class' in cfg 

186 } 

187 

188 def classify_by_predicate(self, uri: Node, graph: Graph) -> type | None: 

189 """Classifica guardando predicati. 

190 - inferred_class: tipo del soggetto (BNode restrictions, quantifiers, etc.) -> priorità massima 

191 - target_classes con 1 elemento e nessun inferred_class: tipo implicito del soggetto URIRef 

192 - target_classes con 2+ elementi: ambiguo, gestito dall'handler -> None 

193 """ 

194 fallback = None 

195 for predicate, cfg in self.get_property_mapping().items(): 

196 if (uri, predicate, None) in graph: 

197 if 'inferred_class' in cfg: 

198 return cfg['inferred_class'] 

199 # Note that if len(cfg.get('target_classes', [])) > 1 it is handled in phase3 by its handler 

200 elif fallback is None and len(cfg.get('target_classes', [])) == 1: 

201 fallback = cfg['target_classes'][0] 

202 return fallback 

203 

204 

205# config_manager.py - aggiungi nei concrete managers 

206 

207class OwlConfigManager(ConfigManager): 

208 

209 @property 

210 def config_name(self) -> str: 

211 return 'owl' 

212 

213 def create_logic(self, graph: Graph, cache: dict): 

214 from lode.reader.logic import OwlLogic 

215 return OwlLogic(graph, cache, self) 

216 

217 def create_viewer(self, reader): 

218 from lode.viewer import OwlViewer 

219 return OwlViewer(reader) 

220 

221 

222class RdfConfigManager(ConfigManager): 

223 def create_logic(self, graph: Graph, cache: dict): 

224 from lode.reader.logic import RdfLogic 

225 return RdfLogic(graph, cache, self) 

226 

227 def create_viewer(self, reader): 

228 from lode.viewer import BaseViewer # COMING SOOND 

229 return BaseViewer(reader) 

230 

231 

232class SkosConfigManager(ConfigManager): 

233 def create_logic(self, graph: Graph, cache: dict): 

234 from lode.reader.logic import SkosLogic 

235 return SkosLogic(graph, cache, self) 

236 

237 def create_viewer(self, reader): 

238 from lode.viewer import BaseViewer # COMING SOON 

239 return BaseViewer(reader) 

240 

241# ========== CONFIGURATIONS REGISTRY ========== 

242 

243CONFIGURATION_REGISTRY = { 

244 'OWL': OwlConfigManager, 

245 'SKOS': SkosConfigManager, 

246 'RDF': RdfConfigManager 

247} 

248 

249 

250def get_configuration(configuration_name: str) -> ConfigManager: 

251 key = configuration_name.upper() 

252 if key not in CONFIGURATION_REGISTRY: 

253 available = ', '.join(CONFIGURATION_REGISTRY.keys()) 

254 raise ValueError(f"Unknown configuration: '{configuration_name}'. Available: {available}") 

255 return CONFIGURATION_REGISTRY[key]()