Coverage report for oc_meta/lib/sparql.py: 65% of 34 statements covered (coverage.py v7.13.4, created at 2026-04-21 09:24 +0000).

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import multiprocessing 

8from concurrent.futures import ProcessPoolExecutor, as_completed 

9from typing import Callable 

10 

11from sparqlite import SPARQLClient 

12 

13from oc_meta.constants import QLEVER_MAX_WORKERS, QLEVER_QUERIES_PER_GROUP 

14 

15 

def execute_sparql_queries(
    endpoint_url: str,
    queries: list[str],
    max_retries: int = 5,
    backoff_factor: int = 5,
) -> list:
    """Run *queries* sequentially against *endpoint_url* and collect bindings.

    A single client connection is reused for the whole batch. For each
    query the SELECT bindings are appended; a falsy response yields an
    empty list so the output always has one entry per input query.

    :param endpoint_url: URL of the SPARQL endpoint.
    :param queries: SPARQL query strings to execute, in order.
    :param max_retries: retry attempts forwarded to the client.
    :param backoff_factor: retry backoff forwarded to the client.
    :return: one list of bindings per query, in input order.
    """
    collected: list = []
    with SPARQLClient(endpoint_url, max_retries=max_retries, backoff_factor=backoff_factor, timeout=3600) as client:
        for sparql_query in queries:
            response = client.query(sparql_query)
            # Falsy response (e.g. None) maps to an empty bindings list.
            collected.append(response['results']['bindings'] if response else [])
    return collected

28 

29 

def run_queries_parallel(
    endpoint_url: str,
    batch_queries: list[str],
    batch_sizes: list[int],
    workers: int = QLEVER_MAX_WORKERS,
    progress_callback: Callable[[int], None] | None = None,
    max_retries: int = 5,
    backoff_factor: int = 5,
) -> list[list]:
    """Execute *batch_queries* against *endpoint_url*, in parallel when possible.

    With more than one query and more than one worker, queries are chunked
    into groups of QLEVER_QUERIES_PER_GROUP and each group is dispatched to
    a worker process; otherwise the whole batch runs sequentially in-process.

    :param endpoint_url: URL of the SPARQL endpoint.
    :param batch_queries: SPARQL query strings to execute.
    :param batch_sizes: per-query item counts, used only for progress
        reporting (summed per group on the parallel path).
    :param workers: maximum number of worker processes.
    :param progress_callback: invoked with the item count of each group as
        it completes (or the whole batch total on the sequential path).
    :param max_retries: retry attempts forwarded to the SPARQL client.
    :param backoff_factor: retry backoff forwarded to the SPARQL client.
    :return: one bindings list per query, in the same order as
        *batch_queries*.

    Fix: the parallel branch previously extended the result list in
    ``as_completed`` (completion) order, making the output ordering
    nondeterministic and inconsistent with the sequential path. Results
    are now reassembled by submission index; progress is still reported
    as each group finishes.
    """
    if not batch_queries:
        return []

    all_bindings: list[list] = []

    if len(batch_queries) > 1 and workers > 1:
        # Chunk queries so each worker amortizes client setup over a group.
        query_groups: list[list[str]] = []
        grouped_sizes: list[int] = []
        for i in range(0, len(batch_queries), QLEVER_QUERIES_PER_GROUP):
            query_groups.append(batch_queries[i:i + QLEVER_QUERIES_PER_GROUP])
            grouped_sizes.append(sum(batch_sizes[i:i + QLEVER_QUERIES_PER_GROUP]))

        with ProcessPoolExecutor(
            max_workers=min(len(query_groups), workers),
            # forkserver avoids inheriting parent state (safer than fork
            # when the parent holds threads/clients).
            mp_context=multiprocessing.get_context('forkserver')
        ) as executor:
            # Map each future to (submission index, group size): the index
            # restores input order, the size feeds the progress callback.
            future_meta = {
                executor.submit(
                    execute_sparql_queries,
                    endpoint_url=endpoint_url,
                    queries=group,
                    max_retries=max_retries,
                    backoff_factor=backoff_factor,
                ): (idx, size)
                for idx, (group, size) in enumerate(zip(query_groups, grouped_sizes))
            }
            ordered_results: list[list | None] = [None] * len(query_groups)
            for future in as_completed(future_meta):
                idx, size = future_meta[future]
                ordered_results[idx] = future.result()
                # Report progress as soon as the group completes, even
                # though results are emitted in submission order below.
                if progress_callback:
                    progress_callback(size)
            for group_result in ordered_results:
                all_bindings.extend(group_result)
    else:
        results = execute_sparql_queries(
            endpoint_url=endpoint_url,
            queries=batch_queries,
            max_retries=max_retries,
            backoff_factor=backoff_factor,
        )
        all_bindings.extend(results)
        if progress_callback:
            progress_callback(sum(batch_sizes))

    return all_bindings