Coverage for oc_meta / run / count / meta_entities.py: 72%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# Copyright 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10import argparse 

11import os 

12import sys 

13import multiprocessing 

14from concurrent.futures import ProcessPoolExecutor, as_completed 

15from typing import Dict, Set 

16 

17from rich_argparse import RichHelpFormatter 

18from sparqlite import SPARQLClient 

19 

20from oc_meta.lib.console import create_progress 

21from oc_meta.lib.file_manager import get_csv_data 

22from oc_meta.lib.master_of_regex import split_name_and_ids 

23 

24 

def _count_venues_in_file(filepath: str) -> Set[str]:
    """Extract the set of distinct venue keys from one CSV dump file.

    A venue is keyed by its OMID when it carries at least one identifier
    besides the OMID; otherwise it is keyed by its lowercased name, so that
    venues known only by name are still deduplicated case-insensitively.

    :param filepath: path to a single CSV file of the Meta dump
    :return: set of venue keys (OMIDs and/or lowercased venue names)
    """
    csv_data = get_csv_data(filepath)
    venues = set()
    for row in csv_data:
        if not row['venue']:
            continue
        venue_name, venue_ids_str = split_name_and_ids(row['venue'])
        if not venue_ids_str:
            continue
        venue_ids = set(venue_ids_str.split())
        # Fix: the previous next(...) had no default, so a row whose venue
        # listed identifiers but no 'omid:' one raised StopIteration and
        # aborted the whole worker. Fall back to None instead.
        venue_metaid = next(
            (identifier for identifier in venue_ids
             if identifier.split(':', maxsplit=1)[0] == 'omid'),
            None
        )
        if venue_metaid is None or not venue_ids.difference({venue_metaid}):
            # No OMID, or the OMID is the only identifier: key the venue by
            # its case-folded name, matching the name-only disambiguation path.
            venues.add(venue_name.lower())
        else:
            venues.add(venue_metaid)
    return venues

44 

45 

class OCMetaStatistics:
    """Compute OpenCitations Meta dataset statistics.

    Counts are gathered from two sources: a SPARQL endpoint (bibliographic
    resources and agent roles) and, optionally, a CSV dump directory
    (distinct venues). Usable as a context manager so the underlying
    SPARQL client is always closed.
    """

    def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
        """Store connection settings and open the SPARQL client.

        :param sparql_endpoint: URL of the SPARQL endpoint to query
        :param csv_dump_path: directory with the CSV dump (required only
            for venue counting)
        :param max_retries: retry attempts forwarded to the SPARQL client
        :param retry_delay: seconds used as the client's backoff factor
        """
        self.sparql_endpoint = sparql_endpoint
        self.csv_dump_path = csv_dump_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # Long timeout: COUNT queries over the full dataset can run for a while.
        self.client = SPARQLClient(sparql_endpoint, max_retries=max_retries, backoff_factor=retry_delay, timeout=3600)

    def _execute_sparql_query(self, query: str) -> Dict:
        """Run *query* through the client; re-raise failures with context.

        The client performs its own retries; any exception surfacing here
        means all retries were exhausted.
        """
        try:
            return self.client.query(query)
        except Exception as e:
            print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
            raise Exception("SPARQL query failed after multiple retries.") from e

    def __enter__(self) -> "OCMetaStatistics":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Always close the client; returning False propagates any exception.
        self.close()
        return False

    def close(self) -> None:
        """Close the underlying SPARQL client."""
        self.client.close()

    def count_expressions(self) -> int:
        """Return the number of distinct fabio:Expression entities."""
        query = """
        PREFIX fabio: <http://purl.org/spar/fabio/>

        SELECT (COUNT(DISTINCT ?expression) AS ?count)
        WHERE {
            ?expression a fabio:Expression .
        }
        """
        results = self._execute_sparql_query(query)
        return int(results["results"]["bindings"][0]["count"]["value"])

    def count_role_entities(self) -> Dict[str, int]:
        """Count distinct role-in-time entities per pro: role.

        :return: mapping with keys 'pro:author', 'pro:publisher' and
            'pro:editor'; roles missing from the endpoint's answer stay 0.
        """
        query = """
        PREFIX pro: <http://purl.org/spar/pro/>

        SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
        WHERE {
            ?roleInTime pro:withRole ?role .
            FILTER(?role IN (pro:author, pro:publisher, pro:editor))
        }
        GROUP BY ?role
        """
        results = self._execute_sparql_query(query)

        # Pre-seed with zeros so absent roles are still reported.
        role_counts = {
            'pro:author': 0,
            'pro:publisher': 0,
            'pro:editor': 0
        }

        for binding in results["results"]["bindings"]:
            role_uri = binding["role"]["value"]
            count = int(binding["count"]["value"])

            # Map full role URIs back to their CURIE keys.
            if role_uri == "http://purl.org/spar/pro/author":
                role_counts['pro:author'] = count
            elif role_uri == "http://purl.org/spar/pro/publisher":
                role_counts['pro:publisher'] = count
            elif role_uri == "http://purl.org/spar/pro/editor":
                role_counts['pro:editor'] = count

        return role_counts

    def count_venues_from_csv(self) -> int:
        """Count distinct venues across all CSV files of the dump.

        Files are processed in parallel worker processes; per-file venue
        sets are unioned so venues repeated across files count once.

        :raises ValueError: if no CSV dump path was configured
        """
        if not self.csv_dump_path:
            raise ValueError("CSV dump path is required to count venues")

        filenames = sorted(os.listdir(self.csv_dump_path))
        filepaths = [os.path.join(self.csv_dump_path, f) for f in filenames if f.endswith('.csv')]

        all_venues: Set[str] = set()

        with create_progress() as progress:
            task = progress.add_task("Counting venues from CSV files...", total=len(filepaths))

            # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
            with ProcessPoolExecutor(mp_context=multiprocessing.get_context('forkserver')) as executor:
                futures = {executor.submit(_count_venues_in_file, fp): fp for fp in filepaths}
                for future in as_completed(futures):
                    venues = future.result()
                    all_venues.update(venues)
                    progress.update(task, advance=1)

        return len(all_venues)

    def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
        """Run the requested analyses, printing progress as it goes.

        :return: dict with keys 'fabio_expressions', 'roles' and/or
            'venues' for the analyses that were requested; a value is
            None when that analysis failed (errors are reported, not raised).
        """
        print("Starting dataset statistics...")
        print(f"Connected to endpoint: {self.sparql_endpoint}")
        if self.csv_dump_path:
            print(f"CSV dump path: {self.csv_dump_path}")
        print()

        results = {}

        if analyze_br:
            print("1. Counting fabio:Expression entities...")
            try:
                expressions_count = self.count_expressions()
                results['fabio_expressions'] = expressions_count
                print(f" Found {expressions_count:,} fabio:Expression entities")
            except Exception as e:
                # Report and continue: one failed analysis must not stop the rest.
                print(f" Error: {e}")
                results['fabio_expressions'] = None
            print()

        if analyze_ar:
            print("2. Counting pro:author, pro:publisher and pro:editor roles...")
            try:
                role_counts = self.count_role_entities()
                results['roles'] = role_counts
                print(f" Found {role_counts['pro:author']:,} pro:author roles")
                print(f" Found {role_counts['pro:publisher']:,} pro:publisher roles")
                print(f" Found {role_counts['pro:editor']:,} pro:editor roles")
            except Exception as e:
                print(f" Error: {e}")
                results['roles'] = None
            print()

        if analyze_venues:
            print("3. Counting venues from CSV dump...")
            if not self.csv_dump_path:
                print(" Error: CSV dump path is required for venue counting")
                results['venues'] = None
            else:
                try:
                    venues_count = self.count_venues_from_csv()
                    results['venues'] = venues_count
                    print(f" Found {venues_count:,} distinct venues")
                except Exception as e:
                    print(f" Error: {e}")
                    results['venues'] = None
            print()

        print("Statistics completed!")
        return results

    def run_all_analyses(self) -> Dict:
        """Convenience wrapper: run every available analysis."""
        return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)

