Advertisement
  1. Code
  2. Python

Introduzione alla programmazione parallela e concorrente con Python

Scroll to top
Read Time: 19 min

() translation by (you can also view the original English article)

Python è uno dei più popolari linguaggi per l'elaborazione dei dati e l'analisi dei dati in generale. L'ecosistema fornisce molte librerie e framework che facilitano i calcoli ad alte prestazioni. Tuttavia, fare programmazione parallela con Python può risultare abbastanza insidioso.

In questo tutorial, andremo a studiare perché il parallelismo è difficile specialmente nel contesto di Python, e per fare questo, passeremo attraverso i seguenti:

  • Perché il parallelismo è insidioso in Python (indizio: è a causa del GIL—lock globale dell'interprete).
  • I thread vs. i processi: Modi diversi di raggiungere il parallelismo. Quando usare uno rispetto all'altro?
  • Parallela vs. concorrente: perché in alcuni casi possiamo accontentarci della concorrenza piuttosto che del parallelismo.
  • Costruire un semplice ma pratico esempio usando le varie tecniche discusse.

Il lock globale dell'interprete

Il lock globale dell'interprete (GIL) è uno degli argomenti più controversi nel mondo Python. In CPython, l'implementazione più popolare di Python, il GIL è un mutex che crea cose thread-safe. Il GIL lo rende semplice da integrare con le librerie esterne che non sono thread-safe e rende il codice non-parallelo più veloce. Tuttavia, ciò ha un costo. A causa di GIL, non possiamo realizzare il parallelismo vero tramite il multithreading. In sostanza, due thread nativi diversi dello stesso processo non possono eseguire il codice Python subito.

Le cose non sono così gravi, tuttavia, ed ecco perché: le cose che succedono fuori dal regno di GIL sono libere di essere parallele. In questa categoria ricadono i task lunghissimi come I/O e, per fortuna, le librerie come numpy.

I Thread vs. i processi

Quindi Python non è veramente multithread. Ma cos'è un thread? Facciamo un passo indietro e guardiamo le cose in prospettiva.

Un processo un'operazione del sistema operativo di base. È un programma che si trova in esecuzione—in altre parole, il codice in esecuzione. I processi multipli sono sempre eseguiti in un computer e sono lanciati in parallelo.

Un processo può avere thread multipli. Essi eseguono lo stesso codice appartenente la processo genitore. Idealmente, si eseguono in parallelo, ma non necessariamente. La ragione per cui i processi non sono abbastanza è perché le applicazioni hanno bisogno di essere reattive e fare attenzione alle azioni dell'utente mentre aggiorna il monitor e salva un file.

Se è ancora poco chiaro, ecco un bigino:

PROCESSI
THREAD
I processi non condividono la memoria
I thread condividono la memoria
I processi avviati/cambiati sono costosi
I thread avviati/cambiati sono meno costosi
I processi richiedono molte risorse
I thread richiedono poche risorse(qualche volta sono chiamati processi leggeri)
Non occorre sincronizzazione di memoria
Avete bisogno di usare meccanismi di sincronizzazione per essere sicuri che stiate gestendo correttamente i dati

Non c'è una ricetta che soddisfa tutto. Scegliere una è subordinata molto dal contesto e dal compito che state cercando di raggiungere.

Parallela vs. concorrente

Adesso andremo un passo più avanti e ci immergeremo nella concorrenza. La concorrenza è spesso fraintesa e scambiata per il parallelismo. Non è questo il caso. La concorrenza implica che il codice indipendente dalla pianificazione sia eseguito in maniera cooperativa. Approfittate del fatto che un pezzo di codice sta aspettando le operazioni I/O e durante questo tempo esegue una parte del codice diversa ma indipendente.

In Python, possiamo ottenere un comportamento concorrente leggero tramite greenlet. Dalla prospettiva del parallelismo, usare i thread o greenlet è equivalente perché nessuno di essi si esegue in parallelo. I greenlet sono anche meno costosi da creare dei thread. Perciò, i greenlet sono fortemente usati per compiere un enorme numero di semplici funzioni I/O, come quella che di solito troviamo nel networking e i server web.

Adesso che sappiamo la differenza tra i thread e i processi, parallelo e concorrente, possiamo illustrare come funzioni diverse si svolgono su due paradigmi. Ecco che cosa andremo a fare: eseguiremo, molteplici volte, una funzione fuori dal GIL e una all'interno. Le lanceremo in serie, usando i thread e usando i processi. Definiamo le funzioni:

1
import os
2
import time
3
import threading
4
import multiprocessing
5
6
NUM_WORKERS = 4
7
8
def only_sleep():
9
    """ Do nothing, wait for a timer to expire """
10
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
11
        os.getpid(),
12
        multiprocessing.current_process().name,
13
        threading.current_thread().name)
