Coverage for oc_meta / run / count / meta_entities.py: 72%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/python 

2# Copyright 2025, Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# Permission to use, copy, modify, and/or distribute this software for any purpose 

5# with or without fee is hereby granted, provided that the above copyright notice 

6# and this permission notice appear in all copies. 

7# 

8# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

9# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

10# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

11# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

12# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

13# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

14# SOFTWARE. 

15 

16from __future__ import annotations 

17 

18import argparse 

19import os 

20import re 

21import sys 

22from concurrent.futures import ProcessPoolExecutor, as_completed 

23from typing import Dict, Set 

24 

25from rich.progress import (BarColumn, Progress, TaskProgressColumn, TextColumn, 

26 TimeRemainingColumn) 

27from rich_argparse import RichHelpFormatter 

28from sparqlite import SPARQLClient 

29 

30from oc_meta.lib.file_manager import get_csv_data 

31from oc_meta.lib.master_of_regex import name_and_ids 

32 

33 

def _count_venues_in_file(filepath: str) -> Set[str]:
    """Extract the set of distinct venue keys from one Meta CSV dump file.

    A venue is keyed by its OMID when it also carries external identifiers,
    or by its lowercased name when the OMID is its only identifier (the name
    is then the only means of merging duplicates across rows).

    Runs inside a ProcessPoolExecutor worker, so it must be a module-level
    function and must not raise on malformed rows.

    :param filepath: path to a single CSV file of the Meta dump.
    :return: set of venue keys (OMIDs and/or lowercased names).
    """
    venues: Set[str] = set()
    for row in get_csv_data(filepath):
        if not row['venue']:
            continue
        ven_name_and_ids = re.search(name_and_ids, row['venue'])
        if not ven_name_and_ids:
            continue
        venue_name = ven_name_and_ids.group(1).lower()
        venue_ids = set(ven_name_and_ids.group(2).split())
        # Fix: the original indexed [ ... ][0], which raises IndexError (and
        # kills the worker) when the venue has no 'omid:' identifier at all.
        venue_metaid = next(
            (identifier for identifier in venue_ids
             if identifier.split(':', maxsplit=1)[0] == 'omid'),
            None,
        )
        if venue_metaid is None:
            # No OMID: cannot key this venue reliably; skip the row.
            continue
        if venue_ids == {venue_metaid}:
            # OMID is the only identifier, so use the name as the key to
            # merge venues Meta could not link to any external identifier.
            venues.add(venue_name)
        else:
            venues.add(venue_metaid)
    return venues

49 

50 

