| # -*- coding: utf-8 -*- |
| # Advice: use repr(our_file.read()) to print the full output of tqdm |
| # (else '\r' will replace the previous lines and you'll see only the latest. |
| from __future__ import print_function |
| |
| import csv |
| import os |
| import re |
| import sys |
| from contextlib import contextmanager |
| from functools import wraps |
| from warnings import catch_warnings, simplefilter |
| |
| from pytest import importorskip, mark, raises, skip |
| |
| from tqdm import TqdmDeprecationWarning, TqdmWarning, tqdm, trange |
| from tqdm.contrib import DummyTqdmFile |
| from tqdm.std import EMA, Bar |
| |
| try: |
| from StringIO import StringIO |
| except ImportError: |
| from io import StringIO |
| |
| from io import IOBase # to support unicode strings |
| from io import BytesIO |
| |
| |
class DeprecationError(Exception):
    """Plain `Exception` subclass; it adds no behaviour of its own."""
| |
| |
# `with closing(...) as ...:` must work whichever StringIO got imported:
# if StringIO already implements the context-manager protocol, `closing`
# only has to hand the object back; otherwise use the stdlib wrapper.
if all(getattr(StringIO, dunder, False) for dunder in ('__exit__', '__enter__')):
    def closing(arg):
        """Return `arg` untouched (it is already a context manager)."""
        return arg
else:
    from contextlib import closing
| |
| try: |
| _range = xrange |
| except NameError: |
| _range = range |
| |
| try: |
| _unicode = unicode |
| except NameError: |
| _unicode = str |
| |
# True only when running on Windows (`os.name == 'nt'`) and `colorama`
# cannot be imported.
if os.name != 'nt':
    nt_and_no_colorama = False
else:
    try:
        import colorama  # NOQA
    except ImportError:
        nt_and_no_colorama = True
    else:
        nt_and_no_colorama = False
| |
# Regular expressions used to pick bar output apart
# Control sequences as regex source: carriage return, newline and the ANSI
# "cursor up" escape (its `[` is escaped because it is special in a regex)
CTRLCHR = [r'\r', r'\n', r'\x1b\[A']
# Captures the number directly in front of "it/s"
RE_rate = re.compile(r'[^\d](\d[.\d]+)it/s')
# One capturing group: `split` keeps the control chars in its result
RE_ctrlchr = re.compile("({0})".format('|'.join(CTRLCHR)))
# No capturing group: `split` drops the control chars
RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR))
# One positioned-bar update: leading CR/LF, optional "posN bar: NN%" or
# 3-6 whitespace chars, then the rest of the line
RE_pos = re.compile(r'([\r\n]+((pos\d+) bar:\s+\d+%|\s{3,6})?[^\r\n]*)')
| |
| |
def pos_line_diff(res_list, expected_list, raise_nonempty=True):
    """
    Compare two lists of bar outputs (as produced with `RE_pos`).

    Returns the `(got, expected)` pairs that do not match. With
    `raise_nonempty` set, an `AssertionError` listing every mismatch
    (including unpaired trailing items of the longer list) is raised
    instead whenever there is a mismatch or the lengths differ.
    """
    move_up = '\x1b[A'
    mismatches = []
    for got, want in zip(res_list, expected_list):
        # bar position == number of leading newlines in the expected text
        depth = len(want) - len(want.lstrip('\n'))
        if got == want:  # simple comparison
            continue
        # acceptable endings: a bar (']') or a cleared line (' '), followed
        # by up to `depth` cursor-up moves (maybe less due to closing bars);
        # a completed bar and the final bare newline are accepted as well
        ending_ok = (
            any(got.endswith(tail + n_up * move_up)
                for n_up in range(depth + 1)
                for tail in (']', ' '))
            or '100%' in got
            or got == '\n')
        too_many_up = got[(-1 - depth) * len(move_up):] == move_up
        if not got.startswith(want) or not ending_ok or too_many_up:
            mismatches.append((got, want))

    n_res, n_exp = len(res_list), len(expected_list)
    if raise_nonempty and (mismatches or n_res != n_exp):
        if n_res < n_exp:
            mismatches.extend((None, want) for want in expected_list[n_res:])
        elif n_res > n_exp:
            mismatches.extend((got, None) for got in res_list[n_exp:])
        report = '\n'.join('%r => %r' % pair for pair in mismatches)
        raise AssertionError("Got => Expected\n" + report)
    return mismatches
| |
| |
class DiscreteTimer(object):
    """Fake clock that only advances when told to, for deterministic tests"""

    def __init__(self):
        # current virtual time
        self.t = 0.0

    def sleep(self, t):
        """Advance the virtual time by `t` without actually waiting"""
        self.t = self.t + t

    def time(self):
        """Return the current virtual time"""
        return self.t
| |
| |
def cpu_timify(t, timer=None):
    """
    Make the bar `t` read time from `timer` rather than the system clock.

    A fresh `DiscreteTimer` is created when `timer` is None. The bar's
    `start_t` and `last_print_t` are reset to the timer's current time.
    Returns the timer in use.
    """
    clock = DiscreteTimer() if timer is None else timer
    t._time, t._sleep = clock.time, clock.sleep
    now = t._time()
    t.start_t = now
    t.last_print_t = now
    return clock
| |
| |
class UnicodeIO(IOBase):
    """
    Unicode version of StringIO.

    Minimal in-memory text stream: the whole content lives in `self.text`
    and `self.cursor` is the current read/write offset. `write` and `seek`
    return the written length / new position like the standard io text
    streams do (callers that ignore the return value are unaffected).
    """
    def __init__(self, *args, **kwargs):
        super(UnicodeIO, self).__init__(*args, **kwargs)
        self.encoding = 'U8'  # io.StringIO supports unicode, but no encoding
        self.text = ''
        self.cursor = 0

    def __len__(self):
        """Number of characters currently stored."""
        return len(self.text)

    def seek(self, offset):
        """Move the cursor to the absolute `offset` and return it."""
        self.cursor = offset
        return self.cursor

    def tell(self):
        """Return the current cursor offset."""
        return self.cursor

    def write(self, s):
        """Overwrite/append `s` at the cursor; return the number of chars written."""
        start = self.cursor
        stop = start + len(s)
        self.text = self.text[:start] + s + self.text[stop:]
        self.cursor = stop
        return len(s)

    def read(self, n=-1):
        """Read up to `n` chars from the cursor (everything if `n` is None or negative)."""
        start = self.cursor
        if n is None or n < 0:
            stop = len(self)
        else:
            stop = min(start + n, len(self))
        self.cursor = stop
        return self.text[start:stop]

    def getvalue(self):
        """Return the full content, regardless of the cursor."""
        return self.text
| |
| |
def get_bar(all_bars, i=None):
    """Return the `i`-th bar update of a full output dump (all of them if `i` is None)"""
    # cut at every control sequence and drop the empty pieces in between
    updates = [piece for piece in RE_ctrlchr_excl.split(all_bars) if piece]
    if i is None:
        return updates
    return updates[i]
| |
| |
def progressbar_rate(bar_str):
    """Return, as a float, the number `RE_rate` finds in front of "it/s" in `bar_str`"""
    found = RE_rate.search(bar_str)
    return float(found.group(1))
| |
| |
def squash_ctrlchars(s):
    """
    Replay `s` on a fake terminal and return the resulting list of lines.

    Raises `ValueError` if a cursor-up is requested while on the first line.
    """
    screen = ['']  # one entry per terminal line
    row = 0  # line the cursor is on
    for token in RE_ctrlchr.split(s):
        if not token:
            continue
        if token == '\r':
            # back to the start of the line (simplified: wipe the line)
            screen[row] = ''
        elif token == '\n':
            # going down from the last line creates a new one
            if row >= len(screen) - 1:
                screen.append('')
            row += 1
        elif token == '\x1b[A':
            if row <= 0:
                raise ValueError("Cannot go further up")
            row -= 1
        else:
            # ordinary text is appended to the current line
            screen[row] += token
    return screen
| |
| |
def test_format_interval():
    """Test time interval format"""
    expectations = (
        (60, '01:00'),
        (6160, '1:42:40'),
        (238113, '66:08:33'),
    )
    to_text = tqdm.format_interval
    for seconds, text in expectations:
        assert to_text(seconds) == text
| |
| |
def test_format_num():
    """Test number format"""
    to_text = tqdm.format_num

    # small integers survive a round trip through float()
    assert float(to_text(1337)) == 1337
    # exact powers of ten use the compact exponent form
    assert to_text(int(1e6)) == '1e+6'
    # other integers are printed digit for digit
    assert to_text(1239876) == '1239876'