14
    )
15
    time.sleep(1)
16
17
18
def crunch_numbers():
19
    """ Do some computations """
20
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
21
        os.getpid(),
22
        multiprocessing.current_process().name,
23
        threading.current_thread().name)
24
    )
25
    x = 0
26
    while x < 10000000:
27
        x += 1

Abbiamo creato due funzioni. Entrambe sono lunghissime, ma solo crunch_numbers compie attivamente dei calcoli. Lanciamo only_sleep in serie, in multithread e usando processi multipli e confrontiamo i risultati:

1
## Run tasks serially

2
start_time = time.time()
3
for _ in range(NUM_WORKERS):
4
    only_sleep()
5
end_time = time.time()
6
7
print("Serial time=", end_time - start_time)
8
9
# Run tasks using threads

10
start_time = time.time()
11
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
12
[thread.start() for thread in threads]
13
[thread.join() for thread in threads]
14
end_time = time.time()
15
16
print("Threads time=", end_time - start_time)
17
18
# Run tasks using processes

19
start_time = time.time()
20
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
21
[process.start() for process in processes]
22
[process.join() for process in processes]
23
end_time = time.time()
24
25
print("Parallel time=", end_time - start_time)

Ecco il risultato che ho ottenuto (il vostro dovrebbe essere simile, sebbene i PID e il tempo varieranno un poco):

1
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
2
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
3
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
4
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
5
Serial time= 4.018089056015015
6
7
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
8
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
9
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
10
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
11
Threads time= 1.0047411918640137
12
13
PID: 95728, Process Name: Process-1, Thread Name: MainThread
14
PID: 95729, Process Name: Process-2, Thread Name: MainThread
15
PID: 95730, Process Name: Process-3, Thread Name: MainThread
16
PID: 95731, Process Name: Process-4, Thread Name: MainThread
17
Parallel time= 1.014023780822754

Ecco alcune osservazioni:

  • Nel caso dell'approccio seriale, le cose sono abbastanza ovvie. Stiamo lanciando le funzioni una dopo l'altra. Tutte e quattro sono eseguite dallo stesso thread dello stesso processo. 

  • Usando i processi accorciamo il tempo di esecuzione di un quarto del tempo originale, semplicemente perché le funzioni sono eseguite in parallelo. Notate come ogni funzione sia compiuta in un processo differente e sul MainThread dei quel processo.

  • Usando i thread traiamo vantaggio dal fatto che le funzioni possono essere eseguite simultaneamente. Il tempo di esecuzione è anche accorciato di un quarto, anche se niente è lanciato in parallelo. Ecco come va: avviamo il primo thread ed esso inizia ad aspettare il timer per fermarsi. Interrompiamo la sua esecuzione, lasciando che aspetti il timer per fermarsi e in questo tempo avviamo il secondo thread. Ripetiamo per tutti i thread. A un certo punto il timer del primo thread si ferma così cambiamo l'esecuzione e la terminiamo. L'algoritmo è ripetuto per il secondo e per tutti gli altri thread. Alla fine, il risultato è come se le cose fossero lanciate in parallelo. Noterete anche che i quattro diversi thread si espandono dallo stesso processo e vivono al suo interno: MainProcess.

  • Potete anche notare che l'approccio con i thread è più veloce di quello veramente parallelo. Ciò è dovuto al sovraccarico dei processi avviati. Come abbiamo notato in precedenza, i processi avviati e scambiati sono un'operazione costosa.

Facciamo la stessa routine ma questa volta lanciando la funzione crunch_numbers:

1
start_time = time.time()
2
for _ in range(NUM_WORKERS):
3
    crunch_numbers()
