Introduction: The Central Role of Web Crawling in Data Science

In today's big-data era, web data has become an indispensable resource for data science and machine learning projects. Python, the language of choice for data science, offers a rich ecosystem of tools and libraries for building web crawlers. This article takes a deep dive into fetching web data efficiently with Python, with a focus on handling common anti-crawling mechanisms, and walks through hands-on examples from basic to advanced techniques.

A web crawler is an automated program that browses the internet according to predefined rules and extracts the information it needs. In data science projects, crawlers are mainly used to:

  • Build training datasets
  • Monitor competitors' prices
  • Collect data for sentiment analysis
  • Construct knowledge graphs
  • Analyze market trends

However, as websites have become more security-conscious, a range of anti-crawling techniques has emerged. Knowing how to work around these restrictions gracefully, while complying with robots.txt and the law, is a skill every data engineer needs. This article starts with the basic request libraries and works its way up to strategies for handling advanced anti-crawling measures.

1. The Python Crawler Tech Stack

1.1 Core Libraries

Python crawler development relies mainly on the following core libraries:

Requests: the most popular HTTP library; its clean, elegant API makes it the go-to choice for sending HTTP requests.

import requests

# Basic GET request
response = requests.get('https://httpbin.org/get')
print(response.status_code)  # 200
print(response.json())  # the response body parsed as JSON

BeautifulSoup4: a powerful HTML/XML parser that copes well with messy, non-conforming markup.

from bs4 import BeautifulSoup
import requests

# Parsing HTML
url = 'https://example.com'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find('h1').text
print(f"Page title: {title}")

lxml: a C-based parsing library that is extremely fast and well suited to large volumes of data.

from lxml import etree

# Parsing with XPath
html = '''
<div class="content">
    <h1>Title</h1>
    <p>Paragraph content</p>
</div>
'''
tree = etree.HTML(html)
title = tree.xpath('//h1/text()')[0]
print(title)  # Title

Scrapy: a full-fledged crawling framework with built-in asynchronous processing, middleware, pipelines, and other advanced features.

# Scrapy Spider example (minimal skeleton)
import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://example.com']
    
    def parse(self, response):
        yield {
            'title': response.css('h1::text').get(),
            'url': response.url
        }

1.2 Environment Setup and Best Practices

Before starting crawler development, set up a suitable environment. A virtual environment is recommended for isolating dependencies:

# Create a virtual environment
python -m venv crawler_env
source crawler_env/bin/activate  # Linux/Mac
# crawler_env\Scripts\activate  # Windows

# Install the core libraries
pip install requests beautifulsoup4 lxml scrapy selenium

Best practices

  1. Set realistic request headers: mimic genuine browser behavior
  2. Add exception handling: network requests can and do fail
  3. Respect robots.txt: honor the site's rules
  4. Throttle request frequency: avoid putting excessive load on the target site (see the combined sketch below)
  5. Persist data promptly: save results as you scrape
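
As a quick illustration, here is a minimal sketch that combines practices 1, 2, and 4 in a single helper; the target URL and delay range are placeholders:

import random
import time
import requests

BROWSER_HEADERS = {
    # Practice 1: present a realistic browser User-Agent
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def polite_get(url, min_delay=1.0, max_delay=3.0):
    """Fetch a URL with realistic headers, error handling, and a random delay."""
    time.sleep(random.uniform(min_delay, max_delay))  # Practice 4: throttle requests
    try:
        response = requests.get(url, headers=BROWSER_HEADERS, timeout=10)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:  # Practice 2: network requests can fail
        print(f"Request failed for {url}: {e}")
        return None

# html = polite_get('https://httpbin.org/get')  # placeholder target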

2. A Basic Crawler in Practice: Scraping Static Pages

2.1 Case Study: Douban Movie Top 250

Douban Movie Top 250 is a classic practice target: the page structure is clean and the list is paginated. We will use Requests and BeautifulSoup to scrape it.

import requests
from bs4 import BeautifulSoup
import time
import csv
import random

class DoubanMovieCrawler:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.base_url = 'https://movie.douban.com/top250'
        self.data = []
    
    def get_page(self, url):
        """Fetch a single page."""
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.text
            else:
                print(f"Request failed, status code: {response.status_code}")
                return None
        except requests.RequestException as e:
            print(f"Request error: {e}")
            return None
    
    def parse_page(self, html):
        """Parse one page of results."""
        soup = BeautifulSoup(html, 'html.parser')
        items = soup.find_all('div', class_='item')
        
        for item in items:
            # Extract basic movie info
            title = item.find('span', class_='title').text
            rating = item.find('span', class_='rating_num').text
            quote_tag = item.find('span', class_='inq')
            quote = quote_tag.text if quote_tag else "N/A"
            
            # Extract director and cast info
            info = item.find('div', class_='bd').p.text.strip()
            director_actors = info.split('\n')[0]
            
            # Extract the movie link and poster image
            link = item.find('a')['href']
            img = item.find('img')['src']
            
            self.data.append({
                'title': title,
                'rating': rating,
                'quote': quote,
                'director_actors': director_actors,
                'link': link,
                'img': img
            })
    
    def crawl(self, max_pages=10):
        """Main crawl loop."""
        for i in range(max_pages):
            url = f"{self.base_url}?start={i * 25}&filter="
            print(f"Crawling page {i + 1}...")
            
            html = self.get_page(url)
            if html:
                self.parse_page(html)
            
            # Random delay to avoid getting blocked
            time.sleep(random.uniform(1, 3))
    
    def save_to_csv(self, filename='douban_top250.csv'):
        """Save the data to a CSV file."""
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            fieldnames = ['title', 'rating', 'quote', 'director_actors', 'link', 'img']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.data)
        print(f"Data saved to {filename}")

# Usage
if __name__ == '__main__':
    crawler = DoubanMovieCrawler()
    crawler.crawl(max_pages=5)  # crawl the first 5 pages as a demo
    crawler.save_to_csv()

Code walkthrough

  1. Request headers: a User-Agent that mimics the Chrome browser
  2. Exception handling: network errors are caught and reported
  3. Random delays: random.uniform(1, 3) spaces out requests
  4. Structured data: extracted fields are collected as a list of dicts
  5. CSV persistence: results are saved with Python's built-in csv module

2.2 Data Cleaning and Validation

Scraped data usually needs further cleaning and validation:

import pandas as pd

def clean_data(filename='douban_top250.csv'):
    """Basic data-cleaning example."""
    df = pd.read_csv(filename)
    
    # 1. Handle missing values
    df['quote'] = df['quote'].fillna('N/A')
    
    # 2. Extract the year (assuming a "director: xxx / cast: xxx / year" format;
    #    adjust the pattern to the actual format)
    df['year'] = df['director_actors'].str.extract(r'(\d{4})')
    
    # 3. Convert ratings to numeric values
    df['rating'] = pd.to_numeric(df['rating'])
    
    # 4. Drop duplicates
    df.drop_duplicates(subset=['title'], inplace=True)
    
    # 5. Save the cleaned data
    df.to_csv('douban_cleaned.csv', index=False)
    return df

# Run the cleaning step
cleaned_df = clean_data()
print(cleaned_df.head())

3. Crawling Dynamic Pages: Selenium and Playwright

3.1 How Dynamic-Page Crawling Works

Modern websites load much of their content with JavaScript, which a plain Requests call cannot see. The usual solutions are:

  • Selenium: a browser-automation tool that supports multiple browser drivers
  • Playwright: Microsoft's modern browser-automation tool, with headless-browser support

3.2 A Selenium Example

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time

class DynamicCrawler:
    def __init__(self, headless=True):
        # Configure Chrome options
        chrome_options = Options()
        if headless:
            chrome_options.add_argument('--headless')  # headless mode
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        
        # Initialize the driver (ChromeDriver must be installed beforehand)
        self.driver = webdriver.Chrome(options=chrome_options)
        self.wait = WebDriverWait(self.driver, 10)
    
    def crawl_dynamic_content(self, url):
        """Crawl dynamically loaded content."""
        try:
            self.driver.get(url)
            
            # Wait for a specific element to finish loading,
            # e.g. an element with the class 'content'
            content = self.wait.until(
                EC.presence_of_element_located((By.CLASS_NAME, 'content'))
            )
            
            # Scroll down to trigger lazy loading of more content
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)  # wait for the new content to load
            
            # Grab the current page source
            page_source = self.driver.page_source
            
            # Parse it with BeautifulSoup
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(page_source, 'html.parser')
            
            # Extract data, e.g. all article titles
            titles = soup.find_all('h2', class_='article-title')
            for title in titles:
                print(title.text.strip())
                
        except TimeoutException:
            print("Page load timed out")
        finally:
            self.driver.quit()

# Usage
# crawler = DynamicCrawler(headless=True)
# crawler.crawl_dynamic_content('https://example.com/dynamic-page')

Selenium optimization tips

  1. Headless mode: reduces resource consumption
  2. Disable image loading: speeds up page loads (see the sketch below)
  3. Use proxy IPs: combine with IP rotation
  4. Smart waits: prefer explicit waits over fixed sleeps
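
A rough sketch of tips 2 and 3, assuming Chrome and ChromeDriver are installed; the proxy address is a placeholder, and the prefs key used here is the commonly used Chrome preference for blocking images:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def build_optimized_driver(proxy=None):
    """Chrome options that skip image downloads and optionally route through a proxy."""
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # tip 1: headless mode
    # Tip 2: block image downloads via Chrome preferences
    chrome_options.add_experimental_option(
        'prefs', {'profile.managed_default_content_settings.images': 2}
    )
    # Tip 3: route traffic through a proxy, e.g. 'http://127.0.0.1:8080' (placeholder)
    if proxy:
        chrome_options.add_argument(f'--proxy-server={proxy}')
    return webdriver.Chrome(options=chrome_options)

# driver = build_optimized_driver()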

3.3 The Modern Playwright Approach

Playwright's advantages over Selenium:

  • Faster execution
  • Native async support
  • Better resistance to bot detection
  • Built-in auto-waiting

import asyncio
from playwright.async_api import async_playwright

async def crawl_with_playwright():
    async with async_playwright() as p:
        # Launch the browser
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        
        # Set the user agent and viewport size
        await page.set_extra_http_headers({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        await page.set_viewport_size({'width': 1920, 'height': 1080})
        
        # Navigate to the page
        await page.goto('https://example.com')
        
        # Wait until the network is idle (all requests finished)
        await page.wait_for_load_state('networkidle')
        
        # Run JavaScript in the page to extract data
        data = await page.evaluate('''() => {
            // This code runs inside the browser
            const items = document.querySelectorAll('.data-item');
            return Array.from(items).map(item => ({
                title: item.querySelector('.title').innerText,
                value: item.querySelector('.value').innerText
            }));
        }''')
        
        print(data)
        
        await browser.close()

# Run the async function
# asyncio.run(crawl_with_playwright())

4. Anti-Crawling Mechanisms and Countermeasures

4.1 Common Anti-Crawling Techniques

Site operators typically defend against crawlers with the following techniques:

  • IP restrictions — throttle the request rate of a single IP; typically surface as 403 Forbidden or 429 Too Many Requests
  • User-Agent checks — verify that request headers look like a real browser; otherwise return an error page or a CAPTCHA
  • CAPTCHAs — human verification such as slider puzzles, click-to-select challenges, or text recognition
  • Dynamic parameters — request parameters that are encrypted or generated on the fly, such as token or sign values
  • JavaScript obfuscation — hide the real data-loading logic behind encrypted data endpoints
  • Cookie validation — check login state and session; unauthenticated requests get redirected to a login page

4.2 Basic Countermeasures

4.2.1 Request-Header Spoofing

import requests
from fake_useragent import UserAgent

def get_random_headers():
    """Generate a randomized set of request headers."""
    ua = UserAgent()
    return {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

# Usage
headers = get_random_headers()
response = requests.get('https://example.com', headers=headers)

4.2.2 An IP Proxy Pool

import random
import requests

class ProxyManager:
    def __init__(self, proxy_list=None):
        # Proxy list (format: scheme://IP:port) — placeholder addresses
        self.proxy_list = proxy_list or [
            'http://123.45.67.89:8080',
            'http://98.76.54.32:3128',
            'https://111.222.33.44:8080',
        ]
        self.failed_proxies = set()
    
    def get_proxy(self):
        """Pick a random working proxy."""
        available_proxies = [p for p in self.proxy_list if p not in self.failed_proxies]
        if not available_proxies:
            print("All proxies have failed")
            return None
        return random.choice(available_proxies)
    
    def test_proxy(self, proxy):
        """Check whether a proxy is usable."""
        try:
            test_url = 'http://httpbin.org/ip'
            response = requests.get(test_url, proxies={'http': proxy, 'https': proxy}, timeout=5)
            if response.status_code == 200:
                print(f"Proxy {proxy} is working")
                return True
        except requests.RequestException:
            pass
        self.failed_proxies.add(proxy)
        return False
    
    def make_request(self, url, max_retries=3):
        """Send a request through a proxy, retrying on failure."""
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            if not proxy:
                return None
            
            try:
                response = requests.get(
                    url,
                    proxies={'http': proxy, 'https': proxy},
                    timeout=10
                )
                if response.status_code == 200:
                    return response
                else:
                    self.failed_proxies.add(proxy)
            except Exception as e:
                print(f"Proxy {proxy} failed: {e}")
                self.failed_proxies.add(proxy)
        
        return None

# Usage
# proxy_mgr = ProxyManager()
# response = proxy_mgr.make_request('https://example.com')

4.2.3 Request-Rate Control

import time
import random
from functools import wraps

def rate_limit(min_delay=1, max_delay=3):
    """Decorator that throttles how often the wrapped function runs."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Random delay before every call
            delay = random.uniform(min_delay, max_delay)
            time.sleep(delay)
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Apply the decorator
@rate_limit(min_delay=2, max_delay=4)
def fetch_url(url):
    return requests.get(url, headers=get_random_headers())

# Batch crawling now automatically honors the delay
urls = ['https://example.com/page1', 'https://example.com/page2']
for url in urls:
    response = fetch_url(url)
    print(response.status_code)

4.3 Advanced Countermeasures

4.3.1 Handling CAPTCHAs

CAPTCHAs are the toughest obstacle for crawlers. The options include:

Option 1: Third-party solving services (e.g. Chaojiying, Yundama)

import requests
import base64

def solve_captcha(image_path, api_key):
    """Send a CAPTCHA image to a solving service's API."""
    with open(image_path, 'rb') as f:
        image_data = base64.b64encode(f.read()).decode()
    
    # Chaojiying API example
    response = requests.post('http://api.chaojiying.com/Upload/Processing.php', data={
        'user': 'your_username',
        'pass2': api_key,
        'softid': 'your_softid',
        'codetype': '1004',  # CAPTCHA type
        'file_base64': image_data
    })
    
    return response.json().get('pic_str')  # the recognized text

Option 2: OCR (for simple CAPTCHAs)

import pytesseract
from PIL import Image

def ocr_captcha(image_path):
    """Recognize a simple CAPTCHA with Tesseract OCR."""
    image = Image.open(image_path)
    # Preprocess: convert to grayscale, then threshold to black and white
    image = image.convert('L')
    threshold = 128
    image = image.point(lambda p: 255 if p > threshold else 0)
    
    # Run Tesseract
    text = pytesseract.image_to_string(image, config='--psm 7')
    return text.strip()

Option 3: Manual entry (most reliable, but slow)

def manual_captcha_solver(image_path):
    """Display the CAPTCHA and let a human type it in."""
    from PIL import Image
    import matplotlib.pyplot as plt
    
    image = Image.open(image_path)
    plt.imshow(image)
    plt.show()
    
    # Wait for the user's input
    captcha_text = input("Enter the CAPTCHA: ")
    return captcha_text

4.3.2 JavaScript Reverse Engineering

For dynamically encrypted parameters, you need to analyze the site's JavaScript:

import time
import execjs  # requires a JavaScript runtime such as Node.js

# Example: reproducing a site's sign-parameter generation logic
# (CryptoJS must be resolvable in the JS runtime for this snippet to run)
js_code = '''
function generateSign(timestamp, data) {
    // JavaScript recovered through reverse engineering
    var key = "secret_key_123";
    var str = timestamp + data + key;
    var sign = CryptoJS.MD5(str).toString();
    return sign;
}
'''

def get_encrypted_params(data):
    """Run the JavaScript to produce the encrypted parameters."""
    ctx = execjs.compile(js_code)
    timestamp = str(int(time.time()))
    sign = ctx.call('generateSign', timestamp, data)
    
    return {
        'timestamp': timestamp,
        'sign': sign,
        'data': data
    }

# Usage
# params = get_encrypted_params('some_data')
# response = requests.get('https://api.example.com/data', params=params)

4.3.3 Session and Cookie Management

Maintaining session state gets you past simple checks:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session_with_retry():
    """Create a Session with a retry policy."""
    session = requests.Session()
    
    # Configure the retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    
    # Default request headers
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    })
    
    return session

def maintain_session(session, login_url, credentials):
    """Log in and keep the authenticated session."""
    # Logging in stores the cookies on the session
    response = session.post(login_url, data=credentials)
    if response.status_code == 200:
        print("Logged in successfully")
        # Subsequent requests carry the cookies automatically
        return True
    return False

# Usage
# session = create_session_with_retry()
# maintain_session(session, 'https://example.com/login', {'user': 'xxx', 'pass': 'xxx'})
# response = session.get('https://example.com/protected-page')

4.3.4 Distributed Crawler Architecture

Large-scale data collection calls for a distributed setup:

# Distributed crawling with Scrapy-Redis
# settings.py
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379/0'

# Spider example
from scrapy_redis.spiders import RedisSpider

class DistributedSpider(RedisSpider):
    name = 'distributed_spider'
    redis_key = 'spider:start_urls'
    
    def parse(self, response):
        # Parsing logic
        yield {
            'url': response.url,
            'title': response.css('title::text').get()
        }
        # Push newly discovered URLs back onto the queue
        for next_page in response.css('a::attr(href)').getall():
            yield response.follow(next_page, self.parse)

5. End-to-End Case Study: An E-commerce Product Collection System

5.1 Requirements

Goal: collect product information from an e-commerce site — name, price, review count, shop details — while coping with its anti-crawling mechanisms.

Tech stack

  • Requests + BeautifulSoup (basic crawling)
  • Playwright (dynamic content)
  • Redis (URL deduplication and task queue)
  • PostgreSQL (data storage)
  • Docker (deployment)

5.2 Full Implementation

import requests
from bs4 import BeautifulSoup
import redis
import psycopg2
import json
import time
import random
import hashlib
from datetime import datetime
from urllib.parse import urlparse
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EcommerceCrawler:
    def __init__(self, redis_host='localhost', db_config=None):
        # Redis connection (URL deduplication and task queue)
        self.redis_client = redis.Redis(host=redis_host, port=6379, db=0, decode_responses=True)
        
        # Database configuration
        self.db_config = db_config or {
            'host': 'localhost',
            'database': 'crawler_db',
            'user': 'postgres',
            'password': 'password'
        }
        
        # Proxy pool (placeholder addresses)
        self.proxy_pool = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
        ]
        
        # Request statistics
        self.stats = {
            'total_requests': 0,
            'success_requests': 0,
            'failed_requests': 0,
            'start_time': datetime.now()
        }
    
    def get_proxy(self):
        """Pick a random proxy."""
        return random.choice(self.proxy_pool) if self.proxy_pool else None
    
    def is_duplicate(self, url):
        """Check whether a URL has already been visited."""
        # hashlib gives a stable fingerprint; Python's built-in hash() is salted per process
        url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
        return self.redis_client.sismember('visited_urls', url_hash)
    
    def mark_visited(self, url):
        """Mark a URL as visited."""
        url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
        self.redis_client.sadd('visited_urls', url_hash)
    
    def make_request(self, url, max_retries=3):
        """Send a request with a proxy and retries."""
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }
            
            try:
                self.stats['total_requests'] += 1
                response = requests.get(
                    url,
                    headers=headers,
                    proxies={'http': proxy, 'https': proxy} if proxy else None,
                    timeout=15
                )
                
                if response.status_code == 200:
                    self.stats['success_requests'] += 1
                    logging.info(f"Request succeeded: {url}")
                    return response.text
                else:
                    logging.warning(f"Request failed [{response.status_code}]: {url}")
                    self.stats['failed_requests'] += 1
                    
            except Exception as e:
                logging.error(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")
                time.sleep(random.uniform(2, 5))
        
        return None
    
    def parse_product_page(self, html, url):
        """Parse a product page."""
        soup = BeautifulSoup(html, 'html.parser')
        
        try:
            # Adjust the selectors to the actual site structure
            product = {
                'url': url,
                'name': soup.find('h1', class_='product-title').text.strip() if soup.find('h1', class_='product-title') else None,
                'price': soup.find('span', class_='price').text.strip() if soup.find('span', class_='price') else None,
                'rating': soup.find('span', class_='rating').text.strip() if soup.find('span', class_='rating') else None,
                'review_count': soup.find('span', class_='review-count').text.strip() if soup.find('span', class_='review-count') else None,
                'shop_name': soup.find('div', class_='shop-name').text.strip() if soup.find('div', class_='shop-name') else None,
                'category': soup.find('span', class_='category').text.strip() if soup.find('span', class_='category') else None,
                'crawled_at': datetime.now().isoformat(),
            }
            
            # Extract the product ID (from the URL or the page)
            parsed_url = urlparse(url)
            product_id = parsed_url.path.split('/')[-1]
            product['product_id'] = product_id
            
            return product
        except Exception as e:
            logging.error(f"Parsing failed: {e}")
            return None
    
    def extract_links(self, html, base_url):
        """Extract product links from a page."""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        
        # Collect links to product detail pages
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Skip anything that is not a product link
            if '/product/' in href or '/item/' in href:
                # Expand relative URLs
                if href.startswith('/'):
                    full_url = f"{base_url}{href}"
                elif href.startswith('http'):
                    full_url = href
                else:
                    continue
                
                if not self.is_duplicate(full_url):
                    links.append(full_url)
        
        return links
    
    def save_to_db(self, product):
        """Save a product record to PostgreSQL."""
        conn = None
        try:
            conn = psycopg2.connect(**self.db_config)
            cur = conn.cursor()
            
            # Create the table if it does not exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    product_id VARCHAR(100) UNIQUE,
                    name TEXT,
                    price VARCHAR(50),
                    rating VARCHAR(20),
                    review_count VARCHAR(20),
                    shop_name VARCHAR(200),
                    category VARCHAR(200),
                    url TEXT,
                    crawled_at TIMESTAMP
                )
            """)
            
            # Upsert the record (update on duplicate product_id)
            cur.execute("""
                INSERT INTO products 
                (product_id, name, price, rating, review_count, shop_name, category, url, crawled_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (product_id) DO UPDATE SET
                    price = EXCLUDED.price,
                    rating = EXCLUDED.rating,
                    review_count = EXCLUDED.review_count,
                    crawled_at = EXCLUDED.crawled_at
            """, (
                product['product_id'], product['name'], product['price'],
                product['rating'], product['review_count'], product['shop_name'],
                product['category'], product['url'], product['crawled_at']
            ))
            
            conn.commit()
            cur.close()
            logging.info(f"Record saved: {product['name']}")
            
        except Exception as e:
            logging.error(f"Database error: {e}")
        finally:
            if conn:
                conn.close()
    
    def crawl(self, start_urls, max_pages=100):
        """Main crawl loop."""
        # Seed the task queue
        for url in start_urls:
            if not self.is_duplicate(url):
                self.redis_client.lpush('crawl_queue', url)
        
        pages_crawled = 0
        
        while pages_crawled < max_pages and self.redis_client.llen('crawl_queue') > 0:
            # Pop a URL from the queue
            url = self.redis_client.rpop('crawl_queue')
            if not url:
                break
            
            # Skip URLs we have already visited
            if self.is_duplicate(url):
                continue
            
            self.mark_visited(url)
            
            # Fetch the page
            html = self.make_request(url)
            if not html:
                continue
            
            # Parse the product information
            product = self.parse_product_page(html, url)
            if product and product['name']:
                self.save_to_db(product)
            
            # Extract new links and push them onto the queue
            new_links = self.extract_links(html, 'https://example.com')
            for link in new_links:
                if not self.is_duplicate(link):
                    self.redis_client.lpush('crawl_queue', link)
            
            pages_crawled += 1
            
            # Random delay
            time.sleep(random.uniform(1, 3))
            
            # Print statistics periodically
            if pages_crawled % 10 == 0:
                self.print_stats()
        
        self.print_stats()
    
    def print_stats(self):
        """Print crawl statistics."""
        elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
        total = self.stats['total_requests']
        logging.info("=== Crawl statistics ===")
        logging.info(f"Total requests: {total}")
        logging.info(f"Succeeded: {self.stats['success_requests']}")
        logging.info(f"Failed: {self.stats['failed_requests']}")
        logging.info(f"Elapsed: {elapsed:.2f}s")
        if total:
            logging.info(f"Success rate: {self.stats['success_requests'] / total * 100:.2f}%")

# Usage
if __name__ == '__main__':
    # Database configuration (the database must exist beforehand)
    db_config = {
        'host': 'localhost',
        'database': 'crawler_db',
        'user': 'postgres',
        'password': 'your_password'
    }
    
    crawler = EcommerceCrawler(db_config=db_config)
    
    # Starting URLs (product listing pages)
    start_urls = [
        'https://example.com/category/electronics',
        'https://example.com/category/books',
    ]
    
    # Start crawling
    crawler.crawl(start_urls, max_pages=50)

5.3 Docker Deployment

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    chromium \
    chromium-driver \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Environment variables
ENV PYTHONUNBUFFERED=1

CMD ["python", "crawler.py"]

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: crawler_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  crawler:
    build: .
    depends_on:
      - postgres
      - redis
    environment:
      - REDIS_HOST=redis
      - DB_HOST=postgres
    volumes:
      - ./logs:/app/logs

volumes:
  postgres_data:
  redis_data:

6. Performance Optimization and Monitoring

6.1 An Asynchronous Crawler Architecture

Using aiohttp for asynchronous requests dramatically improves throughput:

import aiohttp
import asyncio
import logging
from datetime import datetime
from bs4 import BeautifulSoup
import asyncpg  # async PostgreSQL driver

class AsyncCrawler:
    def __init__(self, concurrency=10, db_config=None):
        self.concurrency = concurrency  # maximum number of concurrent requests
        self.semaphore = asyncio.Semaphore(concurrency)
        self.session = None
        self.db_config = db_config or {}  # asyncpg connection parameters
    
    async def fetch(self, session, url):
        """Fetch a single URL asynchronously."""
        async with self.semaphore:  # limit concurrency
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        logging.error(f"HTTP {response.status}: {url}")
                        return None
            except Exception as e:
                logging.error(f"Request failed {url}: {e}")
                return None
    
    async def parse_and_save(self, html, url):
        """Parse the page and persist the result (async)."""
        if not html:
            return
        
        soup = BeautifulSoup(html, 'html.parser')
        # Parsing logic...
        
        # Save to the database asynchronously
        await self.save_to_db_async({
            'url': url,
            'title': soup.find('title').text if soup.find('title') else '',
            'crawled_at': datetime.now()
        })
    
    async def save_to_db_async(self, data):
        """Asynchronous database insert."""
        conn = await asyncpg.connect(**self.db_config)
        try:
            await conn.execute("""
                INSERT INTO crawled_data (url, title, crawled_at)
                VALUES ($1, $2, $3)
            """, data['url'], data['title'], data['crawled_at'])
        finally:
            await conn.close()
    
    async def crawl_batch(self, urls):
        """Crawl a batch of URLs."""
        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Process the results
            for url, html in zip(urls, results):
                if isinstance(html, Exception):
                    logging.error(f"Exception: {html}")
                    continue
                await self.parse_and_save(html, url)
    
    def run(self, url_list):
        """Run the async crawler."""
        asyncio.run(self.crawl_batch(url_list))

# Usage
# crawler = AsyncCrawler(concurrency=20)
# crawler.run(['https://example.com/page1', 'https://example.com/page2'])

6.2 Crawler Monitoring and Logging

import logging
from logging.handlers import RotatingFileHandler
import psutil
import json

class CrawlerMonitor:
    def __init__(self, log_file='crawler.log'):
        # Configure logging
        self.logger = logging.getLogger('CrawlerMonitor')
        self.logger.setLevel(logging.INFO)
        
        # File logging (with rotation)
        file_handler = RotatingFileHandler(
            log_file, maxBytes=10*1024*1024, backupCount=5
        )
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(file_handler)
        
        # Console logging
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(console_handler)
        
        # Performance metrics
        self.metrics = {
            'requests_per_minute': 0,
            'error_rate': 0,
            'memory_usage': 0,
            'cpu_usage': 0,
            'queue_size': 0
        }
    
    def log_request(self, url, status, response_time):
        """Log a single request."""
        self.logger.info(f"URL: {url} | Status: {status} | Time: {response_time:.2f}s")
    
    def update_metrics(self, **kwargs):
        """Update the monitoring metrics."""
        self.metrics.update(kwargs)
        self.logger.info(f"Metrics: {json.dumps(self.metrics, indent=2)}")
    
    def check_system_resources(self):
        """Sample the process's resource usage."""
        process = psutil.Process()
        self.metrics['memory_usage'] = process.memory_info().rss / 1024 / 1024  # MB
        self.metrics['cpu_usage'] = process.cpu_percent()
        self.logger.info(f"Resource usage: Memory={self.metrics['memory_usage']:.2f}MB, CPU={self.metrics['cpu_usage']:.2f}%")

# Usage
# monitor = CrawlerMonitor()
# monitor.log_request('https://example.com', 200, 1.5)
# monitor.check_system_resources()

7. Legal and Ethical Considerations

7.1 Respecting robots.txt

import requests
from urllib.parse import urlparse

def check_robots_txt(base_url):
    """Fetch and inspect robots.txt."""
    parsed = urlparse(base_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    
    try:
        response = requests.get(robots_url, timeout=5)
        if response.status_code == 200:
            print("robots.txt contents:")
            print(response.text)
            
            # Naive parsing (use urllib.robotparser in practice)
            lines = response.text.split('\n')
            for line in lines:
                if 'Disallow:' in line and '/admin/' in line:
                    print("Warning: crawling /admin/ paths is disallowed")
                    return False
            return True
        else:
            print("No robots.txt found")
            return True
    except requests.RequestException:
        print("Could not check robots.txt")
        return True

# Usage
# check_robots_txt('https://example.com')
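
For a more robust check, a sketch using the standard library's urllib.robotparser, which answers per-URL questions directly:

from urllib import robotparser
from urllib.parse import urlparse

def can_fetch(url, user_agent='*'):
    """Return True if robots.txt allows this user agent to fetch the URL."""
    parsed = urlparse(url)
    rp = robotparser.RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()  # download and parse robots.txt
    return rp.can_fetch(user_agent, url)

# print(can_fetch('https://example.com/some/page'))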

7.2 Legal Compliance Checklist

  1. Respect copyright: do not scrape copyrighted content for commercial use
  2. Protect personal data: avoid collecting private or personally identifiable information
  3. Terms of service: abide by the website's terms of service
  4. Fair use: throttle your crawl rate so normal site operation is not affected
  5. Purpose of the data: be clear about how the data will be used and avoid unfair competition

7.3 Crawler Ethics

  • Transparency: identify who you are and how to contact you in your crawler (a sketch follows this list)
  • Traceability: keep crawl logs so activity can be audited later
  • Data minimization: collect only the data you actually need
  • Responsiveness: stop crawling promptly if a site administrator complains
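
One lightweight way to apply the transparency principle is to identify the crawler in every request; a sketch in which the bot name, info URL, and e-mail address are placeholders:

import requests

# A descriptive User-Agent that names the bot and gives site owners a contact channel
IDENTIFIED_HEADERS = {
    'User-Agent': 'MyResearchBot/1.0 (+https://example.com/bot-info; contact: admin@example.com)',
    'From': 'admin@example.com',  # the standard "From" request header
}

response = requests.get('https://httpbin.org/headers', headers=IDENTIFIED_HEADERS)
print(response.json())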

8. Summary and Best Practices

8.1 Key Takeaways

  1. Fundamentals: Requests + BeautifulSoup for static pages
  2. Dynamic content: Selenium/Playwright for JavaScript-rendered pages
  3. Anti-crawling countermeasures: proxy pools, header spoofing, rate control, session management
  4. Advanced tactics: CAPTCHA solving, JavaScript reverse engineering, distributed architecture
  5. Performance: asynchronous crawling, connection pooling (sketched below), caching strategies
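
A minimal sketch of connection pooling with requests (the pool sizes are illustrative):

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Reuse TCP connections across requests: 10 host pools, up to 50 connections each
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=50)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Requests to the same host now reuse open connections instead of reconnecting
# response = session.get('https://example.com/page1')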

8.2 Performance Comparison

  • Requests + BS4: static pages, small scale
  • Selenium: dynamic pages, medium scale
  • Playwright: dynamic pages, medium scale
  • Asynchronous crawling: very fast; suited to large-scale data collection
  • Distributed crawling: very fast; suited to enterprise-grade applications

8.3 Recommended Stacks

  • Personal learning / small scale: Requests + BeautifulSoup
  • Dynamic pages / medium scale: Playwright + asynchronous crawling
  • Enterprise / large scale: Scrapy + Scrapy-Redis + distributed deployment
  • Heavily protected sites: Playwright + proxy pool + CAPTCHA solving

8.4 Where to Go Next

  1. Follow tooling updates: browser-automation tools and anti-crawling techniques evolve quickly
  2. Learn JavaScript: understand front-end encryption and dynamic loading
  3. Study legal cases: know where the legal boundaries of crawling lie
  4. Join open-source communities: Scrapy, Playwright, and similar projects
  5. Practice on variety: try different kinds of sites and anti-crawling mechanisms

With the material covered here, you should be able to build crawler systems of moderate complexity on your own and handle the most common anti-crawling mechanisms. Remember: the technology is neutral, but the people using it must stay within legal and ethical bounds. In real projects, always put compliance first.