Coverage for src/piccione/upload/on_triplestore.py: 100%
44 statements
coverage.py v7.13.0, created at 2025-12-11 13:41 +0000
1import argparse
2import os
4from piccione.upload.cache_manager import CacheManager
5from sparqlite import SPARQLClient
6from tqdm import tqdm
def save_failed_query_file(filename, failed_file):
    """Append *filename* as one line to the failed-queries log at *failed_file*.

    Args:
        filename: Name of the query file that failed; written followed by a newline.
        failed_file: Path of the log file. Opened in append mode, so it is created
            if missing and existing entries are kept.
    """
    # Use a distinct name for the handle so the ``failed_file`` path parameter is
    # not shadowed; encoding matches the "utf-8" spelling used elsewhere in the module.
    with open(failed_file, "a", encoding="utf-8") as log:
        log.write(f"{filename}\n")
def remove_stop_file(stop_file):
    """Delete a leftover *stop_file* and report the removal on stdout.

    Does nothing (and prints nothing) when *stop_file* does not exist.
    """
    if not os.path.exists(stop_file):
        return
    os.remove(stop_file)
    print(f"Existing stop file {stop_file} has been removed.")
def _pending_sparql_files(folder, cache_manager):
    """Return the names of ``.sparql`` files in *folder* that still need uploading.

    Only regular files are considered, so a sub-directory whose name ends in
    ``.sparql`` is ignored instead of failing later in ``open()``. Names are
    sorted so the upload order is deterministic; ``os.listdir`` returns entries
    in arbitrary order, and update queries may depend on one another.
    When *cache_manager* is given, names it already contains are skipped.
    """
    candidates = sorted(
        name
        for name in os.listdir(folder)
        if name.endswith(".sparql") and os.path.isfile(os.path.join(folder, name))
    )
    if cache_manager is None:
        return candidates
    return [name for name in candidates if name not in cache_manager]


def upload_sparql_updates(
    endpoint,
    folder,
    failed_file="failed_queries.txt",
    stop_file=".stop_upload",
    redis_host=None,
    redis_port=None,
    redis_db=None,
    description="Processing files",
    show_progress=True,
):
    """Execute every ``.sparql`` file in *folder* as an update against *endpoint*.

    Args:
        endpoint: SPARQL endpoint URL handed to ``SPARQLClient``.
        folder: Directory containing the ``.sparql`` query files. If it is not an
            existing directory, the function returns without doing anything.
        failed_file: Path of the log where names of files that could not be read
            or executed are appended (one per line).
        stop_file: Path checked before each file; if it exists, processing stops.
        redis_host: When not ``None``, a ``CacheManager`` is built with
            *redis_host*, *redis_port* and *redis_db*. Files already in the cache
            are skipped, and files are added to it once successfully processed.
        redis_port: Passed through to ``CacheManager``.
        redis_db: Passed through to ``CacheManager``.
        description: Label shown on the tqdm progress bar.
        show_progress: Whether to wrap the file list in a tqdm progress bar.

    Returns:
        None. Failures are reported on stdout and recorded in *failed_file*
        rather than raised.
    """
    # isdir (not exists) so a regular file passed as *folder* is treated like a
    # missing folder instead of making os.listdir raise NotADirectoryError.
    if not os.path.isdir(folder):
        return

    cache_manager = None
    if redis_host is not None:
        cache_manager = CacheManager(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
        )

    files_to_process = _pending_sparql_files(folder, cache_manager)
    if not files_to_process:
        return

    iterator = tqdm(files_to_process, desc=description) if show_progress else files_to_process
    # NOTE(review): max_retries/backoff_factor presumably control sparqlite's
    # retry behaviour for failed requests — confirm against the sparqlite docs.
    with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client:
        for file in iterator:
            # Checked on every iteration so an external process can request a
            # graceful stop by creating the stop file.
            if os.path.exists(stop_file):
                print(f"\nStop file {stop_file} detected. Interrupting the process...")
                break

            file_path = os.path.join(folder, file)
            # An unreadable or non-UTF-8 file is logged and recorded like a failed
            # update, instead of aborting the whole run.
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    query = f.read().strip()
            except (OSError, UnicodeDecodeError) as e:
                print(f"Failed to read {file}: {e}")
                save_failed_query_file(file, failed_file)
                continue

            # Empty files carry no update; mark them done so they are not revisited.
            if not query:
                if cache_manager is not None:
                    cache_manager.add(file)
                continue

            try:
                client.update(query)
                if cache_manager is not None:
                    cache_manager.add(file)
            except Exception as e:
                # Deliberate best-effort boundary: record the failure and move on.
                print(f"Failed to execute {file}: {e}")
                save_failed_query_file(file, failed_file)
def main():  # pragma: no cover
    """Command-line entry point: parse arguments and upload the pending query files."""
    cli = argparse.ArgumentParser(
        description="Execute SPARQL update queries on a triple store."
    )

    # Positional arguments: where to send the updates and where to read them from.
    cli.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
    cli.add_argument(
        "folder",
        type=str,
        help="Path to the folder containing SPARQL update query files",
    )

    # Bookkeeping files used by the upload loop.
    cli.add_argument(
        "--failed_file",
        type=str,
        default="failed_queries.txt",
        help="Path to failed queries file",
    )
    cli.add_argument(
        "--stop_file", type=str, default=".stop_upload", help="Path to stop file"
    )

    # Optional Redis cache settings, registered in the same order as before so
    # the --help output is unchanged.
    redis_options = (
        ("--redis_host", str, "Redis host for caching"),
        ("--redis_port", int, "Redis port"),
        ("--redis_db", int, "Redis database number"),
    )
    for flag, flag_type, flag_help in redis_options:
        cli.add_argument(flag, type=flag_type, help=flag_help)

    options = cli.parse_args()

    # A stop file left over from an earlier run would make the upload loop stop
    # before processing anything, so clear it first.
    remove_stop_file(options.stop_file)

    upload_sparql_updates(
        options.endpoint,
        options.folder,
        failed_file=options.failed_file,
        stop_file=options.stop_file,
        redis_host=options.redis_host,
        redis_port=options.redis_port,
        redis_db=options.redis_db,
    )
# Run the command-line interface only when this module is executed as a script,
# not when it is imported (e.g. to call upload_sparql_updates directly).
if __name__ == "__main__":  # pragma: no cover
    main()