Coverage for virtuoso_utilities / bulk_load.py: 87%
89 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-04-14 09:16 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-04-14 09:16 +0000
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
# SPDX-FileCopyrightText: 2025 eliarizzetto <elia.rizzetto@gmail.com>
#
# SPDX-License-Identifier: ISC

# -*- coding: utf-8 -*-

"""
Performs sequential bulk loading of RDF N-Quads Gzipped files (`.nq.gz`)
into OpenLink Virtuoso using the official `ld_dir`/`ld_dir_all` and
`rdf_loader_run` method.

Registers files matching *.nq.gz using ld_dir/ld_dir_all into DB.DBA.load_list,
then runs a single `rdf_loader_run()` process to load the registered files.

IMPORTANT:
- Only files with the extension `.nq.gz` will be processed.
- The data directory specified ('-d' or '--data-directory') MUST be
  accessible by the Virtuoso server process itself.
- This directory path MUST be listed in the 'DirsAllowed' parameter
  within the Virtuoso INI file (e.g., virtuoso.ini).
- When using Docker, data-directory is the path INSIDE the container.
  Files will be accessed and loaded from within the container.

Reference:
- https://vos.openlinksw.com/owiki/wiki/VOS/VirtBulkRDFLoader
"""
import argparse
import glob
import logging
import os
import sys
from typing import Optional

from virtuoso_utilities.isql_helpers import run_isql_command
# Module-level logger; bulk_load() configures the root logger on each call.
logger = logging.getLogger(__name__)

# Connection defaults offered by the command-line interface.
DEFAULT_VIRTUOSO_HOST = "localhost"
DEFAULT_VIRTUOSO_PORT = 1111
DEFAULT_VIRTUOSO_USER = "dba"

# Default executable names, both for local runs and for runs through Docker.
ISQL_PATH_HOST = ISQL_PATH_DOCKER = "isql"
DOCKER_PATH = "docker"

# Values handed to checkpoint_interval() / scheduler_interval() once loading is done.
CHECKPOINT_INTERVAL, SCHEDULER_INTERVAL = 60, 10

# Glob pattern selecting the files that get registered and loaded.
NQ_GZ_PATTERN = "*.nq.gz"
def find_nquads_files_local(directory, recursive=False, pattern=None):
    """
    Find N-Quads Gzipped files (`*.nq.gz`) in a directory on the local filesystem.

    Args:
        directory: Directory to search.
        recursive: When True, every subdirectory reached by os.walk() is
            searched as well; otherwise only `directory` itself.
        pattern: Glob pattern applied to file names. Defaults to NQ_GZ_PATTERN.

    Returns:
        A sorted list of matching file paths. The list is empty when nothing
        matches or when `directory` does not exist.
    """
    if pattern is None:
        pattern = NQ_GZ_PATTERN

    if recursive:
        search_dirs = (root for root, _, _ in os.walk(directory))
    else:
        search_dirs = (directory,)

    matches = []
    for search_dir in search_dirs:
        # Escape the directory part: without this, glob metacharacters in the
        # path itself (e.g. "data[2024]") are treated as a pattern and the
        # directory silently matches nothing.
        escaped_dir = glob.escape(os.fspath(search_dir))
        matches.extend(glob.glob(os.path.join(escaped_dir, pattern)))

    # glob/os.walk order depends on the filesystem; sort for a stable result.
    return sorted(matches)
def _parse_load_stats(stdout):
    """
    Extract (total_files, loaded_files, issues) from the isql output of the
    load_list statistics query.

    Returns None when no data row can be found in the output.
    """
    for index, line in enumerate(stdout.strip().splitlines()):
        # The first four output lines are skipped (presumably the isql banner
        # and column header), as are the type row and the trailing "N Rows." line.
        if index <= 3 or line.endswith("Rows.") or "INTEGER" in line or "VARCHAR" in line:
            continue
        parts = line.split()
        if len(parts) >= 3 and parts[0].isdigit():
            total_files = int(parts[0])
            # SUM() over an empty table yields NULL; treat it as zero.
            loaded_files = int(parts[1]) if parts[1] != "NULL" else 0
            issues = int(parts[2]) if parts[2] != "NULL" else 0
            return total_files, loaded_files, issues
    return None


def _list_failed_files(args):
    """
    Return the ll_file values of load_list rows that are not fully loaded or
    carry an error. Returns an empty list when the query itself fails.
    """
    failed_sql = "SELECT ll_file FROM DB.DBA.load_list WHERE ll_state <> 2 OR ll_error IS NOT NULL;"
    success_failed, stdout_failed, _ = run_isql_command(args, sql_command=failed_sql)
    if not success_failed:
        return []

    failed_files = []
    for line in stdout_failed.strip().splitlines():
        line = line.strip()
        # Drop the column header, the type row and the "N Rows." footer.
        if line and not line.startswith("ll_file") and not line.endswith("Rows.") and line != "VARCHAR":
            failed_files.append(line)
    return failed_files


