Advertisement
  1. Code
  2. Python

Echanges E/S asynchrones avec Python 3

Scroll to top
Read Time: 12 min

French (Français) translation by Stéphane Esteve (you can also view the original English article)

Dans ce tutoriel, vous graviterez autour des fonctionnalités de communication asynchrone, développées à partir de la version 3.4 de Python, et augmentées depuis les versions 3.5 et 3.6.

Auparavant, Python contenait déjà quelques outils pour ce type de programmation. Le support des échanges asynchrones projette ce langage vers une exploitation optimisée, incluant des API de haut niveau et une base standard, permettant désormais d'unifier l'ensemble des solutions pré-existantes (Twisted, Gevent, Tornado, asyncore, etc.).

Notez qu'il est essentiel de comprendre que l'apprentissage des concepts asynchrones en Python sont loin d'être aisés, dû à son développement constant, son étendue davantage élargie et la prise en compte des évolutions des frameworks asynchrones déjà existants. Je me concentrerai sur les dernières innovations pour en faciliter légèrement sa compréhension.

Il y a tellement d'évolutions passionnantes sur le terrain des threads, des processus et des machines distantes. Et aussi de différences et de limitations liées aux spécificités d'une plateforme. Allons donc à sa découverte.

Les boucles d'événements raccordables

La boucle d'événement est le concept central des échanges asynchrones. Il y en a plusieurs au sein d'un programme. Chaque thread contient au moins une "event loop" active et offre les fonctions suivantes :

  • Déclarer, exécuter et annuler des appels différé (avec des clauses de timeouts).
  • Créer des passerelles client et serveur pour différentes sortes de communication.
  • Déclencher des sous-processus et les associer à des passerelles pour communiquer avec un programme externe.
  • Déléguer des exécutions de fonctions coûteuses vers un groupe de threads.

Un court exemple

Voici ici un petit exemple qui démarrent deux co-routines et exécute une fonction en différé. Il s'agit d'une démonstration de l'utilisation d'une event loop pour améliorer votre programme :

1
import asyncio
2
3
4
async def foo(delay):
5
    for i in range(10):
6
        print(i)
7
        await asyncio.sleep(delay)
8
9
10
def stopper(loop):
11
    loop.stop()
12
13
14
loop = asyncio.get_event_loop()
15
16
# Schedule a call to foo()

17
loop.create_task(foo(0.5))
18
loop.create_task(foo(1))
19
loop.call_later(12, stopper, loop)
20
21
# Block until loop.stop() is called()

22
loop.run_forever()
23
loop.close()

La classe AbstractEventLoop délivre le même principe à toute event loop. Plusieurs choses lui sont nécessaires pour fonctionner :

  • Hiérarchiser les fonctions et les co-routines pour toute exécution
  • Créer des fonctions anticipatrices et les tâches
  • Gérer des serveurs TCP
  • Manipuler des signaux (sur Unix)
  • Manipuler des pipes et des sous-processus

Voici les méthodes relatives à l'exécution et à l'arrêt des événements, ainsi qu'à la coordination des fonctions et des co-routines :

1
class AbstractEventLoop:
2
    """Abstract event loop."""
3
4
    # Running and stopping the event loop.

5
6
    def run_forever(self):
7
        """Run the event loop until stop() is called."""
8
        raise NotImplementedError
9
10
    def run_until_complete(self, future):
11
        """Run the event loop until a Future is done.

12


13
        Return the Future's result, or raise its exception.

14
        """
15
        raise NotImplementedError
16
17
    def stop(self):
18
        """Stop the event loop as soon as reasonable.

19


20
        Exactly how soon that is may depend on the implementation, but

21
        no more I/O callbacks should be scheduled.

22
        """
23
        raise NotImplementedError
24
25
    def is_running(self):
26
        """Return whether the event loop is currently running."""
27
        raise NotImplementedError
28
29
    def is_closed(self):
30
        """Returns True if the event loop was closed."""
31
        raise NotImplementedError
32
33
    def close(self):
34
        """Close the loop.

35


36
        The loop should not be running.

37


38
        This is idempotent and irreversible.

39


40
        No other methods should be called after this one.

41
        """
42
        raise NotImplementedError
43
44
    def shutdown_asyncgens(self):
45
        """Shutdown all active asynchronous generators."""
46
        raise NotImplementedError
47
48
    # Methods scheduling callbacks.  All these return Handles.

49
50
    def _timer_handle_cancelled(self, handle):
51
        """Notification that a TimerHandle has been cancelled."""
