Coverage for src / piccione / upload / on_triplestore.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-21 11:49 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import os 

7 

8from piccione.upload.cache_manager import CacheManager 

9from sparqlite import SPARQLClient 

10from tqdm import tqdm 

11 

12 

def save_failed_query_file(filename, failed_file):
    """Append the name of a failed query file to the failure log.

    :param filename: name of the ``.sparql`` file whose update failed
    :param failed_file: path of the log file to append to
    """
    # Append mode keeps failures accumulated across files and across runs.
    # Fix: write the actual filename (the parameter was previously unused and a
    # constant placeholder string was logged); the file handle no longer
    # shadows the ``failed_file`` parameter.
    with open(failed_file, "a", encoding="utf8") as log:
        log.write(f"{filename}\n")

16 

17 

def remove_stop_file(stop_file):
    """Delete a leftover stop file, if present, and report the removal.

    :param stop_file: path of the stop file to clear before a new run
    """
    # Nothing to do when no stop file was left behind by a previous run.
    if not os.path.exists(stop_file):
        return
    os.remove(stop_file)
    print(f"Existing stop file {stop_file} has been removed.")

22 

23 

def upload_sparql_updates(
    endpoint,
    folder,
    failed_file="failed_queries.txt",
    stop_file=".stop_upload",
    redis_host=None,
    redis_port=None,
    redis_db=None,
    description="Processing files",
    show_progress=True,
):
    """Execute every pending ``.sparql`` update file in *folder* against *endpoint*.

    Files already recorded in the optional Redis-backed cache are skipped, and
    each file that executes successfully (or is empty) is added to the cache.
    Creating *stop_file* on disk interrupts the run between files. Filenames
    whose update raises are appended to *failed_file*.

    :param endpoint: SPARQL endpoint URL of the triple store
    :param folder: directory containing the ``.sparql`` update files
    :param failed_file: path of the log file collecting failed filenames
    :param stop_file: path whose existence requests a graceful stop
    :param redis_host: Redis host enabling the processed-files cache (None disables it)
    :param redis_port: Redis port (used only when *redis_host* is set)
    :param redis_db: Redis database number (used only when *redis_host* is set)
    :param description: label shown on the progress bar
    :param show_progress: wrap the file list in a tqdm progress bar when True
    """
    if not os.path.exists(folder):
        return

    cache = None
    if redis_host is not None:
        cache = CacheManager(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
        )

    candidates = [name for name in os.listdir(folder) if name.endswith(".sparql")]
    if cache is None:
        pending = candidates
    else:
        # Drop files the cache already records as processed.
        pending = [name for name in candidates if name not in cache]

    if not pending:
        return

    progress = tqdm(pending, desc=description) if show_progress else pending
    with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client:
        for name in progress:
            # Honour a stop request between files, never mid-query.
            if os.path.exists(stop_file):
                print(f"\nStop file {stop_file} detected. Interrupting the process...")
                break

            with open(os.path.join(folder, name), "r", encoding="utf-8") as handle:
                query = handle.read().strip()

            # Empty files count as done; remember them so later runs skip them.
            if not query:
                if cache is not None:
                    cache.add(name)
                continue

            try:
                client.update(query)
                # Only cache after a successful update so failures are retried.
                if cache is not None:
                    cache.add(name)
            except Exception as e:
                print(f"Failed to execute {name}: {e}")
                save_failed_query_file(name, failed_file)

79 

80 

def main():  # pragma: no cover
    """Command-line entry point: parse arguments and launch the upload run."""
    arg_parser = argparse.ArgumentParser(
        description="Execute SPARQL update queries on a triple store."
    )
    arg_parser.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
    arg_parser.add_argument(
        "folder",
        type=str,
        help="Path to the folder containing SPARQL update query files",
    )
    arg_parser.add_argument(
        "--failed_file",
        type=str,
        default="failed_queries.txt",
        help="Path to failed queries file",
    )
    arg_parser.add_argument(
        "--stop_file", type=str, default=".stop_upload", help="Path to stop file"
    )
    arg_parser.add_argument("--redis_host", type=str, help="Redis host for caching")
    arg_parser.add_argument("--redis_port", type=int, help="Redis port")
    arg_parser.add_argument("--redis_db", type=int, help="Redis database number")

    options = arg_parser.parse_args()

    # Clear any stale stop file left behind by a previously interrupted run.
    remove_stop_file(options.stop_file)

    upload_sparql_updates(
        options.endpoint,
        options.folder,
        options.failed_file,
        options.stop_file,
        redis_host=options.redis_host,
        redis_port=options.redis_port,
        redis_db=options.redis_db,
    )

117 

118 

# Allow the module to be run directly as a script.
if __name__ == "__main__":  # pragma: no cover
    main()