Coverage for oc_meta / run / migration / extract_subset.py: 13%
82 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4import argparse
5import gzip
6import sys
7from urllib.parse import urlparse
9import rdflib
10from rich_argparse import RichHelpFormatter
11from sparqlite import SPARQLClient
def get_subjects_of_class(client, class_uri, limit):
    """Return the URIs of at most ``limit`` subjects typed as ``class_uri``.

    The SELECT query is sent through ``client.query`` and the ``s`` binding of
    every row in the returned SPARQL JSON result is collected, in result order.
    """
    sparql = f"""
    SELECT ?s
    WHERE {{
        ?s a <{class_uri}> .
    }}
    LIMIT {limit}
    """

    rows = client.query(sparql)["results"]["bindings"]

    subjects = []
    for row in rows:
        subjects.append(row["s"]["value"])
    return subjects
def get_subjects_by_predicate(client, predicate_uri, limit):
    """Return the URIs of at most ``limit`` subjects that use ``predicate_uri``.

    The SELECT query is sent through ``client.query``; one value is returned
    per result row, so a subject appears once for each matching row.
    """
    sparql = f"""
    SELECT ?s
    WHERE {{
        ?s <{predicate_uri}> ?o .
    }}
    LIMIT {limit}
    """
    rows = client.query(sparql)["results"]["bindings"]

    subjects = []
    for row in rows:
        subjects.append(row["s"]["value"])
    return subjects
def get_triples_for_entity(client, entity_uri, use_graphs=True):
    """Fetch every statement whose subject is ``entity_uri``.

    With ``use_graphs`` the query is wrapped in ``GRAPH ?g`` so each row also
    carries the graph it was found in; otherwise only ``?p ?o`` are selected.

    Returns a list of ``(subject, predicate, object, graph)`` tuples of rdflib
    terms. ``graph`` is ``None`` for rows that have no ``g`` binding.
    """
    if use_graphs:
        sparql = f"""
        SELECT ?p ?o ?g
        WHERE {{
            GRAPH ?g {{
                <{entity_uri}> ?p ?o .
            }}
        }}
        """
    else:
        sparql = f"""
        SELECT ?p ?o
        WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """

    bindings = client.query(sparql)["results"]["bindings"]

    subject = rdflib.URIRef(entity_uri)
    literal_kinds = {'literal', 'typed-literal'}
    quads = []

    for row in bindings:
        predicate = rdflib.URIRef(row["p"]["value"])

        obj = row["o"]
        value = obj["value"]
        kind = obj["type"]

        graph_name = rdflib.URIRef(row["g"]["value"]) if "g" in row else None

        if kind == 'uri':
            term = rdflib.URIRef(value)
        elif kind == 'bnode':
            term = rdflib.BNode(value)
        elif kind in literal_kinds and 'datatype' in obj:
            # A datatype takes precedence over a language tag.
            term = rdflib.Literal(value, datatype=obj["datatype"])
        elif kind in literal_kinds and 'xml:lang' in obj:
            term = rdflib.Literal(value, lang=obj["xml:lang"])
        else:
            # Plain literals and unrecognised binding types become untyped literals.
            term = rdflib.Literal(value)

        quads.append((subject, predicate, term, graph_name))

    return quads