class OCMetaStatistics:
    """Compute entity counts over an OpenCitations Meta dataset.

    Bibliographic-resource and role counts are obtained through SPARQL
    queries against the configured endpoint; venue counting scans a CSV
    dump directory in parallel worker processes. Instances are usable as
    context managers so the underlying SPARQL client is always closed.
    """

    def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
        """Store connection settings and open the SPARQL client.

        :param sparql_endpoint: URL of the SPARQL endpoint to query.
        :param csv_dump_path: directory containing the Meta CSV dump;
            required only for venue counting.
        :param max_retries: retry count forwarded to the SPARQL client.
        :param retry_delay: forwarded as the client's ``backoff_factor`` —
            NOTE(review): presumably a delay/backoff in seconds between
            retries; confirm against sparqlite's documentation.
        """
        self.sparql_endpoint = sparql_endpoint
        self.csv_dump_path = csv_dump_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # One long-lived client reused by every query; 1-hour timeout because
        # the aggregate COUNT queries can be slow on large datasets.
        self.client = SPARQLClient(sparql_endpoint, max_retries=max_retries, backoff_factor=retry_delay, timeout=3600)

    def _execute_sparql_query(self, query: str) -> Dict:
        """Run *query* through the client, wrapping any failure.

        The client performs its own retries; by the time an exception
        reaches this method the retries are exhausted, so the error is
        reported on stderr and re-raised with the original as cause.

        :raises Exception: when the query fails after all retries.
        """
        try:
            return self.client.query(query)
        except Exception as e:
            print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
            raise Exception("SPARQL query failed after multiple retries.") from e

    def __enter__(self):
        # Context-manager entry: no extra setup, the client is already open.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close the client; returning False propagates any exception.
        self.close()
        return False

    def close(self):
        """Close the underlying SPARQL client connection."""
        self.client.close()

    def count_expressions(self) -> int:
        """Count distinct fabio:Expression entities (bibliographic resources).

        :return: the number of distinct expressions in the triplestore.
        """
        query = """
            PREFIX fabio: <http://purl.org/spar/fabio/>

            SELECT (COUNT(DISTINCT ?expression) AS ?count)
            WHERE {
                ?expression a fabio:Expression .
            }
        """
        results = self._execute_sparql_query(query)
        # SPARQL JSON results: single binding holding the aggregate count.
        return int(results["results"]["bindings"][0]["count"]["value"])

    def count_role_entities(self) -> Dict[str, int]:
        """Count pro:RoleInTime entities grouped by author/publisher/editor role.

        :return: mapping of 'pro:author' / 'pro:publisher' / 'pro:editor'
            to their counts; roles absent from the results stay at 0.
        """
        query = """
            PREFIX pro: <http://purl.org/spar/pro/>

            SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
            WHERE {
                ?roleInTime pro:withRole ?role .
                FILTER(?role IN (pro:author, pro:publisher, pro:editor))
            }
            GROUP BY ?role
        """
        results = self._execute_sparql_query(query)

        # Pre-seed with zeros so missing roles are still present in the output.
        role_counts = {
            'pro:author': 0,
            'pro:publisher': 0,
            'pro:editor': 0
        }

        for binding in results["results"]["bindings"]:
            role_uri = binding["role"]["value"]
            count = int(binding["count"]["value"])

            # Map full URIs back to the prefixed keys used by callers.
            if role_uri == "http://purl.org/spar/pro/author":
                role_counts['pro:author'] = count
            elif role_uri == "http://purl.org/spar/pro/publisher":
                role_counts['pro:publisher'] = count
            elif role_uri == "http://purl.org/spar/pro/editor":
                role_counts['pro:editor'] = count

        return role_counts

    def count_venues_from_csv(self) -> int:
        """Count distinct venues by scanning every CSV file of the dump.

        Files are processed in parallel worker processes; per-file venue
        sets are unioned so duplicates across files collapse.

        :return: number of distinct venue keys across the whole dump.
        :raises ValueError: if no CSV dump path was configured.
        """
        if not self.csv_dump_path:
            raise ValueError("CSV dump path is required to count venues")

        # Sorted for deterministic submission order; only *.csv files count.
        filenames = sorted(os.listdir(self.csv_dump_path))
        filepaths = [os.path.join(self.csv_dump_path, f) for f in filenames if f.endswith('.csv')]

        all_venues: Set[str] = set()

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task = progress.add_task("Counting venues from CSV files...", total=len(filepaths))

            with ProcessPoolExecutor() as executor:
                futures = {executor.submit(_count_venues_in_file, fp): fp for fp in filepaths}
                # as_completed lets the progress bar advance as each file
                # finishes, regardless of submission order. NOTE(review):
                # future.result() re-raises any worker exception, aborting
                # the whole count on a single bad file.
                for future in as_completed(futures):
                    venues = future.result()
                    all_venues.update(venues)
                    progress.update(task, advance=1)

        return len(all_venues)

    def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
        """Run the requested analyses, printing progress and collecting results.

        Each analysis is independent: a failure is printed and recorded as
        ``None`` in the result without stopping the remaining analyses.

        :param analyze_br: count fabio:Expression entities.
        :param analyze_ar: count author/publisher/editor roles.
        :param analyze_venues: count distinct venues from the CSV dump.
        :return: dict with keys 'fabio_expressions', 'roles' and/or 'venues'
            (only the requested ones), each ``None`` on failure.
        """
        print("Starting dataset statistics...")
        print(f"Connected to endpoint: {self.sparql_endpoint}")
        if self.csv_dump_path:
            print(f"CSV dump path: {self.csv_dump_path}")
        print()

        results = {}

        if analyze_br:
            print("1. Counting fabio:Expression entities...")
            try:
                expressions_count = self.count_expressions()
                results['fabio_expressions'] = expressions_count
                print(f" Found {expressions_count:,} fabio:Expression entities")
            except Exception as e:
                print(f" Error: {e}")
                results['fabio_expressions'] = None
            print()

        if analyze_ar:
            print("2. Counting pro:author, pro:publisher and pro:editor roles...")
            try:
                role_counts = self.count_role_entities()
                results['roles'] = role_counts
                print(f" Found {role_counts['pro:author']:,} pro:author roles")
                print(f" Found {role_counts['pro:publisher']:,} pro:publisher roles")
                print(f" Found {role_counts['pro:editor']:,} pro:editor roles")
            except Exception as e:
                print(f" Error: {e}")
                results['roles'] = None
            print()

        if analyze_venues:
            print("3. Counting venues from CSV dump...")
            if not self.csv_dump_path:
                # Degrade gracefully instead of letting count_venues_from_csv raise.
                print(" Error: CSV dump path is required for venue counting")
                results['venues'] = None
            else:
                try:
                    venues_count = self.count_venues_from_csv()
                    results['venues'] = venues_count
                    print(f" Found {venues_count:,} distinct venues")
                except Exception as e:
                    print(f" Error: {e}")
                    results['venues'] = None
            print()

        print("Statistics completed!")
        return results

    def run_all_analyses(self) -> Dict:
        """Convenience wrapper: run every analysis."""
        return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)

