Coverage for virtuoso_utilities / bulk_load.py: 87%
89 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 14:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 14:45 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
4"""
5Performs sequential bulk loading of RDF N-Quads Gzipped files (`.nq.gz`)
6into OpenLink Virtuoso using the official `ld_dir`/`ld_dir_all` and
7`rdf_loader_run` method.
9Registers files matching *.nq.gz using ld_dir/ld_dir_all into DB.DBA.load_list,
10then runs a single `rdf_loader_run()` process to load the registered files.
12IMPORTANT:
13- Only files with the extension `.nq.gz` will be processed.
14- The data directory specified ('-d' or '--data-directory') MUST be
15 accessible by the Virtuoso server process itself.
16- This directory path MUST be listed in the 'DirsAllowed' parameter
17 within the Virtuoso INI file (e.g., virtuoso.ini).
18- When using Docker, data-directory is the path INSIDE the container.
19 Files will be accessed and loaded from within the container.
21Reference:
22- https://vos.openlinksw.com/owiki/wiki/VOS/VirtBulkRDFLoader
23"""
25import argparse
26import glob
27import logging
28import os
29import sys
31from virtuoso_utilities.isql_helpers import run_isql_command
# Module-level logger; its level and output format are configured by bulk_load()
# through logging.basicConfig().
logger = logging.getLogger(__name__)

# Connection defaults offered by the command-line options defined in main().
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"

# Executable names used as defaults for the corresponding bulk_load() parameters.
ISQL_PATH_HOST = "isql"
ISQL_PATH_DOCKER = "isql"
DOCKER_PATH = "docker"
# Values passed to checkpoint_interval() and scheduler_interval() in the final
# SQL command issued by bulk_load().
CHECKPOINT_INTERVAL = 60
SCHEDULER_INTERVAL = 10

# File pattern used both for the local file search and for the ld_dir/ld_dir_all registration.
NQ_GZ_PATTERN = '*.nq.gz'
def find_nquads_files_local(directory, recursive=False):
    """
    Find all N-Quads Gzipped files (`*.nq.gz`) in a directory on the local filesystem.

    Directory names are escaped before they are combined with the file pattern,
    so glob metacharacters (`*`, `?`, `[`) occurring in `directory` or in the
    name of any sub-directory are matched literally instead of being treated
    as wildcards.

    Args:
        directory: Directory to search.
        recursive: When True, every directory reached by os.walk(directory)
            is searched as well; otherwise only `directory` itself.

    Returns:
        A list of matching paths; empty when nothing matches or the directory
        does not exist. The order of the entries is not specified.
    """
    if recursive:
        search_roots = (root for root, _, _ in os.walk(directory))
    else:
        search_roots = (directory,)

    matches = []
    for root in search_roots:
        # Only the file-name part may act as a pattern; the directory part is literal.
        path_pattern = os.path.join(glob.escape(root), NQ_GZ_PATTERN)
        matches.extend(glob.glob(path_pattern))
    return matches
def _parse_load_stats(stdout_stats):
    """Return (total_files, loaded_files, issues) parsed from the statistics query output, or None."""
    for index, row in enumerate(stdout_stats.strip().splitlines()):
        # The first four lines and the type / row-count lines are never data rows
        # (presumably isql banner and column header output -- confirm against isql).
        if index <= 3 or row.endswith("Rows.") or "INTEGER" in row or "VARCHAR" in row:
            continue
        fields = row.split()
        if len(fields) >= 3 and fields[0].isdigit():
            # SUM() over an empty table is printed as NULL; count it as zero.
            return tuple(0 if field == "NULL" else int(field) for field in fields[:3])
    return None


def _parse_failed_files(stdout_failed):
    """Return the ll_file values found in the output of the failed-files query."""
    entries = (raw.strip() for raw in stdout_failed.strip().splitlines())
    return [
        entry for entry in entries
        if entry
        and not entry.startswith("ll_file")
        and not entry.endswith("Rows.")
        and entry != "VARCHAR"
    ]


