Coverage for oc_meta / lib / console.py: 88%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5# -*- coding: utf-8 -*- 

6from datetime import timedelta 

7from math import ceil 

8 

9from rich.console import Console 

10from rich.progress import ( 

11 BarColumn, 

12 Progress, 

13 ProgressColumn, 

14 SpinnerColumn, 

15 Task, 

16 TaskID, 

17 TaskProgressColumn, 

18 TextColumn, 

19 TimeElapsedColumn, 

20) 

21from rich.text import Text 

22 

23console = Console() 

24 

25 

26class EMATimeRemainingColumn(ProgressColumn): 

27 """Time remaining column blending EMA speed with overall average speed. 

28 

29 Rich's default TimeRemainingColumn uses a simple windowed average that becomes 

30 unstable with infrequent updates. Pure EMA (α = 0.3) fixes that but over-reacts 

31 to speed bursts in long-running tasks, producing wildly optimistic estimates 

32 when recent items are fast (e.g. cache hits). 

33 

34 This implementation blends: 

35 - Overall average speed (completed / elapsed): stable anchor 

36 - EMA speed: responsive to recent trends 

37 

38 Final speed = _EMA_WEIGHT × EMA + (1 - _EMA_WEIGHT) × overall_average 

39 

40 Skip handling: 

41 If task.fields contains a "processed" counter, speed is calculated based on 

42 processed items only (ignoring skipped items). This prevents cache hits from 

43 falsely inflating speed estimates. Use progress.update(task, advance=1, processed=1) 

44 for actual work, and progress.update(task, advance=1) for skipped items. 

45 """ 

46 

47 max_refresh = 0.5 

48 _SMOOTHING = 0.3 

49 _EMA_WEIGHT = 0.3 

50 

51 def __init__(self): 

52 self._ema_speed: dict[int, float] = {} 

53 self._last_processed: dict[int, float] = {} 

54 self._last_time: dict[int, float] = {} 

55 self._start_time: dict[int, float] = {} 

56 super().__init__() 

57 

58 def render(self, task: Task) -> Text: 

59 if task.finished: 

60 return Text("0:00:00", style="progress.remaining") 

61 if task.total is None or task.remaining is None: 

62 return Text("-:--:--", style="progress.remaining") 

63 

64 current_time = task.get_time() 

65 task_id = task.id 

66 

67 current_processed = task.fields.get("processed", task.completed) 

68 

69 if task_id not in self._start_time: 

70 self._start_time[task_id] = current_time 

71 

72 if task_id in self._last_time: 

73 dt = current_time - self._last_time[task_id] 

74 dp = current_processed - self._last_processed[task_id] 

75 

76 if dt > 0 and dp > 0: 

77 instant_speed = dp / dt 

78 

79 if task_id in self._ema_speed: 

80 self._ema_speed[task_id] = ( 

81 self._SMOOTHING * instant_speed 

82 + (1 - self._SMOOTHING) * self._ema_speed[task_id] 

83 ) 

84 else: 

85 self._ema_speed[task_id] = instant_speed 

86 

87 self._last_time[task_id] = current_time 

88 self._last_processed[task_id] = current_processed 

89 

90 if task_id not in self._last_time: 

91 self._last_time[task_id] = current_time 

92 self._last_processed[task_id] = current_processed 

93 

94 ema_speed = self._ema_speed.get(task_id) 

95 if not ema_speed: 

96 return Text("-:--:--", style="progress.remaining") 

97 

98 elapsed = current_time - self._start_time[task_id] 

99 if elapsed > 0 and current_processed > 0: 

100 overall_speed = current_processed / elapsed 

101 speed = self._EMA_WEIGHT * ema_speed + (1 - self._EMA_WEIGHT) * overall_speed 

102 else: 

103 speed = ema_speed 

104 

105 estimate = ceil(task.remaining / speed) 

106 delta = timedelta(seconds=estimate) 

107 return Text(str(delta), style="progress.remaining") 

108 

109 

110def create_progress() -> Progress: 

111 return Progress( 

112 SpinnerColumn(), 

113 TextColumn("[progress.description]{task.description}"), 

114 BarColumn(), 

115 TaskProgressColumn(), 

116 TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"), 

117 TimeElapsedColumn(), 

118 EMATimeRemainingColumn(), 

119 console=console, 

120 ) 

121 

122 

123def advance_progress( 

124 progress: Progress, 

125 task_id: TaskID, 

126 advance: int = 1, 

127 processed: bool = True, 

128) -> None: 

129 """Advance progress bar, optionally marking items as actually processed. 

130 

131 Args: 

132 progress: The Progress instance 

133 task_id: The task ID to advance 

134 advance: Number of items to advance (default: 1) 

135 processed: If True, count as actual work done (affects time estimate). 

136 If False, count as skipped/cached (progress advances but 

137 doesn't affect time estimate). 

138 """ 

139 task = progress._tasks[task_id] 

140 current_processed = task.fields.get("processed", 0) 

141 if processed: 

142 progress.update(task_id, advance=advance, processed=current_processed + advance) 

143 else: 

144 progress.update(task_id, advance=advance, processed=current_processed)