In programmazione parallela, i problemi “Embarrassingly parallel” richiedono poco o nessuno sforzo per essere suddivisi in sottoproblemi da risolvere con task paralleli.
Un esempio di problema Embarrassingly parallel nel mondo reale, è dover tagliare delle verdure ad esempio. Abbiamo N verdure, le distribuiamo ad N persone e in un minuto avremo tutte le verdure tagliate.
Vediamo quindi come il componente della libreria standard disponibile da Python 3.2+ chiamato concurrent.futures, ci permette di parallelizzare una funzione e risolvere in una manciata di righe problemi Embarassingly parallel.
In caso stessimo usando la versione 2.7 di Python, possiamo installare il package futures con il comando pip:
$ pip install futures
executor.map()
Per problemi CPU intensive, è consigliato utilizzare la classe ProcessPoolExecutor; mentre per problemi che riguardano operazioni di rete o I/O la ThreadPoolExecutor.
Come suggeriscono i nomi, il ProcessPoolExecutor utilizza il modulo di multiprocessing che non è affetto dal Global Interpreter Lock (GIL), ma vuol dire anche che oggetti picklable possono essere eseguiti e ritornati.
In Python 3.5, la funzione map() riceve un argomento speciale chiamato chunksize. Per iterables molto lunghi, usare un chunksize grande può migliorare significativamente le performance, rispetto all’uso del valore di default a 1. Usando il ThreadPoolExecutor, chunksize non ha effetto.
Vediamo quindi un esempio di utilizzo della funzione map:
from concurrent.futures import ThreadPoolExecutor
import time
import requests
def fetch(a):
url = 'http://httpbin.org/get?a={0}'.format(a)
r = requests.get(url)
result = r.json()['args']
return result
start = time.time()
# if max_workers is None or not given, it will default to the number of processors, multiplied by 5
with ThreadPoolExecutor(max_workers=None) as executor:
for result in executor.map(fetch, range(30)):
print('response: {0}'.format(result))
print('time: {0}'.format(time.time() - start))
Sul mio computer, usare un ThreadPoolExecutor ha richiesto 1.8 secondi, mentre col ProcessPoolExecutor
7.3 secondi.
ref:
Python: A quick introduction to the concurrent.futures module
executor.submit() and as_completed()
La funzione executor.submit() ritorna un oggetto Future. Un future in pratica è un oggetto che incapsula una esecuzione asincrona di una funzione che finirà (o lancerà un’eccezione) nel futuro.
La differenza principale fra la map e as_completed è che la map ritorna i risultati nell’ordine in cui gli vengono passati dall’iterable. D’altra parte, il primo risultati dalla funzione as_completed è dalla prima future ad essere completata. In parole povere, iterare sul risultato di una map() ritorna i risultati dei futures, mentre iterare su una as_completed(futures) ritorna i futures stessi.
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch(url, timeout):
r = requests.get(url, timeout=timeout)
data = r.json()['args']
return data
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {}
for i in range(42):
url = 'https://httpbin.org/get?i={0}'.format(i)
future = executor.submit(fetch, url, 60)
futures[future] = url
for future in as_completed(futures):
url = futures[future]
try:
data = future.result()
except Exception as exc:
print(exc)
else:
print('fetch {0}, get {1}'.format(url, data))
Per approfondire
- Modulo concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
- Oggetti futures: https://docs.python.org/3/library/concurrent.futures.html#future-objects
- Fonte: https://vinta.ws/code/parallel-tasks-in-python-concurrent-futures.html