Coverage for heritrace / scripts / clean_missing_entities.py: 98%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1#!/usr/bin/env python3 

2 

3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import argparse 

8import importlib.util 

9import logging 

10import sys 

11import types 

12from typing import TypedDict 

13 

14from SPARQLWrapper import JSON 

15from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException 

16 

17from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings 

18from heritrace.utils.sparql_utils import VIRTUOSO_EXCLUDED_GRAPHS 

19 

# Module-level logger used by load_config() and main(); named after this module.
logger = logging.getLogger(__name__)

21 

22 

class MissingEntityResult(TypedDict):
    """Outcome of cleaning up the references to a single missing entity."""

    # URI of the entity that is referenced but never appears as a subject
    uri: str
    # One {"subject": ..., "predicate": ...} mapping per referencing triple
    references: list[dict[str, str]]
    # True only when every reference was deleted without a SPARQL error
    success: bool

27 

28 

class MissingEntityCleaner:
    """
    Detect and remove dangling references to entities absent from the dataset.

    A URI is treated as a missing entity when it occurs as the object of at
    least one triple but never as the subject of any triple. Every triple
    that points at such a URI is deleted through the SPARQL endpoint.
    """

    def __init__(self, endpoint: str, *, is_virtuoso: bool = False) -> None:
        """
        Create the cleaner together with its SPARQL client.

        Args:
            endpoint: The SPARQL endpoint that serves the dataset
            is_virtuoso: Whether the endpoint is Virtuoso; when True the
                GRAPH-scoped query variants are used
        """
        client = SPARQLWrapperWithRetry(endpoint)
        client.setReturnFormat(JSON)

        self.endpoint = endpoint
        self.is_virtuoso = is_virtuoso
        self.sparql = client
        self.logger = logging.getLogger(__name__)

    def find_missing_entities_with_references(self) -> dict[str, list[dict[str, str]]]:
        """
        Query the dataset for missing entities and the triples that cite them.

        An entity is reported when it:
        1. Is an IRI used as the object of at least one triple
        2. Is never the subject of any triple

        Objects of the following predicates are never reported:
        - rdf:type
        - pro:withRole
        - datacite:usesIdentifierScheme

        Returns:
            Dictionary keyed by missing entity URI; each value holds one
            {"subject": ..., "predicate": ...} mapping per referencing triple
        """
        # Predicates whose objects must not be treated as entities
        skipped_predicates = (
            "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
            "http://purl.org/spar/pro/withRole",
            "http://purl.org/spar/datacite/usesIdentifierScheme",
        )
        excluded_predicates_filter = " && ".join(
            f"?p != <{iri}>" for iri in skipped_predicates
        )

        if self.is_virtuoso:
            # Virtuoso is queried as a quad store: scope patterns to named
            # graphs and ignore references found in the excluded graphs.
            excluded_graphs = ">, <".join(VIRTUOSO_EXCLUDED_GRAPHS)
            query = f"""
            SELECT DISTINCT ?entity ?s ?p
            WHERE {{
                GRAPH ?g1 {{
                    ?s ?p ?entity .
                    FILTER(isIRI(?entity))
                    FILTER({excluded_predicates_filter})
                }}
                FILTER NOT EXISTS {{
                    GRAPH ?g2 {{
                        ?entity ?anyPredicate ?anyObject .
                    }}
                }}
                FILTER(?g1 NOT IN (<{excluded_graphs}>))
            }}
            """
        else:
            query = f"""
            SELECT DISTINCT ?entity ?s ?p
            WHERE {{
                ?s ?p ?entity .
                FILTER(isIRI(?entity))
                FILTER({excluded_predicates_filter})
                FILTER NOT EXISTS {{
                    ?entity ?anyPredicate ?anyObject .
                }}
            }}
            """

        self.sparql.setQuery(query)
        rows = get_sparql_bindings(self.sparql.queryAndConvert())

        # Group the (subject, predicate) pairs under the entity they point to
        grouped: dict[str, list[dict[str, str]]] = {}
        for row in rows:
            grouped.setdefault(row["entity"]["value"], []).append(
                {"subject": row["s"]["value"], "predicate": row["p"]["value"]}
            )

        return grouped

    def remove_references(
        self, entity_uri: str, references: list[dict[str, str]]
    ) -> bool:
        """
        Delete every triple that points at the given missing entity.

        One DELETE query is sent per reference. A failing query is logged and
        does not stop the remaining references from being processed.

        Args:
            entity_uri: The URI of the missing entity
            references: Mappings with "subject" and "predicate" keys, one per
                triple whose object is the missing entity

        Returns:
            bool: True if no query raised a SPARQLWrapperException, False
            otherwise
        """
        quad_store = self.is_virtuoso
        # The excluded-graph list is identical for every reference: build it once
        excluded_graphs = ">, <".join(VIRTUOSO_EXCLUDED_GRAPHS) if quad_store else ""
        all_removed = True

        for ref in references:
            subject = ref["subject"]
            predicate = ref["predicate"]
            triple = f"<{subject}> <{predicate}> <{entity_uri}> ."

            if quad_store:
                query = f"""
                DELETE {{
                    GRAPH ?g {{
                        {triple}
                    }}
                }}
                WHERE {{
                    GRAPH ?g {{
                        {triple}
                    }}
                    FILTER(?g NOT IN (<{excluded_graphs}>))
                }}
                """
            else:
                query = f"""
                DELETE {{
                    {triple}
                }}
                WHERE {{
                    {triple}
                }}
                """

            try:
                self.sparql.setQuery(query)
                # Updates are sent with POST
                self.sparql.method = "POST"
                self.sparql.query()
            except SPARQLWrapperException:
                self.logger.exception(
                    "Error removing reference from %s to %s via %s",
                    subject,
                    entity_uri,
                    predicate,
                )
                all_removed = False
            else:
                self.logger.info(
                    "Removed reference from %s to %s via %s",
                    subject,
                    entity_uri,
                    predicate,
                )

        return all_removed

    def process_missing_entities(self) -> list[MissingEntityResult]:
        """
        Find every missing entity and remove all references to it.

        Returns:
            One result per missing entity, in the order the entities were
            found. Each result contains:
            - uri: the URI of the missing entity
            - references: the references that were processed
            - success: whether all of those references were removed
            An empty list is returned when nothing is missing.
        """
        found = self.find_missing_entities_with_references()
        if not found:
            self.logger.info("No missing entity references found.")
            return []

        entity_count = len(found)
        self.logger.info("Found %s missing entity references.", entity_count)
        reference_count = sum(map(len, found.values()))

        outcomes: list[MissingEntityResult] = []
        for entity_uri, references in found.items():
            self.logger.info("Processing missing entity: %s", entity_uri)
            self.logger.info(
                "Found %s references to missing entity %s",
                len(references),
                entity_uri,
            )

            removed = self.remove_references(entity_uri, references)
            if not removed:
                self.logger.error(
                    "Failed to remove references to missing entity %s",
                    entity_uri,
                )

            outcomes.append(
                {"uri": entity_uri, "references": references, "success": removed}
            )

        # The summary line is only emitted when nothing failed
        if all(outcome["success"] for outcome in outcomes):
            self.logger.info(
                "Successfully processed all missing entities."
                " Found %s missing entities and removed %s references.",
                entity_count,
                reference_count,
            )

        return outcomes

