Python asyncio 网络爬虫实战:从零构建高并发爬虫

配图:标题:Python asyncio 网络爬虫实战:从零构建高并发爬虫;副

1. 环境准备:安装与验证

首先,确保你的 Python 版本是 3.7 或更高,因为 asyncio 在 3.7 中获得了稳定 API。我们使用 `aiohttp` 作为异步 HTTP 客户端,它比 `requests` 更适合高并发场景。先别急着写代码,我们一步步来。

# 安装必要的库
pip install aiohttp aiofiles

# 验证安装
pip show aiohttp aiofiles

安装并验证 aiohttp 和 aiofiles

预期输出应包含 `aiohttp` 和 `aiofiles` 的版本信息。这里有个坑:如果使用 `requests`,它会阻塞整个事件循环,导致并发优势丧失。我们选择 `aiohttp` 是因为它原生支持异步上下文管理器,能自动处理连接池 [来源#2]。

2. 步骤拆解:构建核心爬虫

2.1 定义异步请求函数

我们先写一个基础的异步函数,用于获取单个页面的 HTML。关键点是使用 `async with` 语法,它会在请求完成后自动释放资源。

import aiohttp
import asyncio

async def fetch_page(session, url):
    """异步获取单个页面内容"""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                html = await response.text()
                print(f"成功获取 {url},状态码: {response.status}")
                print(f"获取到的HTML长度: {len(html)}")
                return html
            else:
                print(f"请求失败 {url},状态码: {response.status}")
                return None
    except Exception as e:
        print(f"请求异常 {url}: {e}")
        return None

# 测试单个请求
async def test_fetch():
    async with aiohttp.ClientSession() as session:
        await fetch_page(session, "https://httpbin.org/html")

if __name__ == "__main__":
    asyncio.run(test_fetch())

异步请求函数示例

预期输出:你会看到类似 "成功获取 https://httpbin.org/html,状态码: 200" 和 "获取到的HTML长度: 1234" 的信息。这验证了异步请求的基础功能。

2.2 实现并发控制与任务调度

现在我们来处理大量 URL。使用 `asyncio.gather` 可以并发执行多个协程,但要注意控制并发数,避免被目标网站封禁或耗尽系统资源。这里我们引入 `asyncio.Semaphore` 来限制同时进行的请求数。

import aiohttp
import asyncio

async def fetch_page_with_semaphore(session, url, semaphore):
    """使用信号量控制并发的异步请求"""
    async with semaphore:
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    html = await response.text()
                    print(f"成功获取 {url},状态码: {response.status}")
                    return html
                else:
                    print(f"请求失败 {url},状态码: {response.status}")
                    return None
        except Exception as e:
            print(f"请求异常 {url}: {e}")
            return None

async def crawl_multiple_pages(urls, max_concurrent=5):
    """并发爬取多个页面"""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(
                fetch_page_with_semaphore(session, url, semaphore)
            )
            tasks.append(task)
        
        # 使用 return_exceptions=True 避免单个任务失败导致整个 gather 失败
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        success_count = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 异常: {result}")
            elif result is not None:
                success_count += 1
        
        print(f"爬取完成,成功: {success_count}/{len(urls)}")
        return results

# 测试并发爬取
async def test_concurrent():
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html"
    ]
    await crawl_multiple_pages(urls, max_concurrent=3)

if __name__ == "__main__":
    asyncio.run(test_concurrent())

并发控制与任务调度示例

预期输出:你会看到连续的 "成功获取" 日志,并在最后打印 "爬取完成,成功: 5/5"。这证明了并发控制有效。注意,我们使用了 `return_exceptions=True`,这样即使某个请求失败,`gather` 也不会抛出异常,而是将异常对象返回,方便我们统计成功率。

2.3 数据存储与结果验证

爬取到的 HTML 需要保存。使用 `aiofiles` 进行异步文件写入,避免阻塞事件循环。我们把每个页面的 HTML 保存到独立的文件中。

import aiofiles
import os

async def save_page(html, filename):
    """异步保存页面内容到文件"""
    if not os.path.exists('pages'):
        os.makedirs('pages')
    
    filepath = os.path.join('pages', filename)
    async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
        await f.write(html)
    print(f"文件已保存: {filepath}")

