CWRU (Case Western Reserve University) bearing dataset, 12k sampling frequency, ten-class classification.
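The pipeline below assumes d_path holds one .mat file per fault class (ten in total), each containing a drive-end vibration channel whose key includes 'DE'. A quick inspection sketch to confirm this before running, with a hypothetical file name:

# Check that a file exposes a drive-end channel; '105.mat' is a placeholder name.
from scipy.io import loadmat
mat = loadmat('./data12k/105.mat')
print([k for k in mat.keys() if 'DE' in k])  # e.g. ['X105_DE_time']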
from torch.utils.data import Dataset, DataLoader
from scipy.io import loadmat
import numpy as np
import os
from sklearn import preprocessing  # StandardScaler for standardization
from sklearn.model_selection import StratifiedShuffleSplit  # stratified random split, preserving class proportions
import torch
from torch import nn
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch.optim as optim


def prepro(d_path, length=0, number=0, normal=True, rate=[0, 0, 0], enc=False, enc_step=28):
    # Collect every .mat file name in the data folder
    filenames = os.listdir(d_path)

    def capture(original_path):
        files = {}
        for i in filenames:
            file_path = os.path.join(d_path, i)
            file = loadmat(file_path)
            # Keep only the drive-end ('DE') vibration channel
            for key in file.keys():
                if 'DE' in key:
                    files[i] = file[key].ravel()
        return files

    def slice_enc(data, slice_rate=rate[1]):
        # Cut each signal into overlapping windows (stride 150) and split train/test
        Train_Samples = {}
        Test_Samples = {}
        for i in data.keys():
            slice_data = data[i]
            all_length = len(slice_data)
            samp_train = int(number * (1 - slice_rate))  # e.g. 250 * (1 - 0.2) = 200
            Train_sample = []
            Test_Sample = []
            for j in range(samp_train):
                sample = slice_data[j * 150: j * 150 + length]
                Train_sample.append(sample)
            # Grab the test windows from the region after the training windows
            for h in range(number - samp_train):
                start = samp_train * 150 + length + h * 150
                Test_Sample.append(slice_data[start: start + length])
            Train_Samples[i] = Train_sample
            Test_Samples[i] = Test_Sample
        return Train_Samples, Test_Samples

    # Sampling is done above; this attaches an integer label per file (one class per file)
    def add_labels(train_test):
        X = []
        Y = []
        label = 0
        for i in filenames:
            x = train_test[i]
            X += x
            Y += [label] * len(x)
            label += 1
        return X, Y

    def scalar_stand(Train_X, Test_X):
        # Standardize train and test with statistics fitted on the training set only
        # (fitting on the combined data would leak test statistics into training)
        scalar = preprocessing.StandardScaler().fit(Train_X)
        Train_X = scalar.transform(Train_X)
        Test_X = scalar.transform(Test_X)
        return Train_X, Test_X

    def valid_test_slice(Test_X, Test_Y):
        # Stratified split of the test pool into validation and test sets
        test_size = rate[2] / (rate[1] + rate[2])
        ss = StratifiedShuffleSplit(n_splits=1, test_size=test_size)
        Test_Y = np.asarray(Test_Y, dtype=np.int32)
        for train_index, test_index in ss.split(Test_X, Test_Y):
            X_valid, X_test = Test_X[train_index], Test_X[test_index]
            Y_valid, Y_test = Test_Y[train_index], Test_Y[test_index]
            return X_valid, Y_valid, X_test, Y_test

    # Read every .mat file into a dict of 1-D signals
    data = capture(original_path=d_path)
    # Slice the signals into training and test windows
    train, test = slice_enc(data)
    # Attach labels: X, Y for the training set
    Train_X, Train_Y = add_labels(train)
    # Attach labels: X, Y for the test set
    Test_X, Test_Y = add_labels(test)
    # Optionally standardize training/test data
    if normal:
        Train_X, Test_X = scalar_stand(Train_X, Test_X)
    # Optionally split the test pool into validation and test sets:
    # Valid_X, Valid_Y, Test_X, Test_Y = valid_test_slice(Test_X, Test_Y)
    return Train_X, Train_Y, Test_X, Test_Y
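For intuition, slice_enc cuts heavily overlapping windows at a fixed stride of 150 points. A standalone sketch of the same windowing arithmetic, on a synthetic stand-in signal (values and length are arbitrary, chosen to mimic one ~120k-point DE channel):

# Standalone illustration of the stride-150 windowing used in slice_enc.
import numpy as np

signal = np.arange(120000, dtype=np.float32)   # stand-in for one DE channel
length, stride, n_train = 112 * 112, 150, 200  # matches length=112*112, number=250, rate=[0.8, 0.2] below
windows = np.stack([signal[j * stride: j * stride + length] for j in range(n_train)])
print(windows.shape)  # (200, 12544): consecutive windows overlap by all but 150 points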
num_classes = 10          # number of classes
length = 224 * 224        # sample (window) length
number = 140              # samples per class
normal = True             # whether to standardize
rate = [0.5, 0.25, 0.25]  # train/validation/test split ratios


class BearingDataset(Dataset):
    def __init__(self, data, labels):
        # np.asarray keeps this fast even when `data` arrives as a list of windows
        self.data = torch.tensor(np.asarray(data), dtype=torch.float32)
        self.labels = torch.tensor(np.asarray(labels), dtype=torch.long)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


def create_dataloader(data, labels, batch_size=32, shuffle=True, num_workers=0):
    dataset = BearingDataset(data, labels)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
    return dataloader
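As a quick check that the Dataset/DataLoader wiring behaves as expected, a minimal smoke test with random stand-in data (shapes only, not real signals):

# Smoke test for BearingDataset / create_dataloader with fake data.
_fake_x = np.random.randn(64, 112 * 112).astype(np.float32)
_fake_y = np.random.randint(0, 10, size=64)
_loader = create_dataloader(_fake_x, _fake_y, batch_size=16)
xb, yb = next(iter(_loader))
print(xb.shape, yb.shape)  # torch.Size([16, 12544]) torch.Size([16])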
# Process the data with the functions defined above
path = './data12k'  # adjust the path format to your operating system if needed
x_train, y_train, x_test, y_test = prepro(
    d_path=path,
    length=112 * 112,  # sample (window) length
    number=250,        # samples per class
    normal=True,       # whether to standardize
    rate=[0.8, 0.2]    # train/test split; only rate[1] (the test fraction) is used here
)

# Create the DataLoaders
train_loader = create_dataloader(x_train, y_train, batch_size=32, shuffle=True, num_workers=0)
test_loader = create_dataloader(x_test, y_test, batch_size=32, shuffle=False, num_workers=0)


class AlexNet(nn.Module):
    def __init__(self):
        super(AlexNet, self).__init__()
        # Shape comments assume a [N, 1, 112, 112] input
        self.Conv1 = nn.Conv2d(1, 24, 15, 3, 2)  # -> [N, 24, 34, 34]
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(2)          # -> [N, 24, 17, 17]
        self.Conv2 = nn.Conv2d(24, 64, 5, 1, 2)  # -> [N, 64, 17, 17]
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(2)          # -> [N, 64, 8, 8]
        self.Conv3 = nn.Conv2d(64, 96, 2, 1, 1)  # -> [N, 96, 9, 9]
        self.relu3 = nn.ReLU()
        self.Conv4 = nn.Conv2d(96, 96, 2, 1, 1)  # -> [N, 96, 10, 10]
        self.relu4 = nn.ReLU()
        self.Conv5 = nn.Conv2d(96, 64, 2, 1, 1)  # -> [N, 64, 11, 11]
        self.relu5 = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(3)          # -> [N, 64, 3, 3]
        self.Dro1 = nn.Dropout(p=0.5)
        self.flatten = nn.Flatten()
        self.line1 = nn.Linear(64 * 3 * 3, 1000)
        self.relu6 = nn.ReLU()
        self.Dro2 = nn.Dropout(p=0.5)
        self.line2 = nn.Linear(1000, 1000)
        self.relu7 = nn.ReLU()
        self.line3 = nn.Linear(1000, 500)
        self.line4 = nn.Linear(500, 10)

    def forward(self, x):
        x = self.Conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.Conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        x = self.Conv3(x)
        x = self.relu3(x)
        x = self.Conv4(x)
        x = self.relu4(x)
        x = self.Conv5(x)
        x = self.relu5(x)
        x = self.maxpool3(x)
        x = self.Dro1(x)
        x = self.flatten(x)
        x = self.line1(x)
        x = self.relu6(x)
        x = self.Dro2(x)
        x = self.line2(x)
        x = self.relu7(x)
        x = self.line3(x)
        x = self.line4(x)
        return x


def train_model(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        # Reshape each 12544-point window into a 1-channel 112x112 "image"
        outputs = model(inputs.view(-1, 1, 112, 112))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)


def evaluate_model(model, data_loader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs.view(-1, 1, 112, 112))
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1_score, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    return accuracy, precision, recall, f1_score


def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AlexNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    num_epochs = 50
    # Assumes train_loader and test_loader were created at module level above
    best_acc = 0
    for epoch in range(num_epochs):
        train_loss = train_model(model, train_loader, criterion, optimizer, device)
        print(f'Epoch {epoch+1}, Loss: {train_loss:.4f}')
        accuracy, precision, recall, f1_score = evaluate_model(model, test_loader, device)
        print(f'Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1_score:.4f}')
        # Keep the checkpoint with the best test accuracy
        if best_acc < accuracy:
            best_acc = accuracy
            torch.save(model.state_dict(), 'best_alexnet.pth')


if __name__ == "__main__":
    main()
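To reuse the best model later, a minimal reload sketch (assumes 'best_alexnet.pth' was written by the training run above):

# Reload the saved weights for inference; 'best_alexnet.pth' is the path used in main().
model = AlexNet()
model.load_state_dict(torch.load('best_alexnet.pth', map_location='cpu'))
model.eval()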