Coverage for src/piccione/upload/on_triplestore.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 13:41 +0000

1import argparse 

2import os 

3 

4from piccione.upload.cache_manager import CacheManager 

5from sparqlite import SPARQLClient 

6from tqdm import tqdm 

7 

8 

def save_failed_query_file(filename, failed_file):
    """Append the name of a failed query file to the failure log.

    Args:
        filename: Name of the ``.sparql`` file whose update failed.
        failed_file: Path of the log file; opened in append mode so
            failures accumulate across calls and across runs.
    """
    # Use a distinct name for the handle so it does not shadow the
    # `failed_file` path parameter.
    with open(failed_file, "a", encoding="utf8") as log:
        # Record the failed file's name (one per line) instead of a
        # fixed placeholder, so the log is actually actionable.
        log.write(f"{filename}\n")

12 

13 

def remove_stop_file(stop_file):
    """Delete *stop_file* if it exists, announcing the removal.

    A no-op when the file is absent, so it is safe to call
    unconditionally before starting an upload.
    """
    if not os.path.exists(stop_file):
        return
    os.remove(stop_file)
    print(f"Existing stop file {stop_file} has been removed.")

18 

19 

def upload_sparql_updates(
    endpoint,
    folder,
    failed_file="failed_queries.txt",
    stop_file=".stop_upload",
    redis_host=None,
    redis_port=None,
    redis_db=None,
    description="Processing files",
    show_progress=True,
):
    """Execute every ``.sparql`` update file in *folder* against *endpoint*.

    Files already recorded in the optional Redis-backed cache are skipped;
    files that execute successfully (or are empty) are added to it.
    Failures are logged and appended to *failed_file*. Creating *stop_file*
    while the loop runs interrupts the upload cleanly.

    Args:
        endpoint: SPARQL endpoint URL of the triple store.
        folder: Directory containing the ``.sparql`` update files.
        failed_file: Path of the failure log passed to
            ``save_failed_query_file``.
        stop_file: Path checked before each file as an interrupt signal.
        redis_host / redis_port / redis_db: Optional Redis connection
            settings; caching is enabled only when *redis_host* is given.
        description: Label shown on the progress bar.
        show_progress: When true, wrap the file list in ``tqdm``.
    """
    if not os.path.exists(folder):
        return

    # Caching is opt-in: only build the manager when a host was supplied.
    cache = None
    if redis_host is not None:
        cache = CacheManager(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
        )

    candidates = [name for name in os.listdir(folder) if name.endswith(".sparql")]
    if cache is None:
        pending = candidates
    else:
        # Skip anything the cache already marks as processed.
        pending = [name for name in candidates if name not in cache]

    if not pending:
        return

    progress = tqdm(pending, desc=description) if show_progress else pending
    with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client:
        for name in progress:
            # An operator can create the stop file to abort a long run.
            if os.path.exists(stop_file):
                print(f"\nStop file {stop_file} detected. Interrupting the process...")
                break

            path = os.path.join(folder, name)

            with open(path, "r", encoding="utf-8") as handle:
                query = handle.read().strip()

            # Empty files carry no work; mark them done and move on.
            if not query:
                if cache is not None:
                    cache.add(name)
                continue

            try:
                client.update(query)
                if cache is not None:
                    cache.add(name)
            except Exception as exc:
                print(f"Failed to execute {name}: {exc}")
                save_failed_query_file(name, failed_file)

75 

76 

def main():  # pragma: no cover
    """CLI entry point: parse arguments, clear stale stop files, upload."""
    parser = argparse.ArgumentParser(
        description="Execute SPARQL update queries on a triple store."
    )
    add = parser.add_argument
    add("endpoint", type=str, help="Endpoint URL of the triple store")
    add(
        "folder",
        type=str,
        help="Path to the folder containing SPARQL update query files",
    )
    add(
        "--failed_file",
        type=str,
        default="failed_queries.txt",
        help="Path to failed queries file",
    )
    add("--stop_file", type=str, default=".stop_upload", help="Path to stop file")
    add("--redis_host", type=str, help="Redis host for caching")
    add("--redis_port", type=int, help="Redis port")
    add("--redis_db", type=int, help="Redis database number")

    args = parser.parse_args()

    # A stop file left over from a previous interruption would abort this
    # run immediately; remove it before starting.
    remove_stop_file(args.stop_file)

    upload_sparql_updates(
        args.endpoint,
        args.folder,
        args.failed_file,
        args.stop_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
    )

113 

114 

# Allow the module to be executed directly as a script.
if __name__ == "__main__":  # pragma: no cover
    main()