Coverage for oc_meta / run / count / meta_entities.py: 71%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-20 14:28 +0000

1#!/usr/bin/python 

2 

3# Copyright 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10import argparse 

11import os 

12import sys 

13import multiprocessing 

14from concurrent.futures import ProcessPoolExecutor, as_completed 

15from typing import Dict, Set 

16 

17from rich_argparse import RichHelpFormatter 

18from oc_meta.lib.sparql import execute_sparql 

19 

20from oc_meta.lib.console import create_progress 

21from oc_meta.lib.file_manager import get_csv_data 

22from oc_meta.lib.master_of_regex import split_name_and_ids 

23 

24 

def _count_venues_in_file(filepath: str) -> Set[str]:
    """Collect the distinct venue keys found in one CSV file.

    Every row's ``venue`` cell is split by ``split_name_and_ids`` into a
    name and a whitespace-separated identifier string. Rows with an empty
    venue cell, or with no identifier string, are skipped. For the
    remaining rows the key added to the result is:

    * the lower-cased venue name, when the ``omid`` identifier is the only
      identifier of the venue;
    * the ``omid`` identifier itself, when other identifiers accompany it;
    * the lower-cased venue name, when no ``omid`` identifier is present
      at all (fallback; see the note in the body).

    Args:
        filepath: Path of the CSV file, passed as-is to ``get_csv_data``.

    Returns:
        The set of venue keys found in the file.
    """
    venues: Set[str] = set()
    for row in get_csv_data(filepath):
        venue_field = row['venue']
        if not venue_field:
            continue
        venue_name, venue_ids_str = split_name_and_ids(venue_field)
        if not venue_ids_str:
            continue
        venue_ids = set(venue_ids_str.split())
        # A default is required here: without it a venue lacking an
        # ``omid`` identifier raises StopIteration, which aborts the whole
        # file (and, through the worker future, the whole venue count).
        venue_metaid = next(
            (
                identifier for identifier in venue_ids
                if identifier.split(':', maxsplit=1)[0] == 'omid'
            ),
            None,
        )
        if venue_metaid is None or not venue_ids.difference({venue_metaid}):
            # Either there is no OMID to use as a key, or the OMID is the
            # only identifier: in both cases the name is the key.
            venues.add(venue_name.lower())
        else:
            venues.add(venue_metaid)
    return venues

44 

45 

