Coverage for virtuoso_utilities / dump_quadstore.py: 42%

170 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-04-14 09:16 +0000

1#!/usr/bin/env python3 

2 

3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2025 krzywonos <98768745+krzywonos@users.noreply.github.com> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9 

10""" 

11Dumps the entire content of an OpenLink Virtuoso quadstore using the official 

12dump_nquads stored procedure. 

13 

14This script utilizes Virtuoso's optimized dump_nquads procedure for dumping 

15RDF data in N-Quads format, preserving Named Graph information. 

16The procedure is based on the official OpenLink Virtuoso documentation: 

17https://vos.openlinksw.com/owiki/wiki/VOS/VirtRDFDumpNQuad 

18 

19Features: 

20- Uses official Virtuoso dump_nquads stored procedure 

21- Outputs in N-Quads format preserving Named Graph IRI information 

22- Supports both local Virtuoso and Docker-based execution 

23- Automatic compression (.nq.gz files) 

24- Configurable file size limit (output file numbering always starts at 1) 

25- Progress monitoring during export 

26- Excludes internal virtrdf: graphs automatically 

27 

28The script first installs the necessary stored procedure, then calls it 

29to perform the actual dump operation producing compressed N-Quads files. 

30""" 

31 

32import argparse 

33import os 

34import subprocess 

35import sys 

36import tempfile 

37import time 

38from typing import List 

39 

40from virtuoso_utilities.isql_helpers import run_isql_command 

41 

# Default connection parameters, used as argparse defaults in main().
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"

# Default executable names; main() copies these onto the parsed arguments.
DEFAULT_ISQL_PATH_HOST = "isql"
DEFAULT_ISQL_PATH_DOCKER = "isql"
DEFAULT_DOCKER_PATH = "docker"

# Default dump parameters.
DEFAULT_OUTPUT_DIR = "./virtuoso_dump"
DEFAULT_FILE_LENGTH_LIMIT = 100_000_000  # 100MB per file
DEFAULT_START_FROM = 1
DEFAULT_COMPRESSION = 1  # Enable compression by default

52 

# Virtuoso/PL source of the dump_nquads stored procedure. It is written to a
# temporary .sql file and installed with LOAD by install_dump_procedure().
#
# What the procedure text does:
#   * iterates over every quad returned by the SPARQL sub-select, which
#     filters out the virtrdf: graph;
#   * serialises each quad with http_nquad into the string session `ses`;
#   * once `ses` reaches file_length_limit, writes it to
#     <dir>/outputNNNNNN.nq, optionally gzips it and deletes the plain file,
#     then starts a fresh session with the next file number;
#   * after the loop, writes whatever is left in `ses` the same way;
#   * on SQLSTATE 22023 inside the loop, jumps to `next` (skips that row).
DUMP_NQUADS_PROCEDURE = """
CREATE PROCEDURE dump_nquads 
 ( IN dir VARCHAR := 'dumps'
 , IN start_from INT := 1
 , IN file_length_limit INTEGER := 100000000
 , IN comp INT := 1
 )
 {
 DECLARE inx, ses_len INT
 ; DECLARE file_name VARCHAR
 ; DECLARE env, ses ANY
 ;

 inx := start_from;
 SET isolation = 'uncommitted';
 env := vector (0,0,0);
 ses := string_output (10000000);
 FOR (SELECT * FROM (sparql define input:storage "" SELECT ?s ?p ?o ?g { GRAPH ?g { ?s ?p ?o } . FILTER ( ?g != virtrdf: ) } ) AS sub OPTION (loop)) DO
 {
 DECLARE EXIT HANDLER FOR SQLSTATE '22023' 
 {
 GOTO next;
 };
 http_nquad (env, "s", "p", "o", "g", ses);
 ses_len := LENGTH (ses);
 IF (ses_len >= file_length_limit)
 {
 file_name := sprintf ('%s/output%06d.nq', dir, inx);
 string_to_file (file_name, ses, -2);
 IF (comp)
 {
 gz_compress_file (file_name, file_name||'.gz');
 file_delete (file_name);
 }
 inx := inx + 1;
 env := vector (0,0,0);
 ses := string_output (10000000);
 }
 next:;
 }
 IF (length (ses))
 {
 file_name := sprintf ('%s/output%06d.nq', dir, inx);
 string_to_file (file_name, ses, -2);
 IF (comp)
 {
 gz_compress_file (file_name, file_name||'.gz');
 file_delete (file_name);
 }
 inx := inx + 1;
 env := vector (0,0,0);
 }
}
;
"""

