Skip to content

Latest commit

 

History

History
2489 lines (1610 loc) · 79.7 KB

File metadata and controls

2489 lines (1610 loc) · 79.7 KB

Урок 37: Асинхронное программирование в Python

Содержание урока:

МОДУЛЬ 1 — Проблема блокировки и модели конкурентности

Введение — Проблема конкурентности

Почему «одной задачи» почти никогда не бывает

В учебных примерах программа обычно делает что-то одно:

x = input("Введите число: ")
print(int(x) ** 2)

Но реальные приложения устроены иначе.

Практически всегда одновременно присутствуют несколько типов активности:

  • пользователь взаимодействует с интерфейсом,
  • программа выполняет вычисления,
  • программа ждёт данные (файл, база данных, сеть),
  • выполняется логирование,
  • работают фоновые задачи.

То есть вместо одной линейной операции мы имеем несколько задач, которые существуют параллельно во времени.

Важно: «параллельно» здесь означает не обязательно физическое выполнение на нескольких ядрах машины. Это означает, что задачи логически независимы и могут мешать друг другу.


Чтобы говорить о конкурентности строго, нужно разделить виды нагрузки.

CPU-bound (вычисления)

Это задачи, которые активно используют процессор:

  • математические вычисления,
  • обработка больших массивов данных,
  • шифрование,
  • сжатие,
  • парсинг больших структур.

Схема:

[Задача] → CPU → CPU → CPU → CPU → CPU → завершение

Такая задача не ждёт, она постоянно вычисляет.


I/O-bound (ожидание ввода-вывода)

Это задачи, которые большую часть времени ждут:

  • чтение файла,
  • запрос к базе данных,
  • сетевой запрос,
  • ожидание ответа сервера.

Схема:

[Запрос] → (ожидание...) → (ожидание...) → (ожидание...) → результат

В этот момент процессор почти ничего не делает. Он просто ждёт.


Интерфейс (event-driven модель)

Графические приложения работают через цикл событий:

while True:
    ждать событие
    обработать событие

Если обработчик события не возвращает управление — интерфейс «замирает».


Что такое блокировка

Блокировка возникает тогда, когда одна задача удерживает управление и не позволяет другим задачам выполняться.

Простейший пример:

import time

print("Начало")
time.sleep(5)
print("Конец")

Пока выполняется sleep, программа ничего больше сделать не может.

Теперь представим, что:

  • пользователь нажал кнопку,
  • обработчик кнопки выполняет sleep(5).

Что произойдёт?

  • Интерфейс перестанет реагировать.

Почему?

  • Потому что обработчик не возвращает управление главному циклу событий.

Пока выполняется sleep, главный поток не может:

  • перерисовать окно,
  • обработать другие клики,
  • закрыть программу.

Это и есть блокировка.


Почему это происходит в Python

Чтобы понять это глубже, нужно кратко упомянуть GIL.

В стандартной реализации Python (CPython) существует механизм:

Global Interpreter Lock (GIL)

Суть его проста:

В один момент времени только один поток может исполнять Python-байткод.

Важно понимать:

  • Потоков может быть несколько.
  • Но Python-код одновременно исполняется только в одном.

Почему так сделано?

Потому что:

  • управление памятью в CPython основано на подсчёте ссылок,
  • без GIL пришлось бы защищать практически каждую операцию с объектами,
  • это сильно усложнило бы интерпретатор и замедлило работу однопоточных программ.

GIL — это инженерный компромисс.


Как это связано с блокировкой

Если в программе есть только один поток выполнения Python-кода, то схема выглядит так:

        ┌─────────────┐
        │ Python код  │
        └──────┬──────┘
               │
       ┌───────┴────────┐
       │  Задача A       │
       │  Задача B       │
       │  Задача C       │
       └─────────────────┘

Если задача A не возвращает управление — задачи B и C не получат времени исполнения.

Именно поэтому:

  • интерфейс зависает,
  • программа «не отвечает»,
  • обработка файла блокирует всё приложение.

Проблема конкурентности

Теперь можно сформулировать основную проблему.

У нас есть:

  • несколько логически независимых задач,
  • один интерпретатор Python,
  • ограничение GIL,
  • потенциальные операции ожидания.

Вопрос:

Как организовать выполнение так, чтобы задачи не мешали друг другу?

Иными словами:

  • как не блокировать интерфейс?
  • как не простаивать во время ожидания?
  • как выполнять несколько операций «одновременно»?

Три возможных направления решения

Исторически в Python есть три модели:

  1. Разделить задачи по процессам.
  2. Использовать потоки.
  3. Перестроить модель выполнения — использовать асинхронность.

Мы последовательно разберём каждую из них.


Важно не запомнить инструменты.

Важно понять фундамент:

  • Есть вычисления.
  • Есть ожидание.
  • Есть ограничение исполнения Python-кода.
  • Есть необходимость не блокировать управление.

Асинхронность — это не «магия ускорения».

Это способ: Эффективно использовать время ожидания, не блокируя выполнение других задач.


Ключевой вопрос урока: Как выполнять несколько задач так, чтобы они не блокировали друг друга?


Блок 1. Tkinter и блокировка

В предыдущем блоке мы сформулировали проблему конкурентности абстрактно. Теперь рассмотрим её в реальном, наглядном примере.

В качестве инструмента используем стандартную библиотеку Python — Tkinter.

Наша цель в этом блоке — не решить проблему, а увидеть её в действии.


Любое GUI-приложение работает по событийной модели.

После запуска программы начинается цикл обработки событий:

root.mainloop()

Что это означает?

Упрощённо можно представить так:

while True:
    ждать событие (клик, ввод, закрытие окна)
    вызвать обработчик

Этот цикл называется event loop — цикл событий.

Он:

  • ждёт действия пользователя,
  • вызывает соответствующую функцию,
  • возвращается к ожиданию следующего события.

Обработчик обязан вернуть управление обратно в цикл событий.

Если он этого не делает — приложение перестаёт реагировать.


Программа с блокировкой

Рассмотрим простой пример:

import tkinter as tk
import time

def freeze_program():
    label.config(text="Подождите...")
    time.sleep(5)  # Блокирует главный поток
    label.config(text="Готово!")

root = tk.Tk()
root.title("Блокировка интерфейса")

