До Python 3.7 разработчики полагались исключительно на threading.local(), чтобы хранить специфичные для запроса данные, такие как пользовательские сессии или соединения с базой данных. Однако, распространение asyncio выявило основную проблему: потоковое локальное хранилище общедоступно для всех сопрограмм, работающих на одном потоке цикла событий. Когда одна асинхронная задача передавала управление, другая могла невольно получить доступ или изменить состояние первой задачи, которое предполагалось как изолированное, что приводило к уязвимостям безопасности и порче данных. PEP 567 ввёл contextvars, чтобы обеспечить изоляцию логического контекста выполнения независимо от потоков ОС, моделируя концепцию по аналогии с подобными механизмами в C# и Erlang.
В синхронном Python каждый HTTP-запрос, как правило, выполняется в своём собственном потоке, что делает threading.local() достаточным для хранения контекста запроса. В асинхронной архитектуре тысячи параллельных запросов могут мультиплексироваться на одном потоке, управляемом циклом событий. Если две асинхронные задачи пересекаются в выполнении — одна приостанавливается на await, тогда как другая продолжается — они делят один и тот же потоковое локальное хранилище. Без механизма для создания снимка и восстановления контекста при переключениях задач глобальное состояние может утекать между логически отдельными операциями. Это создает условия гонки, при которых токен аутентификации Задачи A становится видимым для Задачи B, или границы транзакций базы данных размываются между несвязанными запросами.
Python реализует ContextVar как ключ в неизменяемой карте, хранящейся в состоянии потока. Каждая асинхронная задача поддерживает ссылку на свой собственный объект Context — постоянную структуру данных, где изменения создают новые версии, а не изменяют общее состояние. Когда asyncio приостанавливает задачу на await, он захватывает текущий контекст; при возобновлении он восстанавливает этот контекст, обеспечивая возврат ContextVar.get() значения, связанного с этой конкретной задачей, даже несмотря на возможные изменения потоков ОС. Эта семантика копирования при записи гарантирует изоляцию без накладных расходов на блокировки.
import contextvars import asyncio request_id = contextvars.ContextVar('request_id', default='unknown') async def process_task(task_name): # Установить значение для этого конкретного контекста задачи token = request_id.set(task_name) try: await asyncio.sleep(0.01) # Передать управление, могут выполняться другие задачи current = request_id.get() print(f"Задача {task_name} читает: {current}") finally: request_id.reset(token) # Восстановить предыдущий контекст async def main(): # Запуск двух задач параллельно на одном потоке await asyncio.gather(process_task('Alpha'), process_task('Beta')) asyncio.run(main())
Команда, занимающаяся созданием API-шлюза с высокой пропускной способностью, мигрировала с многопоточного приложения Flask на асинхронный сервис FastAPI. Они обнаружили, что их промежуточное ПО для аутентификации, которое хранило текущего пользователя в threading.local(), случайным образом присваивало личность Пользователя A запросам Пользователя B под нагрузкой. Первоначальная отладка предполагала условия гонки, но журналы показывали, что назначения происходят даже при развертывании с одним рабочим. Коренной причиной стало кооперативное многозадачное выполнение asyncio, где один обработчик запроса приостановливается во время вызова базы данных, позволяя другому обработчику выполняться на том же потоке и унаследовать потоковое локальное хранение.
Команда первоначально попыталась использовать глобальный словарь по threading.get_ident(), полагая, что это изолирует запросы. Этот подход представил простую миграцию из старого кодового базиса без введения внешних зависимостей. Однако под uvicorn с asyncio один и тот же поток обрабатывает несколько запросов последовательно, что означает, что словарь сохраняет устаревшие данные от предыдущих запросов и вызывает ошибки повышения привилегий, когда аутентифицированные сессии неправильно сохранялись между несвязанными запросами.
Они рефакторили каждую сигнатуру функции, чтобы принять параметр context, передавая его через весь стек вызовов от промежуточного ПО до слоя базы данных. Этот явный поток данных устранил скрытое состояние и работал как на синхронных, так и на асинхронных границах. К сожалению, это требовало масштабного рефакторинга, затрагивающего тысячи функций и нарушающего интеграции сторонних библиотек, ожидающих глобальные объекты конфигурации, в то время как получившаяся тривиальная кодовая многословность значительно увеличивала нагрузку на обслуживание и риск ошибок разработчиков.
Команда приняла contextvars.ContextVar, чтобы хранить объект аутентифицированного пользователя, позволяя промежуточному ПО установить переменную при входе в запрос, в то время как функции подсистемы получали к ней доступ через .get(), не загрязняя сигнатуры функций. Этот подход не требовал архитектурных изменений и обеспечивал автоматическую изоляцию между параллельными задачами, хотя требовал тщательного управления токенами reset(), чтобы предотвратить утечки памяти в длительно работающих процессах. Кроме того, отладка стала более сложной, поскольку состояние стало неявным в контексте выполнения, а не видимым в трассировках стека.
В конечном итоге они выбрали contextvars, потому что прототипирование показало, что это требует изменений только в слое промежуточного ПО, избегая масштабного рефакторинга, связанного с явной передачей контекста. Заключив обработчики запросов в блоки try/finally, чтобы гарантировать сброс токенов, они предотвратили утечки памяти, сохранив при этом чистые сигнатуры функций. Шлюз теперь обрабатывает 50,000 одновременных подключений на каждый рабочий без утечки данных между запросами, и команда сократила количество потоков ОС с 100 на экземпляр до 4, сократив использование памяти на 80% и улучшив общую пропускную способность на 300%.
Почему threading.local() не работает в асинхронном коде, но работает в многопоточном?
В многопоточном Python операционная система предвосхищает расписание потоков, и каждый поддерживает свой собственный C стек и структуру PyThreadState. threading.local() сопоставляет переменные с этой идентификацией потока на уровне ОС, обеспечивая изоляцию. В asyncio цикл событий кооперативно планирует задачи на одном потоке, используя очередь; когда задача передает управление, цикл сразу же запускает другую задачу на том же потоке, не переключая PyThreadState. В результате threading.local() видит один и тот же ключ для обеих задач, что вызывает утечку состояния. Contextvars решает эту проблему, поддерживая стек сопоставлений контекста внутри PyThreadState, который цикл событий меняет во время переключения задач, создавая логическую изоляцию, независимую от потоков ОС.
Что происходит, если вы забыли сбросить токен ContextVar?
ContextVar.set() возвращает объект Token, представляющий предыдущее состояние, который должен быть передан в reset(), чтобы восстановить предыдущее значение. Если вы это пропустите — например, не добавив блок try/finally — переменная сохранит свое значение за пределами предполагаемой области. В долгосрочных асинхронных серверах это создает утечку памяти, когда старые контексты запросов накапливаются в цепочке контекста, и последующие задачи на этом потоке могут унаследовать устаревшие значения, если контекст не будет должным образом восстановлен. В отличие от традиционных переменных стека, которые исчезают при возврате функций, переменные контекста сохраняются в контексте выполнения до тех пор, пока не будут явно сброшены или пока задача не завершится, что делает очистку обязательной.
Как контекстные переменные передаются на дочерние задачи и потоки?
При использовании asyncio.create_task() дочерняя задача автоматически получает копию текущего контекста родителя, гарантируя, что контекстные переменные естественным образом передаются вниз по графу асинхронных вызовов. Однако при использовании concurrent.futures.ThreadPoolExecutor или loop.run_in_executor() вызываемая функция выполняется в другом потоке ОС, который по умолчанию начинается с пустого контекста. Кандидаты часто предполагают, что контекст передается через границы потоков так же, как потоковое локальное хранилище, но contextvars специфичны для логического асинхронного контекста. Чтобы передать значения в потоки, вы должны явно захватить контекст, используя contextvars.copy_context(), и выполнить функцию внутри него с помощью context.run(), или вручную передать переменные в качестве аргументов.