Coverage for oc_meta / lib / console.py: 88%
58 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5# -*- coding: utf-8 -*-
6from datetime import timedelta
7from math import ceil
9from rich.console import Console
10from rich.progress import (
11 BarColumn,
12 Progress,
13 ProgressColumn,
14 SpinnerColumn,
15 Task,
16 TaskID,
17 TaskProgressColumn,
18 TextColumn,
19 TimeElapsedColumn,
20)
21from rich.text import Text
# Module-level Console instance; create_progress() passes it to Progress
# via console=console.
23console = Console()
class EMATimeRemainingColumn(ProgressColumn):
    """Time remaining column blending EMA speed with overall average speed.

    Rich's default TimeRemainingColumn uses a simple windowed average that becomes
    unstable with infrequent updates. Pure EMA (α = 0.3) fixes that but over-reacts
    to speed bursts in long-running tasks, producing wildly optimistic estimates
    when recent items are fast (e.g. cache hits).

    This implementation blends:
    - Overall average speed (processed / elapsed): stable anchor
    - EMA speed: responsive to recent trends

    Final speed = _EMA_WEIGHT × EMA + (1 - _EMA_WEIGHT) × overall_average

    Skip handling:
        If task.fields contains a "processed" counter, speed is calculated from
        that counter instead of task.completed, so skipped items (e.g. cache
        hits) do not inflate the speed estimate.

        The counter must be the cumulative number of items actually processed.
        Progress.update() stores extra keyword fields as given (it does not add
        to them), so pass the running total:
        progress.update(task, advance=1, processed=total_processed_so_far)
        for actual work, and progress.update(task, advance=1) for skipped
        items. The advance_progress() helper in this module maintains the
        counter for you.
    """

    # ProgressColumn refresh throttle, in seconds.
    max_refresh = 0.5
    # Weight of the newest speed sample when updating the EMA.
    _SMOOTHING = 0.3
    # Weight of the EMA (vs. the overall average) in the final blended speed.
    _EMA_WEIGHT = 0.3

    def __init__(self):
        """Create empty per-task tracking tables, keyed by task id."""
        self._ema_speed: dict[int, float] = {}
        self._last_processed: dict[int, float] = {}
        self._last_time: dict[int, float] = {}
        self._start_time: dict[int, float] = {}
        super().__init__()

    def render(self, task: Task) -> Text:
        """Return the estimated time remaining for ``task`` as styled text.

        Args:
            task: The task being rendered.

        Returns:
            "0:00:00" when the task is finished; "-:--:--" when the total is
            unknown or no speed sample has been collected yet; otherwise the
            estimate formatted as a ``timedelta`` string.
        """
        if task.finished:
            return Text("0:00:00", style="progress.remaining")
        if task.total is None or task.remaining is None:
            return Text("-:--:--", style="progress.remaining")

        current_time = task.get_time()
        task_id = task.id

        # Prefer the explicit "processed" counter; fall back to completed
        # when the caller does not track skipped items separately.
        current_processed = task.fields.get("processed", task.completed)

        # The first render of a task is the origin for the overall average.
        if task_id not in self._start_time:
            self._start_time[task_id] = current_time

        if task_id in self._last_time:
            dt = current_time - self._last_time[task_id]
            dp = current_processed - self._last_processed[task_id]

            if dt > 0 and dp > 0:
                instant_speed = dp / dt

                if task_id in self._ema_speed:
                    self._ema_speed[task_id] = (
                        self._SMOOTHING * instant_speed
                        + (1 - self._SMOOTHING) * self._ema_speed[task_id]
                    )
                else:
                    # First sample seeds the EMA directly.
                    self._ema_speed[task_id] = instant_speed

                # Move the baseline only when progress was observed, so idle
                # refreshes do not shrink dt and inflate the next sample.
                self._last_time[task_id] = current_time
                self._last_processed[task_id] = current_processed
        else:
            # First render for this task: record the baseline only.
            self._last_time[task_id] = current_time
            self._last_processed[task_id] = current_processed

        ema_speed = self._ema_speed.get(task_id)
        if not ema_speed:
            return Text("-:--:--", style="progress.remaining")

        elapsed = current_time - self._start_time[task_id]
        if elapsed > 0 and current_processed > 0:
            overall_speed = current_processed / elapsed
            speed = (
                self._EMA_WEIGHT * ema_speed
                + (1 - self._EMA_WEIGHT) * overall_speed
            )
        else:
            speed = ema_speed

        estimate = ceil(task.remaining / speed)
        delta = timedelta(seconds=estimate)
        return Text(str(delta), style="progress.remaining")
def create_progress() -> Progress:
    """Build a Progress display bound to the module-level console.

    Columns, left to right: spinner, task description, bar, task progress,
    a cyan "completed/total" counter, elapsed time, and the EMA-based
    time-remaining estimate.

    Returns:
        A new Progress instance; the caller is responsible for starting it.
    """
    columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"),
        TimeElapsedColumn(),
        EMATimeRemainingColumn(),
    )
    return Progress(*columns, console=console)
def advance_progress(
    progress: Progress,
    task_id: TaskID,
    advance: int = 1,
    processed: bool = True,
) -> None:
    """Move a task forward and keep its cumulative "processed" field in sync.

    Args:
        progress: The Progress instance that owns the task.
        task_id: The ID of the task to advance.
        advance: How many items to advance the bar by (default: 1).
        processed: If True, the items count as real work and are added to
            the task's "processed" field. If False, they count as
            skipped/cached: the bar still advances but the "processed"
            field keeps its current value (0 if it was never set).
    """
    fields = progress._tasks[task_id].fields
    processed_total = fields.get("processed", 0)
    if processed:
        processed_total = processed_total + advance
    # Always write the field so it exists after the first call.
    progress.update(task_id, advance=advance, processed=processed_total)