label = tk.Label(root, text="Нажмите кнопку")
label.pack(pady=10)

button = tk.Button(root, text="Старт", command=freeze_program)
button.pack()

root.mainloop()

Запустите программу и нажмите кнопку.

Что произойдёт?

  • Интерфейс перестанет реагировать.
  • Окно невозможно переместить (Если нет оптимизации самой системы).
  • Кнопки не нажимаются.
  • Иногда система помечает окно как «не отвечает».

Через 5 секунд интерфейс «оживает».


Разберём выполнение пошагово.

  1. Пользователь нажимает кнопку.
  2. mainloop вызывает функцию freeze_program.
  3. Внутри функции вызывается time.sleep(5).

Теперь важно понять:

time.sleep() — это блокирующая операция.

Пока выполняется sleep:

  • управление не возвращается в mainloop,
  • новые события не обрабатываются,
  • интерфейс «заморожен».

Почему интерфейс зависает

Интерфейс зависает не потому, что Tkinter «плохой».

Он зависает потому, что:

  • у нас один поток исполнения,
  • обработчик события не завершился,
  • главный цикл событий не получает управление.

Пока freeze_program не завершится, mainloop не может:

  • перерисовать окно,
  • принять новый клик,
  • обработать закрытие программы.

Что важно понять

В этом примере нет:

  • сложных вычислений,
  • потоков,
  • процессов,
  • сетевых запросов.

Есть всего одна строка:

time.sleep(5)

И этого достаточно, чтобы полностью остановить интерфейс.

Это демонстрирует фундаментальную проблему:

Любая блокирующая операция в главном потоке блокирует всё приложение.


Это не проблема Tkinter

Очень важно правильно интерпретировать пример.

Проблема не в Tkinter.

Та же ситуация возникнет:

  • в веб-сервере,
  • в обработчике HTTP-запроса,
  • в CLI-программе,
  • в любой системе с циклом обработки событий.

Проблема в модели выполнения:

Один поток → один активный участок кода → остальные задачи ждут.

Более широкая картина

Если заменить sleep на:

  • чтение большого файла,
  • запрос к базе данных,
  • обращение к API,
  • долгие вычисления,

эффект будет тот же.

Схема остаётся неизменной:

[Событие] → [Обработчик] → [Долгая операция] → (блокировка всего)

Формулировка проблемы в точных терминах

В нашем примере:

  • главный поток занят,
  • GIL удерживается текущим выполнением,
  • цикл событий не получает управление,
  • конкурентные задачи не исполняются.

Это и есть проявление проблемы конкурентности в простейшем виде.


Если не учитывать эту особенность:

  • интерфейс «висит»,
  • сервер перестаёт отвечать,
  • обработка запросов замедляется,
  • пользователи получают тайм-ауты.

Поэтому вопрос из предыдущего блока становится практическим:

Как выполнить длительную операцию, не блокируя основной поток выполнения?

В следующем блоке мы рассмотрим первый способ решения — изоляцию через процессы.


Блок 2. Multiprocessing — изоляция через процессы

Рассмотрим один из способов обойти эту блокировку — вынести выполнение в отдельный процесс.

Для этого используем стандартный модуль Python — multiprocessing.

Сразу обозначим:

мы не будем глубоко изучать multiprocessing.

Наша задача — понять модель и увидеть, почему интерфейс перестаёт зависать.


Что такое процесс

Процесс — это:

  • отдельное пространство памяти,
  • собственные ресурсы,
  • независимый интерпретатор Python,
  • собственный GIL.

Схематично:

Процесс A                Процесс B
──────────────           ──────────────
Память A                 Память B
GIL A                    GIL B
Код A                    Код B

Главное отличие от потоков:

Процессы полностью изолированы друг от друга.

Это означает:

  • нет общей памяти,
  • нет общей блокировки интерпретатора,
  • возможен настоящий параллелизм на разных ядрах.

Идея решения

Если главная проблема — занятость основного потока, то логично:

Выполнить долгую задачу вне этого потока.

Мы создадим новый процесс и перенесём туда длительную операцию.


Пример с Tkinter и отдельным процессом

import tkinter as tk
import time
from multiprocessing import Process

def background_task():
    time.sleep(15)  # Имитация тяжелой задачи

def start_process():
    label.config(text="Подождите...")

    # Создаем отдельный процесс
    p = Process(target=background_task)
    p.start()

    # Проверяем — завершился ли процесс
    check_process(p)

def check_process(proc):
    if proc.is_alive():
        root.after(100, check_process, proc)
    else:
        label.config(text="Готово!")

if __name__ == '__main__':
    root = tk.Tk()
    root.title("Многопроцессность: tkinter")
    root.geometry('400x400+400+400')

    label = tk.Label(root, text="Нажмите кнопку")
    label.pack(pady=10)

    button = tk.Button(root, text="Старт", command=start_process)
    button.pack()

    root.mainloop()

Что изменилось по сравнению с предыдущим примером

Главный поток больше не выполняет sleep.

Он лишь запускает процесс и возвращает управление обратно в mainloop.


Ключевой момент:

Долгая операция выполняется в другом процессе.

Схема:

Процесс 1 (GUI)              Процесс 2 (фоновый)
─────────────────            ─────────────────────
mainloop                     background_task
обработка событий            sleep(15)
root.after(...)              выполнение

Главный процесс:

  • продолжает работать,
  • обрабатывает события,
  • перерисовывает интерфейс.

Второй процесс:

  • выполняет задачу,
  • не влияет на GUI напрямую.

Мы периодически проверяем:

proc.is_alive()

И делаем это через root.after, чтобы не блокировать цикл событий.


Важные характеристики multiprocessing

Кратко и без углубления:

  • Каждый процесс имеет свой GIL.
  • Возможно реальное использование нескольких ядер.
  • Нет общей памяти.
  • Передача данных требует сериализации.
  • Создание процесса дороже, чем создание потока.

Почему это тяжёлый инструмент

Multiprocessing решает проблему радикально:

Изолируем всё.

Но за это приходится платить:

  • больше памяти,
  • больше накладных расходов,
  • сложнее синхронизация,
  • необходимость защиты точки входа (if __name__ == "__main__").

