Advertisement
  1. Code
  2. Python

Introducción de Paralelo y Concurrente a la Programación en Python

Scroll to top
Read Time: 20 min

() translation by (you can also view the original English article)

Python es uno de los idiomas de programación más populares de procesamiento y ciencia de datos en general. El ecosistema proporciona una gran cantidad de librerías y frameworks que facilitan la informática de alto rendimiento. Haciendo la programación en Python puede resultar bastante difícil, pienso.

En este tutorial, vamos a estudiar por qué paralelismo es difícil especialmente en el contexto de Python y para eso, nos dirigiremos a través de los siguientes:

  • Por qué es difícil el paralelismo en Python  (pista: Esto es debido al GIL—Global interpreter lock).
  • Hilos vs. Procesos: Diferentes maneras de lograr paralelismo. ¿Cuándo utilizar uno sobre el otro?
  • Paralelo vs. Concurrente: Por qué en algunos casos podemos resolver para la concurrencia en lugar de paralelismo.
  • La construcción de un ejemplo sencillo pero práctico utilizando las diferentes técnicas que se discuten.

Global Interpreter Lock

El Global Interpreter Lock (GIL) es uno de los temas más controvertidos en el mundo de Python. En CPython, la más popular aplicación de Python, el GIL es un mutex que hace las cosas seguras para subprocesos. El GIL, es fácil de integrar con librerías externas que no son a salvo de amenazas, y hace código no-paralelo más rápido. Esto viene en un coste, sin embargo. Debido al GIL, no podemos lograr cierto paralelismo a través de subprocesos múltiples. Básicamente, dos subprocesos nativos diferentes de un mismo proceso no pueden ejecutar código Python a la vez.

Las cosas no están malas, aunque, y aquí está el porqué: lo que sucede fuera del ámbito de GIL es libre de ser paralelas. En esta categoría caen tareas de larga duración como I/O y, afortunadamente, las bibliotecas como numpy.

Hilos y procesos

Python no es realmente multitarea. Pero ¿qué es un hilo? Vamos a dar un paso atrás y mirar las cosas en perspectiva.

Un proceso es una abstracción del sistema operativo básico. Es un programa que está en ejecución, es decir, código que se ejecuta. Múltiples procesos siempre se ejecutan en un ordenador, y se están ejecutando en paralelo.

Un proceso puede tener múltiples hilos. Ejecutan el mismo código que el proceso padre. Lo ideal es que corren en paralelo, pero no necesariamente. La razón por qué procesos no son suficientes es porque las aplicaciones deben ser sensibles y escuchar las acciones del usuario mientras que actualizar la pantalla y guardar un archivo.

Si todavía es un poco confuso, aquí es un hoja de trucos:

PROCESOS
HILOS
Procesos no comparten memoria
Hilos corparten memoria
Procesos de desove/la conmutación es cara
Desove/conmutación de hilos es menos costosa
Los procesos requieren más recursos
Hilos requieren menos recursos (a veces se llaman procesos ligeros)
No necesita sincronización de la memoria
Es necesario utilizar mecanismos de sincronización para asegurarse de que los datos son manipulados correctamente

Hay una receta que se adapta a todo. Elegir uno es grandemente dependiente sobre el contexto y la tarea que está intentando lograr.

Paralelo vs. Concurrente

Ahora dar un paso más y sumérgete en simultaneidad. Concurrencia a menudo es mal entendida y confundido con paralelismo. No es el caso. Concurrencia implica programación de código independiente a ser ejecutadas en forma cooperativa. Aprovechando el hecho de que está esperando un pedazo de código en las operaciones de I/O  y durante ese tiempo ejecutar una parte diferente, pero independiente del código.

En Python, podemos lograr ligero comportamiento concurrente vía greenlets. Desde una perspectiva paralela, utilizando hilos o greenlets es equivalente porque ninguno de ellos corre en paralelo. Greenlets son incluso menos costosas crear esos hilos. Por eso, greenlets fuertemente se utilizan para llevar a cabo un gran número de tareas simples de I/O, como las que generalmente se encuentran en servidores web y redes.

