Прокачиваем асинхронное программирование на Python: используем контекстный менеджер
hard

Прокачиваем асинхронное программирование на Python: используем контекстный менеджер

Берём хаос под контроль

В прошлый раз мы познакомились с базовым синтаксисом и приёмами асинхронного программирования на Python. Сегодня сделаем финальный шаг перед тем, как применить эти знания на практике: поговорим о синхронизации корутин и управлении порядком их выполнения. Тема непростая, если не читали начало — лучше познакомьтесь с ним, а потом возвращайтесь сюда.

Коротко про основные понятия

Большая часть кода в наших проектах работает так: программа считывает код и выполняет по очереди команды и функции. Пока, например, одна функция отправляет запрос на сервер, программа ждёт окончания её работы и не выполняет другие команды. Такой код называется синхронным.

Почти во всех крупных современных приложениях код выполняется асинхронно и некоторые операции выполняются параллельно. Например, программа может отправить несколько запросов на разные сайты, но не будет ждать ответа и простаивать, а продолжит дальше делать что-то полезное.

Прокачиваем асинхронное программирование на Python: используем контекстный менеджер

Корутины — это асинхронные функции. Выглядят как обычные функции, но перед стандартным ключевым словом def добавляется async. В нужный момент работа корутины ставится на паузу, и сразу же начинает выполняться другой код. В результате программа не стоит без дела совсем или тратит минимальное время на ожидание.

Приостановка корутин — необходимый этап запуска корутин, который обозначается ключевым словом await. Если есть async-функция, для неё обязательно должно быть await-ожидание. Проще говоря, мы не можем запустить асинхронную функцию, пока не скажем ей, в какой момент ей нужно будет остановиться (например, когда она получит определённый ответ от сервера).

Планировщик event loop — внутри него происходит запуск и остановка асинхронных функций.

Контекстный менеджер

Контекстный менеджер — конструкция с ключевым словом with, которая работает с содержимым объекта, обрабатывая его значения как отдельные переменные или контексты. Это можно сравнить с медкомиссией футбольной команды: команда — это единый объект, который выступает на соревнованиях, но для этого футболистам нужно проходить медкомиссию. Врач — это тот самый контекстный менеджер, который по списку вызывает каждого члена команды и работает с ним отдельно и независимо от остальных игроков. 

В контекстном менеджере асинхронного программирования на Python встроены два блока кода: __enter__() и __exit__(). Это значит, что каждый раз при запуске контекстного менеджера до и после кода, который вы в нём пропишете, всегда будет выполняться дополнительный код из этих двух методов:

  • сначала выполняется код в блоке __enter__();
  • потом код, который мы написали внутри менеджера;
  • потом код из __exit__().

Пример контекстного менеджера —  конструкция with open при работе с файлом. С ней не нужно писать дополнительную команду для закрытия файла, потому что она уже встроена в метод __exit__().

Группировка асинхронных задач в Python

Корутины в Python можно объединять в группы задач, чтобы можно было контролировать завершение всех задач в группе. При этом, пока работают корутины, в группу можно добавлять дополнительные задачи. А если хотя бы одна задача остановилась из-за ошибки, то все остальные корутины тоже перестают выполняться. 

Смысл такой группировки в том, что мы можем сделать несколько корутин, которые параллельно будут решать какую-то одну задачу: обработать данные с сервера или передать информацию в другой сервис. А так как это, по сути, одна задача, только разбитая на параллельные треки, то при сбое одной из них отменяются все остальные задачи из этой группы.

Группировка задач делается с помощью TaskGroup  — комбинации create_task и контекстного менеджера. На практике это выглядит так (почитайте комментарии в коде, чтобы понять, что за что отвечает):

# создаём корутину, которая симулирует IO операцию
async def fetch_data(id, wait_time):
   print(f'\nКорутина с IO-операцией {id} начала работу')
   # asyncio.sleep имитирует асинхронное ожидание на внешней системе,
   # мы можем ждать сразу несколько таких операций параллельно
   await asyncio.sleep(wait_time)
   # отчитываемся о завершении работы и возвращаем данные в виде строки
   print(f'Данные из корутины {id} загружены')
   return f'Данные, которые возвращает корутина {id}'

