Update project documentation and enhance malware detection engine
- Completely rewrite README.md with comprehensive project overview and technical details
- Add detailed explanation of antivirus engine architecture and detection strategies
- Implement multi-stage malware detection with machine learning, sandbox, and PE structure analysis
- Update project configuration and add new source files for enhanced detection capabilities
- Integrate XGBoost machine learning model with C++ export functionality
- Improve sandbox environment with advanced module and LDR data table handling
- Remove legacy Python prediction and training scripts in favor of C++ implementation
This commit is contained in:
@@ -1,264 +1,117 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import xgboost as xgb
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
|
||||
import matplotlib.pyplot as plt
|
||||
import seaborn as sns
|
||||
import os
|
||||
import joblib
|
||||
from sklearn.metrics import accuracy_score
|
||||
import m2cgen as m2c
|
||||
from xgboost import XGBClassifier
|
||||
import csv
|
||||
|
||||
def _read_feature_csv(csv_path):
    """Read one feature CSV into a DataFrame.

    The first column is kept as a string (it is treated as the file-path
    column by the caller) and every other column is parsed as float.  Rows
    that pandas cannot tokenize are skipped instead of aborting the load.

    Raises whatever ``pandas.read_csv`` raises (missing file, non-numeric
    data in a feature column, empty file, ...).
    """
    return pd.read_csv(
        csv_path,
        header=0,
        low_memory=False,
        on_bad_lines='skip',   # drop rows that cannot be parsed
        dtype=float,           # every feature column is numeric
        converters={0: str},   # first column stays a string
    )


def load_data(malware_csv, whitelist_csv):
    """Load the malware and whitelist feature CSVs into one labelled data set.

    Args:
        malware_csv: Path of the CSV holding malicious samples (label 1).
        whitelist_csv: Path of the CSV holding whitelisted samples (label 0).

    Returns:
        ``(features, labels)`` where ``features`` is a DataFrame with one row
        per sample (malware rows first) and ``labels`` is an int Series named
        ``'label'``.  Returns ``(None, None)`` if either file cannot be read;
        the error is printed, not raised.

    Columns are aligned by name: a column present in only one file is added
    to the other filled with 0.0.  The column order is deterministic (malware
    columns first, then whitelist-only columns in file order) so that a model
    trained on the result sees the same feature layout on every run.
    """
    print(f"加载恶意软件数据: {malware_csv}")
    try:
        malware_df = _read_feature_csv(malware_csv)
        print(f"预期列数: {len(malware_df.columns)}")
        print(f"成功读取恶意软件数据,形状: {malware_df.shape}")
    except Exception as e:
        # Best-effort boundary: the caller checks for the (None, None) sentinel.
        print(f"读取恶意软件数据时出错: {e}")
        return None, None

    print(f"加载白名单数据: {whitelist_csv}")
    try:
        whitelist_df = _read_feature_csv(whitelist_csv)
        print(f"成功读取白名单数据,形状: {whitelist_df.shape}")
    except Exception as e:
        print(f"读取白名单数据时出错: {e}")
        return None, None

    # Drop the leading path column from each frame *before* aligning, so the
    # split between features and label never depends on column position.
    malware_features = malware_df.iloc[:, 1:]
    whitelist_features = whitelist_df.iloc[:, 1:]

    # Union of feature names in a stable order.
    known = set(malware_features.columns)
    feature_columns = list(malware_features.columns) + [
        col for col in whitelist_features.columns if col not in known
    ]

    # reindex adds the columns a frame is missing, filled with 0.0.
    features = pd.concat(
        [
            malware_features.reindex(columns=feature_columns, fill_value=0.0),
            whitelist_features.reindex(columns=feature_columns, fill_value=0.0),
        ],
        ignore_index=True,
        sort=False,
    )

    # Labels are built separately so they can never end up inside `features`.
    labels = pd.Series(
        [1] * len(malware_features) + [0] * len(whitelist_features),
        name='label',
        dtype='int64',
    )

    print(f"数据加载完成: {len(malware_df)} 个恶意样本, {len(whitelist_df)} 个白名单样本")
    print(f"特征维度: {features.shape}")

    return features, labels
