Coverage for virtuoso_utilities / dump_quadstore.py: 42%

170 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 14:45 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3 

4""" 

5Dumps the entire content of an OpenLink Virtuoso quadstore using the official 

6dump_nquads stored procedure. 

7 

8This script utilizes Virtuoso's optimized dump_nquads procedure for dumping 

9RDF data in N-Quads format, preserving Named Graph information. 

10The procedure is based on the official OpenLink Virtuoso documentation: 

11https://vos.openlinksw.com/owiki/wiki/VOS/VirtRDFDumpNQuad 

12 

13Features: 

14- Uses official Virtuoso dump_nquads stored procedure 

15- Outputs in N-Quads format preserving Named Graph IRI information 

16- Supports both local Virtuoso and Docker-based execution 

17- Automatic compression (.nq.gz files) 

18- Configurable file size limit; output file numbering starts at 1

19- Post-dump summary of created files, their sizes, and elapsed time

20- Excludes internal virtrdf: graphs automatically 

21 

22The script first installs the necessary stored procedure, then calls it 

23to perform the actual dump operation producing compressed N-Quads files. 

24""" 

25 

26import argparse 

27import os 

28import subprocess 

29import sys 

30import tempfile 

31import time 

32from typing import List 

33 

34from virtuoso_utilities.isql_helpers import run_isql_command 

35 

# Connection defaults used by main() when the matching CLI option is omitted.
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"
# Executable names; main() copies these onto the parsed arguments as
# isql_path, docker_isql_path and docker_path respectively.
DEFAULT_ISQL_PATH_HOST = "isql"
DEFAULT_ISQL_PATH_DOCKER = "isql"
DEFAULT_DOCKER_PATH = "docker"
# Dump defaults.
DEFAULT_OUTPUT_DIR = "./virtuoso_dump"
DEFAULT_FILE_LENGTH_LIMIT = 100000000 # 100MB per file
# Passed as the start_from argument of the dump_nquads stored procedure,
# i.e. the index of the first output%06d file.
DEFAULT_START_FROM = 1
# NOTE(review): not referenced elsewhere in this module; main() derives the
# compression setting from --no-compression instead.
DEFAULT_COMPRESSION = 1 # Enable compression by default

46 

# Virtuoso/PL source of the dump_nquads stored procedure, installed by
# install_dump_procedure() through an isql LOAD of a temporary file.
#
# What the procedure text does:
# - iterates every quad returned by a SPARQL query over all graphs, filtering
#   out the graph virtrdf:
# - serialises each quad with http_nquad into the session buffer `ses`
# - once LENGTH(ses) reaches file_length_limit, writes the buffer to
#   '<dir>/output<6-digit inx>.nq', optionally gzips it (deleting the plain
#   file afterwards), increments inx and starts a fresh buffer
# - after the loop, flushes whatever is left in the buffer the same way
# - a quad that raises SQLSTATE '22023' is skipped via GOTO next
#
# The text is sent to the server verbatim, so it must stay valid Virtuoso/PL.
DUMP_NQUADS_PROCEDURE = """
CREATE PROCEDURE dump_nquads
 ( IN dir VARCHAR := 'dumps'
 , IN start_from INT := 1
 , IN file_length_limit INTEGER := 100000000
 , IN comp INT := 1
 )
 {
 DECLARE inx, ses_len INT
 ; DECLARE file_name VARCHAR
 ; DECLARE env, ses ANY
 ;

 inx := start_from;
 SET isolation = 'uncommitted';
 env := vector (0,0,0);
 ses := string_output (10000000);
 FOR (SELECT * FROM (sparql define input:storage "" SELECT ?s ?p ?o ?g { GRAPH ?g { ?s ?p ?o } . FILTER ( ?g != virtrdf: ) } ) AS sub OPTION (loop)) DO
 {
 DECLARE EXIT HANDLER FOR SQLSTATE '22023'
 {
 GOTO next;
 };
 http_nquad (env, "s", "p", "o", "g", ses);
 ses_len := LENGTH (ses);
 IF (ses_len >= file_length_limit)
 {
 file_name := sprintf ('%s/output%06d.nq', dir, inx);
 string_to_file (file_name, ses, -2);
 IF (comp)
 {
 gz_compress_file (file_name, file_name||'.gz');
 file_delete (file_name);
 }
 inx := inx + 1;
 env := vector (0,0,0);
 ses := string_output (10000000);
 }
 next:;
 }
 IF (length (ses))
 {
 file_name := sprintf ('%s/output%06d.nq', dir, inx);
 string_to_file (file_name, ses, -2);
 IF (comp)
 {
 gz_compress_file (file_name, file_name||'.gz');
 file_delete (file_name);
 }
 inx := inx + 1;
 env := vector (0,0,0);
 }
}
;
"""

