# Coverage report residue (coverage.py v7.13.4, 2026-04-21): oc_meta/run/count/meta_entities.py — 72% of 160 statements.
1#!/usr/bin/python
3# Copyright 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import argparse
11import os
12import sys
13import multiprocessing
14from concurrent.futures import ProcessPoolExecutor, as_completed
15from typing import Dict, Set
17from rich_argparse import RichHelpFormatter
18from sparqlite import SPARQLClient
20from oc_meta.lib.console import create_progress
21from oc_meta.lib.file_manager import get_csv_data
22from oc_meta.lib.master_of_regex import split_name_and_ids
def _count_venues_in_file(filepath: str) -> Set[str]:
    """Extract the set of disambiguated venue keys from one CSV dump file.

    Each venue is keyed either by its OMID (when it also carries external
    identifiers, so the OMID is trustworthy for deduplication) or by its
    lowercased name (when the OMID is the only identifier, or no OMID is
    present at all).

    :param filepath: path to a single CSV file of the Meta dump
    :return: set of venue keys (OMIDs and lowercased names mixed)
    """
    csv_data = get_csv_data(filepath)
    venues: Set[str] = set()
    for row in csv_data:
        if not row['venue']:
            continue
        venue_name, venue_ids_str = split_name_and_ids(row['venue'])
        if not venue_ids_str:
            continue
        venue_ids = set(venue_ids_str.split())
        # Bug fix: next() without a default raised StopIteration when a venue
        # had identifiers but none with the 'omid:' prefix, aborting the whole
        # file. Fall back to the lowercased name for such rows instead.
        venue_metaid = next(
            (identifier for identifier in venue_ids
             if identifier.split(':', maxsplit=1)[0] == 'omid'),
            None,
        )
        if venue_metaid is None or not venue_ids.difference({venue_metaid}):
            # Only an OMID (or no OMID): deduplicate by name.
            venues.add(venue_name.lower())
        else:
            # External identifiers exist alongside the OMID: key by OMID.
            venues.add(venue_metaid)
    return venues
class OCMetaStatistics:
    """Compute statistics over an OpenCitations Meta dataset.

    Bibliographic-resource and role counts are obtained via SPARQL queries
    against a triplestore; distinct-venue counts are computed from a CSV
    dump directory in parallel worker processes. Usable as a context
    manager, which closes the SPARQL client on exit.
    """

    def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
        """Store configuration and open the SPARQL client.

        :param sparql_endpoint: URL of the SPARQL endpoint to query
        :param csv_dump_path: directory of CSV dump files (required only
            for venue counting)
        :param max_retries: retry attempts passed to the SPARQL client
        :param retry_delay: used as the client's backoff factor (seconds)
        """
        self.sparql_endpoint = sparql_endpoint
        self.csv_dump_path = csv_dump_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # 1-hour timeout: the COUNT queries scan the whole dataset.
        self.client = SPARQLClient(sparql_endpoint, max_retries=max_retries, backoff_factor=retry_delay, timeout=3600)

    def _execute_sparql_query(self, query: str) -> Dict:
        """Run *query* through the client; the client handles retries.

        :raises Exception: when the client has exhausted its retries.
        """
        try:
            return self.client.query(query)
        except Exception as e:
            print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
            raise Exception("SPARQL query failed after multiple retries.") from e

    def __enter__(self):
        """Context-manager entry: no extra setup needed."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: close the client, never swallow exceptions."""
        self.close()
        return False

    def close(self):
        """Release the underlying SPARQL client's resources."""
        self.client.close()

    def count_expressions(self) -> int:
        """Return the number of distinct fabio:Expression entities."""
        query = """
        PREFIX fabio: <http://purl.org/spar/fabio/>

        SELECT (COUNT(DISTINCT ?expression) AS ?count)
        WHERE {
            ?expression a fabio:Expression .
        }
        """
        results = self._execute_sparql_query(query)
        return int(results["results"]["bindings"][0]["count"]["value"])

    def count_role_entities(self) -> Dict[str, int]:
        """Count role-in-time entities per role.

        :return: mapping with keys 'pro:author', 'pro:publisher' and
            'pro:editor'; roles absent from the results stay at 0.
        """
        query = """
        PREFIX pro: <http://purl.org/spar/pro/>

        SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
        WHERE {
            ?roleInTime pro:withRole ?role .
            FILTER(?role IN (pro:author, pro:publisher, pro:editor))
        }
        GROUP BY ?role
        """
        results = self._execute_sparql_query(query)

        # Pre-seed with zeros so missing roles are still reported.
        role_counts = {
            'pro:author': 0,
            'pro:publisher': 0,
            'pro:editor': 0
        }

        for binding in results["results"]["bindings"]:
            role_uri = binding["role"]["value"]
            count = int(binding["count"]["value"])

            if role_uri == "http://purl.org/spar/pro/author":
                role_counts['pro:author'] = count
            elif role_uri == "http://purl.org/spar/pro/publisher":
                role_counts['pro:publisher'] = count
            elif role_uri == "http://purl.org/spar/pro/editor":
                role_counts['pro:editor'] = count

        return role_counts

    def count_venues_from_csv(self) -> int:
        """Count distinct venues across all CSV dump files.

        Files are processed in parallel worker processes; per-file venue
        sets are unioned so duplicates across files count once.

        :raises ValueError: when no CSV dump path was configured.
        """
        if not self.csv_dump_path:
            raise ValueError("CSV dump path is required to count venues")

        # Sorted for deterministic scheduling order.
        filenames = sorted(os.listdir(self.csv_dump_path))
        filepaths = [os.path.join(self.csv_dump_path, f) for f in filenames if f.endswith('.csv')]

        all_venues: Set[str] = set()

        with create_progress() as progress:
            task = progress.add_task("Counting venues from CSV files...", total=len(filepaths))

            # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
            with ProcessPoolExecutor(mp_context=multiprocessing.get_context('forkserver')) as executor:
                futures = {executor.submit(_count_venues_in_file, fp): fp for fp in filepaths}
                for future in as_completed(futures):
                    venues = future.result()
                    all_venues.update(venues)
                    progress.update(task, advance=1)

        return len(all_venues)

    def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
        """Run the requested analyses, printing progress and results.

        Each analysis is independent: a failure is printed and recorded as
        None in the result dict without aborting the remaining analyses.

        :return: dict with any of the keys 'fabio_expressions', 'roles',
            'venues'; values are counts (or a role dict) or None on error.
        """
        print("Starting dataset statistics...")
        print(f"Connected to endpoint: {self.sparql_endpoint}")
        if self.csv_dump_path:
            print(f"CSV dump path: {self.csv_dump_path}")
        print()

        results = {}

        if analyze_br:
            print("1. Counting fabio:Expression entities...")
            try:
                expressions_count = self.count_expressions()
                results['fabio_expressions'] = expressions_count
                print(f" Found {expressions_count:,} fabio:Expression entities")
            except Exception as e:
                print(f" Error: {e}")
                results['fabio_expressions'] = None
            print()

        if analyze_ar:
            print("2. Counting pro:author, pro:publisher and pro:editor roles...")
            try:
                role_counts = self.count_role_entities()
                results['roles'] = role_counts
                print(f" Found {role_counts['pro:author']:,} pro:author roles")
                print(f" Found {role_counts['pro:publisher']:,} pro:publisher roles")
                print(f" Found {role_counts['pro:editor']:,} pro:editor roles")
            except Exception as e:
                print(f" Error: {e}")
                results['roles'] = None
            print()

        if analyze_venues:
            print("3. Counting venues from CSV dump...")
            if not self.csv_dump_path:
                print(" Error: CSV dump path is required for venue counting")
                results['venues'] = None
            else:
                try:
                    venues_count = self.count_venues_from_csv()
                    results['venues'] = venues_count
                    print(f" Found {venues_count:,} distinct venues")
                except Exception as e:
                    print(f" Error: {e}")
                    results['venues'] = None
            print()

        print("Statistics completed!")
        return results

    def run_all_analyses(self) -> Dict:
        """Convenience wrapper: run every available analysis."""
        return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)