4
end_time = time.time()
5
6
print("Serial time=", end_time - start_time)
7
8
start_time = time.time()
9
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
10
[thread.start() for thread in threads]
11
[thread.join() for thread in threads]
12
end_time = time.time()
13
14
print("Threads time=", end_time - start_time)
15
16
17
start_time = time.time()
18
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
19
[process.start() for process in processes]
20
[process.join() for process in processes]
21
end_time = time.time()
22
23
print("Parallel time=", end_time - start_time)

Ecco il risultato che ho ottenuto:

1
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
2
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
3
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
4
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
5
Serial time= 2.705625057220459
6
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
7
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
8
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
9
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
10
Threads time= 2.6961309909820557
11
PID: 96289, Process Name: Process-1, Thread Name: MainThread
12
PID: 96290, Process Name: Process-2, Thread Name: MainThread
13
PID: 96291, Process Name: Process-3, Thread Name: MainThread
14
PID: 96292, Process Name: Process-4, Thread Name: MainThread
15
Parallel time= 0.8014059066772461

La differenza principale qui è il risultato dell'approccio multithread. Questa volta si svolge molto similmente all'approccio seriale, ed ecco perché: poiché svolge i calcoli e Python non compie un reale parallelismo, i thread sono lanciati in sostanza uno dopo l'altro, dando la precedenza di esecuzione l'uno all'altro fino a che non finiscono.

L'ecosistema della programmazione parallela/concorrente in Python

Python ha delle ricche API fer fare programmazione parallela/concorrente. In questo tutorial copriremo i più popolari, ma dovete sapere che per qualsiasi bisogno in questo settore, c'è già probabilmente qualcosa in giro che può aiutarvi a raggiungere il vostro obiettivo.

Nella prossima sezione, costruiremo un'applicazione pratica in molte forme, usando tutte le librerie presentate. Senza ulteriori indugi, ecco i moduli/librerie che abbracceremo:

  • threading: Il metodo standard per lavorare con i thread in Python. È un involucro per API di alto livello sulla funzionalità esposta dal modulo _thread, il quale è un'interfaccia di basso livello sull'implementazione del thread del sistema operativo.

  • concurrent.futures: Una parte del modulo della libreria standard che fornisce un livello di astrazione di ancora maggior livello sui thread. I thread sono modellati come funzioni asincrone.

  • multiprocessing: Simile al modulo threading, offrendo un'interfaccia molto simile ma usando i processi invece dei thread.

  • gevent and greenlets: Greenlets, anche chiamata micro thread, sono unità di esecuzione che possono essere pianificate in collaborazione e possono eseguire delle funzioni in concorrenza senza troppo sovraccarico.

  • celery: Una coda di attività distribuita di alto livello. Le attività sono messi in coda e eseguiti in concorrenza usando vari paradigmi come multiprocessing o gevent.

Costruire un'applicazione pratica

Conoscere la teoria è bello e ottimo, ma il modo migliore di imparare è costruire qualcosa di pratico, giusto? In questa sezione costruiremo un tipo di applicazione classico attraversando tutti i diversi paradigmi.

Costruiamo un'applicazione che controlli l'operatività dei siti web. Ci sono molte di tali soluzioni in giro, essendo le più note probabilmente Jetpack Monitor e Uptime Robot. Lo scopo di queste applicazioni è di notificarvi quando il vostro sito web è inaccessibile così che possiate agire velocemente. Ecco come funzionano:

  • L'applicazione va molto frequentemente su una lista degli URL dei siti web e controlla se questi siti web sono accessibili.
  • Ogni sito web dovrebbe essere controllato ogni 5-10 minuti così che l'inattività non sia significativa.
  • Invece di eseguire una classica richiesta HTTP GET, esegue una richiesta HEAD così che non incide in modo significativo sul vostro traffico.
  • Se lo stato dll'HTTP è nelle zone di pericolo (400+, 500+), il proprietario viene avvisato.
  • Il proprietario viene avvisato per email, messaggio di testo o notifica.

Ecco perché è essenziale prendere un approccio parallelo/concorrente al problema. Come la lista dei siti web cresce, scorrere la lista in serie non ci garantirà che ogni sito web viene controllato ogni cinque minuti più o meno.

Iniziamo a scrivere alcune funzioni:

1
# utils.py

2
3
import time
4
import logging
5
import requests
6
7
8
class WebsiteDownException(Exception):
9
    pass
