Coverage for heritrace / scripts / clean_missing_entities.py: 98%
124 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1#!/usr/bin/env python3
3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import argparse
8import importlib.util
9import logging
10import sys
11import types
12from typing import TypedDict
14from SPARQLWrapper import JSON
15from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException
17from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings
18from heritrace.utils.sparql_utils import VIRTUOSO_EXCLUDED_GRAPHS
# Module-level logger shared by load_config() and main(); named after this module.
logger = logging.getLogger(__name__)
class MissingEntityResult(TypedDict):
    """Outcome of cleaning up the references to a single missing entity."""

    # URI of the missing entity.
    uri: str
    # References that were processed; each one is a mapping with the keys
    # "subject" and "predicate".
    references: list[dict[str, str]]
    # True when every reference was removed without error.
    success: bool
class MissingEntityCleaner:
    """
    A class to detect and clean up references to missing entities from the dataset.

    Missing entities are URIs that are referenced by triples but don't actually
    exist in the dataset (they have no triples where they are the subject). The
    cleaner identifies these missing references and removes all triples that
    reference them.
    """

    # Predicates whose objects are never treated as missing entities.
    _EXCLUDED_PREDICATES = (
        "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
        "http://purl.org/spar/pro/withRole",
        "http://purl.org/spar/datacite/usesIdentifierScheme",
    )

    # Characters the SPARQL IRIREF production forbids between "<" and ">".
    # Code points up to U+0020 are forbidden too and are checked separately.
    _IRI_FORBIDDEN_CHARS = frozenset('<>"{}|^`\\')

    def __init__(self, endpoint: str, *, is_virtuoso: bool = False) -> None:
        """
        Initialize the MissingEntityCleaner.

        Args:
            endpoint: The SPARQL endpoint for the database
            is_virtuoso: Boolean indicating if the endpoint is Virtuoso
        """
        self.endpoint = endpoint
        self.is_virtuoso = is_virtuoso
        self.sparql = SPARQLWrapperWithRetry(endpoint)
        self.sparql.setReturnFormat(JSON)
        self.logger = logging.getLogger(__name__)

    @classmethod
    def _is_safe_iri(cls, iri: str) -> bool:
        """Return True if *iri* can be written between "<" and ">" in a query."""
        return not any(ch <= " " or ch in cls._IRI_FORBIDDEN_CHARS for ch in iri)

    @staticmethod
    def _excluded_graphs_filter(graph_var: str) -> str:
        """
        Build the FILTER that skips the graphs in VIRTUOSO_EXCLUDED_GRAPHS.

        Args:
            graph_var: Name (without "?") of the graph variable to constrain

        Returns:
            str: The FILTER clause, or an empty string when no graph is excluded
        """
        if not VIRTUOSO_EXCLUDED_GRAPHS:
            # An empty list would otherwise render as "NOT IN (<>)".
            return ""
        graph_list = ", ".join(f"<{graph}>" for graph in VIRTUOSO_EXCLUDED_GRAPHS)
        return f"FILTER(?{graph_var} NOT IN ({graph_list}))"

    def _build_delete_query(
        self, subject: str, predicate: str, entity_uri: str
    ) -> str:
        """Build the DELETE update that removes one referencing triple."""
        triple = f"<{subject}> <{predicate}> <{entity_uri}> ."

        if not self.is_virtuoso:
            return f"""
                DELETE {{
                    {triple}
                }}
                WHERE {{
                    {triple}
                }}
            """

        # Quad store: delete the triple from every non-excluded graph holding it.
        return f"""
            DELETE {{
                GRAPH ?g {{
                    {triple}
                }}
            }}
            WHERE {{
                GRAPH ?g {{
                    {triple}
                }}
                {self._excluded_graphs_filter("g")}
            }}
        """

    def find_missing_entities_with_references(self) -> dict[str, list[dict[str, str]]]:
        """
        Find missing entity references in the dataset along with their references.

        A missing entity is one that:
        1. Is referenced as an object in at least one triple
        2. Has no triples where it is the subject (completely missing)

        The following are excluded from being considered missing entities:
        - Objects of rdf:type triples (types are not considered entities)
        - Objects of pro:withRole triples
        - Objects of datacite:usesIdentifierScheme triples

        Returns:
            Dictionary mapping missing entity URIs to lists of reference
            dictionaries, each with the keys "subject" and "predicate"
        """
        predicate_filter = " && ".join(
            f"?p != <{predicate}>" for predicate in self._EXCLUDED_PREDICATES
        )

        if self.is_virtuoso:
            query = f"""
                SELECT DISTINCT ?entity ?s ?p
                WHERE {{
                    GRAPH ?g1 {{
                        ?s ?p ?entity .
                        FILTER(isIRI(?entity))
                        FILTER({predicate_filter})
                    }}
                    FILTER NOT EXISTS {{
                        GRAPH ?g2 {{
                            ?entity ?anyPredicate ?anyObject .
                        }}
                    }}
                    {self._excluded_graphs_filter("g1")}
                }}
            """
        else:
            query = f"""
                SELECT DISTINCT ?entity ?s ?p
                WHERE {{
                    ?s ?p ?entity .
                    FILTER(isIRI(?entity))
                    FILTER({predicate_filter})
                    FILTER NOT EXISTS {{
                        ?entity ?anyPredicate ?anyObject .
                    }}
                }}
            """

        self.sparql.setQuery(query)
        bindings = get_sparql_bindings(self.sparql.queryAndConvert())

        # Group the (subject, predicate) pairs by the missing entity they point to.
        missing_entities: dict[str, list[dict[str, str]]] = {}
        for row in bindings:
            missing_entities.setdefault(row["entity"]["value"], []).append(
                {"subject": row["s"]["value"], "predicate": row["p"]["value"]}
            )

        return missing_entities

    def remove_references(
        self, entity_uri: str, references: list[dict[str, str]]
    ) -> bool:
        """
        Remove all references to a missing entity.

        One DELETE update is sent per reference. A reference whose subject,
        predicate or object contains characters that are not allowed inside a
        SPARQL IRI is not sent at all and counts as a failure, because it
        could otherwise change the structure of the update query.

        Args:
            entity_uri: The URI of the missing entity
            references: List of references to the missing entity

        Returns:
            bool: True if all references were successfully removed, False otherwise
        """
        success = True

        for reference in references:
            subject = reference["subject"]
            predicate = reference["predicate"]

            if not all(map(self._is_safe_iri, (subject, predicate, entity_uri))):
                self.logger.error(
                    "Skipping reference from %r to %r via %r:"
                    " IRI contains characters not allowed in SPARQL",
                    subject,
                    entity_uri,
                    predicate,
                )
                success = False
                continue

            query = self._build_delete_query(subject, predicate, entity_uri)

            try:
                self.sparql.setQuery(query)
                self.sparql.method = "POST"
                self.sparql.query()
                self.logger.info(
                    "Removed reference from %s to %s via %s",
                    subject,
                    entity_uri,
                    predicate,
                )
            except SPARQLWrapperException:
                # Keep going: one failed deletion must not block the others.
                self.logger.exception(
                    "Error removing reference from %s to %s via %s",
                    subject,
                    entity_uri,
                    predicate,
                )
                success = False

        return success

    def process_missing_entities(self) -> "list[MissingEntityResult]":
        """
        Process all missing entity references in the dataset.

        This method:
        1. Finds all missing entity references along with their references
        2. For each missing entity, removes all references to it

        Returns:
            List[Dict]: A list of dictionaries containing results for each missing
                        entity processed
                        Each dictionary includes:
                        - uri: the URI of the missing entity
                        - references: list of references that were processed
                        - success: boolean indicating if all references were
                          successfully removed
        """
        missing_entities_with_refs = self.find_missing_entities_with_references()

        if not missing_entities_with_refs:
            self.logger.info("No missing entity references found.")
            return []

        num_missing_entities = len(missing_entities_with_refs)
        self.logger.info("Found %s missing entity references.", num_missing_entities)

        total_references = sum(
            len(refs) for refs in missing_entities_with_refs.values()
        )
        results: list[MissingEntityResult] = []

        for entity_uri, references in missing_entities_with_refs.items():
            self.logger.info("Processing missing entity: %s", entity_uri)
            self.logger.info(
                "Found %s references to missing entity %s",
                len(references),
                entity_uri,
            )

            success = self.remove_references(entity_uri, references)
            if not success:
                self.logger.error(
                    "Failed to remove references to missing entity %s",
                    entity_uri,
                )

            results.append(
                {"uri": entity_uri, "references": references, "success": success}
            )

        if all(result["success"] for result in results):
            self.logger.info(
                "Successfully processed all missing"
                " entities. Found %s missing entities"
                " and removed %s references.",
                num_missing_entities,
                total_references,
            )

        return results
