Coverage for oc_meta / run / count / meta_entities.py: 72%
159 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/python
2# Copyright 2025, Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# Permission to use, copy, modify, and/or distribute this software for any purpose
5# with or without fee is hereby granted, provided that the above copyright notice
6# and this permission notice appear in all copies.
7#
8# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
9# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
10# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
11# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
12# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
13# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
14# SOFTWARE.
16from __future__ import annotations
18import argparse
19import os
20import re
21import sys
22from concurrent.futures import ProcessPoolExecutor, as_completed
23from typing import Dict, Set
25from rich.progress import (BarColumn, Progress, TaskProgressColumn, TextColumn,
26 TimeRemainingColumn)
27from rich_argparse import RichHelpFormatter
28from sparqlite import SPARQLClient
30from oc_meta.lib.file_manager import get_csv_data
31from oc_meta.lib.master_of_regex import name_and_ids
def _count_venues_in_file(filepath: str) -> Set[str]:
    """Extract the set of distinct venue keys from one Meta CSV dump file.

    For each row with a non-empty 'venue' cell, the venue string is parsed
    with the ``name_and_ids`` pattern. The disambiguation key added to the
    returned set is:

    - the OMID, when the venue has at least one identifier besides its OMID
      (external ids make the OMID a reliable key);
    - the lowercased venue name otherwise (OMID-only venues, or venues with
      no OMID at all, are better disambiguated by name).

    :param filepath: path of the CSV file to scan
    :return: set of venue keys (OMIDs or lowercased names)
    """
    csv_data = get_csv_data(filepath)
    venues = set()
    for row in csv_data:
        if row['venue']:
            ven_name_and_ids = re.search(name_and_ids, row['venue'])
            if ven_name_and_ids:
                venue_name = ven_name_and_ids.group(1).lower()
                venue_ids = set(ven_name_and_ids.group(2).split())
                # Pick the OMID among the identifiers, if any. The previous
                # [...][0] form crashed with IndexError when no 'omid:' id
                # was present; default to None instead.
                venue_metaid = next(
                    (identifier for identifier in venue_ids
                     if identifier.split(':', maxsplit=1)[0] == 'omid'),
                    None)
                if venue_metaid is None:
                    # No OMID at all: fall back to the name as the key.
                    venues.add(venue_name)
                elif not venue_ids.difference({venue_metaid}):
                    # The OMID is the only identifier: use the name.
                    venues.add(venue_name)
                else:
                    venues.add(venue_metaid)
    return venues
