Coverage for oc_meta / run / count / meta_entities.py: 71%
156 statements
coverage.py v7.13.4, created at 2026-06-20 14:28 +0000
1#!/usr/bin/python
3# Copyright 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import argparse
11import os
12import sys
13import multiprocessing
14from concurrent.futures import ProcessPoolExecutor, as_completed
15from typing import Dict, Set
17from rich_argparse import RichHelpFormatter
18from oc_meta.lib.sparql import execute_sparql
20from oc_meta.lib.console import create_progress
21from oc_meta.lib.file_manager import get_csv_data
22from oc_meta.lib.master_of_regex import split_name_and_ids
def _count_venues_in_file(filepath: str) -> Set[str]:
    """Collect the distinct venue keys found in a single CSV dump file.

    Every row's ``venue`` cell is split into a name and an identifier string
    with ``split_name_and_ids``. Rows whose venue cell is empty, or whose venue
    carries no identifiers, are skipped. The key recorded for a venue is:

    * its lower-cased name, when the ``omid`` identifier is the only
      identifier it has;
    * its ``omid`` identifier, when it has further identifiers as well;
    * its lower-cased name, when it has identifiers but none of them is an
      ``omid`` (fallback for malformed rows).

    Args:
        filepath: Path of the CSV file, handed to ``get_csv_data``.

    Returns:
        The set of venue keys (lower-cased names and/or ``omid`` identifiers)
        found in the file.
    """
    venues: Set[str] = set()
    for row in get_csv_data(filepath):
        venue_cell = row['venue']
        if not venue_cell:
            continue
        venue_name, venue_ids_str = split_name_and_ids(venue_cell)
        if not venue_ids_str:
            continue
        venue_ids = set(venue_ids_str.split())
        # A bare next() raises StopIteration when no identifier uses the
        # 'omid' scheme, which would abort the whole file; use a default
        # instead and fall back to the venue name below.
        venue_metaid = next(
            (
                identifier for identifier in venue_ids
                if identifier.split(':', maxsplit=1)[0] == 'omid'
            ),
            None,
        )
        if venue_metaid is None or not venue_ids.difference({venue_metaid}):
            # No omid at all, or nothing but the omid: key by name.
            venues.add(venue_name.lower())
        else:
            venues.add(venue_metaid)
    return venues
