info@severcart.org
Русский English

Асинхронное программирование в Python

29 декабря 2017 г.    Python

Синхронное программирование – это то, с чего чаще всего начинается разработка программ, в которых производится последовательное исполнение команд.

Даже при условном ветвлении, циклах и вызовах функций, мы думаем о коде с точки зрения выполнения одного шага за раз. После завершения выполнения текущего шага, выполняется переход к следующему.

Примеры синхронных программ:

  • Программы для пакетной обработки, получающие входные данные, обрабатывающие их и генерирующие выходные. Один шаг логически следует за другим, пока не будет получен желаемый результат.
  • Программы командной строки как правило – это быстрые процессы, превращающие что-то одно во что-то другое. Их можно представить серией последовательных шагов, выполняющих некое действие.

Асинхронная программа ведёт себя по-другому. Она по-прежнему выполняется один шаг за раз, но разница заключается в том, что система двигаясь вперёд не будет ожидать завершения текущего шага выполнения. Это означает, что выполнение переходит к следующим операторам программы, при этом предыдущие шаги выполнения (их может быть и несколько) продолжают работать в фоновом режиме. После того, как один из фоновых шагов завершил своё выполнение, программный код должен обработать это событие.

Когда нам нужно писать такие программы? Иногда асинхронность помогает разрешать те или иные проблемы программирования.

Далее представлена на концептуальном уровне программа, которая может быть кандидатом на асинхронность.

Упрощенный web сервер

Его основная работа такая же, как и приведённый выше пакетных обработчиков, т. е. получить некоторые входные данные, обработать их и вернуть выходные. Если реализовать его в виде синхронной программы, то это был бы абсолютно ужасный веб-сервер.

Почему? Потому что web сервер должен обрабатывать сотни, а иногда и тысячи подключений от пользователей одновременно, а не обслуживать только одного клиента.

Можно ли как-то улучшить синхронный web сервер? Конечно, можно оптимизировать шаги исполнения, сделав их как можно быстрее. К сожалению, нужного эффекта по улучшению работы web сервера это не даст, и он не сможет возвращать ответы достаточно быстро, и не сможет обслуживать достаточное количество пользователей.

Каковы реальные пределы оптимизации указанного подхода? Скорость сети, скорость файлового IO, скорость запроса к базе данных, скорость других подключенных служб и т. д. Общей особенностью этого списка являются то, что все они являются функциями ввода-вывода. Они все на много порядков медленнее, чем скорость работы CPU.

Например, если выполняется запрос к базе данных в синхронной программе, прежде чем будет возвращён ответ клиенту и переход к следующему шагу, CPU будет находиться в состоянии длительного ожидания.

Файловый IO, сеть, база работают достаточно быстро, но намного медленнее, чем CPU. Технологии асинхронного программирования позволяют программам воспользоваться относительно медленными IO процессами, при этом нагружая CPU выполнением другими вычислениями, освобождая его от необходимости ожидать.

Разработка асинхронных программ сложнее синхронных. И это странно, потому что мир, в котором мы живем и с которым взаимодействуем, почти полностью асинхронен.

Пример из жизни. Многие из нас являются родителями, поэтому чтобы больше успеть мы делаем несколько вещей одновременно – домашняя бухгалтерия, стирка и присмотр за детьми.

Подготовительные действия

Все примеры в этой статье были протестированы в Python 3.6.1. Также нам понадобятся модули Twisted и gevent для запуска примеров. Установить их не составит труда менеджером пакетов pip. В качестве ОС рекомендуется использовать Linux или MacOS.

Настоятельно рекомендуется настроить виртуальное окружение (virtualenv) Python для запуска кода, чтобы не превращать системную установку Python в помойку.

Пример 1: Синхронная программа