108 

109 

def create_output_directory(output_dir: str, use_docker: bool = False) -> bool:
    """
    Ensure the output directory exists on the host.

    In Docker mode nothing is created on the host and the call succeeds
    immediately.

    Args:
        output_dir: Path to the output directory
        use_docker: Truthy if running with --docker-container

    Returns:
        True if the directory exists or was created (or creation was skipped
        in Docker mode), False if it could not be created, including when
        the path exists but is not a directory
    """
    if use_docker:
        return True
    try:
        already_present = os.path.isdir(output_dir)
        # makedirs(exist_ok=True) is a no-op for an existing directory but
        # raises FileExistsError when the path is a regular file, which a
        # plain os.path.exists() check would wrongly accept.
        os.makedirs(output_dir, exist_ok=True)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error creating output directory '{output_dir}': {e}", file=sys.stderr)
        return False
    if not already_present:
        print(f"Created output directory: {output_dir}")
    return True

131 

132 

def install_dump_procedure(args: argparse.Namespace) -> bool:
    """
    Install the dump_nquads stored procedure in Virtuoso.

    The procedure source is written to a temporary file on the host and
    loaded with the isql LOAD command. In Docker mode the file is first
    copied into the container with `docker cp` and loaded from there.
    Temporary files (host and container) are removed even when a step
    raises.

    Args:
        args: Parsed command-line arguments. Reads docker_container and,
            in Docker mode, docker_path; the whole namespace is passed on
            to run_isql_command.

    Returns:
        True if successful, False otherwise (any exception is reported on
        stderr and turned into False)
    """
    print("Installing Virtuoso dump_nquads procedure via LOAD ...")
    host_tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", delete=False, suffix="_dump_nquads_procedure.sql", encoding="utf-8"
        ) as f:
            # Record the name before writing so a failed write is still cleaned up.
            host_tmp_path = f.name
            f.write(DUMP_NQUADS_PROCEDURE)

        if args.docker_container:
            container_tmp_path = "/tmp/dump_nquads_procedure.sql"
            cp_cmd = [
                args.docker_path,
                "cp",
                host_tmp_path,
                f"{args.docker_container}:{container_tmp_path}",
            ]
            cp_result = subprocess.run(cp_cmd, capture_output=True, text=True)
            if cp_result.returncode != 0:
                print(f"Error copying procedure file into container: {cp_result.stderr}", file=sys.stderr)
                return False
            try:
                success, _stdout, stderr = run_isql_command(
                    args, sql_command=f"LOAD '{container_tmp_path}';"
                )
            finally:
                # Best-effort removal of the copy inside the container.
                rm_cmd = [args.docker_path, "exec", args.docker_container, "rm", "-f", container_tmp_path]
                subprocess.run(rm_cmd, capture_output=True)
        else:
            success, _stdout, stderr = run_isql_command(
                args, sql_command=f"LOAD '{host_tmp_path}';"
            )

        if not success:
            print(f"Error installing dump_nquads procedure: {stderr}", file=sys.stderr)
            return False
        print("dump_nquads procedure installed successfully!")
        return True
    except Exception as e:
        # Boundary handler: callers only look at the boolean result.
        print(f"Error writing or loading dump_nquads procedure: {e}", file=sys.stderr)
        return False
    finally:
        if host_tmp_path is not None:
            try:
                os.unlink(host_tmp_path)
            except OSError as e:
                # A leftover temp file must not change the install outcome.
                print(f"Warning: could not remove temporary file '{host_tmp_path}': {e}", file=sys.stderr)

177 

178 

179 