В задачах интерфейса это решение применяется редко.

Обычно процессы используют:

  • для CPU-bound задач,
  • в научных вычислениях,
  • в обработке больших объёмов данных.

Что мы здесь показали на самом деле

Мы не просто «починили» интерфейс.

Мы показали первую модель конкурентности:

Изоляция через процессы.

Смысл модели:

  • не делить ресурсы,
  • не бороться с блокировками,
  • а полностью разделить выполнение.

Это эффективно, но ресурсоёмко.


Multiprocessing:

  • действительно решает проблему блокировки,
  • даёт настоящий параллелизм,
  • полностью обходит ограничение одного GIL,
  • но является тяжёлым инструментом.

В UI-задачах его используют редко.


Блок 3. Threading — совместная память

После того как мы рассмотрели решение проблемы блокировки с помощью multiprocessing, теперь давайте посмотрим на ещё одну модель конкурентности — потоки (threading).

В отличие от процессов, потоки работают внутри одного процесса, что делает их легче по сравнению с multiprocessing, но также вводит свои ограничения.


Что такое поток?

Поток — это легковесный вариант процесса. Потоки работают внутри одного процесса, поэтому они:

  • делят одну и ту же память,
  • могут общаться напрямую (передавать данные).

Схематично:

Процесс A
────────────
Память A
  └── Поток 1
  └── Поток 2
  └── Поток 3

GIL (Global Interpreter Lock)

В Python существует GIL — механизм, который блокирует выполнение нескольких потоков Python-кода одновременно в одном процессе.

  • GIL ограничивает одновременное выполнение Python-кода внутри одного потока.
  • При этом потоки могут выполняться параллельно, но только на I/O-операциях.

Почему это важно?

  • I/O-bound задачи (например, операции с файлами или сетью) могут выиграть от многопоточности.
  • CPU-bound задачи (например, сложные вычисления) в Python не могут быть эффективно распараллелены с помощью потоков из-за GIL.

Исправление проблемы блокировки в Tkinter с помощью потоков

Рассмотрим улучшенную версию примера с Tkinter, в котором будем использовать потоки для решения проблемы блокировки интерфейса.

Ранее мы использовали multiprocessing, чтобы решить проблему блокировки. Теперь попробуем использовать потоки.

import tkinter as tk
import time
import threading

def background_task():
    time.sleep(5)  # Имитация задержки
    label.config(text="Готово!")

def start_thread():
    label.config(text="Подождите...")
    thread = threading.Thread(target=background_task)
    thread.start()

root = tk.Tk()
root.title("Многопоточность: tkinter")
root.geometry('400x400+400+400')

label = tk.Label(root, text="Нажмите кнопку")
label.pack(pady=10)

button = tk.Button(root, text="Старт", command=start_thread)
button.pack()

root.mainloop()
  1. Потоки позволяют выполнить задачу в фоновом режиме, не блокируя главный поток, который продолжает обрабатывать интерфейс.
  2. Мы создаём новый поток с помощью threading.Thread, и запускаем задачу в этом потоке.

В отличие от первого примера, где программа «замораживалась» из-за того, что главный поток ждал завершения задачи, теперь:

  1. Главный поток остаётся свободным для обработки событий.
  2. Задача выполняется в фоновом потоке, а основной цикл событий продолжает работать.

Это решение, однако, не лишено ограничений.


Разбор GIL. CPU-bound пример:

Если задача требует интенсивных вычислений, потоки не помогут значительно улучшить производительность. Например:

import threading
import time

def cpu_bound_task():
    thread_name = threading.current_thread().name
    print(f"{thread_name} стартовал в {time.perf_counter():.4f}")

    result = 0
    for i in range(30_000_000):
        result += i

    print(f"{thread_name} завершился в {time.perf_counter():.4f}")

start_time = time.perf_counter()

threads = []
for i in range(4):
    thread = threading.Thread(
        target=cpu_bound_task,
        name=f"Thread-{i+1}"
    )
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

end_time = time.perf_counter()

print(f"\nОбщее время выполнения: {end_time - start_time:.4f} секунд")
  • В этом примере Python будет выполнять задачи последовательно, несмотря на несколько потоков.
  • Причина: GIL не позволяет Python выполняться в нескольких потоках одновременно на одном процессоре.

Ради контраста

Можно сначала запустить ОДИН поток и замерить время.

Потом — 10 потоков.

Результат будет примерно таким:

1 поток  ≈ 4.2 сек
10 потоков ≈ 16.8 сек

Мы добавили потоки, но быстрее не стало.

I/O-bound пример

Для I/O-операций потоков будет достаточно, чтобы сделать программу быстрее:

import threading
import time

def io_bound_task():
    time.sleep(5)  # Имитация I/O операции
    print("Готово!")

start_time = time.perf_counter()
# Создание потоков
threads = []
for _ in range(4):
    thread = threading.Thread(target=io_bound_task)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

end_time = time.perf_counter()

print(f"\nОбщее время выполнения: {end_time - start_time:.4f} секунд")
  • В этом примере потоки работают параллельно, потому что GIL освобождается при ожиданиях (например, при sleep или при чтении с файла).
  • Как один поток, так и 10 закончатся примерно через 5 секунд.

Когда поток вызывает time.sleep(), он не занимает GIL.

Это позволяет другим потокам продолжать выполнение.

Важно

  • GIL блокирует только выполнение Python-кода.
  • Если поток ждёт (например, на операции ввода-вывода или при ожидании), GIL может быть освобождён для других потоков.

Итоговый вопрос: Можно ли обойтись без процессов и потоков?

В некоторых случаях можно избежать использования потоков и процессов вообще. Например:

  • Для I/O-bound задач решение с потоками или асинхронным программированием является оптимальным.
  • Для CPU-bound задач разумнее использовать multiprocessing, так как потоки не смогут ускорить выполнение из-за GIL.

Решение зависит от типа задачи:

  • Для задач, связанных с вводом/выводом (например, работа с сетью или файлами), потоки или асинхронность — оптимальны.
  • Для задач с вычислениями, которые требуют значительных ресурсов процессора, нужно использовать multiprocessing.

Вывод

Потоки — это удобный инструмент для решения проблемы блокировки при I/O-bound задачах. Однако, из-за GIL они не смогут эффективно распараллелить вычислительные задачи.

