Coverage for src/piccione/upload/on_triplestore.py: 100%
44 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-21 11:49 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-03-21 11:49 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import os
8from piccione.upload.cache_manager import CacheManager
9from sparqlite import SPARQLClient
10from tqdm import tqdm
def save_failed_query_file(filename, failed_file):
    """Append the name of a failed query file to the failure log.

    Args:
        filename: Name of the query file whose execution failed. It is
            written on its own line.
        failed_file: Path of the text file collecting the failures. It is
            opened in append mode, so earlier entries are kept.
    """
    # Bind the handle to its own name rather than rebinding the
    # ``failed_file`` path parameter inside the ``with`` statement.
    with open(failed_file, "a", encoding="utf8") as log:
        log.write(f"{filename}\n")
def remove_stop_file(stop_file):
    """Delete the stop file if it is present and report the removal.

    Args:
        stop_file: Path of the stop file to delete. When no file exists at
            that path the function returns without printing anything.
    """
    # Attempt the removal directly instead of testing for existence first:
    # the file could disappear between the check and the removal, which
    # would otherwise surface as an unexpected FileNotFoundError.
    try:
        os.remove(stop_file)
    except FileNotFoundError:
        return
    print(f"Existing stop file {stop_file} has been removed.")
def upload_sparql_updates(
    endpoint,
    folder,
    failed_file="failed_queries.txt",
    stop_file=".stop_upload",
    redis_host=None,
    redis_port=None,
    redis_db=None,
    description="Processing files",
    show_progress=True,
):
    """Execute the ``.sparql`` update files of a folder against an endpoint.

    Files whose content is empty after stripping whitespace are skipped. A
    file whose update raises an exception is reported on standard output and
    its name is appended to ``failed_file``; the remaining files are still
    processed. The loop stops as soon as ``stop_file`` exists.

    Args:
        endpoint: Endpoint URL handed to ``SPARQLClient``.
        folder: Directory containing the ``.sparql`` files. The function
            returns immediately when it is not an existing directory.
        failed_file: Path of the file collecting the names of failed files.
        stop_file: Path checked before each file; its existence interrupts
            the upload.
        redis_host: When not ``None``, a ``CacheManager`` is created with the
            three ``redis_*`` values. Files already in the cache are skipped
            and files that were executed (or were empty) are added to it.
        redis_port: Forwarded to ``CacheManager``.
        redis_db: Forwarded to ``CacheManager``.
        description: Label of the progress bar.
        show_progress: Whether to wrap the iteration in a ``tqdm`` bar.

    Returns:
        None.
    """
    # ``isdir`` rather than ``exists``: a path that exists but is not a
    # directory would make ``os.listdir`` raise, so it is treated like a
    # missing folder.
    if not os.path.isdir(folder):
        return

    cache_manager = None
    if redis_host is not None:
        cache_manager = CacheManager(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
        )

    # ``os.listdir`` returns names in arbitrary order; sorting makes the
    # upload order reproducible from one run to the next.
    all_files = sorted(f for f in os.listdir(folder) if f.endswith(".sparql"))
    if cache_manager is not None:
        files_to_process = [f for f in all_files if f not in cache_manager]
    else:
        files_to_process = all_files

    if not files_to_process:
        return

    iterator = tqdm(files_to_process, desc=description) if show_progress else files_to_process
    with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client:
        for file in iterator:
            # The stop file is re-checked before every query so that a long
            # run can be interrupted between two files.
            if os.path.exists(stop_file):
                print(f"\nStop file {stop_file} detected. Interrupting the process...")
                break

            file_path = os.path.join(folder, file)

            with open(file_path, "r", encoding="utf-8") as f:
                query = f.read().strip()

            if not query:
                # Nothing to send: record the file as done so that it is not
                # considered again on the next run.
                if cache_manager is not None:
                    cache_manager.add(file)
                continue

            try:
                client.update(query)
                if cache_manager is not None:
                    cache_manager.add(file)
            except Exception as e:
                # One failing query must not abort the whole batch: report
                # it, log the file name and move on.
                print(f"Failed to execute {file}: {e}")
                save_failed_query_file(file, failed_file)
def main():  # pragma: no cover
    """Parse the command line, clear any stale stop file and run the upload."""
    cli = argparse.ArgumentParser(
        description="Execute SPARQL update queries on a triple store."
    )

    # Positional arguments.
    cli.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
    cli.add_argument(
        "folder",
        type=str,
        help="Path to the folder containing SPARQL update query files",
    )

    # Optional flags, registered in the order they appear in the help text.
    optional_flags = (
        (
            "--failed_file",
            {
                "type": str,
                "default": "failed_queries.txt",
                "help": "Path to failed queries file",
            },
        ),
        (
            "--stop_file",
            {"type": str, "default": ".stop_upload", "help": "Path to stop file"},
        ),
        ("--redis_host", {"type": str, "help": "Redis host for caching"}),
        ("--redis_port", {"type": int, "help": "Redis port"}),
        ("--redis_db", {"type": int, "help": "Redis database number"}),
    )
    for flag, spec in optional_flags:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()

    # A stop file left over from an earlier run is removed before uploading.
    remove_stop_file(options.stop_file)

    upload_sparql_updates(
        endpoint=options.endpoint,
        folder=options.folder,
        failed_file=options.failed_file,
        stop_file=options.stop_file,
        redis_host=options.redis_host,
        redis_port=options.redis_port,
        redis_db=options.redis_db,
    )
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__": # pragma: no cover
    main()