52
        raise NotImplementedError
53
54
    def call_soon(self, callback, *args):
55
        return self.call_later(0, callback, *args)
56
57
    def call_later(self, delay, callback, *args):
58
        raise NotImplementedError
59
60
    def call_at(self, when, callback, *args):
61
        raise NotImplementedError
62
63
    def time(self):
64
        raise NotImplementedError
65
66
    def create_future(self):
67
        raise NotImplementedError
68
69
    # Method scheduling a coroutine object: create a task.

70
71
    def create_task(self, coro):
72
        raise NotImplementedError
73
74
    # Methods for interacting with threads.

75
76
    def call_soon_threadsafe(self, callback, *args):
77
        raise NotImplementedError
78
79
    def run_in_executor(self, executor, func, *args):
80
        raise NotImplementedError
81
82
    def set_default_executor(self, executor):
83
        raise NotImplementedError

Raccorder à une nouvelle Event Loop

Asyncio est conçu pour supporter diverses implémentations d'event loop qui respectent le corps de son API. A la base, se trouve la classe EventLoopPolicy qui configure l'API asyncio et facilite le contrôle de chaque aspect de l'event loop. Voici ici un exemple d'une event loop personalisée appelée uvloop et basée sur libuv, qui est sensée être bien plus rapide que d'autres solutions (je ne l'ai pas évalué moi-même) :

1
import asyncio
2
import uvloop
3
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

C'est tout. Désormais, quelle que soit la fonction asyncio utilisée, c'est uvloop qui s'en chargera discrètement.

Co-routines, Futures et Tâches

Une co-routine est un terme chargé de sens. Tantôt, c'est une fonction qui s'exécute de manière asynchrone ; tantôt un objet qui a besoin d'être hiérarchisé. Vous la chargez en y ajoutant le mot-clé async devant sa définition.

1
import asyncio
2
3
4
async def cool_coroutine():
5
    return "So cool..."
6

Si vous l'exécutez directement, rien ne se passe. A la place un objet co-routine est renvoyé, et si aucune hiérarchisation n'est faite au cours de son exécution, voici l'avertissement obtenu :

1
c = cool_coroutine()
2
print(c)
3
4
Output:
5
6
<coroutine object cool_coroutine at 0x108a862b0>
7
sys:1: RuntimeWarning: coroutine 'cool_coroutine' was never awaited
8
9
Process finished with exit code 0

Pour déclencher son exécution, vous avez besoin d'une event loop :

1
r = loop.run_until_complete(c)
2
loop.close()
3
4
print(r)
5
6
Output:
7
8
So cool...

Il s'agit d'un séquençage direct. Vous pouvez aussi les lier les unes aux autres. Remarquez que vous devez inscrire await dès lors que vous invoquez les co-routines :

1
import asyncio
2
3
async def compute(x, y):
4
    print("Compute %s + %s ..." % (x, y))
5
    await asyncio.sleep(1.0)
6
    return x + y
7
8
async def print_sum(x, y):
9
    result = await compute(x, y)
10
    print("%s + %s = %s" % (x, y, result))
11
12
loop = asyncio.get_event_loop()
13
loop.run_until_complete(print_sum(1, 2))
14
loop.close()

La class Future d'asyncio est similaire à la classe concurrent.future.Future. Ce n'est pas garanti au niveau thread et prend en charge les fonctionnalités suivantes :

  • ajouter ou retirer des callbacks terminés
  • annuler
  • formater les résultats et les exceptions

Voici la façon dont on peut utiliser un Future dans une event loop. La co-routine take_your_time() reçoit un Future et ajuste son résultat après avoir attendu une seconde.

La fonction ensure_future() planifie la co-routine et wait_until_complete() patiente jusqu'à la fin du Future. En fait, sous le capot, il ajoute un callback terminé au Future.

1
import asyncio
2
3
async def take_your_time(future):
4
    await asyncio.sleep(1)
5
    future.set_result(42)
6
7
loop = asyncio.get_event_loop()
8
future = asyncio.Future()
9
asyncio.ensure_future(take_your_time(future))
10
loop.run_until_complete(future)
11
print(future.result())
12
loop.close()

Tout ceci reste assez lourd. Asyncio délivre des tâches adaptées aux Futures et aux co-routines. Une Tâche est une sous-classe de Future, englobant une co-routine et que vous pouvez arrêter à tout moment.

La co-routine n'a plus qu'à adopter une fonction Future explicite et ajuste son résultat ou son exception. Même exemple qu'auparavant, mais cette fois, avec une tâche :