Ahora que sabemos la diferencia entre hilos y procesos, paralelo y concurrente, podemos ilustrar cómo las diferentes tareas que se realizan en los dos paradigmas. Aquí esta lo que vamos a hacer: se ejecutarán, en múltiples ocasiones, una tarea fuera del GIL y uno en su interior. Estamos ejecutándolos en serie, con hilos y procesos. Vamos a definir las tareas:

1
import os
2
import time
3
import threading
4
import multiprocessing
5
6
NUM_WORKERS = 4
7
8
def only_sleep():
9
    """ Do nothing, wait for a timer to expire """
10
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
11
        os.getpid(),
12
        multiprocessing.current_process().name,
13
        threading.current_thread().name)
14
    )
15
    time.sleep(1)
16
17
18
def crunch_numbers():
19
    """ Do some computations """
20
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
21
        os.getpid(),
22
        multiprocessing.current_process().name,
23
        threading.current_thread().name)
24
    )
25
    x = 0
26
    while x < 10000000:
27
        x += 1

Hemos creado dos tareas. Ambos son de larga duración, pero sólo crunch_numbers activamente realiza cálculos. Vamos a correr only_sleep en serie, utilizando múltiples procesos y comparar los resultados:

1
## Run tasks serially

2
start_time = time.time()
3
for _ in range(NUM_WORKERS):
4
    only_sleep()
5
end_time = time.time()
6
7
print("Serial time=", end_time - start_time)
8
9
# Run tasks using threads

10
start_time = time.time()
11
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
12
[thread.start() for thread in threads]
13
[thread.join() for thread in threads]
14
end_time = time.time()
15
16
print("Threads time=", end_time - start_time)
17
18
# Run tasks using processes

19
start_time = time.time()
20
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
21
[process.start() for process in processes]
22
[process.join() for process in processes]
23
end_time = time.time()
24
25
print("Parallel time=", end_time - start_time)

Aquí está la salida que tengo (tuyo debe ser similar, aunque los PIDs y tiempos varían un poco):

1
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
2
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
3
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
4
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
5
Serial time= 4.018089056015015
6
7
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
8
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
9
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
10
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
11
Threads time= 1.0047411918640137
12
13
PID: 95728, Process Name: Process-1, Thread Name: MainThread
14
PID: 95729, Process Name: Process-2, Thread Name: MainThread
15
PID: 95730, Process Name: Process-3, Thread Name: MainThread
16
PID: 95731, Process Name: Process-4, Thread Name: MainThread
17
Parallel time= 1.014023780822754

Aquí están algunas observaciones:

  • En el caso del enfoque de la serie, es bastante obvio. Estamos ejecutando las tareas una tras otra. Todas las ejecuciones de cuatro son ejecutadas por el mismo hilo del mismo proceso.

  • Usando procesos para acortar el tiempo de ejecución hasta un cuarto del tiempo original, simplemente porque las tareas se ejecutan en paralelo. Observe cómo cada tarea se realiza en un proceso diferente y en el MainThread de ese proceso.

  • Usando hilos aprovechamos el hecho de que las tareas pueden ser ejecutadas simultáneamente. El tiempo de ejecución también es reducir a un cuarto, aunque nada está funcionando en paralelo. Aquí es cómo va: Generamos el primer hilo y empieza esperar que el temporizador expire. Hacer una pausa en su ejecución, dejar que esperar a que el temporizador expire, y en este tiempo nos generan el segundo proceso. Repetimos esto para todos los hilos. En un momento el temporizador del primer hilo vence así cambiar ejecución y lo terminemos. El algoritmo se repite para el segundo y para todos los hilos. Al final, el resultado es como si las cosas se ejecutaron en paralelo. Usted notará también que los cuatro hilos diferentes se desprenden y viven dentro del mismo proceso: MainProcess.

  • Incluso puede notar que el enfoque roscado es más rápido que el verdaderamente paralelo. Eso es debido a la sobrecarga de los procesos de desove. Como mencionamos anteriormente, el desove y la conmutación de procesos es una operación costosa.

Vamos a hacer la misma rutina pero esta vez ejecutando la tarea de crunch_numbers:

1
start_time = time.time()
2
for _ in range(NUM_WORKERS):
3
    crunch_numbers()