102 

103 

def create_output_directory(output_dir: str, use_docker: bool = False) -> bool:
    """
    Ensure the output directory exists. If running in Docker mode, skip creation on the host.

    Args:
        output_dir: Path to the output directory
        use_docker: Truthy if running with --docker-container

    Returns:
        True if the directory already exists, was created successfully, or
        creation was skipped (Docker mode). False if it could not be created,
        including when the path exists but is not a directory.
    """
    if use_docker:
        return True
    try:
        already_present = os.path.isdir(output_dir)
        # With exist_ok=True, makedirs still raises FileExistsError when the
        # path exists as a non-directory, so that case is reported as a
        # failure instead of being silently accepted.
        os.makedirs(output_dir, exist_ok=True)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error creating output directory '{output_dir}': {e}", file=sys.stderr)
        return False
    if not already_present:
        print(f"Created output directory: {output_dir}")
    return True

125 

126 

def install_dump_procedure(args: argparse.Namespace) -> bool:
    """
    Install the dump_nquads stored procedure in Virtuoso by saving it to a file and loading it with LOAD.
    If using Docker, copy the file into the container and LOAD it there.

    The temporary file on the host is removed on every exit path, including
    when writing, copying or loading raises. In Docker mode the copy inside
    the container is removed once the LOAD has been attempted.

    Args:
        args: Parsed command-line arguments. Reads docker_container and
            docker_path, and is forwarded to run_isql_command.

    Returns:
        True if successful, False otherwise
    """
    print("Installing Virtuoso dump_nquads procedure via LOAD ...")
    host_tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", delete=False, suffix="_dump_nquads_procedure.sql", encoding="utf-8"
        ) as f:
            # Record the name before writing so cleanup still happens if the
            # write itself fails.
            host_tmp_path = f.name
            f.write(DUMP_NQUADS_PROCEDURE)

        if args.docker_container:
            container_tmp_path = "/tmp/dump_nquads_procedure.sql"
            cp_cmd = [args.docker_path, "cp", host_tmp_path, f"{args.docker_container}:{container_tmp_path}"]
            cp_result = subprocess.run(cp_cmd, capture_output=True, text=True)
            if cp_result.returncode != 0:
                print(f"Error copying procedure file into container: {cp_result.stderr}", file=sys.stderr)
                return False
            try:
                load_command = f"LOAD '{container_tmp_path}';"
                success, _stdout, stderr = run_isql_command(args, sql_command=load_command)
            finally:
                # Best-effort removal of the copy inside the container; its
                # exit status is deliberately ignored.
                rm_cmd = [args.docker_path, "exec", args.docker_container, "rm", "-f", container_tmp_path]
                subprocess.run(rm_cmd, capture_output=True)
        else:
            load_command = f"LOAD '{host_tmp_path}';"
            success, _stdout, stderr = run_isql_command(args, sql_command=load_command)

        if not success:
            print(f"Error installing dump_nquads procedure: {stderr}", file=sys.stderr)
            return False
        print("dump_nquads procedure installed successfully!")
        return True
    except Exception as e:
        # Top-level boundary for this step: report and signal failure.
        print(f"Error writing or loading dump_nquads procedure: {e}", file=sys.stderr)
        return False
    finally:
        if host_tmp_path is not None:
            try:
                os.unlink(host_tmp_path)
            except OSError:
                # Cleanup is best-effort; a leftover temp file must not turn
                # a successful installation into a failure.
                pass

171 

172 

173 