def clean_missing_entities(
    endpoint: str, *, is_virtuoso: bool = False
) -> list[MissingEntityResult]:
    """
    Remove every reference to a missing entity from the dataset.

    Convenience wrapper that builds a MissingEntityCleaner for the endpoint
    and runs its full find-and-remove pass.

    Args:
        endpoint: The SPARQL endpoint for the database
        is_virtuoso: Boolean indicating if the endpoint is Virtuoso

    Returns:
        List[Dict]: Results of processing each missing entity
    """
    return MissingEntityCleaner(
        endpoint=endpoint, is_virtuoso=is_virtuoso
    ).process_missing_entities()
def load_config(config_path: str) -> types.ModuleType:
    """
    Load configuration from a Python file.

    The file is executed as a module named "config". If no module spec can be
    built for the path, or if reading, compiling or importing the file fails
    with one of the handled errors, the problem is logged and the process
    exits with status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        if spec is None or spec.loader is None:
            logger.error("Failed to create module spec from %s", config_path)
            # SystemExit derives from BaseException, so the handler below
            # does not intercept it.
            sys.exit(1)
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
    except (OSError, ImportError, AttributeError, SyntaxError):
        # OSError covers a missing, unreadable or non-file path; SyntaxError
        # covers a configuration file that does not compile.
        logger.exception("Error loading configuration file: %s", config_path)
        sys.exit(1)
    return config
def main() -> int:
    """
    Command-line entry point for the missing entity cleanup.

    Parses the command-line arguments, configures logging, loads the
    configuration file and runs the cleanup against the configured endpoint.

    Returns:
        int: 0 when no missing entity was found or every reference was
        removed; 1 when the configuration is incomplete or at least one
        entity could not be cleaned up
    """
    parser = argparse.ArgumentParser(
        description=(
            "Detect and clean up references to missing entities from the dataset"
        )
    )
    parser.add_argument(
        "--config", "-c", required=True, help="Path to the configuration file"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = load_config(args.config)

    # A configuration module without a Config class is reported like any other
    # incomplete configuration instead of raising AttributeError.
    settings = getattr(config, "Config", None)
    if settings is None:
        logger.error("Configuration file must define a Config class")
        return 1

    if not hasattr(settings, "DATASET_DB_URL"):
        logger.error("Config class must define DATASET_DB_URL")
        return 1

    endpoint = settings.DATASET_DB_URL

    # str() keeps a non-string value (e.g. None) from raising on .lower().
    triplestore = getattr(settings, "DATASET_DB_TRIPLESTORE", "")
    is_virtuoso = str(triplestore).lower() == "virtuoso"

    logger.info(
        "Starting missing entity detection and cleanup using endpoint: %s",
        endpoint,
    )

    results = clean_missing_entities(endpoint=endpoint, is_virtuoso=is_virtuoso)

    if not results:
        logger.info("No missing entity references found")
        return 0

    failed = sum(1 for result in results if not result["success"])
    if not failed:
        logger.info(
            "Successfully cleaned up missing entity"
            " references from the dataset."
            " Processed %s missing entities.",
            len(results),
        )
        return 0

    logger.error(
        "Failed to clean up some missing entity"
        " references from the dataset."
        " %s entities had errors.",
        failed,
    )
    return 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())