Implementacion en solapa3 wireShark
This commit is contained in:
parent
ee98b3cef3
commit
5b8493b67a
|
@ -2,6 +2,7 @@ import threading
|
|||
from models.SystemStats import SystemStats
|
||||
from MainView import MainView
|
||||
from models.NetworkingScanner import NetworkScanner
|
||||
from models.Sniffer import packet_callback, sniff
|
||||
import tkinter as tk
|
||||
import time
|
||||
import datetime
|
||||
|
@ -10,13 +11,51 @@ import datetime
|
|||
class MainController:
|
||||
def __init__(self, view: MainView):
|
||||
self.view = view
|
||||
self.sniffer_thread = None
|
||||
self.sniffer_running = False
|
||||
self.connect_events()
|
||||
self.start_system_updates()
|
||||
|
||||
def connect_events(self):
|
||||
"""Conecta los eventos de la vista con :las funciones del controlador."""
|
||||
"""Conecta los eventos de la vista con las funciones del controlador."""
|
||||
self.view.button_track_ip.config(command=self.handle_track_ip)
|
||||
self.view.button_scan_network.config(command=self.start_network_scan_thread)
|
||||
self.view.button_start_sniffer.config(command=self.start_sniffer_thread)
|
||||
self.view.button_stop_sniffer.config(command=self.stop_sniffer)
|
||||
|
||||
|
||||
def start_sniffer_thread(self):
|
||||
"""Inicia el sniffer en un hilo separado."""
|
||||
def run_sniffer():
|
||||
self.sniffer_running = True
|
||||
while self.sniffer_running:
|
||||
try:
|
||||
sniff(prn=self.process_packet, filter="ip", store=0, timeout=1)
|
||||
except Exception as e:
|
||||
self.view.text_sniffer_output.insert(tk.END, f"Error: {str(e)}\n")
|
||||
|
||||
# Configurar botones
|
||||
self.view.button_start_sniffer.config(state="disabled")
|
||||
self.view.button_stop_sniffer.config(state="normal")
|
||||
|
||||
# Iniciar el hilo del sniffer
|
||||
self.sniffer_thread = threading.Thread(target=run_sniffer, daemon=True)
|
||||
self.sniffer_thread.start()
|
||||
|
||||
|
||||
def stop_sniffer(self):
|
||||
"""Detiene el sniffer."""
|
||||
self.sniffer_running = False
|
||||
self.view.button_start_sniffer.config(state="normal")
|
||||
self.view.button_stop_sniffer.config(state="disabled")
|
||||
self.view.text_sniffer_output.insert(tk.END, "Sniffer detenido.\n")
|
||||
|
||||
|
||||
def process_packet(self, packet):
|
||||
"""Procesa un paquete capturado y lo muestra en la interfaz."""
|
||||
packet_info = packet_callback(packet)
|
||||
self.view.text_sniffer_output.insert(tk.END, f"{packet_info}\n")
|
||||
self.view.text_sniffer_output.see(tk.END) # Scroll automático
|
||||
|
||||
def start_network_scan_thread(self):
|
||||
"""Inicia un hilo para escanear la red continuamente cada 5 segundos."""
|
||||
|
|
25
MainView.py
25
MainView.py
|
@ -52,8 +52,13 @@ class MainView:
|
|||
self.notebook.add(tab2, text="Escaneo de Red")
|
||||
self.create_network_scan_ui(tab2)
|
||||
|
||||
# Solapas 3-5: Vacías
|
||||
for i in range(3, 6):
|
||||
# Solapa 3: Sniffer
|
||||
tab3 = ttk.Frame(self.notebook)
|
||||
self.notebook.add(tab3, text="Sniffer")
|
||||
self.create_sniffer_ui(tab3)
|
||||
|
||||
# Solapas 4-5: Vacías
|
||||
for i in range(4, 6):
|
||||
tab = ttk.Frame(self.notebook)
|
||||
self.notebook.add(tab, text=f"Solapa {i}")
|
||||
label = ttk.Label(tab, text=f"Contenido de la Solapa {i}")
|
||||
|
@ -85,6 +90,22 @@ class MainView:
|
|||
self.button_scan_network = ttk.Button(tab, text="Iniciar Escaneo")
|
||||
self.button_scan_network.pack(pady=5)
|
||||
|
||||
def create_sniffer_ui(self, tab):
|
||||
"""Crea la interfaz de usuario para el sniffer en la Solapa 3."""
|
||||
label = ttk.Label(tab, text="Sniffer de paquetes:")
|
||||
label.pack(pady=5)
|
||||
|
||||
self.text_sniffer_output = scrolledtext.ScrolledText(tab, wrap=tk.WORD, height=20, width=80)
|
||||
self.text_sniffer_output.pack(fill="both", expand=True, pady=10)
|
||||
|
||||
self.button_start_sniffer = ttk.Button(tab, text="Iniciar Sniffer")
|
||||
self.button_start_sniffer.pack(side="left", padx=5)
|
||||
|
||||
self.button_stop_sniffer = ttk.Button(tab, text="Detener Sniffer", state="disabled")
|
||||
self.button_stop_sniffer.pack(side="left", padx=5)
|
||||
|
||||
|
||||
|
||||
def create_status_bar(self):
|
||||
"""Crea una barra de estado en la parte inferior."""
|
||||
self.label_cpu = tk.Label(self.frame_bottom, text="Uso CPU: 0%", bg="orange", anchor="w", width=20)
|
||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,46 @@
|
|||
from scapy.all import sniff
|
||||
from scapy.layers.inet import IP, TCP, UDP, ICMP
|
||||
from scapy.packet import Raw
|
||||
import os
|
||||
import datetime
|
||||
|
||||
def save_packet_to_file(packet):
|
||||
"""Guarda el paquete en un archivo de registro para análisis posterior."""
|
||||
with open("packet_log.txt", "a") as log_file:
|
||||
log_file.write(f"{datetime.datetime.now()} - {packet.summary()}\n")
|
||||
log_file.write(f"{packet.show(dump=True)}\n\n")
|
||||
|
||||
def packet_callback(packet):
|
||||
"""Procesa y retorna información del paquete capturado."""
|
||||
packet_info = []
|
||||
if IP in packet:
|
||||
ip_layer = packet[IP]
|
||||
packet_info.append(f"[+] Capturado un paquete:")
|
||||
packet_info.append(f" Origen: {ip_layer.src}")
|
||||
packet_info.append(f" Destino: {ip_layer.dst}")
|
||||
packet_info.append(f" Protocolo: {ip_layer.proto}")
|
||||
|
||||
if TCP in packet:
|
||||
tcp_layer = packet[TCP]
|
||||
packet_info.append(f" Puerto origen: {tcp_layer.sport}")
|
||||
packet_info.append(f" Puerto destino: {tcp_layer.dport}")
|
||||
elif UDP in packet:
|
||||
udp_layer = packet[UDP]
|
||||
packet_info.append(f" Puerto origen: {udp_layer.sport}")
|
||||
packet_info.append(f" Puerto destino: {udp_layer.dport}")
|
||||
elif ICMP in packet:
|
||||
packet_info.append(" Tipo de ICMP detectado.")
|
||||
|
||||
if Raw in packet:
|
||||
payload = packet[Raw].load.decode(errors="ignore")
|
||||
packet_info.append(f" Datos (Raw): {payload}")
|
||||
if "youtube.com" in payload or "YouTube" in payload:
|
||||
packet_info.append(f"\n[!!!] Posible enlace de YouTube detectado:")
|
||||
packet_info.append(payload)
|
||||
|
||||
packet_info.append(f"[Paquete completo]: {packet.summary()}")
|
||||
|
||||
# Guardar el paquete en un archivo
|
||||
save_packet_to_file(packet)
|
||||
|
||||
return "\n".join(packet_info)
|
Binary file not shown.
File diff suppressed because one or more lines are too long
63
pruebas.py
63
pruebas.py
|
@ -11,67 +11,36 @@ def save_packet_to_file(packet):
|
|||
log_file.write(f"{packet.show(dump=True)}\n\n")
|
||||
|
||||
def packet_callback(packet):
|
||||
"""Procesa y muestra información del paquete capturado."""
|
||||
"""Procesa y retorna información del paquete capturado."""
|
||||
packet_info = []
|
||||
if IP in packet:
|
||||
ip_layer = packet[IP]
|
||||
print(f"\n[+] Capturado un paquete:")
|
||||
print(f" Origen: {ip_layer.src}")
|
||||
print(f" Destino: {ip_layer.dst}")
|
||||
print(f" Protocolo: {ip_layer.proto}")
|
||||
packet_info.append(f"[+] Capturado un paquete:")
|
||||
packet_info.append(f" Origen: {ip_layer.src}")
|
||||
packet_info.append(f" Destino: {ip_layer.dst}")
|
||||
packet_info.append(f" Protocolo: {ip_layer.proto}")
|
||||
|
||||
if TCP in packet:
|
||||
tcp_layer = packet[TCP]
|
||||
print(f" Puerto origen: {tcp_layer.sport}")
|
||||
print(f" Puerto destino: {tcp_layer.dport}")
|
||||
packet_info.append(f" Puerto origen: {tcp_layer.sport}")
|
||||
packet_info.append(f" Puerto destino: {tcp_layer.dport}")
|
||||
elif UDP in packet:
|
||||
udp_layer = packet[UDP]
|
||||
print(f" Puerto origen: {udp_layer.sport}")
|
||||
print(f" Puerto destino: {udp_layer.dport}")
|
||||
packet_info.append(f" Puerto origen: {udp_layer.sport}")
|
||||
packet_info.append(f" Puerto destino: {udp_layer.dport}")
|
||||
elif ICMP in packet:
|
||||
print(" Tipo de ICMP detectado.")
|
||||
packet_info.append(" Tipo de ICMP detectado.")
|
||||
|
||||
if Raw in packet:
|
||||
payload = packet[Raw].load.decode(errors="ignore")
|
||||
print(f" Datos (Raw): {payload}")
|
||||
packet_info.append(f" Datos (Raw): {payload}")
|
||||
if "youtube.com" in payload or "YouTube" in payload:
|
||||
print(f"\n[!!!] Posible enlace de YouTube detectado:")
|
||||
print(payload)
|
||||
packet_info.append(f"\n[!!!] Posible enlace de YouTube detectado:")
|
||||
packet_info.append(payload)
|
||||
|
||||
print(f"[Paquete completo]: {packet.summary()}")
|
||||
packet_info.append(f"[Paquete completo]: {packet.summary()}")
|
||||
|
||||
# Guardar el paquete en un archivo
|
||||
save_packet_to_file(packet)
|
||||
|
||||
def start_sniffer():
|
||||
"""Inicia la captura de paquetes."""
|
||||
print("Iniciando captura de paquetes. Presiona Ctrl+C para detener.")
|
||||
print("Opciones disponibles:")
|
||||
print("1. Capturar todo el tráfico IP")
|
||||
print("2. Filtrar por TCP")
|
||||
print("3. Filtrar por UDP")
|
||||
print("4. Filtrar por ICMP")
|
||||
print("5. Salir")
|
||||
|
||||
choice = input("Selecciona una opción (1-5): ")
|
||||
try:
|
||||
if choice == "1":
|
||||
sniff(prn=packet_callback, filter="ip", store=0)
|
||||
elif choice == "2":
|
||||
sniff(prn=packet_callback, filter="tcp", store=0)
|
||||
elif choice == "3":
|
||||
sniff(prn=packet_callback, filter="udp", store=0)
|
||||
elif choice == "4":
|
||||
sniff(prn=packet_callback, filter="icmp", store=0)
|
||||
elif choice == "5":
|
||||
print("Saliendo del programa.")
|
||||
return
|
||||
else:
|
||||
print("Opción no válida. Intenta de nuevo.")
|
||||
start_sniffer()
|
||||
except KeyboardInterrupt:
|
||||
print("\nCaptura detenida.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
if os.path.exists("packet_log.txt"):
|
||||
os.remove("packet_log.txt")
|
||||
start_sniffer()
|
||||
return "\n".join(packet_info)
|
||||
|
|
Loading…
Reference in New Issue