4
end_time = time.time()
5
6
print("Serial time=", end_time - start_time)
7
8
start_time = time.time()
9
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
10
[thread.start() for thread in threads]
11
[thread.join() for thread in threads]
12
end_time = time.time()
13
14
print("Threads time=", end_time - start_time)
15
16
17
start_time = time.time()
18
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
19
[process.start() for process in processes]
20
[process.join() for process in processes]
21
end_time = time.time()
22
23
print("Parallel time=", end_time - start_time)

Aquí está la salida que tengo:

1
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
2
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
3
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
4
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
5
Serial time= 2.705625057220459
6
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
7
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
8
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
9
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
10
Threads time= 2.6961309909820557
11
PID: 96289, Process Name: Process-1, Thread Name: MainThread
12
PID: 96290, Process Name: Process-2, Thread Name: MainThread
13
PID: 96291, Process Name: Process-3, Thread Name: MainThread
14
PID: 96292, Process Name: Process-4, Thread Name: MainThread
15
Parallel time= 0.8014059066772461

La principal diferencia es en el resultado del enfoque multihilo. Esta vez realiza de manera muy similar al enfoque de serie, y esta es la razón: ya que realiza cálculos y Python no realiza paralelismo real, los hilos de rosca básicamente ejecuta uno tras otro, dando ejecución a uno otro hasta que todos terminan.

El ecosistema de programación Paralelo/Concurrente en Python 

Python tiene ricos APIs para hacer programación paralelo/concurrente. En este tutorial que estamos cubriendo los más populares, pero debes saber que para cualquier necesidad tiene en este dominio, allí es probablemente algo ya fuera allí que puede ayudarle a alcanzar su meta.

En la siguiente sección, construiremos una aplicación práctica en muchas formas, usando todas las bibliotecas que se presenta. Sin más preámbulos, aquí están las módulos y las bibliotecas que vamos a cubrir:

  • threading: la manera estándar de trabajar con hilos en Python. Es un contenedor de API de alto nivel sobre la funcionalidad expuesta por el módulo _thread, que es una interfaz de bajo nivel sobre la implementación de hilos del sistema operativo.

  • Concurrent.Futures: una parte del módulo de la biblioteca estándar que proporciona una capa de abstracción de alto nivel incluso sobre roscas. Los hilos se modelan como tareas asincrónicas.

  • multiprocessing: Similar al módulo threading, que ofrece una interfaz muy similar, pero con procesos en lugar de hilos.

  • gevent and greenlets: Greenlets, también llamados micro-hilos de rosca, son las unidades de ejecución que puede ser programado de forma colaborativa y puede realizar tareas simultáneamente sin mucho arriba.

  • celery: Una distribudor de tareas de alto nivel. Las tareas están en cola y ejecutaron concurrentemente utilizando diversos paradigmas como multiprocessing o gevent.

Creación de una aplicación práctica

Conocer la teoría es agradable y muy bien, pero la mejor manera de aprender es construir algo práctico, bien? En esta sección, vamos a construir un tipo clásico de aplicación pasando por los diferentes paradigmas.

Vamos a construir una aplicación que comprueba el tiempo de funcionamiento de sitios Web. Hay un montón de esas soluciones, los más conocidos son probablemente Jetpack Monitor y Uptime Robot. El propósito de estas aplicaciones es para que le notifique cuando su sitio está abajo para que rápidamente pueda tomar acción. Aquí es cómo funcionan:

  • La aplicación va con frecuencia en una lista de URLs de la página web y verifica si estos sitios están arriba.
  • Cada sitio web se deben comprobar cada 5-10 minutos para que el tiempo de inactividad sea significativa.
  • En lugar de realizar una petición HTTP GET clásica, realiza una petición principal por lo que no afecta significativamente su tráfico.
  • Si el Estado HTTP está en los rangos de peligro (400, 500 +), el propietario es notificado.
  • El propietario es notificado por correo electrónico, mensaje de texto o notificaciones de empuje.

He aquí por qué es esencial adoptar un enfoque paralelo/concurrente para el problema. A medida que crece la lista de sitios web, pasando por la lista en serie no nos garantiza que cada sitio es revisado cada cinco minutos o menos. Los sitios web podrían estar abajo durante horas, y el dueño no ser notificado.

Vamos a comenzar escribiendo algunas utilidades:

1
# utils.py

2
3
import time
4
import logging
5
import requests
6
7
8
class WebsiteDownException(Exception):
9
    pass