async def crawl_and_save(urls, max_concurrent=5):
    """并发爬取并保存页面"""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = []
    async with aiohttp.ClientSession() as session:
        for i, url in enumerate(urls):
            task = asyncio.create_task(
                fetch_and_save_page(session, url, semaphore, i)
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        success_count = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 异常: {result}")
            elif result is not None:
                success_count += 1
        
        print(f"爬取完成,成功: {success_count}/{len(urls)}")
        return results

async def fetch_and_save_page(session, url, semaphore, index):
    """获取页面并保存"""
    html = await fetch_page_with_semaphore(session, url, semaphore)
    if html:
        filename = f"page_{index}.html"
        await save_page(html, filename)
        return True
    return False

# 测试完整流程
async def test_full_flow():
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html",
        "https://httpbin.org/html"
    ]
    await crawl_and_save(urls, max_concurrent=3)

if __name__ == "__main__":
    asyncio.run(test_full_flow())

数据存储示例

预期输出:除了请求日志,你会看到类似 "文件已保存: pages/page_0.html" 的信息。运行后,检查项目目录下的 `pages` 文件夹,应该能看到 5 个 HTML 文件。这验证了整个数据流:请求 -> 解析 -> 存储。

3. 结果验证:性能对比与日志分析

为了验证 asyncio 的优势,我们写一个简单的多线程版本进行对比。使用 `concurrent.futures.ThreadPoolExecutor` 模拟传统爬虫。

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_page_sync(url):
    """同步请求函数"""
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            print(f"成功获取 {url},状态码: {response.status_code}")
            return len(response.text)
        else:
            print(f"请求失败 {url},状态码: {response.status_code}")
            return 0
    except Exception as e:
        print(f"请求异常 {url}: {e}")
        return 0

def crawl_with_threads(urls, max_workers=5):
    """使用线程池并发爬取"""
    start_time = time.time()
    success_count = 0
    total_length = 0
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch_page_sync, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                length = future.result()
                if length > 0:
                    success_count += 1
                    total_length += length
            except Exception as e:
                print(f"线程任务异常 {url}: {e}")
    
    end_time = time.time()
    elapsed = end_time - start_time
    print(f"多线程爬取完成,成功: {success_count}/{len(urls)},总耗时: {elapsed:.2f}秒")
    return elapsed

# 对比测试
if __name__ == "__main__":
    urls = ["https://httpbin.org/html"] * 10
    
    # 运行 asyncio 版本(需先定义 asyncio 版本的计时函数)
    import asyncio
    async def asyncio_crawl_with_timing(urls, max_concurrent=5):
        start = time.time()
        await crawl_and_save(urls, max_concurrent)
        end = time.time()
        return end - start
    
    asyncio_time = asyncio.run(asyncio_crawl_with_timing(urls, max_concurrent=5))
    thread_time = crawl_with_threads(urls, max_workers=5)
    
    print(f"\n性能对比:")
    print(f"asyncio 版本耗时: {asyncio_time:.2f}秒")
    print(f"多线程版本耗时: {thread_time:.2f}秒")
    if asyncio_time > 0:
        speedup = thread_time / asyncio_time
        print(f"性能提升: {speedup:.1f}倍")

多线程版本对比示例

预期输出:你会看到两个版本的耗时对比。通常,在高并发场景下,asyncio 版本会显著快于多线程版本,因为 asyncio 的协程切换开销远小于线程切换,且内存占用更低 [来源#1]。例如,输出可能显示 "性能提升: 2.5倍"。

4. 常见错误与排查

  • 错误1:在协程中使用了阻塞库(如 requests)。排查:检查所有网络请求是否使用 `aiohttp` 或其他异步库。解决:将 `requests.get()` 替换为 `session.get()` 并使用 `await`。
  • 错误2:事件循环未正确关闭,导致资源泄漏。排查:在脚本结束时看到警告如 "Event loop is closed"。解决:确保使用 `asyncio.run()` 或在 `async with` 中管理 `ClientSession`。
  • 错误3:并发数过高导致被目标网站封禁或连接超时。排查:大量请求返回 429 或 503 状态码。解决:降低 `max_concurrent` 参数(如从 50 降到 10),并添加随机延迟(使用 `asyncio.sleep(random.uniform(0.5, 2.0))`)。
  • 错误4:`gather` 中某个协程抛出异常导致整个任务失败。排查:未使用 `return_exceptions=True`。解决:在 `asyncio.gather(tasks, return_exceptions=True)` 中设置该参数,然后遍历结果处理异常。
  • 关键事实:asyncio 通过协程和事件循环实现非阻塞 I/O,优于传统多线程的资源开销 [来源#1]。
  • 关键事实:`aiohttp` 是 asyncio 生态中推荐的 HTTP 客户端,支持连接池和异步上下文管理 [来源#2]。
  • 本教程代码已测试通过,Python 3.8+,aiohttp 3.8.4。

参考链接

阅读剩余
THE END