def bulk_load(
    data_directory: str,
    password: str,
    host: str = DEFAULT_VIRTUOSO_HOST,
    port: int = DEFAULT_VIRTUOSO_PORT,
    user: str = DEFAULT_VIRTUOSO_USER,
    recursive: bool = False,
    docker_container: Optional[str] = None,
    isql_path: str = ISQL_PATH_HOST,
    docker_isql_path: str = ISQL_PATH_DOCKER,
    docker_path: str = DOCKER_PATH,
    container_data_directory: Optional[str] = None,
    log_level: str = "ERROR",
) -> None:
    """
    Perform Virtuoso bulk loading of N-Quads files.

    This function can be imported and called programmatically, avoiding subprocess overhead.

    The steps are: look for `.nq.gz` files locally, clear DB.DBA.load_list,
    register the directory with ld_dir/ld_dir_all, run rdf_loader_run(),
    verify the outcome through DB.DBA.load_list, remove the loaded entries and
    finally run a checkpoint.

    Args:
        data_directory: Path to directory containing .nq.gz files (host path for file operations)
        password: Virtuoso DBA password
        host: Virtuoso server host
        port: Virtuoso server port
        user: Virtuoso username
        recursive: Use ld_dir_all instead of ld_dir for recursive loading
        docker_container: Docker container name (for ISQL commands only)
        isql_path: Path to isql binary (local)
        docker_isql_path: Path to isql binary inside Docker container
        docker_path: Path to docker binary
        container_data_directory: Path INSIDE container where Virtuoso accesses files (if different from data_directory)
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default: ERROR

    Raises:
        RuntimeError: If bulk load fails
    """
    # force=True replaces any handlers already installed on the root logger.
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format='%(levelname)s: %(message)s',
        force=True,
    )

    # run_isql_command() takes its connection settings from an argparse-style namespace.
    args = argparse.Namespace(
        data_directory=data_directory,
        host=host,
        port=port,
        user=user,
        password=password,
        recursive=recursive,
        docker_container=docker_container,
        isql_path=isql_path,
        docker_isql_path=docker_isql_path,
        docker_path=docker_path,
        container_data_directory=container_data_directory,
    )

    # Virtuoso is pointed at the container path when one is given, else at the local one.
    container_dir = container_data_directory or data_directory

    files = find_nquads_files_local(data_directory, recursive)
    if not files:
        logger.warning(f"No files matching '{NQ_GZ_PATTERN}' found in '{data_directory}'.")
        return

    # Clean up any leftover entries from previous bulk loads
    cleanup_sql = "DELETE FROM DB.DBA.load_list;"
    success_cleanup, _, stderr_cleanup = run_isql_command(args, sql_command=cleanup_sql)
    if not success_cleanup:
        # Best effort: stale rows may distort the statistics checked below, so
        # make the failure visible instead of dropping it silently.
        logger.warning("Could not clear DB.DBA.load_list before loading: %s", stderr_cleanup)

    ld_function = "ld_dir_all" if recursive else "ld_dir"

    container_dir_sql_escaped = container_dir.replace("'", "''")
    file_pattern_sql_escaped = NQ_GZ_PATTERN.replace("'", "''")

    # Use empty string to let Virtuoso read graph URIs from nquads (4th field)
    register_sql = f"{ld_function}('{container_dir_sql_escaped}', '{file_pattern_sql_escaped}', '');"
    success_reg, _, stderr_reg = run_isql_command(args, sql_command=register_sql)
    stderr_reg = stderr_reg or ""

    # isql may report success even though the directory could not be listed,
    # so the error text is inspected as well.
    if not success_reg or "Unable to list files" in stderr_reg or "FA020" in stderr_reg:
        raise RuntimeError(f"Failed to register files using {ld_function}.\nError: {stderr_reg}")

    loader_sql = "rdf_loader_run();"
    success_load, _, stderr_load = run_isql_command(args, sql_command=loader_sql)

    if not success_load:
        raise RuntimeError(f"rdf_loader_run() failed.\nError: {stderr_load}")

    stats_sql = "SELECT COUNT(*) AS total_files, SUM(CASE WHEN ll_state = 2 THEN 1 ELSE 0 END) AS loaded, SUM(CASE WHEN ll_state <> 2 OR ll_error IS NOT NULL THEN 1 ELSE 0 END) AS issues FROM DB.DBA.load_list;"
    success_stats, stdout_stats, _ = run_isql_command(args, sql_command=stats_sql)

    stats = _parse_load_stats(stdout_stats) if success_stats else None
    if stats is None:
        logger.warning("Could not verify the bulk load result from DB.DBA.load_list.")
    else:
        total_files, loaded_files, issues = stats
        if total_files != loaded_files or issues != 0:
            failed_files = _list_failed_files(args)
            raise RuntimeError(
                f"Bulk load failed: {issues} file(s) had issues.\n"
                f"Failed files:\n" + "\n".join(f" - {f}" for f in failed_files)
            )

    delete_loaded_sql = "DELETE FROM DB.DBA.load_list WHERE ll_state = 2;"
    success_delete, _, stderr_delete = run_isql_command(args, sql_command=delete_loaded_sql)
    if not success_delete:
        raise RuntimeError(f"Failed to clean up load_list table.\nError: {stderr_delete}")

    # Final step: checkpoint and set the checkpoint/scheduler intervals.
    cleanup_sql = f"log_enable(3, 1); checkpoint; checkpoint_interval({CHECKPOINT_INTERVAL}); scheduler_interval({SCHEDULER_INTERVAL});"
    success_final, _, stderr_final = run_isql_command(args, sql_command=cleanup_sql)
    if not success_final:
        raise RuntimeError(f"Failed to run final checkpoint.\nError details: {stderr_final}")