|
||||
# Paths of the two input feature CSVs: malicious samples and whitelisted samples.
malware_csv = 'data/malware_features.csv'
|
||||
whitelist_csv = 'data/whitelist_features.csv'
|
||||
|
||||
def train_xgboost_model(X_train, y_train, X_test, y_test):
    """Train an XGBoost binary classifier and return the fitted model.

    Args:
        X_train, y_train: Training features (DataFrame) and labels.
        X_test, y_test: Held-out features and labels, used as the second
            evaluation set and therefore as the early-stopping signal.

    Returns:
        The fitted ``xgb.XGBClassifier``.

    NaN and +/-inf feature values are replaced with 0 before fitting.

    NOTE(review): early stopping is driven by the test split, so the accuracy
    later measured on that same split is optimistic; consider carving out a
    separate validation split.
    """
    print("开始训练XGBoost模型...")

    # Sanitize the inputs: NaN first, then infinities, both mapped to 0.
    print("检查并填充缺失值...")
    X_train = X_train.fillna(0).replace([np.inf, -np.inf], 0)
    X_test = X_test.fillna(0).replace([np.inf, -np.inf], 0)

    print(f"处理后的训练数据形状: {X_train.shape}")
    print(f"处理后的测试数据形状: {X_test.shape}")

    params = {
        'max_depth': 6,                  # maximum tree depth
        'learning_rate': 0.1,
        'n_estimators': 100,             # upper bound on boosting rounds
        'objective': 'binary:logistic',  # binary classification
        'eval_metric': 'logloss',
        'subsample': 0.8,                # row sampling ratio per tree
        'colsample_bytree': 0.8,         # feature sampling ratio per tree
        'random_state': 42,
        # Passed to the estimator rather than to fit(): the fit() keyword was
        # deprecated in XGBoost 1.6 and removed in 2.0.
        'early_stopping_rounds': 10,
    }

    model = xgb.XGBClassifier(**params)

    # The last entry of eval_set is the one early stopping monitors.
    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True,
    )

    print("模型训练完成!")
    return model


def read_csv_with_padding(file_path):
    """Read a ragged CSV by hand and pad short rows so the result is rectangular.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A DataFrame of strings with default integer column labels.  Every row
        of the file is kept as data (no row is treated as a header), and rows
        shorter than the widest row are right-padded with the string ``'0'``.
    """
    print(f"开始读取 {file_path}...")

    # newline='' is what the csv module requires so that line breaks inside
    # quoted fields are handed to the reader untouched.
    with open(file_path, 'r', encoding='latin1', errors='replace', newline='') as f:
        rows = list(csv.reader(f))

    max_cols = max((len(row) for row in rows), default=0)
    print(f"文件 {file_path} 最大列数: {max_cols}")

    # Right-pad every row up to the widest row seen in the file.
    padded_rows = [row + ['0'] * (max_cols - len(row)) for row in rows]

    df = pd.DataFrame(padded_rows)
    print(f"读取 {file_path} 完成,形状: {df.shape}")
    return df
