Coverage for oc_meta / run / migration / extract_subset.py: 13%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4import argparse 

5import gzip 

6import sys 

7from urllib.parse import urlparse 

8 

9import rdflib 

10from rich_argparse import RichHelpFormatter 

11from sparqlite import SPARQLClient 

12 

13 

def get_subjects_of_class(client, class_uri, limit):
    """Return up to ``limit`` subject URIs that are instances of ``class_uri``.

    Issues a ``?s a <class>`` SELECT against the endpoint and collects the
    subject URI strings from the SPARQL-JSON result bindings.
    """
    query = f"""
    SELECT ?s
    WHERE {{
        ?s a <{class_uri}> .
    }}
    LIMIT {limit}
    """

    response = client.query(query)
    bindings = response["results"]["bindings"]
    return [row["s"]["value"] for row in bindings]

27 

28 

def get_subjects_by_predicate(client, predicate_uri, limit):
    """Return up to ``limit`` subject URIs that have ``predicate_uri``.

    Alternative discovery strategy to class-based lookup: any subject with
    at least one triple using the predicate qualifies.
    """
    query = f"""
    SELECT ?s
    WHERE {{
        ?s <{predicate_uri}> ?o .
    }}
    LIMIT {limit}
    """

    response = client.query(query)
    bindings = response["results"]["bindings"]
    return [row["s"]["value"] for row in bindings]

39 

40 

def get_triples_for_entity(client, entity_uri, use_graphs=True):
    """Fetch all outgoing triples of ``entity_uri`` as rdflib quad tuples.

    When ``use_graphs`` is true the query binds the named graph via
    ``GRAPH ?g`` and each quad carries a graph URIRef; otherwise the graph
    slot of every quad is ``None``.  Object bindings are converted to
    URIRef / BNode / Literal terms based on the SPARQL-JSON ``type`` field.
    """
    if use_graphs:
        query = f"""
        SELECT ?p ?o ?g
        WHERE {{
            GRAPH ?g {{
                <{entity_uri}> ?p ?o .
            }}
        }}
        """
    else:
        query = f"""
        SELECT ?p ?o
        WHERE {{
            <{entity_uri}> ?p ?o .
        }}
        """

    response = client.query(query)
    subject = rdflib.URIRef(entity_uri)

    def to_object_term(binding):
        # Translate one SPARQL-JSON object binding into an rdflib term.
        value = binding["value"]
        kind = binding["type"]
        if kind == 'uri':
            return rdflib.URIRef(value)
        if kind == 'bnode':
            return rdflib.BNode(value)
        if kind in {'literal', 'typed-literal'}:
            if 'datatype' in binding:
                return rdflib.Literal(value, datatype=binding["datatype"])
            if 'xml:lang' in binding:
                return rdflib.Literal(value, lang=binding["xml:lang"])
        # Plain literal, or an unexpected binding type degraded to a literal.
        return rdflib.Literal(value)

    quads = []
    for row in response["results"]["bindings"]:
        predicate = rdflib.URIRef(row["p"]["value"])
        obj = to_object_term(row["o"])
        graph_name = rdflib.URIRef(row["g"]["value"]) if "g" in row else None
        quads.append((subject, predicate, obj, graph_name))

    return quads

95 

96 

