.. meta:: :keywords: Python, Python3, Asyncio ========================= Asyncio behind the Scenes ========================= .. contents:: Table of Contents :backlinks: none What is Task? -------------- .. code-block:: python # goal: supervise coroutine run state # ref: asyncio/tasks.py import asyncio Future = asyncio.futures.Future class Task(Future): """Simple prototype of Task""" def __init__(self, gen, *, loop): super().__init__(loop=loop) self._gen = gen self._loop.call_soon(self._step) def _step(self, val=None, exc=None): try: if exc: f = self._gen.throw(exc) else: f = self._gen.send(val) except StopIteration as e: self.set_result(e.value) except Exception as e: self.set_exception(e) else: f.add_done_callback( self._wakeup) def _wakeup(self, fut): try: res = fut.result() except Exception as e: self._step(None, e) else: self._step(res, None) @asyncio.coroutine def foo(): yield from asyncio.sleep(3) print("Hello Foo") @asyncio.coroutine def bar(): yield from asyncio.sleep(1) print("Hello Bar") loop = asyncio.get_event_loop() tasks = [Task(foo(), loop=loop), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close() output: .. code-block:: console $ python test.py Hello Bar hello Foo How does event loop work? ------------------------- .. code-block:: python import asyncio from collections import deque def done_callback(fut): fut._loop.stop() class Loop: """Simple event loop prototype""" def __init__(self): self._ready = deque() self._stopping = False def create_task(self, coro): Task = asyncio.tasks.Task task = Task(coro, loop=self) return task def run_until_complete(self, fut): tasks = asyncio.tasks # get task fut = tasks.ensure_future( fut, loop=self) # add task to ready queue fut.add_done_callback(done_callback) # run tasks self.run_forever() # remove task from ready queue fut.remove_done_callback(done_callback) def run_forever(self): """Run tasks until stop""" try: while True: self._run_once() if self._stopping: break finally: self._stopping = False def call_soon(self, cb, *args): """Append task to ready queue""" self._ready.append((cb, args)) def call_exception_handler(self, c): pass def _run_once(self): """Run task at once""" ntodo = len(self._ready) for i in range(ntodo): t, a = self._ready.popleft() t(*a) def stop(self): self._stopping = True def close(self): self._ready.clear() def get_debug(self): return False @asyncio.coroutine def foo(): print("Foo") @asyncio.coroutine def bar(): print("Bar") loop = Loop() tasks = [loop.create_task(foo()), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close() output: .. code-block:: console $ python test.py Foo Bar How does ``asyncio.wait`` work? -------------------------------- .. code-block:: python import asyncio async def wait(fs, loop=None): fs = {asyncio.ensure_future(_) for _ in set(fs)} if loop is None: loop = asyncio.get_event_loop() waiter = loop.create_future() counter = len(fs) def _on_complete(f): nonlocal counter counter -= 1 if counter <= 0 and not waiter.done(): waiter.set_result(None) for f in fs: f.add_done_callback(_on_complete) # wait all tasks done await waiter done, pending = set(), set() for f in fs: f.remove_done_callback(_on_complete) if f.done(): done.add(f) else: pending.add(f) return done, pending async def slow_task(n): await asyncio.sleep(n) print('sleep "{}" sec'.format(n)) loop = asyncio.get_event_loop() try: print("---> wait") loop.run_until_complete( wait([slow_task(_) for _ in range(1, 3)])) print("---> asyncio.wait") loop.run_until_complete( asyncio.wait([slow_task(_) for _ in range(1, 3)])) finally: loop.close() output: .. code-block:: bash ---> wait sleep "1" sec sleep "2" sec ---> asyncio.wait sleep "1" sec sleep "2" sec Simple asyncio.run ------------------- .. code-block:: python >>> import asyncio >>> async def getaddrinfo(host, port): ... loop = asyncio.get_event_loop() ... return (await loop.getaddrinfo(host, port)) ... >>> def run(main): ... loop = asyncio.new_event_loop() ... asyncio.set_event_loop(loop) ... return loop.run_until_complete(main) ... >>> ret = run(getaddrinfo('google.com', 443)) >>> ret = asyncio.run(getaddrinfo('google.com', 443)) How does ``loop.sock_*`` work? ------------------------------- .. code-block:: python import asyncio import socket def sock_accept(self, sock, fut=None, registed=False): fd = sock.fileno() if fut is None: fut = self.create_future() if registed: self.remove_reader(fd) try: conn, addr = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): self.add_reader(fd, self.sock_accept, sock, fut, True) except Exception as e: fut.set_exception(e) else: fut.set_result((conn, addr)) return fut def sock_recv(self, sock, n, fut=None, registed=False): fd = sock.fileno() if fut is None: fut = self.create_future() if registed: self.remove_reader(fd) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.add_reader(fd, self.sock_recv, sock, n, fut, True) except Exception as e: fut.set_exception(e) else: fut.set_result(data) return fut def sock_sendall(self, sock, data, fut=None, registed=False): fd = sock.fileno() if fut is None: fut = self.create_future() if registed: self.remove_writer(fd) try: n = sock.send(data) except (BlockingIOError, InterruptedError): n = 0 except Exception as e: fut.set_exception(e) return if n == len(data): fut.set_result(None) else: if n: data = data[n:] self.add_writer(fd, sock, data, fut, True) return fut async def handler(loop, conn): while True: msg = await loop.sock_recv(conn, 1024) if msg: await loop.sock_sendall(conn, msg) else: break conn.close() async def server(loop): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(False) sock.bind(('localhost', 9527)) sock.listen(10) while True: conn, addr = await loop.sock_accept(sock) loop.create_task(handler(loop, conn)) EventLoop = asyncio.SelectorEventLoop EventLoop.sock_accept = sock_accept EventLoop.sock_recv = sock_recv EventLoop.sock_sendall = sock_sendall loop = EventLoop() try: loop.run_until_complete(server(loop)) except KeyboardInterrupt: pass finally: loop.close() output: .. code-block:: bash # console 1 $ python3 async_sock.py & $ nc localhost 9527 Hello Hello # console 2 $ nc localhost 9527 asyncio asyncio How does ``loop.create_server`` work? ------------------------------------- .. code-block:: python import asyncio import socket loop = asyncio.get_event_loop() async def create_server(loop, protocol_factory, host, port, *args, **kwargs): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(False) sock.bind((host, port)) sock.listen(10) sockets = [sock] server = asyncio.base_events.Server(loop, sockets) loop._start_serving(protocol_factory, sock, None, server) return server class EchoProtocol(asyncio.Protocol): def connection_made(self, transport): peername = transport.get_extra_info('peername') print('Connection from {}'.format(peername)) self.transport = transport def data_received(self, data): message = data.decode() self.transport.write(data) # Equal to: loop.create_server(EchoProtocol, # 'localhost', 5566) coro = create_server(loop, EchoProtocol, 'localhost', 5566) server = loop.run_until_complete(coro) try: loop.run_forever() finally: server.close() loop.run_until_complete(server.wait_closed()) loop.close() output: .. code-block:: bash # console1 $ nc localhost 5566 Hello Hello # console2 $ nc localhost 5566 asyncio asyncio