April 23, sunny. Got up at 10 a.m.
Today's task: try to spruce up the page a bit.

The LSTM deep-learning assignment.

I'll find some time today to finish it by "Kimi-oriented programming" and hand it in.

# FileName: LSTM.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
'''
@Project:pythonProject1
@File:LSTM3.py
@IDE:pythonProject1
@Author:whz
@Date:2025/4/23 10:20

'''
# Set a font that supports Chinese characters (applied below via plt.rcParams)

# Predict the next 8 values
# Strided (skip-step) time windows
# Predict multiple future values
"""
数据集查看以及分析:
Server, Value, Timestamp, Questionable, Annotated, Substituted


data = pd.read_csv('./data/机组指令值.csv')
print("行数有:",data.shape[0])
print("列数有:",data.shape[1])
print("列名有:",data.columns.tolist())
print(data.dtypes)
# 打印每列的最大值、最小值、中位数
column = " Value"
print(f"列名: {column}")
print("最大值:", data[column].max())
print("最小值:", data[column].min())
print("中位数:", data[column].median())
print("平均值:", data[column].mean())
print("标准差:", data[column].std())
print("-" * 30)
# 绘制某一列的值

column_to_plot = ' Value'
if column_to_plot in data.columns:
plt.figure(figsize=(10, 6))
plt.plot(data[column_to_plot], marker='o', linestyle='-')
plt.title(f'{column_to_plot} 的值')
plt.xlabel('索引')
plt.ylabel(column_to_plot)
plt.grid(True)
plt.show()
else:
print(f"列名 '{column_to_plot}' 不存在于数据中。")



列名: Value
最大值: 329.9479
最小值: 149.7035
中位数: 223.8567
平均值: 222.54732904839352
标准差: 47.638236488568445
------------------------------
"""
# 'Server' is an irrelevant (constant) feature column
#print(data['Server'])


# Value, Timestamp, Questionable, Annotated, Substituted
"""
unique_values = data[' Questionable'].unique() # 获取唯一值
print(f"列 '{' Questionable'}' 中的唯一值:")
print(unique_values)
print(f"\n唯一值的数量:{len(unique_values)}")

列 ' Questionable' 中的唯一值:
[' False']
唯一值的数量:1

unique_values = data[' Annotated'].unique() # 获取唯一值
print(f"列 '{' Annotated'}' 中的唯一值:")
print(unique_values)
print(f"\n唯一值的数量:{len(unique_values)}")


列 ' Annotated' 中的唯一值:
[' Not Annotated']
唯一值的数量:1


unique_values = data[' Substituted'].unique() # 获取唯一值
print(f"列 '{' Substituted'}' 中的唯一值:")
print(unique_values)
print(f"\n唯一值的数量:{len(unique_values)}")


列 ' Substituted' 中的唯一值:
[' False']
唯一值的数量:1
"""
# Taken together: each of these columns holds a single constant value, so they carry
# no information. The network therefore only needs the Value and Timestamp columns
# (see the constant-column sanity check after the CSV is loaded below).
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Attention layer
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.attention_layer = nn.Linear(hidden_size, hidden_size)
        self.softmax = nn.Softmax(dim=1)  # normalize over the sequence dimension

    def forward(self, lstm_output):
        # lstm_output shape: (batch_size, seq_len, hidden_size)
        attention_scores = self.attention_layer(lstm_output)
        attention_scores = torch.tanh(attention_scores)
        attention_weights = self.softmax(attention_scores)
        # weighted sum over the time steps -> (batch_size, hidden_size)
        context = torch.sum(attention_weights * lstm_output, dim=1)
        return context
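# Optional sanity check for the Attention layer (illustrative only; the tensors
# here are random stand-ins, not data from this project). Uncomment to run:
# attn = Attention(hidden_size=8)
# demo_out = attn(torch.randn(4, 30, 8))  # (batch=4, seq_len=30, hidden=8)
# print(demo_out.shape)                   # torch.Size([4, 8])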

# Read the CSV file
plt.rcParams['font.sans-serif'] = ['SimHei']  # a font that can render Chinese characters
plt.rcParams['axes.unicode_minus'] = False    # render the minus sign correctly
data = pd.read_csv('./data/机组指令值.csv')
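# Sanity check backing the analysis above: list columns that hold a single
# constant value. Uncomment to run; per the exploration notes it should flag
# ' Questionable', ' Annotated' and ' Substituted' (and likely 'Server'):
# constant_cols = [c for c in data.columns if data[c].nunique(dropna=False) == 1]
# print("Constant (uninformative) columns:", constant_cols)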

# Extract calendar/time features from the timestamp
data['Time'] = pd.to_datetime(data[' Timestamp'])  # note: the CSV header is ' Timestamp', with a leading space
data['Month'] = data['Time'].dt.month
data['Day'] = data['Time'].dt.day
data['Hour'] = data['Time'].dt.hour
data['Minute'] = data['Time'].dt.minute
data['Second'] = data['Time'].dt.second

# Cyclically encode the time features with a sin/cos pair
def encode_cyclic_features(df, column, max_value):
    df[f'{column}_sin'] = np.sin(2 * np.pi * df[column] / max_value)
    df[f'{column}_cos'] = np.cos(2 * np.pi * df[column] / max_value)
    return df

data = encode_cyclic_features(data, 'Month', 12)
data = encode_cyclic_features(data, 'Day', 31)
data = encode_cyclic_features(data, 'Hour', 24)
data = encode_cyclic_features(data, 'Minute', 60)
data = encode_cyclic_features(data, 'Second', 60)
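# Why the sin/cos pair: it makes 23:00 and midnight neighbors again, whereas the
# raw hour value puts them 23 apart. A quick check (uncomment to run):
# h = np.array([23, 0])
# hs, hc = np.sin(2 * np.pi * h / 24), np.cos(2 * np.pi * h / 24)
# print(np.hypot(hs[0] - hs[1], hc[0] - hc[1]))  # ~0.261 in encoded space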

# Extract the 'Value' column as the target variable
values = data[[' Value']].values

# Normalize to [0, 1]. (Fitting the scaler on the full series before the
# train/test split is a simplification: it leaks test-set min/max into training.)
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_values = scaler.fit_transform(values)
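# MinMaxScaler round trip on a toy array (uncomment to run): values map into
# [0, 1] and inverse_transform recovers the originals, which is what the
# evaluation code below relies on.
# demo = np.array([[150.0], [240.0], [330.0]])
# demo_scaler = MinMaxScaler(feature_range=(0, 1))
# print(demo_scaler.fit_transform(demo).ravel())                # [0.  0.5 1. ]
# print(demo_scaler.inverse_transform(demo_scaler.fit_transform(demo)).ravel())  # [150. 240. 330.]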

# Build input/output windows over 'Value', stepping with a stride (jump)
def create_time_features(data, n_steps_in, n_steps_out, n_jump):
    X, y = [], []
    for i in range(0, len(data) - n_steps_in - n_steps_out + 1, n_jump):
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out
        if out_end_ix > len(data):
            break
        seq_x = data[i:end_ix]
        seq_y = data[end_ix:out_end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
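# A tiny worked example of the strided windowing (uncomment to run). With
# n_steps_in=3, n_steps_out=2, n_jump=2 on the sequence 0..9:
# demo = np.arange(10).reshape(-1, 1)
# Xd, yd = create_time_features(demo, 3, 2, 2)
# print(Xd[:, :, 0])  # [[0 1 2] [2 3 4] [4 5 6]]
# print(yd[:, :, 0])  # [[3 4] [5 6] [7 8]]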

# Window settings
n_steps_in = 30   # input sequence length
n_steps_out = 2   # output sequence length
n_jump = 2        # stride between consecutive windows

# Build the windowed sequences
X, y = create_time_features(scaled_values, n_steps_in, n_steps_out, n_jump)

# Train/test split
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Attach the cyclic time encodings to each input window.
# window_offset maps a window's index back to its starting row in the raw data,
# so that test windows pick up the time features of the test portion (the
# original indexed from 0 for both splits, misaligning the test windows).
def add_time_features(X, time_data, n_steps_in, n_jump, window_offset=0):
    time_features = []
    for i in range(len(X)):
        start_idx = (window_offset + i) * n_jump
        end_idx = start_idx + n_steps_in
        time_features.append(time_data[start_idx:end_idx])
    time_features = np.array(time_features)
    return np.concatenate((X, time_features), axis=2)

time_columns = ['Month_sin', 'Month_cos', 'Day_sin', 'Day_cos', 'Hour_sin', 'Hour_cos',
                'Minute_sin', 'Minute_cos', 'Second_sin', 'Second_cos']
time_data = data[time_columns].values

X_train = add_time_features(X_train, time_data, n_steps_in, n_jump)
X_test = add_time_features(X_test, time_data, n_steps_in, n_jump, window_offset=train_size)

# Dataset wrapper
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        # squeeze the trailing feature dim so y is (num_windows, n_steps_out)
        self.y = torch.tensor(y, dtype=torch.float32).squeeze(-1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create the data loaders
train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
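# Shape check on one batch (uncomment to run). Each input window should be
# (batch, n_steps_in, 1 value + 10 time encodings) and each target
# (batch, n_steps_out):
# xb, yb = next(iter(train_loader))
# print(xb.shape)  # e.g. torch.Size([64, 30, 11])
# print(yb.shape)  # e.g. torch.Size([64, 2])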

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hybrid stacked LSTM-GRU model with attention
class HybridLSTMGRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, lstm_layers, gru_layers, output_size):
        super(HybridLSTMGRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm_layers = lstm_layers
        self.gru_layers = gru_layers
        # stacked LSTM layers
        self.lstm_stack = nn.LSTM(input_size, hidden_size, num_layers=lstm_layers, batch_first=True)
        # GRU layers
        self.gru_stack = nn.GRU(hidden_size, hidden_size, num_layers=gru_layers, batch_first=True)
        self.dropout = nn.Dropout(0.2)
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # initial hidden and cell states for the LSTM
        h0_lstm = torch.zeros(self.lstm_layers, x.size(0), self.hidden_size).to(x.device)
        c0_lstm = torch.zeros(self.lstm_layers, x.size(0), self.hidden_size).to(x.device)
        lstm_out, _ = self.lstm_stack(x, (h0_lstm, c0_lstm))
        # initial hidden state for the GRU
        h0_gru = torch.zeros(self.gru_layers, x.size(0), self.hidden_size).to(x.device)
        gru_out, _ = self.gru_stack(lstm_out, h0_gru)
        gru_out = self.dropout(gru_out)
        # attention over the GRU outputs -> (batch_size, hidden_size)
        context = self.attention(gru_out)
        # fully connected head -> (batch_size, output_size)
        out = self.fc(context)
        return out
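# Forward-pass shape check on a throwaway model with random input (uncomment to
# run; the sizes here are small stand-ins, not the training settings):
# m = HybridLSTMGRUModel(input_size=11, hidden_size=16, lstm_layers=2, gru_layers=3, output_size=2)
# print(m(torch.randn(4, 30, 11)).shape)  # torch.Size([4, 2])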

# Hyperparameters
input_size = X_train.shape[2]  # number of input features (Value plus the cyclic time encodings)
hidden_size = 128              # hidden state size
lstm_layers = 2                # number of LSTM layers
gru_layers = 3                 # number of GRU layers
output_size = n_steps_out      # output sequence length

# Initialize the model
model = HybridLSTMGRUModel(input_size, hidden_size, lstm_layers, gru_layers, output_size).to(device)

# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        # debug: print(f"Model output shape: {outputs.shape}, Target shape: {y_batch.shape}")
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * X_batch.size(0)
    train_loss = train_loss / len(train_loader.dataset)

    # Evaluate on the test set
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        predictions = []
        true_values = []
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            # accumulate the validation loss (the original printed val_loss
            # without ever summing it, raising a NameError)
            val_loss += criterion(outputs, y_batch).item() * X_batch.size(0)
            predictions.extend(outputs.cpu().numpy())
            true_values.extend(y_batch.cpu().numpy())
        predictions = np.array(predictions)
        true_values = np.array(true_values)
        # invert the scaling before computing the error metrics
        predictions_actual = scaler.inverse_transform(predictions.reshape(-1, 1))
        true_values_actual = scaler.inverse_transform(true_values.reshape(-1, 1))
        mse = np.mean((true_values_actual - predictions_actual) ** 2)
        mae = np.mean(np.abs(true_values_actual - predictions_actual))
        print(f"MSE: {mse}")
        print(f"MAE: {mae}")
    val_loss = val_loss / len(test_loader.dataset)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")



"""
代码的实现是基于监督学习的训练过程,其目的是让模型学习如何根据历史数据预测未来的值。
在训练过程中,我们并不需要将预测值反馈到输入窗口,因为训练数据已经包含了足够多的历史信息。
在实际应用中,可以在测试或部署阶段实现,但这需要额外的逻辑来管理输入窗口的更新。
"""

"""
# Plot the test-set predictions
plt.figure(figsize=(12, 6))
plt.plot(true_values_actual[:n_steps_out], label='True Values', marker='o')
plt.plot(predictions_actual[:n_steps_out], label='Predicted Values', marker='x')
plt.title('LSTM-GRU Prediction')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
"""


# Recursively forecast the next 8 time steps
model.eval()
with torch.no_grad():
    # Seed the rolling window with the first test window. X_test is a NumPy array
    # at this point, so convert it to a tensor first (the original called
    # .unsqueeze directly on the array, which fails).
    current_window = torch.tensor(X_test[0], dtype=torch.float32).unsqueeze(0).to(device)
    predicted_values = []

    for _ in range(8):
        output = model(current_window)                    # (1, n_steps_out)
        predicted_value = output[:, -1].reshape(1, 1, 1)  # keep the last predicted step, as in the original
        predicted_values.append(predicted_value)
        # Slide the window: drop the oldest step and append the new prediction.
        # Each step carries 11 features (Value + 10 time encodings); as a
        # simplification we reuse the time encodings of the window's last step
        # (the original appended a 1-feature step to an 11-feature window,
        # which raises a shape error in torch.cat).
        last_time_feats = current_window[:, -1:, 1:]
        new_step = torch.cat((predicted_value, last_time_feats), dim=2)
        current_window = torch.cat((current_window[:, 1:, :], new_step), dim=1)

    # Collect the predictions: (1, 8, 1) -> (8, 1)
    predicted_sequence = torch.cat(predicted_values, dim=1).squeeze(0).cpu().numpy()

# Invert the scaling
predicted_actual = scaler.inverse_transform(predicted_sequence.reshape(-1, 1))

# Generate the time axis
time_points = pd.date_range(start='2025-08-26 00:00:00', periods=8, freq='H')

# Plot the forecast
plt.figure(figsize=(12, 6))
plt.plot(time_points, predicted_actual, label='Predicted Values', marker='x')
plt.title('Future Prediction for August 26th, 0:00 to 7:00')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()