In programmazione parallela, i problemi “Embarrassingly parallel” richiedono poco o nessuno sforzo per essere suddivisi in sottoproblemi da risolvere con task paralleli.
Un esempio di problema Embarrassingly parallel nel mondo reale, è dover tagliare delle verdure ad esempio. Abbiamo N verdure, le distribuiamo ad N persone e in un minuto avremo tutte le verdure tagliate.
Vediamo quindi come il componente della libreria standard disponibile da Python 3.2+ chiamato concurrent.futures, ci permette di parallelizzare una funzione e risolvere in una manciata di righe problemi Embarassingly parallel.
In caso stessimo usando la versione 2.7 di Python, possiamo installare il package futures con il comando pip:
$ pip install futures
executor.map()
Per problemi CPU intensive, è consigliato utilizzare la classe ProcessPoolExecutor; mentre per problemi che riguardano operazioni di rete o I/O la ThreadPoolExecutor.
Come suggeriscono i nomi, il ProcessPoolExecutor utilizza il modulo di multiprocessing che non è affetto dal Global Interpreter Lock (GIL), ma vuol dire anche che oggetti picklable possono essere eseguiti e ritornati.
In Python 3.5, la funzione map()
riceve un argomento speciale chiamato chunksize. Per iterables molto lunghi, usare un chunksize grande può migliorare significativamente le performance, rispetto all’uso del valore di default a 1. Usando il ThreadPoolExecutor, chunksize non ha effetto.
Vediamo quindi un esempio di utilizzo della funzione map:
from concurrent.futures import ThreadPoolExecutor import time import requests def fetch(a): url = 'http://httpbin.org/get?a={0}'.format(a) r = requests.get(url) result = r.json()['args'] return result start = time.time() # if max_workers is None or not given, it will default to the number of processors, multiplied by 5 with ThreadPoolExecutor(max_workers=None) as executor: for result in executor.map(fetch, range(30)): print('response: {0}'.format(result)) print('time: {0}'.format(time.time() - start))
Sul mio computer, usare un ThreadPoolExecutor
ha richiesto 1.8 secondi, mentre col ProcessPoolExecutor
7.3 secondi.
ref:
Python: A quick introduction to the concurrent.futures module
executor.submit() and as_completed()
La funzione executor.submit()
ritorna un oggetto Future
. Un future in pratica è un oggetto che incapsula una esecuzione asincrona di una funzione che finirà (o lancerà un’eccezione) nel futuro.
La differenza principale fra la map
e as_completed
è che la map
ritorna i risultati nell’ordine in cui gli vengono passati dall’iterable. D’altra parte, il primo risultati dalla funzione as_completed
è dalla prima future ad essere completata. In parole povere, iterare sul risultato di una map()
ritorna i risultati dei futures
, mentre iterare su una as_completed(futures)
ritorna i futures
stessi.
from concurrent.futures import ThreadPoolExecutor, as_completed import requests def fetch(url, timeout): r = requests.get(url, timeout=timeout) data = r.json()['args'] return data with ThreadPoolExecutor(max_workers=10) as executor: futures = {} for i in range(42): url = 'https://httpbin.org/get?i={0}'.format(i) future = executor.submit(fetch, url, 60) futures[future] = url for future in as_completed(futures): url = futures[future] try: data = future.result() except Exception as exc: print(exc) else: print('fetch {0}, get {1}'.format(url, data))
Per approfondire
- Modulo concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
- Oggetti futures: https://docs.python.org/3/library/concurrent.futures.html#future-objects
- Fonte: https://vinta.ws/code/parallel-tasks-in-python-concurrent-futures.html