Coverage for oc_meta / lib / timer.py: 23%
158 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/env python
3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7# -*- coding: utf-8 -*-
8"""
9Timing and metrics collection utilities for OpenCitations Meta processing.
11This module provides reusable timing infrastructure for both production
12processing and benchmarking, with optional activation to avoid overhead.
13"""
15import threading
16import time
17from typing import Any, Callable, Dict, List, Optional
19import psutil
class _MemorySampler:
    """Poll a process's resident set size on a daemon thread, keeping the maximum.

    ``start()`` launches the polling thread. ``stop()`` signals it, waits for it
    to finish and returns the largest ``memory_info().rss`` value seen (0 if the
    thread never managed to take a sample).
    """

    def __init__(self, process: psutil.Process, interval: float = 0.1):
        self._process = process
        self._interval = interval
        self._stop = threading.Event()
        self._peak: int = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        """Launch the background polling thread."""
        self._thread.start()

    def stop(self) -> int:
        """Ask the poller to finish, join it, and return the peak RSS observed."""
        self._stop.set()
        self._thread.join()
        return self._peak

    def _run(self):
        # Take a sample right away, then one per interval. Event.wait() acts as
        # an interruptible sleep, so stop() does not block for a full interval.
        stop_event = self._stop
        while not stop_event.is_set():
            self._peak = max(self._peak, self._process.memory_info().rss)
            stop_event.wait(self._interval)
class BenchmarkTimer:
    """Context manager for timing code blocks and collecting memory metrics.

    On entry it records the wall-clock start time and the current process RSS,
    and starts a background ``_MemorySampler``. On exit it records the end
    time, the elapsed duration, the end RSS and the peak RSS (max of start,
    end and sampled values). It then invokes the optional ``on_exit``
    callback. The callback also runs when the ``with`` body raised, and the
    exception is never suppressed.

    Memory attributes hold raw ``rss`` values; ``to_dict()`` divides them by
    1024 * 1024 and reports them under ``*_mb`` keys.
    """

    def __init__(self, name: str, verbose: bool = False, on_exit: Optional[Callable[[], None]] = None):
        self.name = name
        self.verbose = verbose
        # Wall-clock timestamps from time.time(), kept for callers that read them.
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        # Elapsed seconds, measured with the monotonic perf_counter clock so
        # system clock adjustments cannot skew (or negate) the result.
        self.duration: Optional[float] = None
        self.start_memory: Optional[int] = None
        self.end_memory: Optional[int] = None
        self.peak_memory: Optional[int] = None
        self._sampler: Optional[_MemorySampler] = None
        self._perf_start: Optional[float] = None
        self._on_exit: Optional[Callable[[], None]] = on_exit

    def __enter__(self):
        self.start_time = time.time()
        self._perf_start = time.perf_counter()
        process = psutil.Process()
        self.start_memory = process.memory_info().rss
        self._sampler = _MemorySampler(process)
        self._sampler.start()
        if self.verbose:
            print(f" [{self.name}] Starting...")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if (
            self.start_time is None
            or self._perf_start is None
            or self.start_memory is None
            or self._sampler is None
        ):
            raise RuntimeError(f"BenchmarkTimer {self.name!r} exited without being entered")
        self.end_time = time.time()
        self.duration = time.perf_counter() - self._perf_start
        try:
            end_memory = psutil.Process().memory_info().rss
        finally:
            # Always stop the sampler thread, even if reading RSS fails.
            sampled_peak = self._sampler.stop()
        self.end_memory = end_memory
        self.peak_memory = max(self.start_memory, end_memory, sampled_peak)
        if self.verbose:
            print(f" [{self.name}] Completed in {self.duration:.2f}s")
        if self._on_exit:
            self._on_exit()

    @staticmethod
    def _bytes_to_mb(value: Optional[int]) -> Optional[float]:
        """Convert a byte count to MB (1024 * 1024) rounded to 2 places; keep None."""
        return None if value is None else round(value / 1024 / 1024, 2)

    def to_dict(self) -> Dict[str, Any]:
        """Convert timing data to a dictionary.

        Unset fields (e.g. a timer that has not exited yet) map to ``None``.
        A recorded value of 0 stays 0 (e.g. from ProcessTimer.record_phase),
        so downstream numeric formatting keeps working.
        """
        return {
            "name": self.name,
            "duration_seconds": None if self.duration is None else round(self.duration, 3),
            "start_memory_mb": self._bytes_to_mb(self.start_memory),
            "end_memory_mb": self._bytes_to_mb(self.end_memory),
            "peak_memory_mb": self._bytes_to_mb(self.peak_memory),
        }
