Coverage for virtuoso_utilities / bulk_load.py: 87%

89 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-04-14 09:16 +0000

1#!/usr/bin/env python3 

2 

3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2025 eliarizzetto <elia.rizzetto@gmail.com> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9 

10""" 

11Performs sequential bulk loading of RDF N-Quads Gzipped files (`.nq.gz`) 

12into OpenLink Virtuoso using the official `ld_dir`/`ld_dir_all` and 

13`rdf_loader_run` method. 

14 

15Registers files matching *.nq.gz using ld_dir/ld_dir_all into DB.DBA.load_list, 

16then runs a single `rdf_loader_run()` process to load the registered files. 

17 

18IMPORTANT: 

19- Only files with the extension `.nq.gz` will be processed. 

20- The data directory specified ('-d' or '--data-directory') MUST be 

21 accessible by the Virtuoso server process itself. 

22- This directory path MUST be listed in the 'DirsAllowed' parameter 

23 within the Virtuoso INI file (e.g., virtuoso.ini). 

24- When using Docker, data-directory is the path INSIDE the container. 

25 Files will be accessed and loaded from within the container. 

26 

27Reference: 

28- https://vos.openlinksw.com/owiki/wiki/VOS/VirtBulkRDFLoader 

29""" 

30 

31import argparse 

32import glob 

33import logging 

34import os 

35import sys 

36 

37from virtuoso_utilities.isql_helpers import run_isql_command 

38 

# Module-level logger; its output format and level are set by the
# logging.basicConfig() call inside bulk_load().
logger = logging.getLogger(__name__)

# Connection defaults used for the CLI options declared in main().
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"

# Default executable names used as bulk_load() parameter defaults.
ISQL_PATH_HOST = "isql"
ISQL_PATH_DOCKER = "isql"
DOCKER_PATH = "docker"
# Values handed to Virtuoso's checkpoint_interval() / scheduler_interval()
# at the end of bulk_load().
# NOTE(review): units are presumably minutes, per the Virtuoso docs - confirm.
CHECKPOINT_INTERVAL = 60
SCHEDULER_INTERVAL = 10

# File pattern used both for the local file search and for ld_dir/ld_dir_all.
NQ_GZ_PATTERN = '*.nq.gz'

52 

53 

def find_nquads_files_local(directory, recursive=False):
    """
    Find all N-Quads Gzipped files (`*.nq.gz`) in a directory on local filesystem.

    Args:
        directory: Directory to search. It is treated as a literal path, so
            glob metacharacters in its name (``[``, ``]``, ``*``, ``?``) are
            matched verbatim rather than interpreted as a pattern.
        recursive: When True, every subdirectory reached by ``os.walk`` is
            searched as well.

    Returns:
        A list of matching file paths (empty when nothing matches or the
        directory does not exist).
    """
    if recursive:
        search_roots = (root for root, _, _ in os.walk(directory))
    else:
        search_roots = (directory,)

    matches = []
    for root in search_roots:
        # Only NQ_GZ_PATTERN is a pattern; escape the directory part so names
        # such as "data[1]" are not misread as character classes.
        path_pattern = os.path.join(glob.escape(root), NQ_GZ_PATTERN)
        matches.extend(glob.glob(path_pattern))
    return matches

69 

70 

def _parse_load_stats(stdout_stats):
    """
    Extract ``(total_files, loaded_files, issues)`` from the isql output of the
    load_list statistics query.

    Returns the counts of the first data row found, or None when the output
    contains no such row.
    """
    for index, row in enumerate(stdout_stats.strip().splitlines()):
        # The first four output lines, the "... Rows." footer and the column
        # type lines are never treated as data rows.
        if index <= 3 or row.endswith("Rows.") or "INTEGER" in row or "VARCHAR" in row:
            continue
        fields = row.split()
        if len(fields) >= 3 and fields[0].isdigit():
            total_files = int(fields[0])
            # A literal NULL in an aggregate column is counted as zero.
            loaded_files = int(fields[1]) if fields[1] != "NULL" else 0
            issues = int(fields[2]) if fields[2] != "NULL" else 0
            return total_files, loaded_files, issues
    return None


def _parse_failed_files(stdout_failed):
    """
    Extract the file names from the isql output of the failed-files query,
    dropping blank lines, the ``ll_file`` header, the type line and the footer.
    """
    stripped_rows = (raw.strip() for raw in stdout_failed.strip().splitlines())
    return [
        entry
        for entry in stripped_rows
        if entry
        and not entry.startswith("ll_file")
        and not entry.endswith("Rows.")
        and entry != "VARCHAR"
    ]