В следующем блоке мы рассмотрим более эффективный подход для конкурентных задач — асинхронное программирование.


МОДУЛЬ 2 — Асинхронность: кооперативная модель

Мы уже рассмотрели два способа борьбы с блокировкой:

  • изоляция через процессы,
  • параллелизм через потоки.

Оба подхода используют конкуренцию за ресурсы. ОС распределяет процессорное время, переключает потоки, управляет выполнением.

Теперь мы переходим к принципиально другой модели.

Модели, в которой:

Никто ни с кем не конкурирует. Задачи сотрудничают.

Блок 4. Идея кооперативного выполнения

Два способа переключения задач

До этого момента переключение происходило принудительно.

Потоки

Поток A выполняется
     ↓
ОС прерывает
     ↓
Поток B выполняется
  • Решение принимает операционная система.
  • Переключение происходит независимо от логики программы.
  • Это называется вытесняющая (preemptive) многозадачность.

Асинхронная модель

В асинхронной модели всё иначе.

Задача A работает
      ↓
сама уступает управление
      ↓
Задача B начинает работу

Здесь:

  • нет принудительного прерывания,
  • никто не «отбирает» процессор,
  • задача добровольно передаёт управление.

Это называется:

Кооперативная многозадачность.


Главное отличие: кто управляет переключением?

Модель Кто переключает?
Потоки Операционная система
Async Сама программа

Это фундаментальное различие.

В потоках переключение может произойти в любой момент.

В async — только в специально обозначенных местах.


Это возможно потому что в большинстве приложений:

  • мы не считаем миллиарды чисел,
  • мы ждём.

Ждём:

  • ответа сервера,
  • чтения файла,
  • базы данных,
  • пользователя,
  • таймера.

И вот здесь появляется ключевая формула всего урока:

Асинхронность = эффективное ожидание


Что значит «эффективное»?

Не так:

Жду ответ сервера
ничего не делаю
жду
жду
жду

А так:

Жду ответ сервера
→ пока жду, выполняю другую задачу
→ затем возвращаюсь

Важно понять:

Асинхронность не ускоряет вычисления. Она делает ожидание не блокирующим.


Сравнение моделей на одном примере

Представим, что у нас три задачи:

  • A — ждёт 3 секунды
  • B — ждёт 2 секунды
  • C — ждёт 1 секунду

Последовательное выполнение

A (3с) → B (2с) → C (1с)
Итого: 6 секунд

Потоки

A
B
C   (переключение делает ОС)

В случае I/O общее время ≈ 3 секунды.


Асинхронность

A стартует → ждёт → уступает
B стартует → ждёт → уступает
C стартует → ждёт → уступает

Итого: тоже ≈ 3 секунды.

Но:

  • нет потоков,
  • нет конкуренции за память,
  • нет гонок данных,
  • нет блокировок.

Визуальная модель

Представьте одного повара на кухне.

Потоки

Несколько поваров работают параллельно.

  • возможна конкуренция,
  • могут мешать друг другу,
  • нужен контроль.

Асинхронность

Один повар:

  • поставил воду кипятиться,
  • пока вода греется — режет овощи,
  • пока овощи тушатся — моет посуду.

Он не делает всё одновременно.

Он просто не стоит без дела (идеальный сотрудник).

Это и есть кооперативная модель.


Почему это безопаснее

В потоках:

  • общая память,
  • возможны race condition,
  • требуется синхронизация.

В async:

  • одна нить выполнения,
  • переключение только в явных местах,
  • предсказуемость.

Схематично:

Threading:
   [A]--?
        ├── доступ к переменной
   [B]--?

Async:
   A → (пауза) → B → (пауза) → A

В async невозможно, чтобы две задачи одновременно изменяли одну переменную.


Где подвох?

Кооперативная модель работает только если задачи:

Готовы уступать управление.

Если задача:

  • выполняет тяжёлые вычисления,
  • не отдаёт управление,

она блокирует всё приложение.

Асинхронность не магия. Это дисциплина.


До этого мы думали:

Как разделить выполнение?

Теперь мы думаем:

Как организовать правильные точки ожидания?

Это другой уровень проектирования.


Блок 5. Генераторы и yield

В предыдущем блоке мы пришли к ключевой идее:

Асинхронность — это добровольная передача управления.

Но возникает главный технический вопрос:

Как функция может остановиться, сохранить своё состояние и позже продолжить выполнение с того же места?

Обычная функция так не умеет.


Посмотрим на стандартную модель:

вызов → выполнение → return → завершение

После return функция «умирает». Её стек уничтожается. Локальные переменные исчезают.

Никакого «продолжения» быть не может.


Генератор: функция с памятью

Если внутри функции появляется оператор yield, она перестаёт быть обычной функцией.

Она становится генератором.

def simple_generator():
    print("Старт")
    yield 1
    print("Продолжение")
    yield 2
    print("Завершение")

Вызов:

g = simple_generator()

НЕ запускает код.

print(g)

Он создаёт объект-генератор. В терминале будет примерно вот такой ответ: <generator object simple_generator at 0x1036d6740>


Давайте к генератору (итерируемому объекту) применим функцию next().

Напоминаю, что функцию next() мы уже использовали в уроке, когда рассматривали итераторы и итерируемые объекты.

Функция next() извлекает следующий элемент из итератора, вызывая метод __next__() этого объекта. В нашем примере с генераторами, когда мы вызываем next(g), программа приостанавливает выполнение генератора и возвращает значение, на котором она остановилась. При следующем вызове она продолжает выполнение с того места, где была приостановлена, и так далее, пока не достигнет конца генератора или не будет вызвана ошибка.

step1 = next(g) # Старт
print(step1)  # 1

При первом вызове next(g) генерируется значение 1 и выводится "Старт".

step2 = next(g)  # Продолжение
print(step2)  # 2

При следующем вызове next(g) генератор продолжает выполнение и выводит "Продолжение", возвращая значение 2.

step3 = next(g) # Ошибка StopIteration

Когда итерации внутри объекта (количество yeld закончится) генератор "упрется" в ошибку StopIteration.


Что здесь произошло?