51class OCMetaStatistics:
52 def __init__(self, sparql_endpoint: str, csv_dump_path: str | None = None, max_retries: int = 3, retry_delay: int = 5):
53 self.sparql_endpoint = sparql_endpoint
54 self.csv_dump_path = csv_dump_path
55 self.max_retries = max_retries
56 self.retry_delay = retry_delay
57 self.client = SPARQLClient(sparql_endpoint, max_retries=max_retries, backoff_factor=retry_delay, timeout=3600)
59 def _execute_sparql_query(self, query: str) -> Dict:
60 try:
61 return self.client.query(query)
62 except Exception as e:
63 print(f"Query failed after {self.max_retries} retries.", file=sys.stderr)
64 raise Exception("SPARQL query failed after multiple retries.") from e
66 def __enter__(self):
67 return self
69 def __exit__(self, exc_type, exc_val, exc_tb):
70 self.close()
71 return False
73 def close(self):
74 self.client.close()
76 def count_expressions(self) -> int:
77 query = """
78 PREFIX fabio: <http://purl.org/spar/fabio/>
80 SELECT (COUNT(DISTINCT ?expression) AS ?count)
81 WHERE {
82 ?expression a fabio:Expression .
83 }
84 """
85 results = self._execute_sparql_query(query)
86 return int(results["results"]["bindings"][0]["count"]["value"])
88 def count_role_entities(self) -> Dict[str, int]:
89 query = """
90 PREFIX pro: <http://purl.org/spar/pro/>
92 SELECT ?role (COUNT(DISTINCT ?roleInTime) AS ?count)
93 WHERE {
94 ?roleInTime pro:withRole ?role .
95 FILTER(?role IN (pro:author, pro:publisher, pro:editor))
96 }
97 GROUP BY ?role
98 """
99 results = self._execute_sparql_query(query)
101 role_counts = {
102 'pro:author': 0,
103 'pro:publisher': 0,
104 'pro:editor': 0
105 }
107 for binding in results["results"]["bindings"]:
108 role_uri = binding["role"]["value"]
109 count = int(binding["count"]["value"])
111 if role_uri == "http://purl.org/spar/pro/author":
112 role_counts['pro:author'] = count
113 elif role_uri == "http://purl.org/spar/pro/publisher":
114 role_counts['pro:publisher'] = count
115 elif role_uri == "http://purl.org/spar/pro/editor":
116 role_counts['pro:editor'] = count
118 return role_counts
120 def count_venues_from_csv(self) -> int:
121 if not self.csv_dump_path:
122 raise ValueError("CSV dump path is required to count venues")
124 filenames = sorted(os.listdir(self.csv_dump_path))
125 filepaths = [os.path.join(self.csv_dump_path, f) for f in filenames if f.endswith('.csv')]
127 all_venues: Set[str] = set()
129 with Progress(
130 TextColumn("[progress.description]{task.description}"),
131 BarColumn(),
132 TaskProgressColumn(),
133 TimeRemainingColumn(),
134 ) as progress:
135 task = progress.add_task("Counting venues from CSV files...", total=len(filepaths))
137 with ProcessPoolExecutor() as executor:
138 futures = {executor.submit(_count_venues_in_file, fp): fp for fp in filepaths}
139 for future in as_completed(futures):
140 venues = future.result()
141 all_venues.update(venues)
142 progress.update(task, advance=1)
144 return len(all_venues)
146 def run_selected_analyses(self, analyze_br: bool, analyze_ar: bool, analyze_venues: bool) -> Dict:
147 print("Starting dataset statistics...")
148 print(f"Connected to endpoint: {self.sparql_endpoint}")
149 if self.csv_dump_path:
150 print(f"CSV dump path: {self.csv_dump_path}")
151 print()
153 results = {}
155 if analyze_br:
156 print("1. Counting fabio:Expression entities...")
157 try:
158 expressions_count = self.count_expressions()
159 results['fabio_expressions'] = expressions_count
160 print(f" Found {expressions_count:,} fabio:Expression entities")
161 except Exception as e:
162 print(f" Error: {e}")
163 results['fabio_expressions'] = None
164 print()
166 if analyze_ar:
167 print("2. Counting pro:author, pro:publisher and pro:editor roles...")
168 try:
169 role_counts = self.count_role_entities()
170 results['roles'] = role_counts
171 print(f" Found {role_counts['pro:author']:,} pro:author roles")
172 print(f" Found {role_counts['pro:publisher']:,} pro:publisher roles")
173 print(f" Found {role_counts['pro:editor']:,} pro:editor roles")
174 except Exception as e:
175 print(f" Error: {e}")
176 results['roles'] = None
177 print()
179 if analyze_venues:
180 print("3. Counting venues from CSV dump...")
181 if not self.csv_dump_path:
182 print(" Error: CSV dump path is required for venue counting")
183 results['venues'] = None
184 else:
185 try:
186 venues_count = self.count_venues_from_csv()
187 results['venues'] = venues_count
188 print(f" Found {venues_count:,} distinct venues")
189 except Exception as e:
190 print(f" Error: {e}")
191 results['venues'] = None
192 print()
194 print("Statistics completed!")
195 return results
197 def run_all_analyses(self) -> Dict:
198 return self.run_selected_analyses(analyze_br=True, analyze_ar=True, analyze_venues=True)
def main():
    """Command-line entry point: parse options, run analyses, print a summary."""
    parser = argparse.ArgumentParser(
        description='Compute OpenCitations Meta dataset statistics',
        formatter_class=RichHelpFormatter,
        epilog="""
Examples:
  # Run all statistics
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --csv /path/to/csv/dump

  # Count only bibliographic resources
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --br

  # Count only roles
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --ar

  # Count only venues (requires CSV dump)
  python -m oc_meta.run.count.meta_entities http://localhost:8890/sparql --venues --csv /path/to/csv/dump

Statistics computed:
  --br: Count fabio:Expression entities (via SPARQL)
  --ar: Count pro:author, pro:publisher and pro:editor roles (via SPARQL)
  --venues: Count distinct venues with disambiguation (via CSV dump)

If no specific options are provided, all statistics will be computed.
        """
    )

    parser.add_argument('sparql_endpoint',
                        help='SPARQL endpoint URL')
    parser.add_argument('--csv', dest='csv_dump_path',
                        help='Path to CSV dump directory (required for venue counting)')
    parser.add_argument('--br', action='store_true',
                        help='Count bibliographic resources (fabio:Expression entities)')
    parser.add_argument('--ar', action='store_true',
                        help='Count roles (pro:author, pro:publisher, pro:editor)')
    parser.add_argument('--venues', action='store_true',
                        help='Count distinct venues (requires --csv)')

    args = parser.parse_args()

    # When no analysis flag is given, every analysis runs.
    nothing_selected = not (args.br or args.ar or args.venues)
    analyze_br = args.br or nothing_selected
    analyze_ar = args.ar or nothing_selected
    analyze_venues = args.venues or nothing_selected

    if analyze_venues and not args.csv_dump_path:
        print("Error: --csv is required for venue counting", file=sys.stderr)
        sys.exit(1)

    try:
        with OCMetaStatistics(args.sparql_endpoint, args.csv_dump_path) as stats:
            results = stats.run_selected_analyses(analyze_br, analyze_ar, analyze_venues)

            print("\n" + "=" * 50)
            print("SUMMARY")
            print("=" * 50)

            if results.get('fabio_expressions') is not None:
                print(f"fabio:Expression entities: {results['fabio_expressions']:,}")

            if results.get('roles'):
                print(f"pro:author roles: {results['roles']['pro:author']:,}")
                print(f"pro:publisher roles: {results['roles']['pro:publisher']:,}")
                print(f"pro:editor roles: {results['roles']['pro:editor']:,}")

            if results.get('venues') is not None:
                print(f"Distinct venues: {results['venues']:,}")

            return results
    except Exception as e:
        print(f"Statistics failed: {e}", file=sys.stderr)
        sys.exit(1)
# Allow invoking this module directly as a script.
if __name__ == "__main__":
    main()