def bulk_load(
    data_directory: str,
    password: str,
    host: str = "localhost",
    port: int = 1111,
    user: str = "dba",
    recursive: bool = False,
    docker_container: str = None,
    isql_path: str = ISQL_PATH_HOST,
    docker_isql_path: str = ISQL_PATH_DOCKER,
    docker_path: str = DOCKER_PATH,
    container_data_directory: str = None,
    log_level: str = "ERROR"
) -> None:
    """
    Perform Virtuoso bulk loading of N-Quads files.

    This function can be imported and called programmatically, avoiding subprocess overhead.

    Steps, in order: check locally that matching files exist, empty
    DB.DBA.load_list, register the files with ld_dir/ld_dir_all, run
    rdf_loader_run(), verify the per-file load state, remove the loaded
    entries from load_list, and finally run a checkpoint and set the
    checkpoint/scheduler intervals.

    Note that calling this function reconfigures the root logger
    (``logging.basicConfig(..., force=True)``).

    Args:
        data_directory: Path to directory containing .nq.gz files (host path for file operations)
        password: Virtuoso DBA password
        host: Virtuoso server host
        port: Virtuoso server port
        user: Virtuoso username
        recursive: Use ld_dir_all instead of ld_dir for recursive loading
        docker_container: Docker container name (for ISQL commands only)
        isql_path: Path to isql binary (local)
        docker_isql_path: Path to isql binary inside Docker container
        docker_path: Path to docker binary
        container_data_directory: Path INSIDE container where Virtuoso accesses files (if different from data_directory)
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default: ERROR

    Returns:
        None. If no matching file is found locally, a warning is logged and
        the function returns without contacting Virtuoso.

    Raises:
        RuntimeError: If registration, loading, verification, the load_list
            cleanup or the final checkpoint fails.
    """
    logging.basicConfig(level=getattr(logging, log_level.upper()), format='%(levelname)s: %(message)s', force=True)

    # Bundle the settings into a namespace, which is what run_isql_command() is given.
    args = argparse.Namespace(
        data_directory=data_directory,
        host=host,
        port=port,
        user=user,
        password=password,
        recursive=recursive,
        docker_container=docker_container,
        isql_path=isql_path,
        docker_isql_path=docker_isql_path,
        docker_path=docker_path,
        container_data_directory=container_data_directory,
    )

    # The directory registered with Virtuoso is the container path when one is given.
    container_dir = container_data_directory if container_data_directory else data_directory

    if not find_nquads_files_local(data_directory, recursive):
        logger.warning(f"No files matching '{NQ_GZ_PATTERN}' found in '{data_directory}'.")
        return

    # Clean up any leftover entries from previous bulk loads.
    # The result of this call is not inspected.
    run_isql_command(args, sql_command="DELETE FROM DB.DBA.load_list;")

    ld_function = "ld_dir_all" if recursive else "ld_dir"

    # Double single quotes so the values are safe inside SQL string literals.
    container_dir_sql_escaped = container_dir.replace("'", "''")
    file_pattern_sql_escaped = NQ_GZ_PATTERN.replace("'", "''")

    # Use empty string to let Virtuoso read graph URIs from nquads (4th field)
    register_sql = f"{ld_function}('{container_dir_sql_escaped}', '{file_pattern_sql_escaped}', '');"
    success_reg, _, stderr_reg = run_isql_command(args, sql_command=register_sql)

    if not success_reg or "Unable to list files" in stderr_reg or "FA020" in stderr_reg:
        raise RuntimeError(f"Failed to register files using {ld_function}.\nError: {stderr_reg}")

    success_load, _, stderr_load = run_isql_command(args, sql_command="rdf_loader_run();")

    if not success_load:
        raise RuntimeError(f"rdf_loader_run() failed.\nError: {stderr_load}")

    # ll_state = 2 is counted as "loaded"; anything else, or a non-NULL ll_error, is an issue.
    stats_sql = "SELECT COUNT(*) AS total_files, SUM(CASE WHEN ll_state = 2 THEN 1 ELSE 0 END) AS loaded, SUM(CASE WHEN ll_state <> 2 OR ll_error IS NOT NULL THEN 1 ELSE 0 END) AS issues FROM DB.DBA.load_list;"
    success_stats, stdout_stats, stderr_stats = run_isql_command(args, sql_command=stats_sql)

    if not success_stats:
        # Keep going as before, but make the skipped verification visible.
        logger.warning(f"Could not query load statistics; load result was not verified.\nError: {stderr_stats}")
    else:
        stats = _parse_load_stats(stdout_stats)
        if stats is None:
            logger.warning("Could not parse load statistics from isql output; load result was not verified.")
        else:
            total_files, loaded_files, issues = stats
            if total_files != loaded_files or issues != 0:
                failed_sql = "SELECT ll_file FROM DB.DBA.load_list WHERE ll_state <> 2 OR ll_error IS NOT NULL;"
                success_failed, stdout_failed, _ = run_isql_command(args, sql_command=failed_sql)
                failed_files = _parse_failed_files(stdout_failed) if success_failed else []
                raise RuntimeError(
                    f"Bulk load failed: {issues} file(s) had issues.\n"
                    f"Failed files:\n" + "\n".join(f" - {f}" for f in failed_files)
                )

    delete_loaded_sql = "DELETE FROM DB.DBA.load_list WHERE ll_state = 2;"
    success_delete, _, stderr_delete = run_isql_command(args, sql_command=delete_loaded_sql)
    if not success_delete:
        raise RuntimeError(f"Failed to clean up load_list table.\nError: {stderr_delete}")

    # Final housekeeping: log_enable, an explicit checkpoint, then the
    # checkpoint and scheduler intervals.
    # NOTE(review): log_enable(3, 1) presumably restores normal transaction
    # logging after the load - confirm against the Virtuoso docs.
    finalize_sql = f"log_enable(3, 1); checkpoint; checkpoint_interval({CHECKPOINT_INTERVAL}); scheduler_interval({SCHEDULER_INTERVAL});"
    success_final, _, stderr_final = run_isql_command(args, sql_command=finalize_sql)
    if not success_final:
        raise RuntimeError(f"Failed to run final checkpoint.\nError details: {stderr_final}")

