Coverage for src/piccione/upload/on_triplestore.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-27 20:21 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6from pathlib import Path 

7 

8from rich.console import Console 

9from sparqlite import SPARQLClient 

10from tqdm import tqdm 

11 

12from piccione.upload.cache_manager import CacheManager 

13 

# Module-wide rich console used for every status and error message printed below.
console = Console()

15 

16 

17def save_failed_query_file(filename: str, failed_file: str | Path) -> None: 

18 with Path(failed_file).open("a", encoding="utf8") as f: 

19 f.write(f"{filename}\n") 

20 

21 

22def remove_stop_file(stop_file: str | Path) -> None: 

23 if Path(stop_file).exists(): 

24 Path(stop_file).unlink() 

25 console.print(f"Existing stop file {stop_file} has been removed.") 

26 

27 

28def upload_sparql_updates( # noqa: PLR0913 

29 endpoint: str, 

30 folder: str | Path, 

31 *, 

32 failed_file: str | Path = "failed_queries.txt", 

33 stop_file: str | Path = ".stop_upload", 

34 redis_host: str | None = None, 

35 redis_port: int = 6379, 

36 redis_db: int = 4, 

37 description: str = "Processing files", 

38 show_progress: bool = True, 

39) -> None: 

40 if not Path(folder).exists(): 

41 return 

42 

43 cache_manager = None 

44 if redis_host is not None: 

45 cache_manager = CacheManager( 

46 redis_host=redis_host, 

47 redis_port=redis_port, 

48 redis_db=redis_db, 

49 ) 

50 

51 all_files = [f.name for f in Path(folder).iterdir() if f.name.endswith(".sparql")] 

52 files_to_process = [f for f in all_files if f not in cache_manager] if cache_manager is not None else all_files 

53 

54 if not files_to_process: 

55 return 

56 

57 iterator = tqdm(files_to_process, desc=description) if show_progress else files_to_process 

58 with SPARQLClient(endpoint, max_retries=3, backoff_factor=5) as client: 

59 for file in iterator: 

60 if Path(stop_file).exists(): 

61 console.print(f"\nStop file {stop_file} detected. Interrupting the process...") 

62 break 

63 

64 file_path = Path(folder) / file 

65 

66 with file_path.open(encoding="utf-8") as f: 

67 query = f.read().strip() 

68 

69 if not query: 

70 if cache_manager is not None: 

71 cache_manager.add(file) 

72 continue 

73 

74 try: 

75 client.update(query) 

76 if cache_manager is not None: 

77 cache_manager.add(file) 

78 except Exception as e: # noqa: BLE001 

79 console.print(f"Failed to execute {file}: {e}") 

80 save_failed_query_file(file, failed_file) 

81 

82 

def main(argv: list[str] | None = None) -> None:  # pragma: no cover
    """Command-line entry point: upload a folder of SPARQL update files.

    Any existing stop file is removed first, so that a previous interruption
    does not stop this run immediately.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]``. Defaults to
            ``None``, meaning the real command line, which keeps the previous
            behaviour.
    """
    parser = argparse.ArgumentParser(description="Execute SPARQL update queries on a triple store.")
    parser.add_argument("endpoint", type=str, help="Endpoint URL of the triple store")
    parser.add_argument(
        "folder",
        type=str,
        help="Path to the folder containing SPARQL update query files",
    )
    parser.add_argument(
        "--failed_file",
        type=str,
        default="failed_queries.txt",
        help="Path to failed queries file",
    )
    parser.add_argument("--stop_file", type=str, default=".stop_upload", help="Path to stop file")
    parser.add_argument("--redis_host", type=str, help="Redis host for caching")
    # Real argparse defaults instead of `args.x or default`: `or` would turn an
    # explicit 0 (e.g. `--redis_db 0`, Redis's default DB) into the fallback.
    parser.add_argument("--redis_port", type=int, default=6379, help="Redis port")
    parser.add_argument("--redis_db", type=int, default=4, help="Redis database number")

    args = parser.parse_args(argv)

    remove_stop_file(args.stop_file)

    upload_sparql_updates(
        args.endpoint,
        args.folder,
        failed_file=args.failed_file,
        stop_file=args.stop_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
    )

115 

116 

# Script entry point: run the CLI when this module is executed directly.
if __name__ == "__main__":  # pragma: no cover
    main()