AI

SKADA 1기 AI경연 Public / Final 1위

META_BS 2024. 8. 11. 14:46

 

SK AI Data Academy 1기 AI 경연

반도체 SSD 신뢰성 사전 불량 예측에서 1위를 달성했습니다

 

제가 데이터를 분석하고 모델을 구축한 방법을 정리해보겠습니다

 

문제 개요

이번 문제에서는 Data Center Storage에서 수집한 센서정보를 바탕으로 시계열 분류 모델을 구현하고 그 성능을 비교하고자 합니다

Data Center에 사용되는 SSD는 일반적으로 24시간 작동해야 하며 문제가 발생할 경우 심각한 상황을 야기할 수 있습니다

SSD의 센서정보를 통해 사전에 SSD에 이상이 발생할지 여부를 분류할 수 있는 모델을 개발하고, 그에 맞춰 이상 SSD를 검출하는 것이 목표입니다.

 

라고 하네요, 시계열 데이터 기반의 이상치 binary classification이 목표입니다.

우선 데이터 분석을 해보겠습니다.

EDA

 

Train data의 형태는 아래와 같습니다

train_df = pd.read_csv(os.path.join(data_dir, "train.csv"), index_col='Serial Number')
test_x = pd.read_csv(os.path.join(data_dir, "test_x.csv"), index_col='Serial Number')

 

하나의 Serial (제품?)에 대해 시간별로 기록된 값들과 라벨이 있습니다. X1은 고유번호같네요

사용할 수 있는 feature는 X2 ~ X18로 17개임을 알 수 있습니다

 

train_df['TIMESTAMP'] = pd.to_datetime(train_df['TIMESTAMP'])
train_df['TIMESTAMP'] = train_df['TIMESTAMP'].map(lambda t: t.strftime('%Y-%m-%d %H:%M'))

test_x['TIMESTAMP'] = pd.to_datetime(test_x['TIMESTAMP'])
test_x['TIMESTAMP'] = test_x['TIMESTAMP'].map(lambda t: t.strftime('%Y-%m-%d %H:%M'))

date_time_key = list(train_df.columns)[0]
feature_keys = list(train_df.columns)[2:-1]
target_key = list(train_df.columns)[-1]

train_x = train_df.drop(columns='Y')
train_y = pd.read_csv(os.path.join(data_dir, "train_y.csv"), index_col='Serial Number')
train_y.value_counts()

Y
0    7060
1    1212
dtype: int64

 

y값의 value count를 보니 0 (정상)과 1 (이상) 데이터 수의 큰 차이를 볼 수 있습니다.

이상치 탐지에서 흔히 볼 수 있는 데이터 불균형 현상입니다.

기억하고 나중에 적절한 방법으로 불균형을 처리해야 합니다.

print("Train Data 크기 :", train_x.shape)
print("Test Data 크기 :", test_x.shape)

print("Train Data의 Serial Number의 unique 값 :", len(train_x.index.unique()))
print("Test Data의 Serial Number의 unique 값 :", len(test_x.index.unique()))

Train Data 크기 : (555456, 19)
Test Data 크기 : (138880, 19)
Train Data의 Serial Number의 unique 값 : 8272
Test Data의 Serial Number의 unique 값 : 2069

 

Serial Number (하나의 장비) 기준으로 봤을 때, Train 8200개로 데이터 수가 상당히 적은 것을 볼 수 있습니다.

 

train_x.drop(columns='X1', inplace=True)
test_x.drop(columns='X1', inplace=True)

scaler = StandardScaler()
train_x[feature_keys] = scaler.fit_transform(train_df[feature_keys])
test_x[feature_keys] = scaler.transform(test_x[feature_keys])

train_x_by_serial = [group[1] for group in train_x.groupby(train_x.index)]
test_x_by_serial = [group[1] for group in test_x.groupby(test_x.index)]

train_x_by_serial = [group.sort_values('TIMESTAMP') for group in train_x_by_serial]
test_x_by_serial = [group.sort_values('TIMESTAMP') for group in test_x_by_serial]

 

따로 떨어져 있는 데이터들을 Serial Number에 맞게 묶어줍니다.

len(train_x_by_serial)
8272

 

위에서 봤던 unique 값과 같음을 확인할 수 있습니다. 잘 묶인 것 같습니다.

 

다음은 하나의 Serial이 얼마만큼의 시계열 데이터를 가지고 있는지 확인해보겠습니다.