| |
| |
def test_format_meter():
    """Test statistics and progress bar formatting"""
    # NOTE(review): some expected literals below contain a single space where
    # the accompanying comments describe several bar cells ("8/10 bar parts",
    # "10 bar parts") -- whitespace inside these literals may have been
    # collapsed; confirm against real `format_meter` output.
    try:
        to_char = unichr
    except NameError:
        to_char = chr
    # block glyphs appearing in the unicode bars
    full, eighth, quarter, three_eighths = (
        to_char(code) for code in (0x2588, 0x258f, 0x258e, 0x258d))

    meter = tqdm.format_meter
    stars_bar_cash = "************{bar:10}$$$$$$$$$$"
    counted = r'{l_bar}{bar}|{n_fmt}/{total_fmt}'

    assert meter(0, 1000, 13) == " 0%| | 0/1000 [00:13<?, ?it/s]"
    # If not implementing any changes to _tqdm.py, set prefix='desc'
    # or else ": : " will be in output, so assertion should change
    assert meter(0, 1000, 13, ncols=68, prefix='desc: ') == (
        "desc: 0%| | 0/1000 [00:13<?, ?it/s]")
    assert meter(231, 1000, 392) == (
        " 23%|" + full * 2 + quarter + " | 231/1000 [06:32<21:44, 1.70s/it]")
    assert meter(10000, 1000, 13) == "10000it [00:13, 769.23it/s]"
    assert meter(231, 1000, 392, ncols=56, ascii=True) == (
        " 23%|" + '#' * 3 + '6' + " | 231/1000 [06:32<21:44, 1.70s/it]")
    assert meter(100000, 1000, 13, unit_scale=True,
                 unit='iB') == "100kiB [00:13, 7.69kiB/s]"
    assert meter(100, 1000, 12, ncols=0,
                 rate=7.33) == " 10% 100/1000 [00:12<02:02, 7.33it/s]"

    # ncols too small even for l_bar: l_bar is chopped, no bar, no r_bar
    # (10/12 stars since ncols is 10)
    assert meter(0, 1000, 13, ncols=10, bar_format=stars_bar_cash) == "**********"
    # room for l_bar and part of the bar: bar is chopped, no r_bar
    # (all 12 stars and 8/10 bar parts)
    assert meter(0, 1000, 13, ncols=20, bar_format=stars_bar_cash) == "************ "
    # room for l_bar, bar and part of r_bar: r_bar is chopped
    # (all 12 stars and 10 bar parts, but only 8/10 dollar signs)
    assert meter(
        0, 1000, 13, ncols=30, bar_format=stars_bar_cash) == "************ $$$$$$$$"

    # trim left ANSI; escape is before trim zone
    # we only know it has ANSI codes, so we append an END code anyway
    assert meter(
        0, 1000, 13, ncols=10,
        bar_format="*****\033[22m****\033[0m***{bar:10}$$$$$$$$$$"
    ) == "*****\033[22m****\033[0m*\033[0m"
    # trim left ANSI; escape is at trim zone
    assert meter(
        0, 1000, 13, ncols=10,
        bar_format="*****\033[22m*****\033[0m**{bar:10}$$$$$$$$$$"
    ) == "*****\033[22m*****\033[0m"
    # trim left ANSI; escape is after trim zone
    assert meter(
        0, 1000, 13, ncols=10,
        bar_format="*****\033[22m******\033[0m*{bar:10}$$$$$$$$$$"
    ) == "*****\033[22m*****\033[0m"

    # Check that bar_format correctly adapts {bar} size to the rest
    assert meter(20, 100, 12, ncols=13, rate=8.1,
                 bar_format=counted) == " 20%|" + eighth + "|20/100"
    assert meter(20, 100, 12, ncols=14, rate=8.1,
                 bar_format=counted) == " 20%|" + three_eighths + " |20/100"

    # Check wide characters
    if sys.version_info[0] >= 3:
        assert meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == (
            "fullwidth: 0%| | 0/1000 [00:13<?, ?it/s]")
        assert meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == (
            "ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]")

    # Check that bar_format can print only {bar} or just one side
    assert meter(20, 100, 12, ncols=2, rate=8.1,
                 bar_format=r'{bar}') == three_eighths + " "
    assert meter(20, 100, 12, ncols=7, rate=8.1,
                 bar_format=r'{l_bar}{bar}') == " 20%|" + three_eighths + " "
    assert meter(20, 100, 12, ncols=6, rate=8.1,
                 bar_format=r'{bar}|test') == eighth + "|test"
| |
| |
def test_ansi_escape_codes():
    """Test stripping of ANSI escape codes"""
    codes = {'BOLD': '\033[1m', 'RED': '\033[91m', 'END': '\033[0m'}
    template = '{BOLD}{RED}Colored{END} description'
    width = 123

    # without any escape code the meter is exactly `width` chars long
    plain = template.format(BOLD='', RED='', END='')
    assert len(tqdm.format_meter(0, 100, 0, ncols=width, prefix=plain)) == width

    # with codes, only the codes themselves may add to the length
    coloured = template.format(**codes)
    rendered = tqdm.format_meter(0, 100, 0, ncols=width, prefix=coloured)
    # `format_meter` inserts an extra END for safety
    extra = len(coloured) - len(plain) + len(codes['END'])
    assert len(rendered) == width + extra
| |
| |
def test_si_format():
    """Test SI unit prefixes"""
    meter = tqdm.format_meter

    # explicit unit
    assert '9.00 ' in meter(1, 9, 1, unit_scale=True, unit='B')

    # default unit: (expected fragment, total)
    expectations = (
        ('99.0 ', 99),
        ('999 ', 999),
        ('9.99k ', 9994),
        ('10.0k ', 9999),
        ('99.5k ', 99499),
        ('100k ', 99999),
        ('1.00M ', 999999),
        ('1.00G ', 999999999),
        ('1.00T ', 999999999999),
        ('1.00P ', 999999999999999),
        ('1.00E ', 999999999999999999),
        ('1.00Z ', 999999999999999999999),
        ('1.0Y ', 999999999999999999999999),
        ('10.0Y ', 9999999999999999999999999),
        ('100.0Y ', 99999999999999999999999999),
        ('1000.0Y ', 999999999999999999999999999),
    )
    for fragment, total in expectations:
        assert fragment in meter(1, total, 1, unit_scale=True)
| |
| |
def test_bar_formatspec():
    """Test Bar.__format__ spec"""
    # (format string, Bar positional args, Bar keyword args, expected text)
    cases = (
        ("{0:5a}", (0.3,), {}, "#5 "),
        ("{0:2}", (0.5,), {'charset': " .oO0"}, "0 "),
        ("{0:2a}", (0.5,), {'charset': " .oO0"}, "# "),
        ("{0:-6a}", (0.5, 10), {}, '## '),
        ("{0:2b}", (0.5, 10), {}, ' '),
    )
    for spec, args, kwargs, rendered in cases:
        assert spec.format(Bar(*args, **kwargs)) == rendered
| |
| |
def test_all_defaults():
    """Test default kwargs"""
    with closing(UnicodeIO()) as sink, tqdm(range(10), file=sink) as bar:
        assert len(bar) == 10
        for _ in bar:
            pass
    # put the runner's status line back on stderr (`nosetest` interface)
    sys.stderr.write('\rTest default kwargs ... ')
| |
| |
class WriteTypeChecker(BytesIO):
    """Stream whose `write` only asserts the type of what it is given"""

    def __init__(self, expected_type):
        BytesIO.__init__(self)
        # type (or tuple of types) every written object must be an instance of
        self.expected_type = expected_type

    def write(self, s):
        """Assert `s` has the expected type; nothing is stored"""
        wanted = self.expected_type
        assert isinstance(s, wanted)
| |
| |
def test_native_string_io_for_default_file():
    """Native strings written to unspecified files"""
    saved_stderr = sys.stderr
    try:
        checker = WriteTypeChecker(expected_type=str)
        sys.stderr = checker
        for _ in tqdm(range(3)):
            pass
        # same again with a stream that reports no encoding (py2 behaviour)
        checker.encoding = None
        for _ in tqdm(range(3)):
            pass
    finally:
        sys.stderr = saved_stderr
| |
| |
def test_unicode_string_io_for_specified_file():
    """Unicode strings written to specified files"""
    unicode_only = WriteTypeChecker(expected_type=type(u''))
    for _ in tqdm(range(3), file=unicode_only):
        pass
| |
| |
def test_write_bytes():
    """Test write_bytes argument with and without `file`"""
    # explicit file + write_bytes=True: only bytes may be written
    bytes_only = WriteTypeChecker(expected_type=type(b''))
    for _ in tqdm(range(3), file=bytes_only, write_bytes=True):
        pass

    # default file (stderr) + write_bytes=False: only unicode may be written
    saved_stderr = sys.stderr
    try:
        sys.stderr = WriteTypeChecker(expected_type=type(u''))
        for _ in tqdm(range(3), write_bytes=False):
            pass
    finally:
        sys.stderr = saved_stderr
| |
| |
def test_iterate_over_csv_rows():
    """Test csv iterator"""
    with closing(StringIO()) as csv_buffer:
        # fill an in-memory csv with three identical rows, then rewind
        csv.writer(csv_buffer).writerows(['test'] * 3 for _ in _range(3))
        csv_buffer.seek(0)

        # wrapping the reader must simply work
        rows = csv.DictReader(csv_buffer, fieldnames=('row1', 'row2', 'row3'))
        with closing(StringIO()) as sink:
            for _ in tqdm(rows, file=sink):
                pass
| |
| |
def test_file_output():
    """Test output to arbitrary file-like objects"""
    with closing(StringIO()) as sink:
        for step in tqdm(_range(3), file=sink):
            if step != 1:
                continue
            # by the second iteration the initial bar must have been written
            sink.seek(0)
            written = sink.read()
            assert '0/3' in written
