Coverage for virtuoso_utilities / bulk_load.py: 87%

89 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 14:45 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4""" 

5Performs sequential bulk loading of RDF N-Quads Gzipped files (`.nq.gz`) 

6into OpenLink Virtuoso using the official `ld_dir`/`ld_dir_all` and 

7`rdf_loader_run` method. 

8 

9Registers files matching *.nq.gz using ld_dir/ld_dir_all into DB.DBA.load_list, 

10then runs a single `rdf_loader_run()` process to load the registered files. 

11 

12IMPORTANT: 

13- Only files with the extension `.nq.gz` will be processed. 

14- The data directory specified ('-d' or '--data-directory') MUST be 

15 accessible by the Virtuoso server process itself. 

16- This directory path MUST be listed in the 'DirsAllowed' parameter 

17 within the Virtuoso INI file (e.g., virtuoso.ini). 

18- When using Docker, data-directory is the path INSIDE the container. 

19 Files will be accessed and loaded from within the container. 

20 

21Reference: 

22- https://vos.openlinksw.com/owiki/wiki/VOS/VirtBulkRDFLoader 

23""" 

24 

25import argparse 

26import glob 

27import logging 

28import os 

29import sys 

30 

31from virtuoso_utilities.isql_helpers import run_isql_command 

32 

# Module-level logger; bulk_load() configures the root logger on every call.
logger = logging.getLogger(__name__)

# Connection defaults offered by the command-line interface in main().
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"

# Default executable names: isql on the host, isql inside the container, docker.
ISQL_PATH_HOST = "isql"
ISQL_PATH_DOCKER = "isql"
DOCKER_PATH = "docker"
# Values handed to checkpoint_interval() and scheduler_interval() in the final
# isql command of bulk_load().
CHECKPOINT_INTERVAL = 60
SCHEDULER_INTERVAL = 10

# Glob pattern used for local file discovery and passed to ld_dir/ld_dir_all
# as the file mask.
NQ_GZ_PATTERN = '*.nq.gz'

46 

47 

def find_nquads_files_local(directory, recursive=False):
    """
    Find all N-Quads Gzipped files (`*.nq.gz`) in a directory on the local filesystem.

    Args:
        directory: Directory to search.
        recursive: When true, every directory visited by `os.walk(directory)`
            is searched as well.

    Returns:
        A list of matching file paths, each joined onto the directory in which
        it was found. The list is empty when nothing matches.
    """
    # Only NQ_GZ_PATTERN is meant to act as a wildcard. The directory part is
    # escaped so that glob metacharacters in a directory name (for example
    # "dump[1]") are matched literally instead of hiding its files.
    if not recursive:
        return glob.glob(os.path.join(glob.escape(directory), NQ_GZ_PATTERN))

    matches = []
    for root, _, _ in os.walk(directory):
        matches.extend(glob.glob(os.path.join(glob.escape(root), NQ_GZ_PATTERN)))
    return matches

63 

64 

def _parse_load_stats(stdout_stats):
    """
    Extract the summary row from the isql output of the load_list stats query.

    Returns:
        A ``(total_files, loaded_files, issues)`` tuple built from the first
        line that looks like a data row, or ``None`` when no such line exists.
    """
    for index, row in enumerate(stdout_stats.strip().splitlines()):
        # The first four output lines are never treated as data, and neither
        # are the column-type line nor the trailing "... Rows." footer.
        if index <= 3 or row.endswith("Rows.") or "INTEGER" in row or "VARCHAR" in row:
            continue
        parts = row.split()
        if len(parts) >= 3 and parts[0].isdigit():
            total_files = int(parts[0])
            # A NULL aggregate is counted as zero.
            loaded_files = int(parts[1]) if parts[1] != "NULL" else 0
            issues = int(parts[2]) if parts[2] != "NULL" else 0
            return total_files, loaded_files, issues
    return None


def _parse_failed_files(stdout_failed):
    """Return the non-empty lines of the failed-files query output that are not header or footer lines."""
    stripped_rows = (row.strip() for row in stdout_failed.strip().splitlines())
    return [
        row
        for row in stripped_rows
        if row and not row.startswith("ll_file") and not row.endswith("Rows.") and row != "VARCHAR"
    ]