def extract_subset(
    endpoint, limit, output_file, compress, max_retries=5,
    class_uri=None, predicate_uri=None, use_graphs=True, recurse=True,
):
    """Crawl a subset of triples from ``endpoint`` and serialize it to disk.

    Seed entities come from ``predicate_uri`` when given, otherwise from
    ``class_uri``.  Each entity's triples are fetched and, when ``recurse``
    is true, every URI object is queued for fetching as well, so the crawl
    closes over the reachable graph.  Output is N-Quads when ``use_graphs``
    is true, N-Triples otherwise, optionally gzip-compressed.

    Returns a ``(processed_entity_count, output_file)`` tuple; the file
    name gains a ``.gz`` suffix when compression is requested.
    """
    with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=2, timeout=3600) as client:
        # Seed discovery: predicate-based takes precedence over class-based.
        if predicate_uri:
            seeds = get_subjects_by_predicate(client, predicate_uri, limit)
        else:
            seeds = get_subjects_of_class(client, class_uri, limit)

        processed_entities: set[str] = set()
        pending_entities = set(seeds)

        # Exactly one of these two containers is populated, per use_graphs.
        dataset: rdflib.Dataset | None = None
        graph: rdflib.Graph | None = None
        if use_graphs:
            dataset = rdflib.Dataset()
        else:
            graph = rdflib.Graph()

        while pending_entities:
            entity = pending_entities.pop()
            if entity in processed_entities:
                continue
            processed_entities.add(entity)

            for s_term, p_term, o_term, g_term in get_triples_for_entity(client, entity, use_graphs):
                if dataset is not None:
                    dataset.graph(g_term).add((s_term, p_term, o_term))
                elif graph is not None:
                    graph.add((s_term, p_term, o_term))

                # Widen the crawl: follow URI objects when recursing.
                if recurse and isinstance(o_term, rdflib.URIRef):
                    pending_entities.add(str(o_term))

        store = dataset if dataset is not None else graph
        assert store is not None
        output_format = "nquads" if use_graphs else "nt"
        if compress:
            if not output_file.endswith('.gz'):
                output_file = output_file + '.gz'
            with gzip.open(output_file, 'wb') as f:
                store.serialize(destination=f, format=output_format)  # type: ignore[arg-type]
        else:
            store.serialize(destination=output_file, format=output_format)

        return len(processed_entities), output_file

148 

149 

def main():  # pragma: no cover
    """CLI entry point: build the parser, validate inputs, run extraction.

    Returns 0 on success and 1 on an invalid endpoint URL or any failure
    raised during extraction.
    """
    parser = argparse.ArgumentParser(
        description='Extract a subset of data from a SPARQL endpoint',
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('--endpoint', default='http://localhost:8890/sparql',
                        help='SPARQL endpoint URL (default: http://localhost:8890/sparql)')

    # --class and --predicate are mutually exclusive discovery strategies.
    discovery = parser.add_mutually_exclusive_group()
    discovery.add_argument('--class', dest='class_uri',
                           help='Class URI to extract instances of (default: fabio:Expression)')
    discovery.add_argument('--predicate', dest='predicate_uri',
                           help='Predicate URI for entity discovery (alternative to --class)')

    # Remaining flags, registered table-style to keep declarations compact.
    flag_specs = [
        ('--limit', dict(type=int, default=1000,
                         help='Maximum number of initial entities to process (default: 1000)')),
        ('--output', dict(default='output.nq',
                          help='Output file name (default: output.nq)')),
        ('--compress', dict(action='store_true',
                            help='Compress output file using gzip')),
        ('--retries', dict(type=int, default=5,
                           help='Maximum number of retries for failed queries (default: 5)')),
        ('--no-graphs', dict(action='store_true',
                             help='Disable named graph queries and output N-Triples instead of N-Quads')),
        ('--no-recurse', dict(action='store_true',
                              help='Do not recursively follow URI objects')),
    ]
    for flag, options in flag_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()

    # Fall back to fabio:Expression when no discovery option was given.
    if not (args.class_uri or args.predicate_uri):
        args.class_uri = 'http://purl.org/spar/fabio/Expression'

    try:
        parts = urlparse(args.endpoint)
        if not (parts.scheme and parts.netloc):
            raise ValueError("Invalid endpoint URL")
    except Exception:
        print(f"Error: Invalid endpoint URL: {args.endpoint}")
        return 1

    try:
        entity_count, final_output_file = extract_subset(
            args.endpoint,
            args.limit,
            args.output,
            args.compress,
            args.retries,
            class_uri=args.class_uri,
            predicate_uri=args.predicate_uri,
            use_graphs=not args.no_graphs,
            recurse=not args.no_recurse,
        )
    except Exception as e:
        print(f"Error: {e}")
        return 1

    print(f"Extraction complete. Processed {entity_count} entities.")
    print(f"Output saved to {final_output_file}")
    return 0

210 

211 

# Script entry point: propagate main()'s integer status to the shell.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())