# создаём корутину с основной логикой
async def main():
   # создаём пустой список
   tasks = []
   # создаём контекстный менеджер, используя функцию сразу для нескольких задач
   async with asyncio.TaskGroup() as tg:
       # нумеруем каждую задачу и добавляем их в список
       for i, wait_main in enumerate([1, 2, 3], start=1):
           task = tg.create_task(fetch_data(i, wait_main))
           tasks.append(task)

   # объявляем переменную, в которой лежат результаты всех функций в одном списке
   results = [task.result() for task in tasks]
   # смотрим результат и время появления каждой задачи
   for result in results:
       print(f'Получен результат: {result}')

Здесь мы создаём пустой список и асинхронный контекстный менеджер с функцией asyncio.TaskGroup. Потом добавляем в список все корутины и создаём ещё один с результатами выполнения.

Обратите внимание, что нам не нужна дополнительная корутина и слово await в основной функции, всё делает TaskGroup. Ещё TaskGroup умеет обрабатывать ошибки в корутинах, которые выполняются внутри неё, что делает этот подход надёжнее gather.

В наших корутинах мы указали разное время выполнения. Поэтому они завершают работу за разное время, но всё равно делают это параллельно и тратят 3 секунды на всю работу. Как только последняя корутина завершила работу, мы получаем все результаты разом:

Группировка асинхронных задач в Python

Инструменты синхронизации: Lock

Чтобы при асинхронной работе сохранить какой-то порядок, используют примитивы синхронизации. Мы рассмотрим три из них: Lock, Semaphore и Event.

Lock используется для защиты критически важных частей программы. Чтобы разные асинхронные функции не пытались работать с такими данными одновременно, в этих местах программа становится синхронной. 

Сделаем имитацию важных данных, которые могут быть доступны нескольким корутинам. Для этого объявим числовую переменную shared_data. Ещё нам понадобится сам Lock:

# переменная общего доступа
shared_data = 0
# объект блокировки
lock = asyncio.Lock()

Для каждого из примитивов синхронизации мы напишем новый код. Сначала код корутины, которая выполняет операции с общедоступным важным ресурсом:

# корутина, которая имеет доступ к общему файлу
async def modify_shared_data():
   # говорим, что будем работать с глобальной переменной
   global shared_data
   # запускаем контекстный менеджер
   async with lock:
       # критически важная часть: корутина что-то менят в общем файле
       print(f'Данные до изменения: {shared_data}')
       shared_data += 1
       # имитация IO операции
       await asyncio.sleep(1)
       print(f'Данные после изменения: {shared_data}\n')
       # конец критически важной части

Мы используем асинхронный контекстный менеджер и передаём в него примитив Lock. После этого контекстный менеджер сможет проверять: не пытается ли использовать этот же самый Lock какая-то другая корутина? Если да, то выполнение функций ставится в очередь по одной за раз.

Теперь создадим 5 корутин, направим их все на нашу переменную и попробуем изменить её одновременно с использованием gather:

# создаём корутину с основной логикой
async def main():
   await asyncio.gather(*(modify_shared_data() for _ in range(5)))

Одновременного выполнения не получится, потому что мы превратили асинхронный код в синхронный в том месте, где нам это было нужно:

Инструменты синхронизации: Lock

Инструменты синхронизации: Semaphore

Semaphore похож на Lock: мы даём одновременный доступ к общему ресурсу, но только ограниченному количеству корутин.

Чтобы показать, как это работает, напишем функцию, которая будет отчитываться о начале и конце работы:

# корутина, которая имеет доступ к общему файлу
async def shared_data(resource_id, semaphore_arg):
   # используем контекстный менеджер с объектом Semaphore
   async with semaphore_arg:
       # имитация ограниченного процесса
       print(f'Доступ к ресурсу {resource_id} отрыт, идёт работа')
       # сейчас sleep имитирует время работы с ресурсом
       await asyncio.sleep(1)
       print(f'Доступ к ресурсу {resource_id} закрыт, работа закончена')

Для проверки создадим пять корутин и посмотрим, сколько из них смогут работать с данными:

# создаём корутину с основной логикой
async def main():
   # semaphore разрешает два асинхронных доступа
   semaphore = asyncio.Semaphore(2)
   # мы запускаем 5 корутин, но semaphore разрешает одновременную работу только двум
   await asyncio.gather(*(shared_data(i, semaphore) for i in range(5)))
