引言:为什么需要异步编程?
在现代软件开发中,系统响应速度和资源利用率是衡量应用性能的关键指标。传统的同步编程模型(Synchronous Programming)在处理I/O密集型任务(如网络请求、文件读写、数据库查询)时,会因为等待操作完成而阻塞整个线程,导致CPU资源浪费,系统响应变慢。
异步编程(Asynchronous Programming) 通过允许程序在等待某个操作完成时继续执行其他任务,有效解决了这一问题。它特别适合处理高并发、I/O密集型的应用场景,如Web服务器、实时通信系统、数据处理管道等。
本文将深入探讨异步编程的核心概念、主流实现方法、最佳实践,并通过详细的代码示例展示如何在不同编程语言中高效实现异步处理。
一、异步编程的核心概念
1.1 同步 vs 异步
同步(Synchronous):代码按顺序执行,每个操作必须等待前一个操作完成才能继续。例如:
console.log("开始"); // 假设这是一个耗时的网络请求 const data = fetchDataSync(); // 阻塞直到数据返回 console.log("数据:", data); console.log("结束");异步(Asynchronous):代码可以同时执行多个操作,无需等待前一个操作完成。例如:
console.log("开始"); fetchDataAsync().then(data => { console.log("数据:", data); }); console.log("结束"); // 这行会立即执行,不等待数据返回
1.2 阻塞 vs 非阻塞
- 阻塞(Blocking):调用一个函数时,调用者会被挂起,直到函数返回结果。
- 非阻塞(Non-blocking):调用一个函数时,调用者可以继续执行,结果通过回调、事件或通知机制返回。
1.3 并发 vs 并行
- 并发(Concurrency):多个任务在重叠的时间段内交替执行,可能使用单个CPU核心。
- 并行(Parallelism):多个任务在同一时刻同时执行,需要多核CPU支持。
异步编程主要实现的是并发,但可以通过多线程/多进程实现并行。
二、异步编程的实现模型
2.1 回调函数(Callback)
回调函数是最早的异步编程模式,通过将函数作为参数传递给另一个函数,在操作完成后调用该函数。
优点:简单直接,广泛支持。 缺点:容易导致“回调地狱”(Callback Hell),代码可读性差,错误处理复杂。
示例(Node.js):
// 传统回调方式
function fetchData(callback) {
setTimeout(() => {
const data = "模拟数据";
callback(null, data);
}, 1000);
}
fetchData((err, data) => {
if (err) {
console.error("错误:", err);
} else {
console.log("数据:", data);
}
});
2.2 Promise
Promise是ES6引入的异步编程模型,代表一个异步操作的最终完成(或失败)及其结果值。它有三种状态:pending(进行中)、fulfilled(已成功)、rejected(已失败)。
优点:链式调用,避免回调地狱;更好的错误处理。
缺点:仍需使用.then()和.catch(),代码嵌套可能仍然存在。
示例(JavaScript):
function fetchData() {
return new Promise((resolve, reject) => {
setTimeout(() => {
const data = "模拟数据";
resolve(data);
}, 1000);
});
}
fetchData()
.then(data => {
console.log("数据:", data);
return fetchData(); // 链式调用
})
.then(data => {
console.log("第二个数据:", data);
})
.catch(err => {
console.error("错误:", err);
});
2.3 Async/Await
Async/Await是基于Promise的语法糖,使异步代码看起来像同步代码,极大提高了可读性。
优点:代码简洁,可读性高;错误处理使用try/catch,更直观。 缺点:需要理解Promise的基础。
示例(JavaScript):
async function main() {
try {
const data1 = await fetchData();
console.log("数据1:", data1);
const data2 = await fetchData();
console.log("数据2:", data2);
// 并行执行多个异步操作
const [data3, data4] = await Promise.all([
fetchData(),
fetchData()
]);
console.log("数据3和4:", data3, data4);
} catch (err) {
console.error("错误:", err);
}
}
main();
2.4 协程(Coroutine)
协程是比线程更轻量级的并发单元,可以在单线程内实现并发。Python的asyncio、Go的goroutine都是协程的实现。
优点:极高的并发能力,低内存开销。 缺点:需要特定语言支持,学习曲线较陡。
示例(Python asyncio):
import asyncio
import aiohttp
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://example.com/api1",
"https://example.com/api2",
"https://example.com/api3"
]
async with aiohttp.ClientSession() as session:
# 并发执行多个请求
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1} 数据: {result[:100]}...")
# 运行主协程
asyncio.run(main())
2.5 事件循环(Event Loop)
事件循环是异步编程的核心机制,负责监听和分发事件,管理异步任务的执行。Node.js、Python asyncio、浏览器JavaScript都使用事件循环。
工作原理:
- 维护一个任务队列
- 循环检查是否有任务需要执行
- 执行任务,遇到I/O操作时将其挂起并注册回调
- I/O操作完成后,将回调放入队列等待执行
示例(Node.js事件循环):
const fs = require('fs');
// 异步读取文件,事件循环会处理I/O操作
fs.readFile('example.txt', 'utf8', (err, data) => {
if (err) throw err;
console.log(data);
});
// 主线程继续执行,不会阻塞
console.log("文件读取操作已提交,主线程继续执行");
三、不同编程语言的异步实现
3.1 JavaScript (Node.js)
Node.js基于V8引擎和libuv库,提供了强大的异步I/O能力。
使用async/await处理HTTP请求:
const axios = require('axios');
async function fetchMultipleAPIs() {
try {
// 并行请求多个API
const [users, posts, comments] = await Promise.all([
axios.get('https://jsonplaceholder.typicode.com/users'),
axios.get('https://jsonplaceholder.typicode.com/posts'),
axios.get('https://jsonplaceholder.typicode.com/comments')
]);
console.log(`获取到 ${users.data.length} 个用户`);
console.log(`获取到 ${posts.data.length} 篇文章`);
console.log(`获取到 ${comments.data.length} 条评论`);
// 顺序处理
const user = users.data[0];
const userPosts = posts.data.filter(post => post.userId === user.id);
console.log(`用户 ${user.name} 有 ${userPosts.length} 篇文章`);
} catch (error) {
console.error('请求失败:', error.message);
}
}
fetchMultipleAPIs();
使用Worker Threads处理CPU密集型任务:
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// 主线程
const worker = new Worker(__filename, {
workerData: { number: 1000000 }
});
worker.on('message', result => {
console.log(`计算结果: ${result}`);
});
worker.on('error', err => {
console.error('Worker错误:', err);
});
} else {
// Worker线程
const { number } = workerData;
// 模拟CPU密集型计算
let sum = 0;
for (let i = 1; i <= number; i++) {
sum += i;
}
parentPort.postMessage(sum);
}
3.2 Python
Python通过asyncio库和async/await语法支持异步编程。
使用asyncio处理网络请求:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步获取URL内容"""
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
text = await response.text()
return url, len(text), None
else:
return url, 0, f"HTTP {response.status}"
except Exception as e:
return url, 0, str(e)
async def main():
urls = [
"https://www.google.com",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.python.org",
"https://www.wikipedia.org"
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [fetch_url(session, url) for url in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
# 处理结果
for url, length, error in results:
if error:
print(f"❌ {url}: {error}")
else:
print(f"✅ {url}: {length} 字节")
end_time = time.time()
print(f"\n总耗时: {end_time - start_time:.2f} 秒")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())
使用多进程处理CPU密集型任务:
import multiprocessing
import time
import math
def cpu_intensive_task(n):
"""模拟CPU密集型计算"""
result = 0
for i in range(1, n + 1):
result += math.sqrt(i)
return result
def main():
tasks = [1000000, 1000000, 1000000, 1000000] # 4个任务
start_time = time.time()
# 使用进程池并行执行
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, tasks)
end_time = time.time()
print(f"结果: {results}")
print(f"总耗时: {end_time - start_time:.2f} 秒")
if __name__ == "__main__":
main()
3.3 Go语言
Go语言的goroutine和channel是原生支持的并发模型。
使用goroutine和channel:
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// 使用WaitGroup等待所有goroutine完成
var wg sync.WaitGroup
func fetchURL(url string, resultChan chan<- string) {
defer wg.Done()
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Get(url)
if err != nil {
resultChan <- fmt.Sprintf("Error fetching %s: %v", url, err)
return
}
defer resp.Body.Close()
resultChan <- fmt.Sprintf("✅ %s: Status %d", url, resp.StatusCode)
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.python.org",
}
resultChan := make(chan string, len(urls))
start := time.Now()
// 启动goroutine
for _, url := range urls {
wg.Add(1)
go fetchURL(url, resultChan)
}
// 等待所有goroutine完成
wg.Wait()
close(resultChan)
// 收集结果
for result := range resultChan {
fmt.Println(result)
}
fmt.Printf("\n总耗时: %v\n", time.Since(start))
}
使用context处理超时和取消:
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context, name string) error {
select {
case <-time.After(5 * time.Second):
fmt.Printf("任务 %s 完成\n", name)
return nil
case <-ctx.Done():
fmt.Printf("任务 %s 被取消: %v\n", name, ctx.Err())
return ctx.Err()
}
}
func main() {
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 启动任务
err := longRunningTask(ctx, "数据处理")
if err != nil {
fmt.Printf("任务失败: %v\n", err)
}
}
3.4 Java
Java通过CompletableFuture和ExecutorService支持异步编程。
使用CompletableFuture:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
public class AsyncExample {
// 模拟异步获取数据
public static CompletableFuture<String> fetchDataAsync(String source) {
return CompletableFuture.supplyAsync(() -> {
try {
// 模拟网络延迟
TimeUnit.SECONDS.sleep(1);
return "数据来自: " + source;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
// 并行执行多个异步任务
CompletableFuture<String> future1 = fetchDataAsync("API-A");
CompletableFuture<String> future2 = fetchDataAsync("API-B");
CompletableFuture<String> future3 = fetchDataAsync("API-C");
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
future1, future2, future3
);
allFutures.get(); // 阻塞直到所有任务完成
// 获取结果
System.out.println("结果1: " + future1.get());
System.out.println("结果2: " + future2.get());
System.out.println("结果3: " + future3.get());
long end = System.currentTimeMillis();
System.out.println("总耗时: " + (end - start) + "ms");
// 链式调用示例
CompletableFuture.supplyAsync(() -> "初始数据")
.thenApply(data -> data + " - 处理1")
.thenApply(data -> data + " - 处理2")
.thenAccept(result -> System.out.println("最终结果: " + result));
}
}
使用ExecutorService:
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class ExecutorServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Callable<String>> tasks = new ArrayList<>();
// 创建任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
tasks.add(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "任务 " + taskId + " 完成";
});
}
long start = System.currentTimeMillis();
// 执行所有任务
List<Future<String>> futures = executor.invokeAll(tasks);
// 获取结果
for (Future<String> future : futures) {
System.out.println(future.get());
}
long end = System.currentTimeMillis();
System.out.println("总耗时: " + (end - start) + "ms");
// 关闭线程池
executor.shutdown();
}
}
四、异步编程的最佳实践
4.1 错误处理
异步编程中的错误处理比同步代码更复杂,需要特别注意。
JavaScript错误处理:
// 使用try/catch处理async/await
async function safeFetch() {
try {
const response = await fetch('https://api.example.com/data');
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
return data;
} catch (error) {
console.error('请求失败:', error);
// 可以返回默认值或重试
return { error: error.message };
}
}
// 使用Promise.allSettled处理多个异步操作
async function fetchMultipleWithErrors() {
const promises = [
fetch('https://api1.example.com'),
fetch('https://api2.example.com'),
fetch('https://api3.example.com')
];
const results = await Promise.allSettled(promises);
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`API ${index + 1} 成功:`, result.value.status);
} else {
console.error(`API ${index + 1} 失败:`, result.reason);
}
});
}
Python错误处理:
import asyncio
import aiohttp
async def safe_fetch(session, url):
try:
async with session.get(url, timeout=10) as response:
response.raise_for_status() # 抛出HTTP错误
return await response.text()
except aiohttp.ClientError as e:
print(f"请求 {url} 失败: {e}")
return None
except asyncio.TimeoutError:
print(f"请求 {url} 超时")
return None
async def main():
urls = ["https://example.com", "https://invalid-url.com"]
async with aiohttp.ClientSession() as session:
tasks = [safe_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: 异常 - {result}")
elif result is None:
print(f"{url}: 失败")
else:
print(f"{url}: 成功 ({len(result)} 字节)")
4.2 超时控制
为异步操作设置超时,防止无限期等待。
JavaScript超时控制:
// 使用Promise.race实现超时
function withTimeout(promise, timeoutMs) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error(`操作超时 (${timeoutMs}ms)`)), timeoutMs);
});
return Promise.race([promise, timeoutPromise]);
}
// 使用示例
async function fetchWithTimeout() {
try {
const fetchPromise = fetch('https://api.example.com/data');
const result = await withTimeout(fetchPromise, 5000); // 5秒超时
const data = await result.json();
console.log('数据:', data);
} catch (error) {
console.error('错误:', error.message);
}
}
Python超时控制:
import asyncio
async def fetch_with_timeout(url, timeout=10):
try:
# 使用asyncio.wait_for设置超时
result = await asyncio.wait_for(
fetch_url(url),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f"请求 {url} 超时")
return None
async def main():
# 设置全局超时
try:
result = await asyncio.wait_for(
fetch_with_timeout("https://example.com", timeout=5),
timeout=10 # 外层超时
)
except asyncio.TimeoutError:
print("整体操作超时")
4.3 资源管理
异步编程中需要特别注意资源的释放,避免内存泄漏。
JavaScript资源管理:
// 使用async/await和try/finally确保资源释放
async function processFile(filename) {
let fileHandle;
try {
fileHandle = await fs.promises.open(filename, 'r');
const buffer = Buffer.alloc(1024);
const { bytesRead } = await fileHandle.read(buffer, 0, 1024, 0);
console.log(`读取了 ${bytesRead} 字节`);
return buffer.toString('utf8', 0, bytesRead);
} finally {
if (fileHandle) {
await fileHandle.close();
console.log('文件句柄已释放');
}
}
}
// 使用async/await处理数据库连接
async function queryDatabase(sql, params) {
const pool = await getDatabasePool();
let connection;
try {
connection = await pool.getConnection();
const [rows] = await connection.query(sql, params);
return rows;
} finally {
if (connection) {
connection.release(); // 释放连接回连接池
}
}
}
Python资源管理:
import asyncio
import aiohttp
async def fetch_with_session(url):
# 使用async with自动管理资源
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
print(f"请求失败: {e}")
return None
async def main():
# 批量处理,自动管理会话
urls = ["https://example.com"] * 10
# 使用单个会话处理多个请求(推荐)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_session(url) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
if result:
print(f"URL {i+1}: 成功 ({len(result)} 字节)")
4.4 并发控制
限制并发数量,避免系统过载。
JavaScript并发控制:
// 限制并发数量的异步队列
class AsyncQueue {
constructor(maxConcurrency = 5) {
this.maxConcurrency = maxConcurrency;
this.running = 0;
this.queue = [];
}
async add(task) {
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.process();
});
}
async process() {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}
this.running++;
const { task, resolve, reject } = this.queue.shift();
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.running--;
this.process(); // 处理下一个任务
}
}
}
// 使用示例
async function limitedConcurrency() {
const queue = new AsyncQueue(3); // 最多3个并发
const tasks = Array.from({ length: 10 }, (_, i) =>
() => new Promise(resolve => {
console.log(`任务 ${i} 开始`);
setTimeout(() => {
console.log(`任务 ${i} 完成`);
resolve(`结果 ${i}`);
}, Math.random() * 1000);
})
);
const results = await Promise.all(tasks.map(task => queue.add(task)));
console.log('所有任务完成:', results);
}
Python并发控制:
import asyncio
import aiohttp
from asyncio import Semaphore
async def fetch_with_semaphore(session, url, semaphore):
async with semaphore: # 限制并发数量
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
print(f"请求 {url} 失败: {e}")
return None
async def main():
urls = [f"https://example.com/api/{i}" for i in range(20)]
# 创建信号量,限制最多5个并发
semaphore = Semaphore(5)
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_semaphore(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"成功获取 {len(successful)} 个结果")
五、性能优化技巧
5.1 批量处理
将多个小操作合并为一个批量操作,减少系统调用开销。
示例(数据库批量插入):
// 低效:逐条插入
async function insertOneByOne(data) {
for (const item of data) {
await db.insert('users', item); // 每次都等待
}
}
// 高效:批量插入
async function insertBatch(data, batchSize = 100) {
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
await db.insertBatch('users', batch); // 一次插入多条
}
}
5.2 缓存策略
缓存频繁访问的数据,减少重复计算。
示例(内存缓存):
class AsyncCache {
constructor() {
this.cache = new Map();
this.pending = new Map(); // 防止重复请求
}
async get(key, fetchFn) {
// 检查缓存
if (this.cache.has(key)) {
return this.cache.get(key);
}
// 检查是否已有相同请求在进行
if (this.pending.has(key)) {
return this.pending.get(key);
}
// 创建新请求
const promise = fetchFn()
.then(result => {
this.cache.set(key, result);
this.pending.delete(key);
return result;
})
.catch(error => {
this.pending.delete(key);
throw error;
});
this.pending.set(key, promise);
return promise;
}
}
// 使用示例
const cache = new AsyncCache();
async function getUser(id) {
return cache.get(`user:${id}`, async () => {
console.log(`从数据库获取用户 ${id}`);
// 模拟数据库查询
await new Promise(resolve => setTimeout(resolve, 100));
return { id, name: `用户${id}` };
});
}
5.3 连接池
对于数据库、HTTP客户端等资源,使用连接池减少连接开销。
示例(数据库连接池):
const mysql = require('mysql2/promise');
// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10, // 最大连接数
queueLimit: 0
});
async function queryWithPool(sql, params) {
// 从连接池获取连接
const connection = await pool.getConnection();
try {
const [rows] = await connection.query(sql, params);
return rows;
} finally {
// 释放连接回连接池
connection.release();
}
}
六、常见陷阱与解决方案
6.1 忘记await
问题:忘记使用await会导致Promise未被正确处理。
// 错误示例
async function badExample() {
const promise = fetchData(); // 忘记await
console.log(promise); // 输出Promise对象,不是数据
}
// 正确示例
async function goodExample() {
const data = await fetchData(); // 正确使用await
console.log(data); // 输出实际数据
}
6.2 循环中的异步操作
问题:在循环中直接使用异步操作可能导致意外行为。
// 错误:循环会立即完成,不会等待
async function badLoop() {
for (let i = 0; i < 5; i++) {
await fetchData(i); // 每次都会等待
}
console.log('循环完成'); // 会等待所有迭代完成
}
// 更高效:并行执行
async function goodLoop() {
const promises = [];
for (let i = 0; i < 5; i++) {
promises.push(fetchData(i));
}
await Promise.all(promises);
console.log('所有请求完成');
}
6.3 死锁
问题:在异步编程中,如果两个任务互相等待对方完成,可能导致死锁。
# Python死锁示例
import asyncio
async def task1():
print("任务1开始")
await asyncio.sleep(1)
print("任务1等待任务2完成")
# 这里会等待task2完成,但task2也在等待task1
await task2()
print("任务1完成")
async def task2():
print("任务2开始")
await asyncio.sleep(1)
print("任务2等待任务1完成")
# 这里会等待task1完成,但task1也在等待task2
await task1()
print("任务2完成")
# 解决方案:使用超时或避免循环依赖
async def safe_task1():
print("安全任务1开始")
await asyncio.sleep(1)
print("安全任务1完成")
async def safe_task2():
print("安全任务2开始")
await asyncio.sleep(1)
print("安全任务2完成")
七、实际应用场景
7.1 Web服务器
Node.js Express异步中间件:
const express = require('express');
const app = express();
// 异步中间件
app.use(async (req, res, next) => {
try {
// 异步日志记录
await logRequest(req);
next();
} catch (error) {
next(error);
}
});
// 异步路由处理
app.get('/api/users/:id', async (req, res) => {
try {
const userId = req.params.id;
// 并行获取用户信息和相关数据
const [user, posts, comments] = await Promise.all([
db.users.findById(userId),
db.posts.findByUserId(userId),
db.comments.findByUserId(userId)
]);
res.json({
user,
posts,
comments,
total: posts.length + comments.length
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// 批量处理端点
app.post('/api/batch', async (req, res) => {
const { items } = req.body;
// 使用队列限制并发
const queue = new AsyncQueue(5);
const results = await Promise.all(
items.map(item => queue.add(() => processItem(item)))
);
res.json({ results });
});
app.listen(3000, () => {
console.log('服务器运行在端口3000');
});
7.2 数据处理管道
Python异步数据处理:
import asyncio
import aiofiles
import json
from typing import List, Dict
class AsyncDataProcessor:
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_file(self, filepath: str) -> Dict:
"""异步处理单个文件"""
async with self.semaphore:
try:
async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
content = await f.read()
data = json.loads(content)
# 模拟数据处理
processed = {
'filename': filepath,
'size': len(content),
'records': len(data.get('items', [])),
'timestamp': asyncio.get_event_loop().time()
}
return processed
except Exception as e:
return {'filename': filepath, 'error': str(e)}
async def process_batch(self, filepaths: List[str]) -> List[Dict]:
"""批量处理文件"""
tasks = [self.process_file(filepath) for filepath in filepaths]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤异常结果
valid_results = []
for result in results:
if isinstance(result, Exception):
print(f"处理异常: {result}")
else:
valid_results.append(result)
return valid_results
async def main():
# 模拟文件列表
filepaths = [f"data/file_{i}.json" for i in range(100)]
processor = AsyncDataProcessor(max_concurrent=10)
start = asyncio.get_event_loop().time()
results = await processor.process_batch(filepaths)
end = asyncio.get_event_loop().time()
print(f"处理了 {len(results)} 个文件")
print(f"总耗时: {end - start:.2f} 秒")
# 统计
successful = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]
print(f"成功: {len(successful)}, 失败: {len(failed)}")
if __name__ == "__main__":
asyncio.run(main())
7.3 实时通信
Go语言WebSocket服务器:
package main
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type Client struct {
conn *websocket.Conn
send chan []byte
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mutex sync.RWMutex
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mutex.Lock()
h.clients[client] = true
h.mutex.Unlock()
fmt.Println("客户端连接:", len(h.clients))
case client := <-h.unregister:
h.mutex.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mutex.Unlock()
fmt.Println("客户端断开:", len(h.clients))
case message := <-h.broadcast:
h.mutex.RLock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
h.mutex.RUnlock()
}
}
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("升级失败:", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
hub.register <- client
// 启动读写goroutine
go client.writePump(hub)
go client.readPump(hub)
}
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.unregister <- c
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
break
}
hub.broadcast <- message
}
}
func (c *Client) writePump(hub *Hub) {
defer func() {
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.conn.WriteMessage(websocket.TextMessage, message)
}
}
}
func main() {
hub := NewHub()
go hub.Run()
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "index.html")
})
fmt.Println("WebSocket服务器运行在 :8080")
http.ListenAndServe(":8080", nil)
}
八、总结
异步编程是现代软件开发中不可或缺的技能,它能显著提升系统响应速度和资源利用率。通过合理使用回调、Promise、async/await、协程等技术,可以高效处理并发任务,避免阻塞。
关键要点:
- 选择合适的异步模型:根据语言和场景选择回调、Promise、async/await或协程
- 正确处理错误:使用try/catch、Promise.allSettled等机制
- 控制并发:使用信号量、队列等限制并发数量
- 资源管理:确保异步操作中的资源正确释放
- 性能优化:批量处理、缓存、连接池等技巧
- 避免陷阱:注意循环中的异步操作、死锁等问题
学习建议:
- 从简单的异步示例开始,逐步理解事件循环机制
- 在实际项目中应用异步编程,解决真实问题
- 阅读官方文档和优秀开源项目的异步实现
- 使用性能分析工具监控异步代码的执行情况
异步编程虽然有一定学习曲线,但掌握后能极大提升代码质量和系统性能。随着云原生、微服务架构的普及,异步编程的重要性将日益凸显。