def dump_nquads(args: argparse.Namespace) -> bool:
    """
    Execute the dump_nquads procedure to dump all graphs.

    Args:
        args: Parsed command-line arguments. Reads output_dir,
            file_length_limit and compression, and is forwarded to
            run_isql_command.

    Returns:
        True if successful, False otherwise
    """
    print("Starting N-Quads dump using dump_nquads procedure...")

    compression_flag = 1 if args.compression else 0
    # The directory is embedded in a single-quoted SQL string literal, so any
    # single quote in the path is doubled to keep it inside that literal.
    quoted_output_dir = str(args.output_dir).replace("'", "''")
    dump_command = (
        f"dump_nquads('{quoted_output_dir}', {DEFAULT_START_FROM}, "
        f"{args.file_length_limit}, {compression_flag});"
    )

    print(f"Executing: {dump_command}")

    success, _stdout, stderr = run_isql_command(args, sql_command=dump_command)

    if not success:
        print(f"Error executing dump_nquads: {stderr}", file=sys.stderr)
        return False

    print("dump_nquads procedure completed successfully!")
    return True

199 

200 

def list_output_files(output_dir: str, compressed: bool = True) -> List[str]:
    """
    Collect the dump files present in the output directory.

    Args:
        output_dir: Output directory path
        compressed: When truthy, match names ending in '.nq.gz'; otherwise
            match names ending in '.nq'

    Returns:
        Sorted list of matching paths (each joined onto output_dir). Empty
        when the directory does not exist or listing it fails.
    """
    try:
        wanted_suffix = '.nq.gz' if compressed else '.nq'
        if not os.path.exists(output_dir):
            return []
        matches = [
            os.path.join(output_dir, entry)
            for entry in os.listdir(output_dir)
            if entry.endswith(wanted_suffix)
        ]
        matches.sort()
        return matches
    except Exception as e:
        print(f"Error listing output files: {e}", file=sys.stderr)
        return []

224 

225 

def dump_quadstore(args: argparse.Namespace) -> bool:
    """
    Main function to dump the quadstore content using dump_nquads procedure.

    Prepares the output directory, installs the stored procedure, runs the
    dump and prints a summary of the files it produced.

    Args:
        args: Parsed command-line arguments. Reads output_dir,
            docker_container and compression, and is forwarded to the helper
            functions.

    Returns:
        True if successful, False otherwise
    """
    if not create_output_directory(args.output_dir, args.docker_container):
        return False
    if not install_dump_procedure(args):
        return False
    print("\nStep 1: Executing dump_nquads procedure...")
    if not dump_nquads(args):
        return False

    # In Docker mode the directory is never created on the host, so it may
    # exist only inside the container. Listing it from here would then yield
    # nothing and the summary would wrongly claim that no files were written.
    visible_on_host = not args.docker_container or os.path.isdir(args.output_dir)
    output_files = list_output_files(args.output_dir, args.compression) if visible_on_host else []

    print("\nDump completed successfully!")
    if visible_on_host:
        print(f"Total files created: {len(output_files)}")
    else:
        print(
            "Total files created: unknown (output directory is not visible from the host; "
            f"inspect it inside container '{args.docker_container}')"
        )
    print(f"Output directory: {args.output_dir}")
    print(f"Output format: N-Quads ({'compressed' if args.compression else 'uncompressed'})")
    if output_files:
        print("\nCreated files:")
        total_size = 0
        for file_path in output_files:
            try:
                size = os.path.getsize(file_path)
                total_size += size
                print(f" {os.path.basename(file_path)} ({size:,} bytes)")
            except OSError:
                print(f" {os.path.basename(file_path)} (size unknown)")
        if total_size > 0:
            print(f"\nTotal size: {total_size:,} bytes ({total_size / (1024*1024):.2f} MB)")
    return True

259 

260 

