# FileName: LSTM.py
"""
@Project: pythonProject1
@File: LSTM3.py
@IDE: pythonProject1
@Author: whz
@Date: 2025/4/23 10:20
"""
""" 数据集查看以及分析: Server, Value, Timestamp, Questionable, Annotated, Substituted
data = pd.read_csv('./data/机组指令值.csv') print("行数有:",data.shape[0]) print("列数有:",data.shape[1]) print("列名有:",data.columns.tolist()) print(data.dtypes) # 打印每列的最大值、最小值、中位数 column = " Value" print(f"列名: {column}") print("最大值:", data[column].max()) print("最小值:", data[column].min()) print("中位数:", data[column].median()) print("平均值:", data[column].mean()) print("标准差:", data[column].std()) print("-" * 30) # 绘制某一列的值
column_to_plot = ' Value' if column_to_plot in data.columns: plt.figure(figsize=(10, 6)) plt.plot(data[column_to_plot], marker='o', linestyle='-') plt.title(f'{column_to_plot} 的值') plt.xlabel('索引') plt.ylabel(column_to_plot) plt.grid(True) plt.show() else: print(f"列名 '{column_to_plot}' 不存在于数据中。")
列名: Value 最大值: 329.9479 最小值: 149.7035 中位数: 223.8567 平均值: 222.54732904839352 标准差: 47.638236488568445 ------------------------------ """
""" unique_values = data[' Questionable'].unique() # 获取唯一值 print(f"列 '{' Questionable'}' 中的唯一值:") print(unique_values) print(f"\n唯一值的数量:{len(unique_values)}")
列 ' Questionable' 中的唯一值: [' False'] 唯一值的数量:1
unique_values = data[' Annotated'].unique() # 获取唯一值 print(f"列 '{' Annotated'}' 中的唯一值:") print(unique_values) print(f"\n唯一值的数量:{len(unique_values)}")
列 ' Annotated' 中的唯一值: [' Not Annotated'] 唯一值的数量:1
unique_values = data[' Substituted'].unique() # 获取唯一值 print(f"列 '{' Substituted'}' 中的唯一值:") print(unique_values) print(f"\n唯一值的数量:{len(unique_values)}")
列 ' Substituted' 中的唯一值: [' False'] 唯一值的数量:1 """
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.attention_layer = nn.Linear(hidden_size, hidden_size)
        self.softmax = nn.Softmax(dim=1)  # normalize over the time dimension

    def forward(self, lstm_output):
        # lstm_output: (batch, seq_len, hidden_size)
        attention_scores = self.attention_layer(lstm_output)
        attention_scores = torch.tanh(attention_scores)
        attention_weights = self.softmax(attention_scores)
        # Weighted sum over the time steps -> (batch, hidden_size)
        context = torch.sum(attention_weights * lstm_output, dim=1)
        return context
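
# A minimal smoke test for the attention module (an illustrative sketch, not
# part of the original script): the context vector should collapse the time
# dimension of a (batch, seq_len, hidden) input.
_attn_check = Attention(hidden_size=8)
assert _attn_check(torch.randn(4, 30, 8)).shape == (4, 8)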
plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese text (e.g. file names) in plots
plt.rcParams['axes.unicode_minus'] = False

data = pd.read_csv('./data/机组指令值.csv')

# Parse the timestamp column and extract its calendar components
data['Time'] = pd.to_datetime(data[' Timestamp'])
data['Month'] = data['Time'].dt.month
data['Day'] = data['Time'].dt.day
data['Hour'] = data['Time'].dt.hour
data['Minute'] = data['Time'].dt.minute
data['Second'] = data['Time'].dt.second
def encode_cyclic_features(df, column, max_value):
    # Map a cyclic feature onto the unit circle so that boundary values
    # (e.g. hour 23 and hour 0) stay close in feature space
    df[f'{column}_sin'] = np.sin(2 * np.pi * df[column] / max_value)
    df[f'{column}_cos'] = np.cos(2 * np.pi * df[column] / max_value)
    return df
data = encode_cyclic_features(data, 'Month', 12)
data = encode_cyclic_features(data, 'Day', 31)
data = encode_cyclic_features(data, 'Hour', 24)
data = encode_cyclic_features(data, 'Minute', 60)
data = encode_cyclic_features(data, 'Second', 60)
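
# Sanity check on the cyclic encoding (an illustrative sketch, not part of the
# original script): hour 23 ends up adjacent to hour 0 on the unit circle even
# though they are 23 apart numerically, while hour 12 is farthest away.
_hour_point = lambda h: np.array([np.sin(2 * np.pi * h / 24), np.cos(2 * np.pi * h / 24)])
assert np.linalg.norm(_hour_point(23) - _hour_point(0)) < np.linalg.norm(_hour_point(12) - _hour_point(0))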
values = data[[' Value']].values
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_values = scaler.fit_transform(values)
def create_time_features(data, n_steps_in, n_steps_out, n_jump):
    X, y = [], []
    for i in range(0, len(data) - n_steps_in - n_steps_out + 1, n_jump):
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out
        if out_end_ix > len(data):
            break
        seq_x = data[i:end_ix]
        seq_y = data[end_ix:out_end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
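
# Worked example of the windowing (assumed toy numbers, not from the real
# data): a length-100 series with n_steps_in=30, n_steps_out=2, n_jump=2
# yields window starts i = 0, 2, ..., 68, i.e. 35 samples.
_Xw, _yw = create_time_features(np.zeros((100, 1)), 30, 2, 2)
assert _Xw.shape == (35, 30, 1) and _yw.shape == (35, 2, 1)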
n_steps_in = 30   # length of each input window
n_steps_out = 2   # steps predicted per window
n_jump = 2        # stride between consecutive windows
X, y = create_time_features(scaled_values, n_steps_in, n_steps_out, n_jump)
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
def add_time_features(X, time_data, n_steps_in, n_jump, start_window=0):
    # Attach the cyclic time features to each window along the feature axis.
    # start_window shifts the indexing so that windows taken from the test
    # split line up with their own rows of time_data (the original version
    # started from 0 for both splits, misaligning the test features).
    time_features = []
    for i in range(len(X)):
        start_idx = (start_window + i) * n_jump
        end_idx = start_idx + n_steps_in
        time_features.append(time_data[start_idx:end_idx])
    time_features = np.array(time_features)
    return np.concatenate((X, time_features), axis=2)
time_columns = ['Month_sin', 'Month_cos', 'Day_sin', 'Day_cos',
                'Hour_sin', 'Hour_cos', 'Minute_sin', 'Minute_cos',
                'Second_sin', 'Second_cos']
time_data = data[time_columns].values
X_train = add_time_features(X_train, time_data, n_steps_in, n_jump)
X_test = add_time_features(X_test, time_data, n_steps_in, n_jump, start_window=train_size)
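
# Consistency check (illustrative, not part of the original script): each
# window should now carry the scaled value plus the 10 cyclic time features.
assert X_train.shape[1:] == (n_steps_in, 1 + len(time_columns))
assert X_test.shape[1:] == (n_steps_in, 1 + len(time_columns))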
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).squeeze(-1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class HybridLSTMGRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, lstm_layers, gru_layers, output_size):
        super(HybridLSTMGRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm_layers = lstm_layers
        self.gru_layers = gru_layers
        self.lstm_stack = nn.LSTM(input_size, hidden_size, num_layers=lstm_layers, batch_first=True)
        self.gru_stack = nn.GRU(hidden_size, hidden_size, num_layers=gru_layers, batch_first=True)
        self.dropout = nn.Dropout(0.2)
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0_lstm = torch.zeros(self.lstm_layers, x.size(0), self.hidden_size).to(x.device)
        c0_lstm = torch.zeros(self.lstm_layers, x.size(0), self.hidden_size).to(x.device)
        lstm_out, _ = self.lstm_stack(x, (h0_lstm, c0_lstm))
        h0_gru = torch.zeros(self.gru_layers, x.size(0), self.hidden_size).to(x.device)
        gru_out, _ = self.gru_stack(lstm_out, h0_gru)
        gru_out = self.dropout(gru_out)
        context = self.attention(gru_out)
        out = self.fc(context)
        return out
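
# Shape walk-through of one forward pass (sizes assume the hyperparameters
# set below; for illustration only):
#   x:        (batch, 30, 11)  -> input window, value + cyclic time features
#   lstm_out: (batch, 30, 128) -> 2-layer LSTM keeps the whole sequence
#   gru_out:  (batch, 30, 128) -> 3-layer GRU refines it, then dropout
#   context:  (batch, 128)     -> attention-weighted sum over the 30 steps
#   out:      (batch, 2)       -> linear head emits n_steps_out predictions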
input_size = X_train.shape[2]   # 1 value feature + 10 cyclic time features = 11
hidden_size = 128
lstm_layers = 2
gru_layers = 3
output_size = n_steps_out
model = HybridLSTMGRUModel(input_size, hidden_size, lstm_layers, gru_layers, output_size).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * X_batch.size(0)
    train_loss = train_loss / len(train_loader.dataset)

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        predictions = []
        true_values = []
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            # Accumulate the validation loss (the original printed val_loss
            # without ever computing it)
            val_loss += criterion(outputs, y_batch).item() * X_batch.size(0)
            predictions.extend(outputs.cpu().numpy())
            true_values.extend(y_batch.cpu().numpy())
        predictions = np.array(predictions).reshape(-1, 1)
        true_values = np.array(true_values).reshape(-1, 1)
        # Undo the MinMax scaling to report errors in the original units
        predictions_actual = scaler.inverse_transform(predictions)
        true_values_actual = scaler.inverse_transform(true_values)
        mse = np.mean((true_values_actual - predictions_actual) ** 2)
        mae = np.mean(np.abs(true_values_actual - predictions_actual))
        print(f"Mean squared error (MSE): {mse}")
        print(f"Mean absolute error (MAE): {mae}")
    val_loss = val_loss / len(test_loader.dataset)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
""" 代码的实现是基于监督学习的训练过程,其目的是让模型学习如何根据历史数据预测未来的值。 在训练过程中,我们并不需要将预测值反馈到输入窗口,因为训练数据已经包含了足够多的历史信息。 在实际应用中,可以在测试或部署阶段实现,但这需要额外的逻辑来管理输入窗口的更新。 """
""" # 绘制预测结果 plt.figure(figsize=(12, 6)) plt.plot(true_values_actual[:n_steps_out], label='True Values', marker='o') plt.plot(predictions_actual[:n_steps_out], label='Predicted Values', marker='x') plt.title('LSTM-GRU Prediction') plt.xlabel('Time Step') plt.ylabel('Value') plt.legend() plt.grid(True) plt.show() """
# Rolling (recursive) prediction: repeatedly feed the model's own output back
# into the input window to forecast several steps ahead.
model.eval()
with torch.no_grad():
    # X_test is a NumPy array at this point, so build the tensor explicitly
    # (the original called .unsqueeze on a NumPy array)
    initial_window = torch.tensor(X_test[0:1], dtype=torch.float32).to(device)
    predicted_values = []
    current_window = initial_window

    for _ in range(8):
        output = model(current_window)                # (1, n_steps_out)
        predicted_value = output[:, -1].reshape(1, 1, 1)
        predicted_values.append(predicted_value)
        # The model only predicts the Value channel, so reuse the most recent
        # step's time features as an approximation for the next step (the true
        # next-step cyclic features could instead be computed from timestamps);
        # this also keeps the new row the same width as the 11-feature window,
        # which the original concatenation did not.
        last_time_feats = current_window[:, -1:, 1:]
        new_step = torch.cat((predicted_value, last_time_feats), dim=2)
        current_window = torch.cat((current_window[:, 1:, :], new_step), dim=1)

    predicted_sequence = torch.cat(predicted_values, dim=1)
    predicted_sequence = predicted_sequence.squeeze(0).cpu().numpy()

predicted_sequence = predicted_sequence.reshape(-1, 1)
predicted_actual = scaler.inverse_transform(predicted_sequence)
time_points = pd.date_range(start='2025-08-26 00:00:00', periods=8, freq='H')
plt.figure(figsize=(12, 6))
plt.plot(time_points, predicted_actual, label='Predicted Values', marker='x')
plt.title('Future Prediction for August 26th, 0:00 to 7:00')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()