| |
| |
def test_leave_option():
    """Test `leave=True` always prints info about the last iteration"""
    # leave=True: final state stays, and output ends with a newline (not '\r')
    with closing(StringIO()) as kept:
        for _ in tqdm(_range(3), file=kept, leave=True):
            pass
        shown = kept.getvalue()
        assert '| 3/3 ' in shown
        assert shown[-1] == '\n'

    # leave=False: the final state is not there
    with closing(StringIO()) as dropped:
        for _ in tqdm(_range(3), file=dropped, leave=False):
            pass
        shown = dropped.getvalue()
        assert '| 3/3 ' not in shown
| |
| |
def test_trange():
    """Test trange"""
    # the final "3/3" state must be present exactly when `leave` is set
    for leave in (True, False):
        with closing(StringIO()) as sink:
            for _ in trange(3, file=sink, leave=leave):
                pass
            assert ('| 3/3 ' in sink.getvalue()) is leave
| |
| |
def test_min_interval():
    """Test mininterval"""
    with closing(StringIO()) as sink:
        bar = tqdm(_range(3), file=sink, mininterval=1e-10)
        for _ in bar:
            pass
        shown = sink.getvalue()
        assert " 0%| | 0/3 [00:00<" in shown
| |
| |
def test_max_interval():
    """Test maxinterval"""
    total, bigstep, smallstep = 100, 10, 5

    # -- Manual bars: maxinterval longer than the simulated sleeps vs. none
    timer = DiscreteTimer()
    with closing(StringIO()) as capped_out, closing(StringIO()) as free_out:
        capped = tqdm(total=total, file=capped_out, miniters=None,
                      mininterval=0, smoothing=1, maxinterval=1e-2)
        cpu_timify(capped, timer)

        free = tqdm(total=total, file=free_out, miniters=None,
                    mininterval=0, smoothing=1, maxinterval=None)
        cpu_timify(free, timer)

        assert capped.dynamic_miniters
        assert free.dynamic_miniters

        # one big jump first ...
        capped.update(bigstep)
        free.update(bigstep)
        # ... then smaller steps, which must not trigger maxinterval
        for _ in _range(4):
            capped.update(smallstep)
            free.update(smallstep)
            timer.sleep(1e-5)
        # close explicitly: PyPy doesn't gc immediately
        capped.close()
        free.close()

        assert "25%" not in free_out.getvalue()
        assert "25%" not in capped_out.getvalue()

    # -- Manual bar where maxinterval does kick in
    timer = DiscreteTimer()
    with closing(StringIO()) as out, tqdm(
            total=total, file=out, miniters=None, mininterval=0,
            smoothing=1, maxinterval=1e-4) as bar:
        cpu_timify(bar, timer)

        bar.update(bigstep)
        # the sleeps exceed maxinterval, so the smaller steps get displayed
        for _ in _range(4):
            bar.update(smallstep)
            timer.sleep(1e-2)

        assert "25%" in out.getvalue()

    # -- Iterator-based bar where maxinterval kicks in
    timer = DiscreteTimer()
    first_slow = bigstep - 1
    with closing(StringIO()) as out:
        with tqdm(_range(total), file=out, miniters=None,
                  mininterval=1e-5, smoothing=1, maxinterval=1e-4) as bar:
            cpu_timify(bar, timer)

            for i in bar:
                if i >= first_slow and (i - first_slow) % smallstep == 0:
                    timer.sleep(1e-2)
                if i >= 3 * bigstep:
                    break

        assert "15%" in out.getvalue()

    # -- Different behaviour with and without mininterval
    timer = DiscreteTimer()
    total = 1000
    mininterval = 0.1
    maxinterval = 10
    half = total / 2
    with closing(StringIO()) as out:
        with tqdm(total=total, file=out, miniters=None, smoothing=1,
                  mininterval=mininterval, maxinterval=maxinterval) as tm1:
            with tqdm(total=total, file=out, miniters=None, smoothing=1,
                      mininterval=0, maxinterval=maxinterval) as tm2:
                cpu_timify(tm1, timer)
                cpu_timify(tm2, timer)

                # fast iterations: does dynamic_miniters trigger?
                timer.sleep(mininterval)  # forces an update of tm1
                tm1.update(half)
                tm2.update(half)
                assert int(tm1.miniters) == tm2.miniters == half

                # slow iterations: miniters now depends on mininterval
                timer.sleep(maxinterval * 2)
                tm1.update(half)
                tm2.update(half)
                expected = [half * mininterval / (maxinterval * 2),
                            half * maxinterval / (maxinterval * 2)]
                assert [tm1.miniters, tm2.miniters] == expected

    # -- Same with iterator-based bars (one timer each: they can't be zipped)
    timer1 = DiscreteTimer()
    timer2 = DiscreteTimer()
    total = 100
    mininterval = 0.1
    maxinterval = 10
    with closing(StringIO()) as out:
        t1 = tqdm(_range(total), file=out, miniters=None, smoothing=1,
                  mininterval=mininterval, maxinterval=maxinterval)
        t2 = tqdm(_range(total), file=out, miniters=None, smoothing=1,
                  mininterval=0, maxinterval=maxinterval)

        cpu_timify(t1, timer1)
        cpu_timify(t2, timer2)

        # exhaust the first bar, then the second, with identical sleeps
        for bar, clock in ((t1, timer1), (t2, timer2)):
            for i in bar:
                if i == ((total / 2) - 2):
                    clock.sleep(mininterval)
                if i == (total - 1):
                    clock.sleep(maxinterval * 2)

        assert t1.miniters == 0.255
        assert t2.miniters == 0.5

        t1.close()
        t2.close()
| |
| |
def test_delay():
    """Test delay"""
    clock = DiscreteTimer()
    with closing(StringIO()) as sink:
        bar = tqdm(total=2, file=sink, leave=True, delay=3)
        cpu_timify(bar, clock)

        # 2 virtual seconds in (delay is 3): nothing written yet
        clock.sleep(2)
        bar.update(1)
        assert not sink.getvalue()

        # 4 virtual seconds in: output has appeared
        clock.sleep(2)
        bar.update(1)
        assert sink.getvalue()

        bar.close()
| |
| |
def test_min_iters():
    """Test miniters"""
    # miniters -> whether the states 0/3, 1/3, 2/3 and 3/3 get displayed
    scenarios = (
        (2, (True, False, True, True)),
        (1, (True, True, True, True)),
    )
    for miniters, displayed in scenarios:
        with closing(StringIO()) as sink:
            for _ in tqdm(_range(3), file=sink, leave=True, mininterval=0,
                          miniters=miniters):
                pass
            out = sink.getvalue()
        for n, visible in enumerate(displayed):
            assert ('| %d/3 ' % n in out) is visible
| |
| |
def test_dynamic_min_iters():
    """Test purely dynamic miniters (and manual updates and __del__)"""
    with closing(StringIO()) as sink:
        total = 10
        t = tqdm(total=total, file=sink, miniters=None, mininterval=0, smoothing=1)

        # +1, then +3 at once; with dynamic miniters the next two single
        # steps should be skipped and the third one displayed
        for step in (1, 3, 1, 1, 1):
            t.update(step)

        out = sink.getvalue()
        assert t.dynamic_miniters
        t.__del__()  # simulate immediate del gc

    assert ' 0%| | 0/10 [00:00<' in out
    assert '40%' in out
    assert '50%' not in out
    assert '60%' not in out
    assert '70%' in out

    # Check with smoothing=0, miniters should be set to max update seen so far
    with closing(StringIO()) as sink:
        total = 10
        t = tqdm(total=total, file=sink, miniters=None, mininterval=0, smoothing=0)

        # the update by 5 is the one that should be stored as miniters
        for step in (1, 2, 5, 1):
            t.update(step)

        out = sink.getvalue()
        assert all(i in out for i in ("0/10", "1/10", "3/10"))
        assert "2/10" not in out
        assert t.dynamic_miniters and not t.smoothing
        assert t.miniters == 5
        t.close()

    # Iterator-based bars: (extra kwargs, expected dynamic_miniters)
    scenarios = (
        ({'miniters': None, 'smoothing': 0.5}, True),  # smoothed
        ({'miniters': None, 'smoothing': 0}, True),  # no smoothing
        ({'miniters': 1}, False),  # miniters fixed manually
    )
    for extra, dynamic in scenarios:
        with closing(StringIO()) as sink:
            t = tqdm(_range(10), file=sink, mininterval=None, **extra)
            for _ in t:
                pass
        assert bool(t.dynamic_miniters) is dynamic
| |
| |
def test_big_min_interval():
    """Test large mininterval"""
    huge = 1E10

    # iterator-based: no intermediate state is displayed
    with closing(StringIO()) as sink:
        for _ in tqdm(_range(2), file=sink, mininterval=huge):
            pass
        assert '50%' not in sink.getvalue()

    # manual updates: same expectation
    with closing(StringIO()) as sink, tqdm(
            _range(2), file=sink, mininterval=huge) as bar:
        for _ in _range(2):
            bar.update()
        assert '50%' not in sink.getvalue()
