| import atexit |
| from threading import Event, Thread, current_thread |
| from time import time |
| from warnings import warn |
| |
| __all__ = ["TMonitor", "TqdmSynchronisationWarning"] |
| |
| |
| class TqdmSynchronisationWarning(RuntimeWarning): |
| """tqdm multi-thread/-process errors which may cause incorrect nesting |
| but otherwise no adverse effects""" |
| pass |
| |
| |
| class TMonitor(Thread): |
| """ |
| Monitoring thread for tqdm bars. |
| Monitors if tqdm bars are taking too much time to display |
| and readjusts miniters automatically if necessary. |
| |
| Parameters |
| ---------- |
| tqdm_cls : class |
| tqdm class to use (can be core tqdm or a submodule). |
| sleep_interval : float |
| Time to sleep between monitoring checks. |
| """ |
| _test = {} # internal vars for unit testing |
| |
| def __init__(self, tqdm_cls, sleep_interval): |
| Thread.__init__(self) |
| self.daemon = True # kill thread when main killed (KeyboardInterrupt) |
| self.woken = 0 # last time woken up, to sync with monitor |
| self.tqdm_cls = tqdm_cls |
| self.sleep_interval = sleep_interval |
| self._time = self._test.get("time", time) |
| self.was_killed = self._test.get("Event", Event)() |
| atexit.register(self.exit) |
| self.start() |
| |
| def exit(self): |
| self.was_killed.set() |
| if self is not current_thread(): |
| self.join() |
| return self.report() |
| |
| def get_instances(self): |
| # returns a copy of started `tqdm_cls` instances |
| return [i for i in self.tqdm_cls._instances.copy() |
| # Avoid race by checking that the instance started |
| if hasattr(i, 'start_t')] |
| |
| def run(self): |
| cur_t = self._time() |
| while True: |
| # After processing and before sleeping, notify that we woke |
| # Need to be done just before sleeping |
| self.woken = cur_t |
| # Sleep some time... |
| self.was_killed.wait(self.sleep_interval) |
| # Quit if killed |
| if self.was_killed.is_set(): |
| return |
| # Then monitor! |
| # Acquire lock (to access _instances) |
| with self.tqdm_cls.get_lock(): |
| cur_t = self._time() |
| # Check tqdm instances are waiting too long to print |
| instances = self.get_instances() |
| for instance in instances: |
| # Check event in loop to reduce blocking time on exit |
| if self.was_killed.is_set(): |
| return |
| # Only if mininterval > 1 (else iterations are just slow) |
| # and last refresh exceeded maxinterval |
| if ( |
| instance.miniters > 1 |
| and (cur_t - instance.last_print_t) >= instance.maxinterval |
| ): |
| # force bypassing miniters on next iteration |
| # (dynamic_miniters adjusts mininterval automatically) |
| instance.miniters = 1 |
| # Refresh now! (works only for manual tqdm) |
| instance.refresh(nolock=True) |
| # Remove accidental long-lived strong reference |
| del instance |
| if instances != self.get_instances(): # pragma: nocover |
| warn("Set changed size during iteration" + |
| " (see https://github.com/tqdm/tqdm/issues/481)", |
| TqdmSynchronisationWarning, stacklevel=2) |
| # Remove accidental long-lived strong references |
| del instances |
| |
| def report(self): |
| return not self.was_killed.is_set() |