Coverage for oc_meta/lib/timer.py: 26%
114 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3"""
4Timing and metrics collection utilities for OpenCitations Meta processing.
6This module provides reusable timing infrastructure for both production
7processing and benchmarking, with optional activation to avoid overhead.
8"""
10import time
11from typing import Any, Dict, List, Optional
13import psutil
16class BenchmarkTimer:
17 """Context manager for timing code blocks and collecting memory metrics."""
19 def __init__(self, name: str, verbose: bool = False):
20 self.name = name
21 self.verbose = verbose
22 self.start_time: Optional[float] = None
23 self.end_time: Optional[float] = None
24 self.duration: Optional[float] = None
25 self.start_memory: Optional[int] = None
26 self.end_memory: Optional[int] = None
27 self.peak_memory: Optional[int] = None
29 def __enter__(self):
30 self.start_time = time.time()
31 process = psutil.Process()
32 self.start_memory = process.memory_info().rss
33 if self.verbose:
34 print(f" [{self.name}] Starting...")
35 return self
37 def __exit__(self, exc_type, exc_val, exc_tb):
38 self.end_time = time.time()
39 self.duration = self.end_time - self.start_time
40 process = psutil.Process()
41 self.end_memory = process.memory_info().rss
42 self.peak_memory = max(self.start_memory, self.end_memory)
43 if self.verbose:
44 print(f" [{self.name}] Completed in {self.duration:.2f}s")
46 def to_dict(self) -> Dict[str, Any]:
47 """Convert timing data to dictionary."""
48 return {
49 "name": self.name,
50 "duration_seconds": round(self.duration, 3) if self.duration else None,
51 "start_memory_mb": round(self.start_memory / 1024 / 1024, 2) if self.start_memory else None,
52 "end_memory_mb": round(self.end_memory / 1024 / 1024, 2) if self.end_memory else None,
53 "peak_memory_mb": round(self.peak_memory / 1024 / 1024, 2) if self.peak_memory else None,
54 }
57class DummyTimer:
58 """No-op timer for when timing is disabled."""
60 def __enter__(self):
61 return self
63 def __exit__(self, *args):
64 pass
67class ProcessTimer:
68 """Optional timing wrapper for MetaProcess operations."""
70 def __init__(self, enabled: bool = False, verbose: bool = False):
71 self.enabled = enabled
72 self.verbose = verbose
73 self.timers: List[BenchmarkTimer] = []
74 self.metrics: Dict[str, Any] = {}
76 def timer(self, name: str):
77 """Create a timer context manager (or no-op if disabled)."""
78 if self.enabled:
79 # Don't show verbose for total_processing and sub-timers
80 show_verbose = self.verbose and name not in ["total_processing", "creator_execution", "provenance_generation"]
81 timer = BenchmarkTimer(name, verbose=show_verbose)
82 self.timers.append(timer)
83 return timer
84 else:
85 return DummyTimer()
87 def record_metric(self, key: str, value: Any):
88 """Record a metric."""
89 if self.enabled:
90 self.metrics[key] = value
92 def record_phase(self, name: str, duration: float):
93 """Record a phase with a specific duration (e.g., 0 for unused phases)."""
94 if self.enabled:
95 timer = BenchmarkTimer(name, verbose=False)
96 timer.start_time = 0
97 timer.end_time = duration
98 timer.duration = duration
99 timer.start_memory = 0
100 timer.end_memory = 0
101 timer.peak_memory = 0
102 self.timers.append(timer)
104 def get_report(self) -> Dict[str, Any]:
105 """Generate timing report."""
106 if not self.enabled:
107 return {}
109 total_time = next((t.duration for t in self.timers if t.name == "total_processing"), 0)
110 input_records = self.metrics.get("input_records", 0)
112 return {
113 "metrics": {
114 **self.metrics,
115 "total_duration_seconds": round(total_time, 3),
116 "throughput_records_per_sec": round(input_records / total_time, 2) if total_time > 0 else 0
117 },
118 "phases": [t.to_dict() for t in self.timers],
119 }
121 def print_summary(self):
122 """Print timing summary to console."""
123 if not self.enabled:
124 return
126 report = self.get_report()
127 metrics = report["metrics"]
129 print(f"\n{'='*60}")
130 print("Timing Summary")
131 print(f"{'='*60}")
132 print(f"Total Duration: {metrics.get('total_duration_seconds', 0)}s")
133 print(f"Throughput: {metrics.get('throughput_records_per_sec', 0)} records/sec")
134 print(f"Input Records: {metrics.get('input_records', 0)}")
135 print(f"Curated Records: {metrics.get('curated_records', 0)}")
136 print(f"Entities Created: {metrics.get('entities_created', 0)}")
137 print(f"Modified Entities: {metrics.get('modified_entities', 0)}")
138 print(f"\nPhase Breakdown:")
139 for phase in report["phases"]:
140 if phase["name"] not in ["total_processing", "creator_execution", "provenance_generation"]:
141 print(f" {phase['name']}: {phase['duration_seconds']}s")
142 print(f"{'='*60}\n")
144 def print_phase_breakdown(self):
145 """Print detailed phase breakdown for a single file."""
146 if not self.enabled:
147 return
149 report = self.get_report()
150 phases = report["phases"]
152 print(f"\n Phase Breakdown:")
153 for phase in phases:
154 if phase["name"] == "total_processing":
155 continue
156 name = phase["name"]
157 duration = phase["duration_seconds"]
158 print(f" {name:25s} {duration:8.2f}s")
160 def print_file_summary(self, filename: str):
161 """Print complete summary for a single file with metrics and phases."""
162 if not self.enabled:
163 return
165 report = self.get_report()
166 metrics = report["metrics"]
168 total_time = metrics.get("total_duration_seconds", 0)
169 records = metrics.get("input_records", 0)
170 entities = metrics.get("entities_created", 0)
171 throughput = metrics.get("throughput_records_per_sec", 0)
173 print(f" ✓ Completed in {total_time:.2f}s")
174 self.print_phase_breakdown()
175 print(f"\n Metrics:")
176 print(f" Records processed: {records}")
177 print(f" Entities created: {entities}")
178 print(f" Throughput: {throughput:.2f} rec/s")