class OCMetaStatistics:
    """Gather entity counts for an OpenCitations Meta dataset.

    ``fabio:Expression`` entities and ``pro`` roles are counted with SPARQL
    queries sent to ``sparql_endpoint``; distinct venues are counted from the
    ``.csv`` files found in ``csv_dump_path``. Instances can be used as
    context managers: entering returns the instance and exiting does nothing.
    """

    def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
        """Store the configuration on the instance.

        Args:
            sparql_endpoint: URL of the SPARQL endpoint to query.
            csv_dump_path: Directory holding the CSV dump; needed only for
                venue counting.
            max_retries: Forwarded to ``execute_sparql`` as ``max_retries``.
            retry_delay: Forwarded to ``execute_sparql`` as ``backoff_factor``.
        """
        self.sparql_endpoint = sparql_endpoint
        self.csv_dump_path = csv_dump_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def _execute_sparql_query(self, query: str) -> Dict:
        """Run ``query`` through ``execute_sparql`` and return its result.

        Raises:
            Exception: If ``execute_sparql`` raises; a notice is written to
                stderr and the original error is chained as the cause.
        """
        try:
            response = execute_sparql(
                self.sparql_endpoint,
                query,
                max_retries=self.max_retries,
                backoff_factor=self.retry_delay,
            )
        except Exception as error:
            print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
            raise Exception("SPARQL query failed after multiple retries.") from error
        return response

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Return ``False`` so exceptions raised in the ``with`` body propagate."""
        return False

    def count_expressions(self) -> int:
        """Return the number of distinct ``fabio:Expression`` entities."""
        sparql = """
        PREFIX fabio: <http://purl.org/spar/fabio/>

        SELECT (COUNT(DISTINCT ?expression) AS ?count)
        WHERE {
            ?expression a fabio:Expression .
        }
        """
        bindings = self._execute_sparql_query(sparql)["results"]["bindings"]
        # The first binding row carries the aggregated count.
        return int(bindings[0]["count"]["value"])

    def count_role_entities(self) -> Dict[str, int]:
        """Count distinct role entities per role.

        Returns:
            A dict with the keys ``'pro:author'``, ``'pro:publisher'`` and
            ``'pro:editor'``; a role missing from the query result stays at 0.
        """
        sparql = """
        PREFIX pro: <http://purl.org/spar/pro/>

        SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
        WHERE {
            ?roleInTime pro:withRole ?role .
            FILTER(?role IN (pro:author, pro:publisher, pro:editor))
        }
        GROUP BY ?role
        """
        response = self._execute_sparql_query(sparql)

        # Full role URI as returned by the endpoint -> key used in the result.
        label_by_uri = {
            "http://purl.org/spar/pro/author": 'pro:author',
            "http://purl.org/spar/pro/publisher": 'pro:publisher',
            "http://purl.org/spar/pro/editor": 'pro:editor',
        }
        role_counts = dict.fromkeys(label_by_uri.values(), 0)

        for row in response["results"]["bindings"]:
            uri = row["role"]["value"]
            amount = int(row["count"]["value"])
            label = label_by_uri.get(uri)
            if label is not None:
                role_counts[label] = amount

        return role_counts

    def count_venues_from_csv(self) -> int:
        """Count the distinct venue keys across all ``.csv`` files of the dump.

        Each file is processed by ``_count_venues_in_file`` in a worker
        process and the per-file sets are merged.

        Raises:
            ValueError: If no CSV dump path was configured.
        """
        if not self.csv_dump_path:
            raise ValueError("CSV dump path is required to count venues")

        csv_paths = [
            os.path.join(self.csv_dump_path, entry)
            for entry in sorted(os.listdir(self.csv_dump_path))
            if entry.endswith('.csv')
        ]

        distinct_venues: Set[str] = set()

        with create_progress() as progress:
            task = progress.add_task("Counting venues from CSV files...", total=len(csv_paths))

            # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
            mp_context = multiprocessing.get_context('forkserver')
            with ProcessPoolExecutor(mp_context=mp_context) as executor:
                pending = [executor.submit(_count_venues_in_file, path) for path in csv_paths]
                for finished in as_completed(pending):
                    distinct_venues |= finished.result()
                    progress.update(task, advance=1)

        return len(distinct_venues)

    def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
        """Run the requested counts, printing progress along the way.

        Args:
            analyze_br: Count ``fabio:Expression`` entities.
            analyze_ar: Count author, publisher and editor roles.
            analyze_venues: Count distinct venues from the CSV dump.

        Returns:
            A dict holding one entry per requested analysis, under the keys
            ``'fabio_expressions'``, ``'roles'`` and ``'venues'``. An analysis
            that failed is reported on stdout and stored as ``None``.
        """
        print("Starting dataset statistics...")
        print(f"Connected to endpoint: {self.sparql_endpoint}")
        if self.csv_dump_path:
            print(f"CSV dump path: {self.csv_dump_path}")
        print()

        outcome = {}

        if analyze_br:
            print("1. Counting fabio:Expression entities...")
            try:
                total_expressions = self.count_expressions()
                outcome['fabio_expressions'] = total_expressions
                print(f" Found {total_expressions:,} fabio:Expression entities")
            except Exception as error:
                print(f" Error: {error}")
                outcome['fabio_expressions'] = None
            print()

        if analyze_ar:
            print("2. Counting pro:author, pro:publisher and pro:editor roles...")
            try:
                per_role = self.count_role_entities()
                outcome['roles'] = per_role
                for role in ('pro:author', 'pro:publisher', 'pro:editor'):
                    print(f" Found {per_role[role]:,} {role} roles")
            except Exception as error:
                print(f" Error: {error}")
                outcome['roles'] = None
            print()

        if analyze_venues:
            print("3. Counting venues from CSV dump...")
            if self.csv_dump_path:
                try:
                    total_venues = self.count_venues_from_csv()
                    outcome['venues'] = total_venues
                    print(f" Found {total_venues:,} distinct venues")
                except Exception as error:
                    print(f" Error: {error}")
                    outcome['venues'] = None
            else:
                print(" Error: CSV dump path is required for venue counting")
                outcome['venues'] = None
            print()

        print("Statistics completed!")
        return outcome

    def run_all_analyses(self) -> Dict:
        """Run every analysis; shorthand for enabling all three flags."""
        return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)
def main():
    """Command-line entry point: parse arguments, run the counts, print a summary.

    When none of ``--br``, ``--ar`` and ``--venues`` is given, all three
    analyses run. Venue counting requires ``--csv``; without it the process
    exits with status 1. Any exception raised while running the analyses or
    printing the summary is reported on stderr and also exits with status 1.

    Returns:
        The results dict produced by ``OCMetaStatistics.run_selected_analyses``.
    """
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
  # Run all statistics
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

  # Count only bibliographic resources
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

  # Count only roles
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

  # Count only venues (requires CSV dump)
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
  --br: Count fabio:Expression entities (via SPARQL)
  --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
  --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
        """
    )

    parser.add_argument('sparql_endpoint', help='SPARQL endpoint URL')
    parser.add_argument(
        '--csv',
        dest='csv_dump_path',
        help='Path to CSV dump directory (required for venue counting)'
    )
    # The three selection switches share the same shape; register them in order.
    switches = (
        ('--br', 'Count bibliographic resources (fabio:Expression entities)'),
        ('--ar', 'Count roles (pro:author, pro:publisher, pro:editor)'),
        ('--venues', 'Count distinct venues (requires --csv)'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)

    args = parser.parse_args()

    # With no explicit selection every analysis is enabled.
    nothing_selected = not (args.br or args.ar or args.venues)
    analyze_br = args.br or nothing_selected
    analyze_ar = args.ar or nothing_selected
    analyze_venues = args.venues or nothing_selected

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            separator = "=" * 50
            print("\n" + separator)
            print("SUMMARY")
            print(separator)

            expressions = results.get('fabio_expressions')
            if expressions is not None:
                print(f"fabio:Expression entities: {expressions:,}")

            roles = results.get('roles')
            if roles:
                for role in ('pro:author', 'pro:publisher', 'pro:editor'):
                    print(f"{role} roles: {roles[role]:,}")

            venues = results.get('venues')
            if venues is not None:
                print(f"Distinct venues: {venues:,}")

            return results

    except Exception as error:
        print(f"Statistics failed: {error}", file=sys.stderr)
        sys.exit(1)
# Run the command-line interface only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()