VAEを用いたなりすまし検知を書き直してみる

こちらの記事「VAEを用いたUNIXセッションのなりすまし検出」はソースコードが完全に公開されていないので、補完してみました。

環境

  • macOS 10.14
  • python 3.7

データ準備

まず、データを準備します。こちらからダウンロードします。ダウンロードしたら、解凍し、Fasttext用にデータを結合します。各ユーザの前半の5000は訓練用のデータで、なりすましがないデータですので、これをすべてのユーザから抽出します。ユーザごとに一行にコマンドをスペースでつないで作成します。全部で五十行になります。

for f in masquerade-data/*;do head -n 5000 $f|perl -pe 's/\n/ /g'|perl -pe 's/$/\n/'>> train.txt; done

Fasttextによるベクトル化

単語をfasttextによりベクトル化します。50次元を使っています。fasttextはpipでインストールしたものでもOKです。


import numpy as np
import os
import fasttext

model = fasttext.train_unsupervised("train.txt", "skipgram", dim=50, minCount=1)

UMAPによる可視化

元記事では出現頻度の高いコマンドから200取り出して、可視化していますが、面倒なので、modelの単語列から上位50単語を抜き出して可視化します。


%matplotlib inline
import matplotlib.pyplot as plt

import umap
import matplotlib.cm as cm
np.random.seed(12345)
x_word=[model[w] for w in model.words]
y_word=model.words
embedding = umap.UMAP().fit_transform(x_word[:50])
for emb,lbl in zip(embedding,y_word[:50]):
    plt.plot(emb[0],emb[1],marker=".")
    plt.annotate(lbl,(emb[0],emb[1]))
# https://teratail.com/questions/110775
plt.show()

モデル

元記事のものではそのままでは動かなかったので多少修正しています。


import numpy as np
import keras
from keras import layers
from keras import backend as K
from keras.models import Model
from keras.layers.core import Lambda
from keras.optimizers import Adam
 
#K.clear_session()
 
shape = (100, 50, 1)
epochs = 120
batch_size = 16
latent_dim = 2
 
input_cmd = keras.Input(shape=shape)
 
x = layers.Conv2D(32, 3,
                  padding='same', activation='relu')(input_cmd)
x = layers.Conv2D(64, 3,
                  padding='same', activation='relu',
                  strides=(2, 2))(x)
x = layers.Dropout(0.1)(x)
x = layers.Conv2D(64, 3,
                  padding='same', activation='relu')(x)
x = layers.Dropout(0.2)(x)
x = layers.Conv2D(64, 3,
                  padding='same', activation='relu')(x)
x = layers.Dropout(0.2)(x)
 
shape_before_flattening = K.int_shape(x)
 
x = layers.Flatten()(x)
x = layers.Dense(32, activation='relu')(x)
 
z_mean = layers.Dense(latent_dim)(x)
z_log_var = layers.Dense(latent_dim)(x)
 
def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim),
                              mean=0., stddev=1.)
    return z_mean + K.exp(z_log_var) * epsilon
 
z = Lambda(sampling)([z_mean, z_log_var])
 
decoder_input = layers.Input(K.int_shape(z)[1:])
 
x = layers.Dense(np.prod(shape_before_flattening[1:]),
                 activation='relu')(decoder_input)
 
x = layers.Reshape(shape_before_flattening[1:])(x)
 
x = layers.Conv2DTranspose(32, 3, strides=(2, 2),
                           padding='same', activation='relu')(x)
x = layers.Conv2D(1, 3,
                  padding='same', activation='sigmoid')(x)
 
decoder = Model(decoder_input, x)
 
z_decoded = decoder(z)
 
class CustomVariationalLayer(keras.layers.Layer):
 
    def vae_loss(self, x, z_decoded):
        x = K.flatten(x)
        z_decoded = K.flatten(z_decoded)
        xent_loss = keras.metrics.binary_crossentropy(x, z_decoded)
        kl_loss = -5e-4 * K.mean(
            1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
        return K.mean(xent_loss + kl_loss)
 
    def call(self, inputs):
        x = inputs[0]
        z_decoded = inputs[1]
        loss = self.vae_loss(x, z_decoded)
        self.add_loss(loss, inputs=inputs)
        return x
 
y = CustomVariationalLayer()([input_cmd, z_decoded])


vae = Model(input_cmd, y)
vae.compile(optimizer=Adam(), loss=None)
vae.summary()

入力データ

入力データを作成します。ファイルから再度読み込んで、訓練データを上位4000コマンド、テストデータを4001から5000コマンドで作成します。また、元記事と同様にUser2を使っています。


with open("masquerade-data/User2") as f:
    cmds=[l.rstrip() for l in f.readlines()]

x=[model[cmd] for cmd in cmds]
x_train=np.reshape(np.array(x[:4000]),(40,100,50,1))
x_valid=np.reshape(np.array(x[4000:5000]),(10,100,50,1))

訓練

Earlystoppingを入れました。訓練は100コマンドをひとかたまりとして訓練しています。


from keras.callbacks import EarlyStopping
es_cb = EarlyStopping(monitor='val_loss', patience=50, verbose=1, mode='auto')
history = vae.fit(x=x_train, y=None,
        shuffle=True,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(x_valid, None),
        callbacks=[ es_cb])

結果の可視化

訓練データでのLossを可視化します


%matplotlib inline
import matplotlib.pyplot as plt
plt.plot(history.history["loss"],label="training loss")
plt.plot(history.history["val_loss"],label="validation loss")
plt.title("model loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend(loc="upper right")
plt.show()

テスト

最後に、1%の確率でなりすましが入っているという各ユーザの5001行目からのデータをつかって、Lossを可視化します。同様に100コマンドをひとかたまりとして、100個分をテストしています


x_test=np.reshape(np.array(x[5000:]),(100,100,50,1))
loss_test=[]
for i in range(len(x_test)):
    loss_test.append((vae.evaluate(x=np.reshape(x_test[i],(1,100,50,1)),y=None)))
    

%matplotlib inline
import matplotlib.pyplot as plt
plt.plot(loss_test,label="test loss")
plt.title("test loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend(loc="upper right")
plt.show()