Coverage for oc_meta / lib / sparql.py: 65%
34 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import multiprocessing
8from concurrent.futures import ProcessPoolExecutor, as_completed
9from typing import Callable
11from sparqlite import SPARQLClient
13from oc_meta.constants import QLEVER_MAX_WORKERS, QLEVER_QUERIES_PER_GROUP
def execute_sparql_queries(
    endpoint_url: str,
    queries: list[str],
    max_retries: int = 5,
    backoff_factor: int = 5,
    timeout: int = 3600,
) -> list:
    """Execute a sequence of SPARQL queries against one endpoint.

    A single client (and therefore a single HTTP session) is reused for
    all queries, and the connection is closed via the context manager.

    Args:
        endpoint_url: URL of the SPARQL endpoint.
        queries: SPARQL query strings, executed in order.
        max_retries: Per-query retry count, forwarded to SPARQLClient.
        backoff_factor: Retry backoff factor, forwarded to SPARQLClient.
        timeout: Per-request timeout in seconds (previously hard-coded
            to 3600; the default preserves the old behavior).

    Returns:
        One list of result bindings per query, in the same order as
        *queries*. A query whose result is falsy yields an empty list.
    """
    results = []
    with SPARQLClient(
        endpoint_url,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
        timeout=timeout,
    ) as client:
        for query in queries:
            result = client.query(query)
            # Guard against a falsy response (e.g. None) from the client.
            results.append(result['results']['bindings'] if result else [])
    return results
def run_queries_parallel(
    endpoint_url: str,
    batch_queries: list[str],
    batch_sizes: list[int],
    workers: int = QLEVER_MAX_WORKERS,
    progress_callback: Callable[[int], None] | None = None,
    max_retries: int = 5,
    backoff_factor: int = 5,
) -> list[list]:
    """Fan SPARQL queries out over worker processes and gather bindings.

    Queries are bundled into groups of ``QLEVER_QUERIES_PER_GROUP`` and
    each group is handed to a separate process; with a single query or a
    single worker everything runs in the current process instead.

    Args:
        endpoint_url: URL of the SPARQL endpoint.
        batch_queries: Query strings to execute.
        batch_sizes: Item count carried by each query, used only for
            progress reporting (paired positionally with *batch_queries*).
        workers: Upper bound on the number of worker processes.
        progress_callback: Optional callable invoked with the number of
            items completed whenever a group (or the whole batch) finishes.
        max_retries: Per-query retry count, forwarded to the client.
        backoff_factor: Retry backoff factor, forwarded to the client.

    Returns:
        The concatenated result bindings of all queries. In the parallel
        path, groups are appended in completion order, not submission order.
    """
    if not batch_queries:
        return []

    collected: list[list] = []

    if len(batch_queries) > 1 and workers > 1:
        # Bundle queries so each process amortizes its client setup over
        # several queries instead of paying it per query.
        step = QLEVER_QUERIES_PER_GROUP
        starts = range(0, len(batch_queries), step)
        groups = [batch_queries[s:s + step] for s in starts]
        group_sizes = [sum(batch_sizes[s:s + step]) for s in starts]

        ctx = multiprocessing.get_context('forkserver')
        with ProcessPoolExecutor(
            max_workers=min(len(groups), workers),
            mp_context=ctx,
        ) as pool:
            pending: dict = {}
            for group, size in zip(groups, group_sizes):
                future = pool.submit(
                    execute_sparql_queries,
                    endpoint_url=endpoint_url,
                    queries=group,
                    max_retries=max_retries,
                    backoff_factor=backoff_factor,
                )
                pending[future] = size
            # Drain futures as they finish so progress is reported promptly.
            for future in as_completed(pending):
                collected.extend(future.result())
                if progress_callback:
                    progress_callback(pending[future])
    else:
        # Serial fallback: one query or one worker — no pool overhead.
        collected.extend(
            execute_sparql_queries(
                endpoint_url=endpoint_url,
                queries=batch_queries,
                max_retries=max_retries,
                backoff_factor=backoff_factor,
            )
        )
        if progress_callback:
            progress_callback(sum(batch_sizes))

    return collected