| |
| |
def test_smoothed_dynamic_min_iters():
    """Test smoothed dynamic miniters"""
    clock = DiscreteTimer()
    # 10 at once, then two steps of 4, then twenty single steps
    # (the later ones should be partially skipped)
    steps = [10] + [4] * 2 + [1] * 20

    with closing(StringIO()) as sink:
        with tqdm(total=100, file=sink, miniters=None, mininterval=1,
                  smoothing=0.5, maxinterval=0) as bar:
            cpu_timify(bar, clock)

            for step in steps:
                clock.sleep(1)
                bar.update(step)

            assert bar.dynamic_miniters
        out = sink.getvalue()

    assert ' 0%| | 0/100 [00:00<' in out
    for percent, visible in (('20%', True), ('23%', False), ('25%', True),
                             ('26%', False), ('28%', True)):
        assert (percent in out) is visible
| |
| |
def test_smoothed_dynamic_min_iters_with_min_interval():
    """Test smoothed dynamic miniters with mininterval"""
    timer = DiscreteTimer()

    # In this test, `miniters` should gradually decline
    total = 100

    with closing(StringIO()) as our_file:
        # Test manual updating tqdm
        with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3,
                  smoothing=1, maxinterval=0) as t:
            cpu_timify(t, timer)

            t.update(10)
            timer.sleep(1e-2)
            for _ in _range(4):
                t.update()
                timer.sleep(1e-2)
            out = our_file.getvalue()
            assert t.dynamic_miniters

    with closing(StringIO()) as our_file:
        # Test iteration-based tqdm
        with tqdm(_range(total), file=our_file, miniters=None,
                  mininterval=0.01, smoothing=1, maxinterval=0) as t2:
            cpu_timify(t2, timer)

            for i in t2:
                if i >= 10:
                    timer.sleep(0.1)
                if i >= 14:
                    break
            out2 = our_file.getvalue()

    assert t.dynamic_miniters
    # the iteration-based bar was created with miniters=None as well, so it
    # must be checked too (previously only `t` was checked, twice)
    assert t2.dynamic_miniters
    assert ' 0%| | 0/100 [00:00<' in out
    assert '11%' in out and '11%' in out2
    # assert '12%' not in out and '12%' in out2
    assert '13%' in out and '13%' in out2
    assert '14%' in out and '14%' in out2
| |
| |
@mark.slow
def test_rlock_creation():
    """Test that importing tqdm does not create multiprocessing objects."""
    multiprocessing = importorskip('multiprocessing')
    if not hasattr(multiprocessing, 'get_context'):
        skip("missing multiprocessing.get_context")

    # 'spawn' rather than 'fork': the child must not inherit globals that
    # other tests have already constructed in this process
    spawn_ctx = multiprocessing.get_context('spawn')
    with spawn_ctx.Pool(1) as workers:
        # a failure inside the target is re-raised here by the pool
        workers.apply(_rlock_creation_target)
| |
| |
def _rlock_creation_target():
    """Check that the RLock has not been constructed."""
    import multiprocessing as mp
    patch = importorskip('unittest.mock').patch

    # count calls to RLock while still running the real implementation
    with patch('multiprocessing.RLock', wraps=mp.RLock) as rlock_mock:
        # importing alone must not create a lock
        from tqdm import tqdm
        assert rlock_mock.call_count == 0
        # the first bar creates the lock, the second one reuses it
        for _attempt in _range(2):
            with closing(StringIO()) as sink:
                with tqdm(file=sink):
                    pass
            assert rlock_mock.call_count == 1
| |
| |
def test_disable():
    """Test disable"""
    # iterator-based bar: nothing is written
    with closing(StringIO()) as sink:
        for _ in tqdm(_range(3), file=sink, disable=True):
            pass
        written = sink.getvalue()
        assert written == ''

    # manually updated bar: nothing is written either
    with closing(StringIO()) as sink:
        bar = tqdm(total=3, file=sink, miniters=1, disable=True)
        bar.update(3)
        bar.close()
        written = sink.getvalue()
        assert written == ''
| |
| |
def test_infinite_total():
    """Test treatment of infinite total"""
    endless = float("inf")
    with closing(StringIO()) as sink:
        bar = tqdm(_range(3), file=sink, total=endless)
        for _ in bar:
            pass
| |
| |
def test_nototal():
    """Test unknown total length"""
    # (extra kwargs, fragment expected in the output)
    scenarios = (
        ({'unit_scale': 10}, "100it"),
        ({'bar_format': "{l_bar}{bar}{r_bar}"}, "10/?"),
    )
    for extra, fragment in scenarios:
        with closing(StringIO()) as sink:
            # a bare iterator has no length, so no total can be inferred
            for _ in tqdm(iter(range(10)), file=sink, **extra):
                pass
            assert fragment in sink.getvalue()
| |
| |
def test_unit():
    """Test SI unit prefix"""
    with closing(StringIO()) as sink:
        bar = tqdm(_range(3), file=sink, miniters=1, unit="bytes")
        for _ in bar:
            pass
        shown = sink.getvalue()
        assert 'bytes/s' in shown
| |
| |
def test_ascii():
    """Test ascii/unicode bar"""
    # Autodetection
    with closing(StringIO()) as sink, tqdm(total=10, file=sink, ascii=None) as bar:
        assert bar.ascii  # TODO: this may fail in the future

    # ASCII bar
    with closing(StringIO()) as sink:
        for _ in tqdm(_range(3), total=15, file=sink, miniters=1,
                      mininterval=0, ascii=True):
            pass
        frames = sink.getvalue().strip("\r").split("\r")
    for idx, fragment in enumerate(('7%|6', '13%|#3', '20%|##'), 1):
        assert fragment in frames[idx]

    # Unicode bar
    with closing(UnicodeIO()) as sink:
        with tqdm(total=15, file=sink, ascii=False, mininterval=0) as bar:
            for _ in _range(3):
                bar.update()
        frames = sink.getvalue().strip("\r").split("\r")
    for idx, fragment in enumerate(
            (u"7%|\u258b", u"13%|\u2588\u258e", u"20%|\u2588\u2588"), 1):
        assert fragment in frames[idx]

    # Custom bar: one frame per charset symbol, in order
    for charset in [" .oO0", " #"]:
        with closing(StringIO()) as sink:
            for _ in tqdm(_range(len(charset) - 1), file=sink, miniters=1,
                          mininterval=0, ascii=charset, ncols=27):
                pass
            frames = sink.getvalue().strip("\r").split("\r")
        for symbol, frame in zip(charset, frames):
            assert '|' + symbol + '|' in frame
| |
| |
def test_update():
    """Test manual creation and updates"""
    new_desc = 'dynamically notify of 4 increments in total'
    shown = None
    with closing(StringIO()) as sink:
        with tqdm(total=2, file=sink, miniters=1, mininterval=0) as bar:
            assert len(bar) == 2
            bar.update(2)
            assert '| 2/2' in sink.getvalue()
            # change description and total on the fly, step back, go forward
            bar.desc = new_desc
            bar.total = 4
            bar.update(-1)
            bar.update(2)
        shown = sink.getvalue()
    assert '| 3/4 ' in shown
    assert new_desc in shown
| |
| |
def test_close():
    """Test manual creation and closure and n_instances"""
    done = '| 3/3 '

    # With `leave` option (the default)
    with closing(StringIO()) as sink:
        bar = tqdm(total=3, file=sink, miniters=10)
        bar.update(3)
        # update is below miniters: final state not displayed yet
        assert done not in sink.getvalue()
        assert len(tqdm._instances) == 1
        bar.close()
        assert len(tqdm._instances) == 0
        assert done in sink.getvalue()

    # Without `leave` option
    with closing(StringIO()) as sink:
        bar = tqdm(total=3, file=sink, miniters=10, leave=False)
        bar.update(3)
        bar.close()
        # final state never displayed
        assert done not in sink.getvalue()

    # With all updates
    with closing(StringIO()) as sink:
        assert len(tqdm._instances) == 0
        with tqdm(total=3, file=sink, miniters=0, mininterval=0,
                  leave=True) as bar:
            assert len(tqdm._instances) == 1
            bar.update(3)
            before_close = sink.getvalue()
            assert done in before_close  # displayed straight away
            assert '\n' not in before_close
        # leaving the `with` closed the bar
        assert len(tqdm._instances) == 0

        # what follows the last ', ' is not compared
        stable_part = before_close.rsplit(', ', 1)[0]
        after_close = sink.getvalue()
        assert after_close[-1] == '\n'
        if not after_close.startswith(stable_part):
            raise AssertionError("\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format(
                stable_part + ', ...it/s]\n', sink.getvalue()))

    # Closing after the output stream has closed
    with closing(StringIO()) as sink:
        bar = tqdm(total=2, file=sink)
        for _ in _range(2):
            bar.update()
    bar.close()
| |
| |
def test_ema():
    """Test exponential weighted average"""
    ema = EMA(0.01)
    # (call arguments, expected value rounded to 2 decimals); calling with
    # no argument is expected to give the same value as the previous call
    sequence = (
        ((10,), 10),
        ((1,), 5.48),
        ((), 5.48),
        ((1,), 3.97),
        ((1,), 3.22),
    )
    for args, expected in sequence:
        assert round(ema(*args), 2) == expected
