Categories: Python

Python 非同步 I/O asyncio 與 aiohttp 模組使用教學與範例

介紹如何在 Python 中使用 asyncioaiohttp 模組開發異步 I/O 的高效率程式。

異步 I/O(asynchronous I/O,或稱非同步 I/O)通常很適合用於 I/O 密集型(I/O bound)或高階結構化的網路程式,以下將介紹 Python 中的異步 I/O 程式設計與實作方法。

實作環境

這裡採用的實作環境為 Python 3.7 以上,並搭配 aiohttp 模組,此模組都可以透過 pip 安裝:

# 安裝所需套件
pip install --upgrade pip aiohttp

協同程序(Coroutines)

一般的副程序(subroutines)會從頭到尾以線性的方式逐行執行,直到執行完成所有富程序的內容才會跳出,而協同程序(coroutines)則是可以從不同的執行點暫停或繼續執行,暫停期間可以允許 CPU 去處理其他的工作。

定義協同程序時要在開頭的 def 之前加上 async(或是在程式碼中使用 async withasync for 也可以),然後在程序內部呼叫其他非同步函數時加上 await(代表此處可以在必要時暫停)。

以下是一個最簡單的協同程序範例:

#!/usr/bin/env python3
import asyncio
import time

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    await asyncio.gather(hello(), hello(), hello())

start = time.perf_counter() # 開始測量執行時間

# 執行協同程序
asyncio.run(main())

elapsed = time.perf_counter() - start # 計算程式執行時間
print("執行時間:%f 秒" % elapsed)
Hello
Hello
Hello
World
World
World
執行時間:1.002419 秒

請觀察這段程式輸出訊息的順序,我們透過 asyncio.gather() 呼叫了三次 hello(),但是由於 hello() 是一個協同程序,當執行到第一個 hello()await 時會暫停 1 秒鐘,CPU 就會趁這個時候去執行其他的工作,也就是會去執行第二個 hello() 函數,以此類推,所以 CPU 會將三個 Hello 印出來之後,再印出三個 World,雖然每個 hello() 函數都要等待 1 秒,但總共花費的時間還是只有大約 1 秒左右。

這裡的 asyncio.sleep() 是一個非同步函數,在概念上它代表了一個耗時的工作,也就是說當遇到需要長時間等待的工作時,程式可以讓 CPU 先去執行其他可以先處理的事情,以減少白白等待的時間。

Task 處理大量工作

當遇到大量的非同步工作時,可以利用 asyncio 的 Task 將非同步工作包裝起來,一起透過 asyncio.gather 執行,並取回執行結果:

import asyncio
import time

async def hello(x):
    await asyncio.sleep(1)
    return x*x

async def main():
    # 建立 Task 列表
    tasks = []
    tasks.append(asyncio.create_task(hello(1)))
    tasks.append(asyncio.create_task(hello(2)))
    tasks.append(asyncio.create_task(hello(3)))

    # 執行所有 Tasks
    results = await asyncio.gather(*tasks)

    # 輸出結果
    for r in results:
        print(r)

start = time.perf_counter() # 開始測量執行時間

# 執行協同程序
asyncio.run(main())

elapsed = time.perf_counter() - start # 計算程式執行時間
print("執行時間:%f 秒" % elapsed)
1
4
9
執行時間:1.001932 秒

aiohttp

Python 的 aiohttp 套件提供了非同步版本的 HTTP 協定相關功能。以下是 aiohttp 的基本使用範例:

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        # 指定網站網址
        pokemon_url = 'https://pokeapi.co/api/v2/pokemon/151'
        # 以 aiohttp 擷取網頁資料
        async with session.get(pokemon_url) as resp:
            # 取得 JSON 資料
            pokemon = await resp.json()
            print(pokemon['name'])

# 執行協同程序
asyncio.run(main())
mew

抓取大量網頁資料

上面的範例中,我們透過 aiohttp 取得網頁上的一筆 JSON 格式資料,而如果想要一次抓取大量的 JSON 資料,就可以善用非同步 I/O 的方式來增加效率:

import aiohttp
import asyncio
import time

async def main():
    async with aiohttp.ClientSession() as session:
        # 抓取 150 個網址
        for number in range(1, 151):
            # 指定網站網址
            pokemon_url = f'https://pokeapi.co/api/v2/pokemon/{number}'
            # 以 aiohttp 擷取網頁資料
            async with session.get(pokemon_url) as resp:
                # 取得 JSON 資料
                pokemon = await resp.json()
                print("%d:%s" % (number, pokemon['name']))


start = time.perf_counter() # 開始測量執行時間

# 執行協同程序
asyncio.run(main())

elapsed = time.perf_counter() - start # 計算程式執行時間
print("執行時間:%f 秒" % elapsed)
[略]
148:dragonair
149:dragonite
150:mewtwo
執行時間:18.337544 秒

同步版本

以下是同步版本的範例,可以用來與非同步版本進行比較:

import requests
import time

start = time.perf_counter() # 開始測量執行時間

for number in range(1, 151):
    pokemon_url = f'https://pokeapi.co/api/v2/pokemon/{number}'
    resp = requests.get(pokemon_url)
    pokemon = resp.json()
    print("%d:%s" % (number, pokemon['name']))

elapsed = time.perf_counter() - start # 計算程式執行時間
print("執行時間:%f 秒" % elapsed)
[略]
148:dragonair
149:dragonite
150:mewtwo
執行時間:29.283678 秒

Task 版本

雖然非同步的版本與同步版本相較之下有顯著的差異,但是還是有改善的空間,將每個網址的擷取動作放在個別的協同程序中,以 Task 的方式同時執行,速度可以更快:

import aiohttp
import asyncio
import time

# 抓取指定編號的網址
async def get_pokemon(session, number):
    url = f'https://pokeapi.co/api/v2/pokemon/{number}'
    async with session.get(url) as resp:
        pokemon = await resp.json()
        return "%d:%s" % (number, pokemon['name'])

async def main():
    async with aiohttp.ClientSession() as session:
        # 建立 Task 列表
        tasks = []
        for number in range(1, 151):
            tasks.append(asyncio.create_task(get_pokemon(session, number)))

        # 同時執行所有 Tasks
        original_pokemon = await asyncio.gather(*tasks)

        # 輸出結果
        for pokemon in original_pokemon:
            print(pokemon)

start = time.perf_counter() # 開始測量執行時間

# 執行協同程序
asyncio.run(main())

elapsed = time.perf_counter() - start # 計算程式執行時間
print("執行時間:%f 秒" % elapsed)
[略]
148:dragonair
149:dragonite
150:mewtwo
執行時間:1.012797 秒

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago