Coverage for lode / reader / logic / skos_logic.py: 26%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-25 15:05 +0000

1# skos_logic.py - FORMAT-SPECIFIC LOGIC (SKOS)

2from rdflib import Graph, URIRef, Node, Literal as RDFlibLiteral, BNode 

3from rdflib.namespace import RDF, RDFS, OWL, SKOS, XSD 

4from rdflib.collection import Collection as RDFLibCollection 

5 

6from lode.models import * 

7from lode.reader.logic.base_logic import BaseLogic 

8 

class SkosLogic(BaseLogic):
    """
    SKOS logic.

    Differences from the other format logics:
    - Focus on Concept/ConceptScheme
    - skos:broader/narrower relations
    - NO default domain/range
    - NO Restriction, Individual, CompositeClass, Datatype, Attribute
    """

    def _get_allowed_namespaces(self) -> set:
        """Return the namespace URIs (as strings) accepted by this logic."""
        return {str(RDF), str(RDFS), str(SKOS)}

    def phase1_classify_from_predicates(self):
        """Phase 1: nothing to classify; only reports that the phase is skipped."""
        print("\n--- FASE 1: Classificazione SKOS ---")
        print(" Skip (SKOS usa solo types)")

    def phase2_create_from_types(self):
        """Phase 2: create an (unpopulated) instance for every typed subject.

        For each ``rdf:type`` in the strategy's type mapping that declares a
        ``target_class``, every subject of that type not yet present in the
        instance cache is created through ``get_or_create`` with
        ``populate=False``. The number of creations is printed at the end.
        """
        print("\n--- FASE 2: Types SKOS ---")

        type_mapping = self._strategy.get_type_mapping()
        created = 0

        for rdf_type, config in type_mapping.items():
            py_class = config.get('target_class')
            if not py_class:
                # Mapping entries without a target class are ignored.
                continue

            for uri in self.graph.subjects(RDF.type, rdf_type):
                if uri in self._instance_cache:
                    continue
                self.get_or_create(uri, py_class, populate=False)
                created += 1

        print(f" Create: {created}")

    def phase3_populate_properties(self):
        """Phase 3: call ``populate_instance`` on every cached instance."""
        print("\n--- FASE 3: Popolamento SKOS ---")

        # Iterate over snapshots of the keys and of each per-URI collection,
        # so the cache may change while instances are being populated.
        for uri in list(self._instance_cache.keys()):
            for instance in list(self._instance_cache[uri]):
                self.populate_instance(instance, uri)

        print(f" Popolate: {len(self._instance_cache)}")

    def phase4_process_group_axioms(self):
        """Phase 4: no axioms are processed for SKOS; only prints a notice."""
        print("\n--- FASE 4: Axioms SKOS ---")
        print(" Nessun axiom SKOS")

    def phase5_fallback(self):
        """Phase 5: fallback step, not implemented yet; only prints a notice."""
        print("\n--- FASE 5: FALLBACK ---")
        print("TO BE IMPLEMENTED SOON")

    # SKOS-specific handlers
    def _first_cached_of_type(self, uri, types):
        """Return the cached instance for ``uri`` that is an instance of ``types``.

        The cache entry may be a set of instances or a single object. For a
        set, the first element matching ``types`` is returned, so that an
        unrelated element in the same set cannot hide a valid one. Returns
        ``None`` when nothing suitable is cached.
        """
        cached = self._instance_cache.get(uri)
        if isinstance(cached, set):
            return next((item for item in cached if isinstance(item, types)), None)
        return cached if isinstance(cached, types) else None

    def handle_membership(self, instance, uri, predicate, obj, value):
        """Handle skos:member.

        Does nothing unless ``instance`` is a ``Collection``. Otherwise, for
        every object of ``(uri, predicate)`` in the graph, the cached
        ``Model``/``Concept`` instance of that object (if any) is registered
        through ``instance.set_has_member``.

        ``obj`` and ``value`` are not used here; presumably they are kept to
        match the handler signature expected by the caller.
        """
        if not isinstance(instance, Collection):
            return

        for obj_uri in self.graph.objects(uri, predicate):
            member_obj = self._first_cached_of_type(obj_uri, (Model, Concept))
            if member_obj is not None:
                instance.set_has_member(member_obj)

    def handle_narrower(self, instance, uri, predicate, obj, value):
        """Handle skos:narrower (inverse direction).

        Does nothing unless ``instance`` is a ``Concept``. Otherwise, for
        every subject that points at ``uri`` through ``predicate``, the
        cached ``Concept`` of that subject (if any) is registered through
        ``instance.set_is_sub_concept_of``.

        ``obj`` and ``value`` are not used here; presumably they are kept to
        match the handler signature expected by the caller.
        """
        if not isinstance(instance, Concept):
            return

        for subj_uri in self.graph.subjects(predicate, uri):
            broader_obj = self._first_cached_of_type(subj_uri, Concept)
            if broader_obj is not None:
                instance.set_is_sub_concept_of(broader_obj)