199 

200 

def main():
    """CLI entry point: parse arguments, run the selected analyses, print a summary.

    With no selection flag (--br/--ar/--venues) every analysis is run.
    Exits with status 1 when venue counting is requested without --csv, or
    when the statistics run fails as a whole.

    :return: the results dict from ``run_selected_analyses`` on success.
    """
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
  # Run all statistics
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

  # Count only bibliographic resources
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

  # Count only roles
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

  # Count only venues (requires CSV dump)
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
  --br: Count fabio:Expression entities (via SPARQL)
  --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
  --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
        """
    )

    parser.add_argument(
        'sparql_endpoint',
        help='SPARQL endpoint URL'
    )

    parser.add_argument(
        '--csv',
        dest='csv_dump_path',
        help='Path to CSV dump directory (required for venue counting)'
    )

    parser.add_argument(
        '--br',
        action='store_true',
        help='Count bibliographic resources (fabio:Expression entities)'
    )

    parser.add_argument(
        '--ar',
        action='store_true',
        help='Count roles (pro:author, pro:publisher, pro:editor)'
    )

    parser.add_argument(
        '--venues',
        action='store_true',
        help='Count distinct venues (requires --csv)'
    )

    args = parser.parse_args()

    # DRY: the original repeated `not (args.br or args.ar or args.venues)`
    # three times; compute the "no flags given -> run everything" case once.
    run_all = not (args.br or args.ar or args.venues)
    analyze_br = args.br or run_all
    analyze_ar = args.ar or run_all
    analyze_venues = args.venues or run_all

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        # Context manager guarantees the SPARQL client is closed.
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            print("\n" + "=" * 50)
            print("SUMMARY")
            print("=" * 50)

            # 'is not None' so a legitimate count of 0 is still reported.
            if results.get('fabio_expressions') is not None:
                print(f"fabio:Expression entities: {results['fabio_expressions']:,}")

            if results.get('roles'):
                print(f"pro:author roles: {results['roles']['pro:author']:,}")
                print(f"pro:publisher roles: {results['roles']['pro:publisher']:,}")
                print(f"pro:editor roles: {results['roles']['pro:editor']:,}")

            if results.get('venues') is not None:
                print(f"Distinct venues: {results['venues']:,}")

            return results

    except Exception as e:
        print(f"Statistics failed: {e}", file=sys.stderr)
        sys.exit(1)

291 

292 

# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()