class OCMetaStatistics:
    """Compute entity counts for an OpenCitations Meta dataset.

    Expression and role counts are obtained by sending SPARQL queries to
    ``sparql_endpoint`` through ``execute_sparql``; venue counts are
    obtained by scanning the ``.csv`` files found in ``csv_dump_path``.
    The object can be used as a context manager; entering returns the
    instance and exiting never suppresses an exception.
    """

    def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
        """Store the configuration; no connection is opened here.

        Args:
            sparql_endpoint: URL handed to ``execute_sparql``.
            csv_dump_path: Directory containing the CSV dump, or ``None``
                when venue counting is not needed.
            max_retries: Forwarded to ``execute_sparql`` as ``max_retries``.
            retry_delay: Forwarded to ``execute_sparql`` as
                ``backoff_factor``.
        """
        self.sparql_endpoint = sparql_endpoint
        self.csv_dump_path = csv_dump_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def _execute_sparql_query(self, query: str) -> Dict:
        """Run ``query`` against the configured endpoint.

        Raises:
            Exception: When ``execute_sparql`` raises; the original error
                is chained as the cause and a notice goes to stderr.
        """
        try:
            return execute_sparql(
                self.sparql_endpoint,
                query,
                max_retries=self.max_retries,
                backoff_factor=self.retry_delay,
            )
        except Exception as error:
            print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
            raise Exception("SPARQL query failed after multiple retries.") from error

    def __enter__(self):
        """Return the instance itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing and let any exception propagate."""
        return False

    def count_expressions(self) -> int:
        """Return the number of distinct ``fabio:Expression`` entities."""
        query = """
            PREFIX fabio: <http://purl.org/spar/fabio/>

            SELECT (COUNT(DISTINCT ?expression) AS ?count)
            WHERE {
                ?expression a fabio:Expression .
            }
        """
        first_row = self._execute_sparql_query(query)["results"]["bindings"][0]
        return int(first_row["count"]["value"])

    def count_role_entities(self) -> Dict[str, int]:
        """Return the number of distinct roles-in-time per role.

        Returns:
            A dictionary with the keys ``'pro:author'``,
            ``'pro:publisher'`` and ``'pro:editor'``; a role missing from
            the query result keeps the value ``0``.
        """
        query = """
            PREFIX pro: <http://purl.org/spar/pro/>

            SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
            WHERE {
                ?roleInTime pro:withRole ?role .
                FILTER(?role IN (pro:author, pro:publisher, pro:editor))
            }
            GROUP BY ?role
        """
        response = self._execute_sparql_query(query)

        # Full role URI -> prefixed name used as key in the returned dict.
        label_by_uri = {
            "http://purl.org/spar/pro/author": 'pro:author',
            "http://purl.org/spar/pro/publisher": 'pro:publisher',
            "http://purl.org/spar/pro/editor": 'pro:editor',
        }
        role_counts = {label: 0 for label in label_by_uri.values()}

        for row in response["results"]["bindings"]:
            uri = row["role"]["value"]
            amount = int(row["count"]["value"])
            label = label_by_uri.get(uri)
            if label is not None:
                role_counts[label] = amount

        return role_counts

    def count_venues_from_csv(self) -> int:
        """Return the number of distinct venue keys in the CSV dump.

        Every ``.csv`` file directly inside ``csv_dump_path`` is handed to
        ``_count_venues_in_file`` in a worker process, and the per-file
        sets are merged before counting.

        Raises:
            ValueError: When no CSV dump path was configured.
        """
        if not self.csv_dump_path:
            raise ValueError("CSV dump path is required to count venues")

        filepaths = [
            os.path.join(self.csv_dump_path, name)
            for name in sorted(os.listdir(self.csv_dump_path))
            if name.endswith('.csv')
        ]

        all_venues: Set[str] = set()

        with create_progress() as progress:
            task = progress.add_task("Counting venues from CSV files...", total=len(filepaths))

            # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
            context = multiprocessing.get_context('forkserver')
            with ProcessPoolExecutor(mp_context=context) as executor:
                pending = [executor.submit(_count_venues_in_file, path) for path in filepaths]
                for finished in as_completed(pending):
                    all_venues.update(finished.result())
                    progress.update(task, advance=1)

        return len(all_venues)

    def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
        """Run the requested counts, printing progress along the way.

        A failing count is reported on stdout and stored as ``None``; it
        does not stop the remaining counts.

        Args:
            analyze_br: Count ``fabio:Expression`` entities.
            analyze_ar: Count author, publisher and editor roles.
            analyze_venues: Count distinct venues from the CSV dump.

        Returns:
            A dictionary holding, for each requested analysis only, the
            key ``'fabio_expressions'``, ``'roles'`` or ``'venues'``.
        """
        print("Starting dataset statistics...")
        print(f"Connected to endpoint: {self.sparql_endpoint}")
        if self.csv_dump_path:
            print(f"CSV dump path: {self.csv_dump_path}")
        print()

        results = {}

        if analyze_br:
            print("1. Counting fabio:Expression entities...")
            try:
                total = self.count_expressions()
                results['fabio_expressions'] = total
                print(f" Found {total:,} fabio:Expression entities")
            except Exception as error:
                print(f" Error: {error}")
                results['fabio_expressions'] = None
            print()

        if analyze_ar:
            print("2. Counting pro:author, pro:publisher and pro:editor roles...")
            try:
                per_role = self.count_role_entities()
                results['roles'] = per_role
                print(f" Found {per_role['pro:author']:,} pro:author roles")
                print(f" Found {per_role['pro:publisher']:,} pro:publisher roles")
                print(f" Found {per_role['pro:editor']:,} pro:editor roles")
            except Exception as error:
                print(f" Error: {error}")
                results['roles'] = None
            print()

        if analyze_venues:
            print("3. Counting venues from CSV dump...")
            if self.csv_dump_path:
                try:
                    distinct = self.count_venues_from_csv()
                    results['venues'] = distinct
                    print(f" Found {distinct:,} distinct venues")
                except Exception as error:
                    print(f" Error: {error}")
                    results['venues'] = None
            else:
                print(" Error: CSV dump path is required for venue counting")
                results['venues'] = None
            print()

        print("Statistics completed!")
        return results

    def run_all_analyses(self) -> Dict:
        """Run every available count; see ``run_selected_analyses``."""
        return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)

185 

186 

def main():
    """Command-line entry point.

    Parses the arguments, runs the selected counts through
    ``OCMetaStatistics`` and prints a summary. When none of ``--br``,
    ``--ar`` and ``--venues`` is given, all three counts are selected.

    Returns:
        The results dictionary produced by ``run_selected_analyses``.

    The process exits with status 1 when venue counting is selected
    without ``--csv``, or when an exception escapes the analysis.
    """
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
 # Run all statistics
 python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

 # Count only bibliographic resources
 python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

 # Count only roles
 python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

 # Count only venues (requires CSV dump)
 python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
 --br: Count fabio:Expression entities (via SPARQL)
 --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
 --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
 """
    )

    parser.add_argument('sparql_endpoint', help='SPARQL endpoint URL')
    parser.add_argument(
        '--csv',
        dest='csv_dump_path',
        help='Path to CSV dump directory (required for venue counting)'
    )
    parser.add_argument(
        '--br',
        action='store_true',
        help='Count bibliographic resources (fabio:Expression entities)'
    )
    parser.add_argument(
        '--ar',
        action='store_true',
        help='Count roles (pro:author, pro:publisher, pro:editor)'
    )
    parser.add_argument(
        '--venues',
        action='store_true',
        help='Count distinct venues (requires --csv)'
    )

    args = parser.parse_args()

    # With no explicit selection every analysis is enabled.
    nothing_selected = not (args.br or args.ar or args.venues)
    analyze_br = args.br or nothing_selected
    analyze_ar = args.ar or nothing_selected
    analyze_venues = args.venues or nothing_selected

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            separator = "=" * 50
            print("\n" + separator)
            print("SUMMARY")
            print(separator)

            expressions = results.get('fabio_expressions')
            if expressions is not None:
                print(f"fabio:Expression entities: {expressions:,}")

            roles = results.get('roles')
            if roles:
                print(f"pro:author roles: {roles['pro:author']:,}")
                print(f"pro:publisher roles: {roles['pro:publisher']:,}")
                print(f"pro:editor roles: {roles['pro:editor']:,}")

            venues = results.get('venues')
            if venues is not None:
                print(f"Distinct venues: {venues:,}")

            return results

    except Exception as error:
        print(f"Statistics failed: {error}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()