|
||||
|
||||
def evaluate_model(model, X_test, y_test):
    """Evaluate a fitted classifier on the test split and save diagnostic plots.

    Args:
        model: Fitted XGBoost classifier.
        X_test: Test features.
        y_test: True test labels (0 = whitelist, 1 = malware).

    Returns:
        The accuracy on the test split as a float.

    Side effects:
        Prints the accuracy and a classification report, and writes
        ``confusion_matrix.png`` and ``feature_importance.png`` to the
        current working directory.
    """
    print("评估模型性能...")

    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    print(f"准确率: {accuracy:.4f}")

    class_names = ['白名单', '恶意软件']
    # Pin the label set so the report and the matrix always have both classes,
    # matching the two display names even if one class is absent from y_test.
    class_ids = [0, 1]

    print("\n分类报告:")
    print(classification_report(y_test, y_pred, labels=class_ids,
                                target_names=class_names))

    # Confusion matrix heat map.
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.xlabel('预测')
    plt.ylabel('实际')
    plt.title('混淆矩阵')
    plt.savefig('confusion_matrix.png')
    plt.close()

    # Feature importance.  plot_importance creates its own figure unless it is
    # given an Axes, which would leave a separately created figure empty and
    # never closed, so draw on an explicit Axes instead.
    fig, ax = plt.subplots(figsize=(12, 8))
    xgb.plot_importance(model, max_num_features=20, ax=ax)
    ax.set_title('特征重要性')
    fig.savefig('feature_importance.png')
    plt.close(fig)

    return accuracy
|
||||
# Read both feature CSVs; read_csv_with_padding pads short rows with '0' so each frame is rectangular.
|
||||
malware_data = read_csv_with_padding(malware_csv)
|
||||
whitelist_data = read_csv_with_padding(whitelist_csv)
|
||||
|
||||
def save_model(model, output_path='xgboost_malware_detector.model'):
    """Persist ``model`` to ``output_path`` with joblib.

    Args:
        model: The trained model object to serialize.
        output_path: Destination file path.

    A progress message is printed before and after the dump.
    """
    start_message = f"保存模型到 {output_path}"
    print(start_message)

    joblib.dump(model, output_path)

    print("模型保存完成!")
|
||||
# Drop the first column (the path column) from both frames.
|
||||
malware_data = malware_data.iloc[:, 1:]
|
||||
whitelist_data = whitelist_data.iloc[:, 1:]
|
||||
|
||||
def main():
    """Entry point: load the data, train the model, evaluate it and save it.

    Each stage prints its own error message on failure.  A failure while
    checking inputs, loading, splitting or training ends the run; a failure
    during evaluation is reported and the model is still saved.  Anything
    that escapes those handlers is caught and printed by the outer handler,
    so this function never raises.
    """
    try:
        print("开始恶意软件检测模型训练...")

        # Input locations.
        malware_csv = 'data/malware_features.csv'
        whitelist_csv = 'data/whitelist_features.csv'

        # Both inputs must exist before anything is loaded.
        malware_present = os.path.exists(malware_csv)
        if not malware_present:
            print(f"错误: 找不到恶意软件特征文件 {malware_csv}")
            return

        whitelist_present = os.path.exists(whitelist_csv)
        if not whitelist_present:
            print(f"错误: 找不到白名单特征文件 {whitelist_csv}")
            return

        # Stage 1: load.  load_data signals failure with (None, None).
        X, y = load_data(malware_csv, whitelist_csv)
        load_failed = X is None or y is None
        if load_failed:
            print("数据加载失败,终止训练")
            return

        print(f"数据集加载完成,共 {len(X)} 个样本")

        # Stage 2: stratified 80/20 split with a fixed seed.
        try:
            split = train_test_split(
                X, y, test_size=0.2, random_state=42, stratify=y)
            X_train, X_test, y_train, y_test = split

            print(f"训练集: {len(X_train)} 样本,测试集: {len(X_test)} 样本")
        except Exception as split_error:
            print(f"数据划分出错: {split_error}")
            return

        # Stage 3: train.
        try:
            model = train_xgboost_model(X_train, y_train, X_test, y_test)
        except Exception as train_error:
            print(f"模型训练出错: {train_error}")
            return

        # Stage 4: evaluate.  Deliberately non-fatal.
        try:
            evaluate_model(model, X_test, y_test)
        except Exception as eval_error:
            print(f"模型评估出错: {eval_error}")

        # Stage 5: persist.
        try:
            save_model(model)
            print("模型训练和评估完成!")
        except Exception as save_error:
            print(f"模型保存出错: {save_error}")

    except Exception as unexpected:
        print(f"训练过程中发生未预期错误: {unexpected}")