def dump_nquads(args: argparse.Namespace) -> bool:
    """
    Execute the dump_nquads procedure to dump all graphs.

    Builds the `dump_nquads(dir, start_from, file_length_limit, comp)` call
    and runs it through run_isql_command. File numbering always starts at
    DEFAULT_START_FROM.

    Args:
        args: Parsed command-line arguments. Reads output_dir,
            file_length_limit and compression; the whole namespace is
            passed on to run_isql_command.

    Returns:
        True if successful, False otherwise
    """
    print("Starting N-Quads dump using dump_nquads procedure...")

    compression_flag = 1 if args.compression else 0
    # The directory ends up inside a single-quoted SQL string literal, so
    # embedded single quotes are doubled to keep the statement well-formed.
    sql_output_dir = str(args.output_dir).replace("'", "''")
    dump_command = (
        f"dump_nquads('{sql_output_dir}', {DEFAULT_START_FROM}, "
        f"{args.file_length_limit}, {compression_flag});"
    )

    print(f"Executing: {dump_command}")

    success, _stdout, stderr = run_isql_command(args, sql_command=dump_command)

    if not success:
        print(f"Error executing dump_nquads: {stderr}", file=sys.stderr)
        return False

    print("dump_nquads procedure completed successfully!")
    return True

205 

206 

def list_output_files(output_dir: str, compressed: bool = True) -> List[str]:
    """
    Collect the dump files present in the output directory.

    Args:
        output_dir: Output directory path
        compressed: If truthy, match names ending in '.nq.gz'; otherwise
            match names ending in '.nq'

    Returns:
        Sorted list of matching paths (output_dir joined with each name).
        Empty if the directory does not exist or listing it fails; a
        failure is reported on stderr.
    """
    wanted_suffix = '.nq.gz' if compressed else '.nq'
    try:
        if not os.path.exists(output_dir):
            return []
        matches = [
            os.path.join(output_dir, entry)
            for entry in os.listdir(output_dir)
            if entry.endswith(wanted_suffix)
        ]
        matches.sort()
        return matches
    except Exception as e:
        print(f"Error listing output files: {e}", file=sys.stderr)
        return []

230 

231 

def dump_quadstore(args: argparse.Namespace) -> bool:
    """
    Run the full dump: prepare the directory, install the procedure, dump, report.

    After a successful dump, the files found under args.output_dir (as seen
    by this process) are listed with their sizes.

    Args:
        args: Parsed command-line arguments
    Returns:
        True if successful, False otherwise
    """
    # Both preconditions must hold; the second is not attempted if the first fails.
    prepared = create_output_directory(args.output_dir, args.docker_container) and install_dump_procedure(args)
    if not prepared:
        return False

    print("\nStep 1: Executing dump_nquads procedure...")
    outcome = dump_nquads(args)
    if not outcome:
        return outcome

    produced = list_output_files(args.output_dir, args.compression)
    format_label = 'compressed' if args.compression else 'uncompressed'
    print("\nDump completed successfully!")
    print(f"Total files created: {len(produced)}")
    print(f"Output directory: {args.output_dir}")
    print(f"Output format: N-Quads ({format_label})")

    if produced:
        print("\nCreated files:")
        byte_total = 0
        for path in produced:
            label = os.path.basename(path)
            try:
                n_bytes = os.path.getsize(path)
            except OSError:
                print(f" {label} (size unknown)")
                continue
            byte_total += n_bytes
            print(f" {label} ({n_bytes:,} bytes)")
        if byte_total > 0:
            print(f"\nTotal size: {byte_total:,} bytes ({byte_total / (1024*1024):.2f} MB)")
    return outcome

265 

266 

