Coverage for lode / viewer / base_viewer.py: 0%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-25 15:05 +0000

1# base_viewer.py 

2import hashlib 

3import re 

4from typing import Dict, List, Optional, Tuple 

5from lode.models import Literal, Model, Resource, Statement 

6 

class BaseViewer:
    """Base viewer that turns the instances extracted by the Reader into view data.

    The viewer reads the reader's instance cache and converts the cached
    objects into plain dictionaries (entities, metadata, statements).
    Subclasses should override :meth:`get_view_data` to define their own
    view strategy.
    """

    # Class names (compared against ``type(obj).__name__``) that are rendered
    # as composite restriction expressions instead of plain linked resources.
    _RESTRICTION_TYPES = frozenset({
        "Restriction",
        "PropertyConceptRestriction",
        "Quantifier",
        "Cardinality",
        "TruthFunction",
        "OneOf",
        "Value",
    })

    # Getters skipped by the metadata scan because they belong in the header
    # or in the annotations.
    _METADATA_IGNORED_GETTERS = frozenset({
        'get_has_identifier',
        'get_has_label',
        'get_has_subject',
        'get_has_predicate',
        'get_has_object',
    })

    def __init__(self, reader):
        """Store the reader and keep a reference to its instance cache."""
        self.reader = reader
        # Same object as the reader's cache (a reference, not a copy).
        self._cache = reader._instance_cache

    def get_all_instances(self) -> List:
        """Return every cached instance, skipping literal entries.

        Cache entries whose key is a string starting with ``"LITERAL::"`` are
        ignored. A cache value may be a set of instances or a single instance.
        """
        instances = []
        for uri_id, instance_set in self._cache.items():
            if isinstance(uri_id, str) and uri_id.startswith("LITERAL::"):
                continue
            if isinstance(instance_set, set):
                instances.extend(instance_set)
            else:
                instances.append(instance_set)
        return instances

    def get_instances_from_single_resource(self, resource_uri: str) -> Optional[set]:
        """Return the cache value stored for ``resource_uri``, or ``None``.

        Keys are compared through ``str(key)`` so that non-``str`` keys whose
        string form equals the URI still match.
        """
        for uri_id, cached in self._cache.items():
            if str(uri_id) == resource_uri:
                return cached
        return None

    @staticmethod
    def _find_label_for_language(labels, target_lang: str):
        """Return the first label whose language tag starts with ``target_lang``."""
        for label in labels or ():
            if not hasattr(label, 'get_has_language'):
                continue
            tag = label.get_has_language()
            if tag and tag.lower().startswith(target_lang):
                return label
        return None

    def _get_best_label(self, resource: "Resource", language: Optional[str] = None) -> Optional[str]:
        """Return the best display label for ``resource``.

        Lookup order:
          1. preferred label in the requested language (``"en"`` by default);
          2. plain label in the requested language;
          3. preferred label with the alphabetically first language tag;
          4. plain label with the alphabetically first language tag;
          5. the last segment of the resource identifier.

        Every candidate goes through :meth:`_clean_name`. Returns ``None``
        when the resource has neither labels nor an identifier.
        """
        target_lang = language.strip().lower() if language else "en"

        preferred = resource.get_has_preferred_label()
        match = self._find_label_for_language(preferred, target_lang)
        if match is not None:
            return self._clean_name(match.get_has_value())

        plain = resource.get_has_label()
        match = self._find_label_for_language(plain, target_lang)
        if match is not None:
            return self._clean_name(match.get_has_value())

        # Deterministic fallback: no label in the target language exists, so
        # pick the one with the lowest language tag instead of whichever
        # happens to come first.
        for labels in (preferred, plain):
            if labels:
                chosen = min(labels, key=lambda x: str(x.get_has_language() or ""))
                return self._clean_name(chosen.get_has_value())

        # Final fallback: the local part of the identifier.
        resource_id = resource.get_has_identifier()
        if resource_id:
            # Namespace-style URIs often end with '/' or '#'; drop the
            # trailing separators so the last segment is not empty.
            trimmed = resource_id.rstrip('/#') or resource_id
            separator = '#' if '#' in trimmed else '/'
            return self._clean_name(trimmed.rsplit(separator, 1)[-1])

        return None

    def get_view_data(self, resource_uri: Optional[str] = None, language: Optional[str] = None) -> Dict:
        """Main entry point called by the API.

        Subclasses should override this to define their specific view
        strategy. This base implementation returns the metadata plus either a
        single-resource view (when ``resource_uri`` is given) or a flat list
        of all entities.
        """
        # Default to English when no language (or only whitespace) is given.
        language = (language or "").strip() or "en"

        all_instances = self.get_all_instances()
        metadata_dict = self._find_and_format_metadata(all_instances, language)

        if resource_uri:
            data = self._handle_single_resource(resource_uri, language)
            data['metadata'] = metadata_dict
            return data

        # Fallback: generic flat list.
        return {
            'metadata': metadata_dict,
            'entities': self._format_entities(all_instances, language)
        }

    def _handle_single_resource(self, resource_uri: str, language: Optional[str] = None) -> Dict:
        """Build the view dictionary for a single resource.

        Returns ``{'error': ...}`` when the URI is not in the cache.
        """
        instance_set = self.get_instances_from_single_resource(resource_uri)

        if not instance_set:
            return {'error': f'Resource {resource_uri} not found'}

        instances = list(instance_set) if isinstance(instance_set, set) else [instance_set]

        return {
            'single_resource': True,
            'entities': self._format_entities(instances, language),
            # Misspelled key kept for consumers that already read it.
            'groupped_view': False,
            # Same spelling as the key produced by _build_grouped_view.
            'grouped_view': False,
            'sections': None
        }

    def _build_grouped_view(self, group_definitions: List[Tuple[str, str, str]], language: Optional[str] = None) -> Dict:
        """Construct the 'Table of Contents' view.

        Args:
            group_definitions: List of tuples (ClassKey, HTML_ID, Title),
                e.g. [('Concept', 'classes', 'Classes')]. ClassKey is matched
                against the exact class name of each instance.
            language: Optional language code for label filtering.

        Returns:
            A dict with ``'grouped_view': True`` and one section per group
            that has at least one instance.
        """
        # Bucket the instances by class name once instead of rescanning the
        # whole list for every group definition.
        by_class: Dict[str, List] = {}
        for inst in self.get_all_instances():
            by_class.setdefault(type(inst).__name__, []).append(inst)

        sections = []
        for class_key, section_id, section_title in group_definitions:
            instances = by_class.get(class_key)
            if instances:
                sections.append({
                    'id': section_id,
                    'title': section_title,
                    'entities': self._format_entities(instances, language)
                })

        return {
            'grouped_view': True,
            'sections': sections
        }

    def _collect_relations(self, instance, language: Optional[str] = None) -> Dict[str, List[Dict]]:
        """Resolve the public, non-empty attributes of ``instance``.

        Keys are the cleaned attribute names; values are lists of resolved
        value dictionaries, without duplicates and without empty texts.
        """
        relations: Dict[str, List[Dict]] = {}
        for attr, value in getattr(instance, '__dict__', {}).items():
            if attr.startswith('_') or not value:
                continue

            clean_name = self._clean_name(attr)
            candidates = value if isinstance(value, list) else [value]
            for candidate in candidates:
                resolved = self._resolve_resource_value(candidate, language)
                if not resolved['text']:
                    continue
                # The bucket is created only once there is something to show.
                bucket = relations.setdefault(clean_name, [])
                if resolved not in bucket:
                    bucket.append(resolved)
        return relations

    def _format_entities(self, instances: List, language: Optional[str] = None) -> List[Dict]:
        """Convert model instances into the dictionaries used by the template.

        Each entity has the keys 'type', 'uri', 'label', 'anchor_id',
        'relations' and 'statements'. Instances without a ``has_identifier``
        value are skipped. The result is sorted case-insensitively by label,
        falling back to the URI.
        """
        all_instances = self.get_all_instances()

        entities = []
        for instance in instances:
            uri = getattr(instance, 'has_identifier', None)
            if not uri:
                continue

            # Hash of the URI, used as a safe HTML id for on-page navigation.
            safe_id = hashlib.md5(str(uri).encode('utf-8')).hexdigest()

            entities.append({
                'type': type(instance).__name__,
                'uri': uri,
                'label': self._get_best_label(instance, language),
                'anchor_id': f"id_{safe_id}",
                'relations': self._collect_relations(instance, language),
                'statements': self._format_statement(all_instances, uri, language)
            })

        entities.sort(key=lambda x: (x['label'] or x['uri']).lower())
        return entities

    def _find_and_format_metadata(self, all_instances: List["Resource"], language=None) -> Dict:
        """Find the Model among ``all_instances`` and format it for the template.

        Returns an empty dict when no ``Model`` instance is present. Otherwise
        the dict holds 'uri' and 'label', one entry per non-empty ``get_*``
        getter of the model, and the statements whose subject is the model.
        """
        # 1. Find the Model (first one wins).
        ontology_model = next(
            (instance for instance in all_instances if isinstance(instance, Model)),
            None,
        )
        if not ontology_model:
            return {}

        # 2. Prepare the output.
        data = {
            'uri': [self._resolve_resource_value(ontology_model, language)],
            'label': [self._resolve_resource_value(self._get_best_label(ontology_model, language))]
        }

        # 3. Dynamic extraction: call every getter that is not ignored.
        for attr_name in dir(ontology_model):
            if not attr_name.startswith('get_') or attr_name in self._METADATA_IGNORED_GETTERS:
                continue

            method = getattr(ontology_model, attr_name)
            if not callable(method):
                continue

            try:
                values = method()
            except AttributeError:
                # Skip attributes that are not initialized.
                continue

            if not values:
                continue
            if not isinstance(values, list):
                values = [values]

            resolved_values = (self._resolve_resource_value(val, language) for val in values)
            extracted_values = [entry for entry in resolved_values if entry['text']]

            # Add the key only if at least one value produced text.
            if extracted_values:
                data[self._clean_name(attr_name)] = extracted_values

        # 4. Statements about the model itself.
        data.update(self._format_statement(all_instances, ontology_model.has_identifier, language))

        return data

    def _resolve_resource_value(self, obj, language=None) -> dict:
        """Extract display text and link from any object.

        Returns a dict with the keys 'text', 'link', 'lan', 'parts' and
        'type'; all of them are ``None`` when nothing could be resolved.
        """
        handler_dic = {
            'text': None,
            'link': None,
            'lan': None,
            'parts': None,  # Filled only for restrictions.
            'type': None
        }

        if not obj:
            return handler_dic

        obj_type = type(obj).__name__

        # --- 1. Restrictions: rendered as a sequence of parts ---
        if obj_type in self._RESTRICTION_TYPES:
            parts = self._parse_restriction(obj, language)

            handler_dic['parts'] = parts
            handler_dic['text'] = "".join([p['text'] for p in parts if p.get('text')])
            # 'link' stays None so the template ignores the blank node URI.
            handler_dic['type'] = obj_type
            return handler_dic

        # --- 2. Plain strings ---
        if isinstance(obj, str):
            handler_dic['text'] = obj
            return handler_dic

        # --- 3. Literals ---
        if obj_type == 'Literal':
            if hasattr(obj, 'get_has_value') and obj.get_has_value():
                lit_lang = obj.get_has_language()

                if language:
                    target_lang = language.strip().lower()
                    # When a language is requested, keep only literals tagged
                    # with that language; untagged or differently tagged
                    # literals resolve to the empty result.
                    if not lit_lang or not lit_lang.lower().startswith(target_lang):
                        return handler_dic

                handler_dic['text'] = obj.get_has_value()
                handler_dic['lan'] = lit_lang
                return handler_dic

        # Objects with a meaningful string form are shown as that string; a
        # default repr ("<... object at 0x...>") is not.
        raw_str = str(obj)
        if "object at" not in raw_str:
            handler_dic['text'] = raw_str
            return handler_dic

        # --- 4. Normal resources (anything exposing get_has_identifier) ---
        if hasattr(obj, 'get_has_identifier'):
            handler_dic['link'] = obj.get_has_identifier()
            handler_dic['type'] = obj_type
            try:
                handler_dic['text'] = self._get_best_label(obj, language)
            except AttributeError:
                handler_dic['text'] = handler_dic['link']

            # Fall back to the link when no label text was found.
            if not handler_dic['text'] and handler_dic['link']:
                handler_dic['text'] = handler_dic['link']

        return handler_dic

    def _format_statement(self, instances, identifier: str, language=None) -> Dict:
        """Collect the statements whose subject matches ``identifier``.

        Returns a dict mapping the predicate label to the list of resolved
        object dictionaries (without duplicates). Statements without a
        predicate, or whose predicate has no usable label, are grouped under
        "Annotation".
        """
        statements: Dict[str, List[Dict]] = {}

        # 1. Normalize the target identifier to a clean string.
        target_id = str(identifier).strip() if identifier else ""
        if not target_id:
            return statements

        for instance in instances:
            if not isinstance(instance, Statement):
                continue

            # 2. Extract and normalize the subject's identifier.
            subj = instance.get_has_subject()
            subj_id = ""
            if hasattr(subj, 'has_identifier') and subj.has_identifier:
                subj_id = str(subj.has_identifier).strip()
            elif isinstance(subj, str):
                subj_id = subj.strip()

            # 3. String comparison.
            if subj_id != target_id:
                continue

            predicate = instance.get_has_predicate()
            obj = instance.get_has_object()

            # 4. Predicate resolution; never use None or "" as a key.
            pred_label = (self._get_best_label(predicate, language) if predicate else None) or "Annotation"
            bucket = statements.setdefault(pred_label, [])

            # 5. Resolve the object and prevent duplicates.
            if obj:
                obj_data = self._resolve_resource_value(obj, language)
                if obj_data not in bucket:
                    bucket.append(obj_data)

        return statements

    @staticmethod
    def _clean_name(name: str) -> str:
        """Turn an attribute, getter or identifier name into a display title.

        Strips a leading ``get_has_``/``get_``/``has_``, splits camelCase,
        replaces underscores with spaces, collapses whitespace and applies
        title case. Returns "" for an empty or ``None`` name.
        """
        if not name:
            return ""

        name = re.sub(r'^(get_has_|get_|has_)', '', name)
        name = re.sub(r'([a-z0-9])([A-Z])', r'\1 \2', name)
        name = name.replace('_', ' ')

        return ' '.join(name.split()).title()

    @staticmethod
    def _read_property(instance, prop_name: str, default=None):
        """Read ``prop_name`` via its ``get_<prop_name>`` getter when present.

        A getter returning ``None`` yields ``default``. Without a getter the
        plain attribute is read, again falling back to ``default``.
        """
        getter_name = f"get_{prop_name}"
        if hasattr(instance, getter_name):
            result = getattr(instance, getter_name)()
            return default if result is None else result
        return getattr(instance, prop_name, default)

    @staticmethod
    def _text_part(text: str) -> Dict:
        """Build a display part that carries text but no link."""
        return {'text': text, 'link': None}

    def _parse_restriction(self, obj, language=None) -> list:
        """Recursively unpack a (possibly nested) restriction into display parts.

        Each part is a dict with 'text' and 'link'; parts produced from
        resolved resources also carry 'type'. Returns an empty list for a
        falsy ``obj`` or for a value that resolves to no text.
        """
        if not obj:
            return []

        # 1. Lists/sets of restrictions or concepts: comma-separated.
        if isinstance(obj, (list, set)):
            parts = []
            for i, item in enumerate(obj):
                if i > 0:
                    parts.append(self._text_part(', '))
                parts.extend(self._parse_restriction(item, language))
            return parts

        obj_type = type(obj).__name__

        # 2. Restrictions: unpack the components specific to each kind.
        if obj_type in self._RESTRICTION_TYPES:
            read = self._read_property
            parts = []

            if obj_type == "Quantifier":
                prop = read(obj, 'applies_on_property')
                quant = read(obj, 'has_quantifier_type', "some")
                concept = read(obj, 'applies_on_concept')

                parts.extend(self._parse_restriction(prop, language))
                parts.append(self._text_part(f' {quant} '))
                parts.extend(self._parse_restriction(concept, language))

            elif obj_type == "Cardinality":
                prop = read(obj, 'applies_on_property')
                card = read(obj, 'has_cardinality_type', "exactly")
                concept = read(obj, 'applies_on_concept')

                parts.extend(self._parse_restriction(prop, language))
                parts.append(self._text_part(f' {card} '))
                parts.extend(self._parse_restriction(concept, language))

            elif obj_type == "TruthFunction":
                operator = read(obj, 'has_logical_operator', "and")
                concepts = read(obj, 'applies_on_concept', [])
                if not isinstance(concepts, list):
                    concepts = [concepts]

                parts.append(self._text_part('('))
                for i, concept in enumerate(concepts):
                    if i > 0:
                        parts.append(self._text_part(f' {operator} '))
                    parts.extend(self._parse_restriction(concept, language))
                parts.append(self._text_part(')'))

            elif obj_type == "OneOf":
                resources = read(obj, 'applies_on_resource', [])
                if not isinstance(resources, list):
                    resources = [resources]

                parts.append(self._text_part('one of { '))
                for i, resource in enumerate(resources):
                    if i > 0:
                        parts.append(self._text_part(', '))
                    parts.extend(self._parse_restriction(resource, language))
                parts.append(self._text_part(' }'))

            elif obj_type == "Value":
                prop = read(obj, 'applies_on_property')
                resource = read(obj, 'applies_on_resource')

                parts.extend(self._parse_restriction(prop, language))
                parts.append(self._text_part(' value '))
                parts.extend(self._parse_restriction(resource, language))

            # Other restriction kinds have no dedicated rendering and yield
            # an empty list.
            return parts

        # 3. Base case: a non-restriction value (resource, string, ...).
        # Let the main resolver extract its link and clean text.
        resolved = self._resolve_resource_value(obj, language)

        if resolved.get('text'):
            return [{'text': resolved['text'], 'link': resolved.get('link'), 'type': resolved.get('type')}]

        return []

494 return [{'text': resolved['text'], 'link': resolved.get('link'), 'type': resolved.get('type')}] 

495 

496 return []