|
||||
# Convert every column to a numeric dtype; values that cannot be parsed become NaN.
|
||||
for col in malware_data.columns:
|
||||
malware_data[col] = pd.to_numeric(malware_data[col], errors='coerce')
|
||||
for col in whitelist_data.columns:
|
||||
whitelist_data[col] = pd.to_numeric(whitelist_data[col], errors='coerce')
|
||||
|
||||
# Call main() only when this file is executed directly as a script.
if __name__ == "__main__":
|
||||
main()
|
||||
# Replace NaN (left behind by the numeric coercion) with 0.
malware_data.fillna(0, inplace=True)
whitelist_data.fillna(0, inplace=True)

# Relabel the columns 0..n-1 before padding.  After the path column has been
# sliced off the labels start at 1, so reindexing onto range(max_cols) without
# this step would insert an all-zero column 0 and silently drop the last real
# feature.
# NOTE(review): this shifts every feature one position to the left compared
# with models trained by the previous version; whatever feeds the exported
# model must pass features in this same order (path column excluded).
malware_data.columns = range(malware_data.shape[1])
whitelist_data.columns = range(whitelist_data.shape[1])

# Width of the longest feature vector across both data sets.
max_cols = max(malware_data.shape[1], whitelist_data.shape[1])

# Zero-pad the narrower frame so every sample has the same number of columns.
malware_data = malware_data.reindex(columns=range(max_cols), fill_value=0)
whitelist_data = whitelist_data.reindex(columns=range(max_cols), fill_value=0)

# Attach labels: 1 = malware, 0 = whitelist (benign).
malware_data['label'] = 1
whitelist_data['label'] = 0
print(malware_data.head())
print(whitelist_data.head())

# Merge both classes into one data set.
combined_data = pd.concat([malware_data, whitelist_data], ignore_index=True)
print(f"合并后数据形状: {combined_data.shape}")

# Separate features and labels.
X = combined_data.drop('label', axis=1)
y = combined_data['label']

# 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"训练集形状: {X_train.shape}, 测试集形状: {X_test.shape}")

# NOTE(review): dtrain, dtest, num_rounds and watchlist are not used by the
# XGBClassifier below (it is fitted on the DataFrames directly).  They are
# kept so the module still defines the same names.
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

num_rounds = 30
watchlist = [(dtrain, '训练集'), (dtest, '验证集')]

# Fraction of positive (malware) samples, used as the model's prior.
pos_ratio = np.mean(y_train)

clf = XGBClassifier(
    base_score=pos_ratio,         # start from the class prior

    objective='binary:logistic',  # binary classification
    max_depth=6,                  # maximum tree depth
    learning_rate=0.1,
    n_estimators=100,             # number of boosting rounds
    subsample=0.8,                # row sampling ratio, limits overfitting
    colsample_bytree=0.8,
    use_label_encoder=False,      # disable XGBoost's label encoder
    eval_metric='logloss',        # cross-entropy loss
)
clf.fit(X_train, y_train)

# Predict.  predict() returns hard class labels, so take the positive-class
# probability explicitly before applying the 0.5 threshold.
y_pred_prob = clf.predict_proba(X_test)[:, 1]
y_pred = [1 if prob > 0.5 else 0 for prob in y_pred_prob]

# Accuracy on the held-out split.
accuracy = accuracy_score(y_test, y_pred)
print(f'XGBoost 分类准确率: {accuracy:.4f}')

# Export the trained model as C source code via m2cgen.
code = m2c.export_to_c(clf)
output_file = "malware_detector.cpp"
with open(output_file, "w", encoding="utf-8") as f:
    f.write(code)
|
||||
|
||||
Reference in New Issue
Block a user