50ae12a099b01792 8 951 98 30 964 info@severcart.org
Русский English

Асинхронное программирование в Python. Часть 3

4 января 2018 г.

Пример 5: Синхронные (блокирующие) HTTP загрузки

Продолжение части 1 и части 2.

Следующая версия программы (example_5.py) - это шаг вперед и шаг назад. В настоящее время программа выполняет некоторую фактическую работу с реальным IO, выполняя HTTP запросы к списку URL-адресов и получая содержимое страницы, но выполняя это в блокирующей (синхронной) манере.

В программу были внесены изменения, чтобы импортировать замечательный модуль requests для выполнения HTTP запросов, и добавили в очередь список URL адресов, а не цифры. Внутри task вместо увеличения счетчика используется модуль requests для получения содержимого URL адреса, полученного из очереди и печати затраченного на это времени.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_5.py

Эта версия выполняет фактическую работу, загружая содержимое
URL адреса, полученного из очереди
"""

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')
        yield


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение некоторых 'work' в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # запускаем планировщик задач
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Как и в более ранней версии программы, используется yield, чтобы превратить функцию task в генератор и выполнить переключение контекста, чтобы запустить другой экземпляр задачи.

Каждая задача получает URL адрес из work_queue, заращивает содержимое страницы, на которую указывает URL адрес, и сообщает, сколько времени потребовалось, чтобы получить её контент.

Как и раньше, yield позволяет выполнять обе задачи, но поскольку эта программа работает синхронно, каждый вызов request.get() блокирует CPU до тех пор, пока страница не будет получена. Обратите внимание на общее время для запуска всей программы в конце, это имеет значение для следующего примера.

Пример 6. Асинхронные (неблокирующие) HTTP загрузки с gevent

Новая версия программы (example_6.py) изменяет предыдущую версию, чтобы снова воспользоваться модулем gevent. Помните, что вызов gevent monkey.patch_all() изменяет любые следующие модули, поэтому синхронный код становится асинхронным, это относится также к requests.

Теперь в task был удалён оператор yield, потому что вызов request.get(url) больше не блокирует, а выполняет контекстный переход к циклу событий gevent. В разделе «Запуск задачи» использовался gevent для создания двух экземпляров генератора задач, а затем выполняли joinall(), чтобы дождаться их завершения.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
example_6.py
Эта версия выполняет фактическую работу, загружая содержимое
URL адреса, полученные из очереди. Он также использует gevent для получения URL адреса в асинхронном режиме.
"""

import gevent
from gevent import monkey
monkey.patch_all()

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')

def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    # помещение 'work' в очередь
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    # запуск задач
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')

if __name__ == '__main__':
    main()

В результате работы программы обратим внимание на общее время и отдельные временные интервалы, для получения содержимого URL адреса. Видно, что общее время меньше, чем суммарное время всех вызовов request.get().

Это связано с тем, что вызовы выполняются асинхронно, поэтому эффективно используются преимущества процессора, позволяя ему выполнять сразу несколько запросов.

Пример 7: Асинхронные (неблокирующие) HTTP загрузки с использованием Twisted

Новая версия программы (example_7.py) использует модуль Twisted для того, чтобы сделать практически то же самое, что и модуль gevent, загружая содержимое URL адреса в неблокирующем режиме.

Twisted - очень мощный фреймворк, использующий принципиально иной подход для создания асинхронных программ. В то время как gevent модифицирует модули, чтобы сделать их синхронный код асинхронным, Twisted предоставляет свои собственные функции и методы для достижения аналогичных целей.

В example_6.py использовался запрос request.get(url) для получения содержимого URL адреса, здесь же воспользуемся Twisted функцией getPage(url).

В этой версии декоратор функции @defer.inlineCallbacks работает вместе с yield getPage(url) для выполнения контекстного переключателя в цикле событий Twisted.

В gevent подразумевался цикл событий, но в Twisted он явно предоставляется строкой оператора reactor.run() в нижней части программы.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

"""
example_7.py

Эта версия выполняет фактическую работу, загружая содержимое URL адреса, полученные из очереди work. В этой версии используется Twisted для обеспечения параллелизма.
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


@defer.inlineCallbacks
def my_task(name, work_queue):
    try:
        while not work_queue.empty():
            url = work_queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            yield getPage(url)
            print(f'Task {name} got URL: {url}')
            print(f'Task {name} total elapsed time: {et():.1f}')
    except Exception as e:
        print(str(e))


def main():
    """
    Точка входа в программу main
    """
    # создание очереди для 'work'
    work_queue = queue.Queue()

    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # запускаем задачи
    et = ET()
    defer.DeferredList([
        task.deferLater(reactor, 0, my_task, 'One', work_queue),
        task.deferLater(reactor, 0, my_task, 'Two', work_queue)
    ]).addCallback(lambda _: reactor.stop())

    # запускаем event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

Обратите внимание, что конечный результат такой же, как и версия gevent, общее время выполнения программы меньше, чем суммарное время для каждого запрашиваемого URL адреса.

Продолжение часть 4.