ProyectoPHP/monitor_manager.py

553 lines
26 KiB
Python

# monitor_manager.py
import tkinter as tk
import psutil
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import threading
import time
import platform
import datetime
import csv
from tkinter import messagebox, ttk
import requests
from bs4 import BeautifulSoup
import re
# Importaciones directas de módulos (Acceso con el prefijo del módulo)
import config
import system_utils
# ===============================================
# Lógica del Panel Lateral (Resumen Rápido)
# ===============================================
def actualizar_resumen_lateral(root):
"""Hilo que actualiza la información básica del sistema."""
boot_time_timestamp = psutil.boot_time()
while config.monitor_running:
try:
# 1. Hostname, OS, Uptime...
hostname_str = platform.node()
os_name = platform.system()
os_version = platform.release()
arch = platform.machine()
os_str = f"{os_name} {os_version} ({arch})"
current_time = time.time()
uptime_seconds = int(current_time - boot_time_timestamp)
uptime_delta = str(datetime.timedelta(seconds=uptime_seconds))
# Chequeo antes de llamar a root.after
if root.winfo_exists():
root.after(0, config.label_hostname.config, {"text": f"Host: {hostname_str}"})
root.after(0, config.label_os_info.config, {"text": f"OS: {os_str}"})
root.after(0, config.label_uptime.config, {"text": f"Uptime: {uptime_delta.split('.')[0]}"})
else:
break # Salir si la ventana ya no existe
except Exception as e:
if root.winfo_exists():
root.after(0, system_utils.log_event, f"Error en hilo de resumen lateral: {e}")
time.sleep(5) # Actualizar cada 5 segundos
def crear_panel_lateral(frame, root):
"""Crea el panel lateral izquierdo SOLO con el resumen rápido."""
# --- Sección de Resumen del Sistema ---
resumen_frame = tk.LabelFrame(frame, text="Resumen Rápido", padx=10, pady=10)
resumen_frame.pack(fill="x", padx=10, pady=10)
label_style = {'font': ('Helvetica', 9, 'bold'), 'anchor': 'w', 'bg': frame['bg']}
config.label_hostname = tk.Label(resumen_frame, text="Host: Cargando...", **label_style)
config.label_hostname.pack(fill="x", pady=2)
config.label_os_info = tk.Label(resumen_frame, text="OS: Cargando...", **label_style)
config.label_os_info.pack(fill="x", pady=2)
config.label_uptime = tk.Label(resumen_frame, text="Uptime: Cargando...", **label_style)
config.label_uptime.pack(fill="x", pady=2)
# Iniciar el hilo de actualización del resumen
summary_thread = threading.Thread(target=lambda: actualizar_resumen_lateral(root))
summary_thread.daemon = True
summary_thread.start()
# ===============================================
# Lógica de Web Scraping
# ===============================================
def scrappear_pagina_principal(url, tipo_extraccion, output_text_widget, progress_bar, selector, atributo, config_data, root):
"""
Realiza la extracción de datos de la URL. Ahora acepta selector, atributo y config_data.
Se ha eliminado el límite de elementos para los modos avanzados y combinados.
"""
# 1. Validación y Control
if config.scraping_running:
root.after(0, system_utils.log_event, "Ya hay una extracción en curso. Detenla primero.")
return
config.scraping_running = True
# Si hay configuración JSON cargada, se usan esos datos
if config_data:
try:
# Si el JSON contiene URL, se usa, sino se usa la de la interfaz (ya actualizada por system_utils)
if 'url' in config_data:
url = config_data.get('url')
tipo_extraccion = config_data.get('type', tipo_extraccion)
selector = config_data.get('selector', selector)
atributo = config_data.get('attribute', atributo)
root.after(0, system_utils.log_event, f"Usando configuración JSON: Tipo={tipo_extraccion}, Selector={selector}")
except Exception as e:
root.after(0, system_utils.log_event, f"ERROR al leer config JSON: {e}")
config.scraping_running = False
return
# Validación específica para modos avanzados
is_advanced = tipo_extraccion in ["-> Texto Específico (CSS Selector)", "-> Atributo Específico (CSS Selector + Attr)", "Portátiles Gamer (Enlace + Precio)"]
if is_advanced and not selector and tipo_extraccion != "Portátiles Gamer (Enlace + Precio)":
root.after(0, system_utils.log_event, "ERROR: El modo avanzado requiere un Selector CSS/Tag.")
root.after(0, lambda: progress_bar.stop())
config.scraping_running = False
return
def perform_scraping():
if not root.winfo_exists() or not config.scraping_running:
config.scraping_running = False
return
# 2. Preparar UI (hilo principal)
root.after(0, progress_bar.start, 10)
root.after(0, system_utils.log_event, f"Iniciando extracción de '{tipo_extraccion}' en: {url}...")
root.after(0, lambda: output_text_widget.delete('1.0', tk.END))
root.after(0, lambda: output_text_widget.insert(tk.END, f"--- EXTRACCIÓN EN CURSO: {url} ---\n\n"))
try:
# 3. Realizar la solicitud HTTP con headers mejorados
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Referer': 'https://www.google.com/',
'Connection': 'keep-alive',
}
response = requests.get(url, headers=headers, timeout=30) # Aumento timeout por seguridad
response.raise_for_status()
# 4. Analizar el contenido
soup = BeautifulSoup(response.text, 'html.parser')
result_text = ""
# 5. Extracción basada en el tipo seleccionado
# --- EXTRACCIÓN COMBINADA AMAZON (SIN LÍMITE) ---
if tipo_extraccion == "Portátiles Gamer (Enlace + Precio)":
# Selector de contenedor genérico para cada resultado de Amazon
PRODUCT_CONTAINER = "div.s-result-item"
containers = soup.select(PRODUCT_CONTAINER)
if containers:
result_text += f"--- {len(containers)} CONTENEDORES DE PRODUCTO ENCONTRADOS --- \n\n"
for i, container in enumerate(containers):
# 1. Encontrar el enlace/título principal: Usamos el selector que funciona para el enlace.
link_tag = container.select_one('h2 a')
if not link_tag:
# Selector de respaldo que te funcionó parcialmente antes
link_tag = container.select_one('a.a-link-normal.s-underline-text.s-underline-link-text.s-link-style.a-text-normal')
# 2. Extraer el título: Buscamos el SPAN que contiene el texto del título (clase usada por Amazon)
title_span = container.select_one('span.a-text-normal')
# 3. Extraer Precio (dentro del contenedor)
price_whole_tag = container.select_one('span.a-price-whole')
price_symbol_tag = container.select_one('span.a-price-symbol')
title = title_span.get_text(strip=True) if title_span else "N/A (Título Span Falló)"
link = link_tag.get('href') if link_tag else "N/A"
price = f"{price_whole_tag.get_text(strip=True)}{price_symbol_tag.get_text(strip=True)}" if price_whole_tag and price_symbol_tag else "Precio No Encontrado"
# Formato de Salida
result_text += f"[{i+1}] TÍTULO: {title}\n"
result_text += f" PRECIO: {price}\n"
# Manejo de enlaces relativos de Amazon
if link.startswith('/'):
result_text += f" ENLACE: https://www.amazon.es{link}\n"
else:
result_text += f" ENLACE: {link}\n"
result_text += "---------------------------------------\n"
else:
result_text += f"ERROR: No se encontraron contenedores de producto con el selector: '{PRODUCT_CONTAINER}'.\n"
# --- MODOS BÁSICOS Y AVANZADOS (SIN LÍMITE) ---
elif tipo_extraccion == "Título y Metadatos":
title = soup.title.string if soup.title else "N/A"
description_tag = soup.find('meta', attrs={'name': 'description'})
desc_content = description_tag.get('content') if description_tag else "N/A"
result_text += f"TÍTULO: {title}\n"
result_text += f"DESCRIPCIÓN: {desc_content}\n"
elif tipo_extraccion == "Primeros Párrafos":
paragraphs = soup.find_all('p', limit=10)
if paragraphs:
for i, p in enumerate(paragraphs):
text = p.get_text(strip=True)
result_text += f"PARRAFO {i+1}:\n{text[:300]}{'...' if len(text) > 300 else ''}\n\n"
else:
result_text += "No se encontraron párrafos.\n"
elif tipo_extraccion == "Enlaces (Links)":
links = soup.find_all('a', href=True)
if links:
for i, link in enumerate(links):
text = link.get_text(strip=True)[:50] or "Link sin texto"
result_text += f"[{i+1}] TEXTO: {text} \n URL: {link['href']}\n\n"
else:
result_text += "No se encontraron enlaces.\n"
elif tipo_extraccion == "Imágenes (URLs)":
images = soup.find_all('img', src=True)
if images:
for i, img in enumerate(images):
alt_text = img.get('alt', 'N/A')
result_text += f"[{i+1}] ALT: {alt_text[:50]} \n URL: {img['src']}\n\n"
else:
result_text += "No se encontraron etiquetas de imagen (<img>).\n"
elif tipo_extraccion == "Tablas (Estructura Básica)":
tables = soup.find_all('table', limit=5)
if tables:
for i, table in enumerate(tables):
result_text += f"\n--- TABLA {i+1} ---\n"
rows = table.find_all(['tr'])
for row in rows[:10]:
cols = row.find_all(['td', 'th'])
row_data = [re.sub(r'\s+', ' ', col.get_text(strip=True)) for col in cols]
result_text += " | ".join(row_data) + "\n"
result_text += "--- FIN TABLA ---\n"
else:
result_text += "No se encontraron tablas (<table>).\n"
elif tipo_extraccion == "-> Texto Específico (CSS Selector)":
elements = soup.select(selector)
if elements:
result_text += f"--- {len(elements)} ELEMENTOS ENCONTRADOS CON SELECTOR: '{selector}' ---\n\n"
for i, el in enumerate(elements):
text = el.get_text(strip=True)
result_text += f"[{i+1}]: {text[:300]}{'...' if len(text) > 300 else ''}\n\n"
else:
result_text += f"No se encontraron elementos con el selector: '{selector}'.\n"
elif tipo_extraccion == "-> Atributo Específico (CSS Selector + Attr)":
if not atributo:
result_text += f"ERROR: El modo Atributo requiere un Selector y un Atributo (ej: 'href', 'src').\n"
else:
elements = soup.select(selector)
if elements:
result_text += f"--- {len(elements)} ATRIBUTOS '{atributo}' ENCONTRADOS CON SELECTOR: '{selector}' ---\n\n"
for i, el in enumerate(elements):
attr_value = el.get(atributo, "N/A (Atributo no encontrado)")
result_text += f"[{i+1}] VALOR: {attr_value}\n"
else:
result_text += f"No se encontraron elementos con el selector: '{selector}'.\n"
result_text += "\n--- EXTRACCIÓN FINALIZADA ---\n"
# 6. Mostrar el resultado en la UI
if root.winfo_exists() and config.scraping_running:
root.after(0, lambda: output_text_widget.delete('1.0', tk.END))
root.after(0, lambda: output_text_widget.insert(tk.END, result_text))
root.after(0, system_utils.log_event, "Scrapear finalizado con éxito.")
except requests.exceptions.RequestException as e:
error_msg = f"ERROR de Red o HTTP: {e}"
if root.winfo_exists() and config.scraping_running:
root.after(0, lambda: output_text_widget.insert(tk.END, error_msg))
root.after(0, system_utils.log_event, error_msg)
except Exception as e:
error_msg = f"ERROR inesperado al analizar el contenido: {e}"
if root.winfo_exists() and config.scraping_running:
root.after(0, lambda: output_text_widget.insert(tk.END, error_msg))
root.after(0, system_utils.log_event, error_msg)
finally:
# 7. Limpieza final
config.scraping_running = False
if root.winfo_exists():
root.after(0, progress_bar.stop)
root.after(0, progress_bar.config, {"value": 0})
if not root.winfo_exists(): return
root.after(0, system_utils.log_event, f"Estado de Scrapear reseteado. Detenido: {not config.scraping_running}")
# Lanzar el hilo de Scrapping
threading.Thread(target=perform_scraping, daemon=True).start()
# ===============================================
# Monitoreo del Sistema (Existente)
# ===============================================
def get_top_processes(limit=10):
"""Obtiene los N procesos con mayor uso de CPU y sus métricas."""
processes_list = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info', 'num_threads']):
try:
mem_info = proc.info['memory_info']
cpu_percent = proc.info['cpu_percent']
num_threads = proc.info['num_threads']
if cpu_percent is not None and cpu_percent > 0.0:
processes_list.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cpu': cpu_percent,
'mem_mb': mem_info.rss / (1024 * 1024) if mem_info else 0,
'num_threads': num_threads if num_threads is not None else 0
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
except Exception as e:
system_utils.log_event(f"Error inesperado en get_top_processes: {e}")
continue
processes_list.sort(key=lambda x: x['cpu'], reverse=True)
return processes_list[:limit]
def iniciar_monitor_sistema(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root):
"""Función que inicia el hilo de recolección de métricas."""
monitor_thread = threading.Thread(
target=actualizar_metricas,
args=(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root)
)
monitor_thread.daemon = True
monitor_thread.start()
def actualizar_metricas(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root):
"""Bucle principal del monitor: recolecta datos y actualiza los gráficos/tablas."""
# Inicialización de contadores de E/S de disco y red
net_io = psutil.net_io_counters()
last_bytes_sent = net_io.bytes_sent
last_bytes_recv = net_io.bytes_recv
disk_io = psutil.disk_io_counters()
last_read_bytes = disk_io.read_bytes
last_write_bytes = disk_io.write_bytes
psutil.cpu_percent(interval=None)
while config.monitor_running:
try:
# 1. Recolección de datos
cpu_usage = psutil.cpu_percent(interval=None)
mem_details = psutil.virtual_memory()
mem_usage = mem_details.percent
core_usages = psutil.cpu_percent(interval=None, percpu=True)
for i, usage in enumerate(config.datos_cores):
config.datos_cores[i] = core_usages[i]
current_net_io = psutil.net_io_counters()
speed_sent = (current_net_io.bytes_sent - last_bytes_sent)
speed_recv = (current_net_io.bytes_recv - last_bytes_recv)
last_bytes_sent = current_net_io.bytes_sent
last_bytes_recv = current_net_io.bytes_recv
current_disk_io = psutil.disk_io_counters()
speed_read = (current_disk_io.read_bytes - last_read_bytes)
speed_write = (current_disk_io.write_bytes - last_write_bytes)
last_read_bytes = current_disk_io.read_bytes
last_write_bytes = current_disk_io.write_bytes
top_processes = get_top_processes(limit=10)
# Detección de Procesos Zombis
zombie_count = sum(1 for p in psutil.process_iter(['status']) if p.info['status'] == psutil.STATUS_ZOMBIE)
# 2. Actualizar datos de gráficos
config.datos_cpu.pop(0); config.datos_cpu.append(cpu_usage)
config.datos_mem.pop(0); config.datos_mem.append(mem_usage)
config.datos_net_sent.pop(0); config.datos_net_sent.append(speed_sent / 1024)
config.datos_net_recv.pop(0); config.datos_net_recv.append(speed_recv / 1024)
config.datos_disk_read.pop(0); config.datos_disk_read.append(speed_read / (1024 * 1024))
config.datos_disk_write.pop(0); config.datos_disk_write.append(speed_write / (1024 * 1024))
# --- Chequeo de existencia de ventana ANTES de llamadas Tkinter ---
if not root.winfo_exists():
break
# Detección de Zombis (actualización de log si root existe)
if zombie_count > 0:
root.after(0, system_utils.log_event, f"ALERTA: Se detectaron {zombie_count} procesos ZOMBI.")
# 3. Lógica de registro CSV
if config.registro_csv_activo:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
data_row = [timestamp, config.datos_cpu[-1], config.datos_mem[-1], config.datos_net_sent[-1], config.datos_net_recv[-1]]
try:
with open(config.archivo_registro_csv, mode='a', newline='') as file:
writer = csv.writer(file)
writer.writerow(data_row)
except Exception as e:
root.after(0, system_utils.log_event, f"ERROR al escribir en CSV: {e}")
config.registro_csv_activo = False
root.after(0, config.label_2.config, {"text": "Registro: ERROR", "bg": "red"})
# 4. Actualizar Gráficos y Treeview (en el hilo principal)
# Chequeo de existencia de los widgets que reciben la actualización
if canvas.get_tk_widget().winfo_exists() and treeview_processes.winfo_exists():
root.after(0, lambda: dibujar_graficos(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, mem_details, root))
root.after(0, lambda: actualizar_process_treeview(treeview_processes, top_processes))
except Exception as e:
if root.winfo_exists():
root.after(0, system_utils.log_event, f"Error en el hilo de monitor: {e}")
time.sleep(1)
def dibujar_graficos(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, mem_details, root):
"""Dibuja y actualiza los 6 subplots. (Corrección: Uso de subplots_adjust)"""
plt.style.use('ggplot')
try:
# --- GRÁFICO 1: CPU Total (Línea) ---
ax_cpu.clear()
ax_cpu.plot(config.tiempos, config.datos_cpu, color='red', linewidth=2)
ax_cpu.set_ylim(0, 100)
ax_cpu.set_title(f"CPU Total: {config.datos_cpu[-1]:.1f}%", fontsize=9)
ax_cpu.set_ylabel("Uso (%)", fontsize=7)
ax_cpu.tick_params(axis='both', which='major', labelsize=6)
ax_cpu.grid(True, linestyle='--', alpha=0.6)
# --- GRÁFICO 2: MEMORIA RAM (Línea) ---
ax_mem.clear()
ax_mem.plot(config.tiempos, config.datos_mem, color='blue', linewidth=2)
ax_mem.set_ylim(0, 100)
ax_mem.set_title(f"RAM Total: {config.datos_mem[-1]:.1f}%", fontsize=9)
ax_mem.set_ylabel("Uso (%)", fontsize=7)
ax_mem.tick_params(axis='both', which='major', labelsize=6)
ax_mem.grid(True, linestyle='--', alpha=0.6)
# --- GRÁFICO 3: CPU por Núcleo (Barra) ---
ax_cores.clear()
core_labels = [f"N{i}" for i in range(config.num_cores)]
ax_cores.bar(core_labels, config.datos_cores, color='darkred')
ax_cores.set_ylim(0, 100)
ax_cores.set_title("Uso por Núcleo", fontsize=9)
ax_cores.tick_params(axis='both', which='major', labelsize=6)
ax_cores.grid(axis='y', linestyle='--', alpha=0.6)
# --- GRÁFICO 4: Red (Línea) ---
ax_net.clear()
ax_net.plot(config.tiempos, config.datos_net_sent, color='green', label='Enviado', linewidth=1.5)
ax_net.plot(config.tiempos, config.datos_net_recv, color='orange', label='Recibido', linewidth=1.5)
ax_net.set_title(f"Tráfico de Red (KB/s)", fontsize=9)
ax_net.set_xlabel("Tiempo (s)", fontsize=7)
ax_net.set_ylabel("KB/s", fontsize=7)
ax_net.tick_params(axis='both', which='major', labelsize=6)
ax_net.legend(loc='upper right', fontsize=6)
ax_net.grid(True, linestyle='--', alpha=0.6)
# --- GRÁFICO 5: Distribución de Memoria (Tarta) ---
ax_pie.clear()
total_mem = mem_details.total
used_mem = mem_details.used
free_mem = mem_details.free
sizes = [used_mem, free_mem]
labels = [f'Usada ({sizes[0]/1024/1024:.0f}MB)', f'Libre ({sizes[1]/1024/1024:.0f}MB)']
colors = ['#ff9999','#66b3ff']
ax_pie.pie(sizes, labels=labels, colors=colors,
autopct='%1.1f%%', shadow=True, startangle=90, textprops={'fontsize': 7})
ax_pie.set_title(f"Memoria Total: {system_utils.bytes_a_human_readable(total_mem)}", fontsize=9)
ax_pie.axis('equal')
# --- GRÁFICO 6: Disk I/O (NUEVO) ---
ax_disk_io.clear()
ax_disk_io.plot(config.tiempos, config.datos_disk_read, color='purple', label='Lectura', linewidth=1.5)
ax_disk_io.plot(config.tiempos, config.datos_disk_write, color='brown', label='Escritura', linewidth=1.5)
max_io = max(max(config.datos_disk_read), max(config.datos_disk_write)) * 1.1 or 1
ax_disk_io.set_ylim(0, max_io)
ax_disk_io.set_title(f"Disco I/O (MB/s)", fontsize=9)
ax_disk_io.set_xlabel("Tiempo (s)", fontsize=7)
ax_disk_io.set_ylabel("MB/s", fontsize=7)
ax_disk_io.tick_params(axis='both', which='major', labelsize=6)
ax_disk_io.legend(loc='upper right', fontsize=6)
ax_disk_io.grid(True, linestyle='--', alpha=0.6)
# CORRECCIÓN DE DIBUJO (Ajuste manual)
plt.subplots_adjust(
left=0.07, right=0.98,
bottom=0.08, top=0.95,
wspace=0.3,
hspace=0.4
)
canvas.draw()
except Exception as e:
system_utils.log_event(f"ERROR CRÍTICO DE DIBUJO: Matplotlib falló con {e}. (Gráficos congelados)")
def actualizar_process_treeview(tree, processes_data):
"""Limpia y rellena el Treeview con los datos de los procesos."""
for item in tree.get_children():
tree.delete(item)
for p in processes_data:
tree.insert('', tk.END, values=(
p['pid'],
f"{p['cpu']:.1f}%",
f"{p['mem_mb']:.1f}MB",
p['num_threads'],
p['name']
))
def terminar_proceso(treeview_processes):
"""Intenta terminar el proceso seleccionado en el Treeview."""
selected_item = treeview_processes.focus()
if not selected_item:
messagebox.showwarning("Advertencia", "Selecciona un proceso para terminar.")
return
values = treeview_processes.item(selected_item, 'values')
pid_to_kill = int(values[0])
name_to_kill = values[-1]
if not messagebox.askyesno(
"Confirmación",
f"¿Estás seguro de que quieres terminar el proceso {name_to_kill} (PID: {pid_to_kill})?"
):
return
def kill_thread(pid, name):
"""Función que ejecuta el kill en un hilo y registra el resultado."""
try:
proc = psutil.Process(pid)
proc.terminate()
system_utils.log_event(f"Proceso {name} (PID: {pid}) terminado exitosamente.")
except psutil.NoSuchProcess:
system_utils.log_event(f"ERROR: Proceso {name} (PID: {pid}) no encontrado.")
except psutil.AccessDenied:
system_utils.log_event(f"ERROR: No se pudo terminar el proceso {name}. Permiso denegado.")
except Exception as e:
system_utils.log_event(f"ERROR al terminar {name}: {e}")
threading.Thread(target=kill_thread, args=(pid_to_kill, name_to_kill)).start()