class DummyTimer:
    """Do-nothing context manager handed out when timing is switched off."""

    def __enter__(self):
        # `with ... as t` binds the instance itself, like a real timer would.
        return self

    def __exit__(self, *exc_info):
        # A None result means exceptions from the `with` body propagate.
        return None
class ProcessTimer:
    """Optional timing wrapper for MetaProcess operations.

    When ``enabled`` is False, ``timer()`` returns a no-op ``DummyTimer``,
    ``record_metric``/``record_phase`` do nothing, ``get_report()`` returns an
    empty dict and the ``print_*`` methods print nothing.

    Args:
        enabled: Turn timing and metric collection on.
        verbose: Let phase timers print start/end messages (except the phases
            in ``_QUIET_PHASES``).
        on_phase_complete: Called with this ``ProcessTimer`` each time a timer
            created by ``timer()`` exits.
    """

    # Phases that never print verbose start/end messages and that are left
    # out of the ``print_summary`` phase breakdown.
    _QUIET_PHASES = frozenset({"total_processing", "creator_execution", "provenance_generation"})

    def __init__(self, enabled: bool = False, verbose: bool = False, on_phase_complete: Optional[Callable[['ProcessTimer'], None]] = None):
        self.enabled = enabled
        self.verbose = verbose
        self.timers: List[BenchmarkTimer] = []
        self.metrics: Dict[str, Any] = {}
        self._on_phase_complete: Optional[Callable[['ProcessTimer'], None]] = on_phase_complete

    def timer(self, name: str):
        """Create a timer context manager (or no-op if disabled).

        When enabled, the returned ``BenchmarkTimer`` is also appended to
        ``self.timers`` and notifies ``on_phase_complete`` when it exits.
        """
        if not self.enabled:
            return DummyTimer()
        show_verbose = self.verbose and name not in self._QUIET_PHASES
        timer = BenchmarkTimer(name, verbose=show_verbose, on_exit=self._notify_phase)
        self.timers.append(timer)
        return timer

    def _notify_phase(self):
        """Forward a finished-phase event to the ``on_phase_complete`` callback, if any."""
        if self._on_phase_complete:
            self._on_phase_complete(self)

    def record_metric(self, key: str, value: Any):
        """Record a metric under *key* (no-op when disabled)."""
        if self.enabled:
            self.metrics[key] = value

    def record_phase(self, name: str, duration: float):
        """Record a phase with a specific duration (e.g., 0 for unused phases).

        No timing actually happens: start/end times are 0 and *duration*, and
        every memory field is set to 0.
        """
        if self.enabled:
            timer = BenchmarkTimer(name, verbose=False)
            timer.start_time = 0
            timer.end_time = duration
            timer.duration = duration
            timer.start_memory = 0
            timer.end_memory = 0
            timer.peak_memory = 0
            self.timers.append(timer)

    def get_report(self) -> Dict[str, Any]:
        """Generate timing report.

        Returns ``{}`` when disabled. Otherwise returns ``{"metrics": ..., "phases": ...}``.
        ``metrics`` holds the recorded metrics plus ``total_duration_seconds``
        (taken from the ``total_processing`` timer, 0.0 if absent or unfinished)
        and ``throughput_records_per_sec``. ``phases`` holds each timer's
        ``to_dict()``.
        """
        if not self.enabled:
            return {}

        total_time = next((t.duration for t in self.timers if t.name == "total_processing"), None) or 0.0
        input_records = self.metrics.get("input_records", 0)

        return {
            "metrics": {
                **self.metrics,
                "total_duration_seconds": round(total_time, 3),
                "throughput_records_per_sec": round(input_records / total_time, 2) if total_time > 0 else 0
            },
            "phases": [t.to_dict() for t in self.timers],
        }

    @staticmethod
    def _format_duration(seconds: Optional[float]) -> str:
        """Render *seconds* in an 11-character column, or right-aligned 'n/a' if unknown."""
        # A phase reports None while its timer is still running, so it must
        # not be passed to a float format spec.
        if seconds is None:
            return f"{'n/a':>11}"
        return f"{seconds:10.2f}s"

    @staticmethod
    def _memory_delta(phase: Dict[str, Any]) -> Optional[float]:
        """Return end minus start memory (MB) for a phase dict, or None if either is missing."""
        start = phase.get("start_memory_mb")
        end = phase.get("end_memory_mb")
        if start is None or end is None:
            return None
        return end - start

    def print_summary(self):
        """Print timing summary to console."""
        if not self.enabled:
            return

        report = self.get_report()
        metrics = report["metrics"]

        print(f"\n{'='*60}")
        print("Timing Summary")
        print(f"{'='*60}")
        print(f"Total Duration: {metrics.get('total_duration_seconds', 0)}s")
        print(f"Throughput: {metrics.get('throughput_records_per_sec', 0)} records/sec")
        print(f"Input Records: {metrics.get('input_records', 0)}")
        print(f"Curated Records: {metrics.get('curated_records', 0)}")
        print(f"Entities Created: {metrics.get('entities_created', 0)}")
        print(f"Modified Entities: {metrics.get('modified_entities', 0)}")
        print("\nPhase Breakdown:")
        for phase in report["phases"]:
            if phase["name"] in self._QUIET_PHASES:
                continue
            duration = phase["duration_seconds"]
            shown = "n/a" if duration is None else f"{duration}s"
            print(f" {phase['name']}: {shown}")
        print(f"{'='*60}\n")

    def print_phase_breakdown(self):
        """Print detailed phase breakdown for a single file.

        ``total_processing`` is skipped. Memory columns are shown only when the
        phase has a non-zero peak and both start and end memory values.
        """
        if not self.enabled:
            return

        print("\n Phase Breakdown:")
        for phase in self.get_report()["phases"]:
            if phase["name"] == "total_processing":
                continue
            name = phase["name"]
            duration = self._format_duration(phase["duration_seconds"])
            peak = phase["peak_memory_mb"]
            delta = self._memory_delta(phase)
            if peak and delta is not None:
                sign = "+" if delta >= 0 else ""
                print(f" {name:30s} {duration} {peak:10.1f} MB peak {sign}{delta:.1f} MB")
            else:
                print(f" {name:30s} {duration}")

    def print_file_summary(self, filename: str):
        """Print complete summary for a single file with metrics and phases.

        *filename* is accepted for interface compatibility but is not printed
        here.
        """
        if not self.enabled:
            return

        report = self.get_report()
        metrics = report["metrics"]

        total_time = metrics.get("total_duration_seconds", 0)
        records = metrics.get("input_records", 0)
        entities = metrics.get("entities_created", 0)
        throughput = metrics.get("throughput_records_per_sec", 0)

        total_phase = next(
            (p for p in report["phases"] if p["name"] == "total_processing"), None
        )
        # The total phase may still be running (e.g. when called from the
        # on_phase_complete callback), so its memory fields can be None.
        peak_mb = (total_phase["peak_memory_mb"] if total_phase else None) or 0
        delta_mb = self._memory_delta(total_phase) if total_phase else None

        print(f" ✓ Completed in {total_time:.2f}s")
        self.print_phase_breakdown()
        print("\n Metrics:")
        print(f" Records processed: {records}")
        print(f" Entities created: {entities}")
        print(f" Throughput: {throughput:.2f} rec/s")
        if peak_mb:
            print(f" Peak memory (RSS): {peak_mb:.1f} MB")
            if delta_mb is not None:
                sign = "+" if delta_mb >= 0 else ""
                print(f" Memory growth: {sign}{delta_mb:.1f} MB")