Генератор:

  • приостанавливает выполнение на yield,

  • сохраняет:

    • текущее место в коде,
    • локальные переменные,
    • состояние стека,
  • продолжает с той же точки при следующем next().

Это фундамент всей асинхронности.


yield — точка добровольной паузы

В контексте кооперативной модели:

yield = «Я временно уступаю управление».

Важно:

  • генератор (программист, который его написал) сам решает, где поставить yield,
  • переключение происходит только в этих местах,
  • никто не может прервать выполнение между ними.

Это и есть контролируемая кооперация.


Сохранение состояния функции

Рассмотрим пример:

def counter():
    i = 0
    while True:
        yield i
        i += 1

Переменная i сохраняется между вызовами.

Обычная функция каждый раз начинала бы с i = 0.

Генератор продолжает:

0
1
2
3
...

Состояние не теряется.


Генератор как зачаток корутины

Теперь сделаем шаг дальше.

Если генератор не просто отдаёт значения, а используется как единица выполнения — мы получаем корутину.

Что такое coroutine (корутина)?

Корутина — это функция, которая может приостанавливаться и возобновляться.

В Python генераторы — это историческая база для корутин.


Минимальный scheduler (планировщик) на генераторах

В контексте кооперативной многозадачности планировщик (scheduler) — это механизм, который:

  1. Хранит список задач.
  2. Решает, какую задачу выполнить следующей.
  3. Возвращает задачу в очередь, если она не завершена.
  4. Удаляет задачу, если она закончилась.

Проще говоря:

Планировщик — это диспетчер, который распределяет время выполнения между задачами.

В нашем случае он управляет генераторами-корутинами.

Очередь (queue)

Для правильной реализации планировщика нам нужна структура, которая называется очередь (queue).

Очередь позволяет:

  • хранит задачи,
  • брать «первую»,
  • добавлять задачу в «конец».

Именно такая структура называется очередь (queue).

Базовая очередь работает по принципу: Кто первый встал — того первого и обслужили.

[A, B, C]

↓ берём A

[B, C]

↓ добавляем A в конец если задача не завершилась

[B, C, A]

Реализация через обычный список

Рассмотрим исходный код:

def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        yield
        
def scheduler(tasks):
    while tasks:
        task = tasks.pop(0)
        try:
            next(task)
            tasks.append(task)
        except StopIteration:
            print("Задача завершена")
        
tasks = [
    task("A"),
    task("B")
]

scheduler(tasks)

По шагам:

tasks = [A, B] - это Начальное состояние очереди.

Шаг 1

task = tasks.pop(0)

Мы взяли первую задачу и удалили её из списка.

Теперь (внутри программы):

task = A
tasks = [B]

Шаг 2

next(task)

Задача A выполняется до первого yield.

Вывод:

A: шаг 0

Она не завершилась.

Шаг 3

tasks.append(task)

Теперь:

tasks = [B, A]

Мы поставили A в конец очереди.

Эта "карусель", которая будет выполнятся по кругу, пока каждая задача не вернет StopIteration.

Визуальная схема работы

Начало:   [A, B]

1) A → yield → [B, A]
2) B → yield → [A, B]
3) A → yield → [B, A]
4) B → yield → [A, B]

Это называется round-robin планирование — «по кругу».

Более профессиональная реализация — deque (from collections)

В стандартной библиотеке Python есть структура данных:

from collections import deque

deque — это двусторонняя очередь (double-ended queue).

Она позволяет:

  • быстро удалять элементы слева,
  • быстро добавлять элементы справа,
  • без сдвига всего массива.

Операции popleft() и append() работают за O(1) (линейная временная сложность).

O(n) - это обозначение временной сложности алгоритма, которое показывает, как время выполнения алгоритма растёт линейно с увеличением размера входных данных (n).


Планировщик через deque

from collections import deque

def scheduler(tasks):
    tasks = deque(tasks)

    while tasks:
        task = tasks.popleft()
        try:
            next(task)
            tasks.append(task)
        except StopIteration:
            print("Задача завершена")

Логика осталась прежней.

Но теперь:

  • нет сдвига элементов,
  • поведение соответствует настоящей очереди,
  • реализация масштабируется.

Ключевой вывод блока

Мы сейчас построили:

  • примитивный планировщик,
  • очередь готовых задач,
  • round-robin модель распределения времени,
  • основу кооперативной многозадачности.

Без потоков. Без процессов. Без ОС-переключения.

Только:

  • генераторы,
  • yield,
  • очередь задач.

Блок 6. От yield к async / await

В предыдущем блоке мы буквально увидели, как работает асинхронность изнутри.

Теперь переходим к профессиональной реализации этой модели — библиотеке asyncio.

asyncio — это формализованная, стандартизированная и оптимизированная версия того, что мы уже реализовали вручную.

Проблема, которую мы решаем

Нужно выполнить две операции с задержкой.

Простейший вариант:

import time

def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        time.sleep(1)

task("A")
task("B")

Что происходит?

A: шаг 0
(ждём 1 сек)
A: шаг 1
(ждём 1 сек)
A: шаг 2
(ждём 1 сек)
B: шаг 0
...

Общее время: ~6 секунд, потому что time.sleep:

  • блокирует поток,
  • останавливает выполнение программы полностью.

Это последовательная модель.

Асинхронный вариант (разбор аналогов)

Теперь решим ту же задачу с помощью asyncio.

import asyncio

async def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        await asyncio.sleep(1)

async def main():
    await task("A")
    await task("B")

asyncio.run(main())

Этот пример пока не решает нашу основную проблему, но на нем мы можем разобрать основные инструменты библиотеки asyncio.

async def — корутина нового поколения

async def task(name):

Что делает async?

  1. Функция перестаёт быть обычной.
  2. При вызове она НЕ выполняется.
  3. Она возвращает coroutine object.
  4. Её выполнение контролирует event loop.

Это эволюция генератора.

Если раньше:

def task():
    yield

То теперь:

async def task():
    await ...

Разница:

  • генератор управляется вручную через next(),
  • корутина управляется event loop.

await — современный аналог yield

В генераторах мы писали:

yield

Это означало:

Приостановить выполнение и вернуть управление планировщику.

В новой модели:

await asyncio.sleep(1)

Это означает:

Приостановить выполнение и передать управление event loop.

Принцип тот же:

yield  →  добровольная пауза
await  →  добровольная пауза

Разница в уровне абстракции.


Event Loop — промышленный scheduler

В прошлом блоке у нас был:

def scheduler(tasks):

Теперь его роль выполняет event loop внутри asyncio.

Event loop:

  • запускает корутину,
  • отслеживает await,
  • возвращает корутину к выполнению,
  • управляет жизненным циклом задач.

Это тот же самый механизм, но:

  • встроенный,
  • оптимизированный,
  • стандартизированный.

asyncio.run() — точка входа

asyncio.run(main())

Что происходит:

  1. Создаётся event loop.
  2. В него передаётся корутина main().
  3. Loop управляет её выполнением.
  4. После завершения цикл корректно закрывается.

Это безопасный способ запуска асинхронной программы.


Главное отличие: asyncio.sleep vs time.sleep

Теперь ключевой момент блока.

time.sleep(1)

  • блокирует поток,
  • останавливает всю программу,
  • event loop не получает управление.

await asyncio.sleep(1)

  • НЕ блокирует поток,
  • сообщает event loop: «Я жду 1 секунду»,
  • loop может выполнять другие задачи.

Даже если сейчас мы не запускаем задачи конкурентно, мы уже используем неблокирующую модель.

Схематично:

time.sleep:
   Задача → стоп → всё замерло

asyncio.sleep:
   Задача → await → управление возвращено loop

Это принципиальное отличие.

Этот блок про переход от ручной асинхронной модели на генераторах к профессиональной модели async / await.

Далее мы начнём запускать несколько корутин конкурентно и увидим реальную силу асинхронной модели.

Блок 7. Параллельность ожиданий с помощью gather и TaskGroup

Только что мы научились:

  • создавать корутины (async def),
  • приостанавливать их (await),
  • запускать через asyncio.run().

Остался переход к тому, ради чего используется asyncio: параллельность ожиданий.

Возьмём знакомый пример:

import asyncio

async def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        await asyncio.sleep(1)

async def main():
    await task("A")
    await task("B")

asyncio.run(main())

Пока наша программа выполняется примерно ~6 секунд, потому что мы ждём одну корутину до завершения, а потом запускаем вторую.

asyncio.gather — массовый запуск и ожидание

Теперь добавим конкурентность.

import asyncio

async def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(
        task("A"),
        task("B")
    )

asyncio.run(main())

Что делает asyncio.gather

  1. Принимает несколько корутин.
  2. Регистрирует их в event loop как задачи.
  3. Запускает их конкурентно.
  4. Ждёт завершения всех.
  5. Возвращает результаты.

Теперь программа выполнится примерно за 3 секунды, а не за 6.

Примерный вывод:

A: шаг 0
B: шаг 0
A: шаг 1
B: шаг 1
A: шаг 2
B: шаг 2

Мы не ускорили выполнение кода. Мы убрали простои.

Важно понимать архитектурно.

asyncio.gather:

  • не создаёт потоки,
  • не создаёт процессы,
  • не использует параллелизм CPU.

Это:

  • один поток
  • один event loop
  • несколько корутин
  • переключение в await

Это та же кооперативная модель, только на более высоком уровне.


asyncio.TaskGroup — современный способ массового запуска

Начиная с Python 3.11, появился более структурированный способ — TaskGroup.

Он также находится в asyncio.

Пример:

import asyncio

async def task(name):
    for i in range(3):
        print(f"{name}: шаг {i}")
        await asyncio.sleep(1)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task("A"))
        tg.create_task(task("B"))

asyncio.run(main())

Чем TaskGroup лучше?

  1. Структурированная конкурентность (structured concurrency).
  2. Более предсказуемая обработка ошибок.
  3. Автоматическая отмена всех задач при исключении.
  4. Явное создание задач.

Если одна задача падает:

  • TaskGroup корректно отменяет остальные,
  • исключение пробрасывается наружу.

Это делает код более безопасным.

В современных проектах предпочтение отдаётся TaskGroup.


Разница между gather и TaskGroup в таблице

Характеристика gather TaskGroup
Синтаксис Функция Контекстный менеджер
Создание задач Неявное Явное
Обработка ошибок Менее строгая Более предсказуемая
Современный стиль Старый подход Рекомендуемый

Модуль 3. ПРАКТИКА

Блок 8. Главная идея и Архитектурный паттерн

Асинхронность — это способ эффективно организовать I/O-нагруженную работу. С помощью асинхронного подхода можно значительно ускорить работу с задачами, которые зависят от внешних ресурсов, таких как:

  • Чтение и запись файлов
  • Сетевые запросы
  • Ожидание внешних сервисов (например, базы данных)
  • Обработка множества источников данных

В данном модуле мы будем строить программу, которая:

  1. Асинхронно анализирует 10 текстовых файлов.
  2. Агрегирует результаты.
  3. Делает это корректно и безопасно.

Основной паттерн, который мы будем использовать в этой практике:

main()
    ↓
создание задач (TaskGroup)
    ↓
каждая задача:
    - асинхронно получает данные
    - обрабатывает данные
    - возвращает результат
    ↓
агрегирование результата

Это каноничный паттерн разработки на asyncio. Он используется в продакшн-проектах и является стандартом для работы с асинхронностью в Python.

Какие технологии будем использовать

  • async / await: Это базовая модель кооперативной многозадачности, при которой задачи добровольно передают управление (пауза с помощью await). Такой подход позволяет эффективно работать с I/O-операциями, не блокируя главный поток.

  • asyncio.run(): Точка входа в асинхронную программу. Она позволяет запускать корутины и управлять их выполнением через asyncio event loop.

  • asyncio.TaskGroup или asyncio.gather: Способы управления группой задач. Мы будем использовать их, чтобы:

    • централизованно управлять задачами
    • отменять все задачи, если одна из них завершится с ошибкой если это TaskGroup и безопасно завершить все задачи.
    • продолжать выполнение задач, даже если одна задача выпала с ошибкой, если это gather
  • asyncio.to_thread(): Мы будем читать обычные файлы через стандартный open(), что является блокирующей операцией. Для того чтобы не блокировать event loop, но не писать весь код на aiofiles, мы будем использовать asyncio.to_thread:

