实验设计Agent:参数优化与结果分析

深入探索如何构建智能实验设计Agent,实现自动化参数优化、实验规划和结果分析,提升科研实验的效率和质量

概述与动机

在科学研究和工程实践中,实验设计是连接理论与实践的关键桥梁。传统的实验设计方法往往依赖于研究者的经验和直觉,在参数空间探索、实验规划和结果分析方面存在诸多局限性。随着问题复杂度的增加,手动设计的实验可能无法充分探索参数空间,导致次优的结果或错过重要的发现。

实验设计Agent的出现为这一挑战提供了智能化解决方案。通过整合贝叶斯优化、自动化实验规划、统计分析和可视化技术,实验设计Agent能够系统性地探索参数空间,设计高效的实验序列,并提供深度的结果分析。这种智能Agent不仅能够显著提高实验效率,减少实验次数和成本,还能发现人类可能忽略的参数组合和模式。

本文将深入探讨如何构建一个完整的实验设计Agent系统,包括参数空间建模、贝叶斯优化算法、实验规划策略、结果分析和可视化等核心模块的实现。我们将通过实际代码示例展示如何将这些技术整合到一个统一的Agent框架中,并讨论在生产环境中部署和维护此类系统需要考虑的关键因素。

核心概念与架构设计

实验设计Agent的核心组件

实验设计Agent由多个智能组件构成,每个组件负责特定的功能模块:

  1. 参数空间建模器:定义和管理实验参数的搜索空间,包括连续参数、离散参数和条件参数。处理参数约束和边界条件,确保生成的实验配置是可行的。

  2. 贝叶斯优化引擎:使用高斯过程等概率模型建模目标函数,通过采集函数(如Expected Improvement)选择下一个最有希望的参数配置,实现智能的参数探索。

  3. 实验规划器:根据优化算法的建议,生成详细的实验计划和执行序列。处理资源约束、时间限制和实验依赖关系。

  4. 结果分析器:对实验结果进行统计分析,包括效果评估、显著性检验、敏感性分析和收敛性分析。提供可视化的结果展示。

  5. 可复现性验证器:确保实验结果的可靠性和可重复性,处理随机性和不确定性问题,生成完整的实验文档。

系统架构设计

实验设计Agent采用分层架构设计,确保系统的可扩展性和模块化:

Rendering diagram...

这个架构设计的优势在于:

  • 模块化:每个组件可以独立开发和测试,便于维护和升级
  • 可扩展性:支持添加新的优化算法和实验类型
  • 灵活性:可以适应不同领域的实验需求
  • 鲁棒性:单个组件的故障不会影响整个系统的运行

实验设计工作流程

实验设计Agent的工作流程分为几个关键阶段:

Rendering diagram...

这种工作流程设计确保了实验设计的系统性和科学性,同时保持了系统的可扩展性和灵活性。

关键技术实现

参数空间建模

首先实现一个灵活的参数空间建模系统:

import numpy as np
from typing import List, Dict, Union, Tuple, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import json
import random

class ParameterType(Enum):
    """参数类型枚举"""
    CONTINUOUS = "continuous"
    DISCRETE = "discrete"
    CATEGORICAL = "categorical"
    CONDITIONAL = "conditional"

@dataclass
class Parameter:
    """参数定义"""
    name: str
    param_type: ParameterType
    bounds: Tuple[float, float] = None  # 对于连续参数
    values: List[Union[str, float, int]] = None  # 对于离散/分类参数
    default: Union[float, int, str] = None
    description: str = ""
    log_scale: bool = False  # 是否使用对数刻度
    depends_on: Dict[str, Union[str, float, int]] = None  # 条件依赖
    
    def __post_init__(self):
        """参数初始化后的验证"""
        if self.param_type == ParameterType.CONTINUOUS:
            if self.bounds is None:
                raise ValueError(f"Continuous parameter {self.name} requires bounds")
            if len(self.bounds) != 2:
                raise ValueError(f"Bounds for {self.name} must be a tuple of 2 values")
        
        elif self.param_type in [ParameterType.DISCRETE, ParameterType.CATEGORICAL]:
            if self.values is None:
                raise ValueError(f"{self.param_type.value} parameter {self.name} requires values")
    
    def sample(self) -> Union[float, int, str]:
        """从参数分布中采样"""
        if self.param_type == ParameterType.CONTINUOUS:
            if self.log_scale:
                log_lower, log_upper = np.log(self.bounds)
                return np.exp(np.random.uniform(log_lower, log_upper))
            else:
                return np.random.uniform(*self.bounds)
        
        elif self.param_type == ParameterType.DISCRETE:
            return np.random.choice(self.values)
        
        elif self.param_type == ParameterType.CATEGORICAL:
            return np.random.choice(self.values)
        
        elif self.param_type == ParameterType.CONDITIONAL:
            # 条件参数的特殊处理
            if self.values:
                return np.random.choice(self.values)
            else:
                return self.default
        
        return self.default

class ParameterSpace:
    """参数空间定义"""
    
    def __init__(self, parameters: List[Parameter]):
        self.parameters = {param.name: param for param in parameters}
        self.parameter_names = list(self.parameters.keys())
        self._validate_parameter_space()
    
    def _validate_parameter_space(self):
        """验证参数空间的完整性"""
        # 检查条件依赖的完整性
        for param in self.parameters.values():
            if param.depends_on:
                for dep_name, dep_value in param.depends_on.items():
                    if dep_name not in self.parameters:
                        raise ValueError(f"Parameter {param.name} depends on undefined parameter {dep_name}")
    
    def add_parameter(self, parameter: Parameter):
        """添加参数到空间"""
        self.parameters[parameter.name] = parameter
        self.parameter_names.append(parameter.name)
        self._validate_parameter_space()
    
    def get_parameter(self, name: str) -> Parameter:
        """获取指定参数"""
        if name not in self.parameters:
            raise ValueError(f"Parameter {name} not found in parameter space")
        return self.parameters[name]
    
    def sample_configuration(self) -> Dict[str, Union[float, int, str]]:
        """采样一个完整的参数配置"""
        config = {}
        
        # 首先处理非条件参数
        for name, param in self.parameters.items():
            if param.param_type != ParameterType.CONDITIONAL:
                config[name] = param.sample()
        
        # 然后处理条件参数
        for name, param in self.parameters.items():
            if param.param_type == ParameterType.CONDITIONAL and param.depends_on:
                # 检查依赖条件是否满足
                condition_met = True
                for dep_name, dep_value in param.depends_on.items():
                    if config.get(dep_name) != dep_value:
                        condition_met = False
                        break
                
                if condition_met:
                    config[name] = param.sample()
                else:
                    config[name] = param.default if param.default is not None else None
        
        return config
    
    def encode_configuration(self, config: Dict[str, Union[float, int, str]]) -> np.ndarray:
        """将参数配置编码为数值向量"""
        encoded = []
        
        for name in self.parameter_names:
            param = self.parameters[name]
            value = config.get(name)
            
            if param.param_type == ParameterType.CONTINUOUS:
                encoded.append(float(value))
            
            elif param.param_type == ParameterType.DISCRETE:
                # 将离散值映射到其索引
                index = param.values.index(value) if value in param.values else 0
                encoded.append(float(index))
            
            elif param.param_type == ParameterType.CATEGORICAL:
                # 使用独热编码
                one_hot = [0.0] * len(param.values)
                if value in param.values:
                    one_hot[param.values.index(value)] = 1.0
                encoded.extend(one_hot)
            
            elif param.param_type == ParameterType.CONDITIONAL:
                # 条件参数的特殊处理
                if value is not None:
                    if param.param_type == ParameterType.CATEGORICAL:
                        one_hot = [0.0] * len(param.values)
                        if value in param.values:
                            one_hot[param.values.index(value)] = 1.0
                        encoded.extend(one_hot)
                    else:
                        encoded.append(float(value))
                else:
                    # 如果条件不满足,添加占位符
                    if param.param_type == ParameterType.CATEGORICAL:
                        encoded.extend([0.0] * len(param.values))
                    else:
                        encoded.append(0.0)
        
        return np.array(encoded)
    
    def decode_configuration(self, encoded: np.ndarray) -> Dict[str, Union[float, int, str]]:
        """将数值向量解码为参数配置"""
        config = {}
        index = 0
        
        for name in self.parameter_names:
            param = self.parameters[name]
            
            if param.param_type == ParameterType.CONTINUOUS:
                config[name] = float(encoded[index])
                index += 1
            
            elif param.param_type == ParameterType.DISCRETE:
                value_index = int(encoded[index])
                config[name] = param.values[value_index % len(param.values)]
                index += 1
            
            elif param.param_type == ParameterType.CATEGORICAL:
                one_hot = encoded[index:index + len(param.values)]
                max_index = np.argmax(one_hot)
                config[name] = param.values[max_index]
                index += len(param.values)
            
            elif param.param_type == ParameterType.CONDITIONAL:
                # 条件参数的特殊处理
                if param.param_type == ParameterType.CATEGORICAL:
                    one_hot = encoded[index:index + len(param.values)]
                    if np.sum(one_hot) > 0:
                        max_index = np.argmax(one_hot)
                        config[name] = param.values[max_index]
                    else:
                        config[name] = param.default
                    index += len(param.values)
                else:
                    if encoded[index] != 0:
                        config[name] = float(encoded[index])
                    else:
                        config[name] = param.default
                    index += 1
        
        return config
    
    def get_dimensions(self) -> int:
        """获取编码后的维度数"""
        dimensions = 0
        for name in self.parameter_names:
            param = self.parameters[name]
            
            if param.param_type == ParameterType.CONTINUOUS:
                dimensions += 1
            elif param.param_type == ParameterType.DISCRETE:
                dimensions += 1
            elif param.param_type == ParameterType.CATEGORICAL:
                dimensions += len(param.values)
            elif param.param_type == ParameterType.CONDITIONAL:
                if param.param_type == ParameterType.CATEGORICAL:
                    dimensions += len(param.values)
                else:
                    dimensions += 1
        
        return dimensions
    
    def export_to_json(self, filepath: str):
        """导出参数空间到JSON文件"""
        export_data = []
        for param in self.parameters.values():
            param_data = {
                'name': param.name,
                'type': param.param_type.value,
                'bounds': param.bounds,
                'values': param.values,
                'default': param.default,
                'description': param.description,
                'log_scale': param.log_scale,
                'depends_on': param.depends_on
            }
            export_data.append(param_data)
        
        with open(filepath, 'w') as f:
            json.dump(export_data, f, indent=2)
    
    @classmethod
    def import_from_json(cls, filepath: str) -> 'ParameterSpace':
        """从JSON文件导入参数空间"""
        with open(filepath, 'r') as f:
            import_data = json.load(f)
        
        parameters = []
        for param_data in import_data:
            param = Parameter(
                name=param_data['name'],
                param_type=ParameterType(param_data['type']),
                bounds=tuple(param_data['bounds']) if param_data['bounds'] else None,
                values=param_data['values'],
                default=param_data['default'],
                description=param_data['description'],
                log_scale=param_data.get('log_scale', False),
                depends_on=param_data.get('depends_on')
            )
            parameters.append(param)
        
        return cls(parameters)

