Standardize import ordering and code formatting
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
use crate::{DetectionResult, EvasionResult, ProcessInfo, ThreatContext, ThreatLevel};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{SystemTime, Duration};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use crate::{DetectionResult, ThreatLevel, ProcessInfo, ThreatContext, EvasionResult};
|
||||
|
||||
/// Real-time Event Streaming and Alerting System
|
||||
/// Provides configurable alerting, correlation, and notification capabilities
|
||||
@@ -519,6 +519,12 @@ pub struct EventBuffer {
|
||||
retention_period: Duration,
|
||||
}
|
||||
|
||||
impl Default for EventStreamingSystem {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventStreamingSystem {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -526,12 +532,18 @@ impl EventStreamingSystem {
|
||||
alert_manager: AlertManager::new(),
|
||||
correlation_engine: CorrelationEngine::new(),
|
||||
notification_system: NotificationSystem::new(),
|
||||
event_buffer: Arc::new(Mutex::new(EventBuffer::new(10000, Duration::from_secs(3600)))),
|
||||
event_buffer: Arc::new(Mutex::new(EventBuffer::new(
|
||||
10000,
|
||||
Duration::from_secs(3600),
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Publish a detection event
|
||||
pub async fn publish_detection_event(&mut self, detection: DetectionResult) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn publish_detection_event(
|
||||
&mut self,
|
||||
detection: DetectionResult,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let event = StreamingEvent {
|
||||
event_id: format!("det_{}", uuid::Uuid::new_v4()),
|
||||
timestamp: SystemTime::now(),
|
||||
@@ -556,11 +568,16 @@ impl EventStreamingSystem {
|
||||
tags: vec!["process-injection".to_string(), "detection".to_string()],
|
||||
};
|
||||
|
||||
self.publish_event(EventChannel::DetectionEvents, event).await
|
||||
self.publish_event(EventChannel::DetectionEvents, event)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Publish an evasion detection event
|
||||
pub async fn publish_evasion_event(&mut self, evasion: EvasionResult, process: &ProcessInfo) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn publish_evasion_event(
|
||||
&mut self,
|
||||
evasion: EvasionResult,
|
||||
_process: &ProcessInfo,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let severity = if evasion.sophistication_score > 0.8 {
|
||||
EventSeverity::Critical
|
||||
} else if evasion.sophistication_score > 0.6 {
|
||||
@@ -589,11 +606,16 @@ impl EventStreamingSystem {
|
||||
tags: vec!["evasion".to_string(), "anti-analysis".to_string()],
|
||||
};
|
||||
|
||||
self.publish_event(EventChannel::EvasionDetection, event).await
|
||||
self.publish_event(EventChannel::EvasionDetection, event)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Publish a generic event to specified channel
|
||||
async fn publish_event(&mut self, channel: EventChannel, event: StreamingEvent) -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn publish_event(
|
||||
&mut self,
|
||||
channel: EventChannel,
|
||||
event: StreamingEvent,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Add to event buffer for correlation
|
||||
{
|
||||
let mut buffer = self.event_buffer.lock().unwrap();
|
||||
@@ -614,9 +636,12 @@ impl EventStreamingSystem {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_correlated_incident(&mut self, incident: CorrelatedIncident) -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn handle_correlated_incident(
|
||||
&mut self,
|
||||
incident: CorrelatedIncident,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Correlated incident detected: {}", incident.title);
|
||||
|
||||
|
||||
// Create alert for correlated incident
|
||||
let alert = Alert {
|
||||
alert_id: format!("inc_{}", uuid::Uuid::new_v4()),
|
||||
@@ -653,6 +678,12 @@ impl EventStreamingSystem {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EventPublisher {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventPublisher {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -661,7 +692,11 @@ impl EventPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn publish(&mut self, channel: EventChannel, event: StreamingEvent) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn publish(
|
||||
&mut self,
|
||||
channel: EventChannel,
|
||||
event: StreamingEvent,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if let Some(sender) = self.channels.get(&channel) {
|
||||
sender.send(event)?;
|
||||
}
|
||||
@@ -669,18 +704,23 @@ impl EventPublisher {
|
||||
}
|
||||
|
||||
pub fn subscribe(&mut self, channel: EventChannel) -> broadcast::Receiver<StreamingEvent> {
|
||||
let sender = self.channels.entry(channel.clone())
|
||||
.or_insert_with(|| {
|
||||
let (tx, _) = broadcast::channel(1000);
|
||||
tx
|
||||
});
|
||||
|
||||
let sender = self.channels.entry(channel.clone()).or_insert_with(|| {
|
||||
let (tx, _) = broadcast::channel(1000);
|
||||
tx
|
||||
});
|
||||
|
||||
let receiver = sender.subscribe();
|
||||
*self.subscribers.entry(channel).or_insert(0) += 1;
|
||||
receiver
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AlertManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl AlertManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -691,12 +731,17 @@ impl AlertManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn evaluate_alerts(&mut self, event: &StreamingEvent) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let triggered_rules: Vec<AlertRule> = self.alert_rules.iter()
|
||||
pub async fn evaluate_alerts(
|
||||
&mut self,
|
||||
event: &StreamingEvent,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let triggered_rules: Vec<AlertRule> = self
|
||||
.alert_rules
|
||||
.iter()
|
||||
.filter(|rule| rule.enabled && self.evaluate_rule_conditions(rule, event))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
|
||||
for rule in triggered_rules {
|
||||
self.trigger_alert(&rule, event).await?;
|
||||
}
|
||||
@@ -734,7 +779,11 @@ impl AlertManager {
|
||||
})
|
||||
}
|
||||
|
||||
async fn trigger_alert(&mut self, rule: &AlertRule, event: &StreamingEvent) -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn trigger_alert(
|
||||
&mut self,
|
||||
rule: &AlertRule,
|
||||
event: &StreamingEvent,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let alert = Alert {
|
||||
alert_id: format!("alert_{}", uuid::Uuid::new_v4()),
|
||||
rule_id: rule.rule_id.clone(),
|
||||
@@ -761,7 +810,7 @@ impl AlertManager {
|
||||
|
||||
pub async fn create_alert(&mut self, alert: Alert) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Alert created: {} - {}", alert.alert_id, alert.title);
|
||||
|
||||
|
||||
let active_alert = ActiveAlert {
|
||||
alert: alert.clone(),
|
||||
escalation_level: 0,
|
||||
@@ -769,13 +818,20 @@ impl AlertManager {
|
||||
notification_count: 0,
|
||||
};
|
||||
|
||||
self.active_alerts.insert(alert.alert_id.clone(), active_alert);
|
||||
self.active_alerts
|
||||
.insert(alert.alert_id.clone(), active_alert);
|
||||
self.alert_history.push(alert);
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CorrelationEngine {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl CorrelationEngine {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -789,13 +845,16 @@ impl CorrelationEngine {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn correlate_event(&mut self, event: &StreamingEvent) -> Result<Option<CorrelatedIncident>, Box<dyn std::error::Error>> {
|
||||
pub async fn correlate_event(
|
||||
&mut self,
|
||||
event: &StreamingEvent,
|
||||
) -> Result<Option<CorrelatedIncident>, Box<dyn std::error::Error>> {
|
||||
// Simplified correlation logic
|
||||
for rule in &self.correlation_rules {
|
||||
if let Some(incident) = self.evaluate_correlation_rule(rule, event) {
|
||||
if let Some(_incident) = self.evaluate_correlation_rule(rule, event) {
|
||||
self.incident_tracker.incident_counter += 1;
|
||||
let incident_id = format!("incident_{}", self.incident_tracker.incident_counter);
|
||||
|
||||
|
||||
let correlated_incident = CorrelatedIncident {
|
||||
incident_id: incident_id.clone(),
|
||||
timestamp: SystemTime::now(),
|
||||
@@ -810,21 +869,33 @@ impl CorrelationEngine {
|
||||
status: IncidentStatus::Open,
|
||||
};
|
||||
|
||||
self.incident_tracker.incidents.insert(incident_id, correlated_incident.clone());
|
||||
self.incident_tracker
|
||||
.incidents
|
||||
.insert(incident_id, correlated_incident.clone());
|
||||
return Ok(Some(correlated_incident));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn evaluate_correlation_rule(&self, rule: &CorrelationRule, event: &StreamingEvent) -> Option<()> {
|
||||
fn evaluate_correlation_rule(
|
||||
&self,
|
||||
_rule: &CorrelationRule,
|
||||
_event: &StreamingEvent,
|
||||
) -> Option<()> {
|
||||
// Simplified correlation rule evaluation
|
||||
// In a real implementation, this would be much more sophisticated
|
||||
Some(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for NotificationSystem {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationSystem {
|
||||
pub fn new() -> Self {
|
||||
let (tx, _) = mpsc::channel(1000);
|
||||
@@ -836,20 +907,35 @@ impl NotificationSystem {
|
||||
}
|
||||
|
||||
pub fn add_email_channel(&mut self, name: String, config: SmtpConfig) {
|
||||
let channel = EmailChannel { smtp_config: config };
|
||||
let channel = EmailChannel {
|
||||
smtp_config: config,
|
||||
};
|
||||
self.channels.insert(name, Box::new(channel));
|
||||
}
|
||||
|
||||
pub fn add_slack_channel(&mut self, name: String, webhook_url: String, default_channel: String) {
|
||||
let channel = SlackChannel { webhook_url, default_channel };
|
||||
pub fn add_slack_channel(
|
||||
&mut self,
|
||||
name: String,
|
||||
webhook_url: String,
|
||||
default_channel: String,
|
||||
) {
|
||||
let channel = SlackChannel {
|
||||
webhook_url,
|
||||
default_channel,
|
||||
};
|
||||
self.channels.insert(name, Box::new(channel));
|
||||
}
|
||||
|
||||
pub fn add_webhook_channel(&mut self, name: String, endpoint_url: String, headers: HashMap<String, String>) {
|
||||
let channel = WebhookChannel {
|
||||
endpoint_url,
|
||||
headers,
|
||||
auth_token: None
|
||||
pub fn add_webhook_channel(
|
||||
&mut self,
|
||||
name: String,
|
||||
endpoint_url: String,
|
||||
headers: HashMap<String, String>,
|
||||
) {
|
||||
let channel = WebhookChannel {
|
||||
endpoint_url,
|
||||
headers,
|
||||
auth_token: None,
|
||||
};
|
||||
self.channels.insert(name, Box::new(channel));
|
||||
}
|
||||
@@ -866,11 +952,11 @@ impl EventBuffer {
|
||||
|
||||
pub fn add_event(&mut self, event: StreamingEvent) {
|
||||
self.events.push(event);
|
||||
|
||||
|
||||
// Remove old events
|
||||
let cutoff_time = SystemTime::now() - self.retention_period;
|
||||
self.events.retain(|e| e.timestamp >= cutoff_time);
|
||||
|
||||
|
||||
// Limit buffer size
|
||||
if self.events.len() > self.max_size {
|
||||
self.events.drain(0..self.events.len() - self.max_size);
|
||||
@@ -879,8 +965,9 @@ impl EventBuffer {
|
||||
|
||||
pub fn get_events_in_window(&self, window: Duration) -> Vec<&StreamingEvent> {
|
||||
let cutoff_time = SystemTime::now() - window;
|
||||
self.events.iter()
|
||||
self.events
|
||||
.iter()
|
||||
.filter(|e| e.timestamp >= cutoff_time)
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user