В этом примере продемонстрирован несколько надуманный способ работы с очередью. Производится простой подсчёт количества элементов в очереди work_queue, а также печать текущей task и вывод итогового значения. Основная часть этой программы обеспечивает наивную основу для множества других скриптов по обработке очереди work_queue.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_1.py
Простой пример программы, демонстрирующей синхронный запуск 'tasks'
"""

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f'Task {name} nothing to do')
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            for x in range(count):
                print(f'Task {name} running')
                total += 1
            print(f'Task {name} total: {total}')


def main():
    """
    Главная точка входа в программу
    """
    # создание очереди 'work'
    work_queue = queue.Queue()

    # помещаем значения 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # создаём задачи
    tasks = [
        (task, 'One', work_queue),
        (task, 'Two', work_queue)
    ]

    # запускаем задачи
    for t, n, q in tasks:
        t(n, q)

if __name__ == '__main__':  
    main()

Функция task, принимает строку и очередь. В процессе выполнения она просматривает очередь на наличие элементов для обработки, и если они там есть, то забирает значение из очереди, запуская цикл for с количеством итераций, равному этому значению. В завершении, распечатывается итоговое значение. Цикл будет итерироваться до тех пор, пока в очереди есть элементы.

После запуска task получаем список, показывающий, что задача выполняет всю работу. Цикл внутри него выполняет всю работу, расходуя содержимое очереди. Когда этот цикл завершается, task two получает шанс на запуск, но обнаруживает, что очередь пуста и распечатывает сообщение, что очередь пуста и завершает свою работу. В коде нет ничего, что позволяло бы задачам взаимодействовать вместе и меняться между собой.

Пример 2. Простой кооперативный параллелизм

Следующая версия программы демонстрирует возможности двух задач работать совместно с использованием генераторов. Добавление оператора yield в функцию task означает, что после выполнения этого оператора, функция завершает свою работу, но сохраняя свой контекст до следующего запуска. Затем цикл выполнения задачи возобновляет выполнение программы используя вызвав метод t.next(). Этот оператор перезапускает задачу в том месте, где она ранее вызывалась.

Это разновидность кооперативного параллелизма. Программа предоставляет контроль над своим контекстом, позволяя работать и чему-то другому. Это позволяет нашему примитивному планировщику запускать задачи по два экземпляра функции task, каждая из которых обращается в работе к одной очереди. Этот пример более продвинутый, но требует больше кода, чтобы получить схожие результаты, что и в первом примере.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_2.py
Короткий пример демонстрирующий простой автомат в Python
"""

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        for x in range(count):
            print(f'Task {name} running')
            total += 1
            yield
        print(f'Task {name} total: {total}')

def main():
    """
    Главная точка входа в программу
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # создаём задачи
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # запуск задач
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True


if __name__ == '__main__':
    main()

После рассмотрения вывода работы программы, становится видно, что поочерёдно выполняются задачи One и Two, расходуя содержимое work_queue. Как и было задумано, обе task выполняют свою работу, и каждая из них заканчивает обработку двух элементов из очереди. Но опять же, довольно много работы для достижения результата.

Изюминка заключается в использовании оператора yield, который превращает функцию task в функция генератор, «переключатель контекста». Программа использует переключение контекста для запуска двух экземпляров task.

Пример 3: Кооперативная параллельность с блокирующими вызовами

Следующая версия программы (example_3.py) точно такая же, как и предыдущая, за исключением добавления вызова time.sleep (1) в тело цикла задач. Это добавляет задержку в 1 секунду на каждую итерацию цикла задач. Задержка была добавлена для имитации влияния медленного процесса ввода-вывода, возникающего в задаче.

Также был включил простой класс Elapsed Time для обработки функций старт/стоп таймера, используемого в отчётах.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_3.py
Добавлена функция задержки
"""

import time
import queue
from lib.elapsed_time import ET


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
            yield
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)


    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # запускаем планировщик запуска задач
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print('Total elapsed time: {}'.format(et()))


if __name__ == '__main__':
    main()

Вывод показывает, что запущенная программа выполняет обе задачи, расходуя содержимое очереди work_queue, обрабатывая её как и в предыдущем примере. С добавлением ложной задержки IO видим, что наш кооперативный параллелизм не принёс нам ничего – задержка останавливает обработку всей программы, а CPU просто ожидает завершения IO.

Это именно то, что подразумевается под «блокировкой кода» в асинхронной документации. Обратите внимание на время, которое требуется для запуска всей программы, время всех задержек увеличивается по нарастанию. Этот код демонстрирует, то что это не решение.

Пример 4. Кооперативная параллельность с неблокирующими вызовами (gevent)

Следующая версия программы (example_4.py) изменена совсем немного. Она использует модуль асинхронного программирования gevent в верхней части программы. Модуль импортируется вместе с модулем monkey.

Затем вызывается метод модуля monkey, patch_all (). Что это такое? Простое объяснение заключается в том, что он устанавливает программу таким образом, чтобы любой другой импортированный модуль имущий блокирующий (синхронный) код в ней «пропатчен», чтобы сделать его асинхронным.

Как и большинство простых объяснений, это не очень полезно. Что это означает в отношении нашей примерной программы - time.sleep(1) (ложная задержка IO) больше не «блокирует» программу. Вместо этого она дает возможность возращения управления обратно в систему. Обратите внимание, что выражение «yield» из example_3.py больше не присутствует, теперь оно является частью вызова time.sleep (1).