10
11
12
def ping_website(address, timeout=20):
13
    """

14
    Check if a website is down. A website is considered down 

15
    if either the status_code >= 400 or if the timeout expires

16
    

17
    Throw a WebsiteDownException if any of the website down conditions are met

18
    """
19
    try:
20
        response = requests.head(address, timeout=timeout)
21
        if response.status_code >= 400:
22
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
23
            raise WebsiteDownException()
24
    except requests.exceptions.RequestException:
25
        logging.warning("Timeout expired for website %s" % address)
26
        raise WebsiteDownException()
27
        
28
29
def notify_owner(address):
30
    """ 

31
    Send the owner of the address a notification that their website is down 

32
    

33
    For now, we're just going to sleep for 0.5 seconds but this is where 

34
    you would send an email, push notification or text-message

35
    """
36
    logging.info("Notifying the owner of %s website" % address)
37
    time.sleep(0.5)
38
    
39
40
def check_website(address):
41
    """

42
    Utility function: check if a website is down, if so, notify the user

43
    """
44
    try:
45
        ping_website(address)
46
    except WebsiteDownException:
47
        notify_owner(address)

Realmente necesitaremos una lista de sitio web para probar nuestro sistema. Crear su propia lista o utiliza las mias:

1
# websites.py

2
3
WEBSITE_LIST = [
4
    'https://envato.com',
5
    'http://amazon.co.uk',
6
    'http://amazon.com',
7
    'http://facebook.com',
8
    'http://google.com',
9
    'http://google.fr',
10
    'http://google.es',
11
    'http://google.co.uk',
12
    'http://internet.org',
13
    'http://gmail.com',
14
    'http://stackoverflow.com',
15
    'http://github.com',
16
    'http://heroku.com',
17
    'http://really-cool-available-domain.com',
18
    'http://djangoproject.com',
19
    'http://rubyonrails.org',
20
    'http://basecamp.com',
21
    'http://trello.com',
22
    'http://yiiframework.com',
23
    'http://shopify.com',
24
    'http://another-really-interesting-domain.co',
25
    'http://airbnb.com',
26
    'http://instagram.com',
27
    'http://snapchat.com',
28
    'http://youtube.com',
29
    'http://baidu.com',
30
    'http://yahoo.com',
31
    'http://live.com',
32
    'http://linkedin.com',
33
    'http://yandex.ru',
34
    'http://netflix.com',
35
    'http://wordpress.com',
36
    'http://bing.com',
37
]

Normalmente, mantener esta lista en una base de datos junto con información de contacto del propietario, para que usted puede contactar con ellos. Ya que no es el tema principal de este tutorial y por simplicidad, sólo vamos a utilizar esta lista de Python.

Si usted presto realmente buena atención, usted puede haber notado dos dominios muy largos en la lista que no son sitios web válido (espero que nadie los compró en el momento en que estás leyendo esto para probarme equivocado!). He añadido estos dos dominios para asegurarse de que tenemos algunos sitios web abajo de cada corrida. También, vamos a nombrar nuestra aplicación UptimeSquirrel.

Enfoque Serial

En primer lugar, vamos a probar el método serial y ver lo mal funciona. Esto consideramos la línea de base.

1
# serial_squirrel.py

2
3
import time
4
5
6
start_time = time.time()
7
8
for address in WEBSITE_LIST:
9
    check_website(address)
10
        
11
end_time = time.time()        
12
13
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
14
15
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

16
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

17
# WARNING:root:Website http://bing.com returned status_code=405

18
# Time for SerialSquirrel: 15.881232261657715secs

Enfoque de Rosca

Vamos a ser un poco más creativo con la aplicación del enfoque por roscado. Estamos usando una cola para poner las direcciones en el y crear hilos trabajando para sacarlos de la cola y procesarlos. Vamos a esperar la cola este vacía, lo que significa que todas las direcciones han sido procesadas por los hilos trabajadores.

1
# threaded_squirrel.py

2
3
import time
4
from queue import Queue
5
from threading import Thread
6
7
NUM_WORKERS = 4
8
task_queue = Queue()
9
10
def worker():
11
    # Constantly check the queue for addresses

12
    while True:
13
        address = task_queue.get()
14
        check_website(address)
15
        
16
        # Mark the processed task as done

17
        task_queue.task_done()
