Coverage for lode / reader / config_manager.py: 79%
121 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-25 15:05 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-25 15:05 +0000
1# config_manager.py - STRATEGIES WITH UNIFIED CONFIG
3import yaml
4from pathlib import Path
5from abc import ABC, abstractmethod
6from rdflib import URIRef, Graph, Node
7from lode.models import *
class ConfigManager(ABC):
    """Strategy driven by a unified YAML configuration.

    On construction the shared ``base.yaml`` is merged with the file named
    after :attr:`config_name` (both located in the ``config`` directory next
    to this module).  The merged dictionary is stored in ``self.config`` and
    exposed through accessors that turn prefixed names into ``URIRef``
    objects and class names into the classes listed in :attr:`CLASSES`.
    """

    # ========== NAMESPACE MAP ==========
    # Prefixes accepted by _parse_uri().
    NAMESPACES = {
        'owl': 'http://www.w3.org/2002/07/owl#',
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
        'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
        'skos': 'http://www.w3.org/2004/02/skos/core#'
    }

    # ========== CLASS MAP ==========
    # Class names usable in the YAML config -> model classes.
    CLASSES = {
        'Concept': Concept,
        'Property': Property,
        'Relation': Relation,
        'Attribute': Attribute,
        'Annotation': Annotation,
        'Restriction': Restriction,
        'Quantifier': Quantifier,
        'Cardinality': Cardinality,
        'TruthFunction': TruthFunction,
        'PropertyConceptRestriction': PropertyConceptRestriction,
        'DatatypeRestriction': DatatypeRestriction,
        'Container': Container,
        'OneOf': OneOf,
        'Value': Value,
        'Model': Model,
        'Collection': Collection,
        'Statement': Statement,
        'Datatype': Datatype,
        'Literal': Literal,
        'Resource': Resource,
        'Individual': Individual
    }

    def __init__(self):
        self.config = self._load_config()
        # Both mappings are built lazily on first access and then reused.
        self._type_mapping_cache = None
        self._property_mapping_cache = None

    # ========== ABSTRACT INTERFACE ==========

    @abstractmethod
    def create_logic(self, graph: Graph, cache: dict):
        """Factory method creating the specific Logic object."""
        pass

    @abstractmethod
    def create_viewer(self, reader):
        """Factory method creating the specific Viewer object."""
        pass

    @property
    @abstractmethod
    def config_name(self) -> str:
        """Name of the specific config file: 'owl' | 'skos' | 'rdfs'."""
        pass

    # ========== CONFIG LOADING ==========

    def _load_config(self) -> dict:
        """Load ``base.yaml`` and ``<config_name>.yaml`` and return their merge.

        Values from the specific file override those from the base file
        (see :meth:`_deep_merge`).
        """
        config_dir = Path(__file__).parent / 'config'
        base = self._read_yaml(config_dir / 'base.yaml')
        specific = self._read_yaml(config_dir / f'{self.config_name}.yaml')
        return self._deep_merge(base, specific)

    @staticmethod
    def _read_yaml(path: Path) -> dict:
        """Parse one YAML file; an empty document yields an empty dict."""
        # Explicit UTF-8 so parsing does not depend on the platform locale.
        with open(path, encoding='utf-8') as f:
            # safe_load() returns None for an empty file; normalise to {}
            # so that _deep_merge() can always iterate the result.
            return yaml.safe_load(f) or {}

    def _deep_merge(self, base: dict, override: dict) -> dict:
        """Return a new dict with *override* recursively merged into *base*.

        When both sides hold a dict under the same key the two are merged
        recursively; otherwise the override value replaces the base value.
        The keys ``'name'`` and ``'inherits'`` of *override* are skipped at
        every nesting level.  *base* itself is not modified (the copy is
        shallow: untouched nested values are shared with *base*).
        """
        result = base.copy()
        for key, value in override.items():
            if key in ('name', 'inherits'):
                continue
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = value
        return result

    # ========== PARSING HELPERS ==========

    def _parse_uri(self, uri_str: str) -> URIRef:
        """Turn a config name such as ``owl:Class`` into a ``URIRef``.

        * no ``':'`` at all       -> used verbatim
        * known ``prefix:local``  -> namespace from :attr:`NAMESPACES` + local
        * ``scheme://...``        -> already an absolute IRI, used verbatim

        Raises:
            KeyError: the prefix is neither in :attr:`NAMESPACES` nor the
                scheme of an absolute ``scheme://`` IRI.
        """
        prefix, sep, local = uri_str.partition(':')
        if not sep:
            return URIRef(uri_str)

        namespace = self.NAMESPACES.get(prefix)
        if namespace is not None:
            return URIRef(namespace + local)

        # "http://example.org/x" splits into ("http", "//example.org/x"):
        # that is a full IRI, not a prefixed name, so keep it as written.
        if local.startswith('//'):
            return URIRef(uri_str)

        raise KeyError(f"Unknown namespace prefix '{prefix}' in '{uri_str}'")

    def _parse_class(self, class_name: str):
        """'Concept' -> Concept class (``KeyError`` if the name is unknown)."""
        return self.CLASSES[class_name]

    def _parse_value(self, value):
        """Parse a single config value.

        The string ``'Literal'`` is deliberately returned as a string even
        though ``'Literal'`` is also a key of :attr:`CLASSES`; booleans are
        returned unchanged; any other string naming a known class is replaced
        by that class; everything else passes through untouched.
        """
        if value == 'Literal':
            return 'Literal'
        elif value is True or value is False:
            return value
        elif isinstance(value, str) and value in self.CLASSES:
            return self._parse_class(value)
        return value

    def _parse_config(self, cfg: dict) -> dict:
        """Return a copy of one mapper entry with class names resolved.

        ``target_class``, ``inferred_class`` and every item of
        ``target_classes`` are replaced by the corresponding classes.  In
        ``setters`` each dict item is split into one single-key dict per
        key, with its value run through :meth:`_parse_value`; non-dict items
        are kept unchanged.
        """
        parsed = cfg.copy()

        if 'target_class' in parsed:
            parsed['target_class'] = self._parse_class(parsed['target_class'])

        if 'inferred_class' in parsed:
            parsed['inferred_class'] = self._parse_class(parsed['inferred_class'])

        if 'target_classes' in parsed:
            parsed['target_classes'] = [
                self._parse_class(tc) for tc in parsed['target_classes']
            ]

        if 'setters' in parsed:
            setters = []
            for item in parsed['setters']:
                if isinstance(item, dict):
                    setters.extend(
                        {k: self._parse_value(v)} for k, v in item.items()
                    )
                else:
                    setters.append(item)
            parsed['setters'] = setters

        return parsed

    # ========== CONFIG ACCESSORS ==========

    def _build_mapping(self, kind: str) -> dict[URIRef, dict]:
        """Collect the parsed ``mapper`` entries whose ``is`` field equals *kind*."""
        mapper = self.config.get('mapper') or {}
        return {
            self._parse_uri(uri): self._parse_config(cfg)
            for uri, cfg in mapper.items()
            # The mapper section may also hold scalar settings (e.g. the
            # 'fallback_class' read by get_fallback_class); skip those.
            if isinstance(cfg, dict) and cfg.get('is') == kind
        }

    def get_type_mapping(self) -> dict[URIRef, dict]:
        """rdf:type -> {target_class, setters, ...} (built once, then cached)."""
        if self._type_mapping_cache is None:
            self._type_mapping_cache = self._build_mapping('class')
        return self._type_mapping_cache

    def get_property_mapping(self) -> dict[URIRef, dict]:
        """predicate -> {target_classes, setters, handler, ...} (built once, then cached)."""
        if self._property_mapping_cache is None:
            self._property_mapping_cache = self._build_mapping('predicate')
        return self._property_mapping_cache

    def get_group_axioms(self) -> dict[URIRef, str]:
        """Group axioms from the ``enricher`` section: URI -> handler name."""
        enricher = self.config.get('enricher') or {}
        return {
            self._parse_uri(uri): handler
            for uri, handler in enricher.items()
        }

    def get_fallback_class(self) -> type | None:
        """Fallback class for resources that could not be categorised.

        Returns ``None`` when ``mapper.fallback_class`` is not configured;
        an unknown class name falls back to ``Statement``.
        """
        fallback = (self.config.get('mapper') or {}).get('fallback_class')
        if fallback:
            return self.CLASSES.get(fallback, Statement)
        return None

    # ========== HELPER METHODS ==========

    def get_classifier_predicates(self) -> set[URIRef]:
        """Predicates whose config carries an ``inferred_class``."""
        return {
            pred for pred, cfg in self.get_property_mapping().items()
            if 'inferred_class' in cfg
        }

    def classify_by_predicate(self, uri: Node, graph: Graph) -> type | None:
        """Classify *uri* by the predicates it is the subject of in *graph*.

        - ``inferred_class``: type of the subject (BNode restrictions,
          quantifiers, etc.) -> highest priority, returned immediately.
        - ``target_classes`` with exactly one element and no
          ``inferred_class``: implicit type of the subject; the first such
          match is remembered and returned if nothing better is found.
        - ``target_classes`` with 2+ elements: ambiguous, left to the
          handler -> contributes nothing here.

        Returns ``None`` when no predicate yields a class.
        """
        fallback = None
        for predicate, cfg in self.get_property_mapping().items():
            if (uri, predicate, None) in graph:
                if 'inferred_class' in cfg:
                    return cfg['inferred_class']
                # Note that if len(cfg.get('target_classes', [])) > 1 it is handled in phase3 by its handler
                elif fallback is None and len(cfg.get('target_classes', [])) == 1:
                    fallback = cfg['target_classes'][0]
        return fallback