| |
| |
def _smoothing_rates(timer, smoothing):
    """
    Drive an iterator-based bar and a manually updated bar in lockstep.

    Both bars cover 3 iterations, are timed by `timer` and use the given
    `smoothing` value. Returns `(iter_rate, manual_rate)`: the rates parsed
    from the output of the iterator-based and manually updated bars.
    """
    with closing(StringIO()) as manual_file:
        with closing(StringIO()) as iter_file:
            manual_bar = tqdm(_range(3), file=manual_file, smoothing=smoothing,
                              leave=True, miniters=1, mininterval=0)
            cpu_timify(manual_bar, timer)

            with tqdm(_range(3), file=iter_file, smoothing=smoothing,
                      leave=True, miniters=1, mininterval=0) as iter_bar:
                cpu_timify(iter_bar, timer)

                for i in iter_bar:
                    # Sleep more for first iteration and
                    # see how quickly rate is updated
                    if i == 0:
                        timer.sleep(0.01)
                    else:
                        # Need to sleep in all iterations
                        # to calculate smoothed rate
                        # (else delta_t is 0!)
                        timer.sleep(0.001)
                    manual_bar.update()
            # closing the manual bar must drop exactly one instance
            n_old = len(tqdm._instances)
            manual_bar.close()
            assert len(tqdm._instances) == n_old - 1
            # Get result for iter-based bar (before its stream is closed)
            iter_rate = progressbar_rate(get_bar(iter_file.getvalue(), 3))
        # Get result for manually updated bar
        manual_rate = progressbar_rate(get_bar(manual_file.getvalue(), 3))
    return iter_rate, manual_rate


def test_smoothing():
    """Test exponential weighted average smoothing"""
    timer = DiscreteTimer()

    # -- Test disabling smoothing
    with closing(StringIO()) as our_file:
        with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t:
            cpu_timify(t, timer)

            for _ in t:
                pass
        assert '| 3/3 ' in our_file.getvalue()

    # -- Test smoothing
    # 1st case: no smoothing (only use average)
    a, a2 = _smoothing_rates(timer, None)
    # 2nd case: use max smoothing (= instant rate)
    b, b2 = _smoothing_rates(timer, 1)
    # 3rd case: use medium smoothing
    c, c2 = _smoothing_rates(timer, 0.5)

    # Check that medium smoothing's rate is between no and max smoothing rates
    assert a <= c <= b
    assert a2 <= c2 <= b2
| |
| |
@mark.skipif(nt_and_no_colorama, reason="Windows without colorama")
def test_deprecated_nested():
    """Check that passing the deprecated `nested` keyword is rejected."""
    # TODO: test degradation on windows without colorama?
    expected_msg = """`nested` is deprecated and automated.
Use `position` instead for manual control."""
    stream = StringIO()
    try:
        tqdm(total=2, file=stream, nested=True)
    except TqdmDeprecationWarning:
        # the warning is only acceptable if its text reached the stream
        if expected_msg in stream.getvalue():
            return
        raise
    raise DeprecationError("Should not allow nested kwarg")
| |
| |
def test_bar_format():
    """Check rendering of a custom `bar_format` and the type it is stored as."""
    fmt = ('{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}'
           '{percentage}{rate}{rate_fmt}{elapsed}{remaining}')
    with closing(StringIO()) as stream:
        progress = trange(2, file=stream, leave=True, bar_format=fmt)
        for _ in progress:
            pass
        rendered = stream.getvalue()
        assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in rendered

    # a plain string format must be exposed as unicode text on the bar
    with closing(StringIO()) as stream:
        with tqdm(ascii=False, bar_format=r'hello world', file=stream) as bar:
            assert isinstance(bar.bar_format, _unicode)
| |
| |
def test_custom_format():
    """Check that a subclass can expose an extra derived format field."""
    class TqdmExtraFormat(tqdm):
        """Adds a `total_time` entry to the format dictionary"""
        @property
        def format_dict(self):
            fields = super(TqdmExtraFormat, self).format_dict
            # elapsed time scaled by total/n (n floored at 1, missing total -> 0)
            scale = (fields["total"] or 0)
            projected = fields["elapsed"] * scale / max(fields["n"], 1)
            fields["total_time"] = self.format_interval(projected) + " in total"
            return fields

    with closing(StringIO()) as stream:
        bar = TqdmExtraFormat(
            range(10), file=stream,
            bar_format="{total_time}: {percentage:.0f}%|{bar}{r_bar}")
        for _ in bar:
            pass
        assert "00:00 in total" in stream.getvalue()
| |
| |
def test_eta(capsys):
    """Check that `{eta}` in `bar_format` honours a date format spec."""
    from datetime import datetime as dt
    bar = trange(999, miniters=1, mininterval=0, leave=True,
                 bar_format='{l_bar}{eta:%Y-%m-%d}')
    for _ in bar:
        pass
    captured = capsys.readouterr()
    # the final line is compared against today's date
    expected = "\r100%|{eta:%Y-%m-%d}\n".format(eta=dt.now())
    assert expected in captured[1]
| |
| |
def test_unpause():
    """Check the displayed rate is unchanged across an `unpause()` call."""
    clock = DiscreteTimer()
    with closing(StringIO()) as stream:
        bar = trange(10, file=stream, leave=True, mininterval=0)
        cpu_timify(bar, clock)
        for _ in _range(2):
            clock.sleep(0.01)
            bar.update()
        clock.sleep(0.1)  # longer wait time
        bar.unpause()
        for _ in _range(2):
            clock.sleep(0.01)
            bar.update()
        bar.close()
        output = stream.getvalue()
        rate_before = progressbar_rate(get_bar(output, 2))
        rate_after = progressbar_rate(get_bar(output, 3))
    assert rate_before == rate_after
| |
| |
def test_disabled_unpause(capsys):
    """Check `unpause()` on a disabled bar: only `print` produces output."""
    with tqdm(total=10, disable=True) as bar:
        bar.update()
        bar.unpause()
        bar.update()
        print(bar)
    captured = capsys.readouterr()
    # nothing on stderr, and stdout holds just the printed bar
    assert not captured[1]
    assert captured[0] == ' 0%| | 0/10 [00:00<?, ?it/s]\n'
| |
| |
def test_reset():
    """Check that a bar can be reset (optionally with a new total) and reused."""
    with closing(StringIO()) as stream:
        with tqdm(total=10, file=stream,
                  miniters=1, mininterval=0, maxinterval=0) as bar:
            bar.update(9)
            bar.reset()
            bar.update()
            bar.reset(total=12)
            bar.update(10)
        # one fragment per reset: counts restart and the new total is used
        for fragment in ('| 1/10', '| 10/12'):
            assert fragment in stream.getvalue()
| |
| |
def test_disabled_reset(capsys):
    """Check `reset()` on a disabled bar: only `print` produces output."""
    with tqdm(total=10, disable=True) as bar:
        bar.update(9)
        bar.reset()
        bar.update()
        bar.reset(total=12)
        bar.update(10)
        print(bar)
    captured = capsys.readouterr()
    # nothing on stderr, and stdout holds just the printed bar
    assert not captured[1]
    assert captured[0] == ' 0%| | 0/12 [00:00<?, ?it/s]\n'
| |
| |
@mark.skipif(nt_and_no_colorama, reason="Windows without colorama")
def test_position():
    """Check the output emitted for bars placed at different positions."""
    def chunks(stream):
        """Split everything written to `stream` into `RE_pos` matches"""
        return [match[0] for match in RE_pos.findall(stream.getvalue())]

    # -- Single manually placed bar, without leave
    stream = StringIO()
    common = {'file': stream, 'miniters': 1, 'mininterval': 0, 'maxinterval': 0}
    bar = tqdm(total=2, desc='pos2 bar', leave=False, position=2, **common)
    bar.update()
    bar.close()
    found = chunks(stream)
    wanted = ['\n\n\rpos2 bar: 0%',
              '\n\n\rpos2 bar: 50%',
              '\n\n\r ']
    pos_line_diff(found, wanted)

    # -- Iteration-based bars nested three levels deep
    stream = StringIO()
    common["file"] = stream
    for _ in trange(2, desc='pos0 bar', position=0, **common):
        for _ in trange(2, desc='pos1 bar', position=1, **common):
            for _ in trange(2, desc='pos2 bar', position=2, **common):
                pass
    found = chunks(stream)
    # output of one complete innermost loop
    inner_cycle = ['\n\n\rpos2 bar: 0%',
                   '\n\n\rpos2 bar: 50%',
                   '\n\n\rpos2 bar: 100%',
                   '\rpos2 bar: 100%']
    # output of one complete middle loop (runs the innermost loop twice)
    middle_cycle = (['\n\rpos1 bar: 0%']
                    + inner_cycle
                    + ['\n\n\rpos1 bar: 50%']
                    + inner_cycle
                    + ['\n\n\rpos1 bar: 100%',
                       '\rpos1 bar: 100%'])
    wanted = (['\rpos0 bar: 0%']
              + middle_cycle
              + ['\n\rpos0 bar: 50%']
              + middle_cycle
              + ['\n\rpos0 bar: 100%',
                 '\rpos0 bar: 100%',
                 '\n'])
    pos_line_diff(found, wanted)

    # -- Manually updated bars at fixed positions
    stream = StringIO()
    common["file"] = stream
    common["total"] = 2
    top = tqdm(desc='pos0 bar', position=0, **common)
    middle = tqdm(desc='pos1 bar', position=1, **common)
    bottom = tqdm(desc='pos2 bar', position=2, **common)
    for _ in _range(2):
        # deliberately not in position order
        top.update()
        bottom.update()
        middle.update()
    found = chunks(stream)
    wanted = ['\rpos0 bar: 0%',
              '\n\rpos1 bar: 0%',
              '\n\n\rpos2 bar: 0%',
              '\rpos0 bar: 50%',
              '\n\n\rpos2 bar: 50%',
              '\n\rpos1 bar: 50%',
              '\rpos0 bar: 100%',
              '\n\n\rpos2 bar: 100%',
              '\n\rpos1 bar: 100%']
    pos_line_diff(found, wanted)
    top.close()
    middle.close()
    bottom.close()

    # -- Automatic repositioning when a bar is closed before the others
    with closing(StringIO()) as stream:
        opts = {'total': 10, 'file': stream, 'mininterval': 0}
        first = tqdm(desc='1.pos0 bar', **opts)
        second = tqdm(desc='2.pos1 bar', **opts)
        third = tqdm(desc='3.pos2 bar', **opts)
        initial = ['\r1.pos0 bar: 0%',
                   '\n\r2.pos1 bar: 0%',
                   '\n\n\r3.pos2 bar: 0%']
        pos_line_diff(chunks(stream), initial)

        second.close()
        fourth = tqdm(desc='4.pos2 bar', **opts)
        first.update(1)
        third.update(1)
        fourth.update(1)
        found = chunks(stream)
        wanted = initial + ['\r2.pos1 bar: 0%',
                            '\n\n\r4.pos2 bar: 0%',
                            '\r1.pos0 bar: 10%',
                            '\n\n\r3.pos2 bar: 10%',
                            '\n\r4.pos2 bar: 10%']
        pos_line_diff(found, wanted)
        fourth.close()
        third.close()
        first.close()
