Add test cases
Huoji's
2021-03-08 21:54:35 +08:00
parent 4275520386
commit 32c3234735
2 changed files with 189 additions and 22 deletions

test.py (new file, 141 lines)

@@ -0,0 +1,141 @@
# coding=utf-8
import fileinput
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import tensorflow as tf
import sys
import re
import nltk
import sklearn
import tensorflow.keras as keras
import tensorflow.keras.preprocessing as keras_preprocessing
from sklearn.preprocessing import StandardScaler
import chardet
import math
from joblib import load
g_word_dict = {}
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["CUDA_VISIBLE_DEVICES"] = '-1'
def os_listdir_ex(file_dir, find_name):  # legacy code
    result = []
    for root, dirs, files in os.walk(file_dir):
        for file in files:
            if os.path.splitext(file)[1] == find_name:
                result.append(os.path.join(root, file))
                # return result  # for testing
    return result
def get_file_length(pFile):  # get the file length, legacy code
    fsize = os.path.getsize(pFile)
    return int(fsize)
def flush_file(pFile):  # strip PHP comments
    file = open(pFile, 'r', encoding='gb18030', errors='ignore')
    read_string = file.read()
    file.close()
    m = re.compile(r'/\*.*?\*/', re.S)
    result = re.sub(m, '', read_string)
    m = re.compile(r'//.*')
    result = re.sub(m, '', result)
    m = re.compile(r'#.*')
    result = re.sub(m, '', result)
    return result
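# Added for clarity (not part of the original commit): the three substitutions
# above strip /* */ block comments, // line comments and # line comments, e.g.
assert re.sub(r'//.*', '',
              re.sub(re.compile(r'/\*.*?\*/', re.S), '', '/* x */ echo 1; // y')) == ' echo 1; '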
# compute the file entropy  https://blog.csdn.net/jliang3/article/details/88359063
def get_file_entropy(pFile):
    clean_string = flush_file(pFile)
    text_list = {}
    _sum = 0
    result = 0
    for word_iter in clean_string:
        if word_iter != '\n' and word_iter != ' ':
            if word_iter not in text_list.keys():
                text_list[word_iter] = 1
            else:
                text_list[word_iter] = text_list[word_iter] + 1
    for index in text_list.keys():
        _sum = _sum + text_list[index]
    for index in text_list.keys():
        result = result - float(text_list[index])/_sum * \
            math.log(float(text_list[index])/_sum, 2)
    return result
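# Added for clarity (not part of the original commit): the loop above is the
# Shannon entropy H = -sum(p * log2(p)) over the character frequencies of the
# cleaned file. If the cleaned text were "aabb", both characters would have
# probability 0.5 and the entropy would be exactly 1.0 bit:
assert abs(-2 * (0.5 * math.log(0.5, 2)) - 1.0) < 1e-9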
def vectorize_sequences(sequences, dimention=1337):
    # create an all-zeros matrix with one row per sequence and `dimention` columns
    results = np.zeros((len(sequences), dimention))
    for i, sequence in enumerate(sequences):
        if i > dimention:
            break
        try:
            results[i, sequence] = 1.
        except:
            break
    return results
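# Added for clarity (not part of the original commit): each row becomes a
# multi-hot vector over the word indices it contains, e.g.
_demo = vectorize_sequences([[1, 5, 5], [3]], dimention=8)
assert _demo[0].tolist() == [0., 1., 0., 0., 0., 1., 0., 0.]
assert _demo[1].tolist() == [0., 0., 0., 1., 0., 0., 0., 0.]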
def get_file_word_bag(pFile):
    global g_word_dict
    english_punctuations = [',', '.', ':', ';', '?',
                            '(', ')', '[', ']', '&', '!', '*', '@', '#', '$', '%', 'php', '<', '>', '\'']
    clean_string = flush_file(pFile)
    word_list = nltk.word_tokenize(clean_string)
    # drop punctuation and other noise tokens
    word_list = [
        word_iter for word_iter in word_list if word_iter not in english_punctuations]
    keras_token = keras.preprocessing.text.Tokenizer()  # initialize the tokenizer
    keras_token.fit_on_texts(word_list)  # learn the vocabulary of the text
    g_word_dict.update(keras_token.word_index)
    # texts_to_sequences uses that dict to turn every word of every string into an integer
    sequences_data = keras_token.texts_to_sequences(word_list)
    # cap every text at a fixed length; a "word" longer than 1337 characters can only be some hacker trying to turn a big webshell into an AV-evading one
    # word_bag = keras_preprocessing.sequence.pad_sequences(sequences_data, maxlen=1337, dtype='int16')
    word_bag = []
    for index in range(0, len(sequences_data)):
        if len(sequences_data[index]) != 0:
            for zeus in range(0, len(sequences_data[index])):
                word_bag.append(sequences_data[index][zeus])
    return word_bag
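# Added for clarity (not part of the original commit): the Tokenizer is re-fit
# for every file, so the integer ids in word_bag are local to that file's
# vocabulary. For the token list ['echo', 'a', 'echo', 'b'] the most frequent
# token gets id 1, typically producing a word_bag of [1, 2, 1, 3].
_demo_token = keras.preprocessing.text.Tokenizer()
_demo_token.fit_on_texts(['echo', 'a', 'echo', 'b'])
assert _demo_token.word_index['echo'] == 1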
file_path = '.\\1.php'
entropy = get_file_entropy(file_path)
length = get_file_length(file_path)
word_bag = get_file_word_bag(file_path)
array_input = np.array([[entropy, length]])
data_frame = pd.DataFrame(
    {'length': [length], 'entropy': [entropy], 'word_bag': [word_bag]}, columns=['length', 'entropy', 'word_bag'])
# scaler = StandardScaler()
scaler_entropy = load('scaler_entropy.joblib')
scaler_length = load('scaler_length.joblib')
data_frame['length_scaled'] = scaler_length.transform(
    data_frame['length'].values.reshape(-1, 1))
data_frame['entropy_scaled'] = scaler_entropy.transform(
    data_frame['entropy'].values.reshape(-1, 1))
data_train_pre = data_frame.filter(items=['length_scaled', 'entropy_scaled'])
# data_train_pre = data_frame.filter(items=['length', 'entropy'])
data_train_x_1 = tf.constant(data_train_pre)
data_train_x_2 = tf.constant(
    vectorize_sequences(data_frame['word_bag'].values))
print(data_frame.head())
model_name = 'huoji1.h5'  # huoji.h5 huoji_scaled.h5 huoji_no_scale.h5
model = keras.models.load_model(model_name)
model.summary()
print(data_train_x_1, data_train_x_2)
prediction = model.predict([data_train_x_1, data_train_x_2])
print(prediction)

Second changed file:

@@ -15,7 +15,15 @@ import tensorflow.keras.preprocessing as keras_preprocessing
from sklearn.preprocessing import StandardScaler
import chardet
import math
from joblib import dump
'''
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
config = tf.ConfigProto(allow_soft_placement=True)
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.5)
config.gpu_options.allow_growth = True
sess0 = tf.InteractiveSession(config=config)
'''
g_word_dict = {}
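The block kept inside the triple-quoted string above uses TensorFlow 1.x session APIs (tf.ConfigProto, tf.GPUOptions, tf.InteractiveSession), which are not exposed under the plain tf. namespace in TensorFlow 2. A rough TF2 sketch of the same intent (soft device placement plus on-demand GPU memory growth) might look like the following; this is an illustration added here, not code from the commit:

# TF2-style equivalent of the commented-out TF1 session config (illustrative only)
tf.config.set_soft_device_placement(True)
for gpu in tf.config.experimental.list_physical_devices('GPU'):
    # grow GPU memory as needed instead of pre-allocating a fixed fraction
    tf.config.experimental.set_memory_growth(gpu, True)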
@@ -25,7 +33,7 @@ def os_listdir_ex(file_dir, find_name):  # legacy code
        for file in files:
            if os.path.splitext(file)[1] == find_name:
                result.append(os.path.join(root, file))
                # return result  for testing
                # return result  # for testing
    return result
@@ -96,7 +104,13 @@ def vectorize_sequences(sequences, dimention=1337):
    # create an all-zeros matrix with one row per sequence and `dimention` columns
    results = np.zeros((len(sequences), dimention))
    for i, sequence in enumerate(sequences):
        results[i, sequence] = 1.
        if i > dimention:
            break
        try:
            results[i, sequence] = 1.
        except:
            break
    return results
@@ -150,7 +164,8 @@ def build_network():
    # y = label
    # incoming columns: file, length_scaled, entropy_scaled, word_bag
    # the first network is a TextCNN: word embedding - (convolution + pooling) x3 - concatenation - dense - dropout - dense
    input_1 = keras.layers.Input(shape=(1337,), dtype='int16', name='word_bag')
    input_1 = keras.layers.Input(
        shape=(1337,), dtype='int16', name='word_bag')
    # word embedding (using pre-trained word vectors)
    embed = keras.layers.Embedding(
        len(g_word_dict) + 1, 300, input_length=1337)(input_1)
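For readers unfamiliar with the architecture named in that comment, a minimal Keras sketch of one such TextCNN branch (embedding, three parallel convolution-and-pooling paths, concatenation, dense, dropout, dense) could look as follows. This is an assumed illustration with made-up filter sizes and counts, not the repository's actual build_network:

def textcnn_branch_sketch(vocab_size, seq_len=1337, embed_dim=300):
    # word-index input, shaped like the word_bag input in the hunk above
    inputs = keras.layers.Input(shape=(seq_len,), dtype='int16', name='word_bag')
    embed = keras.layers.Embedding(vocab_size + 1, embed_dim, input_length=seq_len)(inputs)
    pooled = []
    for kernel_size in (3, 4, 5):  # three parallel convolution + pooling paths
        conv = keras.layers.Conv1D(128, kernel_size, activation='relu')(embed)
        pooled.append(keras.layers.GlobalMaxPooling1D()(conv))
    merged = keras.layers.concatenate(pooled)  # concatenation
    dense = keras.layers.Dense(128, activation='relu')(merged)
    dropout = keras.layers.Dropout(0.5)(dense)
    output = keras.layers.Dense(1, activation='sigmoid')(dropout)
    return keras.Model(inputs=inputs, outputs=output)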
@@ -232,31 +247,42 @@ def build_network():
# get_functions("C:\\Users\\Administrator\\Desktop\\webshell检测\\webshell\\一句话\\一句话.php")
data_frame = get_data_frame()
data_frame['length'] = data_frame['file'].map(
    lambda file_name: get_file_length(file_name)).astype(int)
data_frame['entropy'] = data_frame['file'].map(
    lambda file_name: get_file_entropy(file_name)).astype(float)
# standardize these two features
scaler = StandardScaler()
data_frame['length_scaled'] = scaler.fit_transform(
    data_frame['length'].values.reshape(-1, 1), scaler.fit(data_frame['length'].values.reshape(-1, 1)))
data_frame['entropy_scaled'] = scaler.fit_transform(
    data_frame['entropy'].values.reshape(-1, 1), scaler.fit(data_frame['entropy'].values.reshape(-1, 1)))
# build the word bag
data_frame['word_bag'] = data_frame['file'].map(
    lambda file_name: get_file_word_bag(file_name))
data_frame = []
if os.path.exists("save.csv") == False:
    data_frame = get_data_frame()
    print(data_frame.head(5))
    data_frame['length'] = data_frame['file'].map(
        lambda file_name: get_file_length(file_name)).astype(int)
    data_frame['entropy'] = data_frame['file'].map(
        lambda file_name: get_file_entropy(file_name)).astype(float)
    # standardize these two features
    scaler_length = StandardScaler()
    scaler_entropy = StandardScaler()
    data_frame['length_scaled'] = scaler_length.fit_transform(
        data_frame['length'].values.reshape(-1, 1), scaler_length.fit(data_frame['length'].values.reshape(-1, 1)))
    data_frame['entropy_scaled'] = scaler_entropy.fit_transform(
        data_frame['entropy'].values.reshape(-1, 1), scaler_entropy.fit(data_frame['entropy'].values.reshape(-1, 1)))
    # build the word bag
    data_frame['word_bag'] = data_frame['file'].map(
        lambda file_name: get_file_word_bag(file_name))
    data_frame.to_csv("save.csv")
    dump(scaler_length, 'scaler_length.joblib')
    dump(scaler_entropy, 'scaler_entropy.joblib')
else:
    data_frame = pd.read_csv("save.csv", header=0)
print(data_frame.head(5))
skip_Data_num = 2610
data_train_pre = data_frame.filter(
    items=['length_scaled', 'entropy_scaled'])
data_train_y = tf.constant(data_frame.filter(
    items=['label']))
data_train_x_1 = tf.constant(data_train_pre)
    items=['label'])[:skip_Data_num])
data_train_x_1 = tf.constant(data_train_pre[:skip_Data_num])
data_train_x_2 = tf.constant(
    vectorize_sequences(data_frame['word_bag'].values))
    vectorize_sequences(data_frame['word_bag'].values[:skip_Data_num]))
# right now this is (batch_size, (1337 words [1337 one-hot codes]))
network_model = build_network()
network_model.summary()
history = network_model.fit(
    x=[data_train_x_1, data_train_x_2], y=data_train_y, batch_size=128, epochs=128)
    x=[data_train_x_1, data_train_x_2], y=data_train_y, epochs=100)
network_model.save('huoji.h5')