def extract_subset(
    endpoint, limit, output_file, compress, max_retries=5,
    class_uri=None, predicate_uri=None, use_graphs=True, recurse=True,
):
    """Extract a subset of a SPARQL endpoint's data and write it to a file.

    Seed entities are discovered either by predicate (``predicate_uri``, which
    takes precedence when both are given) or by class (``class_uri``). All
    statements with each entity as subject are fetched; with ``recurse`` every
    URI object is queued and expanded in turn.

    Args:
        endpoint: SPARQL endpoint URL.
        limit: Maximum number of seed entities to discover.
        output_file: Destination path; ``.gz`` is appended when ``compress``
            is set and the name does not already end with it.
        compress: Write the output through gzip.
        max_retries: Passed to ``SPARQLClient`` as ``max_retries``.
        class_uri: Class whose instances are used as seeds.
        predicate_uri: Predicate whose subjects are used as seeds.
        use_graphs: Query named graphs and serialize as N-Quads; otherwise
            serialize as N-Triples.
        recurse: Follow URI objects and extract them as well.

    Returns:
        A ``(processed_entity_count, output_file)`` tuple, where
        ``output_file`` is the path actually written.

    Raises:
        ValueError: If neither ``class_uri`` nor ``predicate_uri`` is given.
    """
    # Without a discovery criterion the class query would be built around the
    # literal text "None" and silently yield an empty extraction.
    if not predicate_uri and not class_uri:
        raise ValueError("Either class_uri or predicate_uri must be provided")

    processed_entities: set[str] = set()

    dataset: rdflib.Dataset | None = None
    graph: rdflib.Graph | None = None
    if use_graphs:
        dataset = rdflib.Dataset()
    else:
        graph = rdflib.Graph()

    with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=2, timeout=3600) as client:
        if predicate_uri:
            subjects = get_subjects_by_predicate(client, predicate_uri, limit)
        else:
            subjects = get_subjects_of_class(client, class_uri, limit)

        pending_entities = set(subjects)

        while pending_entities:
            entity = pending_entities.pop()
            if entity in processed_entities:
                continue

            processed_entities.add(entity)

            quads = get_triples_for_entity(client, entity, use_graphs)

            for s_term, p_term, o_term, g_term in quads:
                if dataset is not None:
                    named_graph = dataset.graph(g_term)
                    named_graph.add((s_term, p_term, o_term))
                elif graph is not None:
                    graph.add((s_term, p_term, o_term))

                if recurse and isinstance(o_term, rdflib.URIRef):
                    target = str(o_term)
                    # Skip entities already expanded so they are not re-queued.
                    if target not in processed_entities:
                        pending_entities.add(target)

    store = dataset if dataset is not None else graph
    assert store is not None
    output_format = "nquads" if use_graphs else "nt"
    if compress:
        if not output_file.endswith('.gz'):
            output_file = output_file + '.gz'
        with gzip.open(output_file, 'wb') as f:
            store.serialize(destination=f, format=output_format)  # type: ignore[arg-type]
    else:
        store.serialize(destination=output_file, format=output_format)

    return len(processed_entities), output_file
def main():  # pragma: no cover
    """Command-line entry point: parse arguments and run the extraction.

    Returns 0 on success and 1 when the endpoint URL is invalid or the
    extraction raises; the value is intended to be used as the exit status.
    """
    cli = argparse.ArgumentParser(
        description='Extract a subset of data from a SPARQL endpoint',
        formatter_class=RichHelpFormatter,
    )
    cli.add_argument(
        '--endpoint',
        default='http://localhost:8890/sparql',
        help='SPARQL endpoint URL (default: http://localhost:8890/sparql)',
    )

    # Only one discovery strategy may be chosen on the command line.
    seed_options = cli.add_mutually_exclusive_group()
    seed_options.add_argument(
        '--class',
        dest='class_uri',
        help='Class URI to extract instances of (default: fabio:Expression)',
    )
    seed_options.add_argument(
        '--predicate',
        dest='predicate_uri',
        help='Predicate URI for entity discovery (alternative to --class)',
    )

    cli.add_argument(
        '--limit',
        type=int,
        default=1000,
        help='Maximum number of initial entities to process (default: 1000)',
    )
    cli.add_argument(
        '--output',
        default='output.nq',
        help='Output file name (default: output.nq)',
    )
    cli.add_argument(
        '--compress',
        action='store_true',
        help='Compress output file using gzip',
    )
    cli.add_argument(
        '--retries',
        type=int,
        default=5,
        help='Maximum number of retries for failed queries (default: 5)',
    )
    cli.add_argument(
        '--no-graphs',
        action='store_true',
        help='Disable named graph queries and output N-Triples instead of N-Quads',
    )
    cli.add_argument(
        '--no-recurse',
        action='store_true',
        help='Do not recursively follow URI objects',
    )

    args = cli.parse_args()

    # Fall back to the default class when no discovery option was given.
    if not (args.class_uri or args.predicate_uri):
        args.class_uri = 'http://purl.org/spar/fabio/Expression'

    # The endpoint must have both a scheme and a network location.
    try:
        parts = urlparse(args.endpoint)
        endpoint_ok = bool(parts.scheme and parts.netloc)
    except Exception:
        endpoint_ok = False
    if not endpoint_ok:
        print(f"Error: Invalid endpoint URL: {args.endpoint}")
        return 1

    try:
        entity_count, final_output_file = extract_subset(
            args.endpoint,
            args.limit,
            args.output,
            args.compress,
            args.retries,
            class_uri=args.class_uri,
            predicate_uri=args.predicate_uri,
            use_graphs=not args.no_graphs,
            recurse=not args.no_recurse,
        )

        print(f"Extraction complete. Processed {entity_count} entities.")
        print(f"Output saved to {final_output_file}")
    except Exception as e:
        print(f"Error: {e}")
        return 1

    return 0
# Run the CLI when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())