Инструменты синхронизации: Semaphore

Инструменты синхронизации: Event

Event служит уведомлением для других асинхронных задач о том, что произошло какое-то событие. По сути, это тот же самый флаг True-False, но на асинхронный лад: для установки значение True нужен метод set(), а для False — метод clear(). Изначально флаг установлен в значении False.

Давайте посмотрим, как это выглядит в коде. У нас будут три функции-корутины:

# корутина, которая ждёт установки event
async def waiter(event):
   print(f'Ожидание event для назначения. Сейчас event равен {event}')
   await event.wait()
   print('Event назначено, можно продолжать программу')

# корутина, которая устанавливает event
async def setter(event):
   # имитация работы
   await asyncio.sleep(2)
   # устанавливаем event и отчитываемся
   event.set()
   print(f'Event назначено внутри корутины setter. Сейчас event равен {event}')

# создаём корутину с основной логикой
async def main():
   # создаём объект Event
   event = asyncio.Event()
   # запускаем две корутины и смотрим на результат
   await asyncio.gather(waiter(event), setter(event))

Вот что будет происходить после запуска этого кода командой asyncio.run(main()):

  • Сначала включится корутина с основной логикой main(). Она создаст объект event, пока ещё в значении False.
  • Строка await asyncio.gather(waiter(event), setter(event)) запустит две наши дополнительные корутины.
  • Первой будет запущена waiter(event)
  • Когда Python дойдёт до строки event.wait(), он получит знак, что для продолжения работы нужно дождаться условного знака — установки Event. Поэтому функция waiter(event) переходит в режим ожидания. 
  • Запускается вторая корутина — setter(event).
  • Функция setter(event) имитирует необходимую работу — в примере мы просто делаем паузу в две секунды. После этого корутина устанавливает асинхронный Event-флаг в значение True командой event.set().
  • Снова запускается асинхронная функция waiter(event) и выводит сообщение о том, что Event установлен и работа закончена.

А так это выглядит в действии. Обратите внимание, что после установки Event в нём появляется информация о количестве ожидающих его корутин.

Инструменты синхронизации: Event

Где это применяется на практике

Асинхронное программирование делает программу лучше при двух условиях:

  • Сервис использует в работе операции на других системах: делает запросы на другие сайты, базы данных, API. Такие задачи называются IO-bound-операции.
  • Приложение достаточно большое, такие задачи запускаются часто.

Если выполнять IO-bound-операции часто и синхронно, всё будет работать очень медленно. Например, посетители сайта будут обслуживаться по очереди, как у реального живого продавца. Асинхронное программирование позволит не ждать выполнения каждой функции, а заняться пока запросами к базе данных и перейти к следующему пользователю.

При этом пользоваться технологией асинхронности нужно осторожно: если в базу данных вдруг начнёт поступать один и тот же запрос одновременно, появятся ошибки и баги. Чтобы настроить одновременный доступ, программисты используют примитивы синхронизации — с ними можно установить, что с критически важной информацией будет работать только одна задача за раз. 

Пример работы примитивов синхронизации: при создании нового пользователя нельзя будет одновременно послать запросы с разных компьютеров и создать несколько клиентов с одним id. Получается, что иногда асинхронная программа будет работать как обычная, зато ничего не сломается.

Что дальше

Теперь у нас всё готово к тому, чтобы написать асинхронный код, который будет обращаться к серверу, работать с данными и показывать какой-то результат. Заодно посмотрим, как ведёт себя такая асинхронность в реальных проектах.

Обложка:

Алексей Сухов

Корректор:

Ирина Михеева

Вёрстка:

Маша Климентьева

Соцсети:

Юлия Зубарева

Получите ИТ-профессию
В «Яндекс Практикуме» можно стать разработчиком, тестировщиком, аналитиком и менеджером цифровых продуктов. Первая часть обучения всегда бесплатная, чтобы попробовать и найти то, что вам по душе. Дальше — программы трудоустройства.
Получите ИТ-профессию Получите ИТ-профессию Получите ИТ-профессию Получите ИТ-профессию
Еще по теме
hard