await asyncio.to_thread(read_file, path)

Этот метод позволяет запускать блокирующие операции в пуле потоков, не влияя на асинхронный поток выполнения.

Важно:

  • Под капотом это не "магический неблокирующий диск".
  • Он использует thread pool executor.
  • Это обёртка над тем же самым подходом, что и в библиотекиaiofiles.
aiofiles ≈ удобный интерфейс над thread pool

Структура программы

  1. Создание задач: в начале создаём все задачи, которые будут обрабатывать файлы.

  2. Каждая задача:

    • асинхронно читает данные с файлов
    • анализирует их
    • возвращает результат
  3. Агрегирование результата: после того как все задачи завершатся, мы соберём результаты в одну структуру.

Это будет простая, но эффективная программа, которая продемонстрирует силу асинхронности в работе с множеством файлов.


Блок 9. Практика: Асинхронный анализ текстовых файлов

В этой практике мы реализуем одну и ту же задачу несколькими способами. Это позволит не только понять синтаксис асинхронного программирования, но и увидеть разницу в архитектуре и поведении программы.

Мы реализуем задачу:

  1. С использованием TaskGroup
  2. С использованием gather
  3. Без использования асинхронности

После этого сравним время выполнения и поведение программ.

Задача

В директории(папке) находится 10 текстовых файлов.

Каждый файл содержит строки текста. Внутри текста могут встречаться специальные слова-триггеры.

Наша задача:

  1. Прочитать все файлы.

  2. Найти в тексте триггер-слова.

  3. Для каждого триггера определить:

    • количество вхождений
    • позиции в тексте (номер строки и индекс в строке)
  4. Собрать результаты для всех файлов.

Пример триггер-слов:

TRIGGER_WORDS = [
    "error",
    "warning",
    "critical",
    "failed",
    "timeout"
]

Пример файла с текстом:

Lorem ipsum error dolor sit amet consectetur failed critical adipisicing elit. Vel voluptate eligendi dolores optio a voluptates, dolor error repellat deserunt omnis, et quae nulla officiis repellendus neque. 
Id, quisquam hic! Illum eos asperiores reiciendis alias consequatur quia odit, quidem fuga excepturi suscipit esse warning omnis critical aliquid cumque soluta, voluptates placeat impedit nihil timeout!
Lorem failed ipsum dolor sit amet consectetur adipisicing elit. Vel voluptate eligendi dolores optio a voluptates, dolor error repellat deserunt omnis, et quae nulla officiis repellendus neque. 
Id, warning, quisquam hic! Illum error eos failed asperiores timeout reiciendis alias consequatur quia odit, critical quidem fuga excepturi suscipit esse omnis aliquid cumque soluta, voluptates placeat, warning impedit nihil!

Начальные функции (одинаковые для всех реализаций)

Чтение файла:

def read_file(path: str) -> list[str]:
    with open(path, "r", encoding="utf-8") as f:
        return f.readlines()

Это обычная синхронная функция.

Она выполняет блокирующую операцию — чтение файла.


Анализ текста:

def analyze_text(lines: list[str], triggers: list[str]) -> dict:
    result = {}

    for trigger in triggers:
        result[trigger] = {
            "count": 0,
            "positions": []
        }

    for line_number, line in enumerate(lines, start=1):
        lower_line = line.lower()

        for trigger in triggers:
            index = lower_line.find(trigger)

            if index != -1:
                result[trigger]["count"] += 1
                result[trigger]["positions"].append(
                    (line_number, index)
                )

    return result

Функция:

  1. проходит по всем строкам файла
  2. ищет триггер-слова
  3. сохраняет количество и позиции

Асинхронная обработка файла (для асинхронной реализации)

import asyncio

async def process_file(path: str, triggers: list[str]) -> dict:
    lines = await asyncio.to_thread(read_file, path)
    return analyze_text(lines, triggers)

Здесь используется важный механизм:

await asyncio.to_thread(read_file, path)

Он выполняет блокирующее чтение файла в отдельном потоке, не блокируя event loop.

Решение (функция main) с использованием TaskGroup

Теперь реализуем асинхронную обработку файлов через TaskGroup.

async def main():

    paths = [f"file_{i}.txt" for i in range(1, 11)]

    all_results = []

    try:

        async with asyncio.TaskGroup() as tg:

            tasks = [
                tg.create_task(process_file(path, TRIGGER_WORDS))
                for path in paths
            ]

        for task in tasks:
            all_results.append(task.result())

    except* Exception as e:
        print("Ошибка при обработке файлов:", e)
        raise

    print("Результаты анализа:")
    for file_result in all_results:
        print(file_result)


if __name__ == "__main__":
    asyncio.run(main())

Схема выполнения:

main()
   ↓
TaskGroup
   ↓
создаются 10 задач
   ↓
каждая задача:
    process_file()
        ↓
        to_thread(read_file)
        ↓
        analyze_text
   ↓
TaskGroup ждёт завершения всех задач
   ↓
результаты агрегируются

Главная особенность TaskGroup:

если одна задача падает
↓
остальные автоматически отменяются

Это делает выполнение консистентным и безопасным. Такая реализация называется structured concurrency


Решение (функция main) с использованием gather

async def main():

    paths = [f"file_{i}.txt" for i in range(1, 11)]

    tasks = [
        process_file(path, TRIGGER_WORDS)
        for path in paths
    ]

    try:

        all_results = await asyncio.gather(*tasks)

    except Exception as e:
        print("Ошибка:", e)
        return

    print("Результаты анализа:")
    for result in all_results:
        print(result)


if __name__ == "__main__":
    asyncio.run(main())

У gather поведение отличается от TaskGroup:

одна задача падает
↓
остальные продолжают выполняться

Это может приводить к частично выполненной работе. Например:

  • 10 файлов обрабатываются
  • 1 файл вызывает ошибку
  • 9 продолжают работу

Результаты могут оказаться неполными или неконсистентными.

В большинстве современных проектов предпочтение отдают TaskGroup.


Синхронная реализация

def main():

    paths = [f"file_{i}.txt" for i in range(1, 11)]

    all_results = []

    for path in paths:

        lines = read_file(path)

        result = analyze_text(lines, TRIGGER_WORDS)

        all_results.append(result)

    print("Результаты анализа:")

    for result in all_results:
        print(result)