lens = [len(x) for x in train_x_by_serial]
plt.hist(lens)

하나의 Serial Number 당 54~68개의 시계열 데이터를 지니고 있음을 확인할 수 있습니다.

 

Serial 하나를 가지고 각 feature에 대한 시각화를 해보겠습니다

for idx, feature in enumerate(feature_keys):
    plt.plot(list(train_x_by_serial[view][feature]))
plt.title(f'Idx : {view}, {train_y.iloc[view]}')
plt.show()

경진대회 규칙에 의해 시각화 결과는 생략하겠습니다

 

EDA의 끝입니다, 얻은 결과와 모델 구축에 있어서 중요한 점을 정리해보겠습니다:

 

- 적은 데이터 수 (약 8000개): 상당히 적은 수의 데이터입니다. 딥러닝 모델 사용에 어려움이 있을것 같습니다.

- 데이터 불균형: Oversampling, Undersampling, SMOTE 와 같은 불균형 처리 방법을 시도하는것이 좋아보입니다.

- 적절한 길이: 데이터의 길이가 완벽히 동일하지는 않지만, 분포가 좁습니다. 적절히 잘라서 사용하면 될것 같습니다.

 

Train Test Split

def train_test_split(Xs, ys, test_ratio=0.2):
    data_per_label = {}

    for x, y in zip(Xs, ys):
        label = y
        if label not in data_per_label:
            data_per_label[label] = []
        data_per_label[label].append((x, y))

    train = []
    test = []

    for label in data_per_label:
        data = data_per_label[label]
        n_test = int(len(data) * test_ratio)
        test += data[:n_test]
        train += data[n_test:]

    X_train, y_train = zip(*train)
    X_test, y_test = zip(*test)

    return X_train, X_test, y_train, y_test
X_train, X_val, y_train, y_val = train_test_split(train_x_by_serial, train_y['Y'], test_ratio=0.2)

X_train = [x.drop(columns='TIMESTAMP') for x in X_train]
X_val = [x.drop(columns='TIMESTAMP') for x in X_val]
X_test = [x.drop(columns='TIMESTAMP') for x in test_x_by_serial]

print("Train Data의 개수 :", len(X_train))
print("Validation Data의 개수 :", len(X_val))
print("Test Data의 개수 :", len(X_test))

 

Train, Val 데이터를 나눠줍니다. test 데이터는 제출용입니다.

주의할 점은 아직까지 시간순으로 정렬되어있기 때문에 데이터를 shuffle하면 시계열의 특성을 잃어버릴 수 있다는 것입니다

 

def align_data(data, series_length):
    data_features = [x[feature_keys] for x in data]
    len_data = len(data_features)
    length_aligned_X = []
    for x in data_features:
        if len(x) >= series_length:
            length_aligned_X.append(x[-series_length:])
        else:
            length_aligned_X.append(x.append([x.iloc[-1]] * (series_length - len(x))))
    return np.array(length_aligned_X).reshape(len_data, -1)
    
series_length = 50

X_train = align_data(X_train, series_length)
X_val = align_data(X_val, series_length)
X_test = align_data(X_test, series_length)

y_train = np.array(y_train)
y_val = np.array(y_val)

 

모델에 집어넣기 위해 input의 shape를 동일하게 맞춰줘야 합니다. 앞서 확인한 데이터의 길이 분포를 기반으로, 데이터를 잘라주겠습니다.

 

최신 데이터를 기준으로 50개를 반영하고, 50개 미만인 경우에는 마지막 데이터를 반복해서 채워줍니다.

 

Train data에서는 데이터 수가 50개보다 적을일이 없지만, Test나 Private에서는 적절한 데이터의 수가 제공되지 않을 수 있기 때문에 그냥 지나쳐서는 안됩니다.

 

Model

Random Forest

가장 중요한 부분입니다. Baseline 코드에서는 RandomFroest를 사용하였습니다. 

점수 산정이 F1 score로 이루어지기 때문에, 이 값이 높은 모델을 선택하는 것이 좋을 것 같습니다.

# Scikit-learn을 활용해서 시계열 데이터 분류 모델을 학습합니다.
# RandomForestClassifier 객체 생성
clf = DecisionTreeClassifier(random_state=42)

# 학습 데이터에 대해 학습
clf.fit(X_train, y_train)

# 검증 데이터에 대해 예측
y_val_pred = clf.predict(X_val)