18
19
start_time = time.time()
20
        
21
# Create the worker threads

22
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
23
24
# Add the websites to the task queue

25
[task_queue.put(item) for item in WEBSITE_LIST]
26
27
# Start all the workers

28
[thread.start() for thread in threads]
29
30
# Wait for all the tasks in the queue to be processed

31
task_queue.join()
32
33
        
34
end_time = time.time()        
35
36
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time))
37
38
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

39
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

40
# WARNING:root:Website http://bing.com returned status_code=405

41
# Time for ThreadedSquirrel: 3.110753059387207secs

concurrent.futures

Como se indicó anteriormente, concurrent.futures es un API de alto nivel para el uso de hilos. El enfoque que estamos adoptando aquí implica usar un ThreadPoolExecutor. Vamos a entregar las tareas asignadas a la piscina y volver después, que son resultados que estarán disponibles para nosotros en el futuro. Por supuesto, podemos esperar para que el futuro de todas para convertirse en resultados reales.

1
# future_squirrel.py

2
3
import time
4
import concurrent.futures
5
6
NUM_WORKERS = 4
7
8
start_time = time.time()
9
10
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
11
    futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
12
    concurrent.futures.wait(futures)
13
14
end_time = time.time()        
15
16
print("Time for FutureSquirrel: %ssecs" % (end_time - start_time))
17
18
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

19
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

20
# WARNING:root:Website http://bing.com returned status_code=405

21
# Time for FutureSquirrel: 1.812899112701416secs

El enfoque Multiprocessing

La biblioteca de multiprocessing proporciona un casi caer  en reemplazo de la API para la biblioteca de threading. En este caso, vamos a tomar un enfoque más parecido a la concurrent.futures uno. Estamos estableciendo un multiprocessing.Pool y presentación de las tareas mediante la asignación de una función a la lista de direcciones (piénsese en la función de map clásico de Python).

1
# multiprocessing_squirrel.py

2
3
import time
4
import socket
5
import multiprocessing
6
7
NUM_WORKERS = 4
8
9
start_time = time.time()
10
11
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
12
    results = pool.map_async(check_website, WEBSITE_LIST)
13
    results.wait()
14
15
end_time = time.time()        
16
17
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
18
19
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com

20
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co

21
# WARNING:root:Website http://bing.com returned status_code=405

22
# Time for MultiProcessingSquirrel: 2.8224599361419678secs

Gevent

Gevent es una alternativa popular para lograr concurrencia masiva. Hay algunas cosas que necesita saber antes de usarlo: 

  • Código efectúa simultáneamente por greenlets es determinista. A diferencia de las otras alternativas presentadas, este paradigma garantiza eso para cualquier delas dos carreras idénticos, siempre obtendrá los mismos resultados en el mismo orden.

  • Necesita para funciones estándar de parche mono que cooperan con gevent. Aquí es lo que quiero decir por. Normalmente, una operación de socket está bloqueando. Estamos esperando para que la operación de acabado. Si estuviéramos en un entorno multihilo, el programador simplemente le cambie a otro hilo mientras otro está esperando por el I/O. Ya que no estamos en un entorno multihilo, gevent parches las funciones estándar para que tomen control de no bloqueo y retorno para el programador gevent.

Para instalar gevent ejecute: pip instalar gevent

Aquí es cómo se utiliza gevent para realizar nuestra tarea usando un gevent.pool.Pool:

1
# green_squirrel.py

2
3
import time
4
from gevent.pool import Pool
5
from gevent import monkey
6
7
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low

8
NUM_WORKERS = 4
9
10
# Monkey-Patch socket module for HTTP requests

11
monkey.patch_socket()
12
13
start_time = time.time()
14
15
pool = Pool(NUM_WORKERS)
16
for address in WEBSITE_LIST:
17
    pool.spawn(check_website, address)
18
19
# Wait for stuff to finish

20
pool.join()
21
        
22
end_time = time.time()        
23
24
print("Time for GreenSquirrel: %ssecs" % (end_time - start_time))
25
# Time for GreenSquirrel: 3.8395519256591797secs

Celery

Celery es un enfoque que diferencia sobre todo de lo que hemos visto hasta ahora. Es batalla en el contexto de entornos muy complejos y de alto rendimiento. Configuración de Celery requieren trastear un poco más que todas las soluciones anteriores.