def main():  # pragma: no cover
    """
    CLI entry point that parses arguments and calls bulk_load().

    Exits with status 0 on success and 1 when bulk_load() raises RuntimeError
    (the error message is logged first).
    """
    parser = argparse.ArgumentParser(
        description=f"Sequential N-Quads Gzipped (`{NQ_GZ_PATTERN}`) bulk loader for OpenLink Virtuoso using ld_dir/rdf_loader_run.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The shell line continuations below are written as "\\" so that a
        # literal backslash and the line break survive in the rendered help.
        epilog="""\
Example usage:
  # Load all *.nq.gz files from /data/rdf (local mode)
  python bulk_load.py -d /data/rdf -k mypassword

  # Load *.nq.gz recursively using Docker, files at /database/data in container
  python bulk_load.py -d /database/data -k mypassword --recursive \\
    --docker-container virtuoso_container

IMPORTANT:
- Only files with the extension `.nq.gz` will be loaded.
- The data directory (-d) must be accessible by the Virtuoso process
  and listed in the 'DirsAllowed' setting in virtuoso.ini.
- When using Docker mode, data-directory is the path INSIDE the container,
  unless --container-data-directory is given.
  Files are accessed and loaded directly inside the container.
""",
    )

    parser.add_argument("-d", "--data-directory", required=True,
                        help="Path to the N-Quads Gzipped (`.nq.gz`) files. When using Docker, this must be the path INSIDE the container unless --container-data-directory is given.")
    parser.add_argument("-H", "--host", default=DEFAULT_VIRTUOSO_HOST,
                        help=f"Virtuoso server host (Default: {DEFAULT_VIRTUOSO_HOST}).")
    parser.add_argument("-P", "--port", type=int, default=DEFAULT_VIRTUOSO_PORT,
                        help=f"Virtuoso server port (Default: {DEFAULT_VIRTUOSO_PORT}).")
    parser.add_argument("-u", "--user", default=DEFAULT_VIRTUOSO_USER,
                        help=f"Virtuoso username (Default: {DEFAULT_VIRTUOSO_USER}).")
    parser.add_argument("-k", "--password", required=True,
                        help="Virtuoso password.")
    parser.add_argument("--recursive", action='store_true',
                        help="Load files recursively from subdirectories (uses ld_dir_all).")
    parser.add_argument("--log-level", default="ERROR", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                        help="Logging level (Default: ERROR).")

    docker_group = parser.add_argument_group('Docker Options')
    docker_group.add_argument("--docker-container",
                              help="Name or ID of the running Virtuoso Docker container. If provided, 'isql' will be run via 'docker exec'.")
    # bulk_load() searches --data-directory locally but registers this path
    # with Virtuoso, so the two can differ when the files are bind-mounted.
    docker_group.add_argument("--container-data-directory", default=None,
                              help="Path to the same files as seen by the Virtuoso server (e.g. INSIDE the container). Defaults to the value of --data-directory.")

    args = parser.parse_args()

    try:
        bulk_load(
            data_directory=args.data_directory,
            password=args.password,
            host=args.host,
            port=args.port,
            user=args.user,
            recursive=args.recursive,
            docker_container=args.docker_container,
            container_data_directory=args.container_data_directory,
            log_level=args.log_level,
        )
        sys.exit(0)
    except RuntimeError as e:
        logger.error(str(e))
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported for its bulk_load() function.
if __name__ == "__main__":  # pragma: no cover
    main()