10
11
12
def ping_website(address, timeout=20):
13
    """

14
    Check if a website is down. A website is considered down 

15
    if either the status_code >= 400 or if the timeout expires

16
    

17
    Throw a WebsiteDownException if any of the website down conditions are met

18
    """
19
    try:
20
        response = requests.head(address, timeout=timeout)
21
        if response.status_code >= 400:
22
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
23
            raise WebsiteDownException()
24
    except requests.exceptions.RequestException:
25
        logging.warning("Timeout expired for website %s" % address)
26
        raise WebsiteDownException()
27
        
28
29
def notify_owner(address):
30
    """ 

31
    Send the owner of the address a notification that their website is down 

32
    

33
    For now, we're just going to sleep for 0.5 seconds but this is where 

34
    you would send an email, push notification or text-message

35
    """
36
    logging.info("Notifying the owner of %s website" % address)
37
    time.sleep(0.5)
38
    
39
40
def check_website(address):
41
    """

42
    Utility function: check if a website is down, if so, notify the user

43
    """
44
    try:
45
        ping_website(address)
46
    except WebsiteDownException:
47
        notify_owner(address)

Effettivamente abbiamo bisogno di una lista di siti web per provare il nostro sistema. Create la vostra lista o usate la mia:

1
# websites.py

2
3
WEBSITE_LIST = [
4
    'https://envato.com',
5
    'http://amazon.co.uk',
6
    'http://amazon.com',
7
    'http://facebook.com',
8
    'http://google.com',
9
    'http://google.fr',
10
    'http://google.es',
11
    'http://google.co.uk',
12
    'http://internet.org',
13
    'http://gmail.com',
14
    'http://stackoverflow.com',
15
    'http://github.com',
16
    'http://heroku.com',
17
    'http://really-cool-available-domain.com',
18
    'http://djangoproject.com',
19
    'http://rubyonrails.org',
20
    'http://basecamp.com',
21
    'http://trello.com',
22
    'http://yiiframework.com',
23
    'http://shopify.com',
24
    'http://another-really-interesting-domain.co',
25
    'http://airbnb.com',
26
    'http://instagram.com',
27
    'http://snapchat.com',
28
    'http://youtube.com',
29
    'http://baidu.com',
30
    'http://yahoo.com',
31
    'http://live.com',
32
    'http://linkedin.com',
33
    'http://yandex.ru',
34
    'http://netflix.com',
35
    'http://wordpress.com',
36
    'http://bing.com',
37
]

Normalmente, terreste questa lista in un database insieme alle informazioni di contatto del proprietario così che potete contattarli. Poiché questo non è l'argomento principale di questo tutorial, e per semplicità, useremo solo questa lista di Python.

Se siete stati realmente attenti, potete aver notato due domini realmente lunghi nella lista che non sono siti web validi (spero nessuno li compri quando starete leggendo questo per dimostrarmi che mi sbaglio!). Ho aggiunto due domini per essere sicuro che avessimo alcuni siti web inattivi a ogni esecuzione. Ancora, chiamiamo la nostra applicazione UptimeSquirrel.

L'approccio seriale

Primo, proviamo l'approccio seriale e vediamo quanto procede male.

1
# serial_squirrel.py

2
3
import time
4
5
6
start_time = time.time()
7
8
for address in WEBSITE_LIST:
9
    check_website(address)
10
        
11
end_time = time.time()        
12
13
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
14
15
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

16
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

17
# WARNING:root:Website http://bing.com returned status_code=405

18
# Time for SerialSquirrel: 15.881232261657715secs

L'approccio con i thread

Diventeremo un po' più creativi con l'implementazione dell'approccio thread. Useremo una coda per inserire gli indirizzi e creeremo dei thread secondari per tirarli fuori dalla coda e processarli. Aspetteremo che la coda sia vuota, nel senso che tutti gli indirizzi sono stati processati dai thread secondari.

1
# threaded_squirrel.py

2
3
import time
4
from queue import Queue
5
from threading import Thread
6
7
NUM_WORKERS = 4
8
task_queue = Queue()
9
10
def worker():
11
    # Constantly check the queue for addresses

12
    while True:
13
        address = task_queue.get()
14
        check_website(address)
15
        
16
        # Mark the processed task as done

17
        task_queue.task_done()
18
19
start_time = time.time()
20
        
21
# Create the worker threads

22
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
23
24
# Add the websites to the task queue