190 

191 

def main():
    """CLI entry point: parse arguments, run the selected statistics,
    and print a summary.

    Exits with status 1 when venue counting is requested without a CSV
    dump path, or when the statistics run fails.
    """
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
  # Run all statistics
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

  # Count only bibliographic resources
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

  # Count only roles
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

  # Count only venues (requires CSV dump)
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
  --br: Count fabio:Expression entities (via SPARQL)
  --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
  --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
        """
    )

    parser.add_argument(
        'sparql_endpoint',
        help='SPARQL endpoint URL'
    )

    parser.add_argument(
        '--csv',
        dest='csv_dump_path',
        help='Path to CSV dump directory (required for venue counting)'
    )

    parser.add_argument(
        '--br',
        action='store_true',
        help='Count bibliographic resources (fabio:Expression entities)'
    )

    parser.add_argument(
        '--ar',
        action='store_true',
        help='Count roles (pro:author, pro:publisher, pro:editor)'
    )

    parser.add_argument(
        '--venues',
        action='store_true',
        help='Count distinct venues (requires --csv)'
    )

    args = parser.parse_args()

    # When no analysis flag is given, run everything. Compute the predicate
    # once instead of repeating it per flag (was duplicated three times).
    run_all = not (args.br or args.ar or args.venues)
    analyze_br = args.br or run_all
    analyze_ar = args.ar or run_all
    analyze_venues = args.venues or run_all

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        # Context manager guarantees the SPARQL client is closed.
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            print("\n" + "="*50)
            print("SUMMARY")
            print("="*50)

            if results.get('fabio_expressions') is not None:
                print(f"fabio:Expression entities: {results['fabio_expressions']:,}")

            if results.get('roles'):
                print(f"pro:author roles: {results['roles']['pro:author']:,}")
                print(f"pro:publisher roles: {results['roles']['pro:publisher']:,}")
                print(f"pro:editor roles: {results['roles']['pro:editor']:,}")

            if results.get('venues') is not None:
                print(f"Distinct venues: {results['venues']:,}")

            return results

    except Exception as e:
        print(f"Statistics failed: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()