# 使用示例
def demonstrate_parameter_space():
    """演示参数空间的使用"""
    # 定义参数空间
    parameters = [
        Parameter(
            name="learning_rate",
            param_type=ParameterType.CONTINUOUS,
            bounds=(1e-5, 1e-1),
            log_scale=True,
            description="Learning rate for optimization"
        ),
        Parameter(
            name="batch_size",
            param_type=ParameterType.DISCRETE,
            values=[16, 32, 64, 128, 256],
            default=32,
            description="Batch size for training"
        ),
        Parameter(
            name="optimizer",
            param_type=ParameterType.CATEGORICAL,
            values=["adam", "sgd", "rmsprop"],
            default="adam",
            description="Optimizer type"
        ),
        Parameter(
            name="momentum",
            param_type=ParameterType.CONDITIONAL,
            values=[0.9, 0.95, 0.99],
            default=0.9,
            depends_on={"optimizer": "sgd"},
            description="Momentum for SGD optimizer"
        )
    ]
    
    param_space = ParameterSpace(parameters)
    
    # 采样几个配置
    print("Sampled configurations:")
    for i in range(3):
        config = param_space.sample_configuration()
        encoded = param_space.encode_configuration(config)
        decoded = param_space.decode_configuration(encoded)
        
        print(f"\nConfiguration {i+1}:")
        print(f"  Original: {config}")
        print(f"  Encoded shape: {encoded.shape}")
        print(f"  Decoded: {decoded}")
        print(f"  Reconstruction matches: {config == decoded}")
    
    print(f"\nParameter space dimensions: {param_space.get_dimensions()}")
    
    # 导出和导入
    param_space.export_to_json("parameter_space.json")
    imported_space = ParameterSpace.import_from_json("parameter_space.json")
    print(f"Imported parameter space has {len(imported_space.parameters)} parameters")

if __name__ == "__main__":
    demonstrate_parameter_space()

贝叶斯优化引擎

实现核心的贝叶斯优化引擎:

from scipy.optimize import minimize
from scipy.stats import norm
from typing import Callable, List, Dict, Tuple, Optional
import numpy as np
from enum import Enum

class AcquisitionFunction(Enum):
    """采集函数类型"""
    EXPECTED_IMPROVEMENT = "expected_improvement"
    UPPER_CONFIDENCE_BOUND = "upper_confidence_bound"
    PROBABILITY_IMPROVEMENT = "probability_improvement"
    THOMPSON_SAMPLING = "thompson_sampling"