Итак, если функция time.sleep(1) была исправлена gevent, чтобы получить контроль, где находится контроль? Одним из эффектов использования gevent является то, что он запускает поток цикла событий в программе. Для наших целей это похоже на цикл «выполнить задачи» из example_3.py. После завершения задержки time.sleep(1), он возвращает управление следующему исполняемому оператору после инструкции time.sleep(1). Преимущество такого поведения заключается в том, что процессор больше не блокируется задержкой и может свободно выполнять другой код.

Цикл «исполнитель задач» больше не существует, вместо этого наш массив задач содержит два вызова gevent.spawn(...). Два вызова запускают два потока gevent (называемые greenlets), которые представляют собой легкие микропотоки с кооперативным переключение контекста, а не в результате системных переключений, также как и в обычных потоках.

Обратите внимание на gevent.joinall(tasks) сразу после объявления задач. Этот оператор заставляет нашу программу ждать, пока задачи one и two не будет выполнено. Без этого наша программа продолжилась бы print операторами, но, по сути, нечего не делая.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_4.py

Есть задержки, которые влияют на выполнение программы
"""

import gevent
from gevent import monkey
monkey.patch_all()

import time
import queue
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # запуск задач
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Обратите внимание, что после запуска программы, что оба задания one и two запускаются одновременно, а затем ожидают при вызове фальшивого IO. Это признак того, что вызов time.sleep(1) больше не блокируется и может выполняется другая работа.

Общее время выполнения в основном на половину меньше времени, затраченного на запуск example_3.py. Теперь начинаются проявляться преимущества асинхронных программ.

Возможность одновременного запуска двух или нескольких операций, выполняющих процессы IO без блокировки. Используя gevent и greenlets, управляя переключателями контекста, мы можем мультиплексировать между задачами без особых проблем.

Пример 5: Синхронные (блокирующие) HTTP загрузки

Следующая версия программы (example_5.py) - это шаг вперед и шаг назад. В настоящее время программа выполняет некоторую фактическую работу с реальным IO, выполняя HTTP запросы к списку URL-адресов и получая содержимое страницы, но выполняя это в блокирующей (синхронной) манере.

В программу были внесены изменения, чтобы импортировать замечательный модуль requests для выполнения HTTP запросов, и добавили в очередь список URL адресов, а не цифры. Внутри task вместо увеличения счетчика используется модуль requests для получения содержимого URL адреса, полученного из очереди и печати затраченного на это времени.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_5.py

Эта версия выполняет фактическую работу, загружая содержимое
URL адреса, полученного из очереди
"""

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')
        yield


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # запускаем планировщик задач
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Как и в более ранней версии программы, используется yield, чтобы превратить функцию task в генератор и выполнить переключение контекста, чтобы запустить другой экземпляр задачи.

Каждая задача получает URL адрес из work_queue, заращивает содержимое страницы, на которую указывает URL адрес, и сообщает, сколько времени потребовалось, чтобы получить её контент.

Как и раньше, yield позволяет выполнять обе задачи, но поскольку эта программа работает синхронно, каждый вызов request.get() блокирует CPU до тех пор, пока страница не будет получена. Обратите внимание на общее время для запуска всей программы в конце, это имеет значение для следующего примера.

Пример 6. Асинхронные (неблокирующие) HTTP загрузки с gevent

Новая версия программы (example_6.py) изменяет предыдущую версию, чтобы снова воспользоваться модулем gevent. Помните, что вызов gevent monkey.patch_all() изменяет любые следующие модули, поэтому синхронный код становится асинхронным, это относится также к requests.

Теперь в task был удалён оператор yield, потому что вызов request.get(url) больше не блокирует, а выполняет контекстный переход к циклу событий gevent. В разделе «Запуск задачи» использовался gevent для создания двух экземпляров генератора задач, а затем выполняли joinall(), чтобы дождаться их завершения.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_6.py
Эта версия выполняет фактическую работу, загружая содержимое
URL адреса, полученные из очереди. Он также использует gevent для получения URL адреса в асинхронном режиме.
"""

import gevent
from gevent import monkey
monkey.patch_all()

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')

def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение 'work' в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    # запуск задач
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')

if __name__ == '__main__':
    main()

В результате работы программы обратим внимание на общее время и отдельные временные интервалы, для получения содержимого URL адреса. Видно, что общее время меньше, чем суммарное время всех вызовов request.get().

Это связано с тем, что вызовы выполняются асинхронно, поэтому эффективно используются преимущества процессора, позволяя ему выполнять сразу несколько запросов.

Пример 7: Асинхронные (неблокирующие) HTTP загрузки с использованием Twisted

Новая версия программы (example_7.py) использует модуль Twisted для того, чтобы сделать практически то же самое, что и модуль gevent, загружая содержимое URL адреса в неблокирующем режиме.

Twisted - очень мощный фреймворк, использующий принципиально иной подход для создания асинхронных программ. В то время как gevent модифицирует модули, чтобы сделать их синхронный код асинхронным, Twisted предоставляет свои собственные функции и методы для достижения аналогичных целей.

В example_6.py использовался запрос request.get(url) для получения содержимого URL адреса, здесь же воспользуемся Twisted функцией getPage(url).

В этой версии декоратор функции @defer.inlineCallbacks работает вместе с yield getPage(url) для выполнения контекстного переключателя в цикле событий Twisted.

В gevent подразумевался цикл событий, но в Twisted он явно предоставляется строкой оператора reactor.run() в нижней части программы.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

"""
example_7.py