def bulk_load(
    data_directory: str,
    password: str,
    host: str = "localhost",
    port: int = 1111,
    user: str = "dba",
    recursive: bool = False,
    docker_container: str = None,
    isql_path: str = ISQL_PATH_HOST,
    docker_isql_path: str = ISQL_PATH_DOCKER,
    docker_path: str = DOCKER_PATH,
    container_data_directory: str = None,
    log_level: str = "ERROR"
) -> None:
    """
    Perform Virtuoso bulk loading of N-Quads files.

    This function can be imported and called programmatically, avoiding subprocess overhead.

    The steps are: look for matching files locally, clear DB.DBA.load_list,
    register the directory with ld_dir/ld_dir_all, run rdf_loader_run(),
    check the per-file states in load_list, delete the loaded entries and run
    a final checkpoint. If no matching file is found locally, a warning is
    logged and nothing is sent to Virtuoso.

    Args:
        data_directory: Path to directory containing .nq.gz files (host path for file operations)
        password: Virtuoso DBA password
        host: Virtuoso server host
        port: Virtuoso server port
        user: Virtuoso username
        recursive: Use ld_dir_all instead of ld_dir for recursive loading
        docker_container: Docker container name (for ISQL commands only)
        isql_path: Path to isql binary (local)
        docker_isql_path: Path to isql binary inside Docker container
        docker_path: Path to docker binary
        container_data_directory: Path INSIDE container where Virtuoso accesses files (if different from data_directory)
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default: ERROR

    Raises:
        RuntimeError: If bulk load fails
    """

    # force=True replaces any handlers already installed on the root logger.
    logging.basicConfig(level=getattr(logging, log_level.upper()), format='%(levelname)s: %(message)s', force=True)

    # run_isql_command() takes its settings from an argparse-style namespace.
    args = argparse.Namespace(
        data_directory=data_directory,
        host=host,
        port=port,
        user=user,
        password=password,
        recursive=recursive,
        docker_container=docker_container,
        isql_path=isql_path,
        docker_isql_path=docker_isql_path,
        docker_path=docker_path,
        container_data_directory=container_data_directory,
    )

    data_dir = args.data_directory
    container_dir = args.container_data_directory if args.container_data_directory else data_dir

    files = find_nquads_files_local(data_dir, args.recursive)

    if not files:
        logger.warning(f"No files matching '{NQ_GZ_PATTERN}' found in '{data_dir}'.")
        return

    # Clean up any leftover entries from previous bulk loads
    cleanup_sql = "DELETE FROM DB.DBA.load_list;"
    success_cleanup, _, stderr_cleanup = run_isql_command(args, sql_command=cleanup_sql)
    if not success_cleanup:
        # Still best-effort, but no longer silent: leftover entries would be
        # picked up by the loader run below.
        logger.warning("Could not clear DB.DBA.load_list before registering files: %s", stderr_cleanup)

    ld_function = "ld_dir_all" if args.recursive else "ld_dir"

    # Double single quotes so the values are safe inside SQL string literals.
    container_dir_sql_escaped = container_dir.replace("'", "''")
    file_pattern_sql_escaped = NQ_GZ_PATTERN.replace("'", "''")

    # Use empty string to let Virtuoso read graph URIs from nquads (4th field)
    register_sql = f"{ld_function}('{container_dir_sql_escaped}', '{file_pattern_sql_escaped}', '');"
    success_reg, _, stderr_reg = run_isql_command(args, sql_command=register_sql)

    if not success_reg or "Unable to list files" in stderr_reg or "FA020" in stderr_reg:
        raise RuntimeError(f"Failed to register files using {ld_function}.\nError: {stderr_reg}")

    loader_sql = "rdf_loader_run();"
    success_load, _, stderr_load = run_isql_command(args, sql_command=loader_sql)

    if not success_load:
        raise RuntimeError(f"rdf_loader_run() failed.\nError: {stderr_load}")

    stats_sql = "SELECT COUNT(*) AS total_files, SUM(CASE WHEN ll_state = 2 THEN 1 ELSE 0 END) AS loaded, SUM(CASE WHEN ll_state <> 2 OR ll_error IS NOT NULL THEN 1 ELSE 0 END) AS issues FROM DB.DBA.load_list;"
    success_stats, stdout_stats, _ = run_isql_command(args, sql_command=stats_sql)

    stats = _parse_load_stats(stdout_stats) if success_stats else None
    if stats is None:
        # Verification stays best-effort (no exception), but the gap is reported.
        logger.warning("Could not read load statistics from DB.DBA.load_list; load result was not verified.")
    else:
        total_files, loaded_files, issues = stats
        if total_files != loaded_files or issues != 0:
            failed_sql = "SELECT ll_file FROM DB.DBA.load_list WHERE ll_state <> 2 OR ll_error IS NOT NULL;"
            success_failed, stdout_failed, _ = run_isql_command(args, sql_command=failed_sql)
            failed_files = _parse_failed_files(stdout_failed) if success_failed else []
            raise RuntimeError(
                f"Bulk load failed: {issues} file(s) had issues.\n"
                f"Failed files:\n" + "\n".join(f" - {f}" for f in failed_files)
            )

    delete_loaded_sql = "DELETE FROM DB.DBA.load_list WHERE ll_state = 2;"
    success_delete, _, stderr_delete = run_isql_command(args, sql_command=delete_loaded_sql)
    if not success_delete:
        raise RuntimeError(f"Failed to clean up load_list table.\nError: {stderr_delete}")

    # Final command: log_enable, a checkpoint, then the checkpoint and
    # scheduler intervals from the module constants.
    cleanup_sql = f"log_enable(3, 1); checkpoint; checkpoint_interval({CHECKPOINT_INTERVAL}); scheduler_interval({SCHEDULER_INTERVAL});"
    success_final, _, stderr_final = run_isql_command(args, sql_command=cleanup_sql)
    if not success_final:
        raise RuntimeError(f"Failed to run final checkpoint.\nError details: {stderr_final}")

