Coverage for lode / reader / logic / rdf_logic.py: 11%
161 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-25 15:05 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-25 15:05 +0000
1# logic/rdf_logic.py
2from rdflib import Graph, URIRef, Node, Literal as RDFlibLiteral, BNode
3from rdflib.namespace import RDF, RDFS, OWL, SKOS, XSD
4from rdflib.collection import Collection as RDFLibCollection
6from lode.models import *
7from lode.reader.logic.base_logic import BaseLogic
10class RdfLogic(BaseLogic):
11 """
12 Logica RDF pura.
14 Comportamento:
15 - TUTTE le triple non mappate → Statement (phase6)
16 - Predicati → Property
17 - Soggetti/Oggetti non classificati → Resource
18 - Type mapping per rdf:Property, rdf:Statement, etc.
19 """
21 def __init__(self, graph, instance_cache, strategy):
22 super().__init__(graph, instance_cache, strategy)
23 self._statements_created = set()
24 self._statement_counter = 0
26 def _get_allowed_namespaces(self) -> set:
27 return {str(RDF), str(RDFS)}
29 def phase1_classify_from_predicates(self):
30 print("\n--- FASE 1: Classificazione RDF ---")
31 print(" Skip (RDF puro non classifica da predicati)")
33 def phase2_create_from_types(self):
34 """Crea istanze da rdf:type usando type_mapping"""
35 print("\n--- FASE 2: Types RDF ---")
37 type_mapping = self._strategy.get_type_mapping()
38 created = 0
40 for subj, pred, obj in self.graph.triples((None, RDF.type, None)):
41 if subj in self._instance_cache:
42 continue
44 # Check se c'è mapping specifico per questo type
45 if obj in type_mapping:
46 config = type_mapping[obj]
47 py_class = config.get('target_class')
49 if py_class:
50 self.get_or_create(subj, py_class, populate=False)
51 created += 1
53 print(f" Creati {created} da rdf:type")
55 def phase3_populate_properties(self):
56 """Popola proprietà delle istanze create"""
57 print("\n--- FASE 3: Popolamento RDF ---")
59 populated = 0
61 for uri in list(self._instance_cache.keys()):
62 if isinstance(uri, str) and uri.startswith("LITERAL::"):
63 continue
65 instances = self._instance_cache[uri]
66 instances_list = instances if isinstance(instances, set) else [instances]
68 for instance in instances_list:
70 # LOGICA RANGE/DOMAIN DEFAULT: default domain/range solo per Property
71 if isinstance(instance, Property):
72 self._apply_rdf_defaults(instance)
74 # Popola solo se NON è uno Statement
75 if not isinstance(instance, Statement):
76 self.populate_instance(instance, uri)
77 populated += 1
79 print(f" Popolate {populated} istanze")
81 def _apply_rdf_defaults(self, property_instance):
82 """Applica owl:Thing se domain/range mancano risalendo la gerarchia"""
84 rdfs_resource = None # Lazy creation
86 # Check domain (con risalita gerarchia)
87 inherited_domain = self._get_inherited_domain(property_instance)
88 if not inherited_domain:
89 rdfs_resource = self.get_or_create(RDFS.Resource, Concept)
90 property_instance.set_has_domain(rdfs_resource)
91 print(property_instance, property_instance.get_has_domain())
93 # Check range (con risalita gerarchia)
94 inherited_range = self._get_inherited_range(property_instance)
95 if not inherited_range:
96 rdfs_class = self.get_or_create(RDFS.Class, Concept)
97 property_instance.set_has_range(rdfs_class)
98 print(property_instance, property_instance.get_has_range())
100 def _get_inherited_domain(self, property_instance):
101 """Risale rdfs:subPropertyOf per trovare domain"""
102 visited = set()
103 queue = [property_instance]
105 while queue:
106 current = queue.pop(0)
107 if id(current) in visited:
108 continue
109 visited.add(id(current))
111 # Check domain diretto
112 try:
113 domain = current.get_has_domain()
114 if domain and (isinstance(domain, list) and len(domain) > 0 or domain):
115 return domain
116 except:
117 pass
119 # Risali ai super-properties
120 try:
121 supers = current.get_subproperty_of()
122 if supers:
123 if not isinstance(supers, list):
124 supers = [supers]
125 queue.extend(supers)
126 except:
127 pass
129 return None
131 def _get_inherited_range(self, property_instance):
132 """Risale rdfs:subPropertyOf per trovare range"""
133 visited = set()
134 queue = [property_instance]
136 while queue:
137 current = queue.pop(0)
138 if id(current) in visited:
139 continue
140 visited.add(id(current))
142 # Check range diretto
143 try:
144 range_val = current.get_has_range()
145 if range_val and (isinstance(range_val, list) and len(range_val) > 0 or range_val):
146 return range_val
147 except:
148 pass
150 # Risali ai super-properties
151 try:
152 supers = current.get_subproperty_of()
153 if supers:
154 if not isinstance(supers, list):
155 supers = [supers]
156 queue.extend(supers)
157 except:
158 pass
160 return None
162 def phase4_process_group_axioms(self):
163 print("\n--- FASE 4: Axioms RDF ---")
164 print(" Nessun axiom in RDF puro")
166 def phase5_fallback(self):
167 """
168 Fallback RDF:
169 - Tutti i predicati → Property
170 - Tutti i soggetti/oggetti non classificati → Resource
171 """
172 print("\n--- FASE 5: Fallback RDF ---")
174 # 1. Tutti i predicati → Property
175 all_predicates = set(self.graph.predicates())
176 property_count = 0
178 # Predicati strutturali RDF da escludere
179 exclude_predicates = {RDF.first, RDF.rest, RDF.nil}
180 exclude_namespaces = [RDF, RDFS, OWL, SKOS, XSD]
182 for pred in all_predicates:
183 if pred not in self._instance_cache and pred not in exclude_predicates:
184 if not any(str(pred).startswith(str(ns)) for ns in exclude_namespaces):
185 self.get_or_create(pred, Property, populate=False)
186 property_count += 1
188 print(f" Property: {property_count} predicati")
190 # 2. Tutti i soggetti non Collection → Resource
191 all_subjects = set(self.graph.subjects())
192 subject_count = 0
194 for subj in all_subjects:
195 if isinstance(subj, URIRef) and subj not in self._instance_cache:
196 # Escludi Collection nodes
197 if not self._is_rdf_collection(subj):
198 exclude_namespaces = [RDF, RDFS, OWL, SKOS, XSD]
199 if not any(str(subj).startswith(str(ns)) for ns in exclude_namespaces):
200 self.get_or_create(subj, Resource, populate=False)
201 subject_count += 1
203 print(f" Resource (soggetti): {subject_count}")
205 # 3. Tutti gli oggetti URI non Collection → Resource
206 object_count = 0
208 for s, p, o in self.graph:
209 if isinstance(o, URIRef) and o not in self._instance_cache:
210 # Escludi Collection nodes
211 if not self._is_rdf_collection(o):
212 exclude_namespaces = [RDF, RDFS, OWL, SKOS, XSD]
213 if not any(str(o).startswith(str(ns)) for ns in exclude_namespaces):
214 self.get_or_create(o, Resource, populate=False)
215 object_count += 1
217 print(f" Resource (oggetti): {object_count}")
219 # ========== HELPER METHODS ==========
222 def _handle_collection_as_container(self, collection_node):
223 """
224 Gestisce RDF Collection (rdf:List) convertendola in Container.
225 NON traccia le triple rdf:first/rest nei triples_map.
226 """
227 try:
228 collection = RDFLibCollection(self.graph, collection_node)
230 # Crea un Container per rappresentare la lista
231 container = Container()
232 container.set_has_identifier(str(collection_node))
234 items = []
235 for item in collection:
236 if isinstance(item, RDFlibLiteral):
237 items.append(self._create_literal(item))
238 else:
239 item_instance = self.get_or_create(item, Resource)
240 if item_instance:
241 items.append(item_instance)
243 # Aggiungi items al container
244 for item in items:
245 container.set_has_member(item)
247 # CRITICO: Salva Container nella cache
248 if collection_node not in self._instance_cache:
249 self._instance_cache[collection_node] = {container}
251 # NON tracciare rdf:first/rest - traccia solo triple significative
252 if container not in self._triples_map:
253 self._triples_map[container] = set()
255 # Traccia solo triple NON strutturali della Collection
256 for s, p, o in self.graph.triples((collection_node, None, None)):
257 if p not in {RDF.first, RDF.rest, RDF.nil}:
258 self._triples_map[container].add((s, p, o))
260 return container
262 except Exception as e:
263 print(f" Errore gestione Collection: {e}")
264 # Fallback: crea Resource
265 return self.get_or_create(collection_node, Resource)
267 # def _is_triple_mapped(self, subj, pred, obj) -> bool:
268 # """
269 # Check se tripla già gestita.
270 # Una tripla è mappata se:
271 # - Il predicato è in property_mapping
272 # - Il soggetto esiste in cache E non è uno Statement fallback
273 # """
274 # # Se predicato è nel property_mapping, è mappato
275 # if pred in self._property_mapping:
276 # return True
278 # # Se soggetto non esiste in cache, non è mappato
279 # if subj not in self._instance_cache:
280 # return False
282 # # Se soggetto esiste ma è uno Statement, NON è mappato
283 # # (evita loop infinito)
284 # instances = self._instance_cache[subj]
285 # for inst in instances:
286 # if isinstance(inst, Statement):
287 # return False
289 # return False