205# config_manager.py - concrete managers (one per supported configuration)
class OwlConfigManager(ConfigManager):
    """Concrete manager that selects the 'owl' config and the OWL logic/viewer."""

    def create_viewer(self, reader):
        """Return an ``OwlViewer`` wrapping *reader*."""
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import - TODO confirm).
        from lode.viewer import OwlViewer

        viewer = OwlViewer(reader)
        return viewer

    def create_logic(self, graph: Graph, cache: dict):
        """Return an ``OwlLogic`` built from *graph*, *cache* and this manager."""
        from lode.reader.logic import OwlLogic

        logic = OwlLogic(graph, cache, self)
        return logic

    @property
    def config_name(self) -> str:
        """Name of the YAML file (without extension) merged over the base config."""
        return 'owl'
class RdfConfigManager(ConfigManager):
    """Concrete manager that selects the RDF logic and the base viewer."""

    @property
    def config_name(self) -> str:
        """Name of the YAML file (without extension) merged over the base config.

        Without this override the class keeps the abstract ``config_name``
        of ``ConfigManager`` and cannot be instantiated at all.
        """
        # NOTE(review): 'rdfs' is taken from the names listed in
        # ConfigManager.config_name ('owl' | 'skos' | 'rdfs') - confirm that
        # config/rdfs.yaml is the intended file.
        return 'rdfs'

    def create_logic(self, graph: Graph, cache: dict):
        """Return an ``RdfLogic`` built from *graph*, *cache* and this manager."""
        from lode.reader.logic import RdfLogic
        return RdfLogic(graph, cache, self)

    def create_viewer(self, reader):
        """Return a ``BaseViewer`` wrapping *reader*."""
        from lode.viewer import BaseViewer  # dedicated viewer COMING SOON
        return BaseViewer(reader)
