1. Overall System Architecture
The system is a physical training weight optimization platform based on deep reinforcement learning. It uses a client-server architecture and exposes RESTful API services through FastAPI. Its core functionality is organized into two main modules: model training and weight optimization.
1.1 System Components
- FastAPI web framework: provides the HTTP API endpoints
- PyTorch deep learning framework: implements the neural networks and the reinforcement learning algorithm
- CUDA acceleration: uses the GPU for efficient computation
- Logging system: records the training and optimization process
- Model persistence: saves and loads trained models
1.2 Core Workflow
User data input → state encoding → policy network decision → environment interaction → reward calculation → network update → weight matrix optimization → result output
2. Data Model Design
2.1 Fitness Quality Enumeration (FitnessQuality)
Defines 23 fitness quality types, including:
- Basic qualities: strength (Q001), endurance (Q002), flexibility (Q003), balance (Q004), etc.
- Specialized qualities: upper-body pulling strength (Q014), grip strength (Q016), lower-body explosive power (Q021), etc.
- Extended qualities: aerobic endurance (Q012), agility (Q023), etc.
2.2 Subject Codes (SUBJECTS)
93 training subjects, each with a unique code (C001-C093) representing a different physical training event.
2.3 Action Codes (ACTIONS)
44 training actions, each with a unique code (XL001-XL044) representing a specific training movement.
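As a minimal sketch, the three code spaces and the size constants used throughout the rest of this document could be generated as follows; the actual module may define these lists or enums explicitly, so the construction below is an assumption.
import torch

# Hypothetical sketch: sizes match the document, list construction is assumed.
NUM_QUALITIES = 23   # Q001-Q023
NUM_SUBJECTS = 93    # C001-C093
NUM_ACTIONS = 44     # XL001-XL044

QUALITIES = [f"Q{i:03d}" for i in range(1, NUM_QUALITIES + 1)]
SUBJECTS = [f"C{i:03d}" for i in range(1, NUM_SUBJECTS + 1)]
ACTIONS = [f"XL{i:03d}" for i in range(1, NUM_ACTIONS + 1)]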
2.4 State Representation (the State class)
The state is the system's core data structure and contains:
@dataclass
class State:
    user_id: str                           # user identifier
    age: int                               # age
    gender: str                            # gender (M/F)
    fitness_level: str                     # fitness level (beginner/intermediate/advanced)
    current_qualities: List[float]         # current quality levels (23-dim vector)
    target_subjects: List[str]             # list of target subjects
    subject_scores: List[float]            # subject scores (93-dim vector)
    performance_metrics: Dict[str, float]  # performance metric dictionary
The state is encoded as a 216-dimensional vector (a sketch of the encoding follows this list):
- 1 dim: normalized age
- 1 dim: gender encoding (1.0 = male, 0.0 = female)
- 1 dim: fitness level encoding (0.0 = beginner, 0.5 = intermediate, 1.0 = advanced)
- 23 dims: current quality levels
- 93 dims: subject scores
- 93 dims: target subject encoding (1.0 = target, 0.0 = non-target)
- 4 dims: performance metrics (recent improvement, injury risk, training efficiency, consistency)
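A minimal sketch of the encoding is shown below. The actual State.to_tensor() implementation is not reproduced in this document, so details such as the age normalization constant are assumptions.
def to_tensor(self) -> torch.Tensor:
    # Hypothetical sketch of State.to_tensor(); the /100.0 age scaling is an assumption.
    gender = 1.0 if self.gender == "M" else 0.0
    level = {"beginner": 0.0, "intermediate": 0.5, "advanced": 1.0}[self.fitness_level]
    target_mask = [1.0 if s in self.target_subjects else 0.0 for s in SUBJECTS]
    metrics = [
        self.performance_metrics.get("recent_improvement", 0.0),
        self.performance_metrics.get("injury_risk", 0.0),
        self.performance_metrics.get("training_efficiency", 0.0),
        self.performance_metrics.get("consistency", 0.0),
    ]
    vec = [self.age / 100.0, gender, level] + list(self.current_qualities) \
          + list(self.subject_scores) + target_mask + metrics
    return torch.tensor(vec, dtype=torch.float32, device=device)  # 3 + 23 + 93 + 93 + 4 = 216 dims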
2.5 Weight Matrices
The system maintains two core weight matrices (a short usage sketch follows this list):
- CQ matrix: a 93×23 subject-quality weight matrix
  - CQ[i, j] is the weight of subject i's dependence on quality j
  - Each row is normalized so that ∑_j CQ[i, j] = 1
- QX matrix: a 23×44 quality-action weight matrix
  - QX[j, k] is the weight of quality j's dependence on action k
  - Each row is normalized so that ∑_k QX[j, k] = 1
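A minimal sketch of how the row-normalized matrices are used. It mirrors the reward calculation shown later in Section 3.2.4 and assumes the NUM_* constants above; it is not the production code.
# Build random row-normalized matrices and combine them with a user's quality vector.
cq = torch.rand(NUM_SUBJECTS, NUM_QUALITIES)
cq = cq / cq.sum(dim=1, keepdim=True)      # each row sums to 1
qx = torch.rand(NUM_QUALITIES, NUM_ACTIONS)
qx = qx / qx.sum(dim=1, keepdim=True)

qualities = torch.rand(NUM_QUALITIES)      # a user's 23 quality levels
subject_scores = cq @ qualities            # predicted score for each of the 93 subjects
action_weights = qualities @ qx            # relative importance of each of the 44 actions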
3. Environment Design (TrainingEnvironment)
3.1 Environment Responsibilities
The environment simulates the dynamics of weight matrix adjustment and is the core component the agent interacts with.
3.2 Core Methods
3.2.1 Initialization
def __init__(self):
    self.cq_matrix = torch.ones(NUM_SUBJECTS, NUM_QUALITIES, device=device) / NUM_QUALITIES
    self.qx_matrix = torch.ones(NUM_QUALITIES, NUM_ACTIONS, device=device) / NUM_ACTIONS
Both matrices are initialized to a uniform distribution, so every row sums to 1.
3.2.2 State Reset
def reset(self, initial_state: State) -> torch.Tensor:
    self.current_state = initial_state
    self.step_count = 0
    # Save the initial matrices so later changes can be measured
    self.initial_cq_matrix = self.cq_matrix.clone()
    self.initial_qx_matrix = self.qx_matrix.clone()
    return initial_state.to_tensor()
3.2.3 Step Execution
def step(self, action: torch.Tensor) -> Tuple[torch.Tensor, float, bool, Dict]:
    # 1. Decode the action index
    action_idx = torch.argmax(action).item()
    matrix_type = 0 if action_idx < NUM_SUBJECTS * NUM_QUALITIES else 1
    # 2. Adjust the corresponding weight matrix
    if matrix_type == 0:  # CQ matrix
        position_idx = action_idx % (NUM_SUBJECTS * NUM_QUALITIES)
        row = position_idx // NUM_QUALITIES
        col = position_idx % NUM_QUALITIES
        adjustment = (torch.rand(1, device=device).item() - 0.5) * 0.1
        self.cq_matrix[row, col] += adjustment
        self.cq_matrix[row, col] = torch.clamp(self.cq_matrix[row, col], 0.0, 1.0)
        self.cq_matrix[row] = self.cq_matrix[row] / self.cq_matrix[row].sum()
    else:  # QX matrix: the same adjustment applied to the quality-action matrix
        position_idx = action_idx - NUM_SUBJECTS * NUM_QUALITIES
        row = position_idx // NUM_ACTIONS
        col = position_idx % NUM_ACTIONS
        adjustment = (torch.rand(1, device=device).item() - 0.5) * 0.1
        self.qx_matrix[row, col] += adjustment
        self.qx_matrix[row, col] = torch.clamp(self.qx_matrix[row, col], 0.0, 1.0)
        self.qx_matrix[row] = self.qx_matrix[row] / self.qx_matrix[row].sum()
    # 3. Compute the reward
    reward = self._calculate_reward()
    # 4. Build the new state
    new_state = self._create_new_state()
    # 5. Check the termination condition
    self.step_count += 1
    done = self.step_count >= self.max_steps
    return new_state.to_tensor(), reward, done, {}
3.2.4 Reward Calculation
def _calculate_reward(self) -> float:
    # 1. Compute the score of each target subject
    target_scores = {}
    for subject in self.current_state.target_subjects:
        subject_idx = SUBJECTS.index(subject)
        subject_score = 0.0
        for q_idx, quality_score in enumerate(self.current_state.current_qualities):
            subject_score += self.cq_matrix[subject_idx, q_idx].item() * quality_score
        target_scores[subject] = subject_score
    # 2. Average the target subject scores
    avg_target_score = sum(target_scores.values()) / len(target_scores) if target_scores else 0.0
    # 3. Penalize large changes to the weight matrices
    cq_diff = torch.norm(self.cq_matrix - self.initial_cq_matrix).item()
    qx_diff = torch.norm(self.qx_matrix - self.initial_qx_matrix).item()
    # 4. Combine into the final reward
    reward = avg_target_score - 0.01 * (cq_diff + qx_diff)
    return reward
The reward function is designed to (a small worked example follows this list):
- Maximize the target subject scores
- Minimize drastic changes to the weight matrices
- Balance exploration and exploitation
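As a small worked example with illustrative numbers (not taken from the system): suppose a user has two target subjects whose weighted quality scores come out to 0.6 and 0.8, so avg_target_score = 0.7; if the matrices have drifted such that cq_diff + qx_diff = 2.0, the reward is 0.7 − 0.01 × 2.0 = 0.68. Larger matrix changes therefore only pay off when they raise the average target score by more than the penalty they incur.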
4. Neural Network Architecture (EnhancedActorCritic)
4.1 Overall Architecture
The model uses an Actor-Critic architecture with multiple output branches:
- Feature extraction network: a deep residual network
- Shared processing layer: extracts further high-level features
- Actor branch: outputs the action probability distribution
- Critic branch: outputs the state value estimate
- CQ prediction branch: predicts the subject-quality weight matrix
- QX prediction branch: predicts the quality-action weight matrix
4.2 Residual Block Design (ResidualBlock)
class ResidualBlock(nn.Module):
    def __init__(self, in_features: int, out_features: int, dropout_rate: float = 0.1):
        super().__init__()
        self.linear1 = nn.Linear(in_features, out_features)
        self.linear2 = nn.Linear(out_features, out_features)
        self.layer_norm1 = nn.LayerNorm(out_features)
        self.layer_norm2 = nn.LayerNorm(out_features)
        self.dropout = nn.Dropout(dropout_rate)
        self.projection = nn.Linear(in_features, out_features) if in_features != out_features else nn.Identity()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        identity = self.projection(x)
        out = self.linear1(x)
        out = self.layer_norm1(out)
        out = F.gelu(out)
        out = self.dropout(out)
        out = self.linear2(out)
        out = self.layer_norm2(out)
        out = self.dropout(out)
        out += identity
        return F.gelu(out)
Key properties (a short usage example follows this list):
- GELU activation function (smoother than ReLU)
- Layer normalization (LayerNorm) to stabilize training
- Residual connections to mitigate vanishing gradients
- Dropout to prevent overfitting
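A quick shape check of the block, assuming the class definition above and the usual torch/torch.nn imports; the batch size of 8 is arbitrary.
# Feed a batch of 1024-dim feature vectors through a projecting residual block.
block = ResidualBlock(in_features=1024, out_features=512)
x = torch.randn(8, 1024)
y = block(x)
print(y.shape)   # torch.Size([8, 512])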
4.3 Network Structure
def __init__(self, state_dim: int, action_dim: int):
    # Feature extraction network (five residual blocks)
    self.feature_net = nn.Sequential(
        nn.Linear(state_dim, 1024),
        nn.LayerNorm(1024),
        nn.GELU(),
        nn.Dropout(0.1),
        ResidualBlock(1024, 512),
        ResidualBlock(512, 512),
        ResidualBlock(512, 256),
        ResidualBlock(256, 256),
        ResidualBlock(256, 256),
    )
    # Shared processing layer
    self.shared_net = nn.Sequential(
        nn.Linear(256, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        ResidualBlock(256, 256)
    )
    # Actor network (policy)
    self.actor_net = nn.Sequential(
        nn.Linear(256, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(256, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(256, 128),
        nn.LayerNorm(128),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(128, action_dim),
        nn.Softmax(dim=-1)
    )
    # Critic network (value)
    self.critic_net = nn.Sequential(
        nn.Linear(256, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(256, 128),
        nn.LayerNorm(128),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(128, 1)
    )
    # CQ prediction branch
    self.cq_predictor = nn.Sequential(
        nn.Linear(256, 512),
        nn.LayerNorm(512),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(512, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(256, NUM_SUBJECTS * NUM_QUALITIES)
    )
    # QX prediction branch
    self.qx_predictor = nn.Sequential(
        nn.Linear(256, 512),
        nn.LayerNorm(512),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(512, 256),
        nn.LayerNorm(256),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(256, NUM_QUALITIES * NUM_ACTIONS)
    )
4.4 Forward Pass
def forward(self, state: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    # Make sure the input is on the correct device
    state = state.to(self.device)
    # Feature extraction
    features = self.feature_net(state)
    # Shared processing
    shared_features = self.shared_net(features)
    # Multi-branch outputs
    action_probs = self.actor_net(shared_features)
    state_value = self.critic_net(shared_features)
    # CQ matrix prediction
    cq_flat = self.cq_predictor(shared_features)
    batch_size = shared_features.shape[0]
    cq_matrix = cq_flat.view(batch_size, NUM_SUBJECTS, NUM_QUALITIES)
    cq_matrix = F.softmax(cq_matrix, dim=-1)  # normalize along the quality dimension
    # QX matrix prediction
    qx_flat = self.qx_predictor(shared_features)
    qx_matrix = qx_flat.view(batch_size, NUM_QUALITIES, NUM_ACTIONS)
    qx_matrix = F.softmax(qx_matrix, dim=-1)  # normalize along the action dimension
    return action_probs, state_value, cq_matrix, qx_matrix
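A minimal shape check of the four outputs, using the 216-dimensional state from Section 2.4 and the action dimension defined during startup in Section 7 (93×23 + 23×44 = 3151). It assumes the full constructor also sets self.device, which the excerpts above omit.
state_dim = 216
action_dim = NUM_SUBJECTS * NUM_QUALITIES + NUM_QUALITIES * NUM_ACTIONS  # 2139 + 1012 = 3151
model = EnhancedActorCritic(state_dim, action_dim)
state = torch.randn(4, state_dim)                    # batch of 4 encoded states
action_probs, state_value, cq, qx = model(state)
print(action_probs.shape)   # torch.Size([4, 3151])
print(state_value.shape)    # torch.Size([4, 1])
print(cq.shape)             # torch.Size([4, 93, 23])
print(qx.shape)             # torch.Size([4, 23, 44])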
5. PPO Algorithm Implementation (EnhancedPPOAgent)
5.1 Algorithm Overview
PPO (Proximal Policy Optimization) is a policy gradient algorithm that improves training stability by limiting the size of each policy update. This system implements an enhanced version of PPO that adds several optimization techniques.
5.2 Core Components
5.2.1 Network Initialization
def __init__(self, state_dim: int, action_dim: int):
    # Main network and target network
    self.actor_critic = EnhancedActorCritic(state_dim, action_dim)
    self.target_actor_critic = EnhancedActorCritic(state_dim, action_dim)
    self.target_actor_critic.load_state_dict(self.actor_critic.state_dict())
    # One optimizer per network component
    self.actor_optimizer = torch.optim.AdamW(
        self.actor_critic.actor_net.parameters(),
        lr=3e-4,
        weight_decay=1e-4,
        betas=(0.9, 0.999)
    )
    self.critic_optimizer = torch.optim.AdamW(
        self.actor_critic.critic_net.parameters(),
        lr=1e-3,  # higher learning rate
        weight_decay=1e-4,
        betas=(0.9, 0.999)
    )
    self.cq_optimizer = torch.optim.AdamW(
        self.actor_critic.cq_predictor.parameters(),
        lr=5e-4,
        weight_decay=1e-4,
        betas=(0.9, 0.999)
    )
    self.qx_optimizer = torch.optim.AdamW(
        self.actor_critic.qx_predictor.parameters(),
        lr=5e-4,
        weight_decay=1e-4,
        betas=(0.9, 0.999)
    )
5.2.2 Action Selection
def select_action(self, state: torch.Tensor, training: bool = True) -> Tuple[int, float, torch.Tensor, torch.Tensor]:
    with torch.no_grad():
        action_probs, _, cq_prediction, qx_prediction = self.actor_critic(state)
        action_probs = torch.clamp(action_probs, min=1e-8, max=1.0)
        action_probs = action_probs / action_probs.sum()
        if training and random.random() < self.epsilon:
            # epsilon-greedy exploration
            action = random.randint(0, self.action_dim - 1)
            log_prob = torch.log(action_probs[0][action])
            return action, log_prob.item(), cq_prediction, qx_prediction
        else:
            dist = Categorical(action_probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)
            return action.item(), log_prob.item(), cq_prediction, qx_prediction
5.2.3 Target Network Update
def update_target_network(self):
    for target_param, param in zip(
        self.target_actor_critic.parameters(),
        self.actor_critic.parameters()
    ):
        target_param.data.copy_(
            target_param.data * (1.0 - self.target_update_rate) +
            param.data * self.target_update_rate
        )
5.3 Training Procedure (the train_optimized method)
def train_optimized(self, env, training_data, episodes=1000, max_steps_per_episode=500,
                    learning_rate_actor=3e-4, learning_rate_critic=1e-3,
                    gamma=0.99, clip_ratio=0.2, batch_size=64,
                    shuffle_data=True, early_stop_patience=50, use_amp=True):
    # 1. Set up mixed-precision training
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp and device.type == 'cuda')
    # 2. Determine the actual state dimension
    sample = training_data[0]
    initial_state = State(...)
    state_tensor = env.reset(initial_state)
    actual_state_dim = state_tensor.shape[0]
    # 3. Re-initialize the network to match the state dimension
    self.actor_critic = EnhancedActorCritic(actual_state_dim, self.action_dim).to(device)
    # ... re-initialize the optimizers
    # 4. Training loop
    for episode in range(episodes):
        # 4.1 Pick a training sample
        sample_idx = episode % len(training_data)
        sample = training_data[sample_idx]
        # 4.2 Build the initial state
        initial_state = State(...)
        state = env.reset(initial_state)
        # 4.3 Collect a trajectory
        states, actions, rewards, dones, log_probs, values, cq_predictions, qx_predictions = [], [], [], [], [], [], [], []
        for step in range(max_steps_per_episode):
            # Select an action
            action, log_prob, cq_pred, qx_pred = self.select_action(state, training=True)
            # Execute the action (encoded as a one-hot vector for the environment)
            action_one_hot = F.one_hot(torch.tensor(action), num_classes=self.action_dim).float().to(device)
            next_state, reward, done, _ = env.step(action_one_hot)
            # Get the value estimate
            with torch.no_grad():
                _, value, _, _ = self.actor_critic(state)
            # Store the transition
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            log_probs.append(log_prob)
            values.append(value.item())
            cq_predictions.append(cq_pred)
            qx_predictions.append(qx_pred)
            # Move to the next state
            state = next_state
            if done:
                break
        # 4.4 Compute returns and advantages
        with torch.no_grad():
            _, last_value, _, _ = self.actor_critic(state)
        returns, advantages = self._compute_returns_and_advantages(
            rewards, values, dones, self.gamma, last_value.item()
        )
        # 4.5 Convert to tensors and move to the device
        states = torch.stack(states).to(device)
        actions = torch.tensor(actions, dtype=torch.long, device=device)
        returns = torch.tensor(returns, dtype=torch.float32, device=device)
        advantages = torch.tensor(advantages, dtype=torch.float32, device=device)
        log_probs = torch.tensor(log_probs, dtype=torch.float32, device=device)
        # Normalize the advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # 4.6 Multiple update epochs over the trajectory (PPO core)
        for _ in range(5):
            with torch.cuda.amp.autocast(enabled=use_amp and device.type == 'cuda'):
                # Evaluate the current policy
                action_probs, values, cq_pred, qx_pred = self.actor_critic(states)
                dist = Categorical(action_probs)
                new_log_probs = dist.log_prob(actions)
                entropy = dist.entropy().mean()
                # Probability ratio
                ratio = torch.exp(new_log_probs - log_probs)
                # Clipped PPO objective
                surr1 = ratio * advantages
                surr2 = torch.clamp(ratio, 1.0 - self.clip_ratio, 1.0 + self.clip_ratio) * advantages
                actor_loss = -torch.min(surr1, surr2).mean()
                # Value loss
                value_loss = F.mse_loss(values.squeeze(), returns)
                # CQ and QX prediction losses
                true_cq = env.cq_matrix.unsqueeze(0).repeat(len(states), 1, 1)
                true_qx = env.qx_matrix.unsqueeze(0).repeat(len(states), 1, 1)
                cq_loss = F.l1_loss(cq_pred, true_cq) * 100
                qx_loss = F.l1_loss(qx_pred, true_qx) * 100
                # Combined loss
                loss = actor_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy + \
                       self.cq_loss_coef * cq_loss + self.qx_loss_coef * qx_loss
            # Backward pass
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            self.cq_optimizer.zero_grad()
            self.qx_optimizer.zero_grad()
            # Mixed-precision backward
            scaler.scale(loss).backward()
            # Gradient clipping (unscale first when AMP is enabled)
            if use_amp and device.type == 'cuda':
                scaler.unscale_(self.actor_optimizer)
                scaler.unscale_(self.critic_optimizer)
                scaler.unscale_(self.cq_optimizer)
                scaler.unscale_(self.qx_optimizer)
            torch.nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm)
            # Optimizer steps
            scaler.step(self.actor_optimizer)
            scaler.step(self.critic_optimizer)
            scaler.step(self.cq_optimizer)
            scaler.step(self.qx_optimizer)
            # Update the gradient scaler
            scaler.update()
        # 4.7 Update statistics and the exploration rate
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        self.update_target_network()
        # 4.8 Early stopping check
        if patience_counter >= early_stop_patience:
            logger.info(f"Early stopping at episode {episode}")
            break
    # 5. Return the training results
    return {...}
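A sketch of how the method might be invoked. The training_samples placeholder and the result key used below are assumptions based on the State fields and the /train response described in Section 6; the real request schema is defined by the API.
# Hypothetical invocation of the training procedure.
env = TrainingEnvironment()
agent = EnhancedPPOAgent(
    state_dim=216,
    action_dim=NUM_SUBJECTS * NUM_QUALITIES + NUM_QUALITIES * NUM_ACTIONS,
)
result = agent.train_optimized(
    env=env,
    training_data=training_samples,   # list of user samples (placeholder)
    episodes=1000,
    max_steps_per_episode=500,
    gamma=0.99,
    clip_ratio=0.2,
    batch_size=64,
    use_amp=True,
)
print(result["average_reward"])       # assumed key; see the /train response fields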
5.4 Advantage Estimation (GAE Implementation)
def _compute_returns_and_advantages(self, rewards, values, dones, gamma, next_value=None):
    if self.use_gae:
        # Generalized Advantage Estimation (GAE)
        returns = []
        advantages = []
        if next_value is None:
            next_value = 0
        # TD errors
        td_errors = []
        for i in range(len(rewards)):
            if i == len(rewards) - 1:
                next_v = next_value
            else:
                next_v = values[i + 1]
            td_error = rewards[i] + gamma * next_v * (1 - dones[i]) - values[i]
            td_errors.append(td_error)
        # Accumulate GAE backwards through the trajectory
        gae = 0
        for i in reversed(range(len(rewards))):
            gae = td_errors[i] + gamma * self.gae_lambda * (1 - dones[i]) * gae
            advantages.insert(0, gae)
            returns.insert(0, gae + values[i])
        return returns, advantages
    else:
        # Plain TD-based returns and advantages
        returns = []
        advantages = []
        R = 0
        if not dones[-1]:
            R = values[-1]
        for i in reversed(range(len(rewards))):
            R = rewards[i] + gamma * R * (1 - dones[i])
            returns.insert(0, R)
            if i < len(rewards) - 1:
                td_error = rewards[i] + gamma * values[i + 1] * (1 - dones[i]) - values[i]
            else:
                td_error = rewards[i] - values[i]
            advantages.insert(0, td_error)
        return returns, advantages
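The GAE branch above corresponds to the standard formulas, where λ is self.gae_lambda and done_t cuts the bootstrap at episode boundaries:
δ_t = r_t + γ · V(s_{t+1}) · (1 − done_t) − V(s_t)
A_t = δ_t + γ · λ · (1 − done_t) · A_{t+1}
R_t = A_t + V(s_t)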
5.5 Accuracy Metrics
def _calculate_cq_accuracy(self, predicted_cq, true_cq):
    # Use cosine similarity as the accuracy metric
    predicted_flat = predicted_cq.view(predicted_cq.size(0), -1)
    true_flat = true_cq.view(true_cq.size(0), -1)
    # Cosine similarity per sample
    cos_sim = F.cosine_similarity(predicted_flat, true_flat, dim=1)
    # Return the mean similarity
    return cos_sim.mean().item()

def _calculate_qx_accuracy(self, predicted_qx, true_qx):
    # Same logic, applied to the QX matrices
    ...
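Since the QX variant is only summarized above, here is a minimal sketch under the assumption that it mirrors the CQ version exactly:
def _calculate_qx_accuracy(self, predicted_qx, true_qx):
    # Assumed to be symmetric with _calculate_cq_accuracy.
    predicted_flat = predicted_qx.view(predicted_qx.size(0), -1)
    true_flat = true_qx.view(true_qx.size(0), -1)
    cos_sim = F.cosine_similarity(predicted_flat, true_flat, dim=1)
    return cos_sim.mean().item()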
6. API Design
6.1 Training Endpoint (/train)
@app.post("/train", response_model=TrainingResponse)
async def train_model(request: TrainingRequest, background_tasks: BackgroundTasks):
    """
    Model training endpoint.

    Parameters:
    - episodes: number of training episodes
    - max_steps_per_episode: maximum steps per episode
    - learning_rate: learning rate
    - gamma: discount factor
    - clip_ratio: PPO clipping ratio
    - training_data: training dataset
    - batch_size: training batch size
    - shuffle_data: whether to shuffle the training data
    - early_stop_patience: early-stopping patience

    Returns:
    - success: whether training succeeded
    - model_path: path where the model was saved
    - episodes_completed: number of episodes completed
    - average_reward: average reward
    - training_time_seconds: training time in seconds
    - message: result message
    """
    global agent, env
    try:
        # Record the start time
        start_time = time.time()
        # Train the model
        result = agent.train_optimized(
            env=env,
            training_data=request.training_data,
            episodes=request.episodes,
            max_steps_per_episode=request.max_steps_per_episode,
            learning_rate_actor=request.learning_rate,
            learning_rate_critic=request.learning_rate * 3,
            gamma=request.gamma,
            clip_ratio=request.clip_ratio,
            batch_size=request.batch_size,
            shuffle_data=request.shuffle_data,
            early_stop_patience=request.early_stop_patience,
            use_amp=True
        )
        # Compute the training time
        training_time = time.time() - start_time
        result["training_time_seconds"] = training_time
        # Save the model
        torch.save(agent.actor_critic.state_dict(), request.save_path)
        # Return the training result
        return TrainingResponse(...)
    except Exception as e:
        logger.error(f"Error in training: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Training failed: {str(e)}")
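A hypothetical client call. The host/port, the concrete parameter values, and the training_samples placeholder are assumptions; the field names follow the docstring above plus the save_path used by the handler.
import requests

payload = {
    "episodes": 200,
    "max_steps_per_episode": 100,
    "learning_rate": 3e-4,
    "gamma": 0.99,
    "clip_ratio": 0.2,
    "batch_size": 64,
    "shuffle_data": True,
    "early_stop_patience": 50,
    "save_path": "models6/enhanced_ppo_agent_best.pth",
    "training_data": training_samples,   # schema defined by TrainingRequest
}
resp = requests.post("http://localhost:8000/train", json=payload)
print(resp.json()["average_reward"])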
6.2 Optimization Endpoint (/optimize)
@app.post("/optimize", response_model=WeightOptimizationResponse)
async def optimize_weights(request: WeightOptimizationRequest):
    """
    Weight optimization endpoint.

    Parameters:
    - user_id: user ID
    - age: age
    - gender: gender
    - fitness_level: fitness level
    - current_qualities: current quality levels
    - target_subjects: list of target subjects
    - optimization_steps: number of optimization steps

    Returns:
    - user_id: user ID
    - optimization_timestamp: optimization timestamp
    - cq_matrix: subject-quality weight matrix
    - qx_matrix: quality-action weight matrix
    - performance_metrics: performance metrics
    - target_achievement_scores: achievement scores for the target subjects
    """
    global agent, env
    try:
        # Build the state
        state = State(
            user_id=request.user_id,
            age=request.age,
            gender=request.gender,
            fitness_level=request.fitness_level,
            current_qualities=request.current_qualities,
            target_subjects=request.target_subjects,
            subject_scores=[0.5] * NUM_SUBJECTS,
            performance_metrics={
                "recent_improvement": 0.1,
                "injury_risk": 0.05,
                "training_efficiency": 0.85,
                "consistency": 0.75
            }
        )
        # Reset the environment
        state_tensor = env.reset(state)
        # Optimize the weights
        for step in range(request.optimization_steps):
            # Select an action
            action, _, cq_pred, qx_pred = agent.select_action(state_tensor, training=False)
            # Execute the action (encoded as a one-hot vector for the environment)
            action_one_hot = F.one_hot(torch.tensor(action), num_classes=agent.action_dim).float().to(device)
            next_state_tensor, _, done, _ = env.step(action_one_hot)
            # Advance the state
            state_tensor = next_state_tensor
            if done:
                break
        # Read back the optimized weight matrices
        cq_matrix = env.cq_matrix
        qx_matrix = env.qx_matrix
        # Compute achievement scores for the target subjects
        target_achievement_scores = {}
        for subject in request.target_subjects:
            subject_idx = SUBJECTS.index(subject)
            subject_score = 0.0
            for q_idx, quality_score in enumerate(request.current_qualities):
                subject_score += cq_matrix[subject_idx, q_idx].item() * quality_score
            target_achievement_scores[subject] = subject_score
        # Convert to native Python types
        cq_matrix_list = convert_to_native_types(cq_matrix)
        qx_matrix_list = convert_to_native_types(qx_matrix)
        # Return the optimization result
        return WeightOptimizationResponse(
            user_id=request.user_id,
            optimization_timestamp=datetime.now().isoformat(),
            cq_matrix=cq_matrix_list,
            qx_matrix=qx_matrix_list,
            performance_metrics={
                "recent_improvement": 0.1,
                "injury_risk": 0.05,
                "training_efficiency": 0.85,
                "consistency": 0.75
            },
            target_achievement_scores=target_achievement_scores
        )
    except Exception as e:
        logger.error(f"Error in weight optimization: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Weight optimization failed: {str(e)}")
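A hypothetical client call to the optimization endpoint; the host/port and the example values are assumptions, and the subject codes must come from SUBJECTS (C001-C093).
import requests

payload = {
    "user_id": "user_001",
    "age": 25,
    "gender": "M",
    "fitness_level": "intermediate",
    "current_qualities": [0.6] * 23,       # one value per fitness quality Q001-Q023
    "target_subjects": ["C001", "C015"],   # target subject codes
    "optimization_steps": 50,
}
resp = requests.post("http://localhost:8000/optimize", json=payload)
result = resp.json()
print(result["target_achievement_scores"])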
7. System Initialization and Lifecycle Management
7.1 Application Lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialization at startup
    global agent, env
    # Create the environment
    env = TrainingEnvironment()
    # Build a sample state to determine the state dimension
    sample_state = State(...)
    state_tensor = env.reset(sample_state)
    state_dim = state_tensor.shape[0]
    # Compute the action dimension
    action_dim = NUM_SUBJECTS * NUM_QUALITIES + NUM_QUALITIES * NUM_ACTIONS
    # Create the agent
    agent = EnhancedPPOAgent(state_dim, action_dim)
    # Try to load a pre-trained model
    model_path = "models6/enhanced_ppo_agent_best.pth"
    if os.path.exists(model_path):
        agent.load_model(model_path)
    yield
    # Cleanup at shutdown
    pass

# Register the lifespan handler
app.router.lifespan_context = lifespan
7.2 Model Loading
def load_model(self, model_path: str):
    """Load a saved model."""
    if os.path.exists(model_path):
        # Load onto the CPU first, then move to the current device
        state_dict = torch.load(model_path, map_location='cpu')
        self.actor_critic.load_state_dict(state_dict)
        self.actor_critic.to(self.device)
        self.target_actor_critic.load_state_dict(self.actor_critic.state_dict())
        self.target_actor_critic.to(self.device)
        self.model_loaded = True
        logger.info(f"Model loaded from {model_path} to {self.device}")
        return True
    else:
        logger.error(f"Model file not found: {model_path}")
        return False
8. Utility Functions
8.1 Data Type Conversion
def convert_to_native_types(obj):
    """Convert NumPy arrays and PyTorch tensors to native Python types."""
    if isinstance(obj, (int, float, str, bool)):
        return obj
    elif isinstance(obj, torch.Tensor):
        if obj.ndim == 0:  # scalar
            return obj.item()
        else:  # array
            return obj.cpu().detach().numpy().tolist()
    elif isinstance(obj, np.ndarray):
        if obj.ndim == 0:  # scalar
            return obj.item()
        else:  # array
            return obj.tolist()
    elif isinstance(obj, dict):
        return {key: convert_to_native_types(value) for key, value in obj.items()}
    elif isinstance(obj, (list, tuple)):
        return [convert_to_native_types(item) for item in obj]
    elif isinstance(obj, (np.number, np.bool_)):
        return obj.item()
    else:
        return obj
8.2 GPU Memory Monitoring
def get_gpu_memory_usage():
    """Report GPU memory usage."""
    if device.type == 'cuda':
        return {
            'allocated': torch.cuda.memory_allocated(device) / 1024 ** 3,         # GB
            'cached': torch.cuda.memory_reserved(device) / 1024 ** 3,             # GB
            'max_allocated': torch.cuda.max_memory_allocated(device) / 1024 ** 3  # GB
        }
    return None
9. Algorithmic Highlights
9.1 Multi-Objective Optimization
The system optimizes three objectives simultaneously (the combined loss is sketched after this list):
- Maximizing target subject scores: driven directly by the reward function
- Weight matrix stability: enforced by the change-penalty term
- Prediction accuracy: enforced by the CQ and QX prediction losses
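These objectives come together in the combined training loss from Section 5.3 (the coefficient names follow that code):
loss = actor_loss + value_loss_coef · value_loss − entropy_coef · entropy + cq_loss_coef · cq_loss + qx_loss_coef · qx_loss
Here actor_loss is the clipped PPO surrogate, while the matrix-change penalty enters through the environment reward rather than through the loss itself.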
9.2 Multi-Branch Network Architecture
Weight matrix prediction is integrated directly into the Actor-Critic framework:
- The Actor branch learns the policy
- The Critic branch estimates the state value
- The CQ and QX branches predict the weight matrices directly
9.3 Multi-Optimizer Strategy
Each network component has its own optimizer:
- Actor optimizer: low learning rate (3e-4) for stable policy updates
- Critic optimizer: higher learning rate (1e-3) for fast value learning
- CQ/QX optimizers: intermediate learning rate (5e-4) to balance prediction accuracy
9.4 Mixed-Precision Training
FP16 is used to accelerate training:
- Lower GPU memory usage
- Faster training
- Numerical stability maintained through GradScaler
9.5 Enhanced Exploration Strategy
Exploration combines ε-greedy action selection with a policy entropy bonus (a decay sketch follows this list):
- A high ε early in training encourages exploration
- ε decays gradually as training progresses
- The entropy term encourages diverse actions
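A minimal sketch of the decay schedule; the initial value, floor, and decay factor below are assumptions, since the document only states that ε decays toward a minimum.
# Hypothetical values; the actual epsilon, epsilon_min and epsilon_decay
# hyperparameters are not specified in this document.
epsilon, epsilon_min, epsilon_decay = 1.0, 0.05, 0.995

for episode in range(1000):
    # ... run one episode, taking a random action with probability epsilon ...
    epsilon = max(epsilon_min, epsilon * epsilon_decay)   # same update as in train_optimized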
10. System Strengths and Application Scenarios
10.1 System Strengths
- End-to-end optimization: from the user's state directly to the weight matrices
- Personalized recommendations: weight matrices tailored to the user's profile and goals
- Efficient training: multiple optimization techniques improve training efficiency
- Stability guarantees: clipping, normalization, and related techniques keep the system stable
- Extensibility: the modular design makes it easy to add new qualities, subjects, or actions
10.2 Application Scenarios
- Personalized physical training: generate individualized training plans from a user's current quality levels and target subjects
- Specialized athlete training: optimize action weights for an athlete's event-specific requirements
- Rehabilitation training: adjust training intensity and actions to the rehabilitation stage and goals
- Fitness course recommendation: recommend the most suitable combinations of training actions to fitness enthusiasts
- Training effect evaluation: analyze training outcomes and directions for improvement through the weight matrices
11. Summary
This physical training weight optimization system uses deep reinforcement learning to optimize, end to end, the mapping from a user's state to the training weight matrices. It combines an enhanced PPO algorithm with a multi-branch network architecture, a multi-optimizer strategy, and mixed-precision training to optimize the subject-quality and quality-action weight matrices efficiently and stably.
The design reflects practical requirements: the reward function balances goal achievement against system stability, and the API provides flexible training and optimization services suitable for a wide range of physical training scenarios. The modular, extensible design allows the system to adapt to evolving training needs and new training methods.