| |
| |
def test_set_description():
    """Check `set_description()` / `set_description_str()` and their refresh flag."""
    # default behaviour: the new description is written out
    with closing(StringIO()) as stream:
        progress = tqdm(desc='Hello', file=stream)
        with progress as bar:
            assert bar.desc == 'Hello'
            bar.set_description_str('World')
            assert bar.desc == 'World'
            bar.set_description()
            assert bar.desc == ''
            bar.set_description('Bye')
            assert bar.desc == 'Bye: '
        written = stream.getvalue()
        assert "World" in written

    # refresh disabled: the attribute changes but nothing is written
    with closing(StringIO()) as stream:
        progress = tqdm(desc='Hello', file=stream)
        with progress as bar:
            assert bar.desc == 'Hello'
            bar.set_description_str('World', False)
            assert bar.desc == 'World'
            bar.set_description(None, False)
            assert bar.desc == ''
        written = stream.getvalue()
        assert "World" not in written

    # non-ASCII description
    with closing(StringIO()) as stream:
        with tqdm(total=10, file=stream) as bar:
            bar.set_description(u"\xe1\xe9\xed\xf3\xfa")
| |
| |
def test_deprecated_gui():
    """Check that `gui=True` is rejected and that `sp` exists only without it."""
    hint = 'Please use `tqdm.gui.tqdm(...)` instead of `tqdm(..., gui=True)`'

    def consume(iterable):
        """Iterate `iterable` to exhaustion, discarding the items"""
        for _ in iterable:
            pass

    def expect_deprecation(stream, bar, action, *args):
        """Run `action(*args)`; it must raise the deprecation warning"""
        try:
            action(*args)
        except TqdmDeprecationWarning as e:
            # acceptable only if the hint was written to the stream
            if hint not in stream.getvalue():
                raise e
        else:
            raise DeprecationError('Should not allow manual gui=True without'
                                   ' overriding __iter__() and update()')
        finally:
            # drop all registered instances instead of closing the bar
            bar._instances.clear()

    with closing(StringIO()) as our_file:
        # manual update
        t = tqdm(total=2, gui=True, file=our_file, miniters=1, mininterval=0)
        assert not hasattr(t, "sp")
        expect_deprecation(our_file, t, t.update, 1)

        # iteration
        t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0)
        expect_deprecation(our_file, t, consume, t)

        # without gui, the bar has an `sp` attribute
        with tqdm(total=1, gui=False, file=our_file) as t:
            assert hasattr(t, "sp")
| |
| |
def test_cmp():
    """Check the ordering and equality operators between bars."""
    with closing(StringIO()) as stream:
        first = tqdm(total=10, file=stream)
        second = tqdm(total=10, file=stream)
        third = tqdm(total=10, file=stream)

        assert first < second
        assert third >= first
        assert first <= third

        fourth = tqdm(total=10, file=stream)
        fifth = tqdm(total=10, file=stream)
        closed = tqdm(total=10, file=stream)
        closed.close()
        # created only after the previous bar was closed
        replacement = tqdm(total=10, file=stream)

        assert fourth != fifth
        assert fourth > third
        assert closed == replacement

        # close the remaining bars, most recently created first
        for bar in (replacement, fifth, fourth, third, second, first):
            bar.close()
| |
| |
def test_repr():
    """Check the string form of a freshly created ASCII bar."""
    expected = ' 0%| | 0/10 [00:00<?, ?it/s]'
    with closing(StringIO()) as stream:
        with tqdm(total=10, ascii=True, file=stream) as bar:
            text = str(bar)
            assert text == expected
| |
| |
def test_clear():
    """Check that `clear()` blanks the lines of displayed bars."""
    with closing(StringIO()) as stream:
        upper = tqdm(total=10, file=stream, desc='pos0 bar', bar_format='{l_bar}')
        lower = trange(10, file=stream, desc='pos1 bar', bar_format='{l_bar}')
        shown = squash_ctrlchars(stream.getvalue())
        # clear bottom-up
        lower.clear()
        upper.clear()
        cleared = squash_ctrlchars(stream.getvalue())
        upper.close()
        lower.close()
        assert shown == ['pos0 bar: 0%|', 'pos1 bar: 0%|']
        assert cleared == ['', '']
| |
| |
def test_clear_disabled():
    """Check that `clear()` on a disabled bar writes nothing."""
    with closing(StringIO()) as stream:
        progress = tqdm(total=10, file=stream, desc='pos0 bar', disable=True,
                        bar_format='{l_bar}')
        with progress as bar:
            bar.clear()
        written = stream.getvalue()
        assert written == ''
| |
| |
def test_refresh():
    """Check that `refresh()` redraws bars that `update()` left untouched."""
    with closing(StringIO()) as stream:
        # thresholds high enough that update() alone does not redraw
        options = {'total': 10, 'file': stream, 'bar_format': '{l_bar}',
                   'mininterval': 999, 'miniters': 999}
        upper = tqdm(desc='pos0 bar', **options)
        lower = tqdm(desc='pos1 bar', **options)
        upper.update()
        lower.update()
        stale = squash_ctrlchars(stream.getvalue())
        upper.refresh()
        lower.refresh()
        fresh = squash_ctrlchars(stream.getvalue())
        upper.close()
        lower.close()

        # the display only catches up with the counters after refresh()
        assert stale == [u'pos0 bar: 0%|', u'pos1 bar: 0%|']
        assert fresh == [u'pos0 bar: 10%|', u'pos1 bar: 10%|']
| |
| |
def test_disabled_repr(capsys):
    """Check the string form of a disabled bar."""
    with tqdm(total=10, disable=True) as bar:
        str(bar)
        bar.update()
        print(bar)
    captured = capsys.readouterr()
    # nothing on stderr, and stdout holds just the printed bar
    assert not captured[1]
    assert captured[0] == ' 0%| | 0/10 [00:00<?, ?it/s]\n'
| |
| |
def test_disabled_refresh():
    """Check that `refresh()` on a disabled bar writes nothing."""
    with closing(StringIO()) as stream:
        progress = tqdm(total=10, file=stream, desc='pos0 bar', disable=True,
                        bar_format='{l_bar}', mininterval=999, miniters=999)
        with progress as bar:
            bar.update()
            bar.refresh()

        written = stream.getvalue()
        assert written == ''
