Coverage for oc_meta / lib / timer.py: 23%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/env python 

2 

3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8""" 

9Timing and metrics collection utilities for OpenCitations Meta processing. 

10 

11This module provides reusable timing infrastructure for both production 

12processing and benchmarking, with optional activation to avoid overhead. 

13""" 

14 

15import threading 

16import time 

17from typing import Any, Callable, Dict, List, Optional 

18 

19import psutil 

20 

21 

class _MemorySampler:
    """Poll a process's resident set size on a daemon thread and keep the maximum.

    Readings taken only at the start and end of a block can miss a transient
    spike in between; sampling roughly every ``interval`` seconds narrows that
    gap. Values are whatever ``process.memory_info().rss`` reports (bytes).
    """

    def __init__(self, process: psutil.Process, interval: float = 0.1):
        self._process = process
        self._interval = interval
        # Set by stop(); also used as the interruptible sleep between samples.
        self._stop = threading.Event()
        self._peak: int = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        """Launch the background sampling thread."""
        self._thread.start()

    def stop(self) -> int:
        """Ask the sampler to finish, wait for the thread, and return the highest RSS seen."""
        self._stop.set()
        self._thread.join()
        return self._peak

    def _run(self):
        """Thread body: sample until stop() is requested."""
        while not self._stop.is_set():
            current_rss = self._process.memory_info().rss
            self._peak = max(self._peak, current_rss)
            # wait() returns early as soon as stop() sets the event.
            self._stop.wait(self._interval)

46 

47 

class BenchmarkTimer:
    """Context manager that times a code block and records its memory usage.

    On entry it stores a wall-clock start time and the current process RSS,
    then starts a background ``_MemorySampler``. On exit it stores the end
    time, the elapsed duration, the end RSS and the peak RSS observed while
    the block ran, then calls the optional ``on_exit`` callback. Exceptions
    raised inside the ``with`` body are not suppressed.

    Memory attributes are in bytes; :meth:`to_dict` converts them to MB.

    Args:
        name: Label used in verbose output and in :meth:`to_dict`.
        verbose: Print a line when the block starts and when it completes.
        on_exit: Zero-argument callback invoked at the end of ``__exit__``.

    Raises:
        RuntimeError: from ``__exit__`` if the timer was never entered.
    """

    def __init__(self, name: str, verbose: bool = False, on_exit: Optional[Callable[[], None]] = None):
        self.name = name
        self.verbose = verbose
        # Wall-clock timestamps from time.time() (seconds since the epoch).
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        # Elapsed seconds, measured with the monotonic time.perf_counter().
        self.duration: Optional[float] = None
        # Process RSS in bytes.
        self.start_memory: Optional[int] = None
        self.end_memory: Optional[int] = None
        self.peak_memory: Optional[int] = None
        self._sampler: Optional[_MemorySampler] = None
        # perf_counter() reading at entry; only differences between readings are meaningful.
        self._perf_start: Optional[float] = None
        self._on_exit: Optional[Callable[[], None]] = on_exit

    def __enter__(self):
        self.start_time = time.time()
        self._perf_start = time.perf_counter()
        process = psutil.Process()
        self.start_memory = process.memory_info().rss
        self._sampler = _MemorySampler(process)
        self._sampler.start()
        if self.verbose:
            print(f" [{self.name}] Starting...")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        if self._sampler is None or self._perf_start is None or self.start_memory is None:
            raise RuntimeError(f"BenchmarkTimer {self.name!r} exited without being entered")
        self.end_time = time.time()
        # perf_counter is monotonic, so the duration cannot go negative or jump
        # when the system clock is adjusted mid-run.
        self.duration = time.perf_counter() - self._perf_start
        try:
            end_memory = psutil.Process().memory_info().rss
        finally:
            # Stop the sampling thread even if the RSS read fails, so it does not
            # keep polling in the background.
            sampled_peak = self._sampler.stop()
        self.end_memory = end_memory
        # The sampler polls at intervals, so also consider the endpoint readings.
        self.peak_memory = max(self.start_memory, end_memory, sampled_peak)
        if self.verbose:
            print(f" [{self.name}] Completed in {self.duration:.2f}s")
        if self._on_exit:
            self._on_exit()

    @staticmethod
    def _bytes_to_mb(value: Optional[int]) -> Optional[float]:
        """Convert a byte count to MB rounded to 2 decimals; None or 0 become None."""
        # 0 is treated as "not measured": ProcessTimer.record_phase stores 0 as a
        # placeholder, and a running process never has an RSS of 0 bytes.
        return round(value / 1024 / 1024, 2) if value else None

    def to_dict(self) -> Dict[str, Any]:
        """Convert timing data to a dictionary.

        Seconds are rounded to 3 decimals and memory (MB) to 2. Unset values
        are reported as None. A measured duration of 0 is reported as 0, not
        None, so callers can always format a finished phase's duration as a
        number.
        """
        return {
            "name": self.name,
            "duration_seconds": round(self.duration, 3) if self.duration is not None else None,
            "start_memory_mb": self._bytes_to_mb(self.start_memory),
            "end_memory_mb": self._bytes_to_mb(self.end_memory),
            "peak_memory_mb": self._bytes_to_mb(self.peak_memory),
        }

