# Source code for plumbum.cli.progress

"""
Progress bar
------------
"""

from __future__ import annotations

import datetime
import sys
import warnings
from abc import ABC, abstractmethod

from plumbum.cli.termsize import get_terminal_size


class ProgressBase(ABC):
    """Base class for progress bars. Customize for types of progress bars.

    The object is its own iterator: ``iter(bar)`` calls :meth:`start` and
    returns ``bar``; every ``next(bar)`` pulls one item from the wrapped
    iterator, then calls :meth:`increment`; exhaustion calls :meth:`done`.

    :param iterator: The iterator to wrap with a progress bar
    :param length: The length of the iterator (will use ``__len__`` if None)
    :param timer: Try to time the completion status of the iterator
    :param body: True if the slow portion occurs outside the iterator (in a loop, for example)
    :param has_output: True if the iteration body produces output to the screen (forces rewrite off)
    :param clear: Clear the progress bar afterwards, if applicable.
    :raises TypeError: If neither ``iterator`` nor ``length`` is given.
    """

    def __init__(
        self,
        iterator=None,
        length=None,
        timer=True,
        body=False,
        has_output=False,
        clear=True,
    ):
        # This guard has to run first: otherwise ``len(None)`` below would
        # raise an unrelated TypeError before the intended message is reached.
        if length is None and iterator is None:
            raise TypeError("Expected either an iterator or a length")

        if length is None:
            length = len(iterator)
        elif iterator is None:
            iterator = range(length)

        self.length = length
        self.iterator = iterator
        self.timer = timer
        self.body = body
        self.has_output = has_output
        self.clear = clear

    def __len__(self):
        """Return the total number of steps."""
        return self.length

    def __iter__(self):
        """Start the bar and return ``self`` as the iterator."""
        self.start()
        return self

    @abstractmethod
    def start(self):
        """This should initialize the progress bar and the iterator"""
        self.iter = iter(self.iterator)
        # With ``body=True`` the first increment happens before any work has
        # been done, so the count starts one step behind.
        self.value = -1 if self.body else 0
        self._start_time = datetime.datetime.now()

    def __next__(self):
        """Return the next wrapped item and advance the bar.

        :raises StopIteration: When the wrapped iterator is exhausted;
            :meth:`done` is called before the exception propagates.
        """
        try:
            rval = next(self.iter)
            self.increment()
        except StopIteration:
            self.done()
            raise
        return rval

    def next(self):
        """Alias for ``next(self)``."""
        return next(self)

    @property
    def value(self):
        """This is the current value, as a property so setting it can be customized"""
        return self._value

    @value.setter
    def value(self, val):
        self._value = val

    @abstractmethod
    def display(self):
        """Called to update the progress bar"""

    def increment(self):
        """Sets next value and displays the bar"""
        self.value += 1
        self.display()

    def time_remaining(self):
        """Get the time remaining for the progress bar, guesses

        :returns: ``(elapsed, remaining)`` as ``datetime.timedelta`` objects,
            or ``(None, None)`` while the current value is below 1.
        """
        if self.value < 1:
            return None, None
        elapsed_time = datetime.datetime.now() - self._start_time
        # Average seconds per completed step, extrapolated over the rest.
        time_each = elapsed_time.total_seconds() / self.value
        time_remaining = time_each * (self.length - self.value)
        return elapsed_time, datetime.timedelta(0, time_remaining, 0)

    def str_time_remaining(self):
        """Returns a string version of time remaining"""
        if self.value < 1:
            return "Starting... "
        elapsed_time, time_remaining = map(str, self.time_remaining())
        # Drop the fractional seconds from both timedelta strings.
        completed = elapsed_time.split(".")[0]
        remaining = time_remaining.split(".")[0]
        return f"{completed} completed, {remaining} remaining"

    @abstractmethod
    def done(self):
        """Is called when the iterator is done."""

    @classmethod
    def range(cls, *value, **kargs):
        """Fast shortcut to create a range based progress bar, assumes work done in body"""
        return cls(range(*value), body=True, **kargs)

    @classmethod
    def wrap(cls, iterator, length=None, **kargs):
        """Shortcut to wrap an iterator that does not do all the work internally"""
        return cls(iterator, length, body=True, **kargs)