25
[task_queue.put(item) for item in WEBSITE_LIST]
26
27
# Start all the workers

28
[thread.start() for thread in threads]
29
30
# Wait for all the tasks in the queue to be processed

31
task_queue.join()
32
33
        
34
end_time = time.time()        
35
36
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time))
37
38
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

39
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

40
# WARNING:root:Website http://bing.com returned status_code=405

41
# Time for ThreadedSquirrel: 3.110753059387207secs

concurrent.futures

Come dichiarato in precedenza, concurrent.futures è un API di alto livello per usare i thread. L'approccio che stiamo assumendo qui implica di usare un ThreadPoolExecutor. Sottoporremo le attività al pool e otterremo i futures, che sono i risultati che ci saranno disponibili in futuro. Certamente, possiamo aspettare che tutti i futures diventino risultati reali.

1
# future_squirrel.py

2
3
import time
4
import concurrent.futures
5
6
NUM_WORKERS = 4
7
8
start_time = time.time()
9
10
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
11
    futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
12
    concurrent.futures.wait(futures)
13
14
end_time = time.time()        
15
16
print("Time for FutureSquirrel: %ssecs" % (end_time - start_time))
17
18
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

19
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

20
# WARNING:root:Website http://bing.com returned status_code=405

21
# Time for FutureSquirrel: 1.812899112701416secs

L'approccio Multiprocessing

La libreria multiprocessing fornisce una sostituzione dell'API quasi informale per la libreria threading. In questo caso, prenderemo un approccio più simile a quello di concurrent.futures. Creeremo un multiprocessing.Pool e gli sottoporremo le attività mappando una funzione alla lista degli indirizzi (pensate alla classica funzione Python map).

1
# multiprocessing_squirrel.py

2
3
import time
4
import socket
5
import multiprocessing
6
7
NUM_WORKERS = 4
8
9
start_time = time.time()
10
11
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
12
    results = pool.map_async(check_website, WEBSITE_LIST)
13
    results.wait()
14
15
end_time = time.time()        
16
17
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
18
19
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

20
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

21
# WARNING:root:Website http://bing.com returned status_code=405

22
# Time for MultiProcessingSquirrel: 2.8224599361419678secs

Gevent

Gevent è un'alternativa popolare per raggiungere una concorrenza massiccia. Ci sono poche cose che vi occorre sapere prima di usarlo:

  • Il codice eseguito nello stesso momento da greenlets è deterministico. Al contrario delle altre alternative presentate, questo paradigma garantisce che per ogni due esecuzioni identiche, otterrete sempre lo stesso risultato nello stesso ordine.

  • Vi occorre scimmiottare le funzioni di aggiornamento standard così che cooperino con gevent. Ecco cosa intendo con ciò. Normalmente, un'operazione di socket si sta bloccando. Aspetteremo che l'operazione finisca. Se fossimo in un ambiente multithread, lo scheduler semplicemente passerebbe a un altro thread mentre l'altro aspetterà per I/O. Poiché non siamo in un ambiente multithread, gevent aggiorna le funzioni standard così che diventano non-bloccanti e ridanno il controllo allo scheduler di gevent.

Per installare gevent, eseguite: pip install gevent

Ecco come usare gevent per eseguire la nostra funzione usando un gevent.pool.Pool:

1
# green_squirrel.py

2
3
import time
4
from gevent.pool import Pool
5
from gevent import monkey
6
7
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low

8
NUM_WORKERS = 4
9
10
# Monkey-Patch socket module for HTTP requests

11
monkey.patch_socket()
12
13
start_time = time.time()
14
15
pool = Pool(NUM_WORKERS)
16
for address in WEBSITE_LIST:
17
    pool.spawn(check_website, address)
18
19
# Wait for stuff to finish

20
pool.join()
21
        
22
end_time = time.time()        
23
24
print("Time for GreenSquirrel: %ssecs" % (end_time - start_time))
25
# Time for GreenSquirrel: 3.8395519256591797secs

Celery

Celery è un approccio che differisce in misura maggiore da quelli che abbiamo visto finora. È collaudato nel contesto di ambienti molto complessi e ad alta prestazione. Impostare Celery richiederà un po' più di interventi rispetto a tutte le soluzioni precedenti.

Primo, ci occorre installare Celery:

