Coverage for oc_meta/lib/timer.py: 26%

114 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3""" 

4Timing and metrics collection utilities for OpenCitations Meta processing. 

5 

6This module provides reusable timing infrastructure for both production 

7processing and benchmarking, with optional activation to avoid overhead. 

8""" 

9 

10import time 

11from typing import Any, Dict, List, Optional 

12 

13import psutil 

14 

15 

16class BenchmarkTimer: 

17 """Context manager for timing code blocks and collecting memory metrics.""" 

18 

19 def __init__(self, name: str, verbose: bool = False): 

20 self.name = name 

21 self.verbose = verbose 

22 self.start_time: Optional[float] = None 

23 self.end_time: Optional[float] = None 

24 self.duration: Optional[float] = None 

25 self.start_memory: Optional[int] = None 

26 self.end_memory: Optional[int] = None 

27 self.peak_memory: Optional[int] = None 

28 

29 def __enter__(self): 

30 self.start_time = time.time() 

31 process = psutil.Process() 

32 self.start_memory = process.memory_info().rss 

33 if self.verbose: 

34 print(f" [{self.name}] Starting...") 

35 return self 

36 

37 def __exit__(self, exc_type, exc_val, exc_tb): 

38 self.end_time = time.time() 

39 self.duration = self.end_time - self.start_time 

40 process = psutil.Process() 

41 self.end_memory = process.memory_info().rss 

42 self.peak_memory = max(self.start_memory, self.end_memory) 

43 if self.verbose: 

44 print(f" [{self.name}] Completed in {self.duration:.2f}s") 

45 

46 def to_dict(self) -> Dict[str, Any]: 

47 """Convert timing data to dictionary.""" 

48 return { 

49 "name": self.name, 

50 "duration_seconds": round(self.duration, 3) if self.duration else None, 

51 "start_memory_mb": round(self.start_memory / 1024 / 1024, 2) if self.start_memory else None, 

52 "end_memory_mb": round(self.end_memory / 1024 / 1024, 2) if self.end_memory else None, 

53 "peak_memory_mb": round(self.peak_memory / 1024 / 1024, 2) if self.peak_memory else None, 

54 } 

55 

56 

57class DummyTimer: 

58 """No-op timer for when timing is disabled.""" 

59 

60 def __enter__(self): 

61 return self 

62 

63 def __exit__(self, *args): 

64 pass 

65 

66 

67class ProcessTimer: 

68 """Optional timing wrapper for MetaProcess operations.""" 

69 

70 def __init__(self, enabled: bool = False, verbose: bool = False): 

71 self.enabled = enabled 

72 self.verbose = verbose 

73 self.timers: List[BenchmarkTimer] = [] 

74 self.metrics: Dict[str, Any] = {} 

75 

76 def timer(self, name: str): 

77 """Create a timer context manager (or no-op if disabled).""" 

78 if self.enabled: 

79 # Don't show verbose for total_processing and sub-timers 

80 show_verbose = self.verbose and name not in ["total_processing", "creator_execution", "provenance_generation"] 

81 timer = BenchmarkTimer(name, verbose=show_verbose) 

82 self.timers.append(timer) 

83 return timer 

84 else: 

85 return DummyTimer() 

86 

87 def record_metric(self, key: str, value: Any): 

88 """Record a metric.""" 

89 if self.enabled: 

90 self.metrics[key] = value 

91 

92 def record_phase(self, name: str, duration: float): 

93 """Record a phase with a specific duration (e.g., 0 for unused phases).""" 

94 if self.enabled: 

95 timer = BenchmarkTimer(name, verbose=False) 

96 timer.start_time = 0 

97 timer.end_time = duration 

98 timer.duration = duration 

99 timer.start_memory = 0 

100 timer.end_memory = 0 

101 timer.peak_memory = 0 

102 self.timers.append(timer) 

103 

104 def get_report(self) -> Dict[str, Any]: 

105 """Generate timing report.""" 

106 if not self.enabled: 

107 return {} 

108 

109 total_time = next((t.duration for t in self.timers if t.name == "total_processing"), 0) 

110 input_records = self.metrics.get("input_records", 0) 

111 

112 return { 

113 "metrics": { 

114 **self.metrics, 

115 "total_duration_seconds": round(total_time, 3), 

116 "throughput_records_per_sec": round(input_records / total_time, 2) if total_time > 0 else 0 

117 }, 

118 "phases": [t.to_dict() for t in self.timers], 

119 } 

120 

121 def print_summary(self): 

122 """Print timing summary to console.""" 

123 if not self.enabled: 

124 return 

125 

126 report = self.get_report() 

127 metrics = report["metrics"] 

128 

129 print(f"\n{'='*60}") 

130 print("Timing Summary") 

131 print(f"{'='*60}") 

132 print(f"Total Duration: {metrics.get('total_duration_seconds', 0)}s") 

133 print(f"Throughput: {metrics.get('throughput_records_per_sec', 0)} records/sec") 

134 print(f"Input Records: {metrics.get('input_records', 0)}") 

135 print(f"Curated Records: {metrics.get('curated_records', 0)}") 

136 print(f"Entities Created: {metrics.get('entities_created', 0)}") 

137 print(f"Modified Entities: {metrics.get('modified_entities', 0)}") 

138 print(f"\nPhase Breakdown:") 

139 for phase in report["phases"]: 

140 if phase["name"] not in ["total_processing", "creator_execution", "provenance_generation"]: 

141 print(f" {phase['name']}: {phase['duration_seconds']}s") 

142 print(f"{'='*60}\n") 

143 

144 def print_phase_breakdown(self): 

145 """Print detailed phase breakdown for a single file.""" 

146 if not self.enabled: 

147 return 

148 

149 report = self.get_report() 

150 phases = report["phases"] 

151 

152 print(f"\n Phase Breakdown:") 

153 for phase in phases: 

154 if phase["name"] == "total_processing": 

155 continue 

156 name = phase["name"] 

157 duration = phase["duration_seconds"] 

158 print(f" {name:25s} {duration:8.2f}s") 

159 

160 def print_file_summary(self, filename: str): 

161 """Print complete summary for a single file with metrics and phases.""" 

162 if not self.enabled: 

163 return 

164 

165 report = self.get_report() 

166 metrics = report["metrics"] 

167 

168 total_time = metrics.get("total_duration_seconds", 0) 

169 records = metrics.get("input_records", 0) 

170 entities = metrics.get("entities_created", 0) 

171 throughput = metrics.get("throughput_records_per_sec", 0) 

172 

173 print(f" ✓ Completed in {total_time:.2f}s") 

174 self.print_phase_breakdown() 

175 print(f"\n Metrics:") 

176 print(f" Records processed: {records}") 

177 print(f" Entities created: {entities}") 

178 print(f" Throughput: {throughput:.2f} rec/s")