1
import asyncio
2
3
async def take_your_time():
4
    await asyncio.sleep(1)
5
    return 42
6
7
loop = asyncio.get_event_loop()
8
task = loop.create_task(take_your_time())
9
loop.run_until_complete(task)
10
print(task.result())
11
loop.close()

Passerelles, protocoles et flux

Une passerelle est une abstraction d'un canal de communication. Elle s'appuie toujours sur un protocole particulier. Asyncio contient un certain nombre d'implémentations clé-en-main pour le TCP, l'UDP, le SSL et aussi des sous-processus reliés.

Si vous connaissez déjà la programmation de réseaux basés sur les sockets, alors les passerelles et les protocoles n'ont plus aucun secret pour vous. Avec Asyncio, vous avez accès à la programmation standard de réseaux asynchrones. Reprenons l'infâme exemple du serveur d'écho et de son client (le "hello world" de la programmation réseau).

Premièrement, l'application client définit une classe appelée EchoClient , déclinaison de asyncio.Protocol. Elle conserve une event loop et un message qu'elle transmettra au serveur au cours de sa connexion.

Dans le callback connection_made(), elle inscrit son message à la passerelle. Dans la méthode data_received(), elle affiche la réponse du serveur, et par la méthode connection_lost() elle stoppe l'event loop. Dès qu'une instance d'EchoClient est définie dans la méthode create_connection(), le résultat obtenu est une co-routine que la boucle exécute tant qu'elle n'est pas terminée.

1
import asyncio
2
3
class EchoClient(asyncio.Protocol):
4
    def __init__(self, message, loop):
5
        self.message = message
6
        self.loop = loop
7
8
    def connection_made(self, transport):
9
        transport.write(self.message.encode())
10
        print('Data sent: {!r}'.format(self.message))
11
12
    def data_received(self, data):
13
        print('Data received: {!r}'.format(data.decode()))
14
15
    def connection_lost(self, exc):
16
        print('The server closed the connection')
17
        print('Stop the event loop')
18
        self.loop.stop()
19
20
loop = asyncio.get_event_loop()
21
message = 'Hello World!'
22
coro = loop.create_connection(lambda: EchoClient(message, loop),
23
                              '127.0.0.1', 8888)
24
loop.run_until_complete(coro)
25
loop.run_forever()
26
loop.close()  

Le serveur reprend le même code, à l'exception qu'il s'exécute perpétuellement, attendant que des clients s'y connectent. Après avoir envoyé un signal contenant sa réponse, il coupe la connexion vers le client et il peut répondre au client suivant.

Une nouvelle instance de l'EchoServer est créée à chaque connexion. Ainsi, même si plusieurs clients se connectent en même temps, il n'y aura absolument aucun conflit grâce à l'attribut transport.

1
import asyncio
2
3
class EchoServer(asyncio.Protocol):
4
    def connection_made(self, transport):
5
        peername = transport.get_extra_info('peername')
6
        print('Connection from {}'.format(peername))
7
        self.transport = transport
8
9
    def data_received(self, data):
10
        message = data.decode()
11
        print('Data received: {!r}'.format(message))
12
13
        print('Send: {!r}'.format(message))
14
        self.transport.write(data)
15
16
        print('Close the client socket')
17
        self.transport.close()
18
19
loop = asyncio.get_event_loop()
20
# Each client connection will create a new protocol instance

21
coro = loop.create_server(EchoServer, '127.0.0.1', 8888)
22
server = loop.run_until_complete(coro)
23
print('Serving on {}'.format(server.sockets[0].getsockname()))
24
loop.run_forever()

Voici ce qu'il affiche, suite à deux connexions clients :

1
Serving on ('127.0.0.1', 8888)
2
Connection from ('127.0.0.1', 53248)
3
Data received: 'Hello World!'
4
Send: 'Hello World!'
5
Close the client socket
6
Connection from ('127.0.0.1', 53351)
7
Data received: 'Hello World!'
8
Send: 'Hello World!'
9
Close the client socket

Les flux nécessitent une API de haut-niveau, issues de co-routines, et produisent des abstraction de type Reader et Writer. Les protocoles et les passerelles sont transparents, ne nécessitant aucune déclaration de classe, ni aucun callbacks. Seuls sont attendus des événements de type connexion ou donnée.

Le client fait appel à la fonction open_connection() qui renvoit un objet reader et un objet writer, naturellement hérités. Pour fermer la connexion, elle détruit l'objet writer.