pip install celery

Le funzioni sono i concetti centrali nel progetto Celery. Ogni cosa che vorrete eseguire dentro Celery dovrà essere una funzione. Celery offre grande flessibilità per eseguire le funzioni: potete eseguirle in modo sincrono o asincrono, in tempo reale o pianificato, sulla stessa macchina o su macchine multiple, e usando i thread, i processi, Eventlet o gevent.

La configurazione sarà un po' più complessa. Celery usa altri servizi per inviare e ricevere messaggi. Questi messaggi sono di solito funzioni o risultati delle funzioni. Useremo Redis in questo tutorial per questo scopo. Redis è una grande scelta perché è realmente semplice da installare e configurare ed è realmente possibile che già li utilizziate nella vostra applicazione per altri scopi, come il caching e pub/sub.

Potete installare Redis seguendo le istruzioni sulla pagina Redis Quick Start. Non dimenticate di installare la libreria Python redis, pip install redis, e il pacchetto necessario per usare Redis e Celery: pip install celery[redis].

Avviate il server Redis così: $ redis-server

Per iniziare a costruire qualcosa con Celery, prima dovrete creare un'applicazione Celery. Dopo di ciò, Celery ha bisogno di sapere che tipo di funzioni potrebbe eseguire. Per raggiungere ciò, abbiamo bisogno di registrare le funzioni nell'applicazione Celery. Faremo questo usando il decoratore @app.task:

1
# celery_squirrel.py

2
3
import time
4
from utils import check_website
5
from data import WEBSITE_LIST
6
from celery import Celery
7
from celery.result import ResultSet
8
9
app = Celery('celery_squirrel',
10
             broker='redis://localhost:6379/0',
11
             backend='redis://localhost:6379/0')
12
13
@app.task
14
def check_website_task(address):
15
    return check_website(address)
16
17
if __name__ == "__main__":
18
    start_time = time.time()
19
20
    # Using `delay` runs the task async

21
    rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])
22
    
23
    # Wait for the tasks to finish

24
    rs.get()
25
26
    end_time = time.time()
27
28
    print("CelerySquirrel:", end_time - start_time)
29
    # CelerySquirrel: 2.4979639053344727

Niente panico se non accade niente. Ricordate, Celery è un servizio e dobbiamo lanciarlo. Fino ad adesso, abbiamo posizionato solo le funzioni in Redis ma non abbiamo avviato Celery per eseguirle. Per fare questo, abbiamo bisogno di lanciare questo comando nella cartella dove risiede il codice:

celery worker -A do_celery --loglevel=debug --concurrency=4

Ora rilanciamo lo script di Python e vediamo cosa succede. Una cosa su cui fare attenzione: notate come abbiamo trasmesso l'indirizzo di Redis alla nostra applicazione Redis due volte. Il parametro broker specifica dove le funzioni vengono trasferite a Celery, e backend è dove Celery mette i risultati così che potete usarli nella vostra applicazione. Se non specificate un risultato backend, non c'è modo per noi di sapere quando la funzione è stata elaborata e quale fosse il risultato.

Ancora, state attenti che i log ora siano nello standard output del processo di Celery, quindi siate sicuri di controllarli nel terminale appropriato.

Conclusioni

Spero che questo sia stato un viaggio interessante per voi e una buona introduzione al mondo della programmazione parallela/concorrente in Python. Questa è la fine del viaggio, e ci sono alcune conclusioni che possiamo trarre:

  • Ci sono molti paradigmi che ci aiutano a raggiungere calcoli ad alta prestazione in Python.
  • Per il paradigma multi-thread, abbiamo le librerie threading e concurrent.futures.
  • multiprocessing fornisce un'interfaccia molto simile a threading ma per i processi piuttosto che per i thread.
  • Ricordate che i processi raggiungono il vero parallelismo, ma sono molto costosi da creare.
  • Ricordate che un processo può avere più thread lanciati all'interno di esso.
  • Non scambiate parallela per concorrente. Ricordate che solo l'approccio parallelo porta vantaggio ai processori multi-core, considerando che la programmazione concorrente pianifica intelligentemente i compiti in modo che l'attesa su operazioni a lungo funzionamento sia eseguita mentre in quella parallela effettua il calcolo effettivo.
Advertisement
Did you find this post useful?
Want a weekly email summary?
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.