def bulk_load(
    data_directory: str,
    password: str,
    host: str = "localhost",
    port: int = 1111,
    user: str = "dba",
    recursive: bool = False,
    docker_container: str = None,
    isql_path: str = ISQL_PATH_HOST,
    docker_isql_path: str = ISQL_PATH_DOCKER,
    docker_path: str = DOCKER_PATH,
    container_data_directory: str = None,
    log_level: str = "ERROR"
) -> None:
    """
    Perform Virtuoso bulk loading of N-Quads files.

    This function can be imported and called programmatically, avoiding subprocess overhead.

    Steps, in order:
      1. Look for `*.nq.gz` files under `data_directory` on the local filesystem;
         when none are found a warning is logged and the function returns
         without issuing any SQL.
      2. Empty DB.DBA.load_list (a failure here is logged as a warning).
      3. Register the files with ld_dir / ld_dir_all and run rdf_loader_run().
      4. Read the load statistics from DB.DBA.load_list and raise if any file
         is not in state 2 or has an error; when the statistics cannot be read
         or parsed a warning is logged and the load is not verified.
      5. Delete the loaded entries from DB.DBA.load_list, run a checkpoint and
         set the checkpoint and scheduler intervals.

    Note: logging.basicConfig(..., force=True) is called on every invocation,
    which reconfigures the root logger.

    Args:
        data_directory: Path to directory containing .nq.gz files (host path for file operations)
        password: Virtuoso DBA password
        host: Virtuoso server host
        port: Virtuoso server port
        user: Virtuoso username
        recursive: Use ld_dir_all instead of ld_dir for recursive loading
        docker_container: Docker container name (for ISQL commands only)
        isql_path: Path to isql binary (local)
        docker_isql_path: Path to isql binary inside Docker container
        docker_path: Path to docker binary
        container_data_directory: Path INSIDE container where Virtuoso accesses files
            (if different from data_directory)
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default: ERROR

    Raises:
        RuntimeError: If registering the files, running the loader, the final
            clean-up or the checkpoint fails, or if any file was not loaded.
    """
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format='%(levelname)s: %(message)s',
        force=True,
    )

    # run_isql_command() receives the connection settings as an argparse-style namespace.
    args = argparse.Namespace(
        data_directory=data_directory,
        host=host,
        port=port,
        user=user,
        password=password,
        recursive=recursive,
        docker_container=docker_container,
        isql_path=isql_path,
        docker_isql_path=docker_isql_path,
        docker_path=docker_path,
        container_data_directory=container_data_directory,
    )

    # Virtuoso may see the files under a different path than the local process does.
    container_dir = container_data_directory if container_data_directory else data_directory

    if not find_nquads_files_local(data_directory, recursive):
        logger.warning(f"No files matching '{NQ_GZ_PATTERN}' found in '{data_directory}'.")
        return

    # Clean up any leftover entries from previous bulk loads. The statistics
    # query below counts every row of load_list, so stale rows matter: report
    # a failed clean-up instead of discarding its result.
    success_cleanup, _, stderr_cleanup = run_isql_command(args, sql_command="DELETE FROM DB.DBA.load_list;")
    if not success_cleanup:
        logger.warning(
            "Could not clear DB.DBA.load_list before registering files.\nError: %s",
            stderr_cleanup,
        )

    ld_function = "ld_dir_all" if recursive else "ld_dir"

    # Double single quotes so the values are safe inside SQL string literals.
    container_dir_sql_escaped = container_dir.replace("'", "''")
    file_pattern_sql_escaped = NQ_GZ_PATTERN.replace("'", "''")

    # Use empty string to let Virtuoso read graph URIs from nquads (4th field)
    register_sql = f"{ld_function}('{container_dir_sql_escaped}', '{file_pattern_sql_escaped}', '');"
    success_reg, _, stderr_reg = run_isql_command(args, sql_command=register_sql)
    if not success_reg or "Unable to list files" in stderr_reg or "FA020" in stderr_reg:
        raise RuntimeError(f"Failed to register files using {ld_function}.\nError: {stderr_reg}")

    success_load, _, stderr_load = run_isql_command(args, sql_command="rdf_loader_run();")
    if not success_load:
        raise RuntimeError(f"rdf_loader_run() failed.\nError: {stderr_load}")

    stats_sql = "SELECT COUNT(*) AS total_files, SUM(CASE WHEN ll_state = 2 THEN 1 ELSE 0 END) AS loaded, SUM(CASE WHEN ll_state <> 2 OR ll_error IS NOT NULL THEN 1 ELSE 0 END) AS issues FROM DB.DBA.load_list;"
    success_stats, stdout_stats, _ = run_isql_command(args, sql_command=stats_sql)

    stats = _parse_load_stats(stdout_stats) if success_stats else None
    if stats is None:
        # Previously this case passed silently; keep going but leave a trace.
        logger.warning(
            "Could not read load statistics from DB.DBA.load_list; the outcome of the load was not verified."
        )
    else:
        total_files, loaded_files, issues = stats
        if total_files != loaded_files or issues != 0:
            failed_sql = "SELECT ll_file FROM DB.DBA.load_list WHERE ll_state <> 2 OR ll_error IS NOT NULL;"
            success_failed, stdout_failed, _ = run_isql_command(args, sql_command=failed_sql)
            failed_files = _parse_failed_files(stdout_failed) if success_failed else []
            raise RuntimeError(
                f"Bulk load failed: {issues} file(s) had issues.\n"
                f"Failed files:\n" + "\n".join(f" - {f}" for f in failed_files)
            )

    delete_loaded_sql = "DELETE FROM DB.DBA.load_list WHERE ll_state = 2;"
    success_delete, _, stderr_delete = run_isql_command(args, sql_command=delete_loaded_sql)
    if not success_delete:
        raise RuntimeError(f"Failed to clean up load_list table.\nError: {stderr_delete}")

    final_sql = f"log_enable(3, 1); checkpoint; checkpoint_interval({CHECKPOINT_INTERVAL}); scheduler_interval({SCHEDULER_INTERVAL});"
    success_final, _, stderr_final = run_isql_command(args, sql_command=final_sql)
    if not success_final:
        raise RuntimeError(f"Failed to run final checkpoint.\nError details: {stderr_final}")
