Coverage for oc_ds_converter / lib / console.py: 94%
49 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from datetime import timedelta
6from math import ceil
8from rich.console import Console
9from rich.progress import BarColumn, Progress, ProgressColumn, SpinnerColumn, Task, TaskID, TaskProgressColumn, TextColumn, TimeElapsedColumn
10from rich.text import Text
# Shared Rich console; create_progress() passes it to every Progress it builds.
console = Console()
class EMATimeRemainingColumn(ProgressColumn):
    """Time remaining column using Exponential Moving Average for stable estimates.

    Rich's default TimeRemainingColumn uses a simple windowed average that becomes
    unstable with infrequent updates. This implementation uses EMA (like tqdm) to
    provide smoother estimates by weighting recent observations more while retaining
    historical information.

    EMA formula: EMA_new = α × current_value + (1 - α) × EMA_previous

    With α = 0.3 (default):
    - 30% weight to the newly measured speed
    - 70% weight to the historical average (which itself contains 70% of previous
      history, creating exponential decay of older values)

    Skip handling:
    If task.fields contains a "processed" counter, speed is calculated based on
    processed items only (ignoring skipped items). This prevents cache hits from
    falsely inflating speed estimates. Use progress.update(task, advance=1, processed=1)
    for actual work, and progress.update(task, advance=1) for skipped items.

    Counter regressions:
    If the tracked counter moves backwards between two renders (for example
    after the task has been reset), the measurement baseline is re-anchored at
    the current time and counter value. The previously learned speed is kept
    as the best available estimate until new samples arrive.
    """

    # Limit refresh rate to avoid excessive recalculations
    max_refresh = 0.5

    def __init__(self, smoothing: float = 0.3):
        """Create the column.

        Args:
            smoothing: EMA factor α, expected to lie between 0 and 1.
                Lower values (e.g., 0.1) are more stable but slower to react;
                higher values (e.g., 0.5) are more reactive but potentially
                volatile. The value is stored as given and not validated.
        """
        self.smoothing = smoothing
        # EMA speed estimate per task (supports multiple progress bars)
        self._ema_speed: dict[TaskID, float] = {}
        # Counter value and timestamp of the last accepted sample, per task;
        # these are the baseline for the next instantaneous speed measurement
        self._last_processed: dict[TaskID, float] = {}
        self._last_time: dict[TaskID, float] = {}
        super().__init__()

    def render(self, task: Task) -> Text:
        """Return the remaining-time estimate for ``task`` as styled text.

        Returns "0:00:00" for a finished task, "-:--:--" when the total is
        unknown or no speed sample exists yet, and otherwise
        ``remaining / EMA speed`` rounded up to whole seconds.
        """
        if task.finished:
            return Text("0:00:00", style="progress.remaining")
        if task.total is None or task.remaining is None:
            return Text("-:--:--", style="progress.remaining")

        current_time = task.get_time()
        task_id = task.id

        # Use "processed" field if available, otherwise fall back to "completed".
        # "processed" tracks only actual work done, excluding skipped/cached items.
        current_processed = task.fields.get("processed", task.completed)

        if task_id not in self._last_time:
            # First render for this task: record the baseline only; a speed
            # needs two observations.
            self._last_time[task_id] = current_time
            self._last_processed[task_id] = current_processed
        else:
            # Time elapsed and work processed since the last accepted sample
            dt = current_time - self._last_time[task_id]
            dp = current_processed - self._last_processed[task_id]

            if dp < 0:
                # The counter went backwards. Without re-anchoring, every
                # later render would keep comparing against the stale, larger
                # value, and the next positive sample would span the whole
                # stale interval and distort the average.
                self._last_time[task_id] = current_time
                self._last_processed[task_id] = current_processed
            elif dt > 0 and dp > 0:
                # Only update the EMA when actual processing happened;
                # renders with dp == 0 (skips or idle redraws) are ignored.
                instant_speed = dp / dt
                previous_speed = self._ema_speed.get(task_id)
                if previous_speed is None:
                    # First measurement: use it directly as initial EMA
                    self._ema_speed[task_id] = instant_speed
                else:
                    # EMA = α × instant_speed + (1 - α) × previous_EMA
                    self._ema_speed[task_id] = (
                        self.smoothing * instant_speed
                        + (1 - self.smoothing) * previous_speed
                    )

                # Only move the baseline when processing happened, so dt
                # measures time between processed items, not between renders.
                self._last_time[task_id] = current_time
                self._last_processed[task_id] = current_processed

        # Calculate time remaining using EMA speed
        speed = self._ema_speed.get(task_id)
        if not speed:
            # No sample yet (or a zero speed): nothing meaningful to show
            return Text("-:--:--", style="progress.remaining")

        # time_remaining = work_remaining / speed
        estimate = ceil(task.remaining / speed)
        delta = timedelta(seconds=estimate)
        return Text(str(delta), style="progress.remaining")
def create_progress() -> Progress:
    """Build a Progress bar with this module's standard column layout.

    Columns, in order: SpinnerColumn, the task description, BarColumn,
    TaskProgressColumn, a cyan "completed/total" counter, TimeElapsedColumn
    and EMATimeRemainingColumn. Output is routed to the module-level
    ``console``.

    Returns:
        A new, not yet started Progress instance.
    """
    columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"),
        TimeElapsedColumn(),
        EMATimeRemainingColumn(),
    )
    return Progress(*columns, console=console)
def advance_progress(
    progress: Progress,
    task_id: TaskID,
    advance: int = 1,
    processed: bool = True,
) -> None:
    """Advance a progress bar and maintain its "processed" counter.

    Args:
        progress: The Progress instance that owns the task.
        task_id: The ID of the task to advance.
        advance: Number of items to advance by (default: 1).
        processed: If True, the items count as real work and the task's
            "processed" field grows by ``advance``. If False, the items
            count as skipped/cached: the bar still advances but the
            "processed" field keeps its current value.

    Raises:
        KeyError: If ``task_id`` is not present in ``progress._tasks``.
    """
    # NOTE(review): reads the private Progress._tasks mapping, and the
    # read-then-update below is not atomic - confirm callers advance a given
    # task from a single thread.
    already_processed = progress._tasks[task_id].fields.get("processed", 0)

    # The counter is passed to update() in both cases; skipped items simply
    # re-send the unchanged value.
    new_processed = already_processed + advance if processed else already_processed
    progress.update(task_id, advance=advance, processed=new_processed)