class SkosConfigManager(ConfigManager):
    """Concrete manager that selects the 'skos' config, SKOS logic and base viewer."""

    @property
    def config_name(self) -> str:
        """Name of the YAML file (without extension) merged over the base config.

        Without this override the class keeps the abstract ``config_name``
        of ``ConfigManager`` and cannot be instantiated at all.
        """
        # NOTE(review): 'skos' is taken from the names listed in
        # ConfigManager.config_name - confirm that config/skos.yaml exists.
        return 'skos'

    def create_logic(self, graph: Graph, cache: dict):
        """Return a ``SkosLogic`` built from *graph*, *cache* and this manager."""
        from lode.reader.logic import SkosLogic
        return SkosLogic(graph, cache, self)

    def create_viewer(self, reader):
        """Return a ``BaseViewer`` wrapping *reader*."""
        from lode.viewer import BaseViewer  # dedicated viewer COMING SOON
        return BaseViewer(reader)
# ========== CONFIGURATIONS REGISTRY ==========

# Upper-case configuration name -> manager class to instantiate.
# get_configuration() upper-cases its argument before looking it up here.
CONFIGURATION_REGISTRY: dict[str, type[ConfigManager]] = {
    'OWL': OwlConfigManager,
    'SKOS': SkosConfigManager,
    'RDF': RdfConfigManager,
}
def get_configuration(configuration_name: str) -> ConfigManager:
    """Instantiate the manager registered under *configuration_name*.

    The lookup in ``CONFIGURATION_REGISTRY`` is case-insensitive: the name
    is upper-cased first.

    Raises:
        ValueError: no manager is registered under that name; the message
            lists the available names.
    """
    manager_cls = CONFIGURATION_REGISTRY.get(configuration_name.upper())
    if manager_cls is not None:
        return manager_cls()

    available = ', '.join(CONFIGURATION_REGISTRY.keys())
    raise ValueError(f"Unknown configuration: '{configuration_name}'. Available: {available}")