Coverage for oc_ds_converter / lib / console.py: 94%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from datetime import timedelta 

6from math import ceil 

7 

8from rich.console import Console 

9from rich.progress import BarColumn, Progress, ProgressColumn, SpinnerColumn, Task, TaskID, TaskProgressColumn, TextColumn, TimeElapsedColumn 

10from rich.text import Text 

11 

# Module-level Console instance; create_progress() passes it to Progress as console=.
12console = Console() 

13 

14 

class EMATimeRemainingColumn(ProgressColumn):
    """Time remaining column using Exponential Moving Average for stable estimates.

    Rich's default TimeRemainingColumn uses a simple windowed average that becomes
    unstable with infrequent updates. This implementation uses EMA (like tqdm) to
    provide smoother estimates by weighting recent observations more while retaining
    historical information.

    EMA formula: EMA_new = α × current_value + (1 - α) × EMA_previous

    With α = 0.3 (default):
    - 30% weight to the newly measured speed
    - 70% weight to the historical average (which itself contains 70% of previous
      history, creating exponential decay of older values)

    Skip handling:
        If task.fields contains a "processed" counter, speed is calculated based on
        processed items only (ignoring skipped items). This prevents cache hits from
        falsely inflating speed estimates.

        "processed" must be a running total, not a per-call increment: render()
        measures work as the difference between two successive values of the field.
        Passing a constant (e.g. processed=1 on every update) therefore yields no
        further speed samples after the first one. The advance_progress() helper in
        this module maintains the running total for both worked and skipped items.
    """

    # Limit refresh rate to avoid excessive recalculations
    max_refresh = 0.5

    def __init__(self, smoothing: float = 0.3):
        """Create the column.

        Args:
            smoothing: α (alpha), the EMA smoothing factor between 0 and 1.
                Lower values (e.g., 0.1) are more stable but slower to react;
                higher values (e.g., 0.5) are more reactive but potentially volatile.
        """
        self.smoothing = smoothing
        # Store EMA speed estimate per task (supports multiple progress bars)
        self._ema_speed: dict[int, float] = {}
        # Store previous state to calculate instantaneous speed
        self._last_processed: dict[int, float] = {}
        self._last_time: dict[int, float] = {}
        super().__init__()

    def render(self, task: Task) -> Text:
        """Return the estimated time remaining for *task* as styled text.

        Returns "0:00:00" for a finished task and "-:--:--" when the total is
        unknown or no speed sample has been collected yet; otherwise the remaining
        work divided by the EMA speed, rounded up to whole seconds.
        """
        if task.finished:
            return Text("0:00:00", style="progress.remaining")
        if task.total is None or task.remaining is None:
            return Text("-:--:--", style="progress.remaining")

        current_time = task.get_time()
        task_id = task.id

        # Use "processed" field if available, otherwise fall back to "completed".
        # "processed" tracks only actual work done, excluding skipped/cached items.
        current_processed = task.fields.get("processed", task.completed)

        last_time = self._last_time.get(task_id)
        if last_time is None:
            # First render for this task: just record the baseline.
            self._last_time[task_id] = current_time
            self._last_processed[task_id] = current_processed
        else:
            # Time elapsed and work actually processed since the last sample.
            dt = current_time - last_time
            dp = current_processed - self._last_processed[task_id]

            if dp < 0:
                # The counter went backwards (e.g. the task was reset). Without a
                # new baseline no sample would be taken until the counter passed
                # its old high-water mark, and that sample would divide by the
                # whole gap, dragging the estimate down. The existing EMA is kept
                # as a prior and adapts as new samples arrive.
                self._last_time[task_id] = current_time
                self._last_processed[task_id] = current_processed
            elif dt > 0 and dp > 0:
                # Only update EMA when actual processing happened (dp > 0);
                # skip-only updates (dp == 0) don't affect the speed estimate.
                instant_speed = dp / dt

                previous_ema = self._ema_speed.get(task_id)
                if previous_ema is None:
                    # First measurement: use it directly as initial EMA
                    self._ema_speed[task_id] = instant_speed
                else:
                    # EMA = α × instant_speed + (1 - α) × previous_EMA
                    self._ema_speed[task_id] = (
                        self.smoothing * instant_speed
                        + (1 - self.smoothing) * previous_ema
                    )

                # Only move the baseline when processing happened, so dt measures
                # time between processed items, not time between renders.
                self._last_time[task_id] = current_time
                self._last_processed[task_id] = current_processed

        # Calculate time remaining using EMA speed
        speed = self._ema_speed.get(task_id)
        if not speed:
            return Text("-:--:--", style="progress.remaining")

        # time_remaining = work_remaining / speed
        estimate = ceil(task.remaining / speed)
        delta = timedelta(seconds=estimate)
        return Text(str(delta), style="progress.remaining")

108 

109 

def create_progress() -> Progress:
    """Build the Progress display used by this module.

    Columns, left to right: SpinnerColumn, the task description, BarColumn,
    TaskProgressColumn, a cyan "completed/total" counter, TimeElapsedColumn and
    EMATimeRemainingColumn. Output goes to the module-level ``console``.
    """
    columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"),
        TimeElapsedColumn(),
        EMATimeRemainingColumn(),
    )
    return Progress(*columns, console=console)

121 

122 

def advance_progress(
    progress: Progress,
    task_id: TaskID,
    advance: int = 1,
    processed: bool = True,
) -> None:
    """Move a progress bar forward and keep its "processed" running total current.

    Args:
        progress: The Progress instance that owns the task.
        task_id: ID of the task to advance.
        advance: How many items to advance by (default: 1).
        processed: True when the items were real work and should count towards
            the time estimate; False when they were skipped/cached, in which
            case the bar still advances but the "processed" total is unchanged.
    """
    # The task's "processed" field holds the running total; a missing field reads as 0.
    running_total = progress._tasks[task_id].fields.get("processed", 0)
    new_total = running_total + advance if processed else running_total
    progress.update(task_id, advance=advance, processed=new_total)

143 else: 

144 progress.update(task_id, advance=advance, processed=current_processed)