() translation by (you can also view the original English article)
Python è uno dei più popolari linguaggi per l'elaborazione dei dati e l'analisi dei dati in generale. L'ecosistema fornisce molte librerie e framework che facilitano i calcoli ad alte prestazioni. Tuttavia, fare programmazione parallela con Python può risultare abbastanza insidioso.
In questo tutorial, andremo a studiare perché il parallelismo è difficile specialmente nel contesto di Python, e per fare questo, passeremo attraverso i seguenti:
- Perché il parallelismo è insidioso in Python (indizio: è a causa del GIL—lock globale dell'interprete).
- I thread vs. i processi: Modi diversi di raggiungere il parallelismo. Quando usare uno rispetto all'altro?
- Parallela vs. concorrente: perché in alcuni casi possiamo accontentarci della concorrenza piuttosto che del parallelismo.
- Costruire un semplice ma pratico esempio usando le varie tecniche discusse.
Il lock globale dell'interprete
Il lock globale dell'interprete (GIL) è uno degli argomenti più controversi nel mondo Python. In CPython, l'implementazione più popolare di Python, il GIL è un mutex che crea cose thread-safe. Il GIL lo rende semplice da integrare con le librerie esterne che non sono thread-safe e rende il codice non-parallelo più veloce. Tuttavia, ciò ha un costo. A causa di GIL, non possiamo realizzare il parallelismo vero tramite il multithreading. In sostanza, due thread nativi diversi dello stesso processo non possono eseguire il codice Python subito.
Le cose non sono così gravi, tuttavia, ed ecco perché: le cose che succedono fuori dal regno di GIL sono libere di essere parallele. In questa categoria ricadono i task lunghissimi come I/O e, per fortuna, le librerie come numpy
.
I Thread vs. i processi
Quindi Python non è veramente multithread. Ma cos'è un thread? Facciamo un passo indietro e guardiamo le cose in prospettiva.
Un processo un'operazione del sistema operativo di base. È un programma che si trova in esecuzione—in altre parole, il codice in esecuzione. I processi multipli sono sempre eseguiti in un computer e sono lanciati in parallelo.
Un processo può avere thread multipli. Essi eseguono lo stesso codice appartenente la processo genitore. Idealmente, si eseguono in parallelo, ma non necessariamente. La ragione per cui i processi non sono abbastanza è perché le applicazioni hanno bisogno di essere reattive e fare attenzione alle azioni dell'utente mentre aggiorna il monitor e salva un file.
Se è ancora poco chiaro, ecco un bigino:
PROCESSI | THREAD |
---|---|
I processi non condividono la memoria | I thread condividono la memoria |
I processi avviati/cambiati sono costosi | I thread avviati/cambiati sono meno costosi |
I processi richiedono molte risorse | I thread richiedono poche risorse(qualche volta sono chiamati processi leggeri) |
Non occorre sincronizzazione di memoria | Avete bisogno di usare meccanismi di sincronizzazione per essere sicuri che stiate gestendo correttamente i dati |
Non c'è una ricetta che soddisfa tutto. Scegliere una è subordinata molto dal contesto e dal compito che state cercando di raggiungere.
Parallela vs. concorrente
Adesso andremo un passo più avanti e ci immergeremo nella concorrenza. La concorrenza è spesso fraintesa e scambiata per il parallelismo. Non è questo il caso. La concorrenza implica che il codice indipendente dalla pianificazione sia eseguito in maniera cooperativa. Approfittate del fatto che un pezzo di codice sta aspettando le operazioni I/O e durante questo tempo esegue una parte del codice diversa ma indipendente.
In Python, possiamo ottenere un comportamento concorrente leggero tramite greenlet. Dalla prospettiva del parallelismo, usare i thread o greenlet è equivalente perché nessuno di essi si esegue in parallelo. I greenlet sono anche meno costosi da creare dei thread. Perciò, i greenlet sono fortemente usati per compiere un enorme numero di semplici funzioni I/O, come quella che di solito troviamo nel networking e i server web.
Adesso che sappiamo la differenza tra i thread e i processi, parallelo e concorrente, possiamo illustrare come funzioni diverse si svolgono su due paradigmi. Ecco che cosa andremo a fare: eseguiremo, molteplici volte, una funzione fuori dal GIL e una all'interno. Le lanceremo in serie, usando i thread e usando i processi. Definiamo le funzioni:
1 |
import os |
2 |
import time |
3 |
import threading |
4 |
import multiprocessing |
5 |
|
6 |
NUM_WORKERS = 4 |
7 |
|
8 |
def only_sleep(): |
9 |
""" Do nothing, wait for a timer to expire """
|
10 |
print("PID: %s, Process Name: %s, Thread Name: %s" % ( |
11 |
os.getpid(), |
12 |
multiprocessing.current_process().name, |
13 |
threading.current_thread().name) |
14 |
)
|
15 |
time.sleep(1) |
16 |
|
17 |
|
18 |
def crunch_numbers(): |
19 |
""" Do some computations """
|
20 |
print("PID: %s, Process Name: %s, Thread Name: %s" % ( |
21 |
os.getpid(), |
22 |
multiprocessing.current_process().name, |
23 |
threading.current_thread().name) |
24 |
)
|
25 |
x = 0 |
26 |
while x < 10000000: |
27 |
x += 1 |
Abbiamo creato due funzioni. Entrambe sono lunghissime, ma solo crunch_numbers
compie attivamente dei calcoli. Lanciamo only_sleep
in serie, in multithread e usando processi multipli e confrontiamo i risultati:
1 |
## Run tasks serially
|
2 |
start_time = time.time() |
3 |
for _ in range(NUM_WORKERS): |
4 |
only_sleep() |
5 |
end_time = time.time() |
6 |
|
7 |
print("Serial time=", end_time - start_time) |
8 |
|
9 |
# Run tasks using threads
|
10 |
start_time = time.time() |
11 |
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)] |
12 |
[thread.start() for thread in threads] |
13 |
[thread.join() for thread in threads] |
14 |
end_time = time.time() |
15 |
|
16 |
print("Threads time=", end_time - start_time) |
17 |
|
18 |
# Run tasks using processes
|
19 |
start_time = time.time() |
20 |
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)] |
21 |
[process.start() for process in processes] |
22 |
[process.join() for process in processes] |
23 |
end_time = time.time() |
24 |
|
25 |
print("Parallel time=", end_time - start_time) |
Ecco il risultato che ho ottenuto (il vostro dovrebbe essere simile, sebbene i PID e il tempo varieranno un poco):
1 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
2 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
3 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
4 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
5 |
Serial time= 4.018089056015015 |
6 |
|
7 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1 |
8 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2 |
9 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3 |
10 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4 |
11 |
Threads time= 1.0047411918640137 |
12 |
|
13 |
PID: 95728, Process Name: Process-1, Thread Name: MainThread |
14 |
PID: 95729, Process Name: Process-2, Thread Name: MainThread |
15 |
PID: 95730, Process Name: Process-3, Thread Name: MainThread |
16 |
PID: 95731, Process Name: Process-4, Thread Name: MainThread |
17 |
Parallel time= 1.014023780822754 |
Ecco alcune osservazioni:
Nel caso dell'approccio seriale, le cose sono abbastanza ovvie. Stiamo lanciando le funzioni una dopo l'altra. Tutte e quattro sono eseguite dallo stesso thread dello stesso processo.
Usando i processi accorciamo il tempo di esecuzione di un quarto del tempo originale, semplicemente perché le funzioni sono eseguite in parallelo. Notate come ogni funzione sia compiuta in un processo differente e sul
MainThread
dei quel processo.Usando i thread traiamo vantaggio dal fatto che le funzioni possono essere eseguite simultaneamente. Il tempo di esecuzione è anche accorciato di un quarto, anche se niente è lanciato in parallelo. Ecco come va: avviamo il primo thread ed esso inizia ad aspettare il timer per fermarsi. Interrompiamo la sua esecuzione, lasciando che aspetti il timer per fermarsi e in questo tempo avviamo il secondo thread. Ripetiamo per tutti i thread. A un certo punto il timer del primo thread si ferma così cambiamo l'esecuzione e la terminiamo. L'algoritmo è ripetuto per il secondo e per tutti gli altri thread. Alla fine, il risultato è come se le cose fossero lanciate in parallelo. Noterete anche che i quattro diversi thread si espandono dallo stesso processo e vivono al suo interno: MainProcess.
Potete anche notare che l'approccio con i thread è più veloce di quello veramente parallelo. Ciò è dovuto al sovraccarico dei processi avviati. Come abbiamo notato in precedenza, i processi avviati e scambiati sono un'operazione costosa.
Facciamo la stessa routine ma questa volta lanciando la funzione crunch_numbers
:
1 |
start_time = time.time() |
2 |
for _ in range(NUM_WORKERS): |
3 |
crunch_numbers() |
4 |
end_time = time.time() |
5 |
|
6 |
print("Serial time=", end_time - start_time) |
7 |
|
8 |
start_time = time.time() |
9 |
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)] |
10 |
[thread.start() for thread in threads] |
11 |
[thread.join() for thread in threads] |
12 |
end_time = time.time() |
13 |
|
14 |
print("Threads time=", end_time - start_time) |
15 |
|
16 |
|
17 |
start_time = time.time() |
18 |
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)] |
19 |
[process.start() for process in processes] |
20 |
[process.join() for process in processes] |
21 |
end_time = time.time() |
22 |
|
23 |
print("Parallel time=", end_time - start_time) |
Ecco il risultato che ho ottenuto:
1 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
2 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
3 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
4 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
5 |
Serial time= 2.705625057220459 |
6 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1 |
7 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2 |
8 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3 |
9 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4 |
10 |
Threads time= 2.6961309909820557 |
11 |
PID: 96289, Process Name: Process-1, Thread Name: MainThread |
12 |
PID: 96290, Process Name: Process-2, Thread Name: MainThread |
13 |
PID: 96291, Process Name: Process-3, Thread Name: MainThread |
14 |
PID: 96292, Process Name: Process-4, Thread Name: MainThread |
15 |
Parallel time= 0.8014059066772461 |
La differenza principale qui è il risultato dell'approccio multithread. Questa volta si svolge molto similmente all'approccio seriale, ed ecco perché: poiché svolge i calcoli e Python non compie un reale parallelismo, i thread sono lanciati in sostanza uno dopo l'altro, dando la precedenza di esecuzione l'uno all'altro fino a che non finiscono.
L'ecosistema della programmazione parallela/concorrente in Python
Python ha delle ricche API fer fare programmazione parallela/concorrente. In questo tutorial copriremo i più popolari, ma dovete sapere che per qualsiasi bisogno in questo settore, c'è già probabilmente qualcosa in giro che può aiutarvi a raggiungere il vostro obiettivo.
Nella prossima sezione, costruiremo un'applicazione pratica in molte forme, usando tutte le librerie presentate. Senza ulteriori indugi, ecco i moduli/librerie che abbracceremo:
threading
: Il metodo standard per lavorare con i thread in Python. È un involucro per API di alto livello sulla funzionalità esposta dal modulo_thread
, il quale è un'interfaccia di basso livello sull'implementazione del thread del sistema operativo.concurrent.futures
: Una parte del modulo della libreria standard che fornisce un livello di astrazione di ancora maggior livello sui thread. I thread sono modellati come funzioni asincrone.multiprocessing
: Simile al modulothreading
, offrendo un'interfaccia molto simile ma usando i processi invece dei thread.gevent and greenlets
: Greenlets, anche chiamata micro thread, sono unità di esecuzione che possono essere pianificate in collaborazione e possono eseguire delle funzioni in concorrenza senza troppo sovraccarico.celery
: Una coda di attività distribuita di alto livello. Le attività sono messi in coda e eseguiti in concorrenza usando vari paradigmi comemultiprocessing
ogevent
.
Costruire un'applicazione pratica
Conoscere la teoria è bello e ottimo, ma il modo migliore di imparare è costruire qualcosa di pratico, giusto? In questa sezione costruiremo un tipo di applicazione classico attraversando tutti i diversi paradigmi.
Costruiamo un'applicazione che controlli l'operatività dei siti web. Ci sono molte di tali soluzioni in giro, essendo le più note probabilmente Jetpack Monitor e Uptime Robot. Lo scopo di queste applicazioni è di notificarvi quando il vostro sito web è inaccessibile così che possiate agire velocemente. Ecco come funzionano:
- L'applicazione va molto frequentemente su una lista degli URL dei siti web e controlla se questi siti web sono accessibili.
- Ogni sito web dovrebbe essere controllato ogni 5-10 minuti così che l'inattività non sia significativa.
- Invece di eseguire una classica richiesta HTTP GET, esegue una richiesta HEAD così che non incide in modo significativo sul vostro traffico.
- Se lo stato dll'HTTP è nelle zone di pericolo (400+, 500+), il proprietario viene avvisato.
- Il proprietario viene avvisato per email, messaggio di testo o notifica.
Ecco perché è essenziale prendere un approccio parallelo/concorrente al problema. Come la lista dei siti web cresce, scorrere la lista in serie non ci garantirà che ogni sito web viene controllato ogni cinque minuti più o meno.
Iniziamo a scrivere alcune funzioni:
1 |
# utils.py
|
2 |
|
3 |
import time |
4 |
import logging |
5 |
import requests |
6 |
|
7 |
|
8 |
class WebsiteDownException(Exception): |
9 |
pass
|
10 |
|
11 |
|
12 |
def ping_website(address, timeout=20): |
13 |
"""
|
14 |
Check if a website is down. A website is considered down
|
15 |
if either the status_code >= 400 or if the timeout expires
|
16 |
|
17 |
Throw a WebsiteDownException if any of the website down conditions are met
|
18 |
"""
|
19 |
try: |
20 |
response = requests.head(address, timeout=timeout) |
21 |
if response.status_code >= 400: |
22 |
logging.warning("Website %s returned status_code=%s" % (address, response.status_code)) |
23 |
raise WebsiteDownException() |
24 |
except requests.exceptions.RequestException: |
25 |
logging.warning("Timeout expired for website %s" % address) |
26 |
raise WebsiteDownException() |
27 |
|
28 |
|
29 |
def notify_owner(address): |
30 |
"""
|
31 |
Send the owner of the address a notification that their website is down
|
32 |
|
33 |
For now, we're just going to sleep for 0.5 seconds but this is where
|
34 |
you would send an email, push notification or text-message
|
35 |
"""
|
36 |
logging.info("Notifying the owner of %s website" % address) |
37 |
time.sleep(0.5) |
38 |
|
39 |
|
40 |
def check_website(address): |
41 |
"""
|
42 |
Utility function: check if a website is down, if so, notify the user
|
43 |
"""
|
44 |
try: |
45 |
ping_website(address) |
46 |
except WebsiteDownException: |
47 |
notify_owner(address) |
Effettivamente abbiamo bisogno di una lista di siti web per provare il nostro sistema. Create la vostra lista o usate la mia:
1 |
# websites.py
|
2 |
|
3 |
WEBSITE_LIST = [ |
4 |
'https://envato.com', |
5 |
'http://amazon.co.uk', |
6 |
'http://amazon.com', |
7 |
'http://facebook.com', |
8 |
'http://google.com', |
9 |
'http://google.fr', |
10 |
'http://google.es', |
11 |
'http://google.co.uk', |
12 |
'http://internet.org', |
13 |
'http://gmail.com', |
14 |
'http://stackoverflow.com', |
15 |
'http://github.com', |
16 |
'http://heroku.com', |
17 |
'http://really-cool-available-domain.com', |
18 |
'http://djangoproject.com', |
19 |
'http://rubyonrails.org', |
20 |
'http://basecamp.com', |
21 |
'http://trello.com', |
22 |
'http://yiiframework.com', |
23 |
'http://shopify.com', |
24 |
'http://another-really-interesting-domain.co', |
25 |
'http://airbnb.com', |
26 |
'http://instagram.com', |
27 |
'http://snapchat.com', |
28 |
'http://youtube.com', |
29 |
'http://baidu.com', |
30 |
'http://yahoo.com', |
31 |
'http://live.com', |
32 |
'http://linkedin.com', |
33 |
'http://yandex.ru', |
34 |
'http://netflix.com', |
35 |
'http://wordpress.com', |
36 |
'http://bing.com', |
37 |
]
|
Normalmente, terreste questa lista in un database insieme alle informazioni di contatto del proprietario così che potete contattarli. Poiché questo non è l'argomento principale di questo tutorial, e per semplicità, useremo solo questa lista di Python.
Se siete stati realmente attenti, potete aver notato due domini realmente lunghi nella lista che non sono siti web validi (spero nessuno li compri quando starete leggendo questo per dimostrarmi che mi sbaglio!). Ho aggiunto due domini per essere sicuro che avessimo alcuni siti web inattivi a ogni esecuzione. Ancora, chiamiamo la nostra applicazione UptimeSquirrel.
L'approccio seriale
Primo, proviamo l'approccio seriale e vediamo quanto procede male.
1 |
# serial_squirrel.py
|
2 |
|
3 |
import time |
4 |
|
5 |
|
6 |
start_time = time.time() |
7 |
|
8 |
for address in WEBSITE_LIST: |
9 |
check_website(address) |
10 |
|
11 |
end_time = time.time() |
12 |
|
13 |
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time)) |
14 |
|
15 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
16 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
17 |
# WARNING:root:Website http://bing.com returned status_code=405
|
18 |
# Time for SerialSquirrel: 15.881232261657715secs
|
L'approccio con i thread
Diventeremo un po' più creativi con l'implementazione dell'approccio thread. Useremo una coda per inserire gli indirizzi e creeremo dei thread secondari per tirarli fuori dalla coda e processarli. Aspetteremo che la coda sia vuota, nel senso che tutti gli indirizzi sono stati processati dai thread secondari.
1 |
# threaded_squirrel.py
|
2 |
|
3 |
import time |
4 |
from queue import Queue |
5 |
from threading import Thread |
6 |
|
7 |
NUM_WORKERS = 4 |
8 |
task_queue = Queue() |
9 |
|
10 |
def worker(): |
11 |
# Constantly check the queue for addresses
|
12 |
while True: |
13 |
address = task_queue.get() |
14 |
check_website(address) |
15 |
|
16 |
# Mark the processed task as done
|
17 |
task_queue.task_done() |
18 |
|
19 |
start_time = time.time() |
20 |
|
21 |
# Create the worker threads
|
22 |
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)] |
23 |
|
24 |
# Add the websites to the task queue
|
25 |
[task_queue.put(item) for item in WEBSITE_LIST] |
26 |
|
27 |
# Start all the workers
|
28 |
[thread.start() for thread in threads] |
29 |
|
30 |
# Wait for all the tasks in the queue to be processed
|
31 |
task_queue.join() |
32 |
|
33 |
|
34 |
end_time = time.time() |
35 |
|
36 |
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time)) |
37 |
|
38 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
39 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
40 |
# WARNING:root:Website http://bing.com returned status_code=405
|
41 |
# Time for ThreadedSquirrel: 3.110753059387207secs
|
concurrent.futures
Come dichiarato in precedenza, concurrent.futures
è un API di alto livello per usare i thread. L'approccio che stiamo assumendo qui implica di usare un ThreadPoolExecutor
. Sottoporremo le attività al pool e otterremo i futures, che sono i risultati che ci saranno disponibili in futuro. Certamente, possiamo aspettare che tutti i futures diventino risultati reali.
1 |
# future_squirrel.py
|
2 |
|
3 |
import time |
4 |
import concurrent.futures |
5 |
|
6 |
NUM_WORKERS = 4 |
7 |
|
8 |
start_time = time.time() |
9 |
|
10 |
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: |
11 |
futures = {executor.submit(check_website, address) for address in WEBSITE_LIST} |
12 |
concurrent.futures.wait(futures) |
13 |
|
14 |
end_time = time.time() |
15 |
|
16 |
print("Time for FutureSquirrel: %ssecs" % (end_time - start_time)) |
17 |
|
18 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
19 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
20 |
# WARNING:root:Website http://bing.com returned status_code=405
|
21 |
# Time for FutureSquirrel: 1.812899112701416secs
|
L'approccio Multiprocessing
La libreria multiprocessing
fornisce una sostituzione dell'API quasi informale per la libreria threading
. In questo caso, prenderemo un approccio più simile a quello di concurrent.futures
. Creeremo un multiprocessing.Pool
e gli sottoporremo le attività mappando una funzione alla lista degli indirizzi (pensate alla classica funzione Python map
).
1 |
# multiprocessing_squirrel.py
|
2 |
|
3 |
import time |
4 |
import socket |
5 |
import multiprocessing |
6 |
|
7 |
NUM_WORKERS = 4 |
8 |
|
9 |
start_time = time.time() |
10 |
|
11 |
with multiprocessing.Pool(processes=NUM_WORKERS) as pool: |
12 |
results = pool.map_async(check_website, WEBSITE_LIST) |
13 |
results.wait() |
14 |
|
15 |
end_time = time.time() |
16 |
|
17 |
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time)) |
18 |
|
19 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
20 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
21 |
# WARNING:root:Website http://bing.com returned status_code=405
|
22 |
# Time for MultiProcessingSquirrel: 2.8224599361419678secs
|
Gevent
Gevent è un'alternativa popolare per raggiungere una concorrenza massiccia. Ci sono poche cose che vi occorre sapere prima di usarlo:
Il codice eseguito nello stesso momento da greenlets è deterministico. Al contrario delle altre alternative presentate, questo paradigma garantisce che per ogni due esecuzioni identiche, otterrete sempre lo stesso risultato nello stesso ordine.
Vi occorre scimmiottare le funzioni di aggiornamento standard così che cooperino con gevent. Ecco cosa intendo con ciò. Normalmente, un'operazione di socket si sta bloccando. Aspetteremo che l'operazione finisca. Se fossimo in un ambiente multithread, lo scheduler semplicemente passerebbe a un altro thread mentre l'altro aspetterà per I/O. Poiché non siamo in un ambiente multithread, gevent aggiorna le funzioni standard così che diventano non-bloccanti e ridanno il controllo allo scheduler di gevent.
Per installare gevent, eseguite: pip install gevent
Ecco come usare gevent per eseguire la nostra funzione usando un gevent.pool.Pool
:
1 |
# green_squirrel.py
|
2 |
|
3 |
import time |
4 |
from gevent.pool import Pool |
5 |
from gevent import monkey |
6 |
|
7 |
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
|
8 |
NUM_WORKERS = 4 |
9 |
|
10 |
# Monkey-Patch socket module for HTTP requests
|
11 |
monkey.patch_socket() |
12 |
|
13 |
start_time = time.time() |
14 |
|
15 |
pool = Pool(NUM_WORKERS) |
16 |
for address in WEBSITE_LIST: |
17 |
pool.spawn(check_website, address) |
18 |
|
19 |
# Wait for stuff to finish
|
20 |
pool.join() |
21 |
|
22 |
end_time = time.time() |
23 |
|
24 |
print("Time for GreenSquirrel: %ssecs" % (end_time - start_time)) |
25 |
# Time for GreenSquirrel: 3.8395519256591797secs
|
Celery
Celery è un approccio che differisce in misura maggiore da quelli che abbiamo visto finora. È collaudato nel contesto di ambienti molto complessi e ad alta prestazione. Impostare Celery richiederà un po' più di interventi rispetto a tutte le soluzioni precedenti.
Primo, ci occorre installare Celery:
pip install celery
Le funzioni sono i concetti centrali nel progetto Celery. Ogni cosa che vorrete eseguire dentro Celery dovrà essere una funzione. Celery offre grande flessibilità per eseguire le funzioni: potete eseguirle in modo sincrono o asincrono, in tempo reale o pianificato, sulla stessa macchina o su macchine multiple, e usando i thread, i processi, Eventlet o gevent.
La configurazione sarà un po' più complessa. Celery usa altri servizi per inviare e ricevere messaggi. Questi messaggi sono di solito funzioni o risultati delle funzioni. Useremo Redis in questo tutorial per questo scopo. Redis è una grande scelta perché è realmente semplice da installare e configurare ed è realmente possibile che già li utilizziate nella vostra applicazione per altri scopi, come il caching e pub/sub.
Potete installare Redis seguendo le istruzioni sulla pagina Redis Quick Start. Non dimenticate di installare la libreria Python redis
, pip install redis
, e il pacchetto necessario per usare Redis e Celery: pip install celery[redis]
.
Avviate il server Redis così: $ redis-server
Per iniziare a costruire qualcosa con Celery, prima dovrete creare un'applicazione Celery. Dopo di ciò, Celery ha bisogno di sapere che tipo di funzioni potrebbe eseguire. Per raggiungere ciò, abbiamo bisogno di registrare le funzioni nell'applicazione Celery. Faremo questo usando il decoratore @app.task
:
1 |
# celery_squirrel.py
|
2 |
|
3 |
import time |
4 |
from utils import check_website |
5 |
from data import WEBSITE_LIST |
6 |
from celery import Celery |
7 |
from celery.result import ResultSet |
8 |
|
9 |
app = Celery('celery_squirrel', |
10 |
broker='redis://localhost:6379/0', |
11 |
backend='redis://localhost:6379/0') |
12 |
|
13 |
@app.task |
14 |
def check_website_task(address): |
15 |
return check_website(address) |
16 |
|
17 |
if __name__ == "__main__": |
18 |
start_time = time.time() |
19 |
|
20 |
# Using `delay` runs the task async
|
21 |
rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST]) |
22 |
|
23 |
# Wait for the tasks to finish
|
24 |
rs.get() |
25 |
|
26 |
end_time = time.time() |
27 |
|
28 |
print("CelerySquirrel:", end_time - start_time) |
29 |
# CelerySquirrel: 2.4979639053344727
|
Niente panico se non accade niente. Ricordate, Celery è un servizio e dobbiamo lanciarlo. Fino ad adesso, abbiamo posizionato solo le funzioni in Redis ma non abbiamo avviato Celery per eseguirle. Per fare questo, abbiamo bisogno di lanciare questo comando nella cartella dove risiede il codice:
celery worker -A do_celery --loglevel=debug --concurrency=4
Ora rilanciamo lo script di Python e vediamo cosa succede. Una cosa su cui fare attenzione: notate come abbiamo trasmesso l'indirizzo di Redis alla nostra applicazione Redis due volte. Il parametro broker
specifica dove le funzioni vengono trasferite a Celery, e backend
è dove Celery mette i risultati così che potete usarli nella vostra applicazione. Se non specificate un risultato backend
, non c'è modo per noi di sapere quando la funzione è stata elaborata e quale fosse il risultato.
Ancora, state attenti che i log ora siano nello standard output del processo di Celery, quindi siate sicuri di controllarli nel terminale appropriato.
Conclusioni
Spero che questo sia stato un viaggio interessante per voi e una buona introduzione al mondo della programmazione parallela/concorrente in Python. Questa è la fine del viaggio, e ci sono alcune conclusioni che possiamo trarre:
- Ci sono molti paradigmi che ci aiutano a raggiungere calcoli ad alta prestazione in Python.
- Per il paradigma multi-thread, abbiamo le librerie
threading
econcurrent.futures
. -
multiprocessing
fornisce un'interfaccia molto simile athreading
ma per i processi piuttosto che per i thread. - Ricordate che i processi raggiungono il vero parallelismo, ma sono molto costosi da creare.
- Ricordate che un processo può avere più thread lanciati all'interno di esso.
- Non scambiate parallela per concorrente. Ricordate che solo l'approccio parallelo porta vantaggio ai processori multi-core, considerando che la programmazione concorrente pianifica intelligentemente i compiti in modo che l'attesa su operazioni a lungo funzionamento sia eseguita mentre in quella parallela effettua il calcolo effettivo.