class Progress(ProgressBase):
    """Plain-text progress bar written to ``sys.stdout``.

    When the terminal is wide enough and ``has_output`` is false, the bar is
    redrawn in place using a carriage return; otherwise every update is
    written as a short status line of its own.
    """

    def start(self):
        """Initialize the iterator state and draw the initial bar."""
        super().start()
        self.display()

    def done(self):
        """Draw the completed bar, then either erase it or end the line."""
        self.value = self.length
        self.display()
        if self.clear and not self.has_output:
            # Overwrite the bar with spaces and return to the line start.
            sys.stdout.write("\r" + len(str(self)) * " " + "\r")
        else:
            sys.stdout.write("\n")
        sys.stdout.flush()

    def __str__(self):
        """Render the current state as text.

        Also stores the bar width in ``self.width`` as a side effect
        (0 means "no bar, short status text only"); :meth:`display` reads it.
        """
        width = get_terminal_size(default=(0, 0))[0]
        if self.length == 0:
            self.width = 0
            return "0/0 complete"

        # ``value`` is -1 before the first step when ``body=True``; clamp it
        # so neither the percentage nor the counter text shows a negative.
        shown = max(self.value, 0)
        percent = shown / self.length
        ending = " " + (
            self.str_time_remaining()
            if self.timer
            else f"{shown} of {self.length} complete"
        )

        if width - len(ending) < 10 or self.has_output:
            # Too narrow for a bar, or the loop body prints its own output.
            self.width = 0
            if self.timer:
                return f"{percent:.0%} complete: {self.str_time_remaining()}"
            return f"{percent:.0%} complete"

        # Reserve two columns for the brackets plus one spare column.
        self.width = width - len(ending) - 2 - 1
        nstars = int(percent * self.width)
        pbar = "[" + "*" * nstars + " " * (self.width - nstars) + "]" + ending

        # Splice the percentage label over the middle of the bar.
        str_percent = f" {percent:.0%} "
        return (
            pbar[: self.width // 2 - 2]
            + str_percent
            + pbar[self.width // 2 + len(str_percent) - 2 :]
        )

    def display(self):
        """Write the rendered bar to ``sys.stdout`` and flush."""
        disptxt = str(self)
        if self.width == 0 or self.has_output:
            sys.stdout.write(disptxt + "\n")
        else:
            # Return to the start of the line so the bar is redrawn in place.
            sys.stdout.write("\r")
            sys.stdout.write(disptxt)
        sys.stdout.flush()
class ProgressIPy(ProgressBase):  # pragma: no cover
    """Progress bar rendered with IPython widgets (an ``IntProgress`` plus an HTML label)."""

    HTMLBOX = '<div class="widget-hbox widget-progress"><div class="widget-label" style="display:block;">{0}</div></div>'

    def __init__(self, *args, **kargs):
        """Create the widgets; arguments are forwarded to :class:`ProgressBase`."""
        # Ipython gives warnings when using widgets about the API potentially changing
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            try:
                from ipywidgets import HTML, HBox, IntProgress
            except ImportError:  # Support IPython < 4.0
                from IPython.html.widgets import HTML, HBox, IntProgress

        super().__init__(*args, **kargs)
        self.prog = IntProgress(max=self.length)
        self._label = HTML()
        self._box = HBox((self.prog, self._label))

    def start(self):
        """Show the widget box, then initialize the iterator state."""
        from IPython.display import display

        display(self._box)
        super().start()

    @property
    def value(self):
        """This is the current value, -1 allowed (automatically fixed for display)"""
        return self._value

    @value.setter
    def value(self, val):
        self._value = val
        # Clamp for display: -1 is a valid internal value before the first step.
        shown = max(val, 0)
        self.prog.value = shown
        # A zero-length bar has no meaningful ratio; show 0% instead of
        # raising ZeroDivisionError.
        fraction = shown / self.length if self.length else 0.0
        self.prog.description = f"{fraction:.2%}"
        if self.timer and val > 0:
            self._label.value = self.HTMLBOX.format(self.str_time_remaining())

    def display(self):
        """No-op: the widgets are updated by the ``value`` setter."""

    def done(self):
        """Close the widget box if ``clear`` is set."""
        if self.clear:
            self._box.close()
class ProgressAuto(ProgressBase):
    """Automatically selects the best progress bar (IPython HTML or text).

    Does not work with qtconsole (as that is correctly identified as identical
    to notebook, since the kernel is the same); it will still iterate, but no
    graphical indication will be displayed.

    :param iterator: The iterator to wrap with a progress bar
    :param length: The length of the iterator (will use ``__len__`` if None)
    :param timer: Try to time the completion status of the iterator
    :param body: True if the slow portion occurs outside the iterator (in a loop, for example)
    """

    def __new__(cls, *args, **kargs):
        """Build and return a ``ProgressIPy`` or ``Progress`` instead of a ``ProgressAuto``.

        ``ProgressIPy`` is tried only when the ``__IPYTHON__`` global exists.
        Any ``NameError`` or ``ImportError`` along the way, and any
        ``TraitError`` raised while constructing the widget bar, falls back
        to the text ``Progress``.
        """
        try:  # pragma: no cover
            # Evaluating the bare name raises NameError outside IPython.
            __IPYTHON__  # noqa: B018

            try:
                from traitlets import TraitError as widget_error
            except ImportError:  # Support for IPython < 4.0
                from IPython.utils.traitlets import TraitError as widget_error

            try:
                widget_bar = ProgressIPy(*args, **kargs)
            except widget_error:
                # Re-raise as NameError so the outer handler does the fallback.
                raise NameError() from None
            return widget_bar
        except (NameError, ImportError):
            return Progress(*args, **kargs)
# Register both concrete bars as virtual subclasses of ProgressAuto.
ProgressAuto.register(ProgressIPy)
ProgressAuto.register(Progress)


def main():
    """Demo: step a 20-item text progress bar, sleeping one second per step."""
    from time import sleep

    for _ in Progress.range(20):
        sleep(1)


if __name__ == "__main__":
    main()