305 lines
13 KiB
Python
305 lines
13 KiB
Python
import threading
|
|
from models.SystemStats import SystemStats
|
|
from MainView import MainView
|
|
from models.NetworkingScanner import NetworkScanner
|
|
from models.Sniffer import packet_callback, sniff
|
|
from models.MusicPlayer import MusicPlayerModel
|
|
from models.GamblingGameModel import GamblingGameModel
|
|
from models.ScraperModel import ScraperModel
|
|
import tkinter as tk
|
|
import time
|
|
import datetime
|
|
|
|
|
|
class MainController:
|
|
def __init__(self, view: MainView):
|
|
self.view = view
|
|
self.sniffer_thread = None
|
|
self.sniffer_running = False
|
|
self.connect_events()
|
|
self.start_system_updates()
|
|
self.music_model = MusicPlayerModel()
|
|
self.game_model = GamblingGameModel()
|
|
self.roulette_running = False
|
|
self.roulette_thread = None
|
|
|
|
self.model = ScraperModel() # Modelo para gestionar el scraping
|
|
self.scraper_running = False
|
|
|
|
def connect_events(self):
|
|
"""Conecta los eventos de la vista con las funciones del controlador."""
|
|
self.view.button_track_ip.config(command=self.handle_track_ip)
|
|
self.view.button_scan_network.config(command=self.start_network_scan_thread)
|
|
self.view.button_start_sniffer.config(command=self.start_sniffer_thread)
|
|
self.view.button_stop_sniffer.config(command=self.stop_sniffer)
|
|
|
|
self.view.button_play_music.config(command=self.play_song)
|
|
self.view.button_stop_music.config(command=self.stop_song)
|
|
self.view.button_refresh_music.config(command=self.refresh_songs)
|
|
|
|
"""Conecta los eventos de la vista con las funciones del controlador."""
|
|
self.view.button_start_roulette.config(command=self.start_roulette)
|
|
self.view.button_stop_roulette.config(command=self.stop_roulette)
|
|
|
|
"""Conecta los eventos de la vista con las funciones del controlador."""
|
|
self.view.button_start_scraper.config(command=self.start_scraper_thread)
|
|
self.view.button_stop_scraper.config(command=self.stop_scraper)
|
|
|
|
|
|
def start_scraper_thread(self):
|
|
"""Inicia el scraping en un hilo separado."""
|
|
start_url = self.view.entry_scraper_url.get().strip()
|
|
if not start_url:
|
|
self.view.text_scraper_output.insert("end", "Por favor, introduce una URL válida.\n")
|
|
return
|
|
|
|
self.model.add_url(start_url) # Añadir URL inicial al modelo
|
|
self.scraper_running = True
|
|
self.view.button_start_scraper.config(state="disabled")
|
|
self.view.button_stop_scraper.config(state="normal")
|
|
|
|
# Iniciar hilo para el scraping
|
|
threading.Thread(target=self.run_scraper, daemon=True).start()
|
|
|
|
|
|
def stop_scraper(self):
|
|
"""Detiene el scraping."""
|
|
self.scraper_running = False
|
|
self.view.button_start_scraper.config(state="normal")
|
|
self.view.button_stop_scraper.config(state="disabled")
|
|
self.view.text_scraper_output.insert("end", "Scraper detenido.\n")
|
|
|
|
|
|
def run_scraper(self):
|
|
"""Ejecuta el scraping y actualiza la interfaz con los resultados."""
|
|
while self.scraper_running and self.model.has_pending_urls():
|
|
current_url, result = self.model.scrape_next_url()
|
|
|
|
if isinstance(result, str): # Error
|
|
self.view.text_scraper_output.insert("end", f"{result}\n")
|
|
else: # Éxito
|
|
self.view.text_scraper_output.insert("end", f"Explorando: {current_url}\n")
|
|
for link in result:
|
|
self.view.text_scraper_output.insert("end", f" - {link}\n")
|
|
|
|
self.view.text_scraper_output.see("end")
|
|
time.sleep(1) # Retardo para evitar sobrecarga
|
|
|
|
self.scraper_running = False
|
|
self.view.text_scraper_output.insert("end", "Scraping completado.\n")
|
|
self.view.button_start_scraper.config(state="normal")
|
|
self.view.button_stop_scraper.config(state="disabled")
|
|
|
|
def start_sniffer_thread(self):
|
|
"""Inicia el sniffer en un hilo separado."""
|
|
def run_sniffer():
|
|
self.sniffer_running = True
|
|
while self.sniffer_running:
|
|
try:
|
|
sniff(prn=self.process_packet, filter="ip", store=0, timeout=1)
|
|
except Exception as e:
|
|
self.view.text_sniffer_output.insert(tk.END, f"Error: {str(e)}\n")
|
|
|
|
# Configurar botones
|
|
self.view.button_start_sniffer.config(state="disabled")
|
|
self.view.button_stop_sniffer.config(state="normal")
|
|
|
|
# Iniciar el hilo del sniffer
|
|
self.sniffer_thread = threading.Thread(target=run_sniffer, daemon=True)
|
|
self.sniffer_thread.start()
|
|
|
|
|
|
def stop_sniffer(self):
|
|
"""Detiene el sniffer."""
|
|
self.sniffer_running = False
|
|
self.view.button_start_sniffer.config(state="normal")
|
|
self.view.button_stop_sniffer.config(state="disabled")
|
|
self.view.text_sniffer_output.insert(tk.END, "Sniffer detenido.\n")
|
|
|
|
|
|
def process_packet(self, packet):
|
|
"""Procesa un paquete capturado y lo muestra en la interfaz."""
|
|
packet_info = packet_callback(packet)
|
|
self.view.text_sniffer_output.insert(tk.END, f"{packet_info}\n")
|
|
self.view.text_sniffer_output.see(tk.END) # Scroll automático
|
|
|
|
def start_network_scan_thread(self):
|
|
"""Inicia un hilo para escanear la red continuamente cada 5 segundos."""
|
|
def scan_network_periodically():
|
|
network_range = NetworkScanner.get_network_range()
|
|
while True:
|
|
print("Scanning....")
|
|
devices = NetworkScanner.scan_network(network_range)
|
|
self.display_network_scan_results(devices)
|
|
|
|
|
|
# Inicia un hilo para el escaneo continuo
|
|
threading.Thread(target=scan_network_periodically, daemon=True).start()
|
|
|
|
def handle_track_ip(self):
|
|
"""Maneja el evento de rastrear una IP."""
|
|
ip = self.view.entry_ip.get() # Obtener la IP ingresada
|
|
if not ip:
|
|
self.view.text_output.insert(tk.END, "Por favor, introduce una IP válida.\n")
|
|
return
|
|
|
|
# Ejecutar fetch_ip_data en un hilo
|
|
def fetch_ip_data():
|
|
try:
|
|
result = SystemStats.track_ip(ip) # Llamar a la función con la IP
|
|
if "error" in result:
|
|
self.view.text_output.insert(tk.END, f"Error: {result['error']}\n")
|
|
else:
|
|
self.display_ip_data(result)
|
|
except Exception as e:
|
|
self.view.text_output.insert(tk.END, f"Error al rastrear la IP: {e}\n")
|
|
|
|
threading.Thread(target=fetch_ip_data, daemon=True).start()
|
|
|
|
|
|
def display_network_scan_results(self, devices):
|
|
"""Muestra los resultados del escaneo en la interfaz."""
|
|
# Limpiar el texto anterior y mostrar los nuevos resultados
|
|
self.view.text_network_scan.delete('1.0', tk.END)
|
|
self.view.text_network_scan.insert(tk.END, "Dispositivos encontrados:\n")
|
|
for device in devices:
|
|
self.view.text_network_scan.insert(tk.END, f"IP: {device['ip']}, MAC: {device['mac']}\n")
|
|
|
|
def display_ip_data(self, data):
|
|
"""Muestra la información de la IP en el área de texto."""
|
|
self.view.text_output.insert(tk.END, f"IP: {data['ip']}\n")
|
|
self.view.text_output.insert(tk.END, f"Tipo: {data['type']}\n")
|
|
self.view.text_output.insert(tk.END, f"País: {data['country']} ({data['country_code']})\n")
|
|
self.view.text_output.insert(tk.END, f"Región: {data['region']} ({data['region_code']})\n")
|
|
self.view.text_output.insert(tk.END, f"Ciudad: {data['city']}\n")
|
|
self.view.text_output.insert(tk.END, f"Latitud: {data['latitude']}\n")
|
|
self.view.text_output.insert(tk.END, f"Longitud: {data['longitude']}\n")
|
|
self.view.text_output.insert(tk.END, f"Mapa: https://www.google.com/maps/@{data['latitude']},{data['longitude']},8z\n")
|
|
self.view.text_output.insert(tk.END, "=============================================\n")
|
|
|
|
def start_system_updates(self):
|
|
"""Inicia las actualizaciones periódicas del sistema."""
|
|
threading.Thread(target=self.update_system_stats, daemon=True).start()
|
|
self.update_time()
|
|
|
|
def update_time(self):
|
|
"""Actualiza el reloj en la barra de estado."""
|
|
now = datetime.datetime.now()
|
|
time_str = now.strftime("%H:%M:%S")
|
|
date_str = now.strftime("%Y-%m-%d")
|
|
day_of_week = now.strftime("%A")
|
|
self.view.label_fecha_hora.config(text=f"{day_of_week}, {date_str} - {time_str}")
|
|
self.view.root.after(1000, self.update_time) # Actualizar cada segundo
|
|
|
|
def update_system_stats(self):
|
|
"""Actualiza las estadísticas del sistema periódicamente en un hilo."""
|
|
while True:
|
|
# Obtener estadísticas del sistema
|
|
cpu_usage = SystemStats.get_cpu_usage()
|
|
memory_usage = SystemStats.get_memory_usage()
|
|
bytes_sent, bytes_recv = SystemStats.get_network_usage()
|
|
|
|
# Actualizar etiquetas en el hilo principal
|
|
self.view.root.after(0, self.view.label_cpu.config, {"text": f"Uso CPU: {cpu_usage:.2f}%"})
|
|
self.view.root.after(0, self.view.label_uso_memoria.config, {"text": f"Uso RAM: {memory_usage:.2f}%"})
|
|
self.view.root.after(0, self.view.label_bytes_sent.config, {"text": f"Subida: {bytes_sent / 1024:.2f} KB/s"})
|
|
self.view.root.after(0, self.view.label_bytes_recv.config, {"text": f"Descarga: {bytes_recv / 1024:.2f} KB/s"})
|
|
|
|
time.sleep(1) # Esperar 1 segundo antes de actualizar nuevamente
|
|
|
|
|
|
def load_songs(self):
|
|
"""Carga las canciones desde el modelo y las envía a la vista."""
|
|
self.songs, self.message = self.music_model.load_songs()
|
|
self.view.populate_music_player(self.songs)
|
|
|
|
def play_song(self):
|
|
"""Reproduce la canción seleccionada."""
|
|
selected = self.view.music_listbox.curselection()
|
|
if not selected:
|
|
self.view.root.bell()
|
|
return
|
|
song_name = self.view.music_listbox.get(selected)
|
|
message = self.music_model.play_song(song_name)
|
|
print(f"Reproduciendo: {message}")
|
|
|
|
def stop_song(self):
|
|
"""Detiene la reproducción de la canción."""
|
|
message = self.music_model.stop_song()
|
|
print(f"Detenido: {message}")
|
|
|
|
def refresh_songs(self):
|
|
"""Actualiza la lista de canciones disponibles."""
|
|
self.load_songs()
|
|
print(self.message)
|
|
|
|
|
|
def start_roulette(self):
|
|
"""Inicia el giro de la ruleta en un hilo."""
|
|
if self.roulette_running:
|
|
return
|
|
|
|
try:
|
|
bet = int(self.view.bet_entry.get())
|
|
chosen_number = int(self.view.number_entry.get())
|
|
except ValueError:
|
|
self.view.result_label.config(text="Por favor, ingresa valores numéricos válidos.", foreground="red")
|
|
return
|
|
|
|
if bet <= 0 or chosen_number < 1 or chosen_number > 10:
|
|
self.view.result_label.config(
|
|
text="La apuesta debe ser mayor a 0 y elige un número entre 1 y 10.", foreground="red"
|
|
)
|
|
return
|
|
|
|
if bet > self.game_model.get_balance():
|
|
self.view.result_label.config(text="No tienes suficiente saldo para esta apuesta.", foreground="red")
|
|
return
|
|
|
|
self.game_model.set_bet(bet)
|
|
self.game_model.set_chosen_number(chosen_number)
|
|
self.roulette_running = True
|
|
self.view.button_start_roulette.config(state="disabled")
|
|
self.view.button_stop_roulette.config(state="normal")
|
|
|
|
# Inicia el hilo de la ruleta
|
|
self.roulette_thread = threading.Thread(target=self.spin_roulette)
|
|
self.roulette_thread.start()
|
|
|
|
def spin_roulette(self):
|
|
"""Simula el giro continuo de la ruleta."""
|
|
while self.roulette_running:
|
|
self.game_model.spin_roulette()
|
|
roulette_number = self.game_model.get_roulette_number()
|
|
self.view.roulette_label.config(text=f"Ruleta: {roulette_number}")
|
|
time.sleep(0.1)
|
|
|
|
def stop_roulette(self):
|
|
"""Detiene la ruleta y evalúa el resultado."""
|
|
if not self.roulette_running:
|
|
return
|
|
|
|
self.roulette_running = False
|
|
self.view.button_start_roulette.config(state="normal")
|
|
self.view.button_stop_roulette.config(state="disabled")
|
|
|
|
# Evaluar el resultado del juego
|
|
won, amount = self.game_model.evaluate_result()
|
|
if won:
|
|
self.view.result_label.config(
|
|
text=f"¡Ganaste! Número: {self.game_model.get_roulette_number()}. Ganaste $ {amount}.", foreground="green"
|
|
)
|
|
else:
|
|
self.view.result_label.config(
|
|
text=f"Perdiste. Número: {self.game_model.get_roulette_number()}. Perdiste $ {amount}.", foreground="red"
|
|
)
|
|
|
|
# Actualizar saldo en la vista
|
|
self.view.balance_label.config(text=f"Saldo: ${self.game_model.get_balance()}")
|
|
|
|
# Verificar saldo
|
|
if self.game_model.get_balance() <= 0:
|
|
self.view.result_label.config(
|
|
text="¡Te quedaste sin saldo! Gracias por jugar.", foreground="red"
|
|
)
|
|
self.view.button_start_roulette.config(state="disabled") |