| |
| |
def test_write():
    """
    Test write messages.

    Three scenarios are covered: writing to the same stream the bars use,
    writing to a different stream than the bar, and writing to a mocked
    `sys.stdout` while the bar uses a mocked `sys.stderr`.
    """
    s = "Hello world"
    with closing(StringIO()) as our_file:
        # Change format to keep only left part w/o bar and it/s rate
        t1 = tqdm(total=10, file=our_file, desc='pos0 bar',
                  bar_format='{l_bar}', mininterval=0, miniters=1)
        t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}',
                    mininterval=0, miniters=1)
        t3 = tqdm(total=10, file=our_file, desc='pos2 bar',
                  bar_format='{l_bar}', mininterval=0, miniters=1)
        t1.update()
        t2.update()
        t3.update()
        before = our_file.getvalue()

        # Write msg and see if bars are correctly redrawn below the msg
        t1.write(s, file=our_file)  # call as an instance method
        tqdm.write(s, file=our_file)  # call as a class method
        after = our_file.getvalue()

        t1.close()
        t2.close()
        t3.close()

        before_squashed = squash_ctrlchars(before)
        after_squashed = squash_ctrlchars(after)

        assert after_squashed == [s, s] + before_squashed

    # Check that no bar clearing if different file
    with closing(StringIO()) as our_file_bar:
        with closing(StringIO()) as our_file_write:
            t1 = tqdm(total=10, file=our_file_bar, desc='pos0 bar',
                      bar_format='{l_bar}', mininterval=0, miniters=1)

            t1.update()
            before_bar = our_file_bar.getvalue()

            tqdm.write(s, file=our_file_write)

            after_bar = our_file_bar.getvalue()
            t1.close()

            assert before_bar == after_bar

    # Test stdout/stderr anti-mixup strategy
    # Backup stdout/stderr
    stde = sys.stderr
    stdo = sys.stdout
    try:
        # Mock stdout/stderr
        with closing(StringIO()) as our_stderr:
            with closing(StringIO()) as our_stdout:
                sys.stderr = our_stderr
                sys.stdout = our_stdout
                t1 = tqdm(total=10, file=sys.stderr, desc='pos0 bar',
                          bar_format='{l_bar}', mininterval=0, miniters=1)

                t1.update()
                before_err = sys.stderr.getvalue()
                before_out = sys.stdout.getvalue()

                tqdm.write(s, file=sys.stdout)
                after_err = sys.stderr.getvalue()
                after_out = sys.stdout.getvalue()

                t1.close()

                assert before_err == '\rpos0 bar: 0%|\rpos0 bar: 10%|'
                assert before_out == ''
                after_err_res = [m[0] for m in RE_pos.findall(after_err)]
                exres = ['\rpos0 bar: 0%|',
                         '\rpos0 bar: 10%|',
                         '\r ',
                         '\r\rpos0 bar: 10%|']
                pos_line_diff(after_err_res, exres)
                assert after_out == s + '\n'
    finally:
        # Restore stdout and stderr even when an assertion above fails,
        # otherwise they would stay bound to the closed mock streams
        sys.stderr = stde
        sys.stdout = stdo
| |
| |
def test_len():
    """Check `len()` of a bar wrapping a 2-D numpy array (skipped without numpy)."""
    numpy = importorskip('numpy')
    grid = numpy.zeros((3, 4))
    with closing(StringIO()) as stream:
        with tqdm(grid, file=stream) as bar:
            # length follows the first dimension
            assert len(bar) == 3
| |
| |
def test_autodisable_disable():
    """Check that `disable=None` writes nothing to a plain `StringIO`."""
    with closing(StringIO()) as stream:
        progress = tqdm(total=10, disable=None, file=stream)
        with progress as bar:
            bar.update(3)
        written = stream.getvalue()
        assert written == ''
| |
| |
def test_autodisable_enable():
    """Check that `disable=None` still writes when the stream reports a TTY."""
    with closing(StringIO()) as stream:
        # make the in-memory stream claim to be a terminal
        stream.isatty = lambda: True
        progress = tqdm(total=10, disable=None, file=stream)
        with progress as bar:
            bar.update()
        written = stream.getvalue()
        assert written != ''
| |
| |
def test_deprecation_exception():
    """Check raising `TqdmDeprecationWarning` with and without `fp_write`."""
    def raise_with_writer():
        """Raise the warning with a stream's `write` as `fp_write`"""
        with closing(StringIO()) as stream:
            writer = getattr(stream, 'write', sys.stderr.write)
            raise TqdmDeprecationWarning('Test!', fp_write=writer)

    def raise_without_writer():
        """Raise the warning with `fp_write=None`"""
        raise TqdmDeprecationWarning('Test!', fp_write=None)

    raises(TqdmDeprecationWarning, raise_with_writer)
    raises(Exception, raise_without_writer)
| |
| |
def test_postfix():
    """
    Test postfix.

    Covers a postfix given at init, set afterwards via `set_postfix()`
    (with and without refresh, with an explicit item order) and set as a
    raw string via `set_postfix_str()`.
    """
    postfix = {'float': 0.321034, 'gen': 543, 'str': 'h', 'lst': [2]}
    postfix_order = (('w', 'w'), ('a', 0))  # no need for OrderedDict
    expected = ['float=0.321', 'gen=543', 'lst=[2]', 'str=h']
    expected_order = ['w=w', 'a=0', 'float=0.321', 'gen=543', 'lst=[2]', 'str=h']

    # Test postfix set at init
    with closing(StringIO()) as our_file:
        with tqdm(total=10, file=our_file, desc='pos0 bar',
                  bar_format='{r_bar}', postfix=postfix) as t1:
            t1.refresh()
            out = our_file.getvalue()

    # Test postfix set after init
    with closing(StringIO()) as our_file:
        with trange(10, file=our_file, desc='pos1 bar', bar_format='{r_bar}',
                    postfix=None) as t2:
            t2.set_postfix(**postfix)
            t2.refresh()
            out2 = our_file.getvalue()

    # Order of items in dict may change, so need a loop to check per item
    for res in expected:
        assert res in out
        assert res in out2

    # Test postfix (with ordered dict and no refresh) set after init
    with closing(StringIO()) as our_file:
        with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}',
                    postfix=None) as t3:
            t3.set_postfix(postfix_order, False, **postfix)
            t3.refresh()  # explicit external refresh
            out3 = our_file.getvalue()

    # Keep the raw `out3` text: it is compared against `out4` below
    out3_fields = out3[1:-1].split(', ')[3:]
    assert out3_fields == expected_order

    # Test postfix (with ordered dict and refresh) set after init
    with closing(StringIO()) as our_file:
        with trange(10, file=our_file, desc='pos2 bar',
                    bar_format='{r_bar}', postfix=None) as t4:
            t4.set_postfix(postfix_order, True, **postfix)
            t4.refresh()  # double refresh
            out4 = our_file.getvalue()

    # Both operands are raw output strings, so this counts redraws
    # (comparing against the split list would always count zero)
    assert out4.count('\r') > out3.count('\r')
    assert out4.count(", ".join(expected_order)) == 2

    # Test setting postfix string directly
    with closing(StringIO()) as our_file:
        with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}',
                    postfix=None) as t5:
            t5.set_postfix_str("Hello", False)
            t5.set_postfix_str("World")
            out5 = our_file.getvalue()

    assert "Hello" not in out5
    out5 = out5[1:-1].split(', ')[3:]
    assert out5 == ["World"]
| |
| |
def test_postfix_direct():
    """Check that non-string `postfix` objects can be mutated and formatted."""
    letters = "abcdefghij"
    with closing(StringIO()) as stream:
        progress = tqdm(total=10, file=stream, miniters=1, mininterval=0,
                        bar_format="{postfix[0][name]} {postfix[1]:>5.2f}",
                        postfix=[{'name': "foo"}, 42])
        with progress as bar:
            for step in range(10):
                # even steps change the number, odd steps change the name
                if step % 2 == 0:
                    bar.postfix[1] = step
                else:
                    bar.postfix[0]["name"] = letters[step]
                bar.update()
        written = stream.getvalue()
        for fragment in ("f 6.00", "h 6.00", "h 8.00", "j 8.00"):
            assert fragment in written
| |
| |
@contextmanager
def std_out_err_redirect_tqdm(tqdm_file=sys.stderr):
    """
    Context manager routing `sys.stdout` and `sys.stderr` through a single
    `DummyTqdmFile` wrapping `tqdm_file`.

    Parameters
    ----------
    tqdm_file  : file-like, optional
        Stream handed to `DummyTqdmFile`
        [default: `sys.stderr` as bound when this function was defined].

    Yields
    ------
    The original `sys.stdout`.
    """
    orig_out_err = sys.stdout, sys.stderr
    try:
        sys.stdout = sys.stderr = DummyTqdmFile(tqdm_file)
        yield orig_out_err[0]
    # Exceptions propagate untouched: an explicit `raise exc` is not needed
    # and would discard the original traceback on Python 2
    finally:
        # Always restore sys.stdout/err
        sys.stdout, sys.stderr = orig_out_err
| |
| |
def test_file_redirection():
    """Check `print` output and bar output both land in the redirected stream."""
    with closing(StringIO()) as stream:
        # stdout/stderr are swapped for the duration of this block
        with std_out_err_redirect_tqdm(tqdm_file=stream):
            with tqdm(total=3) as pbar:
                # three different ways of printing the same line
                print("Such fun")
                pbar.update(1)
                print("Such", "fun")
                pbar.update(1)
                print("Such ", end="")
                print("fun")
                pbar.update(1)
        written = stream.getvalue()
        assert written.count("Such fun\n") == 3
        for fragment in ("0/3", "3/3"):
            assert fragment in written
| |
| |
def test_external_write():
    """Check writing to the bar's stream inside `external_write_mode()`."""
    with closing(StringIO()) as stream:
        progress = trange(3, file=stream)
        for _ in progress:
            del tqdm._lock  # classmethod should be able to recreate lock
            with tqdm.external_write_mode(file=stream):
                stream.write("Such fun\n")
        written = stream.getvalue()
        assert written.count("Such fun\n") == 3
        for fragment in ("0/3", "3/3"):
            assert fragment in written
| |
| |
def test_unit_scale():
    """Check a numeric `unit_scale` multiplies the displayed counts."""
    with closing(StringIO()) as stream:
        progress = tqdm(_range(9), unit_scale=9, file=stream,
                        miniters=1, mininterval=0)
        for _ in progress:
            pass
        written = stream.getvalue()
        # 9 iterations scaled by 9
        assert '81/81' in written
