50ae12a099b01792 8 951 98 30 964 info@severcart.org
Русский English

Асинхронное программирование в Python. Часть 2

1 января 2018 г.

Пример 3: Кооперативная параллельность с блокирующими вызовами

Продолжение части 1.

Следующая версия программы (example_3.py) точно такая же, как и предыдущая, за исключением добавления вызова time.sleep (1) в тело цикла задач. Это добавляет задержку в 1 секунду на каждую итерацию цикла задач. Задержка была добавлена для имитации влияния медленного процесса ввода-вывода, возникающего в задаче.

Также был включил простой класс Elapsed Time для обработки функций старт/стоп таймера, используемого в отчётах.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_3.py
Добавлена функция задержки
"""

import time
import queue
from lib.elapsed_time import ET


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
            yield
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)


    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # запускаем планировщик запуска задач
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print('Total elapsed time: {}'.format(et()))


if __name__ == '__main__':
    main()

Вывод показывает, что запущенная программа выполняет обе задачи, расходуя содержимое очереди work_queue, обрабатывая её как и в предыдущем примере. С добавлением ложной задержки IO видим, что наш кооперативный параллелизм не принёс нам ничего – задержка останавливает обработку всей программы, а CPU просто ожидает завершения IO.

Это именно то, что подразумевается под «блокировкой кода» в асинхронной документации. Обратите внимание на время, которое требуется для запуска всей программы, время всех задержек увеличивается по нарастанию. Этот код демонстрирует, то что это не решение.

Пример 4. Кооперативная параллельность с неблокирующими вызовами (gevent)

Следующая версия программы (example_4.py) изменена совсем немного. Она использует модуль асинхронного программирования gevent в верхней части программы. Модуль импортируется вместе с модулем monkey.

Затем вызывается метод модуля monkey, patch_all (). Что это такое? Простое объяснение заключается в том, что он устанавливает программу таким образом, чтобы любой другой импортированный модуль имущий блокирующий (синхронный) код в ней «пропатчен», чтобы сделать его асинхронным.

Как и большинство простых объяснений, это не очень полезно. Что это означает в отношении нашей примерной программы - time.sleep(1) (ложная задержка IO) больше не «блокирует» программу. Вместо этого она дает возможность возращения управления обратно в систему. Обратите внимание, что выражение «yield» из example_3.py больше не присутствует, теперь оно является частью вызова time.sleep (1).

Итак, если функция time.sleep(1) была исправлена gevent, чтобы получить контроль, где находится контроль? Одним из эффектов использования gevent является то, что он запускает поток цикла событий в программе. Для наших целей это похоже на цикл «выполнить задачи» из example_3.py. После завершения задержки time.sleep(1), он возвращает управление следующему исполняемому оператору после инструкции time.sleep(1). Преимущество такого поведения заключается в том, что процессор больше не блокируется задержкой и может свободно выполнять другой код.

Цикл «исполнитель задач» больше не существует, вместо этого наш массив задач содержит два вызова gevent.spawn(...). Два вызова запускают два потока gevent (называемые greenlets), которые представляют собой легкие микропотоки с кооперативным переключение контекста, а не в результате системных переключений, также как и в обычных потоках.

Обратите внимание на gevent.joinall(tasks) сразу после объявления задач. Этот оператор заставляет нашу программу ждать, пока задачи one и two не будет выполнено. Без этого наша программа продолжилась бы print операторами, но, по сути, нечего не делая.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_4.py

Есть задержки, которые влияют на выполнение программы
"""

import gevent
from gevent import monkey
monkey.patch_all()

import time
import queue
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # запуск задач
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Обратите внимание, что после запуска программы, что оба задания one и two запускаются одновременно, а затем ожидают при вызове фальшивого IO. Это признак того, что вызов time.sleep(1) больше не блокируется и может выполняется другая работа.

Общее время выполнения в основном на половину меньше времени, затраченного на запуск example_3.py. Теперь начинаются проявляться преимущества асинхронных программ.

Возможность одновременного запуска двух или нескольких операций, выполняющих процессы IO без блокировки. Используя gevent и greenlets, управляя переключателями контекста, мы можем мультиплексировать между задачами без особых проблем.

Следующая часть 3.