Introduction: The Central Role of Web Crawling in Data Science
In today's era of big data, web data has become an indispensable resource for data science and machine learning projects. Python, the language of choice in data science, offers a rich ecosystem of tools and libraries for crawler development. This article explores how to collect web data efficiently with Python, focuses on handling common anti-crawling mechanisms, and walks through practical examples from basic to advanced techniques.
A web crawler is an automated program that browses the internet according to predefined rules and extracts the information it needs. In data science projects, crawlers are mainly used to:
- Build training datasets
- Monitor competitor pricing
- Collect data for sentiment analysis
- Construct knowledge graphs
- Analyze market trends
As websites have become more security-conscious, however, a variety of anti-crawling techniques have emerged. Knowing how to work around these restrictions gracefully, while respecting robots.txt and applicable laws, is a skill every data engineer should master. This article starts with the basic request libraries and works up to strategies for handling advanced anti-crawling measures.
1. The Python Crawler Technology Stack
1.1 Core Libraries
Python crawler development relies mainly on the following core libraries:
Requests: the most popular HTTP library; its clean, elegant API makes it the natural choice for sending HTTP requests.
import requests

# Basic GET request example
response = requests.get('https://httpbin.org/get')
print(response.status_code)  # 200
print(response.json())       # JSON body of the response
BeautifulSoup4: a powerful HTML/XML parsing library that copes well with poorly formed markup.
from bs4 import BeautifulSoup
import requests

# HTML parsing example
url = 'https://example.com'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find('h1').text
print(f"Page title: {title}")
lxml: a C-based parsing library that is extremely fast and well suited to large volumes of data.
from lxml import etree

# Parsing with XPath
html = '''
<div class="content">
    <h1>Title</h1>
    <p>Body paragraph</p>
</div>
'''
tree = etree.HTML(html)
title = tree.xpath('//h1/text()')[0]
print(title)  # Title
Scrapy: a full-featured crawling framework with built-in asynchronous processing, middleware, pipelines, and other advanced features.
# Minimal Scrapy spider example
import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://example.com']

    def parse(self, response):
        yield {
            'title': response.css('h1::text').get(),
            'url': response.url
        }
1.2 Environment Setup and Best Practices
Before starting crawler development, set up a suitable environment. A virtual environment is recommended to isolate dependencies:
# Create a virtual environment
python -m venv crawler_env
source crawler_env/bin/activate   # Linux/Mac
# crawler_env\Scripts\activate    # Windows

# Install the core libraries
pip install requests beautifulsoup4 lxml scrapy selenium
Best practices (a small helper applying several of them is sketched after this list):
- Set realistic request headers to mimic a real browser
- Add exception handling: network requests can and do fail
- Respect robots.txt and the site's rules
- Throttle request frequency to avoid putting excessive load on the target site
- Persist data promptly so crawled results are not lost
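As a rough sketch of these points, the helper below wraps requests.get with a browser-like header, a timeout, basic error handling, and a randomized delay. The function name polite_get, the header value, and the delay range are illustrative assumptions, not part of any particular site's requirements.

import time
import random
import requests

def polite_get(url, session=None, min_delay=1.0, max_delay=3.0):
    """Send a GET request with a browser-like header, a timeout,
    basic error handling, and a random delay before each call."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    time.sleep(random.uniform(min_delay, max_delay))  # throttle requests
    try:
        client = session or requests
        response = client.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # raise for 4xx/5xx responses
        return response
    except requests.RequestException as exc:
        print(f"Request failed for {url}: {exc}")
        return None

# Usage
# resp = polite_get('https://httpbin.org/get')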
2. Basic Crawling in Practice: Scraping Static Pages
2.1 Case Study: Scraping the Douban Movie Top 250
The Douban Movie Top 250 list is a classic practice target: the page structure is clean and paginated. We will use Requests and BeautifulSoup to collect the data.
import requests
from bs4 import BeautifulSoup
import time
import csv
import random

class DoubanMovieCrawler:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.base_url = 'https://movie.douban.com/top250'
        self.data = []

    def get_page(self, url):
        """Fetch a single page."""
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.text
            print(f"Request failed, status code: {response.status_code}")
            return None
        except requests.RequestException as e:
            print(f"Request error: {e}")
            return None

    def parse_page(self, html):
        """Parse one page of results."""
        soup = BeautifulSoup(html, 'html.parser')
        items = soup.find_all('div', class_='item')
        for item in items:
            # Basic movie information
            title = item.find('span', class_='title').text
            rating = item.find('span', class_='rating_num').text
            quote_tag = item.find('span', class_='inq')
            quote = quote_tag.text if quote_tag else 'N/A'
            # Director and cast
            info = item.find('div', class_='bd').p.text.strip()
            director_actors = info.split('\n')[0]
            # Detail-page link and poster image
            link = item.find('a')['href']
            img = item.find('img')['src']
            self.data.append({
                'title': title,
                'rating': rating,
                'quote': quote,
                'director_actors': director_actors,
                'link': link,
                'img': img
            })

    def crawl(self, max_pages=10):
        """Main crawl loop."""
        for i in range(max_pages):
            url = f"{self.base_url}?start={i * 25}&filter="
            print(f"Crawling page {i + 1}...")
            html = self.get_page(url)
            if html:
                self.parse_page(html)
            # Random delay to avoid getting blocked
            time.sleep(random.uniform(1, 3))

    def save_to_csv(self, filename='douban_top250.csv'):
        """Save results to CSV."""
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            fieldnames = ['title', 'rating', 'quote', 'director_actors', 'link', 'img']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.data)
        print(f"Data saved to {filename}")

# Usage
if __name__ == '__main__':
    crawler = DoubanMovieCrawler()
    crawler.crawl(max_pages=5)  # crawl the first 5 pages as a demo
    crawler.save_to_csv()
Notes on the code:
- Request headers: the User-Agent header mimics a Chrome browser
- Exception handling: network errors are caught and reported
- Random delay: random.uniform(1, 3) inserts a randomized pause between pages
- Structured data: extracted fields are stored as a list of dictionaries
- CSV persistence: results are written with Python's built-in csv module
2.2 Data Cleaning and Validation
Crawled data usually needs further cleaning and validation:
import pandas as pd

def clean_data(filename='douban_top250.csv'):
    """Example data-cleaning routine."""
    df = pd.read_csv(filename)
    # 1. Handle missing values
    df['quote'] = df['quote'].fillna('N/A')
    # 2. Extract the year (assuming a format like "Director: xxx / Cast: xxx / year");
    #    adjust the pattern to the actual format
    df['year'] = df['director_actors'].str.extract(r'(\d{4})')
    # 3. Convert ratings to numeric values
    df['rating'] = pd.to_numeric(df['rating'])
    # 4. Drop duplicates
    df.drop_duplicates(subset=['title'], inplace=True)
    # 5. Save the cleaned data
    df.to_csv('douban_cleaned.csv', index=False)
    return df

# Run the cleaning step
cleaned_df = clean_data()
print(cleaned_df.head())
3. Crawling Dynamic Pages: Selenium and Playwright
3.1 How Dynamic Page Crawling Works
Modern websites load much of their content dynamically with JavaScript, so a plain Requests call never sees that data. The main solutions are:
- Selenium: a browser automation tool that supports multiple browser drivers
- Playwright: Microsoft's modern browser automation tool with headless browser support
3.2 A Selenium Example
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time

class DynamicCrawler:
    def __init__(self, headless=True):
        # Configure Chrome options
        chrome_options = Options()
        if headless:
            chrome_options.add_argument('--headless')  # headless mode
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        # Initialize the driver (ChromeDriver must be installed beforehand)
        self.driver = webdriver.Chrome(options=chrome_options)
        self.wait = WebDriverWait(self.driver, 10)

    def crawl_dynamic_content(self, url):
        """Crawl dynamically loaded content."""
        try:
            self.driver.get(url)
            # Wait for a specific element to finish loading,
            # e.g. an element with class 'content'
            content = self.wait.until(
                EC.presence_of_element_located((By.CLASS_NAME, 'content'))
            )
            # Scroll down to trigger lazy loading
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)  # wait for new content to load
            # Grab the current page source
            page_source = self.driver.page_source
            # Parse with BeautifulSoup
            soup = BeautifulSoup(page_source, 'html.parser')
            # Extract data, e.g. all article titles
            titles = soup.find_all('h2', class_='article-title')
            for title in titles:
                print(title.text.strip())
        except TimeoutException:
            print("Page load timed out")
        finally:
            self.driver.quit()

# Usage
# crawler = DynamicCrawler(headless=True)
# crawler.crawl_dynamic_content('https://example.com/dynamic-page')
Selenium tuning tips (applied in the options sketch below):
- Headless mode: reduces resource usage
- Disable image loading: speeds up page loads
- Use proxy IPs: combine with IP rotation
- Smart waits: prefer explicit waits over fixed sleep calls
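As a sketch of the first three tips, the Chrome options below enable headless mode, route traffic through a proxy, and block image loading. The proxy address is a placeholder.

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

chrome_options = Options()
chrome_options.add_argument('--headless')                             # headless mode
chrome_options.add_argument('--proxy-server=http://127.0.0.1:8080')   # placeholder proxy
# Block image loading via Chrome content settings (2 = block)
chrome_options.add_experimental_option(
    'prefs', {'profile.managed_default_content_settings.images': 2}
)
driver = webdriver.Chrome(options=chrome_options)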
3.3 Playwright: A Modern Alternative
Advantages of Playwright over Selenium:
- Faster execution
- Native async support
- Better resistance to bot detection
- Built-in auto-waiting
import asyncio
from playwright.async_api import async_playwright

async def crawl_with_playwright():
    async with async_playwright() as p:
        # Launch the browser
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        # Set the user agent and viewport size
        await page.set_extra_http_headers({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        await page.set_viewport_size({'width': 1920, 'height': 1080})
        # Navigate to the page
        await page.goto('https://example.com')
        # Wait until the network is idle (all requests finished)
        await page.wait_for_load_state('networkidle')
        # Run JavaScript in the page to extract data
        data = await page.evaluate('''() => {
            // This code runs inside the browser
            const items = document.querySelectorAll('.data-item');
            return Array.from(items).map(item => ({
                title: item.querySelector('.title').innerText,
                value: item.querySelector('.value').innerText
            }));
        }''')
        print(data)
        await browser.close()

# Run the async function
# asyncio.run(crawl_with_playwright())
4. Anti-Crawling Mechanisms and Countermeasures
4.1 Common Anti-Crawling Techniques
Site operators typically use the following techniques to keep crawlers out:
| Technique | How it works | Typical symptoms |
|---|---|---|
| IP rate limiting | Limits how often a single IP may connect | 403 Forbidden, 429 Too Many Requests |
| User-Agent checks | Inspects whether request headers look like a browser | Error pages or CAPTCHAs |
| CAPTCHAs | Human verification | Slider, click-to-select, or text-recognition challenges |
| Dynamic parameters | Request parameters are encrypted or generated on the fly | token, sign, and similar parameters |
| JavaScript obfuscation | Hides the real data-loading logic | Encrypted data endpoints |
| Cookie validation | Checks login state and session | Forced login or redirect to a login page |
4.2 Basic Countermeasures
4.2.1 Spoofing Request Headers
import requests
from fake_useragent import UserAgent

def get_random_headers():
    """Generate a randomized set of browser-like request headers."""
    ua = UserAgent()
    return {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

# Usage
headers = get_random_headers()
response = requests.get('https://example.com', headers=headers)
4.2.2 An IP Proxy Pool
import random
import requests

class ProxyManager:
    def __init__(self, proxy_list=None):
        # Proxy list (format: scheme://host:port); these addresses are placeholders
        self.proxy_list = proxy_list or [
            'http://123.45.67.89:8080',
            'http://98.76.54.32:3128',
            'http://111.222.33.44:8080',
        ]
        self.failed_proxies = set()

    def get_proxy(self):
        """Pick a random proxy that has not failed yet."""
        available_proxies = [p for p in self.proxy_list if p not in self.failed_proxies]
        if not available_proxies:
            print("All proxies have failed")
            return None
        return random.choice(available_proxies)

    def test_proxy(self, proxy):
        """Check whether a proxy is usable."""
        try:
            test_url = 'http://httpbin.org/ip'
            response = requests.get(test_url, proxies={'http': proxy, 'https': proxy}, timeout=5)
            if response.status_code == 200:
                print(f"Proxy {proxy} is working")
                return True
        except requests.RequestException:
            self.failed_proxies.add(proxy)
        return False

    def make_request(self, url, max_retries=3):
        """Send a request through a proxy, retrying with another on failure."""
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            if not proxy:
                return None
            try:
                response = requests.get(
                    url,
                    proxies={'http': proxy, 'https': proxy},
                    timeout=10
                )
                if response.status_code == 200:
                    return response
                self.failed_proxies.add(proxy)
            except Exception as e:
                print(f"Proxy {proxy} failed: {e}")
                self.failed_proxies.add(proxy)
        return None

# Usage
# proxy_mgr = ProxyManager()
# response = proxy_mgr.make_request('https://example.com')
4.2.3 Throttling Request Frequency
import time
import random
import requests
from functools import wraps

def rate_limit(min_delay=1, max_delay=3):
    """Decorator: enforce a randomized delay before each call."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Random delay
            delay = random.uniform(min_delay, max_delay)
            time.sleep(delay)
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Apply the decorator (get_random_headers comes from section 4.2.1)
@rate_limit(min_delay=2, max_delay=4)
def fetch_url(url):
    return requests.get(url, headers=get_random_headers())

# Batch crawling then automatically respects the delay
urls = ['https://example.com/page1', 'https://example.com/page2']
for url in urls:
    response = fetch_url(url)
    print(response.status_code)
4.3 Advanced Countermeasures
4.3.1 Handling CAPTCHAs
CAPTCHAs are the crawler's toughest obstacle. The main options are:
Option 1: Third-party CAPTCHA-solving services (e.g. Chaojiying, Yundama)
import requests
import base64

def solve_captcha(image_path, api_key):
    """Send a CAPTCHA image to a solving service's API."""
    with open(image_path, 'rb') as f:
        image_data = base64.b64encode(f.read()).decode()
    # Chaojiying API example
    response = requests.post('http://api.chaojiying.com/Upload/Processing.php', data={
        'user': 'your_username',
        'pass2': api_key,
        'softid': 'your_softid',
        'codetype': '1004',  # CAPTCHA type
        'file_base64': image_data
    })
    return response.json().get('pic_str')  # recognized text
Option 2: OCR (for simple CAPTCHAs)
import pytesseract
from PIL import Image

def ocr_captcha(image_path):
    """Recognize a simple CAPTCHA with Tesseract OCR."""
    image = Image.open(image_path)
    # Preprocess: convert to grayscale, then binarize
    image = image.convert('L')
    threshold = 128
    image = image.point(lambda p: 255 if p > threshold else 0)
    # Run Tesseract
    text = pytesseract.image_to_string(image, config='--psm 7')
    return text.strip()
Option 3: Manual solving (most reliable, but slow)
def manual_captcha_solver(image_path):
    """Show the CAPTCHA image and let a human type it in."""
    from PIL import Image
    import matplotlib.pyplot as plt

    image = Image.open(image_path)
    plt.imshow(image)
    plt.show()
    # Wait for user input
    captcha_text = input("Enter the CAPTCHA text: ")
    return captcha_text
4.3.2 Reverse-Engineering JavaScript
When request parameters are encrypted or generated dynamically, you need to analyze the site's JavaScript:
import time
import execjs  # PyExecJS; requires a JavaScript runtime such as Node.js

# Example: reproducing a site's "sign" parameter generation
js_code = '''
function generateSign(timestamp, data) {
    // JavaScript recovered by reverse engineering
    // (assumes the CryptoJS library is also loaded into the runtime)
    var key = "secret_key_123";
    var str = timestamp + data + key;
    var sign = CryptoJS.MD5(str).toString();
    return sign;
}
'''

def get_encrypted_params(data):
    """Run the JavaScript to compute the encrypted parameters."""
    ctx = execjs.compile(js_code)
    timestamp = str(int(time.time()))
    sign = ctx.call('generateSign', timestamp, data)
    return {
        'timestamp': timestamp,
        'sign': sign,
        'data': data
    }

# Usage
# params = get_encrypted_params('some_data')
# response = requests.get('https://api.example.com/data', params=params)
4.3.3 Session and Cookie Management
Maintaining session state gets you past simple login checks:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session_with_retry():
    """Create a Session with a retry policy."""
    session = requests.Session()
    # Configure the retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    # Default request headers
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    })
    return session

def maintain_session(session, login_url, credentials):
    """Log in and keep the session state."""
    # Log in to obtain cookies
    response = session.post(login_url, data=credentials)
    if response.status_code == 200:
        print("Login successful")
        # Subsequent requests carry the cookies automatically
        return True
    return False

# Usage
# session = create_session_with_retry()
# maintain_session(session, 'https://example.com/login', {'user': 'xxx', 'pass': 'xxx'})
# response = session.get('https://example.com/protected-page')
4.3.4 Distributed Crawler Architecture
Large-scale data collection calls for a distributed setup:
# Distributed crawling with Scrapy-Redis
# settings.py
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379/0'

# Spider example
from scrapy_redis.spiders import RedisSpider

class DistributedSpider(RedisSpider):
    name = 'distributed_spider'
    redis_key = 'spider:start_urls'

    def parse(self, response):
        # Parsing logic
        yield {
            'url': response.url,
            'title': response.css('title::text').get()
        }
        # Push newly discovered URLs back into the queue
        for next_page in response.css('a::attr(href)').getall():
            yield response.follow(next_page, self.parse)
5. End-to-End Case Study: An E-commerce Product Data Collection System
5.1 Requirements Analysis
Goal: collect product information (name, price, review count, shop details, etc.) from an e-commerce site while handling its anti-crawling measures.
Technology stack:
- Requests + BeautifulSoup (basic crawling)
- Playwright (dynamic content)
- Redis (URL deduplication and task queue)
- PostgreSQL (storage)
- Docker (deployment)
5.2 Full Implementation
import requests
from bs4 import BeautifulSoup
import redis
import psycopg2
import time
import random
import hashlib
from datetime import datetime
from urllib.parse import urlparse
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EcommerceCrawler:
    def __init__(self, redis_host='localhost', db_config=None):
        # Redis connection (URL deduplication and task queue)
        self.redis_client = redis.Redis(host=redis_host, port=6379, db=0, decode_responses=True)
        # Database configuration
        self.db_config = db_config or {
            'host': 'localhost',
            'database': 'crawler_db',
            'user': 'postgres',
            'password': 'password'
        }
        # Proxy pool (placeholder addresses)
        self.proxy_pool = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
        ]
        # Request statistics
        self.stats = {
            'total_requests': 0,
            'success_requests': 0,
            'failed_requests': 0,
            'start_time': datetime.now()
        }

    def get_proxy(self):
        """Pick a random proxy."""
        return random.choice(self.proxy_pool) if self.proxy_pool else None

    def is_duplicate(self, url):
        """Check whether a URL has already been visited."""
        # Use a stable hash (Python's built-in hash() varies between processes)
        url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
        return self.redis_client.sismember('visited_urls', url_hash)

    def mark_visited(self, url):
        """Mark a URL as visited."""
        url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
        self.redis_client.sadd('visited_urls', url_hash)

    def make_request(self, url, max_retries=3):
        """Send a request with proxy support and retries."""
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }
            try:
                self.stats['total_requests'] += 1
                response = requests.get(
                    url,
                    headers=headers,
                    proxies={'http': proxy, 'https': proxy} if proxy else None,
                    timeout=15
                )
                if response.status_code == 200:
                    self.stats['success_requests'] += 1
                    logging.info(f"Fetched: {url}")
                    return response.text
                logging.warning(f"Request failed [{response.status_code}]: {url}")
                self.stats['failed_requests'] += 1
            except Exception as e:
                logging.error(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")
            time.sleep(random.uniform(2, 5))
        return None

    def parse_product_page(self, html, url):
        """Parse a product detail page."""
        soup = BeautifulSoup(html, 'html.parser')
        try:
            # Adjust these selectors to the actual site structure
            product = {
                'url': url,
                'name': soup.find('h1', class_='product-title').text.strip() if soup.find('h1', class_='product-title') else None,
                'price': soup.find('span', class_='price').text.strip() if soup.find('span', class_='price') else None,
                'rating': soup.find('span', class_='rating').text.strip() if soup.find('span', class_='rating') else None,
                'review_count': soup.find('span', class_='review-count').text.strip() if soup.find('span', class_='review-count') else None,
                'shop_name': soup.find('div', class_='shop-name').text.strip() if soup.find('div', class_='shop-name') else None,
                'category': soup.find('span', class_='category').text.strip() if soup.find('span', class_='category') else None,
                'crawled_at': datetime.now().isoformat(),
            }
            # Product ID (from the URL or the page)
            parsed_url = urlparse(url)
            product_id = parsed_url.path.split('/')[-1]
            product['product_id'] = product_id
            return product
        except Exception as e:
            logging.error(f"Parse failure: {e}")
            return None

    def extract_links(self, html, base_url):
        """Extract product links from a page."""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        # Collect links to product detail pages
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Skip links that are not product pages
            if '/product/' in href or '/item/' in href:
                # Resolve relative URLs
                if href.startswith('/'):
                    full_url = f"{base_url}{href}"
                elif href.startswith('http'):
                    full_url = href
                else:
                    continue
                if not self.is_duplicate(full_url):
                    links.append(full_url)
        return links

    def save_to_db(self, product):
        """Save a product to PostgreSQL."""
        conn = None
        try:
            conn = psycopg2.connect(**self.db_config)
            cur = conn.cursor()
            # Create the table if it does not exist
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    product_id VARCHAR(100) UNIQUE,
                    name TEXT,
                    price VARCHAR(50),
                    rating VARCHAR(20),
                    review_count VARCHAR(20),
                    shop_name VARCHAR(200),
                    category VARCHAR(200),
                    url TEXT,
                    crawled_at TIMESTAMP
                )
            """)
            # Upsert the product row
            cur.execute("""
                INSERT INTO products
                (product_id, name, price, rating, review_count, shop_name, category, url, crawled_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (product_id) DO UPDATE SET
                    price = EXCLUDED.price,
                    rating = EXCLUDED.rating,
                    review_count = EXCLUDED.review_count,
                    crawled_at = EXCLUDED.crawled_at
            """, (
                product['product_id'], product['name'], product['price'],
                product['rating'], product['review_count'], product['shop_name'],
                product['category'], product['url'], product['crawled_at']
            ))
            conn.commit()
            cur.close()
            logging.info(f"Saved: {product['name']}")
        except Exception as e:
            logging.error(f"Database error: {e}")
        finally:
            if conn:
                conn.close()

    def crawl(self, start_urls, max_pages=100):
        """Main crawl loop."""
        # Seed the task queue
        for url in start_urls:
            if not self.is_duplicate(url):
                self.redis_client.lpush('crawl_queue', url)
        pages_crawled = 0
        while pages_crawled < max_pages and self.redis_client.llen('crawl_queue') > 0:
            # Pop a URL from the queue
            url = self.redis_client.rpop('crawl_queue')
            if not url:
                break
            # Skip URLs we have already visited
            if self.is_duplicate(url):
                continue
            self.mark_visited(url)
            # Fetch the page
            html = self.make_request(url)
            if not html:
                continue
            # Parse product information
            product = self.parse_product_page(html, url)
            if product and product['name']:
                self.save_to_db(product)
            # Extract new links and push them onto the queue
            new_links = self.extract_links(html, 'https://example.com')
            for link in new_links:
                if not self.is_duplicate(link):
                    self.redis_client.lpush('crawl_queue', link)
            pages_crawled += 1
            # Random delay
            time.sleep(random.uniform(1, 3))
            # Print statistics periodically
            if pages_crawled % 10 == 0:
                self.print_stats()
        self.print_stats()

    def print_stats(self):
        """Print crawl statistics."""
        elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
        total = self.stats['total_requests']
        logging.info("=== Crawl statistics ===")
        logging.info(f"Total requests: {total}")
        logging.info(f"Succeeded: {self.stats['success_requests']}")
        logging.info(f"Failed: {self.stats['failed_requests']}")
        logging.info(f"Elapsed: {elapsed:.2f}s")
        if total:
            logging.info(f"Success rate: {self.stats['success_requests'] / total * 100:.2f}%")

# Usage
if __name__ == '__main__':
    # Database configuration (create the database beforehand)
    db_config = {
        'host': 'localhost',
        'database': 'crawler_db',
        'user': 'postgres',
        'password': 'your_password'
    }
    crawler = EcommerceCrawler(db_config=db_config)
    # Seed URLs (product listing pages)
    start_urls = [
        'https://example.com/category/electronics',
        'https://example.com/category/books',
    ]
    # Start crawling
    crawler.crawl(start_urls, max_pages=50)
5.3 Docker Deployment
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    chromium \
    chromium-driver \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Environment variables
ENV PYTHONUNBUFFERED=1

CMD ["python", "crawler.py"]
# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: crawler_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  crawler:
    build: .
    depends_on:
      - postgres
      - redis
    environment:
      - REDIS_HOST=redis
      - DB_HOST=postgres
    volumes:
      - ./logs:/app/logs

volumes:
  postgres_data:
  redis_data:
6. Performance Optimization and Monitoring
6.1 An Asynchronous Crawler Architecture
Asynchronous requests with aiohttp greatly improve throughput:
import asyncio
import logging
from datetime import datetime

import aiohttp
import asyncpg  # asynchronous PostgreSQL driver
from bs4 import BeautifulSoup

class AsyncCrawler:
    def __init__(self, concurrency=10, db_config=None):
        self.concurrency = concurrency  # maximum concurrent requests
        self.semaphore = asyncio.Semaphore(concurrency)
        self.session = None
        self.db_config = db_config or {}  # asyncpg connection parameters

    async def fetch(self, session, url):
        """Fetch a single URL asynchronously."""
        async with self.semaphore:  # limit concurrency
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.text()
                    logging.error(f"HTTP {response.status}: {url}")
                    return None
            except Exception as e:
                logging.error(f"Request failed {url}: {e}")
                return None

    async def parse_and_save(self, html, url):
        """Parse the page and save the result (asynchronously)."""
        if not html:
            return
        soup = BeautifulSoup(html, 'html.parser')
        # Parsing logic...
        # Save to the database asynchronously
        await self.save_to_db_async({
            'url': url,
            'title': soup.find('title').text if soup.find('title') else '',
            'crawled_at': datetime.now()
        })

    async def save_to_db_async(self, data):
        """Asynchronous database write."""
        conn = await asyncpg.connect(**self.db_config)
        try:
            await conn.execute("""
                INSERT INTO crawled_data (url, title, crawled_at)
                VALUES ($1, $2, $3)
            """, data['url'], data['title'], data['crawled_at'])
        finally:
            await conn.close()

    async def crawl_batch(self, urls):
        """Crawl a batch of URLs."""
        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Process the results
            for url, html in zip(urls, results):
                if isinstance(html, Exception):
                    logging.error(f"Exception: {html}")
                    continue
                await self.parse_and_save(html, url)

    def run(self, url_list):
        """Run the asynchronous crawler."""
        asyncio.run(self.crawl_batch(url_list))

# Usage
# crawler = AsyncCrawler(concurrency=20)
# crawler.run(['https://example.com/page1', 'https://example.com/page2'])
6.2 Monitoring and Logging
import logging
from logging.handlers import RotatingFileHandler
import psutil
import json

class CrawlerMonitor:
    def __init__(self, log_file='crawler.log'):
        # Configure logging
        self.logger = logging.getLogger('CrawlerMonitor')
        self.logger.setLevel(logging.INFO)
        # File logging (with rotation)
        file_handler = RotatingFileHandler(
            log_file, maxBytes=10*1024*1024, backupCount=5
        )
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(file_handler)
        # Console logging
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(console_handler)
        # Performance metrics
        self.metrics = {
            'requests_per_minute': 0,
            'error_rate': 0,
            'memory_usage': 0,
            'cpu_usage': 0,
            'queue_size': 0
        }

    def log_request(self, url, status, response_time):
        """Log a single request."""
        self.logger.info(f"URL: {url} | Status: {status} | Time: {response_time:.2f}s")

    def update_metrics(self, **kwargs):
        """Update monitoring metrics."""
        self.metrics.update(kwargs)
        self.logger.info(f"Metrics: {json.dumps(self.metrics, indent=2)}")

    def check_system_resources(self):
        """Sample memory and CPU usage of the crawler process."""
        process = psutil.Process()
        self.metrics['memory_usage'] = process.memory_info().rss / 1024 / 1024  # MB
        self.metrics['cpu_usage'] = process.cpu_percent()
        self.logger.info(
            f"Resources: Memory={self.metrics['memory_usage']:.2f}MB, "
            f"CPU={self.metrics['cpu_usage']:.2f}%"
        )

# Usage
# monitor = CrawlerMonitor()
# monitor.log_request('https://example.com', 200, 1.5)
# monitor.check_system_resources()
7. Legal and Ethical Considerations
7.1 Respecting robots.txt
import requests
from urllib.parse import urlparse

def check_robots_txt(base_url):
    """Fetch and display a site's robots.txt."""
    parsed = urlparse(base_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    try:
        response = requests.get(robots_url, timeout=5)
        if response.status_code == 200:
            print("robots.txt contents:")
            print(response.text)
            # Naive parsing (in practice, use the urllib.robotparser module)
            lines = response.text.split('\n')
            for line in lines:
                if 'Disallow:' in line and '/admin/' in line:
                    print("Warning: crawling /admin/ paths is disallowed")
                    return False
            return True
        else:
            print("No robots.txt found")
            return True
    except requests.RequestException:
        print("Could not check robots.txt")
        return True

# Usage
# check_robots_txt('https://example.com')
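For real projects, the standard library's urllib.robotparser handles this parsing properly; a minimal sketch follows (the function name can_fetch and the bot name MyCrawlerBot are placeholders of my own choosing):

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def can_fetch(url, user_agent='MyCrawlerBot'):
    """Check whether robots.txt allows this user agent to fetch the URL."""
    parsed = urlparse(url)
    rp = RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()  # download and parse robots.txt
    return rp.can_fetch(user_agent, url)

# Usage
# print(can_fetch('https://example.com/some/page'))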
7.2 Legal Compliance Checklist
- Respect copyright: do not crawl copyrighted content for commercial use
- Protect personal data: avoid collecting private personal information
- Terms of service: follow the website's terms of service
- Fair use: throttle your crawl so it does not disrupt the site's normal operation
- Data usage: be clear about how the data will be used and avoid unfair competition
7.3 Crawler Ethics Guidelines
- Transparency: identify your crawler and provide contact information (see the User-Agent sketch after this list)
- Traceability: keep crawl logs so activity can be audited afterwards
- Data minimization: collect only the data you actually need
- Responsiveness: stop crawling promptly if the site operator objects
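A simple way to satisfy the transparency guideline is a self-identifying User-Agent; the bot name, info URL, and contact address below are placeholders:

import requests

# A self-identifying User-Agent (bot name, URL, and email are placeholders)
headers = {
    'User-Agent': 'MyResearchBot/1.0 (+https://example.com/bot-info; contact: crawler@example.com)'
}
response = requests.get('https://example.com', headers=headers, timeout=10)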
8. Summary and Best Practices
8.1 Key Technical Points
- Basics: Requests + BeautifulSoup for static pages
- Dynamic content: Selenium/Playwright for JavaScript-rendered pages
- Anti-crawling countermeasures: proxy pools, header spoofing, rate limiting, session management
- Advanced strategies: CAPTCHA solving, JavaScript reverse engineering, distributed architectures
- Performance: asynchronous crawling, connection pooling, caching strategies (connection pooling is sketched below)
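Connection pooling is the one item in this list not demonstrated earlier; a minimal sketch using requests' HTTPAdapter, with illustrative pool sizes:

import requests
from requests.adapters import HTTPAdapter

# Reuse TCP connections across requests to the same hosts
session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=50)  # illustrative sizes
session.mount('http://', adapter)
session.mount('https://', adapter)

# All requests made through this session share the connection pool
# response = session.get('https://example.com/page1')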
8.2 Performance Comparison
| Approach | Speed | Resource usage | Anti-bot resistance | Typical use |
|---|---|---|---|---|
| Requests + BS4 | Fast | Low | Weak | Static pages, small scale |
| Selenium | Slow | High | Medium | Dynamic pages, medium scale |
| Playwright | Medium | Medium | Strong | Dynamic pages, medium scale |
| Async crawler | Very fast | Medium | Medium | Large-scale data collection |
| Distributed crawler | Very fast | High | Strong | Enterprise applications |
8.3 Recommended Technology Stacks
- Personal learning / small scale: Requests + BeautifulSoup
- Dynamic pages / medium scale: Playwright + async crawling
- Enterprise / large scale: Scrapy + Scrapy-Redis + distributed deployment
- Heavily protected sites: Playwright + proxy pool + CAPTCHA solving
8.4 Keep Learning
- Follow tooling updates: browser automation and anti-crawling techniques evolve quickly
- Learn JavaScript: understand front-end encryption and dynamic loading
- Study legal cases: know where the legal boundaries of crawling lie
- Join open-source communities: Scrapy, Playwright, and related projects
- Practice broadly: try different kinds of sites and anti-crawling mechanisms
With the material covered here, you should be able to build a crawler system of moderate complexity on your own and handle the most common anti-crawling mechanisms. Remember: technology is neutral, but the people using it must follow the law and act ethically. In real projects, always put compliance first.
