В прошлый раз мы познакомились с базовым синтаксисом и приёмами асинхронного программирования на Python. Сегодня сделаем финальный шаг перед тем, как применить эти знания на практике: поговорим о синхронизации корутин и управлении порядком их выполнения. Тема непростая, если не читали начало — лучше познакомьтесь с ним, а потом возвращайтесь сюда.
Коротко про основные понятия
Большая часть кода в наших проектах работает так: программа считывает код и выполняет по очереди команды и функции. Пока, например, одна функция отправляет запрос на сервер, программа ждёт окончания её работы и не выполняет другие команды. Такой код называется синхронным.
Почти во всех крупных современных приложениях код выполняется асинхронно и некоторые операции выполняются параллельно. Например, программа может отправить несколько запросов на разные сайты, но не будет ждать ответа и простаивать, а продолжит дальше делать что-то полезное.
Корутины — это асинхронные функции. Выглядят как обычные функции, но перед стандартным ключевым словом def добавляется async. В нужный момент работа корутины ставится на паузу, и сразу же начинает выполняться другой код. В результате программа не стоит без дела совсем или тратит минимальное время на ожидание.
Приостановка корутин — необходимый этап запуска корутин, который обозначается ключевым словом await. Если есть async-функция, для неё обязательно должно быть await-ожидание. Проще говоря, мы не можем запустить асинхронную функцию, пока не скажем ей, в какой момент ей нужно будет остановиться (например, когда она получит определённый ответ от сервера).
Планировщик event loop — внутри него происходит запуск и остановка асинхронных функций.
Контекстный менеджер
Контекстный менеджер — конструкция с ключевым словом with
, которая работает с содержимым объекта, обрабатывая его значения как отдельные переменные или контексты. Это можно сравнить с медкомиссией футбольной команды: команда — это единый объект, который выступает на соревнованиях, но для этого футболистам нужно проходить медкомиссию. Врач — это тот самый контекстный менеджер, который по списку вызывает каждого члена команды и работает с ним отдельно и независимо от остальных игроков.
В контекстном менеджере асинхронного программирования на Python встроены два блока кода: __enter__()
и __exit__()
. Это значит, что каждый раз при запуске контекстного менеджера до и после кода, который вы в нём пропишете, всегда будет выполняться дополнительный код из этих двух методов:
- сначала выполняется код в блоке
__enter__()
; - потом код, который мы написали внутри менеджера;
- потом код из
__exit__()
.
Пример контекстного менеджера — конструкция with
open при работе с файлом. С ней не нужно писать дополнительную команду для закрытия файла, потому что она уже встроена в метод __exit__()
.
Группировка асинхронных задач в Python
Корутины в Python можно объединять в группы задач, чтобы можно было контролировать завершение всех задач в группе. При этом, пока работают корутины, в группу можно добавлять дополнительные задачи. А если хотя бы одна задача остановилась из-за ошибки, то все остальные корутины тоже перестают выполняться.
Смысл такой группировки в том, что мы можем сделать несколько корутин, которые параллельно будут решать какую-то одну задачу: обработать данные с сервера или передать информацию в другой сервис. А так как это, по сути, одна задача, только разбитая на параллельные треки, то при сбое одной из них отменяются все остальные задачи из этой группы.
Группировка задач делается с помощью TaskGroup — комбинации create_task
и контекстного менеджера. На практике это выглядит так (почитайте комментарии в коде, чтобы понять, что за что отвечает):
# создаём корутину, которая симулирует IO операцию
async def fetch_data(id, wait_time):
print(f'\nКорутина с IO-операцией {id} начала работу')
# asyncio.sleep имитирует асинхронное ожидание на внешней системе,
# мы можем ждать сразу несколько таких операций параллельно
await asyncio.sleep(wait_time)
# отчитываемся о завершении работы и возвращаем данные в виде строки
print(f'Данные из корутины {id} загружены')
return f'Данные, которые возвращает корутина {id}'
# создаём корутину с основной логикой
async def main():
# создаём пустой список
tasks = []
# создаём контекстный менеджер, используя функцию сразу для нескольких задач
async with asyncio.TaskGroup() as tg:
# нумеруем каждую задачу и добавляем их в список
for i, wait_main in enumerate([1, 2, 3], start=1):
task = tg.create_task(fetch_data(i, wait_main))
tasks.append(task)
# объявляем переменную, в которой лежат результаты всех функций в одном списке
results = [task.result() for task in tasks]
# смотрим результат и время появления каждой задачи
for result in results:
print(f'Получен результат: {result}')
Здесь мы создаём пустой список и асинхронный контекстный менеджер с функцией asyncio.TaskGroup
. Потом добавляем в список все корутины и создаём ещё один с результатами выполнения.
Обратите внимание, что нам не нужна дополнительная корутина и слово await
в основной функции, всё делает TaskGroup
. Ещё TaskGroup
умеет обрабатывать ошибки в корутинах, которые выполняются внутри неё, что делает этот подход надёжнее gather
.
В наших корутинах мы указали разное время выполнения. Поэтому они завершают работу за разное время, но всё равно делают это параллельно и тратят 3 секунды на всю работу. Как только последняя корутина завершила работу, мы получаем все результаты разом:
Инструменты синхронизации: Lock
Чтобы при асинхронной работе сохранить какой-то порядок, используют примитивы синхронизации. Мы рассмотрим три из них: Lock, Semaphore и Event.
Lock используется для защиты критически важных частей программы. Чтобы разные асинхронные функции не пытались работать с такими данными одновременно, в этих местах программа становится синхронной.
Сделаем имитацию важных данных, которые могут быть доступны нескольким корутинам. Для этого объявим числовую переменную shared_data
. Ещё нам понадобится сам Lock:
# переменная общего доступа
shared_data = 0
# объект блокировки
lock = asyncio.Lock()
Для каждого из примитивов синхронизации мы напишем новый код. Сначала код корутины, которая выполняет операции с общедоступным важным ресурсом:
# корутина, которая имеет доступ к общему файлу
async def modify_shared_data():
# говорим, что будем работать с глобальной переменной
global shared_data
# запускаем контекстный менеджер
async with lock:
# критически важная часть: корутина что-то менят в общем файле
print(f'Данные до изменения: {shared_data}')
shared_data += 1
# имитация IO операции
await asyncio.sleep(1)
print(f'Данные после изменения: {shared_data}\n')
# конец критически важной части
Мы используем асинхронный контекстный менеджер и передаём в него примитив Lock. После этого контекстный менеджер сможет проверять: не пытается ли использовать этот же самый Lock какая-то другая корутина? Если да, то выполнение функций ставится в очередь по одной за раз.
Теперь создадим 5 корутин, направим их все на нашу переменную и попробуем изменить её одновременно с использованием gather
:
# создаём корутину с основной логикой
async def main():
await asyncio.gather(*(modify_shared_data() for _ in range(5)))
Одновременного выполнения не получится, потому что мы превратили асинхронный код в синхронный в том месте, где нам это было нужно:
Инструменты синхронизации: Semaphore
Semaphore похож на Lock: мы даём одновременный доступ к общему ресурсу, но только ограниченному количеству корутин.
Чтобы показать, как это работает, напишем функцию, которая будет отчитываться о начале и конце работы:
# корутина, которая имеет доступ к общему файлу
async def shared_data(resource_id, semaphore_arg):
# используем контекстный менеджер с объектом Semaphore
async with semaphore_arg:
# имитация ограниченного процесса
print(f'Доступ к ресурсу {resource_id} отрыт, идёт работа')
# сейчас sleep имитирует время работы с ресурсом
await asyncio.sleep(1)
print(f'Доступ к ресурсу {resource_id} закрыт, работа закончена')
Для проверки создадим пять корутин и посмотрим, сколько из них смогут работать с данными:
# создаём корутину с основной логикой
async def main():
# semaphore разрешает два асинхронных доступа
semaphore = asyncio.Semaphore(2)
# мы запускаем 5 корутин, но semaphore разрешает одновременную работу только двум
await asyncio.gather(*(shared_data(i, semaphore) for i in range(5)))
Инструменты синхронизации: Event
Event служит уведомлением для других асинхронных задач о том, что произошло какое-то событие. По сути, это тот же самый флаг True-False, но на асинхронный лад: для установки значение True нужен метод set()
, а для False — метод clear()
. Изначально флаг установлен в значении False.
Давайте посмотрим, как это выглядит в коде. У нас будут три функции-корутины:
# корутина, которая ждёт установки event
async def waiter(event):
print(f'Ожидание event для назначения. Сейчас event равен {event}')
await event.wait()
print('Event назначено, можно продолжать программу')
# корутина, которая устанавливает event
async def setter(event):
# имитация работы
await asyncio.sleep(2)
# устанавливаем event и отчитываемся
event.set()
print(f'Event назначено внутри корутины setter. Сейчас event равен {event}')
# создаём корутину с основной логикой
async def main():
# создаём объект Event
event = asyncio.Event()
# запускаем две корутины и смотрим на результат
await asyncio.gather(waiter(event), setter(event))
Вот что будет происходить после запуска этого кода командой asyncio.run(main())
:
- Сначала включится корутина с основной логикой
main()
. Она создаст объектevent
, пока ещё в значении False. - Строка
await asyncio.gather(waiter(event), setter(event))
запустит две наши дополнительные корутины. - Первой будет запущена
waiter(event)
. - Когда Python дойдёт до строки
event.wait()
, он получит знак, что для продолжения работы нужно дождаться условного знака — установки Event. Поэтому функцияwaiter(event)
переходит в режим ожидания. - Запускается вторая корутина —
setter(event)
. - Функция
setter(event)
имитирует необходимую работу — в примере мы просто делаем паузу в две секунды. После этого корутина устанавливает асинхронный Event-флаг в значение True командойevent.set()
. - Снова запускается асинхронная функция
waiter(event)
и выводит сообщение о том, что Event установлен и работа закончена.
А так это выглядит в действии. Обратите внимание, что после установки Event в нём появляется информация о количестве ожидающих его корутин.
Где это применяется на практике
Асинхронное программирование делает программу лучше при двух условиях:
- Сервис использует в работе операции на других системах: делает запросы на другие сайты, базы данных, API. Такие задачи называются IO-bound-операции.
- Приложение достаточно большое, такие задачи запускаются часто.
Если выполнять IO-bound-операции часто и синхронно, всё будет работать очень медленно. Например, посетители сайта будут обслуживаться по очереди, как у реального живого продавца. Асинхронное программирование позволит не ждать выполнения каждой функции, а заняться пока запросами к базе данных и перейти к следующему пользователю.
При этом пользоваться технологией асинхронности нужно осторожно: если в базу данных вдруг начнёт поступать один и тот же запрос одновременно, появятся ошибки и баги. Чтобы настроить одновременный доступ, программисты используют примитивы синхронизации — с ними можно установить, что с критически важной информацией будет работать только одна задача за раз.
Пример работы примитивов синхронизации: при создании нового пользователя нельзя будет одновременно послать запросы с разных компьютеров и создать несколько клиентов с одним id. Получается, что иногда асинхронная программа будет работать как обычная, зато ничего не сломается.
Что дальше
Теперь у нас всё готово к тому, чтобы написать асинхронный код, который будет обращаться к серверу, работать с данными и показывать какой-то результат. Заодно посмотрим, как ведёт себя такая асинхронность в реальных проектах.