() translation by (you can also view the original English article)
Python ist eine der beliebtesten Sprachen für Datenverarbeitung und Datenwissenschaft im Allgemeinen. Das Ökosystem bietet viele Bibliotheken und Frameworks, die Hochleistungsrechnen ermöglichen. Das parallele Programmieren in Python kann sich jedoch als recht schwierig erweisen.
In diesem Tutorial werden wir untersuchen, warum Parallelität besonders im Python-Kontext schwierig ist, und dafür werden wir Folgendes durchgehen:
- Warum ist Parallelität in Python schwierig? (Hinweis: Dies liegt an der GIL - der globalen Interpretersperre).
- Threads vs. Prozesse: Verschiedene Wege zur Erreichung von Parallelität. Wann übereinander verwenden?
- Parallel vs. Concurrent: Warum können wir uns in einigen Fällen eher mit Parallelität als mit Parallelität zufrieden geben?
- Erstellen eines einfachen, aber praktischen Beispiels unter Verwendung der verschiedenen diskutierten Techniken.
Global Interpreter Lock
Das Global Interpreter Lock (GIL) ist eines der umstrittensten Themen in der Python-Welt. In CPython, der beliebtesten Implementierung von Python, ist die GIL ein Mutex, der die Thread-Sicherheit gewährleistet. Die GIL erleichtert die Integration in externe Bibliotheken, die nicht threadsicher sind, und beschleunigt nicht parallelen Code. Dies ist jedoch mit Kosten verbunden. Aufgrund der GIL können wir durch Multithreading keine echte Parallelität erreichen. Grundsätzlich können zwei verschiedene native Threads desselben Prozesses Python-Code nicht gleichzeitig ausführen.
Die Dinge sind jedoch nicht so schlimm, und hier ist der Grund: Dinge, die außerhalb des GIL-Bereichs passieren, können parallel sein. In diese Kategorie fallen lang laufende Aufgaben wie E/A und glücklicherweise Bibliotheken wie numpy
.
Threads vs. Prozesse
Python ist also nicht wirklich multithreaded. Aber was ist ein Thread? Machen wir einen Schritt zurück und betrachten die Dinge in der Perspektive.
Ein Prozess ist eine grundlegende Betriebssystemabstraktion. Es ist ein Programm, das ausgeführt wird - mit anderen Worten, Code, der ausgeführt wird. Auf einem Computer werden immer mehrere Prozesse ausgeführt, die parallel ausgeführt werden.
Ein Prozess kann mehrere Threads haben. Sie führen denselben Code aus, der zum übergeordneten Prozess gehört. Im Idealfall laufen sie parallel, aber nicht unbedingt. Der Grund, warum Prozesse nicht ausreichen, liegt darin, dass Anwendungen reagieren und auf Benutzeraktionen warten müssen, während sie die Anzeige aktualisieren und eine Datei speichern.
Wenn das noch etwas unklar ist, hier ein Cheatsheet:
PROZESSE | GEWINDE |
---|---|
Prozesse teilen keinen Speicher | Threads teilen sich den Speicher |
Laich-/Wechselprozesse sind teuer | Das Laichen/Wechseln von Threads ist kostengünstiger |
Prozesse erfordern mehr Ressourcen | Threads erfordern weniger Ressourcen (werden manchmal als Lightweight-Prozesse bezeichnet). |
Keine Speichersynchronisation erforderlich | Sie müssen Synchronisationsmechanismen verwenden, um sicherzustellen, dass Sie die Daten korrekt verarbeiten |
Es gibt nicht ein Rezept, das alles bietet. Die Auswahl hängt stark vom Kontext und der Aufgabe ab, die Sie erreichen möchten.
Parallel vs. Gleichzeitig
Jetzt gehen wir noch einen Schritt weiter und tauchen in die Parallelität ein. Parallelität wird oft missverstanden und mit Parallelität verwechselt. Das ist nicht der Fall. Parallelität bedeutet, dass unabhängiger Code so geplant wird, dass er kooperativ ausgeführt wird. Nutzen Sie die Tatsache, dass ein Teil des Codes auf E/A-Vorgänge wartet, und führen Sie während dieser Zeit einen anderen, aber unabhängigen Teil des Codes aus.
In Python können wir über Greenlets ein leichtes gleichzeitiges Verhalten erzielen. Aus Sicht der Parallelisierung ist die Verwendung von Threads oder Greenlets gleichwertig, da keiner von beiden parallel ausgeführt wird. Die Herstellung von Greenlets ist noch günstiger als die von Threads. Aus diesem Grund werden Greenlets häufig für die Ausführung einer Vielzahl einfacher E/A-Aufgaben verwendet, wie sie normalerweise in Netzwerken und Webservern zu finden sind.
Nachdem wir nun den Unterschied zwischen parallelen und gleichzeitigen Threads und Prozessen kennen, können wir veranschaulichen, wie unterschiedliche Aufgaben für die beiden Paradigmen ausgeführt werden. Folgendes werden wir tun: Wir werden mehrmals eine Aufgabe außerhalb der GIL und eine innerhalb der GIL ausführen. Wir führen sie seriell aus, verwenden Threads und Prozesse. Definieren wir die Aufgaben:
1 |
import os |
2 |
import time |
3 |
import threading |
4 |
import multiprocessing |
5 |
|
6 |
NUM_WORKERS = 4 |
7 |
|
8 |
def only_sleep(): |
9 |
""" Do nothing, wait for a timer to expire """
|
10 |
print("PID: %s, Process Name: %s, Thread Name: %s" % ( |
11 |
os.getpid(), |
12 |
multiprocessing.current_process().name, |
13 |
threading.current_thread().name) |
14 |
)
|
15 |
time.sleep(1) |
16 |
|
17 |
|
18 |
def crunch_numbers(): |
19 |
""" Do some computations """
|
20 |
print("PID: %s, Process Name: %s, Thread Name: %s" % ( |
21 |
os.getpid(), |
22 |
multiprocessing.current_process().name, |
23 |
threading.current_thread().name) |
24 |
)
|
25 |
x = 0 |
26 |
while x < 10000000: |
27 |
x += 1 |
Wir haben zwei Aufgaben erstellt. Beide haben eine lange Laufzeit, aber nur crunch_numbers
führt aktiv Berechnungen durch. Lassen Sie uns only_sleep
seriell, multithreaded und mit mehreren Prozessen ausführen und die Ergebnisse vergleichen:
1 |
## Run tasks serially
|
2 |
start_time = time.time() |
3 |
for _ in range(NUM_WORKERS): |
4 |
only_sleep() |
5 |
end_time = time.time() |
6 |
|
7 |
print("Serial time=", end_time - start_time) |
8 |
|
9 |
# Run tasks using threads
|
10 |
start_time = time.time() |
11 |
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)] |
12 |
[thread.start() for thread in threads] |
13 |
[thread.join() for thread in threads] |
14 |
end_time = time.time() |
15 |
|
16 |
print("Threads time=", end_time - start_time) |
17 |
|
18 |
# Run tasks using processes
|
19 |
start_time = time.time() |
20 |
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)] |
21 |
[process.start() for process in processes] |
22 |
[process.join() for process in processes] |
23 |
end_time = time.time() |
24 |
|
25 |
print("Parallel time=", end_time - start_time) |
Hier ist die Ausgabe, die ich habe (Ihre sollte ähnlich sein, obwohl PIDs und Zeiten etwas variieren):
1 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
2 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
3 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
4 |
PID: 95726, Process Name: MainProcess, Thread Name: MainThread |
5 |
Serial time= 4.018089056015015 |
6 |
|
7 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1 |
8 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2 |
9 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3 |
10 |
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4 |
11 |
Threads time= 1.0047411918640137 |
12 |
|
13 |
PID: 95728, Process Name: Process-1, Thread Name: MainThread |
14 |
PID: 95729, Process Name: Process-2, Thread Name: MainThread |
15 |
PID: 95730, Process Name: Process-3, Thread Name: MainThread |
16 |
PID: 95731, Process Name: Process-4, Thread Name: MainThread |
17 |
Parallel time= 1.014023780822754 |
Hier einige Beobachtungen:
Im Fall des seriellen Ansatzes sind die Dinge ziemlich offensichtlich. Wir führen die Aufgaben nacheinander aus. Alle vier Läufe werden von demselben Thread desselben Prozesses ausgeführt.
Mithilfe von Prozessen reduzieren wir die Ausführungszeit auf ein Viertel der ursprünglichen Zeit, einfach weil die Aufgaben parallel ausgeführt werden. Beachten Sie, wie jede Aufgabe in einem anderen Prozess und auf dem
MainThread
dieses Prozesses ausgeführt wird.Mit Threads nutzen wir die Tatsache, dass die Aufgaben gleichzeitig ausgeführt werden können. Die Ausführungszeit wird ebenfalls auf ein Viertel reduziert, obwohl nichts parallel läuft. So geht's: Wir erzeugen den ersten Thread und er wartet darauf, dass der Timer abläuft. Wir unterbrechen die Ausführung und lassen sie warten, bis der Timer abgelaufen ist. In dieser Zeit erzeugen wir den zweiten Thread. Wir wiederholen dies für alle Threads. In einem Moment läuft der Timer des ersten Threads ab, sodass wir die Ausführung darauf umschalten und ihn beenden. Der Algorithmus wird für den zweiten und für alle anderen Threads wiederholt. Am Ende ist das Ergebnis, als ob die Dinge parallel laufen würden. Sie werden auch feststellen, dass die vier verschiedenen Threads von demselben Prozess abzweigen und darin leben:
MainProcess
.Möglicherweise stellen Sie sogar fest, dass der Thread-Ansatz schneller ist als der wirklich parallele. Das liegt am Overhead der Laichprozesse. Wie bereits erwähnt, ist das Laichen und Umschalten ein teurer Vorgang.
Lassen Sie uns die gleiche Routine ausführen, aber diesmal die Aufgabe crunch_numbers
ausführen:
1 |
start_time = time.time() |
2 |
for _ in range(NUM_WORKERS): |
3 |
crunch_numbers() |
4 |
end_time = time.time() |
5 |
|
6 |
print("Serial time=", end_time - start_time) |
7 |
|
8 |
start_time = time.time() |
9 |
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)] |
10 |
[thread.start() for thread in threads] |
11 |
[thread.join() for thread in threads] |
12 |
end_time = time.time() |
13 |
|
14 |
print("Threads time=", end_time - start_time) |
15 |
|
16 |
|
17 |
start_time = time.time() |
18 |
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)] |
19 |
[process.start() for process in processes] |
20 |
[process.join() for process in processes] |
21 |
end_time = time.time() |
22 |
|
23 |
print("Parallel time=", end_time - start_time) |
Hier ist die Ausgabe, die ich habe:
1 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
2 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
3 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
4 |
PID: 96285, Process Name: MainProcess, Thread Name: MainThread |
5 |
Serial time= 2.705625057220459 |
6 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1 |
7 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2 |
8 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3 |
9 |
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4 |
10 |
Threads time= 2.6961309909820557 |
11 |
PID: 96289, Process Name: Process-1, Thread Name: MainThread |
12 |
PID: 96290, Process Name: Process-2, Thread Name: MainThread |
13 |
PID: 96291, Process Name: Process-3, Thread Name: MainThread |
14 |
PID: 96292, Process Name: Process-4, Thread Name: MainThread |
15 |
Parallel time= 0.8014059066772461 |
Der Hauptunterschied liegt hier im Ergebnis des Multithread-Ansatzes. Dieses Mal funktioniert es sehr ähnlich wie der serielle Ansatz, und hier ist der Grund: Da es Berechnungen durchführt und Python keine echte Parallelität ausführt, werden die Threads im Grunde nacheinander ausgeführt und führen zu einer Ausführung, bis sie alle fertig sind.
Das Python Parallel/Concurrent Programming Ecosystem
Python verfügt über umfangreiche APIs für die parallele/gleichzeitige Programmierung. In diesem Tutorial behandeln wir die beliebtesten, aber Sie müssen wissen, dass es für jeden Bedarf in diesem Bereich wahrscheinlich bereits etwas gibt, das Ihnen helfen kann, Ihr Ziel zu erreichen.
Im nächsten Abschnitt erstellen wir eine praktische Anwendung in vielen Formen unter Verwendung aller vorgestellten Bibliotheken. Hier sind ohne weiteres die Module/Bibliotheken, die wir behandeln werden:
threading
: Die Standardmethode zum Arbeiten mit Threads in Python. Es handelt sich um einen übergeordneten API-Wrapper über die vom_thread
-Modul bereitgestellten Funktionen, bei dem es sich um eine übergeordnete Schnittstelle zur Thread-Implementierung des Betriebssystems handelt.concurrent.futures
: Ein Modulteil der Standardbibliothek, der eine noch übergeordnete Abstraktionsschicht über Threads bereitstellt. Die Threads werden als asynchrone Aufgaben modelliert.multiprocessing
: Ähnlich wie dasthreading
-Modul, bietet eine sehr ähnliche Schnittstelle, verwendet jedoch Prozesse anstelle von Threads.gevent und greenlets
: Greenlets, auch als Micro-Threads bezeichnet, sind Ausführungseinheiten, die gemeinsam geplant werden können und gleichzeitig Aufgaben ohne großen Aufwand ausführen können.celery
: Eine übergeordnete Warteschlange für verteilte Aufgaben. Die Aufgaben werden gleichzeitig in die Warteschlange gestellt und unter Verwendung verschiedener Paradigmen wiemultiprocessing
odergevent
ausgeführt.
Erstellen einer praktischen Anwendung
Die Theorie zu kennen ist schön und gut, aber der beste Weg zu lernen ist, etwas Praktisches zu bauen, oder? In diesem Abschnitt werden wir eine klassische Art von Anwendung erstellen, die alle verschiedenen Paradigmen durchläuft.
Lassen Sie uns eine Anwendung erstellen, die die Verfügbarkeit von Websites überprüft. Es gibt viele solcher Lösungen, die bekanntesten sind wahrscheinlich Jetpack Monitor und Uptime Robot. Der Zweck dieser Apps besteht darin, Sie zu benachrichtigen, wenn Ihre Website nicht verfügbar ist, damit Sie schnell Maßnahmen ergreifen können. So funktionieren sie:
- Die Anwendung durchsucht sehr häufig eine Liste von Website-URLs und prüft, ob diese Websites aktiv sind.
- Jede Website sollte alle 5-10 Minuten überprüft werden, damit die Ausfallzeit nicht wesentlich ist.
- Anstatt eine klassische HTTP-GET-Anforderung auszuführen, wird eine HEAD-Anforderung ausgeführt, sodass Ihr Datenverkehr nicht wesentlich beeinträchtigt wird.
- Wenn der HTTP-Status in den Gefahrenbereichen (400+, 500+) liegt, wird der Eigentümer benachrichtigt.
- Der Eigentümer wird entweder per E-Mail, SMS oder Push-Benachrichtigung benachrichtigt.
Deshalb ist es wichtig, das Problem parallel/gleichzeitig anzugehen. Wenn die Liste der Websites wächst, garantiert uns das serielle Durchgehen der Liste nicht, dass jede Website etwa alle fünf Minuten überprüft wird. Die Websites können stundenlang nicht verfügbar sein und der Eigentümer wird nicht benachrichtigt.
Beginnen wir mit dem Schreiben einiger Dienstprogramme:
1 |
# utils.py
|
2 |
|
3 |
import time |
4 |
import logging |
5 |
import requests |
6 |
|
7 |
|
8 |
class WebsiteDownException(Exception): |
9 |
pass
|
10 |
|
11 |
|
12 |
def ping_website(address, timeout=20): |
13 |
"""
|
14 |
Check if a website is down. A website is considered down
|
15 |
if either the status_code >= 400 or if the timeout expires
|
16 |
|
17 |
Throw a WebsiteDownException if any of the website down conditions are met
|
18 |
"""
|
19 |
try: |
20 |
response = requests.head(address, timeout=timeout) |
21 |
if response.status_code >= 400: |
22 |
logging.warning("Website %s returned status_code=%s" % (address, response.status_code)) |
23 |
raise WebsiteDownException() |
24 |
except requests.exceptions.RequestException: |
25 |
logging.warning("Timeout expired for website %s" % address) |
26 |
raise WebsiteDownException() |
27 |
|
28 |
|
29 |
def notify_owner(address): |
30 |
"""
|
31 |
Send the owner of the address a notification that their website is down
|
32 |
|
33 |
For now, we're just going to sleep for 0.5 seconds but this is where
|
34 |
you would send an email, push notification or text-message
|
35 |
"""
|
36 |
logging.info("Notifying the owner of %s website" % address) |
37 |
time.sleep(0.5) |
38 |
|
39 |
|
40 |
def check_website(address): |
41 |
"""
|
42 |
Utility function: check if a website is down, if so, notify the user
|
43 |
"""
|
44 |
try: |
45 |
ping_website(address) |
46 |
except WebsiteDownException: |
47 |
notify_owner(address) |
Wir benötigen tatsächlich eine Website-Liste, um unser System auszuprobieren. Erstelle deine eigene Liste oder benutze meine:
1 |
# websites.py
|
2 |
|
3 |
WEBSITE_LIST = [ |
4 |
'https://envato.com', |
5 |
'http://amazon.co.uk', |
6 |
'http://amazon.com', |
7 |
'http://facebook.com', |
8 |
'http://google.com', |
9 |
'http://google.fr', |
10 |
'http://google.es', |
11 |
'http://google.co.uk', |
12 |
'http://internet.org', |
13 |
'http://gmail.com', |
14 |
'http://stackoverflow.com', |
15 |
'http://github.com', |
16 |
'http://heroku.com', |
17 |
'http://really-cool-available-domain.com', |
18 |
'http://djangoproject.com', |
19 |
'http://rubyonrails.org', |
20 |
'http://basecamp.com', |
21 |
'http://trello.com', |
22 |
'http://yiiframework.com', |
23 |
'http://shopify.com', |
24 |
'http://another-really-interesting-domain.co', |
25 |
'http://airbnb.com', |
26 |
'http://instagram.com', |
27 |
'http://snapchat.com', |
28 |
'http://youtube.com', |
29 |
'http://baidu.com', |
30 |
'http://yahoo.com', |
31 |
'http://live.com', |
32 |
'http://linkedin.com', |
33 |
'http://yandex.ru', |
34 |
'http://netflix.com', |
35 |
'http://wordpress.com', |
36 |
'http://bing.com', |
37 |
]
|
Normalerweise speichern Sie diese Liste zusammen mit den Kontaktinformationen des Eigentümers in einer Datenbank, damit Sie Kontakt mit ihnen aufnehmen können. Da dies nicht das Hauptthema dieses Tutorials ist, werden wir der Einfachheit halber nur diese Python-Liste verwenden.
Wenn Sie wirklich gut aufgepasst haben, haben Sie möglicherweise zwei wirklich lange Domains in der Liste bemerkt, die keine gültigen Websites sind (ich hoffe, niemand hat sie gekauft, als Sie dies lesen, um mir das Gegenteil zu beweisen!). Ich habe diese beiden Domains hinzugefügt, um sicherzustellen, dass bei jedem Lauf einige Websites nicht verfügbar sind. Nennen wir auch unsere App UptimeSquirrel.
Serieller Ansatz
Versuchen wir zunächst den seriellen Ansatz und sehen, wie schlecht er funktioniert. Wir werden dies als Basis betrachten.
1 |
# serial_squirrel.py
|
2 |
|
3 |
import time |
4 |
|
5 |
|
6 |
start_time = time.time() |
7 |
|
8 |
for address in WEBSITE_LIST: |
9 |
check_website(address) |
10 |
|
11 |
end_time = time.time() |
12 |
|
13 |
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time)) |
14 |
|
15 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
16 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
17 |
# WARNING:root:Website http://bing.com returned status_code=405
|
18 |
# Time for SerialSquirrel: 15.881232261657715secs
|
Threading-Ansatz
Wir werden mit der Implementierung des Threaded-Ansatzes etwas kreativer. Wir verwenden eine Warteschlange, um die Adressen einzufügen und Arbeitsthreads zu erstellen, um sie aus der Warteschlange zu entfernen und zu verarbeiten. Wir werden warten, bis die Warteschlange leer ist, was bedeutet, dass alle Adressen von unseren Arbeitsthreads verarbeitet wurden.
1 |
# threaded_squirrel.py
|
2 |
|
3 |
import time |
4 |
from queue import Queue |
5 |
from threading import Thread |
6 |
|
7 |
NUM_WORKERS = 4 |
8 |
task_queue = Queue() |
9 |
|
10 |
def worker(): |
11 |
# Constantly check the queue for addresses
|
12 |
while True: |
13 |
address = task_queue.get() |
14 |
check_website(address) |
15 |
|
16 |
# Mark the processed task as done
|
17 |
task_queue.task_done() |
18 |
|
19 |
start_time = time.time() |
20 |
|
21 |
# Create the worker threads
|
22 |
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)] |
23 |
|
24 |
# Add the websites to the task queue
|
25 |
[task_queue.put(item) for item in WEBSITE_LIST] |
26 |
|
27 |
# Start all the workers
|
28 |
[thread.start() for thread in threads] |
29 |
|
30 |
# Wait for all the tasks in the queue to be processed
|
31 |
task_queue.join() |
32 |
|
33 |
|
34 |
end_time = time.time() |
35 |
|
36 |
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time)) |
37 |
|
38 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
39 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
40 |
# WARNING:root:Website http://bing.com returned status_code=405
|
41 |
# Time for ThreadedSquirrel: 3.110753059387207secs
|
concurrent.futures
Wie bereits erwähnt, ist concurrent.futures
eine API auf hoher Ebene für die Verwendung von Threads. Der Ansatz, den wir hier verfolgen, impliziert die Verwendung eines ThreadPoolExecutors
. Wir werden Aufgaben an den Pool senden und Futures zurückerhalten. Dies sind Ergebnisse, die uns in Zukunft zur Verfügung stehen werden. Natürlich können wir warten, bis alle Futures zu tatsächlichen Ergebnissen werden.
1 |
# future_squirrel.py
|
2 |
|
3 |
import time |
4 |
import concurrent.futures |
5 |
|
6 |
NUM_WORKERS = 4 |
7 |
|
8 |
start_time = time.time() |
9 |
|
10 |
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: |
11 |
futures = {executor.submit(check_website, address) for address in WEBSITE_LIST} |
12 |
concurrent.futures.wait(futures) |
13 |
|
14 |
end_time = time.time() |
15 |
|
16 |
print("Time for FutureSquirrel: %ssecs" % (end_time - start_time)) |
17 |
|
18 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
19 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
20 |
# WARNING:root:Website http://bing.com returned status_code=405
|
21 |
# Time for FutureSquirrel: 1.812899112701416secs
|
Der Multiprocessing-Ansatz
Die multiprozessor
-Bibliothek bietet eine fast Drop-In-Ersatz-API für die threading
-Bibliothek. In diesem Fall werden wir einen Ansatz verfolgen, der dem von concurrent.futures
ähnlicher ist. Wir richten ein multiprocessing.Pool
ein und senden ihm Aufgaben, indem wir eine Funktion der Adressliste zuordnen (denken Sie an die klassische Python-map
-Funktion).
1 |
# multiprocessing_squirrel.py
|
2 |
|
3 |
import time |
4 |
import socket |
5 |
import multiprocessing |
6 |
|
7 |
NUM_WORKERS = 4 |
8 |
|
9 |
start_time = time.time() |
10 |
|
11 |
with multiprocessing.Pool(processes=NUM_WORKERS) as pool: |
12 |
results = pool.map_async(check_website, WEBSITE_LIST) |
13 |
results.wait() |
14 |
|
15 |
end_time = time.time() |
16 |
|
17 |
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time)) |
18 |
|
19 |
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
|
20 |
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
|
21 |
# WARNING:root:Website http://bing.com returned status_code=405
|
22 |
# Time for MultiProcessingSquirrel: 2.8224599361419678secs
|
Gevent
Gevent ist eine beliebte Alternative, um eine massive Parallelität zu erreichen. Es gibt einige Dinge, die Sie wissen müssen, bevor Sie es verwenden:
Code, der gleichzeitig von Greenlets ausgeführt wird, ist deterministisch. Im Gegensatz zu den anderen vorgestellten Alternativen garantiert dieses Paradigma, dass Sie für zwei identische Läufe immer die gleichen Ergebnisse in der gleichen Reihenfolge erhalten.
Sie müssen Standardfunktionen von Affen-Patches ausführen, damit diese mit gevent zusammenarbeiten. Das meine ich damit. Normalerweise blockiert ein Socket-Vorgang. Wir warten auf den Abschluss der Operation. Wenn wir uns in einer Multithread-Umgebung befinden, wechselt der Scheduler einfach zu einem anderen Thread, während der andere auf E/A wartet. Da wir uns nicht in einer Multithread-Umgebung befinden, patcht gevent die Standardfunktionen so, dass sie nicht mehr blockieren und die Kontrolle an den gevent-Scheduler zurückgeben.
Führen Sie zum Installieren von gevent Folgendes aus: pip install gevent
So verwenden Sie gevent, um unsere Aufgabe mit einem gevent.pool.Pool
auszuführen:
1 |
# green_squirrel.py
|
2 |
|
3 |
import time |
4 |
from gevent.pool import Pool |
5 |
from gevent import monkey |
6 |
|
7 |
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
|
8 |
NUM_WORKERS = 4 |
9 |
|
10 |
# Monkey-Patch socket module for HTTP requests
|
11 |
monkey.patch_socket() |
12 |
|
13 |
start_time = time.time() |
14 |
|
15 |
pool = Pool(NUM_WORKERS) |
16 |
for address in WEBSITE_LIST: |
17 |
pool.spawn(check_website, address) |
18 |
|
19 |
# Wait for stuff to finish
|
20 |
pool.join() |
21 |
|
22 |
end_time = time.time() |
23 |
|
24 |
print("Time for GreenSquirrel: %ssecs" % (end_time - start_time)) |
25 |
# Time for GreenSquirrel: 3.8395519256591797secs
|
Celery
Sellerie ist ein Ansatz, der sich größtenteils von dem unterscheidet, was wir bisher gesehen haben. Es ist kampferprobt in sehr komplexen und leistungsstarken Umgebungen. Das Einrichten von Sellerie erfordert etwas mehr Basteln als alle oben genannten Lösungen.
Zuerst müssen wir Celery installieren:
pip install celery
Aufgaben sind die zentralen Konzepte innerhalb des Sellerieprojekts. Alles, was Sie in Sellerie ausführen möchten, muss eine Aufgabe sein. Sellerie bietet große Flexibilität beim Ausführen von Aufgaben: Sie können sie synchron oder asynchron, in Echtzeit oder geplant, auf demselben Computer oder auf mehreren Computern und unter Verwendung von Threads, Prozessen, Eventlet oder Gevent ausführen.
Die Anordnung wird etwas komplexer sein. Sellerie verwendet andere Dienste zum Senden und Empfangen von Nachrichten. Diese Nachrichten sind normalerweise Aufgaben oder Ergebnisse von Aufgaben. Wir werden Redis in diesem Tutorial für diesen Zweck verwenden. Redis ist eine gute Wahl, da es sehr einfach zu installieren und zu konfigurieren ist und Sie es möglicherweise bereits in Ihrer Anwendung für andere Zwecke verwenden, z. B. für Caching und pub/sub.
Sie können Redis installieren, indem Sie den Anweisungen auf der Redis-Schnellstartseite folgen. Vergessen Sie nicht, die redis
Python-Bibliothek, pip install redis
und das für die Verwendung von Redis und Celery erforderliche Bundle zu installieren: pip install cellery[redis]
.
Starten Sie den Redis-Server wie folgt: $ redis-server
Um mit dem Erstellen von Sellerie zu beginnen, müssen wir zuerst eine Sellerie-Anwendung erstellen. Danach muss Sellerie wissen, welche Art von Aufgaben er ausführen kann. Um dies zu erreichen, müssen wir Aufgaben in der Sellerie-Anwendung registrieren. Wir machen das mit dem @app.task
Dekorator:
1 |
# celery_squirrel.py
|
2 |
|
3 |
import time |
4 |
from utils import check_website |
5 |
from data import WEBSITE_LIST |
6 |
from celery import Celery |
7 |
from celery.result import ResultSet |
8 |
|
9 |
app = Celery('celery_squirrel', |
10 |
broker='redis://localhost:6379/0', |
11 |
backend='redis://localhost:6379/0') |
12 |
|
13 |
@app.task |
14 |
def check_website_task(address): |
15 |
return check_website(address) |
16 |
|
17 |
if __name__ == "__main__": |
18 |
start_time = time.time() |
19 |
|
20 |
# Using `delay` runs the task async
|
21 |
rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST]) |
22 |
|
23 |
# Wait for the tasks to finish
|
24 |
rs.get() |
25 |
|
26 |
end_time = time.time() |
27 |
|
28 |
print("CelerySquirrel:", end_time - start_time) |
29 |
# CelerySquirrel: 2.4979639053344727
|
Keine Panik, wenn nichts passiert. Denken Sie daran, Sellerie ist ein Dienst, und wir müssen ihn ausführen. Bisher haben wir die Aufgaben nur in Redis platziert, aber Celery nicht gestartet, um sie auszuführen. Dazu müssen wir diesen Befehl in dem Ordner ausführen, in dem sich unser Code befindet:
celery worker -A do_celery --loglevel=debug --concurrency=4
Führen Sie nun das Python-Skript erneut aus und sehen Sie, was passiert. Beachten Sie Folgendes: Beachten Sie, wie wir die Redis-Adresse zweimal an unsere Redis-Anwendung übergeben haben. Der broker
-Parameter gibt an, wo die Aufgaben an Celery übergeben werden, und im backend
legt Celery die Ergebnisse ab, damit wir sie in unserer App verwenden können. Wenn wir kein Ergebnis-backend
angeben, können wir nicht wissen, wann die Aufgabe verarbeitet wurde und was das Ergebnis war.
Beachten Sie außerdem, dass sich die Protokolle jetzt in der Standardausgabe des Sellerieprozesses befinden. Überprüfen Sie sie daher unbedingt im entsprechenden Terminal.
Schlussfolgerungen
Ich hoffe, dies war eine interessante Reise für Sie und eine gute Einführung in die Welt der parallelen / gleichzeitigen Programmierung in Python. Dies ist das Ende der Reise, und wir können einige Schlussfolgerungen ziehen:
- Es gibt verschiedene Paradigmen, die uns helfen, Hochleistungs-Computing in Python zu erreichen.
- Für das Multithread-Paradigma haben wir die Bibliotheken
threading
undconcurrent.futures
. -
multiprocessing
bietet eine sehr ähnliche Schnittstelle zumthreading
, jedoch für Prozesse und nicht für Threads. - Denken Sie daran, dass Prozesse eine echte Parallelität erzielen, deren Erstellung jedoch teurer ist.
- Denken Sie daran, dass in einem Prozess möglicherweise mehr Threads ausgeführt werden.
- Verwechseln Sie nicht parallel mit gleichzeitig. Denken Sie daran, dass nur der parallele Ansatz Multi-Core-Prozessoren nutzt, während die gleichzeitige Programmierung Aufgaben intelligent plant, sodass das Warten auf lang laufende Vorgänge während der parallelen eigentlichen Berechnung erfolgt.