Hai mai desiderato visualizzare il traffico di rete in tempo reale? In questo tutorial imparerai come creare un cruscotto interattivo per l’analisi del traffico di rete con Python e Streamlit
. Streamlit
è un framework open-source in Python che puoi utilizzare per sviluppare applicazioni web per l’analisi e l’elaborazione dei dati.
Alla fine di questo tutorial, saprai come catturare i pacchetti di rete grezzi dalla scheda di interfaccia di rete (NIC) del tuo computer, elaborare i dati e creare bellissime visualizzazioni che si aggiornano in tempo reale.
Indice
Perché è Importante l’Analisi del Traffico di Rete?
L’analisi del traffico di rete è un requisito critico nelle imprese in cui le reti costituiscono il fulcro di quasi ogni applicazione e servizio. Al centro di ciò, abbiamo l’analisi dei pacchetti di rete che coinvolge il monitoraggio della rete, la cattura di tutto il traffico (ingresso ed uscita) e l’interpretazione di questi pacchetti mentre fluiscono attraverso una rete. È possibile utilizzare questa tecnica per identificare modelli di sicurezza, rilevare anomalie e garantire la sicurezza e l’efficienza della rete.
Questo progetto di prova concettuale su cui lavoreremo in questo tutorial è particolarmente utile poiché ti aiuta a visualizzare e analizzare l’attività di rete in tempo reale. E questo ti permetterà di comprendere come vengono affrontati i problemi di risoluzione dei problemi, le ottimizzazioni delle prestazioni e l’analisi della sicurezza nei sistemi aziendali.
Prerequisiti
-
Python 3.8 o una versione più recente installata sul tuo sistema.
-
Una comprensione di base dei concetti di networking informatico.
-
Familiarità con il linguaggio di programmazione Python e le sue librerie ampiamente utilizzate.
-
Conoscenza di base delle tecniche di visualizzazione dei dati e delle librerie.
Come impostare il tuo progetto
Per iniziare, crea la struttura del progetto e installa gli strumenti necessari con Pip utilizzando i seguenti comandi:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Utilizzeremo Streamlit
per le visualizzazioni del dashboard, Pandas
per l’elaborazione dei dati, Scapy
per la cattura e l’elaborazione dei pacchetti di rete, e infine Plotly
per tracciare grafici con i dati raccolti.
Come costruire le funzionalità principali
Metteremo tutto il codice in un unico file chiamato dashboard.py
. Innanzitutto, cominciamo importando tutti gli elementi che utilizzeremo:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Ora configuriamo il logging impostando una configurazione di base per il logging. Questo sarà utilizzato per tracciare eventi e far funzionare la nostra applicazione in modalità debug. Attualmente abbiamo impostato il livello di logging su INFO
, il che significa che gli eventi con livello INFO
o superiore verranno visualizzati. Se non sei familiare con il logging in Python, ti consiglio di dare un’occhiata a questo documento che approfondisce l’argomento.
# Configura il logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Successivamente, costruiremo il nostro processore di pacchetti. Implementeremo la funzionalità di elaborazione dei pacchetti catturati in questa classe.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Aggiungi informazioni specifiche per TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Aggiungi informazioni specifiche per UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Mantieni solo gli ultimi 10000 pacchetti per prevenire problemi di memoria
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Questa classe costruirà la nostra funzionalità principale e ha diverse funzioni di utilità che verranno utilizzate per elaborare i pacchetti.
I pacchetti di rete sono categorizzati in due a livello di trasporto (TCP e UDP) e nel protocollo ICMP a livello di rete. Se non sei familiare con i concetti di TCP/IP, ti consiglio di dare un’occhiata a questo articolo su freeCodeCamp News.
Il nostro costruttore terrà traccia di tutti i pacchetti visti che sono categorizzati in questi bucket di tipi di protocollo TCP/IP che abbiamo definito. Prenderemo nota anche dell’ora di acquisizione del pacchetto, dei dati acquisiti e del numero di pacchetti acquisiti.
Utilizzeremo anche un blocco del thread per garantire che venga elaborato un solo pacchetto alla volta. Questo può essere ulteriormente esteso per consentire al progetto di avere un’elaborazione parallela dei pacchetti.
La funzione di supporto get_protocol_name
ci aiuta a ottenere il tipo corretto del protocollo in base ai loro numeri di protocollo. Per dare qualche contesto su questo, l’Autorità per l’assegnazione dei numeri di Internet (IANA) assegna numeri standardizzati per identificare diversi protocolli in un pacchetto di rete. Man mano che vediamo questi numeri nel pacchetto di rete analizzato, sapremo quale tipo di protocollo viene utilizzato nel pacchetto attualmente intercettato. Per il campo di applicazione di questo progetto, mappiamo solo TCP, UDP e ICMP (Ping). Se incontriamo un altro tipo di pacchetto, lo classificheremo come ALTRO(<numero_protocollo>)
.
La funzione process_packet
gestisce la nostra funzionalità principale che elaborerà questi singoli pacchetti. Se il pacchetto contiene uno strato IP, verranno prese nota degli indirizzi IP di origine e destinazione, del tipo di protocollo, delle dimensioni del pacchetto e del tempo trascorso dall’inizio dell’acquisizione del pacchetto.
Per i pacchetti con protocolli specifici dello strato di trasporto (come TCP e UDP), cattureremo le porte di origine e destinazione insieme ai flag TCP per i pacchetti TCP. Questi dettagli estratti verranno memorizzati in memoria nella lista packet_data
. Terreremo traccia anche del packet_count
man mano che questi pacchetti vengono elaborati.
La funzione get_dataframe
ci aiuta a convertire la lista packet_data
in un data-frame Pandas
che verrà poi utilizzato per la nostra visualizzazione.
Come Creare le Visualizzazioni di Streamlit
È ora di costruire il nostro pannello interattivo di Streamlit. Definiremo una funzione chiamata create_visualization
nello script dashboard.py
(al di fuori della nostra classe di elaborazione dei pacchetti).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Distribuzione dei protocolli
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Cronologia dei pacchetti
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Top indirizzi IP sorgente
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Questa funzione prenderà il data frame in input e ci aiuterà a tracciare tre grafici:
-
Grafico della Distribuzione dei Protocolli: Questo grafico mostrerà la proporzione dei diversi protocolli (ad esempio, TCP, UDP, ICMP) nel traffico dei pacchetti catturati.
-
Grafico della Cronologia dei Pacchetti: Questo grafico mostrerà il numero di pacchetti processati al secondo durante un periodo di tempo.
-
Grafico dei Top Indirizzi IP Sorgente: Questo grafico metterà in evidenza i primi 10 indirizzi IP che hanno inviato il maggior numero di pacchetti nel traffico catturato.
Il grafico di distribuzione del protocollo è semplicemente un grafico a torta dei conteggi dei protocolli per i tre diversi tipi (insieme ad ALTRO). Utilizziamo gli strumenti Python Streamlit
e Plotly
per tracciare questi grafici. Poiché abbiamo anche annotato l’orario in cui è iniziata la cattura dei pacchetti, utilizzeremo questi dati per tracciare la tendenza dei pacchetti catturati nel tempo.
Per il secondo grafico, eseguiremo un’operazione groupby
sui dati e otterremo il numero di pacchetti catturati in ogni secondo (S
sta per secondi), e infine tracceremo il grafico.
Infine, per il terzo grafico, conteremo gli indirizzi IP sorgente distinti osservati e tracceremo un grafico dei conteggi degli IP per mostrare i primi 10 IP.
Come Catturare i Pacchetti di Rete
Ora, costruiamo la funzionalità che ci permetterà di catturare i dati dei pacchetti di rete.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Si tratta di una semplice funzione che istanzia la classe PacketProcessor
e quindi utilizza la funzione sniff
nel modulo scapy
per iniziare a catturare i pacchetti.
Qui utilizziamo il threading per consentirci di catturare i pacchetti in modo indipendente dal flusso del programma principale. Ciò garantisce che l’operazione di cattura dei pacchetti non blocchi altre operazioni come l’aggiornamento del cruscotto in tempo reale. Restituiamo anche l’istanza di PacketProcessor
creata in modo che possa essere utilizzata nel nostro programma principale.
Mettere Tutto Insieme
Ora uniamo tutti questi pezzi con la nostra funzione main
che fungerà da funzione driver per il nostro programma.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Inizializza il processore di pacchetti nello stato della sessione
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Crea il layout della dashboard
col1, col2 = st.columns(2)
# Ottieni i dati attuali
df = st.session_state.processor.get_dataframe()
# Visualizza le metriche
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Visualizza le visualizzazioni
create_visualizations(df)
# Visualizza i pacchetti recenti
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Aggiungi il pulsante di aggiornamento
if st.button('Refresh Data'):
st.rerun()
# Aggiornamento automatico
time.sleep(2)
st.rerun()
Questa funzione istanzierà anche la dashboard di Streamlit
e integrerà tutti i nostri componenti insieme. Prima impostiamo il titolo della pagina della nostra dashboard di Streamlit
e quindi inizializziamo il nostro PacketProcessor
. Utilizziamo lo stato della sessione in Streamlit
per garantire che venga creato solo un’istanza di acquisizione pacchetti e che ne venga mantenuto lo stato.
Ora, otterremo dinamicamente il dataframe dallo stato della sessione ogni volta che i dati vengono elaborati e inizieremo a visualizzare le metriche e le visualizzazioni. Visualizzeremo anche i pacchetti recentemente acquisiti insieme a informazioni come il timestamp, gli IP di origine e destinazione, il protocollo e la dimensione del pacchetto. Aggiungeremo anche la possibilità per l’utente di aggiornare manualmente i dati dalla dashboard mentre li aggiorniamo automaticamente ogni due secondi.
Infine, eseguiamo il programma con il seguente comando:
sudo streamlit run dashboard.py
Nota che dovrai eseguire il programma con sudo
poiché le capacità di acquisizione dei pacchetti richiedono privilegi amministrativi. Se sei su Windows, apri il tuo terminale come Amministratore e poi esegui il programma senza il prefisso sudo
.
Dagli un momento per avviare la cattura dei pacchetti. Se tutto va per il meglio, dovresti vedere qualcosa del genere:
Queste sono tutte le visualizzazioni che abbiamo appena implementato nel nostro programma di dashboard Streamlit
.
Miglioramenti Futuri
Con questo, ecco alcune idee per miglioramenti futuri che puoi utilizzare per estendere le funzionalità della dashboard:
-
Aggiungi capacità di apprendimento automatico per il rilevamento delle anomalie
-
Implementa il mapping IP geografico
-
Crea avvisi personalizzati basati sui modelli di analisi del traffico
-
Aggiungi opzioni di analisi del carico utile dei pacchetti
Conclusione
Congratulazioni! Hai ora costruito con successo una dashboard di analisi del traffico di rete in tempo reale con Python e Streamlit
. Questo programma fornirà preziosi insights sul comportamento della rete e può essere esteso per vari casi d’uso, dal monitoraggio della sicurezza all’ottimizzazione della rete.
Con questo, spero tu abbia imparato alcuni concetti di base sull’analisi del traffico di rete e un po’ di programmazione in Python. Grazie per aver letto!