264 

265 

def clean_missing_entities(
    endpoint: str, *, is_virtuoso: bool = False
) -> list[MissingEntityResult]:
    """
    Remove every reference to a missing entity from the dataset.

    Convenience wrapper that builds a MissingEntityCleaner and runs it once.

    Args:
        endpoint: The SPARQL endpoint for the database
        is_virtuoso: Whether the endpoint is Virtuoso

    Returns:
        The per-entity results produced by
        MissingEntityCleaner.process_missing_entities()
    """
    return MissingEntityCleaner(
        endpoint=endpoint, is_virtuoso=is_virtuoso
    ).process_missing_entities()

281 

282 

def load_config(config_path: str) -> types.ModuleType:
    """
    Import a Python configuration file as a module named "config".

    The process exits with status 1 when no module spec or loader can be
    created for the path, or when loading raises FileNotFoundError,
    ImportError or AttributeError.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The executed configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        loader = None if spec is None else spec.loader
        if loader is None:
            logger.error("Failed to create module spec from %s", config_path)
            # SystemExit is not caught by the handler below, so it propagates
            sys.exit(1)
        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)
        return module
    except (FileNotFoundError, ImportError, AttributeError):
        logger.exception("Error loading configuration file: %s", config_path)
        sys.exit(1)

307 

308 

def main() -> int:
    """
    Command-line entry point for the missing-entity cleanup.

    Parses --config/-c (required) and --verbose/-v, configures logging, loads
    the configuration module and runs the cleanup against the endpoint given
    by Config.DATASET_DB_URL. The Virtuoso query variants are used when
    Config.DATASET_DB_TRIPLESTORE equals "virtuoso" (case-insensitive).

    Returns:
        0 when nothing was missing or every reference was removed; 1 when the
        configuration is unusable or at least one entity had errors
    """
    parser = argparse.ArgumentParser(
        description=(
            "Detect and clean up references to missing entities from the dataset"
        )
    )
    parser.add_argument(
        "--config", "-c", required=True, help="Path to the configuration file"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )

    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    config = load_config(args.config)

    # A config file without a Config class must yield a clean error and exit
    # code, not an AttributeError traceback.
    config_cls = getattr(config, "Config", None)
    if config_cls is None:
        logger.error("Configuration file must define a Config class")
        return 1

    if not hasattr(config_cls, "DATASET_DB_URL"):
        logger.error("Config class must define DATASET_DB_URL")
        return 1

    endpoint = config_cls.DATASET_DB_URL

    # Tolerate a missing or non-string (e.g. None) triplestore setting
    triplestore = getattr(config_cls, "DATASET_DB_TRIPLESTORE", None)
    is_virtuoso = (
        isinstance(triplestore, str) and triplestore.lower() == "virtuoso"
    )

    logger.info(
        "Starting missing entity detection and cleanup using endpoint: %s",
        endpoint,
    )

    results = clean_missing_entities(endpoint=endpoint, is_virtuoso=is_virtuoso)

    if not results:
        logger.info("No missing entity references found")
        return 0

    failed = sum(1 for result in results if not result["success"])
    if failed:
        logger.error(
            "Failed to clean up some missing entity"
            " references from the dataset."
            " %s entities had errors.",
            failed,
        )
        return 1

    logger.info(
        "Successfully cleaned up missing entity"
        " references from the dataset."
        " Processed %s missing entities.",
        len(results),
    )
    return 0

367 

368 

# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())