N-CMAPSS Code Walkthrough

Code Directory Overview

(Figures: repository directory structure)

Walkthrough of the Key Code

sample_creator_unit_auto.py
def main():
    # current_dir = os.path.dirname(os.path.abspath(__file__))
    parser = argparse.ArgumentParser(description='sample creator')
    parser.add_argument('-w', type=int, default=10, help='window length', required=True)
    parser.add_argument('-s', type=int, default=10, help='stride of window')
    parser.add_argument('--sampling', type=int, default=1, help='sub sampling of the given data. If it is 10, then this indicates that we assume 0.1Hz of data collection')
    parser.add_argument('--test', type=int, default=0, help='select train or test, if it is zero, then extract samples from the engines used for training')

    args = parser.parse_args()   # parse the command-line arguments defined above


    sequence_length = args.w   # window length
    stride = args.s # window stride
    sampling = args.sampling
    selector = args.test



    # Load data
    '''
    W: operative conditions (Scenario descriptors)
    X_s: measured signals
    X_v: virtual sensors
    T(theta): engine health parameters
    Y: RUL [in cycles]
    A: auxiliary data
    '''

    df_all = df_all_creator(data_filepath, sampling)

    '''
    Split dataframe into Train and Test
    Training units: 2, 5, 10, 16, 18, 20
    Test units: 11, 14, 15

    '''
    # units = list(np.unique(df_A['unit']))
    units_index_train = [2.0, 5.0, 10.0, 16.0, 18.0, 20.0]
    units_index_test = [11.0, 14.0, 15.0]

    print("units_index_train", units_index_train)
    print("units_index_test", units_index_test)

    # if any(int(idx) == unit_index for idx in units_index_train):
    #     df_train = df_train_creator(df_all, units_index_train)
    #     print(df_train)
    #     print(df_train.columns)
    #     print("num of inputs: ", len(df_train.columns) )
    #     df_test = pd.DataFrame()
    #
    # else :
    #     df_test = df_test_creator(df_all, units_index_test)
    #     print(df_test)
    #     print(df_test.columns)
    #     print("num of inputs: ", len(df_test.columns))
    #     df_train = pd.DataFrame()


    df_train = df_train_creator(df_all, units_index_train)
    print(df_train)
    print(df_train.columns)
    print("num of inputs: ", len(df_train.columns) )
    df_test = df_test_creator(df_all, units_index_test)
    print(df_test)
    print(df_test.columns)
    print("num of inputs: ", len(df_test.columns))

    del df_all
    gc.collect()
    df_all = pd.DataFrame()
    sample_dir_path = os.path.join(data_filedir, 'Samples_whole')
    sample_folder = os.path.isdir(sample_dir_path)
    if not sample_folder:
        os.makedirs(sample_dir_path)
        print("created folder : ", sample_dir_path)

    cols_normalize = df_train.columns.difference(['RUL', 'unit'])  # every column of df_train except 'RUL' and 'unit'
    sequence_cols = df_train.columns.difference(['RUL', 'unit'])

    # selector == 0: build samples for the training units; otherwise build them for the test units
    if selector == 0:
        for unit_index in units_index_train:
            data_class = Input_Gen (df_train, df_test, cols_normalize, sequence_length, sequence_cols, sample_dir_path,
                                    unit_index, sampling, stride =stride)
            data_class.seq_gen()

    else:
        for unit_index in units_index_test:
            data_class = Input_Gen (df_train, df_test, cols_normalize, sequence_length, sequence_cols, sample_dir_path,
                                    unit_index, sampling, stride =stride)
            data_class.seq_gen()
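With the arguments above, the script would typically be run twice (assuming, as in the path setup of inference_cnn_aggr.py below, that N-CMAPSS_DS02-006.h5 sits under ./N-CMAPSS), for example:

python3 sample_creator_unit_auto.py -w 50 -s 1 --sampling 10 --test 0
python3 sample_creator_unit_auto.py -w 50 -s 1 --sampling 10 --test 1

The first call writes one .npz file per training unit into N-CMAPSS/Samples_whole; the second does the same for the three test units. The flag values here are only an illustration and must match what the training script later expects.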



data_preparation_unit.py
# Load all features into one dataframe
def df_all_creator(data_filepath, sampling):
    """
    Load the N-CMAPSS .h5 file, merge the dev and test partitions and return a single,
    optionally sub-sampled dataframe containing W, X_s, two X_v channels, RUL and unit.
    """
    # Time tracking, Operation time (min):  0.003
    t = time.process_time()


    with h5py.File(data_filepath, 'r') as hdf:
        # Development (training) set; the train/test split is already defined inside the HDF5 file
        W_dev = np.array(hdf.get('W_dev'))  # W
        X_s_dev = np.array(hdf.get('X_s_dev'))  # X_s
        X_v_dev = np.array(hdf.get('X_v_dev'))  # X_v
        T_dev = np.array(hdf.get('T_dev'))  # T
        Y_dev = np.array(hdf.get('Y_dev'))  # RUL
        A_dev = np.array(hdf.get('A_dev'))  # Auxiliary

        # Test set
        W_test = np.array(hdf.get('W_test'))  # W
        X_s_test = np.array(hdf.get('X_s_test'))  # X_s
        X_v_test = np.array(hdf.get('X_v_test'))  # X_v
        T_test = np.array(hdf.get('T_test'))  # T
        Y_test = np.array(hdf.get('Y_test'))  # RUL
        A_test = np.array(hdf.get('A_test'))  # Auxiliary

        # Variable names
        W_var = np.array(hdf.get('W_var'))
        X_s_var = np.array(hdf.get('X_s_var'))
        X_v_var = np.array(hdf.get('X_v_var'))
        T_var = np.array(hdf.get('T_var'))
        A_var = np.array(hdf.get('A_var'))

        # from np.array to list dtype U4/U5
        W_var = list(np.array(W_var, dtype='U20'))
        X_s_var = list(np.array(X_s_var, dtype='U20'))
        X_v_var = list(np.array(X_v_var, dtype='U20'))
        T_var = list(np.array(T_var, dtype='U20'))
        A_var = list(np.array(A_var, dtype='U20'))


    W = np.concatenate((W_dev, W_test), axis=0)
    X_s = np.concatenate((X_s_dev, X_s_test), axis=0)
    X_v = np.concatenate((X_v_dev, X_v_test), axis=0)
    T = np.concatenate((T_dev, T_test), axis=0)
    Y = np.concatenate((Y_dev, Y_test), axis=0)
    A = np.concatenate((A_dev, A_test), axis=0)

    print('')
    print("Operation time (min): ", (time.process_time() - t) / 60)
    print("number of training samples(timestamps): ", Y_dev.shape[0])
    print("number of test samples(timestamps): ", Y_test.shape[0])
    print('')
    print("W shape: " + str(W.shape))
    print("X_s shape: " + str(X_s.shape))
    print("X_v shape: " + str(X_v.shape))
    print("T shape: " + str(T.shape))
    print("Y shape: " + str(Y.shape))
    print("A shape: " + str(A.shape))

    '''
    Illustration of the multivariate time series of condition-monitoring sensor readings for Unit 5 (the fifth engine)
    
    W: operative conditions (Scenario descriptors) - ['alt', 'Mach', 'TRA', 'T2']
    X_s: measured signals - ['T24', 'T30', 'T48', 'T50', 'P15', 'P2', 'P21', 'P24', 'Ps30', 'P40', 'P50', 'Nf', 'Nc', 'Wf']
    X_v: virtual sensors - ['T40', 'P30', 'P45', 'W21', 'W22', 'W25', 'W31', 'W32', 'W48', 'W50', 'SmFan', 'SmLPC', 'SmHPC', 'phi']
    T(theta): engine health parameters - ['fan_eff_mod', 'fan_flow_mod', 'LPC_eff_mod', 'LPC_flow_mod', 'HPC_eff_mod', 'HPC_flow_mod', 'HPT_eff_mod', 'HPT_flow_mod', 'LPT_eff_mod', 'LPT_flow_mod']
    Y: RUL [in cycles]
    A: auxiliary data - ['unit', 'cycle', 'Fc', 'hs']
    '''

    df_W = pd.DataFrame(data=W, columns=W_var)
    df_Xs = pd.DataFrame(data=X_s, columns=X_s_var)
    df_Xv = pd.DataFrame(data=X_v[:,0:2], columns=['T40', 'P30']) # only the first two virtual sensors ('T40', 'P30') are kept
    # df_T = pd.DataFrame(data=T, columns=T_var)
    df_Y = pd.DataFrame(data=Y, columns=['RUL'])
    df_A = pd.DataFrame(data=A, columns=A_var).drop(columns=['cycle', 'Fc', 'hs']) # keep only the 'unit' column of the auxiliary data



    # Merge all the dataframes
    df_all = pd.concat([df_W, df_Xs, df_Xv, df_Y, df_A], axis=1)
    # df_all thus has 22 columns: 4 (W) + 14 (X_s) + 2 (X_v) + RUL + unit
    print ("df_all", df_all)    # df_all = pd.concat([df_W, df_Xs, df_Xv, df_Y, df_A], axis=1).drop(columns=[ 'P45', 'W21', 'W22', 'W25', 'W31', 'W32', 'W48', 'W50', 'SmFan', 'SmLPC', 'SmHPC', 'phi', 'Fc', 'hs'])

    print ("df_all.shape", df_all.shape)
    # del [[df_W, df_Xs, df_Xv, df_Y, df_A]]
    # gc.collect()
    # df_W = pd.DataFrame()
    # df_Xs = pd.DataFrame()
    # df_Xv = pd.DataFrame()
    # df_Y = pd.DataFrame()
    # df_A = pd.DataFrame()

    df_all_smp = df_all[::sampling]
    print ("df_all_sub", df_all_smp)    # df_all = pd.concat([df_W, df_Xs, df_Xv, df_Y, df_A], axis=1).drop(columns=[ 'P45', 'W21', 'W22', 'W25', 'W31', 'W32', 'W48', 'W50', 'SmFan', 'SmLPC', 'SmHPC', 'phi', 'Fc', 'hs'])

    print ("df_all_sub.shape", df_all_smp.shape)


    return df_all_smp
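The final `df_all[::sampling]` step simply keeps every sampling-th row of the merged dataframe, which is how the 1 Hz recordings are thinned out (sampling=10 corresponds to 0.1 Hz). A tiny illustration with made-up data, not N-CMAPSS values:

import pandas as pd
df = pd.DataFrame({'T24': range(100)})   # 100 rows recorded at 1 Hz
print(df[::10].shape)                    # (10, 1): every 10th row is kept, i.e. 0.1 Hz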


# Build the training dataframe (all rows belonging to the training units)
def df_train_creator(df_all, units_index_train):
    train_df_lst= []
    for idx in units_index_train:
        df_train_temp = df_all[df_all['unit'] == np.float64(idx)]
        train_df_lst.append(df_train_temp)
    df_train = pd.concat(train_df_lst)
    df_train = df_train.reset_index(drop=True)

    return df_train

# Build the test dataframe (all rows belonging to the test units)
def df_test_creator(df_all, units_index_test):
    test_df_lst = []
    for idx in units_index_test:
        df_test_temp = df_all[df_all['unit'] == np.float64(idx)]
        test_df_lst.append(df_test_temp)

    df_test = pd.concat(test_df_lst)
    df_test = df_test.reset_index(drop=True)

    return df_test
# How the sliding window selects the feature data
def gen_sequence(id_df, seq_length, seq_cols):
    """ Only sequences that meet the window-length are considered, no padding is used. This means for testing
    we need to drop those which are below the window-length. An alternative would be to pad sequences so that
    we can use shorter ones """
    # for one unit id, put all its rows into a single matrix (e.g. all rows of engine unit 2)
    data_matrix = id_df[seq_cols].values # features only; the RUL column is not included here
    num_elements = data_matrix.shape[0] # number of rows
    # Iterate over two ranges in parallel.
    # For example, if a unit has 192 rows and sequence_length is 50,
    # zip iterates over the two ranges (0, 142) and (50, 192):
    # 0 50 -> from row 0 to row 50
    # 1 51 -> from row 1 to row 51
    # 2 52 -> from row 2 to row 52
    # ...
    # 142 192 -> from row 142 to 192
    for start, stop in zip(range(0, num_elements - seq_length), range(seq_length, num_elements)):
        yield data_matrix[start:stop, :]

# How the sliding window selects the labels
def gen_labels(id_df, seq_length, label):
    """ Only sequences that meet the window-length are considered, no padding is used. This means for testing
    we need to drop those which are below the window-length. An alternative would be to pad sequences so that
    we can use shorter ones """
    # For one id I put all the labels in a single matrix.
    # For example:
    # [[1]
    # [4]
    # [1]
    # [5]
    # [9]
    # ...
    # [200]]
    data_matrix = id_df[label].values  # label values only
    num_elements = data_matrix.shape[0]
    # I have to remove the first seq_length labels
    # because for one id the first sequence of seq_length size have as target
    # the last label (the previous ones are discarded).
    # All the next id's sequences will have associated step by step one label as target.
    # the RUL value immediately after each window is used as its label
    return data_matrix[seq_length:num_elements, :]
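To see what the two generators above produce together, here is a small self-contained check on made-up data (a hypothetical unit with 8 rows and a window length of 5); it is an illustration only, not part of the original repository:

import numpy as np
import pandas as pd

toy = pd.DataFrame({'unit': [1.0] * 8,
                    'T24':  np.arange(8, dtype=float),
                    'RUL':  np.arange(8, 0, -1, dtype=float)})

windows = list(gen_sequence(toy, 5, ['T24']))
labels = gen_labels(toy, 5, ['RUL'])

print(len(windows), windows[0].shape)  # 3 windows of shape (5, 1): rows 0-4, 1-5, 2-6
print(labels.ravel())                  # [3. 2. 1.] -> the RUL right after each window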

# Sliding-window slicing: build the feature sample_array and the corresponding label_array
def time_window_slicing (input_array, sequence_length, sequence_cols):
    # generate labels
    label_gen = [gen_labels(input_array[input_array['unit'] == id], sequence_length, ['RUL'])
                 for id in input_array['unit'].unique()]
    label_array = np.concatenate(label_gen).astype(np.float32)
    # label_array = np.concatenate(label_gen)

    # transform each id of the train dataset in a sequence
    seq_gen = (list(gen_sequence(input_array[input_array['unit'] == id], sequence_length, sequence_cols))
               for id in input_array['unit'].unique())
    sample_array = np.concatenate(list(seq_gen)).astype(np.float32)
    # sample_array = np.concatenate(list(seq_gen))

    print("sample_array")
    return sample_array, label_array

# Generate the individual windows over the RUL column only (labels)
def time_window_slicing_label_save (input_array, sequence_length, stride, index, sample_dir_path, sequence_cols = 'RUL'):
    '''
    ref
        for i in range(0, input_temp.shape[0] - sequence_length):
        window = input_temp[i*stride:i*stride + sequence_length, :]  # each individual window
        window_lst.append(window)
        # print (window.shape)


    '''
    # generate labels
    window_lst = []  # a python list to hold the windows

    input_temp = input_array[input_array['unit'] == index][sequence_cols].values
    num_samples = int((input_temp.shape[0] - sequence_length)/stride) + 1  # number of samples, i.e. number of windows
    for i in range(num_samples):
        window = input_temp[i*stride:i*stride + sequence_length]  # each individual window of RUL values
        window_lst.append(window)
        # print (window.shape)

    label_array = np.asarray(window_lst).astype(np.float32)
    # label_array = np.asarray(window_lst)

    # np.save(os.path.join(sample_dir_path, 'Unit%s_rul_win%s_str%s' %(str(int(index)), sequence_length, stride)),
    #         label_array)  # save the file as "outfile_name.npy"

    return label_array[:,-1]  # last column of every row: the RUL at the end of each window becomes the label
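A quick sanity check of the window count and the label choice, on a made-up RUL series (not N-CMAPSS data):

import numpy as np
rul = np.arange(100, 0, -1, dtype=np.float32)   # hypothetical RUL series: 100, 99, ..., 1
seq_len, stride = 50, 1
num_samples = int((rul.shape[0] - seq_len) / stride) + 1
windows = np.asarray([rul[i*stride:i*stride + seq_len] for i in range(num_samples)])
print(num_samples)           # 51 = (100 - 50) / 1 + 1
print(windows[:, -1][:3])    # [51. 50. 49.]: the last RUL value of each window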


# Generate the individual windows over the feature columns only
def time_window_slicing_sample_save (input_array, sequence_length, stride, index, sample_dir_path, sequence_cols):
    '''


    '''
    # generate feature windows
    window_lst = []  # a python list to hold the windows

    input_temp = input_array[input_array['unit'] == index][sequence_cols].values
    print ("Unit%s input array shape: " %index, input_temp.shape)
    num_samples = int((input_temp.shape[0] - sequence_length)/stride) + 1
    for i in range(num_samples):
        window = input_temp[i*stride:i*stride + sequence_length,:]  # each individual window
        window_lst.append(window)

    sample_array = np.dstack(window_lst).astype(np.float32)
    # sample_array = np.dstack(window_lst)
    print ("sample_array.shape", sample_array.shape)

    # np.save(os.path.join(sample_dir_path, 'Unit%s_samples_win%s_str%s' %(str(int(index)), sequence_length, stride)),
    #         sample_array)  # save the file as "outfile_name.npy"


    return sample_array
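Note the axis order: np.dstack stacks the windows along a new third axis, so sample_array comes out as (sequence_length, n_features, n_windows); load_array in inference_cnn_aggr.py below therefore calls .transpose(2, 0, 1) to obtain the (n_windows, sequence_length, n_features) layout that Keras Conv1D expects. A minimal shape check with dummy arrays:

import numpy as np
win_len, n_feat, n_win = 50, 20, 7    # arbitrary illustration values
windows = [np.zeros((win_len, n_feat), dtype=np.float32) for _ in range(n_win)]
stacked = np.dstack(windows)
print(stacked.shape)                     # (50, 20, 7)
print(stacked.transpose(2, 0, 1).shape)  # (7, 50, 20) -> (samples, time steps, features)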


# Prepare the data: min-max normalization plus sliding-window slicing for the train and test sets
class Input_Gen(object):
    '''
    class for data preparation (sequence generator)
    '''

    def __init__(self, df_train, df_test, cols_normalize, sequence_length, sequence_cols, sample_dir_path,
                 unit_index, sampling, stride):
        '''

        '''
        # self.__logger = logging.getLogger('data preparation for using it as the network input')
        print("the number of input signals: ", len(cols_normalize))
        min_max_scaler = preprocessing.MinMaxScaler(feature_range=(-1, 1)) # scale every input signal to the range (-1, 1)
        norm_df = pd.DataFrame(min_max_scaler.fit_transform(df_train[cols_normalize]), # norm_df holds the normalized training values
                               columns=cols_normalize,
                               index=df_train.index)
        join_df = df_train[df_train.columns.difference(cols_normalize)].join(norm_df)
        df_train = join_df.reindex(columns=df_train.columns) # restore the original column order; df_train now holds the normalized values

        norm_test_df = pd.DataFrame(min_max_scaler.transform(df_test[cols_normalize]), columns=cols_normalize,
                                    index=df_test.index)
        test_join_df = df_test[df_test.columns.difference(cols_normalize)].join(norm_test_df)
        df_test = test_join_df.reindex(columns=df_test.columns)
        df_test = df_test.reset_index(drop=True)

        self.df_train = df_train
        self.df_test = df_test

        print (self.df_train)
        print (self.df_test)

        self.cols_normalize = cols_normalize
        self.sequence_length = sequence_length
        self.sequence_cols = sequence_cols
        self.sample_dir_path = sample_dir_path
        self.unit_index = np.float64(unit_index)
        self.sampling = sampling
        self.stride = stride


    def seq_gen(self):
        '''
        concatenate vectors for NNs
        :param :
        :param :
        :return:
        '''
        # extract features and labels for the selected engine unit
        if any(index == self.unit_index for index in self.df_train['unit'].unique()): # unit belongs to the training set
            print ("Unit for Train")
            label_array = time_window_slicing_label_save(self.df_train, self.sequence_length,
                                           self.stride, self.unit_index, self.sample_dir_path, sequence_cols='RUL')
            sample_array = time_window_slicing_sample_save(self.df_train, self.sequence_length,
                                           self.stride, self.unit_index, self.sample_dir_path, sequence_cols=self.cols_normalize)

        else: # unit belongs to the test set
            print("Unit for Test")
            label_array = time_window_slicing_label_save(self.df_test, self.sequence_length,
                                           self.stride, self.unit_index, self.sample_dir_path, sequence_cols='RUL')
            sample_array = time_window_slicing_sample_save(self.df_test, self.sequence_length,
                                           self.stride, self.unit_index, self.sample_dir_path, sequence_cols=self.cols_normalize)

        # sample_split_lst = np.array_split(sample_array, 3, axis=2)
        # print (sample_split_lst[0].shape)
        # print(sample_split_lst[1].shape)
        # print(sample_split_lst[2].shape)

        # label_split_lst = np.array_split(label_array, 3, axis=0)
        # print (label_split_lst[0].shape)
        # print(label_split_lst[1].shape)
        # print(label_split_lst[2].shape)

        print("sample_array.shape", sample_array.shape)
        print("label_array.shape", label_array.shape)

        
        # save both arrays into a single compressed .npz file under ./N-CMAPSS/Samples_whole
        np.savez_compressed(os.path.join(self.sample_dir_path, 'Unit%s_win%s_str%s_smp%s' %(str(int(self.unit_index)), self.sequence_length, self.stride, self.sampling)),
                                         sample=sample_array, label=label_array)
        print ("unit saved")

        return
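Each unit therefore ends up as one compressed file holding two arrays under the keys 'sample' and 'label'. Assuming a file was generated for unit 2 with window length 50, stride 1 and sampling 10 (the name follows the savez_compressed call above), it can be inspected like this:

import os
import numpy as np
npz = np.load(os.path.join('N-CMAPSS', 'Samples_whole', 'Unit2_win50_str1_smp10.npz'))
print(npz['sample'].shape)   # (50, 20, n_windows), before the transpose done later in load_array
print(npz['label'].shape)    # (n_windows,)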

inference_cnn_aggr.py
'''
DL models (FNN, 1D CNN and CNN-LSTM) evaluation on N-CMAPSS
12.07.2021
Hyunho Mo
[email protected]
'''
## Import libraries in python
import gc
import argparse
import os
import json
import logging
import sys
import h5py
import time
import matplotlib
import numpy as np
import pandas as pd
import seaborn as sns
from pandas import DataFrame
import matplotlib.pyplot as plt
from matplotlib import gridspec
import math
import random
from random import shuffle
from tqdm.keras import TqdmCallback

seed = 0
random.seed(0)
np.random.seed(seed)

import importlib
from scipy.stats import randint, expon, uniform
import sklearn as sk
from sklearn import svm
from sklearn.utils import shuffle
from sklearn import metrics
from sklearn import preprocessing
from sklearn import pipeline
from sklearn.metrics import mean_squared_error
from math import sqrt
from tqdm import tqdm
import scipy.stats as stats
# from sklearn.utils.testing import ignore_warnings
# from sklearn.exceptions import ConvergenceWarning
# import keras
import tensorflow as tf
print(tf.__version__)
# import keras.backend as K
import tensorflow.keras.backend as K
from tensorflow.keras import backend
from tensorflow.keras import optimizers
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Input, Dense, Flatten, Dropout, Embedding
from tensorflow.keras.layers import BatchNormalization, Activation, LSTM, TimeDistributed, Bidirectional
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import MaxPooling1D
from tensorflow.keras.layers import concatenate
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, LearningRateScheduler


from tensorflow.python.framework.convert_to_constants import  convert_variables_to_constants_v2_as_graph

from tensorflow.keras.initializers import GlorotNormal, GlorotUniform

initializer = GlorotNormal(seed=0)
# initializer = GlorotUniform(seed=0)

from utils.data_preparation_unit import df_all_creator, df_train_creator, df_test_creator, Input_Gen
from utils.dnn import one_dcnn,CNNBranch


# import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()

# Ignore tf err log
pd.options.mode.chained_assignment = None  # default='warn'


# from tensorflow.compat.v1 import ConfigProto
# from tensorflow.compat.v1 import InteractiveSession
# config = ConfigProto()
# config.gpu_options.allow_growth = True
# session = InteractiveSession(config=config)

#gpus = tf.config.experimental.list_physical_devices('GPU')
#for gpu in gpus:
#    tf.config.experimental.set_memory_growth(gpu, True)

# tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# tf.get_logger().setLevel(logging.ERROR)



# tf.config.set_visible_devices([], 'GPU')

# <editor-fold desc="define dataset and model paths">
current_dir = os.path.dirname(os.path.abspath(__file__))
print(current_dir)
data_filedir = os.path.join(current_dir, 'N-CMAPSS')    # os.path.join concatenates path components
data_filepath = os.path.join(current_dir, 'N-CMAPSS', 'N-CMAPSS_DS02-006.h5')
sample_dir_path = os.path.join(data_filedir, 'Samples_whole')

# model file to save
# model_temp_path = os.path.join(current_dir, 'Models', 'oned_cnn_rep.h5')   # define the model file name and save path up front
model_temp_path = os.path.join(current_dir, 'Models', 'CNNBranch_rep.h5')
tf_temp_path = os.path.join(current_dir, 'TF_Model_tf')

pic_dir = os.path.join(current_dir, 'Figures')  # folder for the training-loss and inference figures
# </editor-fold>


def load_part_array (sample_dir_path, unit_num, win_len, stride, part_num):
    filename =  'Unit%s_win%s_str%s_part%s.npz' %(str(int(unit_num)), win_len, stride, part_num)
    filepath =  os.path.join(sample_dir_path, filename)
    loaded = np.load(filepath)
    return loaded['sample'], loaded['label']

'''
Merge the partitioned sample/label files of one unit and reshape them into a trainable form
'''
def load_part_array_merge (sample_dir_path, unit_num, win_len, win_stride, partition):
    sample_array_lst = []
    label_array_lst = []
    print ("Unit: ", unit_num)
    for part in range(partition):
      print ("Part.", part+1)
      sample_array, label_array = load_part_array (sample_dir_path, unit_num, win_len, win_stride, part+1)
      sample_array_lst.append(sample_array)
      label_array_lst.append(label_array)
    sample_array = np.dstack(sample_array_lst)  # stack along the depth (third) axis
    label_array = np.concatenate(label_array_lst)
    sample_array = sample_array.transpose(2, 0, 1)
    print ("sample_array.shape", sample_array.shape)
    print ("label_array.shape", label_array.shape)
    return sample_array, label_array

# Load a unit's .npz file from the Samples_whole directory (the file name assumes sampling=10)
def load_array (sample_dir_path, unit_num, win_len, stride):
    filename =  'Unit%s_win%s_str%s_smp10.npz' %(str(int(unit_num)), win_len, stride)
    filepath =  os.path.join(sample_dir_path, filename)
    loaded = np.load(filepath)

    return loaded['sample'].transpose(2, 0, 1), loaded['label']    # transpose (win_len, n_features, n_windows) -> (n_windows, win_len, n_features), the input layout Conv1D expects

def rmse(y_true, y_pred):
    return backend.sqrt(backend.mean(backend.square(y_pred - y_true), axis=-1))

# Shuffle the order of the windows to reduce overfitting
def shuffle_array(sample_array, label_array):
    ind_list = list(range(len(sample_array)))
    print("ind_list befor: ", ind_list[:10])
    print("ind_list befor: ", ind_list[-10:])
    ind_list = shuffle(ind_list)  #随机排序
    print("ind_list after: ", ind_list[:10])
    print("ind_list after: ", ind_list[-10:])
    print("Shuffeling in progress")
    shuffle_sample = sample_array[ind_list, :, :]
    shuffle_label = label_array[ind_list,]
    return shuffle_sample, shuffle_label

# Plot the training and validation loss curves
def figsave(history, win_len, win_stride, bs, lr, sub):
    fig_acc = plt.figure(figsize=(15, 8))
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Training', fontsize=24)
    plt.ylabel('loss', fontdict={'fontsize': 18})
    plt.xlabel('epoch', fontdict={'fontsize': 18})
    plt.legend(['Training loss', 'Validation loss'], loc='upper left', fontsize=18)
    plt.show()
    print ("saving file:training loss figure")
    fig_acc.savefig(pic_dir + "/training_w%s_s%s_bs%s_sub%s_lr%s.png" %(int(win_len), int(win_stride), int(bs), int(sub), str(lr)))
    return


# Count the model's floating-point operations (FLOPs)
def get_flops(model):
    concrete = tf.function(lambda inputs: model(inputs))
    concrete_func = concrete.get_concrete_function(
        [tf.TensorSpec([1, *inputs.shape[1:]]) for inputs in model.inputs])
    frozen_func, graph_def = convert_variables_to_constants_v2_as_graph(concrete_func)
    with tf.Graph().as_default() as graph:
        tf.graph_util.import_graph_def(graph_def, name='')
        run_meta = tf.compat.v1.RunMetadata()
        opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()
        flops = tf.compat.v1.profiler.profile(graph=graph, run_meta=run_meta, cmd="op", options=opts)
        return flops.total_float_ops




def scheduler(epoch, lr):
    if epoch == 30:
        print("lr decay by 10")
        return lr * 0.1
    elif epoch == 70:
        print("lr decay by 10")
        return lr * 0.1
    else:
        return lr


# Release the memory held by a list
def release_list(a):
   del a[:]  # del a[:] removes every element from the list but does not delete the list object itself
   del a

units_index_train = [2.0, 5.0, 10.0, 16.0, 18.0, 20.0]
units_index_test = [11.0, 14.0, 15.0]



def main():
    # current_dir = os.path.dirname(os.path.abspath(__file__))
    parser = argparse.ArgumentParser(description='sample creator')
    parser.add_argument('-w', type=int, default=50, help='sequence length', required=False)  # window length
    parser.add_argument('-s', type=int, default=1, help='stride of filter')  # window stride
    parser.add_argument('-f', type=int, default=10, help='number of filter')  # number of convolution filters
    parser.add_argument('-k', type=int, default=10, help='size of kernel')    # kernel size
    parser.add_argument('-bs', type=int, default=256, help='batch size')
    parser.add_argument('-ep', type=int, default=30, help='max epoch')
    parser.add_argument('-pt', type=int, default=20, help='patience') # early-stopping patience
    parser.add_argument('-vs', type=float, default=0.1, help='validation split')
    parser.add_argument('-lr', type=float, default=0.001, help='learning rate') # initial learning rate
    parser.add_argument('-sub', type=int, default=10, help='subsampling stride') # sub-sampling stride (keep 1 window out of every 10)


    args = parser.parse_args()

    win_len = args.w   # window length
    win_stride = args.s
    partition = 3
    n_filters = args.f
    kernel_size = args.k
    lr = args.lr
    bs = args.bs
    ep = args.ep
    pt = args.pt
    vs = args.vs
    sub = args.sub

    # beta_1 and beta_2 are the exponential decay rates of the first- and second-moment estimates; they are usually set close to 1
    amsgrad = optimizers.Adam(learning_rate=lr, beta_1=0.9, beta_2=0.999, epsilon=1e-07, amsgrad=True, name='Adam')
    rmsop = optimizers.RMSprop(learning_rate=lr, rho=0.9, momentum=0.0, epsilon=1e-07, centered=False,
                               name='RMSprop')

    # training-set samples and labels
    train_units_samples_lst =[]
    train_units_labels_lst = []

    for index in units_index_train:
        print("Load data index: ", index)
        sample_array, label_array = load_array (sample_dir_path, index, win_len, win_stride)
        #sample_array, label_array = shuffle_array(sample_array, label_array)
        print("sample_array.shape", sample_array.shape)
        print("label_array.shape", label_array.shape)
        sample_array = sample_array[::sub] # take every sub-th window (a step of -1 would reverse the order); here sub=10, i.e. one window out of every 10 is kept
        label_array = label_array[::sub]
        print("sub sample_array.shape", sample_array.shape)
        print("sub label_array.shape", label_array.shape)
        train_units_samples_lst.append(sample_array)
        train_units_labels_lst.append(label_array)

    sample_array = np.concatenate(train_units_samples_lst)  # to avoid running out of memory, only 1 of every 10 windows per training unit was kept, shrinking the training set tenfold
    label_array = np.concatenate(train_units_labels_lst)
    print ("samples are aggregated")

    release_list(train_units_samples_lst)
    release_list(train_units_labels_lst)
    train_units_samples_lst =[]
    train_units_labels_lst = []
    print("Memory released")  # 释放内存

    #sample_array, label_array = shuffle_array(sample_array, label_array)
    print("samples are shuffled")
    print("sample_array.shape", sample_array.shape)
    print("label_array.shape", label_array.shape)

    print ("train sample dtype", sample_array.dtype)
    print("train label dtype", label_array.dtype)


    # input_temp = Input(shape=(sample_array.shape[1], sample_array.shape[2]),name='kernel_size%s' %str(int(kernel_size)))
    # #------
    # one_d_cnn = one_dcnn(n_filters, kernel_size, sample_array, initializer)
    # cnn_out = one_d_cnn(input_temp)
    # x = cnn_out
    # # x = Dropout(0.5)(x)
    # main_output = Dense(1, activation='linear', kernel_initializer=initializer, name='main_output')(x)
    # one_d_cnn_model = Model(inputs=input_temp, outputs=main_output)

    # model = Model(inputs=[input_1, input_2], outputs=main_output)

    # <editor-fold desc="build and compile the model">
    # deep 1D CNN (DCNN)
    one_d_cnn_model = one_dcnn(n_filters, kernel_size, sample_array, initializer)

    print(one_d_cnn_model.summary())
    one_d_cnn_model.compile(loss='mean_squared_error', optimizer=amsgrad, metrics=[rmse, 'mae'])

    # CNNBranch_model=CNNBranch(n_filters, win_len, 20,win_stride, kernel_size, 5)
    #
    # print(CNNBranch_model.summary())

    start = time.time()

    lr_scheduler = LearningRateScheduler(scheduler)

    # Model training
    '''
    The best model is saved to the path defined above, with the .h5 file name chosen beforehand.
    With save_best_only=True the model is only saved when it is the best seen so far; if the filepath contains
    no formatting options such as {epoch}, each newly saved better model overwrites the previously saved one.
    '''
    one_d_cnn_model.compile(loss='mean_squared_error', optimizer=amsgrad, metrics='mae')
    history = one_d_cnn_model.fit(sample_array, label_array, epochs=ep, batch_size=bs, validation_split=vs, verbose=2,
                      callbacks = [EarlyStopping(monitor='val_loss', min_delta=0, patience=pt, verbose=1, mode='min'),
                                    ModelCheckpoint(model_temp_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)]
                      )


    #CNNBranch
    # CNNBranch_model.compile(loss='mean_squared_error', optimizer=amsgrad, metrics='mae')
    # history = CNNBranch_model.fit(sample_array, label_array, epochs=ep, batch_size=bs, validation_split=vs, verbose=2,
    #                   callbacks = [EarlyStopping(monitor='val_loss', min_delta=0, patience=pt, verbose=1, mode='min'),
    #                                 ModelCheckpoint(model_temp_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)]
    #                   )
    # </editor-fold>


    # TqdmCallback(verbose=2)
    # one_d_cnn_model.save(tf_temp_path,save_format='tf')
    figsave(history, win_len, win_stride, bs, lr, sub)

    # FLOPs (floating point operations, lower-case s for the plural): the amount of computation, a rough measure of the algorithm's/model's complexity.
    print("The FLOPs is:{}".format(get_flops(one_d_cnn_model)), flush=True)
    # print("The FLOPs is:{}".format(get_flops(CNNBranch_model)), flush=True)
    num_train = sample_array.shape[0]
    end = time.time()
    training_time = end - start
    print("Training time: ", training_time)


    ### Test (inference after training) on the test units
    start = time.time()

    output_lst = []
    truth_lst = []

    for index in units_index_test:
        print ("test idx: ", index)
        sample_array, label_array = load_array(sample_dir_path, index, win_len, win_stride)
        # estimator = load_model(tf_temp_path, custom_objects={'rmse':rmse})
        print("sample_array.shape", sample_array.shape)
        print("label_array.shape", label_array.shape)
        sample_array = sample_array[::sub]
        label_array = label_array[::sub]
        print("sub sample_array.shape", sample_array.shape)
        print("sub label_array.shape", label_array.shape)

        estimator = load_model(model_temp_path)  # load the best saved model

        y_pred_test = estimator.predict(sample_array)  # predict RUL for the test windows: input sample_array, output RUL values
        output_lst.append(y_pred_test) # predicted RUL
        truth_lst.append(label_array)  # ground-truth RUL

    print(output_lst[0].shape)
    print(truth_lst[0].shape)

    print(np.concatenate(output_lst).shape) # concatenated RUL of the three test engines, 12524 rows in total
    print(np.concatenate(truth_lst).shape)

    output_array = np.concatenate(output_lst)[:, 0]
    trytg_array = np.concatenate(truth_lst)
    print(output_array.shape)
    print(trytg_array.shape)
    rms = sqrt(mean_squared_error(output_array, trytg_array)) # RMSE on the test set
    print(rms)
    rms = round(rms, 2) # round to two decimals

    end = time.time()
    inference_time = end - start
    num_test = output_array.shape[0]  # total number of test windows

    # Plot predicted vs. true RUL for each test unit
    for idx in range(len(units_index_test)):
        print(output_lst[idx])
        print(truth_lst[idx])
        fig_verify = plt.figure(figsize=(24, 10))
        plt.plot(output_lst[idx], color="green")
        plt.plot(truth_lst[idx], color="red", linewidth=2.0)
        plt.title('Unit%s inference' %str(int(units_index_test[idx])), fontsize=30)
        plt.yticks(fontsize=20)
        plt.xticks(fontsize=20)
        plt.ylabel('RUL', fontdict={'fontsize': 24})
        plt.xlabel('Timestamps', fontdict={'fontsize': 24})
        plt.legend(['Predicted', 'Truth'], loc='upper right', fontsize=28)
        plt.show()
        fig_verify.savefig(pic_dir + "/unit%s_test_w%s_s%s_bs%s_lr%s_sub%s_rmse-%s.png" %(str(int(units_index_test[idx])),
                                                                              int(win_len), int(win_stride), int(bs),
                                                                                    str(lr), int(sub), str(rms)))

    print("The FLOPs is:{}".format(get_flops(one_d_cnn_model)), flush=True) #浮点运算数,理解为计算量。可以用来衡量算法/模型的复杂度
    # print("The FLOPs is:{}".format(get_flops(CNNBranch_model)), flush=True)
    print("wind length_%s,  win stride_%s" %(str(win_len), str(win_stride)))
    print("# Training samples: ", num_train) #训练集,6台发动机,共52608行数据
    print("# Inference samples: ", num_test) #测试集,3台发动机,共12524 行数据
    print("Training time: ", training_time)  #训练时间
    print("Inference time: ", inference_time)  #预测时间
    print("Result in RMSE: ", rms)


if __name__ == '__main__':
    main()
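The network itself comes from utils/dnn.py, which this post does not reproduce. As a rough sketch only (my own guess at a comparable architecture, not the author's exact one_dcnn), a stack of Conv1D layers followed by a flatten and a single linear output for RUL regression matches both the (samples, win_len, n_features) input prepared above and the call signature one_dcnn(n_filters, kernel_size, sample_array, initializer):

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, Flatten, Dense

def simple_one_dcnn(n_filters, kernel_size, sample_array, initializer):
    # sample_array has shape (n_samples, win_len, n_features)
    win_len, n_features = sample_array.shape[1], sample_array.shape[2]
    model = Sequential([
        Conv1D(n_filters, kernel_size, padding='same', activation='relu',
               kernel_initializer=initializer, input_shape=(win_len, n_features)),
        Conv1D(n_filters, kernel_size, padding='same', activation='relu',
               kernel_initializer=initializer),
        Flatten(),
        Dense(50, activation='relu', kernel_initializer=initializer),
        Dense(1, activation='linear', kernel_initializer=initializer, name='main_output'),
    ])
    return model

The repository's one_dcnn may differ in depth, pooling and dropout, but the compile/fit calls in main() work the same either way. Judging from the file name of the result figure below, the reported run used window length 50, stride 1, batch size 256, learning rate 0.001 and sub-sampling 10.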

Experiment Results

(Figure: unit11_test_w50_s1_bs256_lr0.001_sub10_rmse-5.78.png, predicted vs. true RUL for test unit 11)