class GaussianProcessRegressor:
    """简化版的高斯过程回归器"""
    
    def __init__(self, kernel: str = 'rbf', alpha: float = 1.0):
        """初始化高斯过程"""
        self.kernel = kernel
        self.alpha = alpha
        self.X_train = None
        self.y_train = None
        self.kernel_params = {'length_scale': 1.0, 'sigma_f': 1.0}
    
    def _rbf_kernel(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """RBF核函数"""
        sqdist = np.sum(X1**2, 1).reshape(-1, 1) + np.sum(X2**2, 1) - 2 * np.dot(X1, X2.T)
        return self.kernel_params['sigma_f']**2 * np.exp(-0.5 / self.kernel_params['length_scale']**2 * sqdist)
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        """拟合高斯过程"""
        self.X_train = X
        self.y_train = y
        
        # 简单的超参数优化
        self._optimize_hyperparameters()
    
    def _optimize_hyperparameters(self):
        """优化核函数超参数"""
        def negative_log_likelihood(params):
            self.kernel_params['length_scale'] = params[0]
            self.kernel_params['sigma_f'] = params[1]
            
            K = self._rbf_kernel(self.X_train, self.X_train) + self.alpha * np.eye(len(self.X_train))
            L = np.linalg.cholesky(K)
            alpha = np.linalg.solve(L, self.y_train)
            
            log_likelihood = -0.5 * np.dot(self.y_train.T, alpha) - np.sum(np.log(np.diag(L))) - len(self.y_train) / 2 * np.log(2 * np.pi)
            return -log_likelihood
        
        # 初始猜测
        initial_params = [1.0, 1.0]
        bounds = [(0.1, 10.0), (0.1, 10.0)]
        
        result = minimize(negative_log_likelihood, initial_params, bounds=bounds, method='L-BFGS-B')
        
        if result.success:
            self.kernel_params['length_scale'] = result.x[0]
            self.kernel_params['sigma_f'] = result.x[1]
    
    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """预测均值和方差"""
        K = self._rbf_kernel(self.X_train, self.X_train) + self.alpha * np.eye(len(self.X_train))
        K_s = self._rbf_kernel(self.X_train, X)
        K_ss = self._rbf_kernel(X, X)
        
        L = np.linalg.cholesky(K)
        alpha = np.linalg.solve(L, self.y_train)
        v = np.linalg.solve(L, K_s)
        
        # 预测均值
        mean = np.dot(K_s.T, alpha)
        
        # 预测方差
        variance = np.diag(K_ss) - np.sum(v**2, axis=0)
        
        return mean, np.sqrt(variance)
    
    def sample_posterior(self, X: np.ndarray, n_samples: int = 1) -> np.ndarray:
        """从后验分布采样"""
        mean, std = self.predict(X)
        samples = np.random.normal(mean[:, None], std[:, None], size=(len(mean), n_samples))
        return samples

class BayesianOptimizer:
    """贝叶斯优化器"""
    
    def __init__(self, 
                 parameter_space: ParameterSpace,
                 acquisition_function: AcquisitionFunction = AcquisitionFunction.EXPECTED_IMPROVEMENT,
                 n_initial_points: int = 5,
                 random_seed: int = None):
        """
        初始化贝叶斯优化器
        
        Args:
            parameter_space: 参数空间定义
            acquisition_function: 采集函数类型
            n_initial_points: 初始随机采样点数
            random_seed: 随机种子
        """
        self.parameter_space = parameter_space
        self.acquisition_function = acquisition_function
        self.n_initial_points = n_initial_points
        self.random_seed = random_seed
        
        if random_seed is not None:
            np.random.seed(random_seed)
        
        # 初始化高斯过程
        self.gp = GaussianProcessRegressor()
        
        # 存储历史数据
        self.X_history = []
        self.y_history = []
        self.config_history = []
        
        # 采集函数参数
        self.exploration_weight = 0.1  # 用于UCB
        self.xi = 0.01  # 用于EI和PI
    
    def _initialize_random_points(self) -> List[Dict]:
        """生成初始随机采样点"""
        initial_points = []
        for _ in range(self.n_initial_points):
            config = self.parameter_space.sample_configuration()
            initial_points.append(config)
        return initial_points
    
    def suggest_next(self) -> Dict:
        """建议下一个参数配置"""
        # 如果还没有足够的数据点,返回随机采样
        if len(self.X_history) < self.n_initial_points:
            return self.parameter_space.sample_configuration()
        
        # 将配置转换为编码向量
        X_encoded = np.array(self.X_history)
        y_encoded = np.array(self.y_history)
        
        # 拟合高斯过程
        self.gp.fit(X_encoded, y_encoded)
        
        # 优化采集函数
        best_config = self._optimize_acquisition()
        
        return best_config
    
    def _optimize_acquisition(self) -> Dict:
        """优化采集函数找到下一个最佳配置"""
        # 使用多起点优化
        n_candidates = 1000  # 候选点数量
        candidates = []
        acquisition_values = []
        
        # 生成随机候选点
        for _ in range(n_candidates):
            candidate_config = self.parameter_space.sample_configuration()
            candidate_encoded = self.parameter_space.encode_configuration(candidate_config)
            candidates.append(candidate_config)
            acquisition_values.append(self._compute_acquisition_value(candidate_encoded))
        
        # 选择采集函数值最大的候选点
        best_index = np.argmax(acquisition_values)
        best_candidate = candidates[best_index]
        
        # 在最佳候选点附近进行局部优化
        best_config = self._local_optimization(best_candidate)
        
        return best_config
    
    def _compute_acquisition_value(self, x_encoded: np.ndarray) -> float:
        """计算采集函数值"""
        x_encoded = x_encoded.reshape(1, -1)
        mean, std = self.gp.predict(x_encoded)
        
        if self.acquisition_function == AcquisitionFunction.EXPECTED_IMPROVEMENT:
            return self._expected_improvement(mean, std)
        elif self.acquisition_function == AcquisitionFunction.UPPER_CONFIDENCE_BOUND:
            return self._upper_confidence_bound(mean, std)
        elif self.acquisition_function == AcquisitionFunction.PROBABILITY_IMPROVEMENT:
            return self._probability_improvement(mean, std)
        elif self.acquisition_function == AcquisitionFunction.THOMPSON_SAMPLING:
            return self._thompson_sampling(x_encoded)
        else:
            return 0.0
    
    def _expected_improvement(self, mean: float, std: float) -> float:
        """计算期望改进"""
        if len(self.y_history) == 0:
            return 0.0
        
        best_y = max(self.y_history)
        z = (mean - best_y - self.xi) / std
        
        return (mean - best_y - self.xi) * norm.cdf(z) + std * norm.pdf(z)
    
    def _upper_confidence_bound(self, mean: float, std: float) -> float:
        """计算上置信界"""
        return mean + self.exploration_weight * std
    
    def _probability_improvement(self, mean: float, std: float) -> float:
        """计算改进概率"""
        if len(self.y_history) == 0:
            return 0.0
        
        best_y = max(self.y_history)
        z = (mean - best_y - self.xi) / std
        
        return norm.cdf(z)
    
    def _thompson_sampling(self, x_encoded: np.ndarray) -> float:
        """Thompson采样"""
        sample = self.gp.sample_posterior(x_encoded, n_samples=1)
        return sample[0][0]
    
    def _local_optimization(self, initial_config: Dict) -> Dict:
        """在初始配置附近进行局部优化"""
        best_config = initial_config.copy()
        best_value = float('-inf')
        
        # 简单的局部搜索
        for _ in range(10):  # 局部搜索迭代次数
            # 在当前最佳配置周围添加小扰动
            perturbed_config = self._perturb_configuration(best_config)
            perturbed_encoded = self.parameter_space.encode_configuration(perturbed_config)
            perturbed_value = self._compute_acquisition_value(perturbed_encoded)
            
            if perturbed_value > best_value:
                best_value = perturbed_value
                best_config = perturbed_config
        
        return best_config
    
    def _perturb_configuration(self, config: Dict, perturbation_scale: float = 0.1) -> Dict:
        """对配置添加小扰动"""
        perturbed_config = {}
        
        for name, value in config.items():
            param = self.parameter_space.get_parameter(name)
            
            if param.param_type == ParameterType.CONTINUOUS:
                # 对连续参数添加高斯噪声
                noise = np.random.normal(0, perturbation_scale * (param.bounds[1] - param.bounds[0]))
                perturbed_value = np.clip(value + noise, *param.bounds)
                perturbed_config[name] = perturbed_value
            
            elif param.param_type == ParameterType.DISCRETE:
                # 以一定概率改变离散参数
                if np.random.random() < perturbation_scale:
                    current_index = param.values.index(value)
                    new_index = np.random.choice([i for i in range(len(param.values)) if i != current_index])
                    perturbed_config[name] = param.values[new_index]
                else:
                    perturbed_config[name] = value
            
            elif param.param_type == ParameterType.CATEGORICAL:
                # 以一定概率改变分类参数
                if np.random.random() < perturbation_scale:
                    new_value = np.random.choice([v for v in param.values if v != value])
                    perturbed_config[name] = new_value
                else:
                    perturbed_config[name] = value
            
            elif param.param_type == ParameterType.CONDITIONAL:
                # 条件参数的特殊处理
                if value is not None:
                    if param.param_type == ParameterType.CATEGORICAL:
                        if np.random.random() < perturbation_scale:
                            new_value = np.random.choice([v for v in param.values if v != value])
                            perturbed_config[name] = new_value
                        else:
                            perturbed_config[name] = value
                    else:
                        noise = np.random.normal(0, perturbation_scale)
                        perturbed_config[name] = value + noise
                else:
                    perturbed_config[name] = value
            
            else:
                perturbed_config[name] = value
        
        return perturbed_config
    
    def register(self, config: Dict, value: float):
        """注册实验结果"""
        encoded_config = self.parameter_space.encode_configuration(config)
        self.X_history.append(encoded_config)
        self.y_history.append(value)
        self.config_history.append(config)
    
    def get_best(self) -> Tuple[Dict, float]:
        """获取最佳配置和对应的值"""
        if not self.y_history:
            return None, None
        
        best_index = np.argmax(self.y_history)
        best_config = self.config_history[best_index]
        best_value = self.y_history[best_index]
        
        return best_config, best_value
    
    def get_optimization_history(self) -> Dict:
        """获取优化历史"""
        return {
            'configurations': self.config_history,
            'values': self.y_history,
            'best_values': np.maximum.accumulate(self.y_history).tolist()
        }

# 使用示例
def demonstrate_bayesian_optimization():
    """演示贝叶斯优化"""
    # 定义参数空间
    parameters = [
        Parameter(
            name="x",
            param_type=ParameterType.CONTINUOUS,
            bounds=(-5.0, 5.0),
            description="First dimension"
        ),
        Parameter(
            name="y",
            param_type=ParameterType.CONTINUOUS,
            bounds=(-5.0, 5.0),
            description="Second dimension"
        )
    ]
    
    param_space = ParameterSpace(parameters)
    
    # 创建贝叶斯优化器
    optimizer = BayesianOptimizer(
        parameter_space=param_space,
        acquisition_function=AcquisitionFunction.EXPECTED_IMPROVEMENT,
        n_initial_points=3
    )
    
    # 定义测试函数 (Rastrigin函数)
    def rastrigin_function(x: float, y: float) -> float:
        """Rastrigin函数 - 多峰优化测试函数"""
        A = 10
        return 2*A + x**2 - A*np.cos(2*np.pi*x) + y**2 - A*np.cos(2*np.pi*y)
    
    # 运行优化
    n_iterations = 20
    print("Running Bayesian Optimization:")
    print("=" * 60)
    
    for iteration in range(n_iterations):
        # 建议下一个配置
        config = optimizer.suggest_next()
        
        # 评估配置
        value = rastrigin_function(config['x'], config['y'])
        
        # 注册结果
        optimizer.register(config, value)
        
        # 获取当前最佳
        best_config, best_value = optimizer.get_best()
        
        print(f"Iteration {iteration + 1}:")
        print(f"  Suggested config: x={config['x']:.3f}, y={config['y']:.3f}")
        print(f"  Function value: {value:.3f}")
        print(f"  Best so far: x={best_config['x']:.3f}, y={best_config['y']:.3f}, value={best_value:.3f}")
        print()
    
    # 获取优化历史
    history = optimizer.get_optimization_history()
    print(f"Optimization complete. Best value: {history['best_values'][-1]:.3f}")

if __name__ == "__main__":
    demonstrate_bayesian_optimization()

实验规划与执行管理

实现实验规划器和执行管理系统:

from typing import List, Dict, Optional, Callable
from datetime import datetime, timedelta
import time
import json
import os
from dataclasses import dataclass, field
from enum import Enum
import threading
import queue

class ExperimentStatus(Enum):
    """实验状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Experiment:
    """实验定义"""
    id: str
    config: Dict[str, any]
    status: ExperimentStatus = ExperimentStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    result: Optional[float] = None
    error: Optional[str] = None
    metadata: Dict = field(default_factory=dict)
    
    def to_dict(self) -> Dict:
        """转换为字典"""
        return {
            'id': self.id,
            'config': self.config,
            'status': self.status.value,
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'result': self.result,
            'error': self.error,
            'metadata': self.metadata
        }

class ExperimentPlanner:
    """实验规划器"""
    
    def __init__(self, parameter_space: ParameterSpace, max_concurrent: int = 3):
        """初始化实验规划器"""
        self.parameter_space = parameter_space
        self.max_concurrent = max_concurrent
        self.experiments = []
        self.experiment_queue = queue.Queue()
        self.running_experiments = {}
        self.completed_experiments = []
        
        # 实验执行器
        self.experiment_executor = ExperimentExecutor()
        
        # 启动工作线程
        self.workers = []
        for i in range(max_concurrent):
            worker = threading.Thread(target=self._worker_loop, args=(i,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
    
    def _worker_loop(self, worker_id: int):
        """工作线程循环"""
        while True:
            try:
                experiment = self.experiment_queue.get(timeout=1)
                if experiment is None:
                    break
                
                print(f"Worker {worker_id} starting experiment {experiment.id}")
                self.running_experiments[experiment.id] = experiment
                
                # 执行实验
                self.experiment_executor.execute_experiment(experiment)
                
                # 移动到完成列表
                self.running_experiments.pop(experiment.id, None)
                self.completed_experiments.append(experiment)
                
                print(f"Worker {worker_id} completed experiment {experiment.id}")
                self.experiment_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
                if experiment and experiment.id in self.running_experiments:
                    experiment.status = ExperimentStatus.FAILED
                    experiment.error = str(e)
                    experiment.end_time = datetime.now()
                    self.running_experiments.pop(experiment.id, None)
                    self.completed_experiments.append(experiment)
    
    def plan_experiments(self, configurations: List[Dict], 
                        objective_function: Callable,
                        metadata: Dict = None) -> List[Experiment]:
        """规划一系列实验"""
        experiments = []
        
        for i, config in enumerate(configurations):
            experiment_id = f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{i:04d}"
            
            experiment = Experiment(
                id=experiment_id,
                config=config,
                metadata=metadata or {}
            )
            
            # 设置目标函数
            self.experiment_executor.register_objective_function(experiment_id, objective_function)
            
            self.experiments.append(experiment)
            self.experiment_queue.put(experiment)
        
        return experiments
    
    def wait_for_completion(self, timeout: Optional[float] = None) -> bool:
        """等待所有实验完成"""
        start_time = time.time()
        
        while True:
            # 检查是否所有实验都已完成
            if len(self.completed_experiments) == len(self.experiments):
                return True
            
            # 检查超时
            if timeout and (time.time() - start_time) > timeout:
                return False
            
            time.sleep(0.1)
    
    def get_experiment_status(self, experiment_id: str) -> Optional[Experiment]:
        """获取实验状态"""
        # 检查运行中的实验
        if experiment_id in self.running_experiments:
            return self.running_experiments[experiment_id]
        
        # 检查已完成的实验
        for experiment in self.completed_experiments:
            if experiment.id == experiment_id:
                return experiment
        
        return None
    
    def get_all_experiments(self) -> List[Experiment]:
        """获取所有实验"""
        all_experiments = []
        all_experiments.extend(self.running_experiments.values())
        all_experiments.extend(self.completed_experiments)
        return all_experiments
    
    def get_results(self) -> List[Dict]:
        """获取所有实验结果"""
        results = []
        for experiment in self.completed_experiments:
            if experiment.status == ExperimentStatus.COMPLETED:
                results.append({
                    'config': experiment.config,
                    'result': experiment.result,
                    'experiment_id': experiment.id
                })
        return results
    
    def cancel_experiment(self, experiment_id: str) -> bool:
        """取消实验"""
        # 如果实验还在队列中,无法直接取消
        # 这里简化实现
        if experiment_id in self.running_experiments:
            experiment = self.running_experiments[experiment_id]
            experiment.status = ExperimentStatus.CANCELLED
            return True
        return False
    
    def shutdown(self):
        """关闭规划器"""
        # 停止工作线程
        for _ in range(len(self.workers)):
            self.experiment_queue.put(None)
        
        # 等待工作线程结束
        for worker in self.workers:
            worker.join(timeout=5)
        
        print("Experiment planner shutdown complete")

class ExperimentExecutor:
    """实验执行器"""
    
    def __init__(self):
        """初始化实验执行器"""
        self.objective_functions = {}
    
    def register_objective_function(self, experiment_id: str, 
                                   objective_function: Callable):
        """注册目标函数"""
        self.objective_functions[experiment_id] = objective_function
    
    def execute_experiment(self, experiment: Experiment):
        """执行单个实验"""
        try:
            # 更新状态
            experiment.status = ExperimentStatus.RUNNING
            experiment.start_time = datetime.now()
            
            # 获取目标函数
            objective_func = self.objective_functions.get(experiment.id)
            if not objective_func:
                raise ValueError(f"No objective function registered for experiment {experiment.id}")
            
            # 执行目标函数
            result = objective_func(**experiment.config)
            
            # 更新结果
            experiment.result = result
            experiment.status = ExperimentStatus.COMPLETED
            experiment.end_time = datetime.now()
            
            # 计算执行时间
            execution_time = (experiment.end_time - experiment.start_time).total_seconds()
            experiment.metadata['execution_time'] = execution_time
            
        except Exception as e:
            # 处理错误
            experiment.status = ExperimentStatus.FAILED
            experiment.error = str(e)
            experiment.end_time = datetime.now()
            print(f"Experiment {experiment.id} failed: {e}")

class AdvancedExperimentPlanner(ExperimentPlanner):
    """高级实验规划器"""
    
    def __init__(self, parameter_space: ParameterSpace, max_concurrent: int = 3):
        """初始化高级实验规划器"""
        super().__init__(parameter_space, max_concurrent)
        
        # 资源管理
        self.resource_manager = ResourceManager()
        
        # 实验依赖管理
        self.dependency_graph = {}
        
        # 实验优先级
        self.experiment_priorities = {}
    
    def plan_experiments_with_dependencies(self, 
                                         configurations: List[Dict],
                                         objective_function: Callable,
                                         dependencies: Dict[str, List[str]] = None,
                                         priorities: Dict[str, int] = None) -> List[Experiment]:
        """规划具有依赖关系的实验"""
        experiments = super().plan_experiments(configurations, objective_function)
        
        # 设置依赖关系
        if dependencies:
            for exp_id, dep_ids in dependencies.items():
                self.dependency_graph[exp_id] = dep_ids
        
        # 设置优先级
        if priorities:
            self.experiment_priorities.update(priorities)
        
        return experiments
    
    def _check_dependencies(self, experiment_id: str) -> bool:
        """检查实验的依赖是否满足"""
        if experiment_id not in self.dependency_graph:
            return True
        
        dependencies = self.dependency_graph[experiment_id]
        for dep_id in dependencies:
            dep_experiment = self.get_experiment_status(dep_id)
            if not dep_experiment or dep_experiment.status != ExperimentStatus.COMPLETED:
                return False
        
        return True
    
    def get_experiments_ready_to_run(self) -> List[Experiment]:
        """获取准备运行的实验"""
        ready_experiments = []
        
        for experiment in self.experiments:
            if (experiment.status == ExperimentStatus.PENDING and 
                self._check_dependencies(experiment.id)):
                ready_experiments.append(experiment)
        
        # 按优先级排序
        ready_experiments.sort(key=lambda exp: self.experiment_priorities.get(exp.id, 0), reverse=True)
        
        return ready_experiments

class ResourceManager:
    """资源管理器"""
    
    def __init__(self, max_memory: float = 16.0, max_gpu_memory: float = 8.0):
        """初始化资源管理器"""
        self.max_memory = max_memory  # GB
        self.max_gpu_memory = max_gpu_memory  # GB
        self.allocated_memory = 0.0
        self.allocated_gpu_memory = 0.0
        self.memory_lock = threading.Lock()
    
    def request_resources(self, memory: float, gpu_memory: float = 0.0) -> bool:
        """请求资源"""
        with self.memory_lock:
            if (self.allocated_memory + memory <= self.max_memory and 
                self.allocated_gpu_memory + gpu_memory <= self.max_gpu_memory):
                self.allocated_memory += memory
                self.allocated_gpu_memory += gpu_memory
                return True
            return False
    
    def release_resources(self, memory: float, gpu_memory: float = 0.0):
        """释放资源"""
        with self.memory_lock:
            self.allocated_memory = max(0.0, self.allocated_memory - memory)
            self.allocated_gpu_memory = max(0.0, self.allocated_gpu_memory - gpu_memory)
    
    def get_available_memory(self) -> Tuple[float, float]:
        """获取可用内存"""
        return (self.max_memory - self.allocated_memory, 
                self.max_gpu_memory - self.allocated_gpu_memory)

# 使用示例
def demonstrate_experiment_planning():
    """演示实验规划"""
    # 定义参数空间
    parameters = [
        Parameter(
            name="learning_rate",
            param_type=ParameterType.CONTINUOUS,
            bounds=(1e-4, 1e-1),
            log_scale=True
        ),
        Parameter(
            name="batch_size",
            param_type=ParameterType.DISCRETE,
            values=[16, 32, 64]
        )
    ]
    
    param_space = ParameterSpace(parameters)
    
    # 创建实验规划器
    planner = ExperimentPlanner(parameter_space, max_concurrent=2)
    
    # 定义目标函数
    def simple_objective(learning_rate: float, batch_size: int) -> float:
        """简单的目标函数"""
        # 模拟一些计算时间
        time.sleep(0.5)
        
        # 简单的函数用于演示
        result = -(learning_rate * 1000) + (batch_size / 100) + np.random.normal(0, 0.1)
        return result
    
    # 生成一些配置
    configurations = []
    for i in range(5):
        config = param_space.sample_configuration()
        configurations.append(config)
    
    # 规划实验
    print("Planning experiments...")
    experiments = planner.plan_experiments(configurations, simple_objective)
    print(f"Planned {len(experiments)} experiments")
    
    # 等待完成
    print("Waiting for experiments to complete...")
    completed = planner.wait_for_completion(timeout=30)
    
    if completed:
        print("All experiments completed successfully!")
        
        # 获取结果
        results = planner.get_results()
        print(f"\nResults summary:")
        for i, result in enumerate(results, 1):
            print(f"  Experiment {i}: {result['config']}, Result: {result['result']:.3f}")
        
        # 找到最佳结果
        best_result = max(results, key=lambda x: x['result'])
        print(f"\nBest configuration: {best_result['config']}, Best result: {best_result['result']:.3f}")
    else:
        print("Some experiments did not complete in time")
    
    # 关闭规划器
    planner.shutdown()

if __name__ == "__main__":
    demonstrate_experiment_planning()

结果分析与可视化

实现全面的结果分析和可视化系统:

import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from typing import List, Dict, Optional, Tuple
import numpy as np
import pandas as pd
from dataclasses import dataclass
import json

@dataclass
class AnalysisResult:
    """分析结果"""
    summary_statistics: Dict[str, any]
    statistical_tests: Dict[str, any]
    convergence_analysis: Dict[str, any]
    sensitivity_analysis: Dict[str, any]
    recommendations: List[str]

class ResultAnalyzer:
    """结果分析器"""
    
    def __init__(self, parameter_space: ParameterSpace):
        """初始化结果分析器"""
        self.parameter_space = parameter_space
    
    def analyze_results(self, 
                       configurations: List[Dict],
                       results: List[float],
                       experiment_metadata: List[Dict] = None) -> AnalysisResult:
        """全面分析实验结果"""
        
        # 准备数据
        df = self._prepare_dataframe(configurations, results, experiment_metadata)
        
        # 执行各种分析
        summary_statistics = self._compute_summary_statistics(df)
        statistical_tests = self._perform_statistical_tests(df)
        convergence_analysis = self._analyze_convergence(results)
        sensitivity_analysis = self._analyze_sensitivity(df)
        recommendations = self._generate_recommendations(df, results)
        
        return AnalysisResult(
            summary_statistics=summary_statistics,
            statistical_tests=statistical_tests,
            convergence_analysis=convergence_analysis,
            sensitivity_analysis=sensitivity_analysis,
            recommendations=recommendations
        )
    
    def _prepare_dataframe(self, 
                          configurations: List[Dict],
                          results: List[float],
                          experiment_metadata: List[Dict] = None) -> pd.DataFrame:
        """准备分析数据框"""
        data = []
        
        for i, (config, result) in enumerate(zip(configurations, results)):
            row = config.copy()
            row['result'] = result
            
            if experiment_metadata and i < len(experiment_metadata):
                row.update(experiment_metadata[i])
            
            data.append(row)
        
        return pd.DataFrame(data)
    
    def _compute_summary_statistics(self, df: pd.DataFrame) -> Dict[str, any]:
        """计算汇总统计"""
        stats_dict = {
            'results': {
                'mean': df['result'].mean(),
                'std': df['result'].std(),
                'min': df['result'].min(),
                'max': df['result'].max(),
                'median': df['result'].median(),
                'quartiles': df['result'].quantile([0.25, 0.5, 0.75]).to_dict()
            },
            'sample_size': len(df),
            'best_result': {
                'value': df['result'].max(),
                'configuration': df.loc[df['result'].idxmax()].to_dict()
            },
            'worst_result': {
                'value': df['result'].min(),
                'configuration': df.loc[df['result'].idxmin()].to_dict()
            }
        }
        
        # 为每个参数添加统计
        for param_name in self.parameter_space.parameter_names:
            if param_name in df.columns:
                param_stats = {
                    'unique_values': df[param_name].nunique(),
                    'most_frequent': df[param_name].mode().iloc[0] if not df[param_name].mode().empty else None
                }
                
                # 如果是数值参数,添加更多统计
                if df[param_name].dtype in [np.float64, np.int64]:
                    param_stats.update({
                        'mean': df[param_name].mean(),
                        'std': df[param_name].std(),
                        'min': df[param_name].min(),
                        'max': df[param_name].max()
                    })
                
                stats_dict[param_name] = param_stats
        
        return stats_dict
    
    def _perform_statistical_tests(self, df: pd.DataFrame) -> Dict[str, any]:
        """执行统计检验"""
        tests = {}
        
        # 正态性检验
        _, normality_p_value = stats.shapiro(df['result'])
        tests['normality_test'] = {
            'test': 'Shapiro-Wilk',
            'p_value': normality_p_value,
            'is_normal': normality_p_value > 0.05
        }
        
        # 参数对比分析
        for param_name in self.parameter_space.parameter_names:
            if param_name in df.columns and df[param_name].dtype == 'object':
                # 对于分类参数,执行ANOVA检验
                groups = [group['result'].values for name, group in df.groupby(param_name)]
                if len(groups) >= 2:
                    try:
                        f_stat, p_value = stats.f_oneway(*groups)
                        tests[f'{param_name}_anova'] = {
                            'f_statistic': f_stat,
                            'p_value': p_value,
                            'significant_difference': p_value < 0.05
                        }
                    except:
                        pass
        
        # 相关性分析
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            correlation_matrix = df[numeric_cols].corr()
            tests['correlations'] = correlation_matrix.to_dict()
        
        return tests
    
    def _analyze_convergence(self, results: List[float]) -> Dict[str, any]:
        """分析收敛性"""
        if len(results) < 3:
            return {'status': 'insufficient_data'}
        
        # 计算累积最佳结果
        cumulative_best = np.maximum.accumulate(results)
        
        # 计算最近结果的方差
        recent_results = results[-min(10, len(results)):]
        recent_variance = np.var(recent_results)
        
        # 计算改进趋势
        if len(results) >= 10:
            recent_improvements = np.diff(cumulative_best[-10:])
            improvement_rate = np.mean(recent_improvements)
        else:
            improvement_rate = 0.0
        
        convergence_status = 'unknown'
        if recent_variance < 0.01 and abs(improvement_rate) < 0.001:
            convergence_status = 'converged'
        elif improvement_rate > 0:
            convergence_status = 'improving'
        else:
            convergence_status = 'stagnant'
        
        return {
            'status': convergence_status,
            'cumulative_best': cumulative_best.tolist(),
            'recent_variance': recent_variance,
            'improvement_rate': improvement_rate,
            'final_improvement': cumulative_best[-1] - results[0]
        }
    
    def _analyze_sensitivity(self, df: pd.DataFrame) -> Dict[str, any]:
        """敏感性分析"""
        sensitivity = {}
        
        for param_name in self.parameter_space.parameter_names:
            if param_name not in df.columns:
                continue
            
            param = self.parameter_space.get_parameter(param_name)
            
            if param.param_type == ParameterType.CONTINUOUS:
                # 计算皮尔逊相关系数
                if df[param_name].dtype in [np.float64, np.int64]:
                    correlation = df[[param_name, 'result']].corr().iloc[0, 1]
                    sensitivity[param_name] = {
                        'correlation': correlation,
                        'sensitivity': abs(correlation),
                        'direction': 'positive' if correlation > 0 else 'negative'
                    }
            
            elif param.param_type in [ParameterType.DISCRETE, ParameterType.CATEGORICAL]:
                # 计算不同取值的结果分布
                param_sensitivity = {}
                for value in df[param_name].unique():
                    value_results = df[df[param_name] == value]['result']
                    param_sensitivity[str(value)] = {
                        'mean': value_results.mean(),
                        'std': value_results.std(),
                        'count': len(value_results)
                    }
                
                sensitivity[param_name] = param_sensitivity
        
        return sensitivity
    
    def _generate_recommendations(self, df: pd.DataFrame, results: List[float]) -> List[str]:
        """生成优化建议"""
        recommendations = []
        
        # 基于收敛状态的建议
        if len(results) >= 10:
            recent_variance = np.var(results[-10:])
            if recent_variance < 0.01:
                recommendations.append("实验结果已趋于收敛,可以考虑停止优化")
            else:
                recommendations.append("实验结果仍在变化,建议继续探索参数空间")
        
        # 基于最佳配置的建议
        best_idx = df['result'].idxmax()
        best_config = df.iloc[best_idx]
        
        for param_name in self.parameter_space.parameter_names:
            if param_name in best_config:
                param = self.parameter_space.get_parameter(param_name)
                recommendations.append(f"参数 {param_name} 的最佳值约为 {best_config[param_name]}")
        
        # 基于敏感性分析的建议
        sensitivity_analysis = self._analyze_sensitivity(df)
        
        # 找到最敏感的参数
        continuous_sensitivities = []
        for param_name, data in sensitivity_analysis.items():
            if isinstance(data, dict) and 'sensitivity' in data:
                continuous_sensitivities.append((param_name, data['sensitivity']))
        
        if continuous_sensitivities:
            continuous_sensitivities.sort(key=lambda x: x[1], reverse=True)
            most_sensitive = continuous_sensitivities[0]
            recommendations.append(f"参数 {most_sensitive[0]} 对结果影响最大,需要精细调节")
        
        # 基于统计检验的建议
        if len(recommendations) < 5:
            recommendations.append("建议增加实验样本数量以提高统计显著性")
            recommendations.append("考虑使用更复杂的采集函数来平衡探索和利用")
        
        return recommendations

class ResultVisualizer:
    """结果可视化器"""
    
    def __init__(self, parameter_space: ParameterSpace):
        """初始化可视化器"""
        self.parameter_space = parameter_space
        plt.style.use('seaborn')
    
    def create_comprehensive_dashboard(self, 
                                      configurations: List[Dict],
                                      results: List[float],
                                      save_path: str = "experiment_dashboard.png"):
        """创建综合仪表板"""
        fig = plt.figure(figsize=(20, 15))
        
        # 创建网格布局
        gs = fig.add_gridspec(3, 4, hspace=0.3, wspace=0.3)
        
        # 准备数据
        analyzer = ResultAnalyzer(self.parameter_space)
        df = analyzer._prepare_dataframe(configurations, results)
        
        # 1. 优化过程图
        ax1 = fig.add_subplot(gs[0, :2])
        self._plot_optimization_process(results, ax1)
        
        # 2. 参数重要性分析
        ax2 = fig.add_subplot(gs[0, 2:])
        self._plot_parameter_importance(df, ax2)
        
        # 3. 结果分布
        ax3 = fig.add_subplot(gs[1, 0])
        self._plot_result_distribution(results, ax3)
        
        # 4. 参数-结果关系 (2D参数)
        ax4 = fig.add_subplot(gs[1, 1:3])
        self._plot_parameter_relationships_2d(df, ax4)
        
        # 5. 热力图 (如果有足够参数)
        ax5 = fig.add_subplot(gs[1, 3])
        self._plot_correlation_heatmap(df, ax5)
        
        # 6. 收敛分析
        ax6 = fig.add_subplot(gs[2, :2])
        self._plot_convergence_analysis(results, ax6)
        
        # 7. 参数影响分析
        ax7 = fig.add_subplot(gs[2, 2:])
        self._plot_parameter_impact_analysis(df, ax7)
        
        plt.suptitle('实验结果综合分析仪表板', fontsize=20, y=1.02)
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close()
        
        return save_path
    
    def _plot_optimization_process(self, results: List[float], ax):
        """绘制优化过程"""
        iterations = range(1, len(results) + 1)
        cumulative_best = np.maximum.accumulate(results)
        
        ax.plot(iterations, results, 'o-', alpha=0.6, label='每次实验结果')
        ax.plot(iterations, cumulative_best, 'r-', linewidth=2, label='累积最佳结果')
        
        ax.set_xlabel('实验迭代次数')
        ax.set_ylabel('目标函数值')
        ax.set_title('优化过程跟踪')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def _plot_parameter_importance(self, df: pd.DataFrame, ax):
        """绘制参数重要性"""
        analyzer = ResultAnalyzer(self.parameter_space)
        sensitivity = analyzer._analyze_sensitivity(df)
        
        param_names = []
        importances = []
        
        for param_name, data in sensitivity.items():
            if isinstance(data, dict) and 'sensitivity' in data:
                param_names.append(param_name)
                importances.append(data['sensitivity'])
        
        if param_names:
            # 按重要性排序
            sorted_indices = np.argsort(importances)[::-1]
            param_names = [param_names[i] for i in sorted_indices]
            importances = [importances[i] for i in sorted_indices]
            
            ax.barh(param_names, importances, color='steelblue')
            ax.set_xlabel('敏感性')
            ax.set_title('参数重要性排序')
        else:
            ax.text(0.5, 0.5, '暂无连续参数', ha='center', va='center')
            ax.set_title('参数重要性排序')
    
    def _plot_result_distribution(self, results: List[float], ax):
        """绘制结果分布"""
        ax.hist(results, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        ax.axvline(np.mean(results), color='red', linestyle='--', linewidth=2, label=f'均值: {np.mean(results):.3f}')
        ax.axvline(np.median(results), color='green', linestyle='--', linewidth=2, label=f'中位数: {np.median(results):.3f}')
        
        ax.set_xlabel('目标函数值')
        ax.set_ylabel('频数')
        ax.set_title('结果分布')
        ax.legend()
    
    def _plot_parameter_relationships_2d(self, df: pd.DataFrame, ax):
        """绘制2D参数关系"""
        # 找到两个连续参数
        continuous_params = []
        for param_name in self.parameter_space.parameter_names:
            param = self.parameter_space.get_parameter(param_name)
            if param.param_type == ParameterType.CONTINUOUS and param_name in df.columns:
                continuous_params.append(param_name)
                if len(continuous_params) == 2:
                    break
        
        if len(continuous_params) == 2:
            param1, param2 = continuous_params
            scatter = ax.scatter(df[param1], df[param2], c=df['result'], 
                               cmap='RdYlBu', alpha=0.6, s=100)
            ax.set_xlabel(param1)
            ax.set_ylabel(param2)
            ax.set_title(f'{param1} vs {param2} (颜色表示结果)')
            plt.colorbar(scatter, ax=ax, label='目标函数值')
        else:
            ax.text(0.5, 0.5, '需要至少两个连续参数', ha='center', va='center')
            ax.set_title('2D参数关系')
    
    def _plot_correlation_heatmap(self, df: pd.DataFrame, ax):
        """绘制相关性热力图"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            correlation_matrix = df[numeric_cols].corr()
            sns.heatmap(correlation_matrix, annot=True, fmt='.2f', 
                       cmap='coolwarm', center=0, ax=ax, cbar_kws={'label': '相关系数'})
            ax.set_title('参数相关性热力图')
        else:
            ax.text(0.5, 0.5, '需要数值参数', ha='center', va='center')
            ax.set_title('相关性热力图')
    
    def _plot_convergence_analysis(self, results: List[float], ax):
        """绘制收敛分析"""
        analyzer = ResultAnalyzer(self.parameter_space)
        convergence = analyzer._analyze_convergence(results)
        
        if 'cumulative_best' in convergence:
            cumulative_best = convergence['cumulative_best']
            iterations = range(1, len(cumulative_best) + 1)
            
            ax.plot(iterations, cumulative_best, 'b-', linewidth=2, label='累积最佳结果')
            ax.fill_between(iterations, 0, cumulative_best, alpha=0.3)
            
            # 标记收敛点
            if convergence.get('status') == 'converged':
                ax.axvline(len(cumulative_best), color='red', linestyle='--', 
                          label='收敛点')
            
            ax.set_xlabel('实验迭代次数')
            ax.set_ylabel('最佳目标函数值')
            ax.set_title(f'收敛分析 (状态: {convergence.get("status", "unknown")})')
            ax.legend()
            ax.grid(True, alpha=0.3)
        else:
            ax.text(0.5, 0.5, '数据不足', ha='center', va='center')
            ax.set_title('收敛分析')
    
    def _plot_parameter_impact_analysis(self, df: pd.DataFrame, ax):
        """绘制参数影响分析"""
        # 为每个离散/分类参数绘制箱线图
        discrete_params = []
        for param_name in self.parameter_space.parameter_names:
            param = self.parameter_space.get_parameter(param_name)
            if param.param_type in [ParameterType.DISCRETE, ParameterType.CATEGORICAL] and param_name in df.columns:
                if df[param_name].nunique() <= 5:  # 限制显示的类别数
                    discrete_params.append(param_name)
        
        if discrete_params:
            param_name = discrete_params[0]
            df.boxplot(column='result', by=param_name, ax=ax)
            ax.set_xlabel(param_name)
            ax.set_ylabel('目标函数值')
            ax.set_title(f'{param_name} 对结果的影响')
        else:
            ax.text(0.5, 0.5, '没有合适的离散参数', ha='center', va='center')
            ax.set_title('参数影响分析')

# 使用示例
def demonstrate_result_analysis():
    """演示结果分析"""
    # 定义参数空间
    parameters = [
        Parameter(
            name="temperature",
            param_type=ParameterType.CONTINUOUS,
            bounds=(0.1, 10.0),
            log_scale=True
        ),
        Parameter(
            name="top_p",
            param_type=ParameterType.CONTINUOUS,
            bounds=(0.1, 1.0)
        ),
        Parameter(
            name="top_k",
            param_type=ParameterType.DISCRETE,
            values=[1, 5, 10, 20, 50]
        )
    ]
    
    param_space = ParameterSpace(parameters)
    
    # 模拟一些实验结果
    np.random.seed(42)
    configurations = []
    results = []
    
    for i in range(30):
        config = param_space.sample_configuration()
        # 模拟一个复杂的目标函数
        result = -(config['temperature'] - 2.0)**2 - (config['top_p'] - 0.9)**2 + np.random.normal(0, 0.1)
        
        # 添加一些top_k的影响
        if config['top_k'] == 10:
            result += 0.5
        
        configurations.append(config)
        results.append(result)
    
    # 创建分析器和可视化器
    analyzer = ResultAnalyzer(param_space)
    visualizer = ResultVisualizer(param_space)
    
    # 执行分析
    analysis = analyzer.analyze_results(configurations, results)
    
    print("实验结果分析报告")
    print("=" * 60)
    
    print("\n1. 汇总统计:")
    print(f"   实验次数: {analysis.summary_statistics['sample_size']}")
    print(f"   平均结果: {analysis.summary_statistics['results']['mean']:.3f}")
    print(f"   标准差: {analysis.summary_statistics['results']['std']:.3f}")
    print(f"   最佳结果: {analysis.summary_statistics['best_result']['value']:.3f}")
    print(f"   最差结果: {analysis.summary_statistics['worst_result']['value']:.3f}")
    
    print("\n2. 收敛分析:")
    convergence = analysis.convergence_analysis
    print(f"   状态: {convergence.get('status', 'unknown')}")
    print(f"   最终改进: {convergence.get('final_improvement', 0):.3f}")
    print(f"   改进率: {convergence.get('improvement_rate', 0):.6f}")
    
    print("\n3. 敏感性分析:")
    for param_name, data in analysis.sensitivity_analysis.items():
        if isinstance(data, dict) and 'sensitivity' in data:
            print(f"   {param_name}: 相关性={data['correlation']:.3f}, "
                  f"敏感性={data['sensitivity']:.3f}")
    
    print("\n4. 统计检验:")
    normality = analysis.statistical_tests.get('normality_test', {})
    print(f"   正态性检验: p值={normality.get('p_value', 0):.4f}, "
          f"{'符合正态分布' if normality.get('is_normal', False) else '不符合正态分布'}")
    
    print("\n5. 优化建议:")
    for i, recommendation in enumerate(analysis.recommendations, 1):
        print(f"   {i}. {recommendation}")
    
    # 创建可视化仪表板
    dashboard_path = visualizer.create_comprehensive_dashboard(configurations, results)
    print(f"\n可视化仪表板已保存到: {dashboard_path}")

if __name__ == "__main__":
    demonstrate_result_analysis()

实验设计Agent主系统

最后,我们将所有组件整合到一个完整的实验设计Agent系统中:

import asyncio
from typing import List, Dict, Optional, Callable
import json
from datetime import datetime
import logging

class ExperimentDesignAgent:
    """实验设计Agent主系统"""
    
    def __init__(self, 
                 parameter_space: ParameterSpace,
                 objective_function: Callable,
                 config: Optional[Dict] = None):
        """
        初始化实验设计Agent
        
        Args:
            parameter_space: 参数空间定义
            objective_function: 目标函数
            config: Agent配置
        """
        self.parameter_space = parameter_space
        self.objective_function = objective_function
        self.config = config or self._default_config()
        
        # 初始化各个组件
        self.optimizer = BayesianOptimizer(
            parameter_space=parameter_space,
            acquisition_function=AcquisitionFunction(
                self.config.get('acquisition_function', 'expected_improvement')
            ),
            n_initial_points=self.config.get('n_initial_points', 5)
        )
        
        self.planner = ExperimentPlanner(
            parameter_space=parameter_space,
            max_concurrent=self.config.get('max_concurrent', 3)
        )
        
        self.analyzer = ResultAnalyzer(parameter_space)
        self.visualizer = ResultVisualizer(parameter_space)
        
        # 设置日志
        self._setup_logging()
        
        # 存储实验历史
        self.experiment_history = []
        self.optimization_trace = []
    
    def _default_config(self) -> Dict:
        """默认配置"""
        return {
            'max_iterations': 50,
            'n_initial_points': 5,
            'max_concurrent': 3,
            'acquisition_function': 'expected_improvement',
            'early_stopping_patience': 10,
            'early_stopping_threshold': 0.001,
            'save_results': True,
            'generate_visualizations': True,
            'timeout_per_experiment': 300
        }
    
    def _setup_logging(self):
        """设置日志系统"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('experiment_agent.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('ExperimentDesignAgent')
    
    async def run_optimization(self, 
                              max_iterations: Optional[int] = None,
                              budget: Optional[float] = None) -> Dict:
        """
        运行完整的优化流程
        
        Args:
            max_iterations: 最大迭代次数
            budget: 时间预算(秒)
        
        Returns:
            优化结果字典
        """
        max_iterations = max_iterations or self.config['max_iterations']
        start_time = datetime.now()
        
        self.logger.info(f"Starting optimization with max {max_iterations} iterations")
        self.logger.info(f"Parameter space dimensions: {self.parameter_space.get_dimensions()}")
        
        # 优化主循环
        for iteration in range(max_iterations):
            iteration_start = datetime.now()
            
            # 检查时间预算
            if budget and (datetime.now() - start_time).total_seconds() > budget:
                self.logger.info("Time budget reached, stopping optimization")
                break
            
            # 1. 建议下一个配置
            try:
                config = self.optimizer.suggest_next()
                self.logger.info(f"Iteration {iteration + 1}: Suggested configuration")
            except Exception as e:
                self.logger.error(f"Error suggesting configuration: {e}")
                break
            
            # 2. 执行实验
            try:
                result = await self._execute_experiment(config)
                self.logger.info(f"Experiment result: {result:.6f}")
            except Exception as e:
                self.logger.error(f"Error executing experiment: {e}")
                continue
            
            # 3. 注册结果
            self.optimizer.register(config, result)
            
            # 4. 记录优化轨迹
            self.optimization_trace.append({
                'iteration': iteration + 1,
                'config': config,
                'result': result,
                'timestamp': datetime.now().isoformat(),
                'elapsed_time': (datetime.now() - iteration_start).total_seconds()
            })
            
            # 5. 检查早停条件
            if self._should_stop_early():
                self.logger.info("Early stopping condition met")
                break
            
            # 6. 定期分析
            if (iteration + 1) % 5 == 0:
                self._interim_analysis(iteration + 1)
        
        # 最终分析
        final_results = self._finalize_optimization(start_time)
        
        return final_results
    
    async def _execute_experiment(self, config: Dict) -> float:
        """执行单个实验"""
        # 在线程池中执行目标函数
        loop = asyncio.get_event_loop()
        
        # 添加超时控制
        timeout = self.config.get('timeout_per_experiment', 300)
        
        try:
            result = await asyncio.wait_for(
                loop.run_in_executor(None, self.objective_function, **config),
                timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            self.logger.warning(f"Experiment timeout after {timeout} seconds")
            raise
        except Exception as e:
            self.logger.error(f"Experiment execution error: {e}")
            raise
    
    def _should_stop_early(self) -> bool:
        """检查是否应该早停"""
        if len(self.optimizer.y_history) < self.config['early_stopping_patience']:
            return False
        
        # 检查最近的改进
        recent_results = self.optimizer.y_history[-self.config['early_stopping_patience']:]
        best_recent = max(recent_results)
        best_overall = max(self.optimizer.y_history)
        
        improvement = best_overall - best_recent
        
        return improvement < self.config['early_stopping_threshold']
    
    def _interim_analysis(self, iteration: int):
        """中间分析"""
        if len(self.optimizer.y_history) < 3:
            return
        
        # 获取当前最佳
        best_config, best_value = self.optimizer.get_best()
        
        self.logger.info(f"Interim analysis at iteration {iteration}:")
        self.logger.info(f"  Best result so far: {best_value:.6f}")
        self.logger.info(f"  Best config: {best_config}")
        
        # 分析收敛性
        if len(self.optimizer.y_history) >= 10:
            convergence = self.analyzer._analyze_convergence(self.optimizer.y_history)
            self.logger.info(f"  Convergence status: {convergence.get('status', 'unknown')}")
    
    def _finalize_optimization(self, start_time: datetime) -> Dict:
        """完成优化并生成最终报告"""
        end_time = datetime.now()
        total_time = (end_time - start_time).total_seconds()
        
        # 获取最佳结果
        best_config, best_value = self.optimizer.get_best()
        
        self.logger.info(f"Optimization completed in {total_time:.2f} seconds")
        self.logger.info(f"Best result: {best_value:.6f}")
        self.logger.info(f"Best configuration: {best_config}")
        
        # 提取实验历史
        configurations = [item['config'] for item in self.optimization_trace]
        results = [item['result'] for item in self.optimization_trace]
        
        # 全面分析
        self.logger.info("Performing comprehensive analysis...")
        analysis = self.analyzer.analyze_results(configurations, results)
        
        # 生成可视化
        visualizations = {}
        if self.config.get('generate_visualizations', True):
            self.logger.info("Generating visualizations...")
            dashboard_path = self.visualizer.create_comprehensive_dashboard(
                configurations, results
            )
            visualizations['dashboard'] = dashboard_path
        
        # 保存结果
        final_report = {
            'optimization_summary': {
                'total_iterations': len(self.optimization_trace),
                'total_time': total_time,
                'best_result': best_value,
                'best_configuration': best_config,
                'convergence_status': analysis.convergence_analysis.get('status', 'unknown')
            },
            'detailed_analysis': {
                'summary_statistics': analysis.summary_statistics,
                'statistical_tests': analysis.statistical_tests,
                'convergence_analysis': analysis.convergence_analysis,
                'sensitivity_analysis': analysis.sensitivity_analysis,
                'recommendations': analysis.recommendations
            },
            'optimization_trace': self.optimization_trace,
            'visualizations': visualizations,
            'timestamp': end_time.isoformat()
        }
        
        # 保存到文件
        if self.config.get('save_results', True):
            self._save_results(final_report)
        
        return final_report
    
    def _save_results(self, report: Dict):
        """保存优化结果"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # 保存完整报告
        report_path = f"optimization_report_{timestamp}.json"
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        self.logger.info(f"Optimization report saved to {report_path}")
        
        # 保存优化轨迹为CSV
        import pandas as pd
        trace_df = pd.DataFrame(self.optimization_trace)
        trace_path = f"optimization_trace_{timestamp}.csv"
        trace_df.to_csv(trace_path, index=False)
        
        self.logger.info(f"Optimization trace saved to {trace_path}")
    
    def export_report(self, report: Dict, format: str = 'markdown') -> str:
        """导出报告为指定格式"""
        if format == 'markdown':
            return self._export_to_markdown(report)
        elif format == 'html':
            return self._export_to_html(report)
        else:
            raise ValueError(f"Unsupported format: {format}")
    
    def _export_to_markdown(self, report: Dict) -> str:
        """导出为Markdown格式"""
        md = f"# 实验优化报告\n\n"
        
        # 优化摘要
        summary = report['optimization_summary']
        md += "## 优化摘要\n\n"
        md += f"- **总迭代次数**: {summary['total_iterations']}\n"
        md += f"- **总耗时**: {summary['total_time']:.2f} 秒\n"
        md += f"- **最佳结果**: {summary['best_result']:.6f}\n"
        md += f"- **收敛状态**: {summary['convergence_status']}\n\n"
        
        md += "### 最佳配置\n\n"
        for param, value in summary['best_configuration'].items():
            md += f"- **{param}**: {value}\n"
        
        # 详细分析
        analysis = report['detailed_analysis']
        md += "\n## 详细分析\n\n"
        
        # 统计摘要
        stats = analysis['summary_statistics']
        md += "### 统计摘要\n\n"
        md += f"- **实验次数**: {stats['sample_size']}\n"
        md += f"- **平均结果**: {stats['results']['mean']:.6f}\n"
        md += f"- **标准差**: {stats['results']['std']:.6f}\n"
        md += f"- **最小值**: {stats['results']['min']:.6f}\n"
        md += f"- **最大值**: {stats['results']['max']:.6f}\n\n"
        
        # 敏感性分析
        sensitivity = analysis['sensitivity_analysis']
        md += "### 敏感性分析\n\n"
        for param_name, data in sensitivity.items():
            if isinstance(data, dict) and 'sensitivity' in data:
                md += f"- **{param_name}**: 相关性={data['correlation']:.3f}, "
                md += f"敏感性={data['sensitivity']:.3f}, 方向={data['direction']}\n"
        
        # 建议
        recommendations = analysis['recommendations']
        md += "\n## 优化建议\n\n"
        for i, recommendation in enumerate(recommendations, 1):
            md += f"{i}. {recommendation}\n"
        
        # 可视化
        if 'visualizations' in report and report['visualizations']:
            md += "\n## 可视化\n\n"
            for name, path in report['visualizations'].items():
                md += f"- **{name}**: `{path}`\n"
        
        return md
    
    def _export_to_html(self, report: Dict) -> str:
        """导出为HTML格式"""
        summary = report['optimization_summary']
        analysis = report['detailed_analysis']
        
        html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>实验优化报告</title>
    <style>
        body {{ font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; }}
        h1 {{ color: #2c3e50; border-bottom: 2px solid #3498db; }}
        h2 {{ color: #34495e; margin-top: 30px; }}
        .summary {{ background-color: #f8f9fa; padding: 15px; border-radius: 5px; }}
        .config {{ background-color: #e8f4f8; padding: 10px; margin: 10px 0; border-left: 4px solid #3498db; }}
        .metric {{ display: inline-block; margin: 10px; padding: 10px; background-color: #fff; border: 1px solid #ddd; border-radius: 5px; }}
        .recommendation {{ background-color: #d5f4e6; padding: 10px; margin: 5px 0; border-radius: 5px; }}
    </style>
</head>
<body>
    <h1>实验优化报告</h1>
    
    <div class="summary">
        <h2>优化摘要</h2>
        <div class="metric"><strong>总迭代次数:</strong> {summary['total_iterations']}</div>
        <div class="metric"><strong>总耗时:</strong> {summary['total_time']:.2f} 秒</div>
        <div class="metric"><strong>最佳结果:</strong> {summary['best_result']:.6f}</div>
        <div class="metric"><strong>收敛状态:</strong> {summary['convergence_status']}</div>
    </div>
    
    <h2>最佳配置</h2>
    {"".join(f"<div class='config'><strong>{param}:</strong> {value}</div>" 
             for param, value in summary['best_configuration'].items())}
    
    <h2>详细分析</h2>
    <h3>统计摘要</h3>
    <div class="metric"><strong>实验次数:</strong> {analysis['summary_statistics']['sample_size']}</div>
    <div class="metric"><strong>平均结果:</strong> {analysis['summary_statistics']['results']['mean']:.6f}</div>
    <div class="metric"><strong>标准差:</strong> {analysis['summary_statistics']['results']['std']:.6f}</div>
    
    <h3>敏感性分析</h3>
    {"".join(f"<div class='config'><strong>{param}:</strong> 相关性={data['correlation']:.3f}, "
             f"敏感性={data['sensitivity']:.3f}</div>" 
             for param, data in analysis['sensitivity_analysis'].items() 
             if isinstance(data, dict) and 'sensitivity' in data)}
    
    <h2>优化建议</h2>
    {"".join(f"<div class='recommendation'>{i}. {rec}</div>" 
             for i, rec in enumerate(analysis['recommendations'], 1))}
    
    <h2>优化轨迹</h2>
    <p>包含 {len(report['optimization_trace'])} 次实验的完整记录</p>
    
</body>
</html>
        """
        return html

# 使用示例
async def demonstrate_experiment_design_agent():
    """演示完整的实验设计Agent"""
    # 定义参数空间
    parameters = [
        Parameter(
            name="learning_rate",
            param_type=ParameterType.CONTINUOUS,
            bounds=(1e-5, 1e-1),
            log_scale=True,
            description="学习率"
        ),
        Parameter(
            name="batch_size",
            param_type=ParameterType.DISCRETE,
            values=[16, 32, 64, 128],
            description="批次大小"
        ),
        Parameter(
            name="dropout_rate",
            param_type=ParameterType.CONTINUOUS,
            bounds=(0.0, 0.5),
            description="Dropout率"
        )
    ]
    
    param_space = ParameterSpace(parameters)
    
    # 定义目标函数 (模拟机器学习模型训练)
    def ml_objective_function(learning_rate: float, batch_size: int, dropout_rate: float) -> float:
        """模拟机器学习训练的准确率"""
        # 模拟训练时间
        time.sleep(0.3)
        
        # 模拟一个复杂的响应面
        # 最佳点大约在 learning_rate=0.001, batch_size=64, dropout_rate=0.2
        lr_optimal = 0.001
        batch_optimal = 64
        dropout_optimal = 0.2
        
        # 计算距离最优点的距离
        lr_distance = abs(np.log10(learning_rate) - np.log10(lr_optimal))
        batch_distance = abs(batch_size - batch_optimal) / batch_optimal
        dropout_distance = abs(dropout_rate - dropout_optimal) / dropout_optimal
        
        # 综合距离
        total_distance = np.sqrt(lr_distance**2 + batch_distance**2 + dropout_distance**2)
        
        # 模拟准确率 (距离越小,准确率越高)
        accuracy = 0.95 * np.exp(-total_distance) + np.random.normal(0, 0.02)
        
        return accuracy
    
    # 创建Agent配置
    agent_config = {
        'max_iterations': 20,
        'n_initial_points': 5,
        'max_concurrent': 3,
        'acquisition_function': 'expected_improvement',
        'early_stopping_patience': 8,
        'early_stopping_threshold': 0.001,
        'save_results': True,
        'generate_visualizations': True
    }
    
    # 创建实验设计Agent
    agent = ExperimentDesignAgent(
        parameter_space=param_space,
        objective_function=ml_objective_function,
        config=agent_config
    )
    
    print("启动实验设计Agent...")
    print("=" * 80)
    
    # 运行优化
    optimization_report = await agent.run_optimization()
    
    print("\n" + "=" * 80)
    print("优化完成!")
    print("=" * 80)
    
    # 显示关键结果
    summary = optimization_report['optimization_summary']
    print(f"\n优化摘要:")
    print(f"  总迭代次数: {summary['total_iterations']}")
    print(f"  最佳结果: {summary['best_result']:.6f}")
    print(f"  收敛状态: {summary['convergence_status']}")
    
    print(f"\n最佳配置:")
    for param, value in summary['best_configuration'].items():
        print(f"  {param}: {value}")
    
    # 显示建议
    recommendations = optimization_report['detailed_analysis']['recommendations']
    print(f"\n优化建议:")
    for i, recommendation in enumerate(recommendations, 1):
        print(f"  {i}. {recommendation}")
    
    # 导出报告
    markdown_report = agent.export_report(optimization_report, 'markdown')
    html_report = agent.export_report(optimization_report, 'html')
    
    # 保存报告
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    with open(f"experiment_report_{timestamp}.md", 'w', encoding='utf-8') as f:
        f.write(markdown_report)
    print(f"\nMarkdown报告已保存到: experiment_report_{timestamp}.md")
    
    with open(f"experiment_report_{timestamp}.html", 'w', encoding='utf-8') as f:
        f.write(html_report)
    print(f"HTML报告已保存到: experiment_report_{timestamp}.html")
    
    return optimization_report

if __name__ == "__main__":
    asyncio.run(demonstrate_experiment_design_agent())

最佳实践与常见陷阱

生产环境部署最佳实践

  1. 目标函数安全性:确保目标函数的执行是安全的,实现超时控制和资源限制,防止恶意或错误的目标函数导致系统崩溃。
import signal
import resource
from contextlib import contextmanager
import multiprocessing

class SafeExperimentExecutor:
    """安全的实验执行器"""
    
    def __init__(self, max_memory_mb: int = 1024, max_time_seconds: int = 300):
        self.max_memory_mb = max_memory_mb
        self.max_time_seconds = max_time_seconds
    
    @contextmanager
    def resource_limits(self):
        """设置资源限制"""
        # 设置内存限制
        resource.setrlimit(resource.RLIMIT_AS, 
                          (self.max_memory_mb * 1024 * 1024, 
                           self.max_memory_mb * 1024 * 1024))
        
        # 设置CPU时间限制
        resource.setrlimit(resource.RLIMIT_CPU, 
                          (self.max_time_seconds, 
                           self.max_time_seconds + 5))
        
        try:
            yield
        except MemoryError:
            raise Exception("Experiment exceeded memory limit")
        except Exception as e:
            raise Exception(f"Experiment failed: {str(e)}")
    
    def execute_safely(self, objective_function: Callable, **kwargs) -> float:
        """安全地执行目标函数"""
        def target(queue, func, kwargs):
            try:
                with self.resource_limits():
                    result = func(**kwargs)
                    queue.put(('success', result))
            except Exception as e:
                queue.put(('error', str(e)))
        
        # 使用子进程执行
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(
            target=target,
            args=(queue, objective_function, kwargs)
        )
        
        process.start()
        process.join(timeout=self.max_time_seconds)
        
        if process.is_alive():
            process.terminate()
            process.join()
            raise Exception(f"Experiment exceeded time limit of {self.max_time_seconds} seconds")
        
        if not queue.empty():
            status, result = queue.get()
            if status == 'success':
                return result
            else:
                raise Exception(f"Experiment failed: {result}")
        else:
            raise Exception("Experiment failed without error message")
  1. 实验结果持久化:实现可靠的结果存储机制,防止数据丢失。使用数据库或文件系统存储实验结果,并定期备份。
import sqlite3
import json
import os
from datetime import datetime
from typing import Optional

class ExperimentDatabase:
    """实验结果数据库"""
    
    def __init__(self, db_path: str = "experiments.db"):
        self.db_path = db_path
        self._init_database()
    
    def _init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS experiments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                experiment_id TEXT UNIQUE NOT NULL,
                configuration TEXT NOT NULL,
                result REAL NOT NULL,
                status TEXT NOT NULL,
                error_message TEXT,
                start_time TIMESTAMP,
                end_time TIMESTAMP,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_experiment_id 
            ON experiments(experiment_id)
        """)
        
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_result 
            ON experiments(result)
        """)
        
        conn.commit()
        conn.close()
    
    def save_experiment(self, 
                       experiment_id: str,
                       configuration: Dict,
                       result: float,
                       status: str = 'completed',
                       error_message: Optional[str] = None,
                       metadata: Optional[Dict] = None):
        """保存实验结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            cursor.execute("""
                INSERT OR REPLACE INTO experiments 
                (experiment_id, configuration, result, status, error_message, metadata)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                experiment_id,
                json.dumps(configuration),
                result,
                status,
                error_message,
                json.dumps(metadata or {})
            ))
            
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
    
    def get_experiment(self, experiment_id: str) -> Optional[Dict]:
        """获取实验结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT * FROM experiments WHERE experiment_id = ?
        """, (experiment_id,))
        
        row = cursor.fetchone()
        conn.close()
        
        if row:
            return {
                'id': row[0],
                'experiment_id': row[1],
                'configuration': json.loads(row[2]),
                'result': row[3],
                'status': row[4],
                'error_message': row[5],
                'metadata': json.loads(row[7]) if row[7] else {}
            }
        return None
    
    def get_best_experiments(self, limit: int = 10) -> List[Dict]:
        """获取最佳实验"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT * FROM experiments 
            WHERE status = 'completed'
            ORDER BY result DESC
            LIMIT ?
        """, (limit,))
        
        rows = cursor.fetchall()
        conn.close()
        
        return [
            {
                'experiment_id': row[1],
                'configuration': json.loads(row[2]),
                'result': row[3],
                'metadata': json.loads(row[7]) if row[7] else {}
            }
            for row in rows
        ]
    
    def export_to_json(self, output_path: str):
        """导出为JSON"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("SELECT * FROM experiments")
        rows = cursor.fetchall()
        conn.close()
        
        experiments = [
            {
                'experiment_id': row[1],
                'configuration': json.loads(row[2]),
                'result': row[3],
                'status': row[4],
                'error_message': row[5],
                'start_time': row[6],
                'end_time': row[7],
                'metadata': json.loads(row[8]) if row[8] else {}
            }
            for row in rows
        ]
        
        with open(output_path, 'w') as f:
            json.dump(experiments, f, indent=2)
  1. 分布式实验执行:对于大规模参数空间,实现分布式实验执行,利用多台机器并行运行实验。
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

class DistributedExperimentRunner:
    """分布式实验运行器"""
    
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
    
    def run_experiments_distributed(self,
                                   configurations: List[Dict],
                                   objective_function: Callable,
                                   progress_callback: Optional[Callable] = None) -> List[Dict]:
        """分布式运行实验"""
        results = []
        
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            # 提交所有实验
            future_to_config = {
                executor.submit(objective_function, **config): config
                for config in configurations
            }
            
            # 收集结果
            for i, future in enumerate(as_completed(future_to_config), 1):
                config = future_to_config[future]
                try:
                    result = future.result()
                    results.append({
                        'configuration': config,
                        'result': result,
                        'status': 'success'
                    })
                except Exception as e:
                    results.append({
                        'configuration': config,
                        'result': None,
                        'status': 'error',
                        'error': str(e)
                    })
                
                # 进度回调
                if progress_callback:
                    progress_callback(i, len(configurations), results[-1])
        
        return results
    
    def run_parameter_sweep(self,
                           parameter_space: ParameterSpace,
                           objective_function: Callable,
                           n_samples: int = 100) -> List[Dict]:
        """运行参数扫描"""
        # 生成随机采样
        configurations = [
            parameter_space.sample_configuration()
            for _ in range(n_samples)
        ]
        
        return self.run_experiments_distributed(configurations, objective_function)

常见陷阱及解决方案

  1. 过度拟合优化历史:贝叶斯优化器可能过度拟合有限的实验历史,导致陷入局部最优。解决方案是调整采集函数的探索参数,定期重置优化器,或使用不同的采集函数。

  2. 目标函数噪声:噪声目标函数会导致优化不稳定。解决方案是使用重复实验取平均值,或使用专门针对噪声优化的方法(如熵搜索)。

  3. 参数尺度问题:不同参数的尺度差异会影响优化效果。解决方案是对参数进行归一化,使用对数刻度处理大范围参数。

  4. 过早收敛:优化过程可能过早收敛到次优解。解决方案是增加初始随机点数量,调整采集函数参数,或实施重启策略。

  5. 计算资源不足:大规模参数空间需要大量计算资源。解决方案是实现智能资源分配,优先探索有希望的区域,使用代理模型减少目标函数调用。

性能优化考虑

并行化优化策略

实现高效的并行化优化策略:

import asyncio
from typing import List, Dict, Tuple, Optional
import numpy as np

class ParallelBayesianOptimizer:
    """并行贝叶斯优化器"""
    
    def __init__(self, 
                 parameter_space: ParameterSpace,
                 n_parallel: int = 4,
                 batch_size: int = 4):
        self.parameter_space = parameter_space
        self.n_parallel = n_parallel
        self.batch_size = batch_size
        self.optimizer = BayesianOptimizer(parameter_space)
    
    async def optimize_parallel(self,
                               objective_function: Callable,
                               max_iterations: int = 50) -> Dict:
        """并行优化"""
        results = []
        
        for iteration in range(max_iterations):
            # 生成一批候选配置
            batch_configs = self._generate_batch()
            
            # 并行执行实验
            batch_results = await self._execute_batch(objective_function, batch_configs)
            
            # 更新优化器
            for config, result in zip(batch_configs, batch_results):
                self.optimizer.register(config, result)
                results.append({
                    'iteration': iteration,
                    'config': config,
                    'result': result
                })
            
            # 检查收敛
            if self._check_convergence(results):
                break
        
        # 返回最佳结果
        best_config, best_value = self.optimizer.get_best()
        
        return {
            'best_config': best_config,
            'best_value': best_value,
            'total_iterations': len(results),
            'optimization_trace': results
        }
    
    def _generate_batch(self) -> List[Dict]:
        """生成一批候选配置"""
        batch = []
        
        for _ in range(self.batch_size):
            config = self.optimizer.suggest_next()
            batch.append(config)
        
        return batch
    
    async def _execute_batch(self, 
                            objective_function: Callable,
                            batch_configs: List[Dict]) -> List[float]:
        """并行执行一批实验"""
        tasks = [
            self._execute_single_experiment(objective_function, config)
            for config in batch_configs
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 处理异常
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                # 用一个很差的值替代失败的实验
                processed_results.append(float('-inf'))
            else:
                processed_results.append(result)
        
        return processed_results
    
    async def _execute_single_experiment(self,
                                        objective_function: Callable,
                                        config: Dict) -> float:
        """执行单个实验"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, objective_function, **config)
    
    def _check_convergence(self, results: List[Dict]) -> bool:
        """检查收敛性"""
        if len(results) < 10:
            return False
        
        recent_results = [r['result'] for r in results[-10:]]
        recent_best = max(recent_results)
        overall_best = max(r['result'] for r in results)
        
        improvement = overall_best - recent_best
        return improvement < 0.001

内存优化策略

处理大规模参数空间时的内存优化:

import pickle
import os
import tempfile
from typing import Iterator, Dict

class MemoryEfficientExperimentTracker:
    """内存高效的实验跟踪器"""
    
    def __init__(self, max_in_memory: int = 1000, cache_dir: str = "experiment_cache"):
        self.max_in_memory = max_in_memory
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        
        self.in_memory_experiments = []
        self.disk_cache_files = []
        self.total_experiments = 0
    
    def add_experiment(self, config: Dict, result: float, metadata: Dict = None):
        """添加实验结果"""
        experiment = {
            'config': config,
            'result': result,
            'metadata': metadata or {},
            'index': self.total_experiments
        }
        
        if len(self.in_memory_experiments) >= self.max_in_memory:
            # 将最早的结果写入磁盘
            oldest = self.in_memory_experiments.pop(0)
            self._write_to_disk(oldest)
        
        self.in_memory_experiments.append(experiment)
        self.total_experiments += 1
    
    def _write_to_disk(self, experiment: Dict):
        """将实验写入磁盘"""
        cache_file = os.path.join(self.cache_dir, f"exp_{experiment['index']}.pkl")
        with open(cache_file, 'wb') as f:
            pickle.dump(experiment, f)
        self.disk_cache_files.append(cache_file)
    
    def get_all_experiments(self) -> Iterator[Dict]:
        """获取所有实验(内存 + 磁盘)"""
        # 首先返回内存中的实验
        for experiment in self.in_memory_experiments:
            yield experiment
        
        # 然后返回磁盘上的实验
        for cache_file in self.disk_cache_files:
            with open(cache_file, 'rb') as f:
                yield pickle.load(f)
    
    def get_best_result(self) -> Tuple[Dict, float]:
        """获取最佳结果"""
        best_result = float('-inf')
        best_config = None
        
        for experiment in self.get_all_experiments():
            if experiment['result'] > best_result:
                best_result = experiment['result']
                best_config = experiment['config']
        
        return best_config, best_result
    
    def clear_cache(self):
        """清理缓存"""
        # 清理磁盘缓存
        for cache_file in self.disk_cache_files:
            if os.path.exists(cache_file):
                os.remove(cache_file)
        
        self.disk_cache_files.clear()
        self.in_memory_experiments.clear()

成本优化策略

  1. 智能预算分配:根据前期结果动态调整后续实验的资源分配。

  2. 早停策略:实现基于置信度的早停,避免在明显次优的配置上浪费资源。

  3. 代理模型:使用轻量级代理模型筛选候选配置,减少昂贵的目标函数调用。

class BudgetAwareOptimizer:
    """预算感知的优化器"""
    
    def __init__(self, 
                 parameter_space: ParameterSpace,
                 total_budget: float,
                 cost_per_experiment: float = 1.0):
        self.parameter_space = parameter_space
        self.total_budget = total_budget
        self.cost_per_experiment = cost_per_experiment
        self.remaining_budget = total_budget
        self.experiments_completed = 0
        self.optimizer = BayesianOptimizer(parameter_space)
    
    def optimize_with_budget(self,
                            objective_function: Callable,
                            cost_function: Optional[Callable] = None) -> Dict:
        """在预算约束下优化"""
        results = []
        
        while self._can_afford_experiment():
            # 建议配置
            config = self.optimizer.suggest_next()
            
            # 估算成本
            estimated_cost = cost_function(**config) if cost_function else self.cost_per_experiment
            
            if not self._can_afford_cost(estimated_cost):
                break
            
            # 执行实验
            try:
                result = objective_function(**config)
                actual_cost = estimated_cost
                
                self.optimizer.register(config, result)
                self.remaining_budget -= actual_cost
                self.experiments_completed += 1
                
                results.append({
                    'config': config,
                    'result': result,
                    'cost': actual_cost,
                    'remaining_budget': self.remaining_budget
                })
                
            except Exception as e:
                print(f"Experiment failed: {e}")
                continue
        
        best_config, best_value = self.optimizer.get_best()
        
        return {
            'best_config': best_config,
            'best_value': best_value,
            'experiments_completed': self.experiments_completed,
            'total_cost': self.total_budget - self.remaining_budget,
            'optimization_trace': results
        }
    
    def _can_afford_experiment(self) -> bool:
        """检查是否能够承担实验"""
        return self.remaining_budget >= self.cost_per_experiment
    
    def _can_afford_cost(self, cost: float) -> bool:
        """检查是否能够承担特定成本"""
        return self.remaining_budget >= cost

参考资源

贝叶斯优化相关资源

统计分析资源

可视化资源

相关研究论文

  1. "Practical Bayesian Optimization of Machine Learning Algorithms"

    • 贝叶斯优化的经典论文
    • 理论基础和实践指导
  2. "Taking the Human Out of the Loop: A Review of Bayesian Optimization"

    • 贝叶叶优化综述
    • 涵盖算法和应用
  3. "Multi-Task Bayesian Optimization"

    • 多任务贝叶斯优化
    • 相关任务的联合优化
  4. "Bayesian Optimization for Machine Learning: A Practical Guidebook"

    • 机器学习中的贝叶斯优化
    • 实践经验和最佳实践

通过本文的深入探讨,我们了解了如何构建一个完整的实验设计Agent系统,从参数空间建模到贝叶斯优化,再到结果分析和可视化的完整流程。这个系统能够显著提高实验效率,减少实验次数和成本,并为研究者提供深度的洞察和可操作的建议。随着人工智能和自动化技术的发展,实验设计Agent将在科学研究和工程实践中发挥越来越重要的作用。