En primer lugar, necesitaremos instalar Celery:

PIP install celery

Las tareas son los conceptos centrales dentro del proyecto de Celery. Todo lo que usted desea ejecutar dentro de Celery tiene que ser una tarea. Celery ofrece una gran flexibilidad para ejecutar tareas: puede ejecutar sincrónicamente o asincrónicamente, en tiempo real o programadas, en la misma máquina o en varias máquinas y utilizando hilos, procesos, Eventlet o gevent.

El arreglo será un poco más complejo. Celery utiliza otros servicios para enviar y recibir mensajes. Estos mensajes suelen ser las tareas o los resultados de las tareas. Vamos a utilizar Redis en este tutorial para ello. Redis es una gran opción ya que es muy fácil de instalar y configurar, y es realmente posible que ya usas en tu aplicación para otros fines, tales como almacenamiento en caché y pub/sub.

Puede instalar Redis siguiendo las instrucciones en la página de Inicio Rápido Redis. No te olvides de instalar la biblioteca de Python de redis, pip install redis y el paquete es necesario para el uso de Redis y Celery: pip install celery [redis].

Iniciar el servidor de Redis así: $ redis-servidor

Para empezar a construir cosas con Celery, primero necesitaremos crear una aplicación de Celery. Después de eso, Celery necesita saber qué tipo de tareas puede ejecutar. Para ello tenemos que registrar las tareas para la aplicación de Celery. Haremos esto con el decorador @app.task:

1
# celery_squirrel.py

2
3
import time
4
from utils import check_website
5
from data import WEBSITE_LIST
6
from celery import Celery
7
from celery.result import ResultSet
8
9
app = Celery('celery_squirrel',
10
             broker='redis://localhost:6379/0',
11
             backend='redis://localhost:6379/0')
12
13
@app.task
14
def check_website_task(address):
15
    return check_website(address)
16
17
if __name__ == "__main__":
18
    start_time = time.time()
19
20
    # Using `delay` runs the task async

21
    rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])
22
    
23
    # Wait for the tasks to finish

24
    rs.get()
25
26
    end_time = time.time()
27
28
    print("CelerySquirrel:", end_time - start_time)
29
    # CelerySquirrel: 2.4979639053344727

No se asuste si nada está pasando. Recuerde que el Celery es un servicio, y tenemos que ejecutarlo. Hasta ahora, solamente colocamos las tareas en Redis pero no en Celery para ejecutarlos. Para ello, necesitamos ejecutar este comando en la carpeta donde se encuentra nuestro código:

celery worker -A do_celery --loglevel=debug --concurrency=4

Ahora, vuelva a ejecutar el script de Python y ver qué pasa. Una cosa es prestar atención a: Observe cómo pasamos la dirección de Redis a nuestra aplicación de Redis dos veces. El parámetro broker especifica donde las tareas se pasan a Celery y backend donde Celery pone los resultados para que podemos utilizar en nuestra aplicación. Si no especifica un backend de resultado, no hay forma para que sepamos cuando la tarea fue procesada y lo que fue el resultado.

Además, ten en cuenta que los registros están ahora en la salida estándar del procesos de Celery, así que Échales un vistazo en el terminal apropiado.

Conclusiones

Espero que esto haya sido un interesante viaje para ti y una buena introducción al mundo de la programación paralelo/concurrente en Python. Este es el final del viaje, y hay algunas conclusiones que podemos extraer:

  • Hay varios paradigmas que nos ayudan a lograr la computación de alto rendimiento en Python.
  • Para el paradigma Multihilo, tenemos las bibliotecas threading y concurrent.futures.
  • multiprocessing ofrece una interfaz muy similar a threading pero para procesos en lugar de hilos.
  • Recuerda procesos para lograr cierto paralelismo, pero son más costosos crear.
  • Recuerde que un proceso puede tener más hilos ejecutándose dentro de ella.
  • Error paralelo para concurrentes. Recuerde que solamente el enfoque paralelo toma ventaja de los procesadores multi-core, mientras que la programación concurrente inteligentemente asigna horarios a tareas para que la espera en las operaciones de larga duración está hace rato en paralelo haciendo cómputo real.
Advertisement
Did you find this post useful?
Want a weekly email summary?
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.