# monitor_manager.py import tkinter as tk import psutil import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import threading import time import platform import datetime import csv from tkinter import messagebox, ttk import requests from bs4 import BeautifulSoup import re # Importaciones directas de módulos (Acceso con el prefijo del módulo) import config import system_utils # =============================================== # Lógica del Panel Lateral (Resumen Rápido) # =============================================== def actualizar_resumen_lateral(root): """Hilo que actualiza la información básica del sistema.""" boot_time_timestamp = psutil.boot_time() while config.monitor_running: try: # 1. Hostname, OS, Uptime... hostname_str = platform.node() os_name = platform.system() os_version = platform.release() arch = platform.machine() os_str = f"{os_name} {os_version} ({arch})" current_time = time.time() uptime_seconds = int(current_time - boot_time_timestamp) uptime_delta = str(datetime.timedelta(seconds=uptime_seconds)) # Chequeo antes de llamar a root.after if root.winfo_exists(): root.after(0, config.label_hostname.config, {"text": f"Host: {hostname_str}"}) root.after(0, config.label_os_info.config, {"text": f"OS: {os_str}"}) root.after(0, config.label_uptime.config, {"text": f"Uptime: {uptime_delta.split('.')[0]}"}) else: break # Salir si la ventana ya no existe except Exception as e: if root.winfo_exists(): root.after(0, system_utils.log_event, f"Error en hilo de resumen lateral: {e}") time.sleep(5) # Actualizar cada 5 segundos def crear_panel_lateral(frame, root): """Crea el panel lateral izquierdo SOLO con el resumen rápido.""" # --- Sección de Resumen del Sistema --- resumen_frame = tk.LabelFrame(frame, text="Resumen Rápido", padx=10, pady=10) resumen_frame.pack(fill="x", padx=10, pady=10) label_style = {'font': ('Helvetica', 9, 'bold'), 'anchor': 'w', 'bg': frame['bg']} config.label_hostname = tk.Label(resumen_frame, text="Host: Cargando...", **label_style) config.label_hostname.pack(fill="x", pady=2) config.label_os_info = tk.Label(resumen_frame, text="OS: Cargando...", **label_style) config.label_os_info.pack(fill="x", pady=2) config.label_uptime = tk.Label(resumen_frame, text="Uptime: Cargando...", **label_style) config.label_uptime.pack(fill="x", pady=2) # Iniciar el hilo de actualización del resumen summary_thread = threading.Thread(target=lambda: actualizar_resumen_lateral(root)) summary_thread.daemon = True summary_thread.start() # =============================================== # Lógica de Web Scraping # =============================================== def scrappear_pagina_principal(url, tipo_extraccion, output_text_widget, progress_bar, selector, atributo, config_data, root): """ Realiza la extracción de datos de la URL. Ahora acepta selector, atributo y config_data. Se ha eliminado el límite de elementos para los modos avanzados y combinados. """ # 1. Validación y Control if config.scraping_running: root.after(0, system_utils.log_event, "Ya hay una extracción en curso. Detenla primero.") return config.scraping_running = True # Si hay configuración JSON cargada, se usan esos datos if config_data: try: # Si el JSON contiene URL, se usa, sino se usa la de la interfaz (ya actualizada por system_utils) if 'url' in config_data: url = config_data.get('url') tipo_extraccion = config_data.get('type', tipo_extraccion) selector = config_data.get('selector', selector) atributo = config_data.get('attribute', atributo) root.after(0, system_utils.log_event, f"Usando configuración JSON: Tipo={tipo_extraccion}, Selector={selector}") except Exception as e: root.after(0, system_utils.log_event, f"ERROR al leer config JSON: {e}") config.scraping_running = False return # Validación específica para modos avanzados is_advanced = tipo_extraccion in ["-> Texto Específico (CSS Selector)", "-> Atributo Específico (CSS Selector + Attr)", "Portátiles Gamer (Enlace + Precio)"] if is_advanced and not selector and tipo_extraccion != "Portátiles Gamer (Enlace + Precio)": root.after(0, system_utils.log_event, "ERROR: El modo avanzado requiere un Selector CSS/Tag.") root.after(0, lambda: progress_bar.stop()) config.scraping_running = False return def perform_scraping(): if not root.winfo_exists() or not config.scraping_running: config.scraping_running = False return # 2. Preparar UI (hilo principal) root.after(0, progress_bar.start, 10) root.after(0, system_utils.log_event, f"Iniciando extracción de '{tipo_extraccion}' en: {url}...") root.after(0, lambda: output_text_widget.delete('1.0', tk.END)) root.after(0, lambda: output_text_widget.insert(tk.END, f"--- EXTRACCIÓN EN CURSO: {url} ---\n\n")) try: # 3. Realizar la solicitud HTTP con headers mejorados headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36', 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'Referer': 'https://www.google.com/', 'Connection': 'keep-alive', } response = requests.get(url, headers=headers, timeout=30) # Aumento timeout por seguridad response.raise_for_status() # 4. Analizar el contenido soup = BeautifulSoup(response.text, 'html.parser') result_text = "" # 5. Extracción basada en el tipo seleccionado # --- EXTRACCIÓN COMBINADA AMAZON (SIN LÍMITE) --- if tipo_extraccion == "Portátiles Gamer (Enlace + Precio)": # Selector de contenedor genérico para cada resultado de Amazon PRODUCT_CONTAINER = "div.s-result-item" containers = soup.select(PRODUCT_CONTAINER) if containers: result_text += f"--- {len(containers)} CONTENEDORES DE PRODUCTO ENCONTRADOS --- \n\n" for i, container in enumerate(containers): # 1. Encontrar el enlace/título principal: Usamos el selector que funciona para el enlace. link_tag = container.select_one('h2 a') if not link_tag: # Selector de respaldo que te funcionó parcialmente antes link_tag = container.select_one('a.a-link-normal.s-underline-text.s-underline-link-text.s-link-style.a-text-normal') # 2. Extraer el título: Buscamos el SPAN que contiene el texto del título (clase usada por Amazon) title_span = container.select_one('span.a-text-normal') # 3. Extraer Precio (dentro del contenedor) price_whole_tag = container.select_one('span.a-price-whole') price_symbol_tag = container.select_one('span.a-price-symbol') title = title_span.get_text(strip=True) if title_span else "N/A (Título Span Falló)" link = link_tag.get('href') if link_tag else "N/A" price = f"{price_whole_tag.get_text(strip=True)}{price_symbol_tag.get_text(strip=True)}" if price_whole_tag and price_symbol_tag else "Precio No Encontrado" # Formato de Salida result_text += f"[{i+1}] TÍTULO: {title}\n" result_text += f" PRECIO: {price}\n" # Manejo de enlaces relativos de Amazon if link.startswith('/'): result_text += f" ENLACE: https://www.amazon.es{link}\n" else: result_text += f" ENLACE: {link}\n" result_text += "---------------------------------------\n" else: result_text += f"ERROR: No se encontraron contenedores de producto con el selector: '{PRODUCT_CONTAINER}'.\n" # --- MODOS BÁSICOS Y AVANZADOS (SIN LÍMITE) --- elif tipo_extraccion == "Título y Metadatos": title = soup.title.string if soup.title else "N/A" description_tag = soup.find('meta', attrs={'name': 'description'}) desc_content = description_tag.get('content') if description_tag else "N/A" result_text += f"TÍTULO: {title}\n" result_text += f"DESCRIPCIÓN: {desc_content}\n" elif tipo_extraccion == "Primeros Párrafos": paragraphs = soup.find_all('p', limit=10) if paragraphs: for i, p in enumerate(paragraphs): text = p.get_text(strip=True) result_text += f"PARRAFO {i+1}:\n{text[:300]}{'...' if len(text) > 300 else ''}\n\n" else: result_text += "No se encontraron párrafos.\n" elif tipo_extraccion == "Enlaces (Links)": links = soup.find_all('a', href=True) if links: for i, link in enumerate(links): text = link.get_text(strip=True)[:50] or "Link sin texto" result_text += f"[{i+1}] TEXTO: {text} \n URL: {link['href']}\n\n" else: result_text += "No se encontraron enlaces.\n" elif tipo_extraccion == "Imágenes (URLs)": images = soup.find_all('img', src=True) if images: for i, img in enumerate(images): alt_text = img.get('alt', 'N/A') result_text += f"[{i+1}] ALT: {alt_text[:50]} \n URL: {img['src']}\n\n" else: result_text += "No se encontraron etiquetas de imagen ().\n" elif tipo_extraccion == "Tablas (Estructura Básica)": tables = soup.find_all('table', limit=5) if tables: for i, table in enumerate(tables): result_text += f"\n--- TABLA {i+1} ---\n" rows = table.find_all(['tr']) for row in rows[:10]: cols = row.find_all(['td', 'th']) row_data = [re.sub(r'\s+', ' ', col.get_text(strip=True)) for col in cols] result_text += " | ".join(row_data) + "\n" result_text += "--- FIN TABLA ---\n" else: result_text += "No se encontraron tablas ().\n" elif tipo_extraccion == "-> Texto Específico (CSS Selector)": elements = soup.select(selector) if elements: result_text += f"--- {len(elements)} ELEMENTOS ENCONTRADOS CON SELECTOR: '{selector}' ---\n\n" for i, el in enumerate(elements): text = el.get_text(strip=True) result_text += f"[{i+1}]: {text[:300]}{'...' if len(text) > 300 else ''}\n\n" else: result_text += f"No se encontraron elementos con el selector: '{selector}'.\n" elif tipo_extraccion == "-> Atributo Específico (CSS Selector + Attr)": if not atributo: result_text += f"ERROR: El modo Atributo requiere un Selector y un Atributo (ej: 'href', 'src').\n" else: elements = soup.select(selector) if elements: result_text += f"--- {len(elements)} ATRIBUTOS '{atributo}' ENCONTRADOS CON SELECTOR: '{selector}' ---\n\n" for i, el in enumerate(elements): attr_value = el.get(atributo, "N/A (Atributo no encontrado)") result_text += f"[{i+1}] VALOR: {attr_value}\n" else: result_text += f"No se encontraron elementos con el selector: '{selector}'.\n" result_text += "\n--- EXTRACCIÓN FINALIZADA ---\n" # 6. Mostrar el resultado en la UI if root.winfo_exists() and config.scraping_running: root.after(0, lambda: output_text_widget.delete('1.0', tk.END)) root.after(0, lambda: output_text_widget.insert(tk.END, result_text)) root.after(0, system_utils.log_event, "Scrapear finalizado con éxito.") except requests.exceptions.RequestException as e: error_msg = f"ERROR de Red o HTTP: {e}" if root.winfo_exists() and config.scraping_running: root.after(0, lambda: output_text_widget.insert(tk.END, error_msg)) root.after(0, system_utils.log_event, error_msg) except Exception as e: error_msg = f"ERROR inesperado al analizar el contenido: {e}" if root.winfo_exists() and config.scraping_running: root.after(0, lambda: output_text_widget.insert(tk.END, error_msg)) root.after(0, system_utils.log_event, error_msg) finally: # 7. Limpieza final config.scraping_running = False if root.winfo_exists(): root.after(0, progress_bar.stop) root.after(0, progress_bar.config, {"value": 0}) if not root.winfo_exists(): return root.after(0, system_utils.log_event, f"Estado de Scrapear reseteado. Detenido: {not config.scraping_running}") # Lanzar el hilo de Scrapping threading.Thread(target=perform_scraping, daemon=True).start() # =============================================== # Monitoreo del Sistema (Existente) # =============================================== def get_top_processes(limit=10): """Obtiene los N procesos con mayor uso de CPU y sus métricas.""" processes_list = [] for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info', 'num_threads']): try: mem_info = proc.info['memory_info'] cpu_percent = proc.info['cpu_percent'] num_threads = proc.info['num_threads'] if cpu_percent is not None and cpu_percent > 0.0: processes_list.append({ 'pid': proc.info['pid'], 'name': proc.info['name'], 'cpu': cpu_percent, 'mem_mb': mem_info.rss / (1024 * 1024) if mem_info else 0, 'num_threads': num_threads if num_threads is not None else 0 }) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue except Exception as e: system_utils.log_event(f"Error inesperado en get_top_processes: {e}") continue processes_list.sort(key=lambda x: x['cpu'], reverse=True) return processes_list[:limit] def iniciar_monitor_sistema(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root): """Función que inicia el hilo de recolección de métricas.""" monitor_thread = threading.Thread( target=actualizar_metricas, args=(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root) ) monitor_thread.daemon = True monitor_thread.start() def actualizar_metricas(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, treeview_processes, root): """Bucle principal del monitor: recolecta datos y actualiza los gráficos/tablas.""" # Inicialización de contadores de E/S de disco y red net_io = psutil.net_io_counters() last_bytes_sent = net_io.bytes_sent last_bytes_recv = net_io.bytes_recv disk_io = psutil.disk_io_counters() last_read_bytes = disk_io.read_bytes last_write_bytes = disk_io.write_bytes psutil.cpu_percent(interval=None) while config.monitor_running: try: # 1. Recolección de datos cpu_usage = psutil.cpu_percent(interval=None) mem_details = psutil.virtual_memory() mem_usage = mem_details.percent core_usages = psutil.cpu_percent(interval=None, percpu=True) for i, usage in enumerate(config.datos_cores): config.datos_cores[i] = core_usages[i] current_net_io = psutil.net_io_counters() speed_sent = (current_net_io.bytes_sent - last_bytes_sent) speed_recv = (current_net_io.bytes_recv - last_bytes_recv) last_bytes_sent = current_net_io.bytes_sent last_bytes_recv = current_net_io.bytes_recv current_disk_io = psutil.disk_io_counters() speed_read = (current_disk_io.read_bytes - last_read_bytes) speed_write = (current_disk_io.write_bytes - last_write_bytes) last_read_bytes = current_disk_io.read_bytes last_write_bytes = current_disk_io.write_bytes top_processes = get_top_processes(limit=10) # Detección de Procesos Zombis zombie_count = sum(1 for p in psutil.process_iter(['status']) if p.info['status'] == psutil.STATUS_ZOMBIE) # 2. Actualizar datos de gráficos config.datos_cpu.pop(0); config.datos_cpu.append(cpu_usage) config.datos_mem.pop(0); config.datos_mem.append(mem_usage) config.datos_net_sent.pop(0); config.datos_net_sent.append(speed_sent / 1024) config.datos_net_recv.pop(0); config.datos_net_recv.append(speed_recv / 1024) config.datos_disk_read.pop(0); config.datos_disk_read.append(speed_read / (1024 * 1024)) config.datos_disk_write.pop(0); config.datos_disk_write.append(speed_write / (1024 * 1024)) # --- Chequeo de existencia de ventana ANTES de llamadas Tkinter --- if not root.winfo_exists(): break # Detección de Zombis (actualización de log si root existe) if zombie_count > 0: root.after(0, system_utils.log_event, f"ALERTA: Se detectaron {zombie_count} procesos ZOMBI.") # 3. Lógica de registro CSV if config.registro_csv_activo: timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") data_row = [timestamp, config.datos_cpu[-1], config.datos_mem[-1], config.datos_net_sent[-1], config.datos_net_recv[-1]] try: with open(config.archivo_registro_csv, mode='a', newline='') as file: writer = csv.writer(file) writer.writerow(data_row) except Exception as e: root.after(0, system_utils.log_event, f"ERROR al escribir en CSV: {e}") config.registro_csv_activo = False root.after(0, config.label_2.config, {"text": "Registro: ERROR", "bg": "red"}) # 4. Actualizar Gráficos y Treeview (en el hilo principal) # Chequeo de existencia de los widgets que reciben la actualización if canvas.get_tk_widget().winfo_exists() and treeview_processes.winfo_exists(): root.after(0, lambda: dibujar_graficos(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, mem_details, root)) root.after(0, lambda: actualizar_process_treeview(treeview_processes, top_processes)) except Exception as e: if root.winfo_exists(): root.after(0, system_utils.log_event, f"Error en el hilo de monitor: {e}") time.sleep(1) def dibujar_graficos(fig, canvas, ax_cpu, ax_mem, ax_net, ax_cores, ax_pie, ax_disk_io, mem_details, root): """Dibuja y actualiza los 6 subplots. (Corrección: Uso de subplots_adjust)""" plt.style.use('ggplot') try: # --- GRÁFICO 1: CPU Total (Línea) --- ax_cpu.clear() ax_cpu.plot(config.tiempos, config.datos_cpu, color='red', linewidth=2) ax_cpu.set_ylim(0, 100) ax_cpu.set_title(f"CPU Total: {config.datos_cpu[-1]:.1f}%", fontsize=9) ax_cpu.set_ylabel("Uso (%)", fontsize=7) ax_cpu.tick_params(axis='both', which='major', labelsize=6) ax_cpu.grid(True, linestyle='--', alpha=0.6) # --- GRÁFICO 2: MEMORIA RAM (Línea) --- ax_mem.clear() ax_mem.plot(config.tiempos, config.datos_mem, color='blue', linewidth=2) ax_mem.set_ylim(0, 100) ax_mem.set_title(f"RAM Total: {config.datos_mem[-1]:.1f}%", fontsize=9) ax_mem.set_ylabel("Uso (%)", fontsize=7) ax_mem.tick_params(axis='both', which='major', labelsize=6) ax_mem.grid(True, linestyle='--', alpha=0.6) # --- GRÁFICO 3: CPU por Núcleo (Barra) --- ax_cores.clear() core_labels = [f"N{i}" for i in range(config.num_cores)] ax_cores.bar(core_labels, config.datos_cores, color='darkred') ax_cores.set_ylim(0, 100) ax_cores.set_title("Uso por Núcleo", fontsize=9) ax_cores.tick_params(axis='both', which='major', labelsize=6) ax_cores.grid(axis='y', linestyle='--', alpha=0.6) # --- GRÁFICO 4: Red (Línea) --- ax_net.clear() ax_net.plot(config.tiempos, config.datos_net_sent, color='green', label='Enviado', linewidth=1.5) ax_net.plot(config.tiempos, config.datos_net_recv, color='orange', label='Recibido', linewidth=1.5) ax_net.set_title(f"Tráfico de Red (KB/s)", fontsize=9) ax_net.set_xlabel("Tiempo (s)", fontsize=7) ax_net.set_ylabel("KB/s", fontsize=7) ax_net.tick_params(axis='both', which='major', labelsize=6) ax_net.legend(loc='upper right', fontsize=6) ax_net.grid(True, linestyle='--', alpha=0.6) # --- GRÁFICO 5: Distribución de Memoria (Tarta) --- ax_pie.clear() total_mem = mem_details.total used_mem = mem_details.used free_mem = mem_details.free sizes = [used_mem, free_mem] labels = [f'Usada ({sizes[0]/1024/1024:.0f}MB)', f'Libre ({sizes[1]/1024/1024:.0f}MB)'] colors = ['#ff9999','#66b3ff'] ax_pie.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', shadow=True, startangle=90, textprops={'fontsize': 7}) ax_pie.set_title(f"Memoria Total: {system_utils.bytes_a_human_readable(total_mem)}", fontsize=9) ax_pie.axis('equal') # --- GRÁFICO 6: Disk I/O (NUEVO) --- ax_disk_io.clear() ax_disk_io.plot(config.tiempos, config.datos_disk_read, color='purple', label='Lectura', linewidth=1.5) ax_disk_io.plot(config.tiempos, config.datos_disk_write, color='brown', label='Escritura', linewidth=1.5) max_io = max(max(config.datos_disk_read), max(config.datos_disk_write)) * 1.1 or 1 ax_disk_io.set_ylim(0, max_io) ax_disk_io.set_title(f"Disco I/O (MB/s)", fontsize=9) ax_disk_io.set_xlabel("Tiempo (s)", fontsize=7) ax_disk_io.set_ylabel("MB/s", fontsize=7) ax_disk_io.tick_params(axis='both', which='major', labelsize=6) ax_disk_io.legend(loc='upper right', fontsize=6) ax_disk_io.grid(True, linestyle='--', alpha=0.6) # CORRECCIÓN DE DIBUJO (Ajuste manual) plt.subplots_adjust( left=0.07, right=0.98, bottom=0.08, top=0.95, wspace=0.3, hspace=0.4 ) canvas.draw() except Exception as e: system_utils.log_event(f"ERROR CRÍTICO DE DIBUJO: Matplotlib falló con {e}. (Gráficos congelados)") def actualizar_process_treeview(tree, processes_data): """Limpia y rellena el Treeview con los datos de los procesos.""" for item in tree.get_children(): tree.delete(item) for p in processes_data: tree.insert('', tk.END, values=( p['pid'], f"{p['cpu']:.1f}%", f"{p['mem_mb']:.1f}MB", p['num_threads'], p['name'] )) def terminar_proceso(treeview_processes): """Intenta terminar el proceso seleccionado en el Treeview.""" selected_item = treeview_processes.focus() if not selected_item: messagebox.showwarning("Advertencia", "Selecciona un proceso para terminar.") return values = treeview_processes.item(selected_item, 'values') pid_to_kill = int(values[0]) name_to_kill = values[-1] if not messagebox.askyesno( "Confirmación", f"¿Estás seguro de que quieres terminar el proceso {name_to_kill} (PID: {pid_to_kill})?" ): return def kill_thread(pid, name): """Función que ejecuta el kill en un hilo y registra el resultado.""" try: proc = psutil.Process(pid) proc.terminate() system_utils.log_event(f"Proceso {name} (PID: {pid}) terminado exitosamente.") except psutil.NoSuchProcess: system_utils.log_event(f"ERROR: Proceso {name} (PID: {pid}) no encontrado.") except psutil.AccessDenied: system_utils.log_event(f"ERROR: No se pudo terminar el proceso {name}. Permiso denegado.") except Exception as e: system_utils.log_event(f"ERROR al terminar {name}: {e}") threading.Thread(target=kill_thread, args=(pid_to_kill, name_to_kill)).start()