Эта версия выполняет фактическую работу, загружая содержимое URL адреса, полученные из очереди work. В этой версии используется Twisted для обеспечения параллелизма.
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


@defer.inlineCallbacks
def my_task(name, work_queue):
    try:
        while not work_queue.empty():
            url = work_queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            yield getPage(url)
            print(f'Task {name} got URL: {url}')
            print(f'Task {name} total elapsed time: {et():.1f}')
    except Exception as e:
        print(str(e))


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # запускаем задачи
    et = ET()
    defer.DeferredList([
        task.deferLater(reactor, 0, my_task, 'One', work_queue),
        task.deferLater(reactor, 0, my_task, 'Two', work_queue)
    ]).addCallback(lambda _: reactor.stop())

    # запускаем event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Обратите внимание, что конечный результат такой же, как и версия gevent, общее время выполнения программы меньше, чем суммарное время для каждого запрашиваемого URL адреса.

Пример 8: Асинхронные (неблокирующие) HTTP загрузки с использованием обратных вызовов Twisted

Новая версия программы (example_8.py) также использует библиотеку Twisted, но иллюстрирует более традиционный подход использования Twisted.

Под этим подразумевается, отказ от использования стиля @defer.inlineCallbacks / yield для кодирования и использование явных обратных вызовов. Обратный вызов – это функция, которая регистрируется в системе и может быть вызвана позже в ответ на событие. В приведенном ниже примере поставляемая с Twisted функция success_callback() используется для вызова при завершения выполнения getPage(url).

Обратите внимание, что в программе больше не присутствует декоратор @defer.inlineCallbacks для функции my_task(). Кроме того, функция возвращает переменную d, получаемую вызовом функции getPage(url).

Отложенный Twisted метод предназначен для асинхронного программирования и является точкой привязки обратного вызова. Функция getPage(url) возвращает отложенный обратный вызов страницы (как строки) или errback с описанием ошибки.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

"""
example_8.py

Загрузка содержимого по URL адресу, полученный из очереди. 
Для обеспечения параллелизма используется Twisted.
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


def success_callback(results, name, url, et):
    print(f'Task {name} got URL: {url}')
    print(f'Task {name} total elapsed time: {et():.1f}')


def my_task(name, queue):
    if not queue.empty():
        while not queue.empty():
            url = queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            d = getPage(url)
            d.addCallback(success_callback, name, url, et)
            yield d


def main():
    """
    main точка входа в программу
    """
    # создание очереди work_queue
    work_queue = queue.Queue()

    # помещаем url в очередь
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # запускаем задачи
    et = ET()

    # создаём кооператор
    coop = task.Cooperator()

    defer.DeferredList([
        coop.coiterate(my_task('One', work_queue)),
        coop.coiterate(my_task('Two', work_queue)),
    ]).addCallback(lambda _: reactor.stop())

    # запускаем event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Результат запуска этой программы такой же, как и в предыдущих двух примерах. Общее время программы меньше, чем время выполнения URL адресов.

Используете ли вы gevent или Twisted, это вопрос личного предпочтения и стиля кодирования. Оба являются мощными библиотеками, которые предоставляют механизмы, позволяющие программисту создавать асинхронный код.

Вывод

Надеюсь, эта статья помогла вам понять, когда и как асинхронное программирование может быть полезным. Если Вы пишете программу, которая вычисляет число Pi до миллионного десятичного разряда, то асинхронный код здесь не поможет.

Однако, если Вы хотите реализовать сервер или программу, выполняющую значительное количество IO, то асинхронный код может оказать огромное подспорье. Это мощная технология, которая может улучшить Ваши программы до следующего уровня.