In programmazione parallela, i problemi “Embarrassingly parallel” richiedono poco o nessuno sforzo per essere suddivisi in sottoproblemi da risolvere con task paralleli.

Un esempio di problema Embarrassingly parallel nel mondo reale, è dover tagliare delle verdure ad esempio. Abbiamo N verdure, le distribuiamo ad N persone e in un minuto avremo tutte le verdure tagliate.

Vediamo quindi come il componente della libreria standard disponibile da Python 3.2+ chiamato concurrent.futures, ci permette di parallelizzare una funzione e risolvere in una manciata di righe problemi Embarassingly parallel.

In caso stessimo usando la versione 2.7 di Python, possiamo installare il package futures con il comando pip:

$ pip install futures

executor.map()

Per problemi CPU intensive, è consigliato utilizzare la classe ProcessPoolExecutor; mentre per problemi che riguardano operazioni di rete o I/O la ThreadPoolExecutor.

Come suggeriscono i nomi, il ProcessPoolExecutor utilizza il modulo di multiprocessing che non è affetto dal Global Interpreter Lock (GIL), ma vuol dire anche che oggetti picklable possono essere eseguiti e ritornati.

In Python 3.5, la funzione map() riceve un argomento speciale chiamato chunksize. Per iterables molto lunghi, usare un chunksize grande può migliorare significativamente le performance, rispetto all’uso del valore di default a 1. Usando il ThreadPoolExecutor, chunksize non ha effetto.

Vediamo quindi un esempio di utilizzo della funzione map:

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = 'http://httpbin.org/get?a={0}'.format(a)
    r = requests.get(url)
    result = r.json()['args']
    return result

start = time.time()

# if max_workers is None or not given, it will default to the number of processors, multiplied by 5
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in executor.map(fetch, range(30)):
        print('response: {0}'.format(result))

print('time: {0}'.format(time.time() - start))

Sul mio computer, usare un ThreadPoolExecutor ha richiesto 1.8 secondi, mentre col ProcessPoolExecutor
7.3 secondi.
ref:

Python 3 Concurrency – The concurrent.futures Module

Python: A quick introduction to the concurrent.futures module

executor.submit() and as_completed()

La funzione executor.submit() ritorna un oggetto Future. Un future in pratica è un oggetto che incapsula una esecuzione asincrona di una funzione che finirà (o lancerà un’eccezione) nel futuro.

La differenza principale fra la map e as_completed è che la map ritorna i risultati nell’ordine in cui gli vengono passati dall’iterable. D’altra parte, il primo risultati dalla funzione as_completed è dalla prima future ad essere completata. In parole povere, iterare sul risultato di una map() ritorna i risultati dei futures, mentre iterare su una as_completed(futures) ritorna i futures stessi.

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {}
    for i in range(42):
        url = 'https://httpbin.org/get?i={0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as exc:
            print(exc)
        else:
            print('fetch {0}, get {1}'.format(url, data))

Per approfondire

  • Modulo concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
  • Oggetti futures: https://docs.python.org/3/library/concurrent.futures.html#future-objects
  • Fonte: https://vinta.ws/code/parallel-tasks-in-python-concurrent-futures.html

Parallel tasks in Python: concurrent.futures