1
import asyncio
2
3
4
async def tcp_echo_client(message, loop):
5
    reader, writer = await asyncio.open_connection(
6
        '127.0.0.1', 
7
        8888, 
8
        loop=loop)
9
10
    print('Send: %r' % message)
11
    writer.write(message.encode())
12
13
    data = await reader.read(100)
14
    print('Received: %r' % data.decode())
15
16
    print('Close the socket')
17
    writer.close()
18
19
20
message = 'Hello World!'
21
loop = asyncio.get_event_loop()
22
loop.run_until_complete(tcp_echo_client(message, loop))
23
loop.close()

Le serveur est aussi davantage épuré.

1
import asyncio
2
3
async def handle_echo(reader, writer):
4
    data = await reader.read(100)
5
    message = data.decode()
6
    addr = writer.get_extra_info('peername')
7
    print("Received %r from %r" % (message, addr))
8
9
    print("Send: %r" % message)
10
    writer.write(data)
11
    await writer.drain()
12
13
    print("Close the client socket")
14
    writer.close()
15
16
loop = asyncio.get_event_loop()
17
coro = asyncio.start_server(handle_echo, 
18
                            '127.0.0.1', 
19
                            8888, 
20
                            loop=loop)
21
server = loop.run_until_complete(coro)
22
print('Serving on {}'.format(server.sockets[0].getsockname()))
23
loop.run_forever()

Travailler avec des sous-processus

Asyncio renferme également des interactions possibles avec les sous-processus. Le programme ci-dessous déclenche un autre processus Python et exécute le code "import this". C'est une des pochette-surprises connu de Python, qui affiche le "Zen of Python". Constatez vous-même.

Le processus Python est déclenché par la co-routine zen() en utilisant la fonction create_subprocess_exec() et renvoie sa sortie standard à un autre conduit. Ainsi, il parcoure ce contenu ligne-à-ligne via await pour laisser aux autres processus ou co-routines la possibilité de s'exécuter si cette sortie n'est pas totalement conclue.

Remarquez que sous Windows, vous devez inscrire l'event loop au ProactorEventLoop , car la fonction SelectorEventLoop ne prend pas en charge les pipes.

1
import asyncio.subprocess
2
import sys
3
4
5
async def zen():
6
    code = 'import this'
7
    create = asyncio.create_subprocess_exec(
8
        sys.executable, 
9
        '-c', 
10
        code,
11
        stdout=asyncio.subprocess.PIPE)
12
    proc = await create
13
14
    data = await proc.stdout.readline()
15
    while data:
16
        line = data.decode('ascii').rstrip()
17
        print(line)
18
        data = await proc.stdout.readline()
19
20
    await proc.wait()
21
22
if sys.platform == "win32":
23
    loop = asyncio.ProactorEventLoop()
24
    asyncio.set_event_loop(loop)
25
else:
26
    loop = asyncio.get_event_loop()
27
28
loop.run_until_complete(zen())
29
30
Output:
31
32
The Zen of Python, by Tim Peters
33
34
Beautiful is better than ugly.
35
Explicit is better than implicit.
36
Simple is better than complex.
37
Complex is better than complicated.
38
Flat is better than nested.
39
Sparse is better than dense.
40
Readability counts.
41
Special cases aren't special enough to break the rules.

42
Although practicality beats purity.

43
Errors should never pass silently.

44
Unless explicitly silenced.

45
In the face of ambiguity, refuse the temptation to guess.

46
There should be one-- and preferably only one --obvious way to

47
do it.

48
Although that way may not be obvious at first unless you're
49
Dutch.
50
Now is better than never.
51
Although never is often better than *right* now.
52
If the implementation is hard to explain, it's a bad idea.

53
If the implementation is easy to explain, it may be a good idea.

54
Namespaces are one honking great idea -- let's do more of those!

Conclusion

N'hésitez pas à consulter ou acheter ce qu'il y a de disponible sur le sujet sur notre boutique, et contactez-nous si vous avez la moindre question. Nous sommes aussi attentifs à vos remarques, que vous pouvez laisser sur le formulaire ci-dessous.

Python Asyncio est un framework complet pour toute programmation asynchrone. Ses possibilités sont immenses et il pourvoit à la fois des API de bas et haut niveaux. Il est certes encore jeune et mal compris par la communauté.

Je suis persuadé qu'avec le temps, de bonnes conduites émergeront, et davantage d'exemples feront surface et rendront son utilisation plus poussée.

Advertisement
Did you find this post useful?
Want a weekly email summary?
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.