def main():
    """CLI entry point: parse arguments, run selected analyses, print a summary.

    With no selection flags, all analyses run. Exits with status 1 when
    --venues is requested without --csv, or when the run fails outright.
    """
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
  # Run all statistics
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

  # Count only bibliographic resources
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

  # Count only roles
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

  # Count only venues (requires CSV dump)
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
  --br: Count fabio:Expression entities (via SPARQL)
  --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
  --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
    """
    )

    parser.add_argument(
        'sparql_endpoint',
        help='SPARQL endpoint URL'
    )

    parser.add_argument(
        '--csv',
        dest='csv_dump_path',
        help='Path to CSV dump directory (required for venue counting)'
    )

    parser.add_argument(
        '--br',
        action='store_true',
        help='Count bibliographic resources (fabio:Expression entities)'
    )

    parser.add_argument(
        '--ar',
        action='store_true',
        help='Count roles (pro:author, pro:publisher, pro:editor)'
    )

    parser.add_argument(
        '--venues',
        action='store_true',
        help='Count distinct venues (requires --csv)'
    )

    args = parser.parse_args()

    # Idiom fix: compute the "no flags given" condition once instead of
    # re-evaluating the same expression three times.
    no_selection = not (args.br or args.ar or args.venues)
    analyze_br = args.br or no_selection
    analyze_ar = args.ar or no_selection
    analyze_venues = args.venues or no_selection

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            print("\n" + "="*50)
            print("SUMMARY")
            print("="*50)

            if results.get('fabio_expressions') is not None:
                print(f"fabio:Expression entities: {results['fabio_expressions']:,}")

            if results.get('roles'):
                print(f"pro:author roles: {results['roles']['pro:author']:,}")
                print(f"pro:publisher roles: {results['roles']['pro:publisher']:,}")
                print(f"pro:editor roles: {results['roles']['pro:editor']:,}")

            if results.get('venues') is not None:
                print(f"Distinct venues: {results['venues']:,}")

            return results

    except Exception as e:
        print(f"Statistics failed: {e}", file=sys.stderr)
        sys.exit(1)
# Script entry point: run the CLI when executed directly (not on import).
if __name__ == "__main__":
    main()