Coverage for src / piccione / upload / on_triplestore.py: 100%
44 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6from pathlib import Path
8from rich.console import Console
9from sparqlite import SPARQLClient
10from tqdm import tqdm
12from piccione.upload.cache_manager import CacheManager
14console = Console()
17def save_failed_query_file(filename: str, failed_file: str | Path) -> None:
18 with Path(failed_file).open("a", encoding="utf8") as f:
19 f.write(f"{filename}\n")
22def remove_stop_file(stop_file: str | Path) -> None:
23 if Path(stop_file).exists():
24 Path(stop_file).unlink()
25 console.print(f"Existing stop file {stop_file} has been removed.")
28def upload_sparql_updates( # noqa: PLR0913
29 endpoint: str,
30 folder: str | Path,
31 *,
32 failed_file: str | Path = "failed_queries.txt",
33 stop_file: str | Path = ".stop_upload",
34 redis_host: str | None = None,
35 redis_port: int = 6379,
36 redis_db: int = 4,
37 description: str = "Processing files",
38 show_progress: bool = True,
39) -> None:
40 if not Path(folder).exists():
41 return
43 cache_manager = None
44 if redis_host is not None:
45 cache_manager = CacheManager(
46 redis_host=redis_host,
47 redis_port=redis_port,
48 redis_db=redis_db,
49 )
51 all_files = [f.name for f in Path(folder).iterdir() if f.name.endswith(".sparql")]
52 files_to_process = [f for f in all_files if f not in cache_manager] if cache_manager is not None else all_files
54 if not files_to_process:
55 return
57 iterator = tqdm(files_to_process, desc=description) if show_progress else files_to_process
58 with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client:
59 for file in iterator:
60 if Path(stop_file).exists():
61 console.print(f"\nStop file {stop_file} detected. Interrupting the process...")
62 break
64 file_path = Path(folder) / file
66 with file_path.open(encoding="utf-8") as f:
67 query = f.read().strip()
69 if not query:
70 if cache_manager is not None:
71 cache_manager.add(file)
72 continue
74 try:
75 client.update(query)
76 if cache_manager is not None:
77 cache_manager.add(file)
78 except Exception as e: # noqa: BLE001
79 console.print(f"Failed to execute {file}: {e}")
80 save_failed_query_file(file, failed_file)
83def main() -> None: # pragma: no cover
84 parser = argparse.ArgumentParser(description="Execute SPARQL update queries on a triple store.")
85 parser.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
86 parser.add_argument(
87 "folder",
88 type=str,
89 help="Path to the folder containing SPARQL update query files",
90 )
91 parser.add_argument(
92 "--failed_file",
93 type=str,
94 default="failed_queries.txt",
95 help="Path to failed queries file",
96 )
97 parser.add_argument("--stop_file", type=str, default=".stop_upload", help="Path to stop file")
98 parser.add_argument("--redis_host", type=str, help="Redis host for caching")
99 parser.add_argument("--redis_port", type=int, help="Redis port")
100 parser.add_argument("--redis_db", type=int, help="Redis database number")
102 args = parser.parse_args()
104 remove_stop_file(args.stop_file)
106 upload_sparql_updates(
107 args.endpoint,
108 args.folder,
109 failed_file=args.failed_file,
110 stop_file=args.stop_file,
111 redis_host=args.redis_host,
112 redis_port=args.redis_port or 6379,
113 redis_db=args.redis_db or 4,
114 )
117if __name__ == "__main__": # pragma: no cover
118 main()