191 

192 

def main():  # pragma: no cover
    """
    CLI entry point that parses arguments and calls bulk_load().

    Exits with status 0 when bulk_load() returns and with status 1 (after
    logging the error) when it raises RuntimeError.
    """
    parser = argparse.ArgumentParser(
        description=f"Sequential N-Quads Gzipped (`{NQ_GZ_PATTERN}`) bulk loader for OpenLink Virtuoso using ld_dir/rdf_loader_run.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The trailing backslash of the second example is written as "\\" so
        # the help text shows a shell line continuation; a single backslash
        # before the newline would be swallowed by the string literal.
        epilog="""\
Example usage:
  # Load all *.nq.gz files from /data/rdf (local mode)
  python bulk_load.py -d /data/rdf -k mypassword

  # Load *.nq.gz recursively using Docker, files at /database/data in container
  python bulk_load.py -d /database/data -k mypassword --recursive \\
    --docker-container virtuoso_container

IMPORTANT:
- Only files with the extension `.nq.gz` will be loaded.
- The data directory (-d) must be accessible by the Virtuoso process
  and listed in the 'DirsAllowed' setting in virtuoso.ini.
- When using Docker mode, data-directory is the path INSIDE the container.
  Files are accessed and loaded directly inside the container.
"""
    )

    parser.add_argument("-d", "--data-directory", required=True,
                        help="Path to the N-Quads Gzipped (`.nq.gz`) files. When using Docker, this must be the path INSIDE the container.")
    parser.add_argument("-H", "--host", default=DEFAULT_VIRTUOSO_HOST,
                        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST}).")
    parser.add_argument("-P", "--port", type=int, default=DEFAULT_VIRTUOSO_PORT,
                        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT}).")
    parser.add_argument("-u", "--user", default=DEFAULT_VIRTUOSO_USER,
                        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER}).")
    parser.add_argument("-k", "--password", required=True,
                        help="Virtuoso password.")
    parser.add_argument("--recursive", action='store_true',
                        help="Load files recursively from subdirectories (uses ld_dir_all).")
    parser.add_argument("--log-level", default="ERROR", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                        help="Logging level (Default: ERROR).")

    docker_group = parser.add_argument_group('Docker Options')
    docker_group.add_argument("--docker-container",
                              help="Name or ID of the running Virtuoso Docker container. If provided, 'isql' will be run via 'docker exec'.")

    args = parser.parse_args()

    try:
        bulk_load(
            data_directory=args.data_directory,
            password=args.password,
            host=args.host,
            port=args.port,
            user=args.user,
            recursive=args.recursive,
            docker_container=args.docker_container,
            log_level=args.log_level
        )
        sys.exit(0)
    except RuntimeError as e:
        logger.error(str(e))
        sys.exit(1)

254 

255 

# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":  # pragma: no cover
    main()