def main():
    """
    Command-line entry point: parse options, check connectivity, run the dump.

    Terminates the process via sys.exit: status 0 when the dump succeeds,
    status 1 for an invalid --file-length-limit, a failed connection test or
    a failed dump.
    """
    usage_notes = """
This script uses Virtuoso's official dump_nquads stored procedure for optimal
performance and N-Quads output format. The procedure is documented at:
https://vos.openlinksw.com/owiki/wiki/VOS/VirtRDFDumpNQuad

The dump_nquads procedure automatically:
- Excludes internal virtrdf: graphs
- Preserves Named Graph information in N-Quads format
- Handles compression and file splitting
- Optimizes memory usage during export

Example usage:
 # Dump entire quadstore to compressed N-Quads files
 python dump_quadstore.py --password mypassword --output-dir ./dump

 # Dump with custom file size limit (50MB per file)
 python dump_quadstore.py --password mypassword --file-length-limit 50000000

 # Dump uncompressed files
 python dump_quadstore.py --password mypassword --no-compression

 # Dump using Docker
 python dump_quadstore.py --password mypassword --docker-container virtuoso \\
 --output-dir /dumps

Important Notes:
- Output files are in N-Quads format: output000001.nq.gz, output000002.nq.gz, etc.
- The output directory must be accessible by Virtuoso and listed in DirsAllowed
- When using Docker, ensure the output directory is mounted and accessible inside the container
- The script automatically installs the required dump_nquads stored procedure
"""
    cli = argparse.ArgumentParser(
        description="Dump the entire content of an OpenLink Virtuoso quadstore using the official dump_nquads procedure.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # Connection options
    cli.add_argument(
        "-H", "--host",
        default=DEFAULT_VIRTUOSO_HOST,
        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST})",
    )
    cli.add_argument(
        "-P", "--port",
        type=int,
        default=DEFAULT_VIRTUOSO_PORT,
        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT})",
    )
    cli.add_argument(
        "-u", "--user",
        default=DEFAULT_VIRTUOSO_USER,
        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER})",
    )
    cli.add_argument(
        "-k", "--password",
        default="dba",
        help="Virtuoso password (Default: dba)",
    )

    # Output options
    cli.add_argument(
        "-o", "--output-dir",
        default=DEFAULT_OUTPUT_DIR,
        help=f"Output directory for N-Quads files (Default: {DEFAULT_OUTPUT_DIR}). Must be accessible by Virtuoso and listed in DirsAllowed.",
    )
    cli.add_argument(
        "--file-length-limit",
        type=int,
        default=DEFAULT_FILE_LENGTH_LIMIT,
        help=f"Maximum length of dump files in bytes (Default: {DEFAULT_FILE_LENGTH_LIMIT:,})",
    )
    cli.add_argument(
        "--no-compression",
        action="store_true",
        help="Disable gzip compression (files will be .nq instead of .nq.gz)",
    )

    docker_opts = cli.add_argument_group('Docker Options')
    docker_opts.add_argument(
        "--docker-container",
        help="Name or ID of the running Virtuoso Docker container",
    )

    args = cli.parse_args()

    if args.file_length_limit <= 0:
        print("Error: --file-length-limit must be greater than 0", file=sys.stderr)
        sys.exit(1)

    # Derived settings that the helper functions read from the namespace.
    args.compression = not args.no_compression
    args.isql_path = DEFAULT_ISQL_PATH_HOST
    args.docker_isql_path = DEFAULT_ISQL_PATH_DOCKER
    args.docker_path = DEFAULT_DOCKER_PATH

    in_docker = bool(args.docker_container)
    if in_docker and os.name != "nt":
        args.output_dir = os.path.abspath(args.output_dir)

    rule = "-" * 70
    mode_label = 'Docker' if in_docker else 'Local'
    compression_label = 'Enabled (.nq.gz)' if args.compression else 'Disabled (.nq)'

    print(rule)
    print("Virtuoso N-Quads Dump Configuration:")
    print(f" Host: {args.host}:{args.port}")
    print(f" User: {args.user}")
    print(f" Mode: {mode_label}")
    if in_docker:
        print(f" Container: {args.docker_container}")
    print(f" Output Dir: {args.output_dir}")
    print(f" File Size Limit: {args.file_length_limit:,} bytes")
    print(f" Compression: {compression_label}")
    print(" Method: Official dump_nquads stored procedure")
    print(" Output Format: N-Quads (preserves Named Graph information)")
    print(rule)

    print("\nIMPORTANT: Ensure the output directory is:")
    print(" 1. Accessible by the Virtuoso server process")
    print(" 2. Listed in the 'DirsAllowed' parameter in virtuoso.ini")
    if in_docker:
        print(" 3. Properly mounted and accessible inside the Docker container")
    print()

    print("Testing Virtuoso connection...")
    connected, _out, conn_err = run_isql_command(args, sql_command="SELECT 'Connection test' as test;")
    if not connected:
        print(f"Error: Could not connect to Virtuoso: {conn_err}", file=sys.stderr)
        sys.exit(1)
    print("Connection successful!")

    started = time.time()
    dumped = dump_quadstore(args)
    finished = time.time()

    if not dumped:
        print("Dump failed", file=sys.stderr)
        sys.exit(1)

    elapsed = finished - started
    print(f"\nDump completed in {elapsed:.2f} seconds")
    sys.exit(0)

376 

377 

# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()