# 검증 데이터에 대한 F1 Score 계산
f1 = f1_score(y_val, y_val_pred, average='macro')

print(f'검증 데이터에 대한 F1 Score: {f1:.4f}')

검증 데이터에 대한 F1 Score: 0.7946

 

 

0.7946이 나왔습니다.

 

XGBoost, CatBoost, LightGBM

많은 classification 대회에서 좋은 퍼포먼스를 보여주는 3대장을 써봤습니다.

세가지 모델 모두 비슷한 F1 score를 얻었습니다. xgboost를 기준으로 작성하겠습니다 

from xgboost import XGBClassifier
clf = XGBClassifier(random_state=42)

clf.fit(X_train, y_train)
y_val_pred = clf.predict(X_val)
f1 = f1_score(y_val, y_val_pred, average='macro')

print(f'검증 데이터에 대한 F1 Score: {f1:.4f}')

검증 데이터에 대한 F1 Score: 0.9149

 

0.9149로 상당히 높은 점수입니다. public score는 0.82입니다

 

LSTM, GRU, Transformer

LSTM: 0.8251 Val 

GRU: 0.8556 Val 

Transformer: 0.8719 Val 

 

아쉽지만 모두 XGB모델보다 떨어지는 성능을 보여줍니다.

 

class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)
        
  class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, ff_dim, output_dim, dropout):
        super(TransformerModel, self).__init__()
        self.model_dim = model_dim
        self.embedding = nn.Linear(input_dim, model_dim)
        self.pos_encoder = PositionalEncoding(model_dim, dropout)
        encoder_layers = TransformerEncoderLayer(model_dim, num_heads, ff_dim, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(model_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, src_mask):
        x = self.embedding(x) * math.sqrt(self.model_dim)
        x = self.pos_encoder(x)
        x = x.permute(1, 0, 2)
        output = self.transformer_encoder(x, src_mask)
        output = output.permute(1, 0, 2)
        output = self.fc(output[:, -1, :])
        return self.sigmoid(output)

def generate_square_subsequent_mask(sz: int):
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

 

TCN

일부 sequence 데이터에서 rnn보다 높은 성능을 보여주는 cnn 기반 모델도 사용해봤습니다.

Val F1 score는 0.8774 입니다.

 

Feature Extraction & HyperParameter Tuning

다양한 모델을 시도해봤지만 점수가 오르지 않았습니다.

주성분 분석을 통해 적은 영향을 끼치는 Feature를 제외하고 학습시켜봤으나 성능이 증가하지 않았습니다.

XGBoost를 위한 추가적인 feature extraction을 해보겠습니다.

def extract_features(series):
        feature_vector = []

        for i in range(series.shape[1]):
            feature_vector.append(np.mean(series[:, i]))
            feature_vector.append(np.std(series[:, i]))
            feature_vector.append(np.max(series[:, i]))
            feature_vector.append(np.min(series[:, i]))

        for i in range(series.shape[1]):
            slope = np.polyfit(np.arange(series.shape[0]), series[:, i], 1)[0]
            feature_vector.append(slope)

        return feature_vector

 

평균, 표준편차, 최대 최소를 구하고 append 해줍니다. polyfit는 x와 y에 대해 deg차 다항식을 피팅하는 함수입니다.

deg=1 이므로 데이터에 맞는 직선을 구합니다. 그 중 첫번째 값이 직선방정식의 계수, 기울기를 구하게 됩니다.

 

이정도 feature 추가로는 성능향상을 기대하기 힘듭니다. 더 해보겠습니다

def extract_advanced_features(series):
    import scipy
    feature_vector = []

    for i in range(series.shape[1]):
        feature_vector.append(np.median(series[:, i]))
        feature_vector.append(np.percentile(series[:, i], 25))
        feature_vector.append(np.percentile(series[:, i], 75))
        feature_vector.append(scipy.stats.skew(series[:, i]))
        feature_vector.append(scipy.stats.kurtosis(series[:, i]))

    return feature_vector

 

중간값, Q1, Q3, Skew(왜도, 데이터의 비대칭성), Kurtosis(첨도, 꼬리의 두께) 를 추가해줍니다.

 

def extract_diff_features(series):
    feature_vector = []

    diff_series = np.diff(series, axis=0)
    
    for i in range(diff_series.shape[1]):
        feature_vector.append(np.mean(diff_series[:, i]))
        feature_vector.append(np.std(diff_series[:, i]))
        feature_vector.append(np.max(diff_series[:, i]))
        feature_vector.append(np.min(diff_series[:, i]))

    return feature_vector

 

차분(differencing)을 구해줬습니다. 각 시점의 값과 이전 시점의 값 차이를 계산해줍니다.

시계열 센서 이상치 탐지이므로, 이상이 있다면 센서가 급격한 반응을 보이는것이 일반적입니다.

학습에 있어서 큰 도움이 될것으로 예상됩니다.

 

from scipy.fftpack import fft

def extract_fourier_features(series):
    feature_vector = []
    
    for i in range(series.shape[1]):
        fft_coeffs = np.abs(fft(series[:, i]))
        feature_vector.append(np.mean(fft_coeffs))
        feature_vector.append(np.std(fft_coeffs))
        feature_vector.append(np.max(fft_coeffs))
        feature_vector.append(np.min(fft_coeffs))
    
    return feature_vector

 

푸리에 변환입니다. Signal을 주파수 도메인으로 분류해주기 때문에 이것도 꽤 도움이 될것 같습니다

진폭의 평균, 표준편차, 최대 최소를 추가해줍니다.

 

def extract_outlier_features(series):
    feature_vector = []
    
    for i in range(series.shape[1]):
        q1 = np.percentile(series[:, i], 25)
        q3 = np.percentile(series[:, i], 75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outliers = ((series[:, i] < lower_bound) | (series[:, i] > upper_bound)).sum()
        outlier_ratio = outliers / len(series[:, i])
        
        feature_vector.append(outliers)
        feature_vector.append(outlier_ratio)
    
    return feature_vector

 

이상치 탐지에서 가장 쉽고 많이 쓰이는 outlier 분석입니다. 아웃라이어의 개수와 비율을 추가해 줍니다

 

from scipy.signal import find_peaks

def extract_peak_valley_features(series):
    feature_vector = []
    
    for i in range(series.shape[1]):
        peaks, _ = find_peaks(series[:, i])
        valleys, _ = find_peaks(-series[:, i])
        
        feature_vector.append(len(peaks))
        feature_vector.append(len(valleys))
        feature_vector.append(np.mean(series[peaks, i]) if peaks.size > 0 else 0)
        feature_vector.append(np.mean(series[valleys, i]) if valleys.size > 0 else 0)
    
    return feature_vector

 

데이터에서 극값의 개수와 극값의 평균을 추가해줍니다. 데이터의 변곡 빈도를 찾아내 이상탐지에 도움이 될것 같습니다

 

def extract_lagged_features(series, lags=[1, 2, 3]):
    feature_vector = []
    
    for lag in lags:
        lagged_values = np.roll(series, lag, axis=0)
        lagged_values[:lag, :] = 0
        
        for i in range(series.shape[1]):
            feature_vector.append(np.mean(lagged_values[:, i]))
            feature_vector.append(np.std(lagged_values[:, i]))
            feature_vector.append(np.max(lagged_values[:, i]))
            feature_vector.append(np.min(lagged_values[:, i]))

    return feature_vector

 

시계열 데이터의 시간의존성을 반영하기 위해 지연된 데이터의 특성들을 몇개 추출해줍니다.

 

print(len(X_train[0]))

618

 

기존에 850개 (50 * 17)였던 데이터 길이가 feature extraction이 끝나니 618개가 되었습니다.

원본 값은 넣지 않고 특징만 추출했기 때문입니다. 다시 XGBoost에 넣어봅시다

 

from xgboost import XGBClassifier

params={
    'max_depth': 11,
    'learning_rate': 0.1,
    'n_estimators': 300,
    'reg_alpha': 0.1,
}

clf = XGBClassifier(random_state=42 , scale_pos_weight=0.4, **params)

clf.fit(X_train, y_train)
y_val_pred = clf.predict(X_val)
f1 = f1_score(y_val, y_val_pred, average='macro')

print(f'검증 데이터에 대한 F1 Score: {f1:.8f}')

 

GridSearchCV로 파라미터 튜닝까지 완료했습니다.

아까 데이터 불균형이 있다고 했는데, scale_pos_weight 값을 조정하면 모델이 학습할 때 클래스 별 가중치를 반영하도록 할 수 있습니다.

 

검증 데이터에 대한 F1 Score: 0.98162749

 

만족할만한 F1 Score가 나왔습니다. Public Score는 0.9311 입니다.

 

Public / Final 1위를 달성했습니다