| |
| |
def patch_lock(thread=True):
    """
    Decorator factory swapping tqdm's lock for a plain `RLock`.

    The lock comes from `threading` when `thread` is true, otherwise from
    `multiprocessing`; if it cannot be created, `skip()` is called with the
    error text.
    """
    try:
        if thread:
            from threading import RLock as make_lock
        else:
            from multiprocessing import RLock as make_lock
        replacement = make_lock()
    except (ImportError, OSError) as err:
        skip(str(err))

    def outer(func):
        """actual decorator"""
        @wraps(func)
        def inner(*args, **kwargs):
            """install the replacement lock, restoring the previous one afterwards"""
            previous = tqdm.get_lock()
            try:
                tqdm.set_lock(replacement)
                result = func(*args, **kwargs)
            finally:
                # runs even when `func` raises
                tqdm.set_lock(previous)
            return result
        return inner
    return outer
| |
| |
@patch_lock(thread=False)
def test_threading():
    """Placeholder for multiprocess/thread-related tests (currently empty)."""
    # TODO: test interleaved output #445
| |
| |
def test_bool():
    """Check the truth value of bars, both enabled and disabled."""
    def internal(our_file, disable):
        """Run every truthiness case with the given `disable` setting"""
        kwargs = {'file': our_file, 'disable': disable}
        # (factory, positional args, extra keyword args, expected truth value)
        cases = [
            (trange, (10,), {}, True),
            (trange, (0,), {}, False),
            (tqdm, (), {'total': 10}, True),
            (tqdm, (), {'total': 0}, False),
            (tqdm, ([],), {}, False),
            (tqdm, ([0],), {}, True),
            (tqdm, (iter([]),), {}, True),
            (tqdm, (iter([1, 2, 3]),), {}, True),
        ]
        for factory, args, extra, expected in cases:
            with factory(*args, **dict(kwargs, **extra)) as t:
                assert bool(t) is expected

        # neither an iterable nor a total: bool() must raise TypeError
        with tqdm(**kwargs) as t:
            try:
                print(bool(t))
            except TypeError:
                pass
            else:
                raise TypeError("Expected bool(tqdm()) to fail")

    # test with and without disable
    with closing(StringIO()) as our_file:
        for disable in (False, True):
            internal(our_file, disable)
| |
| |
def backendCheck(module):
    """Check that `module.tqdm` and `module.trange` report the expected length."""
    bar_factory, range_factory = module.tqdm, module.trange

    with closing(StringIO()) as stream:
        with bar_factory(total=10, file=stream) as bar:
            assert len(bar) == 10
        # no `file` is passed to this one
        with range_factory(1337) as bar:
            assert len(bar) == 1337
| |
| |
def test_auto():
    """Run `backendCheck` on the `autonotebook` and `auto` modules."""
    from tqdm import auto, autonotebook
    for backend in (autonotebook, auto):
        backendCheck(backend)
| |
| |
def test_wrapattr():
    """Check `wrapattr()` around a stream's `write`, in both `bytes` modes."""
    data = "a twenty-char string"
    size = len(data)

    # bytes=True: data passes through and the size is shown in bytes
    with closing(StringIO()) as stream:
        with closing(StringIO()) as sink:
            with tqdm.wrapattr(sink, "write", file=stream, bytes=True) as wrap:
                wrap.write(data)
            passed_through = sink.getvalue()
            assert data == passed_through
        shown = stream.getvalue()
        assert '%.1fB [' % size in shown

    # bytes=False: the size is shown as an iteration count
    with closing(StringIO()) as stream:
        with closing(StringIO()) as sink:
            with tqdm.wrapattr(sink, "write", file=stream, bytes=False) as wrap:
                wrap.write(data)
        shown = stream.getvalue()
        assert '%dit [' % size in shown
| |
| |
def test_float_progress():
    """Check that exceeding a float `total` triggers a clamping warning."""
    with closing(StringIO()) as stream:
        with trange(10, total=9.6, file=stream) as bar:
            with catch_warnings(record=True) as caught:
                simplefilter("always", category=TqdmWarning)
                for step in bar:
                    # no warning is recorded during the first nine steps
                    if step < 9:
                        assert not caught
                assert caught
                last_message = str(caught[-1].message)
                assert "clamping frac" in last_message
| |
| |
def test_screen_shape():
    """Check output under `ncols` / `nrows` limits."""
    def visible_bars_are_50_wide(text):
        """True if every non-blank, non-"more hidden" line is 50 chars long"""
        return all(len(line) == 50 for line in get_bar(text)
                   if line.strip() and "more hidden" not in line)

    # ncols
    with closing(StringIO()) as stream:
        with trange(10, file=stream, ncols=50) as bar:
            list(bar)

        written = stream.getvalue()
        assert all(len(line) == 50 for line in get_bar(written))

    # no second/third bar, leave=False
    with closing(StringIO()) as stream:
        options = {'file': stream, 'ncols': 50, 'nrows': 2, 'miniters': 0,
                   'mininterval': 0, 'leave': False}
        with trange(10, desc="one", **options) as outer:
            with trange(10, desc="two", **options) as middle:
                with trange(10, desc="three", **options) as inner:
                    list(inner)
                list(middle)
            list(outer)

        written = stream.getvalue()
        assert "one" in written
        assert "two" not in written
        assert "three" not in written
        assert "\n\n" not in written
        assert "more hidden" in written
        # double-check ncols
        assert visible_bars_are_50_wide(written)

    # all bars, leave=True
    with closing(StringIO()) as stream:
        options = {'file': stream, 'ncols': 50, 'nrows': 2,
                   'miniters': 0, 'mininterval': 0}
        with trange(10, desc="one", **options) as outer:
            with trange(10, desc="two", **options) as middle:
                assert "two" not in stream.getvalue()
                with trange(10, desc="three", **options) as inner:
                    assert "three" not in stream.getvalue()
                    list(inner)
                list(middle)
            list(outer)

        written = stream.getvalue()
        assert "one" in written
        assert "two" in written
        assert "three" in written
        assert "\n\n" not in written
        assert "more hidden" in written
        # double-check ncols
        assert visible_bars_are_50_wide(written)

    # second bar becomes first, leave=False
    with closing(StringIO()) as stream:
        options = {'file': stream, 'ncols': 50, 'nrows': 2, 'miniters': 0,
                   'mininterval': 0, 'leave': False}
        upper = tqdm(total=10, desc="one", **options)
        with tqdm(total=10, desc="two", **options) as lower:
            upper.update()
            lower.update()
            upper.close()
            written = stream.getvalue()
            assert "one" in written
            assert "two" not in written
            assert "more hidden" in written
            lower.update()

        written = stream.getvalue()
        assert "two" in written
| |
| |
def test_initial():
    """Check that `initial` offsets the displayed counter."""
    with closing(StringIO()) as stream:
        progress = tqdm(_range(9), initial=10, total=19, file=stream,
                        miniters=1, mininterval=0)
        for _ in progress:
            pass
        written = stream.getvalue()
        # starts at the initial value and finishes at the total
        for fragment in ('10/19', '19/19'):
            assert fragment in written
| |
| |
def test_colour():
    """Check hex, unknown and named values of `colour`."""
    with closing(StringIO()) as stream:
        # hex colour -> 24-bit escape sequence
        for _ in tqdm(_range(9), file=stream, colour="#beefed"):
            pass
        written = stream.getvalue()
        hex_escape = '\x1b[38;2;%d;%d;%dm' % (0xbe, 0xef, 0xed)
        assert hex_escape in written

        # unknown colour name -> warning
        with catch_warnings(record=True) as caught:
            simplefilter("always", category=TqdmWarning)
            with tqdm(total=1, file=stream, colour="charm") as bar:
                assert caught
                bar.update()
            last_message = str(caught[-1].message)
            assert "Unknown colour" in last_message

    # named colour -> basic escape sequence
    with closing(StringIO()) as stream2:
        for _ in tqdm(_range(9), file=stream2, colour="blue"):
            pass
        written = stream2.getvalue()
        assert '\x1b[34m' in written
| |
| |
def test_closed():
    """Check iteration continues after the output stream is closed mid-loop."""
    with closing(StringIO()) as stream:
        progress = trange(9, file=stream, miniters=1, mininterval=0)
        for step in progress:
            # close the stream part-way through
            if step == 5:
                stream.close()
| |
| |
def test_reversed(capsys):
    """Check that `reversed()` works on a bar and still reports progress."""
    progress = reversed(tqdm(_range(9)))
    for _ in progress:
        pass
    captured = capsys.readouterr()
    # nothing on stdout; start and end of the bar on stderr
    assert not captured[0]
    for fragment in (' 0%', '100%'):
        assert fragment in captured[1]
| |
| |
def test_contains(capsys):
    """Check that membership tests do not advance the bar."""
    with tqdm(list(range(9))) as bar:
        assert 9 not in bar
        for value in _range(9):
            assert value in bar
    captured = capsys.readouterr()
    assert not captured[0]
    # the bar was drawn but never reached completion
    assert ' 0%' in captured[1]
    assert '100%' not in captured[1]