185 

186 

def main():  # pragma: no cover
    """
    CLI entry point that parses arguments and calls bulk_load().

    Every bulk_load() parameter is reachable from the command line. The
    Docker and isql path options are optional, and their defaults are the
    same as bulk_load()'s own defaults.

    Exits with status 0 when bulk_load() returns, and with status 1 after
    logging the message when it raises RuntimeError.
    """
    parser = argparse.ArgumentParser(
        description=f"Sequential N-Quads Gzipped (`{NQ_GZ_PATTERN}`) bulk loader for OpenLink Virtuoso using ld_dir/rdf_loader_run.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Example usage:
 # Load all *.nq.gz files from /data/rdf (local mode)
 python bulk_load.py -d /data/rdf -k mypassword

 # Load *.nq.gz recursively using Docker, files at /database/data in container
 python bulk_load.py -d /database/data -k mypassword --recursive \
 --docker-container virtuoso_container

IMPORTANT:
- Only files with the extension `.nq.gz` will be loaded.
- The data directory (-d) must be accessible by the Virtuoso process
 and listed in the 'DirsAllowed' setting in virtuoso.ini.
- When using Docker mode, data-directory is the path INSIDE the container.
 Files are accessed and loaded directly inside the container.
"""
    )

    parser.add_argument("-d", "--data-directory", required=True,
                        help="Path to the N-Quads Gzipped (`.nq.gz`) files. When using Docker, this must be the path INSIDE the container.")
    parser.add_argument("-H", "--host", default=DEFAULT_VIRTUOSO_HOST,
                        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST}).")
    parser.add_argument("-P", "--port", type=int, default=DEFAULT_VIRTUOSO_PORT,
                        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT}).")
    parser.add_argument("-u", "--user", default=DEFAULT_VIRTUOSO_USER,
                        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER}).")
    parser.add_argument("-k", "--password", required=True,
                        help="Virtuoso password.")
    parser.add_argument("--recursive", action='store_true',
                        help="Load files recursively from subdirectories (uses ld_dir_all).")
    parser.add_argument("--log-level", default="ERROR", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                        help="Logging level (Default: ERROR).")
    parser.add_argument("--isql-path", default=ISQL_PATH_HOST,
                        help=f"Path to the isql binary on the host (Default: {ISQL_PATH_HOST}).")

    docker_group = parser.add_argument_group('Docker Options')
    docker_group.add_argument("--docker-container",
                              help="Name or ID of the running Virtuoso Docker container. If provided, 'isql' will be run via 'docker exec'.")
    # bulk_load() looks for files under --data-directory on the local
    # filesystem. This option lets the path Virtuoso uses inside the
    # container differ from it.
    docker_group.add_argument("--container-data-directory", default=None,
                              help="Path INSIDE the container where Virtuoso accesses the files, if different from --data-directory.")
    docker_group.add_argument("--docker-isql-path", default=ISQL_PATH_DOCKER,
                              help=f"Path to the isql binary inside the Docker container (Default: {ISQL_PATH_DOCKER}).")
    docker_group.add_argument("--docker-path", default=DOCKER_PATH,
                              help=f"Path to the docker binary (Default: {DOCKER_PATH}).")

    args = parser.parse_args()

    try:
        bulk_load(
            data_directory=args.data_directory,
            password=args.password,
            host=args.host,
            port=args.port,
            user=args.user,
            recursive=args.recursive,
            docker_container=args.docker_container,
            isql_path=args.isql_path,
            docker_isql_path=args.docker_isql_path,
            docker_path=args.docker_path,
            container_data_directory=args.container_data_directory,
            log_level=args.log_level
        )
        sys.exit(0)
    except RuntimeError as e:
        logger.error(str(e))
        sys.exit(1)

248 

249 

# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    main()