96 

97 

class DummyTimer:
    """Stand-in context manager used when timing is switched off.

    Entering yields the instance itself; exiting does nothing and returns
    None, so an exception raised inside the ``with`` body propagates unchanged.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return None

106 

107 

class ProcessTimer:
    """Optional timing wrapper for MetaProcess operations.

    When ``enabled`` is False:
    - :meth:`timer` returns a no-op ``DummyTimer``.
    - :meth:`record_metric` and :meth:`record_phase` do nothing.
    - :meth:`get_report` returns ``{}``.
    - The ``print_*`` methods print nothing.

    Args:
        enabled: Turn timing and metric collection on.
        verbose: Let timers announce their start/completion (except those
            named in ``_QUIET_PHASES``).
        on_phase_complete: Called with this ProcessTimer every time one of
            the timers created by :meth:`timer` exits.
    """

    # total_processing and sub-timers: never announced in verbose mode and
    # left out of print_summary's phase breakdown.
    _QUIET_PHASES = frozenset({"total_processing", "creator_execution", "provenance_generation"})

    def __init__(self, enabled: bool = False, verbose: bool = False, on_phase_complete: Optional[Callable[['ProcessTimer'], None]] = None):
        self.enabled = enabled
        self.verbose = verbose
        self.timers: List[BenchmarkTimer] = []
        self.metrics: Dict[str, Any] = {}
        self._on_phase_complete: Optional[Callable[['ProcessTimer'], None]] = on_phase_complete

    def timer(self, name: str):
        """Create a timer context manager (or no-op if disabled).

        Enabled timers are appended to ``self.timers`` when created (before
        they run) and notify ``on_phase_complete`` when they exit.
        """
        if not self.enabled:
            return DummyTimer()
        # Don't show verbose for total_processing and sub-timers
        show_verbose = self.verbose and name not in self._QUIET_PHASES
        bench = BenchmarkTimer(name, verbose=show_verbose, on_exit=self._notify_phase)
        self.timers.append(bench)
        return bench

    def _notify_phase(self):
        """Forward a timer exit to the ``on_phase_complete`` callback, if one was given."""
        if self._on_phase_complete:
            self._on_phase_complete(self)

    def record_metric(self, key: str, value: Any):
        """Record a metric (no-op when disabled); a later value for ``key`` overwrites an earlier one."""
        if self.enabled:
            self.metrics[key] = value

    def record_phase(self, name: str, duration: float):
        """Record a phase with a specific duration (e.g., 0 for unused phases).

        The phase is added directly without running a timer, so it does not
        trigger ``on_phase_complete``. Its memory fields are set to 0 as
        placeholders.
        """
        if self.enabled:
            phase = BenchmarkTimer(name, verbose=False)
            phase.start_time = 0
            phase.end_time = duration
            phase.duration = duration
            phase.start_memory = 0
            phase.end_memory = 0
            phase.peak_memory = 0
            self.timers.append(phase)

    def get_report(self) -> Dict[str, Any]:
        """Generate timing report.

        Returns ``{}`` when disabled. Otherwise returns
        ``{"metrics": {...}, "phases": [...]}``:
        - ``metrics`` holds the recorded metrics plus ``total_duration_seconds``,
          taken from the ``total_processing`` timer (0.0 if absent or unfinished).
        - It also holds ``throughput_records_per_sec``, which is
          ``input_records`` divided by that duration, or 0 when the duration is 0.
        - ``phases`` holds each timer's ``to_dict()``, in creation order.
        """
        if not self.enabled:
            return {}

        total_time = next((t.duration for t in self.timers if t.name == "total_processing"), None) or 0.0
        input_records = self.metrics.get("input_records", 0)

        return {
            "metrics": {
                **self.metrics,
                "total_duration_seconds": round(total_time, 3),
                "throughput_records_per_sec": round(input_records / total_time, 2) if total_time > 0 else 0
            },
            "phases": [t.to_dict() for t in self.timers],
        }

    def print_summary(self):
        """Print timing summary to console (phases in ``_QUIET_PHASES`` are omitted)."""
        if not self.enabled:
            return

        report = self.get_report()
        metrics = report["metrics"]
        rule = "=" * 60

        print(f"\n{rule}")
        print("Timing Summary")
        print(rule)
        print(f"Total Duration: {metrics.get('total_duration_seconds', 0)}s")
        print(f"Throughput: {metrics.get('throughput_records_per_sec', 0)} records/sec")
        print(f"Input Records: {metrics.get('input_records', 0)}")
        print(f"Curated Records: {metrics.get('curated_records', 0)}")
        print(f"Entities Created: {metrics.get('entities_created', 0)}")
        print(f"Modified Entities: {metrics.get('modified_entities', 0)}")
        print("\nPhase Breakdown:")
        for phase in report["phases"]:
            if phase["name"] not in self._QUIET_PHASES:
                print(f" {phase['name']}: {phase['duration_seconds']}s")
        print(f"{rule}\n")

    def print_phase_breakdown(self):
        """Print detailed phase breakdown for a single file.

        Every phase except ``total_processing`` is listed. Each line shows the
        duration, plus peak memory and memory growth when those were recorded.
        A phase without a duration (e.g. a timer that has not exited yet)
        shows ``n/a`` instead of crashing the formatter.
        """
        if not self.enabled:
            return

        print("\n Phase Breakdown:")
        for phase in self.get_report()["phases"]:
            name = phase["name"]
            if name == "total_processing":
                continue
            duration = phase["duration_seconds"]
            duration_text = f"{duration:10.2f}s" if duration is not None else f"{'n/a':>10s} "
            peak = phase["peak_memory_mb"]
            start_mb = phase["start_memory_mb"]
            end_mb = phase["end_memory_mb"]
            if peak and start_mb is not None and end_mb is not None:
                delta = end_mb - start_mb
                sign = "+" if delta >= 0 else ""
                print(f" {name:30s} {duration_text} {peak:10.1f} MB peak {sign}{delta:.1f} MB")
            else:
                print(f" {name:30s} {duration_text}")

    def print_file_summary(self, filename: str):
        """Print complete summary for a single file with metrics and phases.

        Memory figures come from the ``total_processing`` phase. They are
        printed only when that phase has a recorded peak, so calling this
        while ``total_processing`` is still running does not fail.
        ``filename`` is not currently included in the output.
        """
        if not self.enabled:
            return

        report = self.get_report()
        metrics = report["metrics"]

        total_time = metrics.get("total_duration_seconds", 0)
        records = metrics.get("input_records", 0)
        entities = metrics.get("entities_created", 0)
        throughput = metrics.get("throughput_records_per_sec", 0)

        total_phase = next(
            (p for p in report["phases"] if p["name"] == "total_processing"), None
        )
        peak_mb = 0
        delta_mb = 0
        if total_phase is not None:
            peak_mb = total_phase["peak_memory_mb"]
            start_mb = total_phase["start_memory_mb"]
            end_mb = total_phase["end_memory_mb"]
            # An unfinished total_processing timer has no end reading yet.
            if start_mb is not None and end_mb is not None:
                delta_mb = end_mb - start_mb

        print(f" ✓ Completed in {total_time:.2f}s")
        self.print_phase_breakdown()
        print("\n Metrics:")
        print(f" Records processed: {records}")
        print(f" Entities created: {entities}")
        print(f" Throughput: {throughput:.2f} rec/s")
        if peak_mb:
            sign = "+" if delta_mb >= 0 else ""
            print(f" Peak memory (RSS): {peak_mb:.1f} MB")
            print(f" Memory growth: {sign}{delta_mb:.1f} MB")