def main():  # pragma: no cover
    """
    Command-line entry point: read the options from argv and call bulk_load().

    Exits with status 0 when bulk_load() returns, and with status 1 (after
    logging the message) when it raises RuntimeError.
    """
    summary = f"Sequential N-Quads Gzipped (`{NQ_GZ_PATTERN}`) bulk loader for OpenLink Virtuoso using ld_dir/rdf_loader_run."
    usage_notes = """\
Example usage:
 # Load all *.nq.gz files from /data/rdf (local mode)
 python bulk_load.py -d /data/rdf -k mypassword
 # Load *.nq.gz recursively using Docker, files at /database/data in container
 python bulk_load.py -d /database/data -k mypassword --recursive \
 --docker-container virtuoso_container
IMPORTANT:
- Only files with the extension `.nq.gz` will be loaded.
- The data directory (-d) must be accessible by the Virtuoso process
 and listed in the 'DirsAllowed' setting in virtuoso.ini.
- When using Docker mode, data-directory is the path INSIDE the container.
 Files are accessed and loaded directly inside the container.
"""
    cli = argparse.ArgumentParser(
        description=summary,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # Options are added in a fixed order because that order is what --help prints.
    cli.add_argument(
        "-d", "--data-directory",
        required=True,
        help="Path to the N-Quads Gzipped (`.nq.gz`) files. When using Docker, this must be the path INSIDE the container.",
    )
    cli.add_argument(
        "-H", "--host",
        default=DEFAULT_VIRTUOSO_HOST,
        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST}).",
    )
    cli.add_argument(
        "-P", "--port",
        type=int,
        default=DEFAULT_VIRTUOSO_PORT,
        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT}).",
    )
    cli.add_argument(
        "-u", "--user",
        default=DEFAULT_VIRTUOSO_USER,
        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER}).",
    )
    cli.add_argument(
        "-k", "--password",
        required=True,
        help="Virtuoso password.",
    )
    cli.add_argument(
        "--recursive",
        action='store_true',
        help="Load files recursively from subdirectories (uses ld_dir_all).",
    )
    cli.add_argument(
        "--log-level",
        default="ERROR",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Logging level (Default: ERROR).",
    )

    docker_options = cli.add_argument_group('Docker Options')
    docker_options.add_argument(
        "--docker-container",
        help="Name or ID of the running Virtuoso Docker container. If provided, 'isql' will be run via 'docker exec'.",
    )

    options = cli.parse_args()

    try:
        bulk_load(
            data_directory=options.data_directory,
            password=options.password,
            host=options.host,
            port=options.port,
            user=options.user,
            recursive=options.recursive,
            docker_container=options.docker_container,
            log_level=options.log_level,
        )
    except RuntimeError as error:
        logger.error(str(error))
        sys.exit(1)
    sys.exit(0)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":  # pragma: no cover
    main()