def main():
    """
    Command-line entry point: parse arguments, check the connection, run the dump.

    Exits with status 0 when the dump succeeds and 1 on an invalid
    --file-length-limit, a failed connection test, or a failed dump.
    """
    cli = argparse.ArgumentParser(
        description="Dump the entire content of an OpenLink Virtuoso quadstore using the official dump_nquads procedure.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
This script uses Virtuoso's official dump_nquads stored procedure for optimal
performance and N-Quads output format. The procedure is documented at:
https://vos.openlinksw.com/owiki/wiki/VOS/VirtRDFDumpNQuad

The dump_nquads procedure automatically:
- Excludes internal virtrdf: graphs
- Preserves Named Graph information in N-Quads format
- Handles compression and file splitting
- Optimizes memory usage during export

Example usage:
 # Dump entire quadstore to compressed N-Quads files
 python dump_quadstore.py --password mypassword --output-dir ./dump

 # Dump with custom file size limit (50MB per file)
 python dump_quadstore.py --password mypassword --file-length-limit 50000000

 # Dump uncompressed files
 python dump_quadstore.py --password mypassword --no-compression

 # Dump using Docker
 python dump_quadstore.py --password mypassword --docker-container virtuoso \\
 --output-dir /dumps

Important Notes:
- Output files are in N-Quads format: output000001.nq.gz, output000002.nq.gz, etc.
- The output directory must be accessible by Virtuoso and listed in DirsAllowed
- When using Docker, ensure the output directory is mounted and accessible inside the container
- The script automatically installs the required dump_nquads stored procedure
""",
    )

    # Connection parameters
    cli.add_argument(
        "-H", "--host",
        default=DEFAULT_VIRTUOSO_HOST,
        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST})",
    )
    cli.add_argument(
        "-P", "--port",
        type=int,
        default=DEFAULT_VIRTUOSO_PORT,
        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT})",
    )
    cli.add_argument(
        "-u", "--user",
        default=DEFAULT_VIRTUOSO_USER,
        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER})",
    )
    cli.add_argument(
        "-k", "--password",
        default="dba",
        help="Virtuoso password (Default: dba)",
    )

    # Output parameters
    cli.add_argument(
        "-o", "--output-dir",
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for N-Quads files (Default: {DEFAULT_OUTPUT_DIR}). Must be accessible by Virtuoso and listed in DirsAllowed.",
    )
    cli.add_argument(
        "--file-length-limit",
        type=int,
        default=DEFAULT_FILE_LENGTH_LIMIT,
        help=f"Maximum length of dump files in bytes (Default: {DEFAULT_FILE_LENGTH_LIMIT:,})",
    )
    cli.add_argument(
        "--no-compression",
        action="store_true",
        help="Disable gzip compression (files will be .nq instead of .nq.gz)",
    )

    docker_options = cli.add_argument_group('Docker Options')
    docker_options.add_argument(
        "--docker-container",
        help="Name or ID of the running Virtuoso Docker container",
    )

    args = cli.parse_args()

    if args.file_length_limit <= 0:
        print("Error: --file-length-limit must be greater than 0", file=sys.stderr)
        sys.exit(1)

    # Derived settings that the helper functions read from the namespace.
    args.compression = not args.no_compression
    args.isql_path = DEFAULT_ISQL_PATH_HOST
    args.docker_isql_path = DEFAULT_ISQL_PATH_DOCKER
    args.docker_path = DEFAULT_DOCKER_PATH

    in_docker = bool(args.docker_container)
    if in_docker and os.name != "nt":
        args.output_dir = os.path.abspath(args.output_dir)

    rule = "-" * 70
    summary = [
        rule,
        "Virtuoso N-Quads Dump Configuration:",
        f" Host: {args.host}:{args.port}",
        f" User: {args.user}",
        f" Mode: {'Docker' if in_docker else 'Local'}",
    ]
    if in_docker:
        summary.append(f" Container: {args.docker_container}")
    summary.extend([
        f" Output Dir: {args.output_dir}",
        f" File Size Limit: {args.file_length_limit:,} bytes",
        f" Compression: {'Enabled (.nq.gz)' if args.compression else 'Disabled (.nq)'}",
        " Method: Official dump_nquads stored procedure",
        " Output Format: N-Quads (preserves Named Graph information)",
        rule,
    ])
    for line in summary:
        print(line)

    reminders = [
        "\nIMPORTANT: Ensure the output directory is:",
        " 1. Accessible by the Virtuoso server process",
        " 2. Listed in the 'DirsAllowed' parameter in virtuoso.ini",
    ]
    if in_docker:
        reminders.append(" 3. Properly mounted and accessible inside the Docker container")
    for line in reminders:
        print(line)
    print()

    print("Testing Virtuoso connection...")
    connected, _out, err = run_isql_command(args, sql_command="SELECT 'Connection test' as test;")
    if not connected:
        print(f"Error: Could not connect to Virtuoso: {err}", file=sys.stderr)
        sys.exit(1)
    print("Connection successful!")

    started = time.time()
    ok = dump_quadstore(args)
    elapsed = time.time() - started

    if not ok:
        print("Dump failed", file=sys.stderr)
        sys.exit(1)

    print(f"\nDump completed in {elapsed:.2f} seconds")
    sys.exit(0)

382 

383 

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()