Introduction: The Core Challenge of Exploration Optimization

Exploration optimization is the process of searching for optimal solutions in unknown or only partially known domains. It is central to machine learning, engineering design, business decision-making, and scientific research. Unlike traditional optimization problems, exploration optimization faces a distinctive set of challenges: incomplete information, expensive evaluations, and local-optimum traps.

Imagine you are debugging a complex distributed system, designing a new drug molecule, or tuning the recommendation algorithm of an e-commerce platform. In each of these scenarios you face a "black-box" function: you can observe the output for any input you try, but you have no analytic form and no gradient information, and a single evaluation may take hours or even days. This is the core problem that exploration optimization sets out to solve.

1. Understanding the Basic Framework of Exploration Optimization

1.1 The Exploration-Exploitation Trade-off

At the heart of exploration optimization lies the balance between exploration and exploitation:

  • Exploitation: search more finely in regions already known to perform well
  • Exploration: try new, unknown regions that may contain better solutions

An intuitive example illustrates the trade-off: suppose you are looking for the highest peak in a mountain range.

  • Pure exploitation has you climb the steepest path from where you stand, but you may miss a higher peak elsewhere
  • Pure exploration has you jump around at random, making it unlikely you ever reach the true summit

1.2 A Mathematical Model of Exploration Optimization

Mathematically, an exploration optimization problem can be formalized as:

max f(x)  s.t. x ∈ X

where:

  • f(x) is the objective function (possibly noisy, non-convex, or discontinuous)
  • X is the search space (possibly high-dimensional, discrete, or mixed)
  • the cost C(x) of evaluating f at a point may be very high
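
To make the abstraction concrete, here is a minimal sketch of what such a black box looks like in code. The function and its simulated cost are illustrative assumptions rather than any specific application; the point is only that the caller sees values, never gradients.

import time
import numpy as np

def expensive_black_box(x, noise_std=0.05, simulate_cost=False):
    """A toy stand-in for an expensive, noisy black-box objective f(x).

    The caller only ever sees the returned value: no analytic form and
    no gradients. Setting simulate_cost=True sleeps briefly to mimic the
    fact that a real evaluation (training a model, running a simulation)
    can take hours.
    """
    if simulate_cost:
        time.sleep(0.1)  # stand-in for an hours-long evaluation
    x = np.asarray(x, dtype=float)
    signal = np.sin(5 * x[0]) * np.cos(5 * x[1])
    return signal + np.random.normal(0.0, noise_std)

# Every call spends one (expensive) evaluation of f
value = expensive_black_box([0.3, 0.7])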

2. Classic Exploration Optimization Algorithms in Detail

2.1 Bayesian Optimization

Bayesian optimization is the gold standard for expensive-to-evaluate functions. It guides the search by building a probabilistic model of the objective.

2.1.1 Gaussian Processes

The Gaussian process (GP) is the core of Bayesian optimization. It assumes that any finite collection of function values is jointly Gaussian, i.e. follows a multivariate normal distribution.
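
Concretely, once we condition on observed data (X, y), the GP posterior at a new point x* has a closed-form mean and variance (the standard GP regression result, assuming a zero prior mean; scikit-learn computes this for us in the code below):

μ(x*) = k(x*, X) [K + σ²I]⁻¹ y
σ²(x*) = k(x*, x*) - k(x*, X) [K + σ²I]⁻¹ k(X, x*)

Here K is the kernel matrix over the training inputs and σ² is the observation noise. The mean is the model's best guess and the variance quantifies its uncertainty; these are exactly the two quantities the acquisition function in the next subsection consumes.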

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C

# Define the Gaussian process model
def create_gp_model(X, y):
    """
    Create a Gaussian process regression model.
    X: training inputs (n_samples, n_features)
    y: training targets (n_samples,)
    """
    # Use an RBF kernel, the most common choice
    kernel = C(1.0, (1e-3, 1e3)) * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e2))
    
    # Create the GP regressor
    gpr = GaussianProcessRegressor(
        kernel=kernel,
        alpha=1e-10,  # noise level added to the diagonal
        n_restarts_optimizer=10,  # restarts for kernel hyperparameter optimization
        random_state=42
    )
    
    # Fit the model
    gpr.fit(X, y)
    
    return gpr

# Example: optimization over a two-dimensional space
def objective_function(x):
    """A multimodal test function."""
    return np.sin(5 * x[0]) * np.cos(5 * x[1]) + np.exp(-0.5 * ((x[0] - 0.5)**2 + (x[1] - 0.5)**2))

# Initial data
X_train = np.random.uniform(0, 1, (5, 2))
y_train = np.array([objective_function(x) for x in X_train])

# Build the model
gp_model = create_gp_model(X_train, y_train)

2.1.2 Acquisition Functions

The acquisition function decides which point to evaluate next; the most commonly used choice is Expected Improvement (EI).
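
For a maximization problem with current best observation f*, EI is defined as EI(x) = E[max(f(x) - f* - ξ, 0)]. Under the GP posterior with mean μ(x) and standard deviation σ(x) this has a closed form:

EI(x) = (μ(x) - f* - ξ) Φ(Z) + σ(x) φ(Z),  where Z = (μ(x) - f* - ξ) / σ(x)

Here Φ and φ are the standard normal CDF and PDF, and ξ controls how strongly exploration is favoured. The code below implements exactly this formula.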

from scipy.stats import norm

def expected_improvement(X, X_sample, Y_sample, gpr, xi=0.01):
    """
    Compute the expected improvement.
    X: candidate points
    X_sample: points evaluated so far
    Y_sample: observed values at those points
    gpr: fitted Gaussian process model
    xi: exploration parameter; larger values favour exploration
    """
    # Posterior prediction and current best value
    mu, sigma = gpr.predict(X, return_std=True)
    best_y = np.max(Y_sample)
    
    # Compute the improvement
    with np.errstate(divide='ignore', invalid='ignore'):
        Z = (mu - best_y - xi) / sigma
        ei = (mu - best_y - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
        ei[sigma == 0.0] = 0.0
    
    return ei

# Choose the next point to evaluate
def propose_next_point(X_sample, Y_sample, gpr, bounds, n_restarts=25):
    """
    Propose the next evaluation point by maximizing the EI acquisition function.
    """
    from scipy.optimize import minimize
    
    def min_obj(X):
        # Minimize the negative EI (return a scalar for scipy)
        return -expected_improvement(X.reshape(-1, 2), X_sample, Y_sample, gpr)[0]
    
    # Restart the inner optimization from several points to avoid local optima
    min_val = np.inf
    min_x = None
    
    for x0 in np.random.uniform(bounds[:, 0], bounds[:, 1], size=(n_restarts, 2)):
        res = minimize(min_obj, x0=x0, bounds=bounds, method='L-BFGS-B')
        
        if res.fun < min_val:
            min_val = res.fun
            min_x = res.x
    
    return min_x.reshape(-1, 2)

# The full Bayesian optimization loop
def bayesian_optimization(objective, bounds, n_iter=20):
    """
    Complete Bayesian optimization procedure.
    """
    # Initial random design
    X_sample = np.random.uniform(bounds[:, 0], bounds[:, 1], (5, 2))
    Y_sample = np.array([objective(x) for x in X_sample])
    
    for i in range(n_iter):
        # Fit the GP model
        gpr = create_gp_model(X_sample, Y_sample)
        
        # Propose the next point
        next_x = propose_next_point(X_sample, Y_sample, gpr, bounds)
        
        # Evaluate the objective
        next_y = objective(next_x[0])
        
        # Update the data set
        X_sample = np.vstack([X_sample, next_x])
        Y_sample = np.append(Y_sample, next_y)
        
        print(f"Iteration {i+1}: x={next_x[0]}, y={next_y:.4f}, best={np.max(Y_sample):.4f}")
    
    best_idx = np.argmax(Y_sample)
    return X_sample[best_idx], Y_sample[best_idx]

# Run the optimization
bounds = np.array([[0, 1], [0, 1]])
best_x, best_y = bayesian_optimization(objective_function, bounds, n_iter=15)
print(f"\nBest solution: x={best_x}, y={best_y}")

2.2 Genetic Algorithms

Genetic algorithms evolve a population of candidate solutions by mimicking natural selection, and they are particularly well suited to discrete and high-dimensional spaces.

2.2.1 A Basic Implementation

import random
import numpy as np
from typing import List, Tuple, Callable

class Individual:
    """A single candidate solution."""
    def __init__(self, chromosome: List[float], fitness: float = None):
        self.chromosome = chromosome  # chromosome (solution vector)
        self.fitness = fitness        # fitness value
    
    def __lt__(self, other):
        return self.fitness < other.fitness

def genetic_algorithm(
    objective_func: Callable[[List[float]], float],
    bounds: List[Tuple[float, float]],
    pop_size: int = 50,
    n_generations: int = 100,
    mutation_rate: float = 0.1,
    crossover_rate: float = 0.8,
    maximize: bool = True
) -> Tuple[List[float], float]:
    """
    A genetic algorithm implementation.
    
    Parameters:
    - objective_func: objective function
    - bounds: value range for each variable, [(min, max), ...]
    - pop_size: population size
    - n_generations: number of generations
    - mutation_rate: mutation probability
    - crossover_rate: crossover probability
    - maximize: whether to maximize the objective
    """
    
    # 1. Initialize the population
    def create_individual():
        return [random.uniform(low, high) for low, high in bounds]
    
    population = [Individual(create_individual()) for _ in range(pop_size)]
    
    # Evaluate fitness (cached on the individual)
    def evaluate_fitness(ind: Individual):
        if ind.fitness is None:
            ind.fitness = objective_func(ind.chromosome)
        return ind.fitness
    
    # 2. Evolution loop
    for generation in range(n_generations):
        # Evaluate the whole population
        for ind in population:
            evaluate_fitness(ind)
        
        # Sort (for maximization, higher fitness is better)
        if maximize:
            population.sort(reverse=True)
        else:
            population.sort()
        
        # Track the current best
        best_ind = population[0]
        avg_fitness = sum(ind.fitness for ind in population) / len(population)
        
        if generation % 10 == 0:
            print(f"Generation {generation}: Best={best_ind.fitness:.4f}, Avg={avg_fitness:.4f}")
        
        # 3. Selection (tournament selection)
        def tournament_selection(k=3):
            """k-way tournament selection."""
            tournament = random.sample(population, k)
            if maximize:
                return max(tournament, key=lambda x: x.fitness)
            else:
                return min(tournament, key=lambda x: x.fitness)
        
        # 4. Crossover (simulated binary crossover)
        def simulated_binary_crossover(parent1, parent2, eta=15):
            """Simulated binary crossover (SBX)."""
            child1 = []
            child2 = []
            
            for i in range(len(parent1)):
                u = random.random()
                if u <= 0.5:
                    beta = (2 * u) ** (1 / (eta + 1))
                else:
                    beta = (1 / (2 * (1 - u))) ** (1 / (eta + 1))
                
                child1.append(0.5 * ((1 + beta) * parent1[i] + (1 - beta) * parent2[i]))
                child2.append(0.5 * ((1 - beta) * parent1[i] + (1 + beta) * parent2[i]))
            
            # Clip to the bounds
            for i, (low, high) in enumerate(bounds):
                child1[i] = max(low, min(high, child1[i]))
                child2[i] = max(low, min(high, child2[i]))
            
            return child1, child2
        
        # 5. Mutation (polynomial mutation)
        def polynomial_mutation(individual, eta=20):
            """Polynomial mutation."""
            mutated = individual[:]
            
            for i in range(len(mutated)):
                if random.random() < mutation_rate:
                    delta1 = (mutated[i] - bounds[i][0]) / (bounds[i][1] - bounds[i][0])
                    delta2 = (bounds[i][1] - mutated[i]) / (bounds[i][1] - bounds[i][0])
                    
                    r = random.random()
                    if r <= 0.5:
                        delta_q = (2 * r + (1 - 2 * r) * (1 - delta1) ** (eta + 1)) ** (1 / (eta + 1)) - 1
                    else:
                        delta_q = 1 - (2 * (1 - r) + 2 * (r - 0.5) * (1 - delta2) ** (eta + 1)) ** (1 / (eta + 1))
                    
                    mutated[i] += delta_q * (bounds[i][1] - bounds[i][0])
                    mutated[i] = max(bounds[i][0], min(bounds[i][1], mutated[i]))
            
            return mutated
        
        # Build the next generation
        new_population = [best_ind]  # elitism: keep the best individual
        
        while len(new_population) < pop_size:
            # Select parents
            parent1 = tournament_selection()
            parent2 = tournament_selection()
            
            # Crossover
            if random.random() < crossover_rate:
                child1, child2 = simulated_binary_crossover(parent1.chromosome, parent2.chromosome)
            else:
                child1, child2 = parent1.chromosome[:], parent2.chromosome[:]
            
            # Mutation
            child1 = polynomial_mutation(child1)
            child2 = polynomial_mutation(child2)
            
            # Add to the new population
            new_population.append(Individual(child1))
            if len(new_population) < pop_size:
                new_population.append(Individual(child2))
        
        population = new_population
    
    # Evaluate the final generation before picking the winner
    for ind in population:
        evaluate_fitness(ind)
    best_ind = max(population, key=lambda x: x.fitness) if maximize else min(population, key=lambda x: x.fitness)
    return best_ind.chromosome, best_ind.fitness

# Example: optimizing the Rastrigin function (a multimodal benchmark)
def rastrigin(x):
    """Rastrigin function, a standard optimization benchmark (global minimum 0 at the origin)."""
    A = 10
    return A * len(x) + sum([(xi**2 - A * np.cos(2 * np.pi * xi)) for xi in x])

# Run the genetic algorithm (Rastrigin is a minimization problem, so maximize=False)
bounds = [(-5.12, 5.12)] * 2  # two-dimensional Rastrigin
best_solution, best_fitness = genetic_algorithm(
    rastrigin, 
    bounds, 
    pop_size=50, 
    n_generations=100,
    mutation_rate=0.1,
    crossover_rate=0.8,
    maximize=False
)
print(f"\nGA best solution: {best_solution}, fitness: {best_fitness}")

2.3 Simulated Annealing

Simulated annealing escapes local optima by injecting controlled randomness into the search; the idea is inspired by the annealing of metals.

def simulated_annealing(
    objective_func: Callable[[List[float]], float],
    initial_solution: List[float],
    bounds: List[Tuple[float, float]],
    temperature: float = 100.0,
    cooling_rate: float = 0.95,
    min_temperature: float = 1e-8,
    max_iter: int = 1000,
    step_size: float = 0.1
) -> Tuple[List[float], float, list]:
    """
    Simulated annealing (for minimization).
    
    Parameters:
    - objective_func: objective function
    - initial_solution: starting solution
    - bounds: variable bounds
    - temperature: initial temperature
    - cooling_rate: cooling rate
    - min_temperature: minimum temperature
    - max_iter: maximum number of iterations
    - step_size: neighbourhood step size
    """
    
    current_solution = initial_solution[:]
    current_energy = objective_func(current_solution)
    
    best_solution = current_solution[:]
    best_energy = current_energy
    
    history = [(current_solution[:], current_energy, temperature)]
    
    iteration = 0
    while temperature > min_temperature and iteration < max_iter:
        # Generate a neighbouring solution
        new_solution = current_solution[:]
        
        # Perturb one randomly chosen dimension
        dim = random.randint(0, len(new_solution) - 1)
        perturbation = random.gauss(0, step_size)
        new_solution[dim] += perturbation
        
        # Clip to the bounds
        new_solution[dim] = max(bounds[dim][0], min(bounds[dim][1], new_solution[dim]))
        
        # Energy of the new solution
        new_energy = objective_func(new_solution)
        
        # Energy difference (minimization problem)
        energy_delta = new_energy - current_energy
        
        # Acceptance criterion
        if energy_delta < 0:
            # Better solution: always accept
            accept = True
        else:
            # Worse solution: accept with a temperature-dependent probability
            acceptance_prob = np.exp(-energy_delta / temperature)
            accept = random.random() < acceptance_prob
        
        if accept:
            current_solution = new_solution
            current_energy = new_energy
            
            # Record history
            history.append((current_solution[:], current_energy, temperature))
            
            # Update the global best
            if current_energy < best_energy:
                best_solution = current_solution[:]
                best_energy = current_energy
        
        # Cool down
        temperature *= cooling_rate
        iteration += 1
        
        if iteration % 100 == 0:
            print(f"Iteration {iteration}: Temp={temperature:.6f}, Current={current_energy:.4f}, Best={best_energy:.4f}")
    
    return best_solution, best_energy, history

# Example: optimizing a multimodal function
def multimodal_function(x):
    """A two-dimensional multimodal test function."""
    return (x[0]**2 - 10*np.cos(2*np.pi*x[0])) + (x[1]**2 - 10*np.cos(2*np.pi*x[1]))

# Run simulated annealing
initial = [np.random.uniform(-5, 5) for _ in range(2)]
bounds = [(-5, 5), (-5, 5)]

best_sa, energy_sa, history_sa = simulated_annealing(
    multimodal_function,
    initial,
    bounds,
    temperature=100.0,
    cooling_rate=0.98,
    max_iter=2000,
    step_size=0.5
)
print(f"\nSimulated annealing best solution: {best_sa}, energy: {energy_sa}")

3. Advanced Exploration Strategies and Techniques

3.1 Multi-Armed Bandits

The multi-armed bandit is the theoretical backbone of exploration optimization, and it is particularly well suited to online decision-making scenarios.

class BanditArm:
    """A single bandit arm."""
    def __init__(self, true_mean, true_std=1.0):
        self.true_mean = true_mean
        self.true_std = true_std
        self.n = 0       # number of pulls
        self.mean = 0.0  # estimated mean reward
    
    def pull(self):
        """Pull the arm and draw a reward."""
        return np.random.normal(self.true_mean, self.true_std)
    
    def update(self, reward):
        """Update the running estimate of the mean."""
        self.n += 1
        self.mean += (reward - self.mean) / self.n

class EpsilonGreedyBandit:
    """The ε-greedy algorithm."""
    def __init__(self, arms: List[BanditArm], epsilon: float = 0.1):
        self.arms = arms
        self.epsilon = epsilon
    
    def run(self, n_steps: int):
        history = []
        
        for step in range(n_steps):
            # Explore or exploit?
            if random.random() < self.epsilon:
                # Explore: pick a random arm
                arm_idx = random.randint(0, len(self.arms) - 1)
            else:
                # Exploit: pick the arm with the best current estimate
                arm_idx = np.argmax([arm.mean for arm in self.arms])
            
            # Pull the chosen arm
            reward = self.arms[arm_idx].pull()
            self.arms[arm_idx].update(reward)
            
            history.append({
                'step': step,
                'arm': arm_idx,
                'reward': reward,
                'mean': self.arms[arm_idx].mean
            })
        
        return history

# Example: a five-armed bandit
arms = [
    BanditArm(1.0, 0.5),
    BanditArm(1.2, 0.5),
    BanditArm(0.8, 0.5),
    BanditArm(1.1, 0.5),
    BanditArm(0.9, 0.5)
]

bandit = EpsilonGreedyBandit(arms, epsilon=0.1)
history = bandit.run(1000)

# Analyse the result
print("Pull counts per arm:", [arm.n for arm in arms])
print("Estimated means per arm:", [f"{arm.mean:.3f}" for arm in arms])
print("True best arm: index 1 (true mean 1.2)")

3.2 Sequential Model-Based Optimization (SMBO)

SMBO is the general framework behind Bayesian optimization: the search alternates between evaluating configurations and (re)building a surrogate model that proposes the next one.

class SequentialModelBasedOptimization:
    """A sequential model-based optimization framework."""
    
    def __init__(self, objective_func, config_space, max_evals=50):
        self.objective = objective_func
        self.config_space = config_space  # configuration space
        self.max_evals = max_evals
        self.history = []
    
    def sample_random_config(self):
        """Sample a random configuration."""
        return self.config_space.sample_configuration()
    
    def build_model(self, configs, losses):
        """Build the surrogate model."""
        # A simple Gaussian process is used here
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import Matern
        
        # Convert configurations into a numeric matrix
        X = np.array([list(c.values()) for c in configs])
        y = np.array(losses)
        
        kernel = Matern(nu=2.5, length_scale=1.0)
        model = GaussianProcessRegressor(
            kernel=kernel,
            alpha=1e-6,
            n_restarts_optimizer=5,
            random_state=42
        )
        model.fit(X, y)
        return model
    
    def select_next_config(self, model, configs, losses):
        """Select the next configuration to evaluate."""
        # Expected improvement (for minimizing the loss)
        best_loss = min(losses)
        
        # Sample candidate configurations from the configuration space
        candidates = [self.sample_random_config() for _ in range(100)]
        X_candidates = np.array([list(c.values()) for c in candidates])
        
        mu, sigma = model.predict(X_candidates, return_std=True)
        
        # Compute EI
        with np.errstate(divide='ignore', invalid='ignore'):
            improvement = best_loss - mu
            Z = improvement / sigma
            ei = improvement * norm.cdf(Z) + sigma * norm.pdf(Z)
            ei[sigma == 0.0] = 0.0
        
        # Pick the candidate with the highest EI
        best_idx = np.argmax(ei)
        return candidates[best_idx]
    
    def run(self):
        """Run SMBO."""
        # Initial random sampling
        n_init = 5
        configs = [self.sample_random_config() for _ in range(n_init)]
        losses = [self.objective(c) for c in configs]
        
        self.history.extend(zip(configs, losses))
        
        # Main loop
        for i in range(n_init, self.max_evals):
            # Build the surrogate model
            model = self.build_model(configs, losses)
            
            # Select the next configuration
            next_config = self.select_next_config(model, configs, losses)
            
            # Evaluate it
            next_loss = self.objective(next_config)
            
            # Update the data
            configs.append(next_config)
            losses.append(next_loss)
            self.history.append((next_config, next_loss))
            
            print(f"Iteration {i+1}: Loss={next_loss:.4f}, Config={next_config}")
        
        # Return the best configuration found
        best_idx = np.argmin(losses)
        return configs[best_idx], losses[best_idx]

# Example configuration space
import ConfigSpace as CS

config_space = CS.ConfigurationSpace()
config_space.add_hyperparameter(CS.UniformFloatHyperparameter('lr', lower=1e-4, upper=1e-1, log=True))
config_space.add_hyperparameter(CS.UniformFloatHyperparameter('dropout', lower=0.0, upper=0.5))
config_space.add_hyperparameter(CS.UniformIntegerHyperparameter('hidden_size', lower=64, upper=512))

# A simulated objective (e.g. training a neural network)
def objective(config):
    lr = config['lr']
    dropout = config['dropout']
    hidden = config['hidden_size']
    
    # Simulated loss: lower is better.
    # In practice this would be a full training and validation run.
    base_loss = 2.0
    loss = base_loss - 0.5 * np.log10(lr) + 0.3 * dropout - 0.001 * hidden
    loss += np.random.normal(0, 0.05)  # noise
    
    return loss

# Run SMBO
smbo = SequentialModelBasedOptimization(objective, config_space, max_evals=20)
best_config, best_loss = smbo.run()
print(f"\nSMBO best configuration: {best_config}, loss: {best_loss}")

4. Strategies for Avoiding Common Pitfalls

4.1 Recognizing Pitfalls and Responding to Them

Pitfall 1: Premature Convergence

Symptom: the algorithm settles into a local optimum too early and stops exploring new regions.

Remedies:

  • Diversity preservation: use crowding distance in genetic algorithms
  • Restart mechanisms: periodically re-initialize part of the population
  • Adaptive exploration rate: adjust exploration parameters based on population diversity

def diversity_preserving_ga(objective_func, bounds, pop_size=50, n_generations=100):
    """Genetic algorithm with diversity preservation (sketch of the extra building blocks)."""
    
    def crowding_distance(population):
        """Compute the crowding distance of each individual."""
        n = len(population)
        distances = [0.0] * n
        
        # Accumulate distances dimension by dimension
        for i in range(len(population[0].chromosome)):
            # Indices of individuals sorted along this dimension
            order = sorted(range(n), key=lambda idx: population[idx].chromosome[i])
            
            # Boundary individuals get infinite distance
            distances[order[0]] = float('inf')
            distances[order[-1]] = float('inf')
            
            # Interior distances
            range_val = population[order[-1]].chromosome[i] - population[order[0]].chromosome[i]
            if range_val > 0:
                for j in range(1, n - 1):
                    distances[order[j]] += (
                        population[order[j + 1]].chromosome[i]
                        - population[order[j - 1]].chromosome[i]
                    ) / range_val
        
        return distances
    
    # Selection then prefers individuals with larger crowding distance
    # ... (the remaining steps are the same as the standard GA above)
    
    def restart_if_converged(population):
        """When diversity is too low (fitness spread below a threshold), perturb part of the population."""
        max_fit = max(ind.fitness for ind in population)
        min_fit = min(ind.fitness for ind in population)
        
        if max_fit - min_fit < 0.01:  # population has collapsed onto one region
            # Apply large mutations to the second half of the population
            for ind in population[pop_size // 2:]:
                for j in range(len(ind.chromosome)):
                    if random.random() < 0.3:
                        ind.chromosome[j] = random.uniform(bounds[j][0], bounds[j][1])
                ind.fitness = None  # force re-evaluation

Pitfall 2: Exploding Evaluation Cost

Symptom: too many points need to be evaluated; the time or resource budget becomes unacceptable.

Remedies:

  • Early stopping: stop when the improvement falls below a threshold
  • Parallel evaluation: evaluate several points at once
  • Surrogate-model screening: pre-filter candidates with a cheap surrogate model

def early_stopping_optimization(objective_func, bounds, patience=10, min_delta=1e-4):
    """Random search with early stopping (minimization)."""
    best_loss = float('inf')
    no_improve_count = 0
    history = []
    
    for i in range(1000):  # maximum number of iterations
        # Generate a candidate point
        candidate = [random.uniform(low, high) for low, high in bounds]
        loss = objective_func(candidate)
        history.append(loss)
        
        # Check for improvement
        if best_loss - loss > min_delta:
            best_loss = loss
            no_improve_count = 0
            print(f"Iteration {i}: New best {best_loss:.6f}")
        else:
            no_improve_count += 1
        
        # Early-stopping check
        if no_improve_count >= patience:
            print(f"Early stopping triggered: {patience} iterations without improvement")
            break
    
    return best_loss, history

Pitfall 3: The Curse of Dimensionality

Symptom: as the number of dimensions grows, the search space grows exponentially and the algorithm stops making progress.

Remedies:

  • Dimensionality reduction: use PCA, autoencoders, etc.
  • Coordinate descent: optimize one dimension at a time
  • Random projection: search in a lower-dimensional subspace (see the sketch after the coordinate-descent example below)

def coordinate_descent_optimization(objective_func, bounds, max_iter=100):
    """Coordinate descent."""
    from scipy.optimize import minimize_scalar
    
    n_dims = len(bounds)
    current = [random.uniform(low, high) for low, high in bounds]
    best_loss = objective_func(current)
    
    for iteration in range(max_iter):
        improved = False
        
        # Optimize each dimension in turn
        for dim in range(n_dims):
            # One-dimensional search along the current dimension
            def one_dim_obj(x):
                temp = current[:]
                temp[dim] = x
                return objective_func(temp)
            
            # Bounded one-dimensional minimization from SciPy
            res = minimize_scalar(
                one_dim_obj,
                bounds=bounds[dim],
                method='bounded'
            )
            
            if res.fun < best_loss:
                current[dim] = res.x
                best_loss = res.fun
                improved = True
        
        print(f"Coordinate Descent Iter {iteration}: Loss={best_loss:.6f}")
        
        if not improved:
            break
    
    return current, best_loss
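
The random-projection idea from the list above can be sketched just as compactly. The helper below is a hypothetical illustration rather than a library API: it fixes a random matrix, searches a low-dimensional space (plain random search here for brevity, though any optimizer from this article would do), and lifts each candidate back into the original box.

def random_projection_optimization(objective_func, bounds, low_dim=2, n_samples=200, seed=0):
    """Search a random low-dimensional subspace of a high-dimensional box (illustrative sketch)."""
    rng = np.random.default_rng(seed)
    lows = np.array([b[0] for b in bounds])
    highs = np.array([b[1] for b in bounds])
    
    # Fixed random projection from the low-dimensional search space into the full space
    A = rng.normal(size=(len(bounds), low_dim))
    
    def lift(z):
        """Map a low-dimensional point z into the original box."""
        x = 0.5 * (lows + highs) + A @ z
        return np.clip(x, lows, highs)
    
    # Plain random search in the low-dimensional space
    best_x, best_loss = None, float('inf')
    for _ in range(n_samples):
        z = rng.normal(size=low_dim)
        x = lift(z)
        loss = objective_func(x)
        if loss < best_loss:
            best_x, best_loss = x, loss
    
    return best_x, best_loss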

Pitfall 4: Noisy Evaluations

Symptom: the objective function is noisy, so individual evaluations are unstable and the algorithm is misled.

Remedies:

  • Repeated evaluation: evaluate the same configuration several times and average
  • Smoothing: apply a moving average
  • Noise-aware acquisition functions: account for the noise variance

def noisy_objective_robust_optimization(objective_func, bounds, n_repeats=3):
    """Robust optimization for noisy objectives."""
    
    def robust_objective(x):
        """Average several evaluations and report the sample variance."""
        values = [objective_func(x) for _ in range(n_repeats)]
        return np.mean(values), np.var(values)
    
    # In Bayesian optimization, use a noise-aware surrogate model
    def gp_with_noise(X, y, noise_vars):
        """Gaussian process with per-point noise."""
        from sklearn.gaussian_process import GaussianProcessRegressor
        
        # The per-point noise enters through alpha
        kernel = C(1.0) * RBF(1.0)
        gpr = GaussianProcessRegressor(
            kernel=kernel,
            alpha=noise_vars,  # per-point noise level
            n_restarts_optimizer=10
        )
        gpr.fit(X, y)
        return gpr
    
    # Evaluation history
    X_sample = []
    y_sample = []
    noise_vars = []
    
    for _ in range(10):
        x = [random.uniform(low, high) for low, high in bounds]
        y, var = robust_objective(x)
        X_sample.append(x)
        y_sample.append(y)
        noise_vars.append(var)
    
    # Build the noise-aware model
    model = gp_with_noise(np.array(X_sample), np.array(y_sample), np.array(noise_vars))
    
    return model

4.2 A Guide to Choosing an Algorithm

| Problem characteristics | Recommended algorithm | Key parameters | Caveats |
|---|---|---|---|
| Extremely expensive evaluations | Bayesian optimization | acquisition function, number of initial samples | works best in continuous spaces |
| High-dimensional discrete spaces | Genetic algorithm | population size, mutation rate | prone to premature convergence |
| Cheap, fast evaluations | Simulated annealing | initial temperature, cooling rate | needs tuning |
| Online learning | Multi-armed bandits | exploration rate ε | real-time requirements |
| Mixed spaces | SMBO | definition of the configuration space | needs domain knowledge |

5. A Practical Case Study: Hyperparameter Optimization

5.1 Problem Setup

Tuning the hyperparameters of a neural network is a textbook exploration optimization problem:

  • Objective: minimize the validation loss
  • Search space: learning rate, hidden-layer size, dropout rate, and so on
  • Constraints: training-time and memory budgets
  • Noise: random initialization and data sampling

5.2 Full Implementation

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class SimpleNN(nn.Module):
    """A small feed-forward network."""
    def __init__(self, hidden_size, dropout):
        super().__init__()
        self.fc1 = nn.Linear(20, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)
        self.fc3 = nn.Linear(hidden_size // 2, 2)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_and_evaluate(config, train_loader, val_loader, epochs=10):
    """Train a model with the given configuration and return the validation loss."""
    # Build the model
    model = SimpleNN(config['hidden_size'], config['dropout'])
    
    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=config['lr'])
    criterion = nn.CrossEntropyLoss()
    
    # Training
    model.train()
    for epoch in range(epochs):
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
    
    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            val_loss += loss.item()
    
    return val_loss / len(val_loader)

# Generate synthetic data
def generate_data(n_samples=1000):
    X = torch.randn(n_samples, 20)
    y = torch.randint(0, 2, (n_samples,))
    return TensorDataset(X, y)

# Prepare the data
full_dataset = generate_data(2000)
train_size = int(0.7 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

# Define the configuration space
config_space = {
    'lr': (1e-4, 1e-2),
    'hidden_size': (64, 256),
    'dropout': (0.0, 0.5)
}

# Hyperparameter search with Bayesian optimization
def optimize_hyperparameters(train_loader, val_loader, n_trials=20):
    """Hyperparameter optimization using scikit-optimize."""
    
    def objective(config):
        # Cast the discrete parameter to an integer
        config['hidden_size'] = int(config['hidden_size'])
        try:
            loss = train_and_evaluate(config, train_loader, val_loader, epochs=5)
            return loss
        except Exception as e:
            print(f"Evaluation failed: {e}")
            return float('inf')
    
    # gp_minimize provides the Bayesian optimization loop
    from skopt import gp_minimize
    from skopt.space import Real, Integer
    from skopt.utils import use_named_args
    
    search_space = [
        Real(1e-4, 1e-2, name='lr', prior='log-uniform'),
        Integer(64, 256, name='hidden_size'),
        Real(0.0, 0.5, name='dropout')
    ]
    
    @use_named_args(search_space)
    def wrapped_objective(**params):
        return objective(params)
    
    result = gp_minimize(
        wrapped_objective,
        search_space,
        n_calls=n_trials,
        random_state=42,
        verbose=True
    )
    
    return result

# Run the optimization (note: actually running this requires PyTorch)
# result = optimize_hyperparameters(train_loader, val_loader, n_trials=10)
# print(f"Best configuration: {result.x}, best loss: {result.fun}")

6. Best Practices and Recommendations

6.1 A Decision Tree for Algorithm Selection

Expensive evaluations? → yes → Bayesian optimization
            ↓ no
High-dimensional? → yes → Genetic algorithm
            ↓ no
Discrete space? → yes → Genetic algorithm
            ↓ no
Online decisions? → yes → Multi-armed bandits
            ↓ no
Quick prototype? → yes → Simulated annealing
            ↓ no
Coordinate descent
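
For readers who prefer code to diagrams, the same decision tree can be written as a small helper. This is only a restatement of the chart above with hypothetical boolean flags as inputs; it encodes no rule that is not already in the diagram.

def recommend_algorithm(expensive_eval: bool,
                        high_dimensional: bool,
                        discrete_space: bool,
                        online: bool,
                        quick_prototype: bool) -> str:
    """Map problem characteristics to the algorithm suggested by the decision tree above."""
    if expensive_eval:
        return "Bayesian optimization"
    if high_dimensional or discrete_space:
        return "Genetic algorithm"
    if online:
        return "Multi-armed bandits"
    if quick_prototype:
        return "Simulated annealing"
    return "Coordinate descent"

# Example: cheap evaluations, low-dimensional, continuous, offline, not a quick prototype
print(recommend_algorithm(False, False, False, False, False))  # -> Coordinate descent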

6.2 Parameter Tuning Recommendations

  1. Bayesian optimization

    • Initial samples: 5-10 (scale with dimensionality)
    • Acquisition function: EI or PI, depending on how much exploration you need
    • Kernel: Matern 5/2 works well in most cases
  2. Genetic algorithms

    • Population size: 20-50 (scale with dimensionality)
    • Mutation rate: 0.05-0.2
    • Crossover rate: 0.7-0.9
    • Elitism: 1-5%
  3. Simulated annealing

    • Initial temperature: 10-1000 (scale with the range of the objective)
    • Cooling rate: 0.9-0.99
    • Restart strategy: restart roughly every 100 iterations (these ranges are collected into a defaults sketch below)
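
As a convenience, the ranges above can be collected in one place and dropped into the implementations from section 2. The midpoint defaults in this sketch are my own reading of those ranges, not canonical values.

# Suggested starting points distilled from the ranges above (illustrative, not canonical)
TUNING_DEFAULTS = {
    "bayesian_optimization": {
        "n_initial_samples": 8,        # 5-10, scale with dimensionality
        "acquisition": "EI",           # EI or PI
        "kernel": "Matern 5/2",
    },
    "genetic_algorithm": {
        "pop_size": 40,                # 20-50
        "mutation_rate": 0.1,          # 0.05-0.2
        "crossover_rate": 0.8,         # 0.7-0.9
        "elite_fraction": 0.02,        # 1-5%
    },
    "simulated_annealing": {
        "temperature": 100.0,          # 10-1000, scale with the objective's range
        "cooling_rate": 0.95,          # 0.9-0.99
        "restart_every": 100,          # iterations between restarts
    },
}

# Example: feed the GA defaults into the implementation from section 2.2
ga_cfg = TUNING_DEFAULTS["genetic_algorithm"]
# genetic_algorithm(rastrigin, bounds, pop_size=ga_cfg["pop_size"],
#                   mutation_rate=ga_cfg["mutation_rate"],
#                   crossover_rate=ga_cfg["crossover_rate"], maximize=False)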

6.3 Monitoring and Debugging

def optimization_monitoring(history, true_optimum=None):
    """Monitor the progress of an optimization run."""
    import matplotlib.pyplot as plt
    
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    
    # Best-value trajectory
    if isinstance(history[0], tuple):  # entries are (config, loss)
        losses = [h[1] for h in history]
    else:  # entries are plain losses
        losses = history
    
    axes[0].plot(losses)
    axes[0].set_xlabel('Iteration')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Optimization Progress')
    
    if true_optimum is not None:
        axes[0].axhline(y=true_optimum, color='r', linestyle='--', label='True Optimum')
        axes[0].legend()
    
    # Convergence analysis
    if len(losses) > 10:
        window = 10
        moving_avg = np.convolve(losses, np.ones(window)/window, mode='valid')
        axes[1].plot(moving_avg)
        axes[1].set_xlabel('Iteration')
        axes[1].set_ylabel(f'Moving Avg (window={window})')
        axes[1].set_title('Convergence Analysis')
    
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print(f"Final loss: {losses[-1]:.6f}")
    print(f"Best loss: {min(losses):.6f}")
    print(f"Improvement: {(losses[0] - min(losses)) / losses[0] * 100:.2f}%")
    print(f"Total iterations: {len(losses)}")

7. Summary

Exploration optimization is a challenging but hugely important field. Success hinges on:

  1. Understanding the problem: pin down the objective's characteristics, the constraints, and the evaluation cost
  2. Choosing a suitable algorithm: match the algorithm to the characteristics of the problem
  3. Avoiding the pitfalls: use diversity preservation, early stopping, and noise-robust designs
  4. Monitoring the process: watch the optimization as it runs and adjust the strategy in time
  5. Combining strategies: hybridize several algorithms so each plays to its strengths

Remember, there is no universal algorithm. In practice you will usually need to tailor the design to the specific problem. Start with simple algorithms, add complexity gradually, and always keep random search around as a baseline for comparison.

With the code scaffolding and strategies in this article, you should be able to get good results in most exploration optimization settings. Happy exploring!