if __name__ == "__main__":
    main()

Файлы обрабатываются строго последовательно.


Замер времени

Для того что бы увидеть разницу в скорости работы двух (синхронной и асинхронной) реализаций создадим декораторы.

Они будут немного отличатся друг от друга.

Декоратор замера времени для асинхронной реализации:

def async_timer(func):

    @wraps(func)
    async def wrapper(*args, **kwargs):

        start = time.perf_counter()

        result = await func(*args, **kwargs)

        end = time.perf_counter()

        print(f"Время выполнения: {end - start:.4f} секунд")

        return result

    return wrapper

Декоратор замера времени для синхронной реализации:

def timer(func):

    @wraps(func)
    def wrapper(*args, **kwargs):

        start = time.perf_counter()

        result = func(*args, **kwargs)

        end = time.perf_counter()

        print(f"Время выполнения: {end - start:.4f} секунд")

        return result

    return wrapper

Если файлы внутри имеют мало информации, и их чтение занимает один такт процессора, то скорее всего время выполнение синхронной версии будет быстрее чем время выполнения асинхронной версии

Почему синхронный код может быть быстрее?

Если у тебя всего 10 файлов, и они маленькие, то:

  • Синхронный код просто открывает каждый файл, читает его, обрабатывает.
  • Асинхронный код создаёт TaskGroup, корутины, event loop, переключение задач, возможно даже asyncio.to_thread() для чтения файлов.
  • Все эти механизмы добавляют накладные расходы (overhead).

То есть для маленьких, быстрых операций асинхронность не выигрывает, а иногда даже проигрывает по времени.

Асинхронность — это не про ускорение выполнения вычислений, а про эффективное использование времени при ожидании:

  • CPU-bound задачи → лучше multiprocessing
  • I/O-bound задачи → лучше async/await

То есть скорость выполнения кода синхронно и асинхронно зависит от нагрузки

Как нам увидеть реальную пользу от асинхронной реализации?

Давайте добавим задержки в места I/O-операции — туда, где программа «ждёт ресурс». В нашем случае - это чтение файлов.

Идея: имитируем медленный диск / сеть.

Задержка time.sleep

def read_file(path: str) -> list[str]:

    time.sleep(0.1)   # имитация медленного I/O

    with open(path, "r", encoding="utf-8") as f:
        return f.readlines()

Для синхронного кода: 10 файлов × 0.1 сек = примерно 1 секунда, потому что синхронный код ждёт каждый файл по очереди.

Для асинхронного кода: 10 задач → все одновременно ждут 0.1 секунды, поэтому общее время будет примерно ~0.1–0.15 секунды.


Блок 10. Самостоятельная практика Анализ логов веб-сервера

В системе мониторинга веб-сервера сохраняются текстовые файлы логов. Каждый файл содержит записи о событиях, происходящих в системе.

Необходимо проанализировать несколько файлов логов и определить, сколько событий каждого типа произошло.

Типы событий:

  • ERROR — ошибка в системе
  • WARNING — предупреждение
  • INFO — информационное сообщение

Каждый лог-файл содержит записи вида:

дата время ТИП_СОБЫТИЯ описание

Пример строки:

2026-02-15 08:05:45 ERROR Database connection failed

Пример содержимого лог-файла:

2026-02-15 00:00:00 START LOG
2026-02-15 08:01:22 INFO User67 logged in
2026-02-15 08:03:10 WARNING User12 failed login attempt
2026-02-15 08:05:45 ERROR Database connection failed
2026-02-15 09:11:00 INFO User45 logged in
2026-02-15 09:15:12 WARNING User12 failed login attempt
2026-02-15 10:45:00 INFO User67 logged in
2026-02-15 11:00:33 ERROR Timeout while processing request
2026-02-15 12:12:12 INFO User88 logged in
2026-02-15 13:05:44 WARNING User77 failed login attempt
2026-02-15 14:22:10 INFO User90 logged in
2026-02-15 15:03:55 ERROR File upload failed
2026-02-15 16:40:11 INFO User12 logged in
2026-02-15 18:01:20 WARNING Disk usage is high
2026-02-15 19:33:45 INFO User34 logged out

Условие задачи

В директории находятся 10 файлов логов:

log_1.txt
log_2.txt
...
log_10.txt

Необходимо написать программу, которая:

  1. Асинхронно прочитает все файлы логов.
  2. Проанализирует содержимое каждого файла.
  3. Подсчитает количество событий типов:
ERROR
WARNING
INFO
  1. Выведет результаты анализа для каждого файла.

Пример вывода:

log_1.txt
ERROR: 2
WARNING: 3
INFO: 5

log_2.txt
ERROR: 1
WARNING: 2
INFO: 6

Уровни сложности

Уровень 1 — Базовый

Реализуйте программу в одном файле.

Требования:

  • использовать asyncio

  • асинхронно обработать 10 файлов

  • для каждого файла:

    • прочитать содержимое
    • подсчитать количество ERROR, WARNING, INFO
  • вывести результат анализа


Уровень 2 — Продвинутый

Добавьте декораторы:

1. Декоратор измерения времени выполнения

Декоратор должен выводить:

Время выполнения программы: X.XXXX секунд

2. Декоратор логирования

При обработке файла нужно выводить сообщения:

Начало обработки log_3.txt
Завершение обработки log_3.txt

Декоратор должен применяться к функции обработки файла.


Уровень 3 — Архитектура проекта

Разделите программу на несколько модулей.

Пример структуры проекта:

log_analyzer/
│
├── main.py
├── config.py
├── reader.py
├── analyzer.py
├── decorators.py
└── logs/
    ├── log_1.txt
    ├── log_2.txt
    ├── log_3.txt
    └── ...

Назначение файлов

main.py

  • точка входа программы
  • запуск асинхронной обработки файлов
  • агрегирование результатов

config.py

  • список типов событий
EVENT_TYPES = ["ERROR", "WARNING", "INFO"]

reader.py

  • функции чтения файлов
  • асинхронная обработка файлов

analyzer.py

  • функции анализа текста
  • подсчёт количества событий

decorators.py

  • декоратор измерения времени
  • декоратор логирования

Предыдущий урок | Следующий урок