Flatten outputs for chained boolean operations where possible

Add special-case handling to flatten the data structure when chaining
multiple AND/OR operations together; for example, make
f_and(f_and(x, y), z) work in approximately the same way as
f_and(x, y, z).

The goal is to avoid a stack overflow in patterns that chain a large
number of futures. Previously, an overflow could occur once the chain
was a few hundred futures deep: each future's completion callback set
the result on the next future, which invoked another completion
callback, and so on down the chain.
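
The difference can be sketched with plain concurrent.futures objects.
This is an illustration only, not the code changed by this commit:
nested_callback_depth and flat_and are hypothetical names. The flat_and
here uses simplified semantics (it resolves with the first falsy result,
otherwise with the result of the last future to complete) and ignores
cancellation.

```python
from concurrent.futures import Future
from threading import Lock


def nested_callback_depth(n):
    """Chain n futures one-to-one and return the deepest callback nesting."""
    futures = [Future() for _ in range(n)]
    depth = {"current": 0, "max": 0}

    def link(src, dst):
        def on_done(f):
            depth["current"] += 1
            depth["max"] = max(depth["max"], depth["current"])
            # Completing dst runs dst's own callback before this one returns,
            # so every link in the chain adds frames to the stack.
            dst.set_result(f.result())
            depth["current"] -= 1

        src.add_done_callback(on_done)

    for src, dst in zip(futures, futures[1:]):
        link(src, dst)
    futures[0].set_result(True)
    return depth["max"]


def flat_and(first, *rest):
    """Hypothetical flattened AND: one output future, one shared counter."""
    futures = (first,) + rest
    out = Future()
    lock = Lock()
    state = {"remaining": len(futures), "finished": False}

    def on_done(f):
        exc = f.exception()
        value = None if exc is not None else f.result()
        with lock:
            if state["finished"]:
                return
            state["remaining"] -= 1
            if exc is None and value and state["remaining"] > 0:
                return  # truthy result, still waiting on other inputs
            state["finished"] = True
        # Resolve outside the lock; no input callback triggers another one,
        # so the nesting stays constant however many inputs there are.
        if exc is not None:
            out.set_exception(exc)
        else:
            out.set_result(value)

    for f in futures:
        f.add_done_callback(on_done)
    return out


# Nesting in the one-to-one chain grows with the chain length.
print(nested_callback_depth(50))

# The flat form handles thousands of inputs without deep recursion.
inputs = [Future() for _ in range(5000)]
combined = flat_and(*inputs)
for f in inputs:
    f.set_result(True)
print(combined.result())
```

In the one-to-one chain, every link adds stack frames while the first
result propagates. In the flat form, each input only updates a shared
counter, which is the property the flattening is after.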

Fixes #320
3 files changed
tree: 0fa1b87b1943185c0f3fc9cb368b4cd0515292fb
  1. .circleci/
  2. .github/
  3. docs/
  4. examples/
  5. more_executors/
  6. scripts/
  7. tests/
  8. .gitignore
  9. .pre-commit-config.yaml
  10. .pylintrc
  11. .travis.yml
  12. CHANGELOG.md
  13. docs-requirements.in
  14. docs-requirements.txt
  15. LICENSE
  16. Makefile
  17. MANIFEST.in
  18. mypy.ini
  19. pidiff-requirements.in
  20. pidiff-requirements.txt
  21. README.md
  22. requirements.in
  23. requirements.txt
  24. setup.cfg
  25. setup.py
  26. test-requirements.in
  27. test-requirements.txt
  28. tox.ini
README.md

more-executors

A library of composable Python executors and futures.

This library is intended for use with the concurrent.futures module. It includes a collection of Executor implementations that extend the behavior of Future objects.

Features

  • Futures with implicit retry
  • Futures with implicit cancel on executor shutdown
  • Futures with implicit cancel after timeout
  • Futures with transformed output values (sync & async)
  • Futures resolved by a caller-provided polling function
  • Throttle the number of futures running at once
  • Synchronous executor
  • Bridge concurrent.futures with asyncio
  • Convenience API for creating executors
  • Instrumented with Prometheus

See the API documentation for detailed information on usage.

Example

This example combines the map and retry executors to create futures for HTTP requests running concurrently, decoding JSON responses within the future and retrying on error.

import requests
from concurrent.futures import as_completed
from more_executors import Executors


def get_json(response):
    response.raise_for_status()
    return (response.url, response.json())


def fetch_urls(urls):
    # Configure an executor:
    # - run up to 4 requests concurrently, in separate threads
    # - run get_json on each response
    # - retry for up to several minutes on any error
    executor = Executors.\
        thread_pool(max_workers=4).\
        with_map(get_json).\
        with_retry()

    # Submit requests for each given URL
    futures = [executor.submit(requests.get, url)
               for url in urls]

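    # Futures API works as normal; we can block on the completed
    # futures and map/retry happens implicitly.
    # do_something is a placeholder for the caller's own handling of
    # each result; it is not defined in this example.
    for future in as_completed(futures):
        (url, data) = future.result()
        do_something(url, data)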

Development

virtualenv and pip may be used to locally install this project from source:

virtualenv ~/dev/python
. ~/dev/python/bin/activate

git clone https://github.com/rohanpm/more-executors
cd more-executors

pip install --editable .

Autotests may be run with pytest:

pip install -r test-requirements.txt
py.test

Submit pull requests against https://github.com/rohanpm/more-executors.

License

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.