#!/usr/bin/env python3 """Dashboard principal del Proyecto Global.""" from __future__ import annotations import datetime import json import os import shutil import queue import random import socket import subprocess import sys import tempfile import threading import time import webbrowser from dataclasses import dataclass from typing import Any, Callable import tkinter as tk from tkinter import filedialog, messagebox, scrolledtext, ttk from tkinter import font as tkfont # -------------------- dependencias opcionales -------------------- try: import psutil # type: ignore except Exception: # pragma: no cover psutil = None # type: ignore try: import matplotlib matplotlib.use('TkAgg') from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure MATPLOTLIB_AVAILABLE = True except Exception: # pragma: no cover MATPLOTLIB_AVAILABLE = False Figure = None FigureCanvasTkAgg = None try: import pygame # type: ignore pygame_available = True except Exception: # pragma: no cover pygame_available = False try: import requests # type: ignore REQUESTS_AVAILABLE = True except Exception: # pragma: no cover requests = None # type: ignore REQUESTS_AVAILABLE = False try: from bs4 import BeautifulSoup BEAUTIFULSOUP_AVAILABLE = True except Exception: # pragma: no cover BEAUTIFULSOUP_AVAILABLE = False SERVER_HOST_DEFAULT = '127.0.0.1' SERVER_PORT_DEFAULT = 3333 # Clave predeterminada facilitada por el usuario para OpenWeather. # Puede sobrescribirse exportando OPENWEATHER_API_KEY en el entorno. OPENWEATHER_FALLBACK_API_KEY = os.environ.get( 'OPENWEATHER_FALLBACK_API_KEY', '431239407e3628578c83e67180cf720f' ).strip() _OPENWEATHER_ENV_KEY = os.environ.get('OPENWEATHER_API_KEY', '').strip() OPENWEATHER_API_KEY = _OPENWEATHER_ENV_KEY or OPENWEATHER_FALLBACK_API_KEY JAVEA_LATITUDE = 38.789166 JAVEA_LONGITUDE = 0.163055 # -------------------- paleta visual -------------------- PRIMARY_BG = '#f4f5fb' PANEL_BG = '#ffffff' SECONDARY_BG = '#eef2ff' ACCENT_COLOR = '#ff6b6b' ACCENT_DARK = '#c44569' TEXT_COLOR = '#1f2d3d' SUBTEXT_COLOR = '#67708a' BUTTON_BG = '#e7ecff' BUTTON_ACTIVE_BG = '#ffd6d1' def _env_float(name: str, default: float) -> float: value = os.environ.get(name) if value is None: return default try: return float(value) except ValueError: return default WALLAPOP_DEVICE_ID = os.environ.get('WALLAPOP_DEVICE_ID', '48ca24ec-2e8c-4dbb-9f0b-6b4f99194626').strip() WALLAPOP_APP_VERSION = os.environ.get('WALLAPOP_APP_VERSION', '814060').strip() WALLAPOP_MPID = os.environ.get('WALLAPOP_MPID', '6088013701866267655').strip() WALLAPOP_DEVICE_OS = os.environ.get('WALLAPOP_DEVICE_OS', '0').strip() WALLAPOP_USER_AGENT = os.environ.get( 'WALLAPOP_USER_AGENT', 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) ' 'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Mobile Safari/537.36' ).strip() WALLAPOP_SEC_CH_UA = os.environ.get('WALLAPOP_SEC_CH_UA', '"Not_A Brand";v="99", "Chromium";v="142"') WALLAPOP_SEC_CH_UA_MOBILE = os.environ.get('WALLAPOP_SEC_CH_UA_MOBILE', '?1') WALLAPOP_SEC_CH_UA_PLATFORM = os.environ.get('WALLAPOP_SEC_CH_UA_PLATFORM', '"Android"') WALLAPOP_LATITUDE = _env_float('WALLAPOP_LATITUDE', 40.416775) WALLAPOP_LONGITUDE = _env_float('WALLAPOP_LONGITUDE', -3.70379) WALLAPOP_COUNTRY_CODE = os.environ.get('WALLAPOP_COUNTRY_CODE', 'ES').strip() or 'ES' WALLAPOP_LANGUAGE = os.environ.get('WALLAPOP_LANGUAGE', 'es').strip() or 'es' WALLAPOP_CATEGORY_ID = os.environ.get('WALLAPOP_CATEGORY_ID', '').strip() WALLAPOP_REFERER = os.environ.get('WALLAPOP_REFERER', 'https://es.wallapop.com/').strip() WALLAPOP_HEADERS = { 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'es,es-ES;q=0.9', 'Referer': WALLAPOP_REFERER, 'User-Agent': WALLAPOP_USER_AGENT, 'x-deviceid': WALLAPOP_DEVICE_ID, 'x-deviceos': WALLAPOP_DEVICE_OS, 'deviceos': WALLAPOP_DEVICE_OS, 'x-appversion': WALLAPOP_APP_VERSION, 'mpid': WALLAPOP_MPID, 'sec-ch-ua': WALLAPOP_SEC_CH_UA, 'sec-ch-ua-mobile': WALLAPOP_SEC_CH_UA_MOBILE, 'sec-ch-ua-platform': WALLAPOP_SEC_CH_UA_PLATFORM } class ChatClient: """Cliente TCP básico.""" def __init__(self, on_message): self._on_message = on_message self._sock: socket.socket | None = None self._lock = threading.Lock() self._connected = False def connect(self, host: str, port: int) -> bool: with self._lock: if self._connected: return True try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) self._sock = sock self._connected = True threading.Thread(target=self._recv_loop, daemon=True).start() return True except Exception as exc: self._on_message(f'[ERROR] Conexión fallida: {exc}') return False def _recv_loop(self): try: while self._connected and self._sock: data = self._sock.recv(4096) if not data: break text = data.decode('utf-8', errors='replace').strip() self._on_message(text) except Exception as exc: self._on_message(f'[ERROR] {exc}') finally: with self._lock: self._connected = False if self._sock: try: self._sock.close() except Exception: pass self._sock = None self._on_message('[INFO] Conexión cerrada') def send(self, text: str) -> bool: with self._lock: if not self._connected or not self._sock: return False try: self._sock.sendall(text.encode('utf-8')) return True except Exception: self._connected = False return False def close(self): with self._lock: self._connected = False if self._sock: try: self._sock.close() except Exception: pass self._sock = None class DashboardApp(tk.Tk): def __init__(self) -> None: super().__init__() self.title('Panel de laboratorio - Proyecto Global') self.geometry('1220x740') self.minsize(1024, 660) self.configure(bg=PRIMARY_BG) self._setup_fonts() self._maximize_with_borders() self.columnconfigure(0, weight=0) self.columnconfigure(1, weight=1) self.columnconfigure(2, weight=0) self.rowconfigure(0, weight=0) self.rowconfigure(1, weight=1) self.rowconfigure(2, weight=0) self._running = True self._chat_queue: 'queue.Queue[str]' = queue.Queue() self._scraping_queue: 'queue.Queue[tuple[str, ...]]' = queue.Queue() self._traffic_last = psutil.net_io_counters() if psutil else None self._resource_history = {'cpu': [], 'mem': [], 'threads': []} self._resource_poll_job: str | None = None self.chat_client = ChatClient(self._enqueue_chat_message) self.alarm_counter = 1 self.active_alarms: list[dict[str, datetime.datetime | str]] = [] self.game_window_canvas = None self.game_window_status = None self.game_move_queue: queue.Queue | None = None self.game_queue_processor_active = False self.game_done_event: threading.Event | None = None self.game_finish_line = 0 self.game_racer_names: list[str] = [] self.game_camel_count = 3 self.game_speed_factor = 1.0 self.music_temp_file: str | None = None self.weather_api_key = OPENWEATHER_API_KEY self.results_area: tk.Frame | None = None self.results_title: tk.Label | None = None self.scraping_popup: tk.Toplevel | None = None self.simple_scraping_popup: tk.Toplevel | None = None self.weather_popup: tk.Toplevel | None = None self.chart_canvas = None self.ax_cpu = None self.ax_mem = None self.ax_threads = None self.line_cpu = None self.line_mem = None self.mem_fill = None self.thread_bars = None if psutil: try: self._ps_process = psutil.Process(os.getpid()) except Exception: self._ps_process = None else: self._ps_process = None self._build_header() self._build_left_panel() self._build_center_panel() self._build_right_panel() self._build_status_bar() self._update_clock() if psutil: self.after(1000, self._update_traffic) try: psutil.cpu_percent(interval=None) except Exception: pass self._resource_poll_job = self.after(1000, self._resource_poll_tick) threading.Thread(target=self._chat_loop, daemon=True).start() self.after(100, self._process_scraping_queue) self.after(1000, self._refresh_alarms_loop) # ------------------------ UI ------------------------ def _maximize_with_borders(self) -> None: def _apply_zoom(): # Intentar diferentes atributos para maximizar/pantalla completa en diferentes OS for attr in ('-zoomed',): try: self.attributes(attr, True) return except tk.TclError: continue try: self.state('zoomed') except tk.TclError: pass self.after(0, _apply_zoom) def _setup_fonts(self) -> None: base_family = 'Segoe UI' self.font_title = tkfont.Font(family=base_family, size=16, weight='bold') self.font_header = tkfont.Font(family=base_family, size=13, weight='bold') self.font_body = tkfont.Font(family=base_family, size=11) self.font_small = tkfont.Font(family=base_family, size=10) def _build_header(self) -> None: header = tk.Frame(self, bg=PANEL_BG, bd=0, highlightthickness=0) header.grid(row=0, column=0, columnspan=3, sticky='new') header.grid_columnconfigure(tuple(range(7)), weight=1) tabs = [ ('T1. Procesos', '#4c6ef5'), ('T2. Threads', '#ff6b6b'), ('T3. Sockets', '#ffa94d'), ('T4. Servicios', '#51cf66'), ('T5. Seguridad', '#845ef7'), ('Configuración', '#2f3545') ] for idx, (text, color) in enumerate(tabs): badge = tk.Label( header, text=text, fg=color, bg=PANEL_BG, font=self.font_title, padx=16, pady=8 ) badge.grid(row=0, column=idx, padx=6, pady=8) def _build_left_panel(self) -> None: panel = tk.Frame(self, width=220, bg=SECONDARY_BG, bd=0, highlightthickness=0) panel.grid(row=1, column=0, sticky='nsw', padx=6, pady=(50,6)) panel.grid_propagate(False) self.left_panel = panel self._left_section('Acciones') self._left_button('Analizar Wallapop', self._open_wallapop_popup) self._left_button('Scraping simple', self._open_simple_scraping_popup) self._left_button('Navegar', lambda: self._open_web('https://www.google.com')) self._left_button('API Tiempo', self._show_weather_popup) self._left_section('Aplicaciones') self._left_button('Visual Code', lambda: self._launch_process(['code'])) self._left_button('Camellos', self._open_game_window) self._left_button('App3', lambda: self._launch_process(['firefox'])) self._left_section('Procesos batch') self._left_button('Copias de seguridad', self._run_backup_script) def _left_section(self, text: str) -> None: tk.Label(self.left_panel, text=text.upper(), bg=SECONDARY_BG, fg=SUBTEXT_COLOR, font=self.font_small).pack(anchor='w', padx=16, pady=(16,4)) def _left_button(self, text: str, command) -> None: btn = tk.Button( self.left_panel, text=text, width=20, bg=BUTTON_BG, activebackground=BUTTON_ACTIVE_BG, relief='flat', command=command ) btn.pack(fill='x', padx=16, pady=4) def _build_center_panel(self) -> None: center = tk.Frame(self, bg=PANEL_BG, bd=0) center.grid(row=1, column=1, sticky='nsew', padx=6, pady=(50,6)) center.rowconfigure(1, weight=1) center.columnconfigure(0, weight=1) self.center_panel = center self._build_notebook() self._build_notes_panel() def _build_notebook(self) -> None: style = ttk.Style() style.configure('Custom.TNotebook', background=PANEL_BG, borderwidth=0) style.configure('Custom.TNotebook.Tab', padding=(16, 8), font=self.font_body) style.map('Custom.TNotebook.Tab', background=[('selected', '#dde2ff')]) notebook = ttk.Notebook(self.center_panel, style='Custom.TNotebook') notebook.grid(row=0, column=0, sticky='nsew', padx=4, pady=4) self.notebook = notebook self.tab_resultados = tk.Frame(notebook, bg='white') self.tab_navegador = tk.Frame(notebook, bg='white') self.tab_correos = tk.Frame(notebook, bg='white') self.tab_tareas = tk.Frame(notebook, bg='white') self.tab_alarmas = tk.Frame(notebook, bg='white') self.tab_enlaces = tk.Frame(notebook, bg='white') self.tab_bloc = tk.Frame(notebook, bg='white') notebook.add(self.tab_resultados, text='Resultados') notebook.add(self.tab_navegador, text='Navegador') notebook.add(self.tab_correos, text='Correos') notebook.add(self.tab_bloc, text='Bloc de notas') notebook.add(self.tab_tareas, text='Tareas') notebook.add(self.tab_alarmas, text='Alarmas') notebook.add(self.tab_enlaces, text='Enlaces') self._build_tab_resultados() self._build_tab_navegador() self._build_tab_correos() self._build_tab_bloc_notas() self._build_tab_tareas() self._build_tab_alarmas() self._build_tab_enlaces() def _build_tab_resultados(self) -> None: wrapper = tk.Frame(self.tab_resultados, bg=PANEL_BG) wrapper.pack(fill='both', expand=True) self.results_title = tk.Label(wrapper, text='Resultados de actividades', font=self.font_header, bg=PANEL_BG, fg=TEXT_COLOR) self.results_title.pack(pady=(12, 4)) toolbar = tk.Frame(wrapper, bg=PANEL_BG) toolbar.pack(fill='x', padx=12, pady=(0, 4)) tk.Button(toolbar, text='Ver monitor del sistema', command=self._show_resource_monitor, bg=BUTTON_BG, relief='flat').pack(side='left', padx=(0, 8)) tk.Button(toolbar, text='Limpiar resultados', command=self._reset_results_view, bg=BUTTON_BG, relief='flat').pack(side='left') self.results_area = tk.Frame(wrapper, bg='#f4f7ff', bd=1, relief='solid') self.results_area.pack(fill='both', expand=True, padx=12, pady=(0, 12)) self._reset_results_view() def _prepare_results_area(self) -> None: self._stop_game_queue_processor() if not self.results_area: return for child in self.results_area.winfo_children(): child.destroy() self.chart_canvas = None self.ax_cpu = None self.ax_mem = None self.ax_threads = None self.line_cpu = None self.line_mem = None self.mem_fill = None self.thread_bars = None self.game_window_canvas = None self.game_window_status = None def _reset_results_view(self) -> None: self._prepare_results_area() if self.results_title: self.results_title.config(text='Resultados de actividades') if self.results_area: tk.Label( self.results_area, text='Ejecute una actividad para ver aquí sus resultados más recientes.', bg='#f4f7ff', fg=SUBTEXT_COLOR, font=self.font_body, wraplength=520 ).pack(expand=True, padx=12, pady=12) def _render_results_view(self, title: str, builder: Callable[[tk.Frame], None]) -> None: if self.results_title: self.results_title.config(text=f'Resultados • {title}') self._prepare_results_area() if not self.results_area: return builder(self.results_area) def _show_resource_monitor(self) -> None: def builder(parent: tk.Frame) -> None: if not (MATPLOTLIB_AVAILABLE and psutil): tk.Label( parent, text='Instale matplotlib + psutil para habilitar el monitor del sistema.', fg=ACCENT_COLOR, bg='#f4f7ff', font=self.font_body ).pack(fill='both', expand=True, padx=20, pady=20) return chart_frame = tk.Frame(parent, bg=PANEL_BG) chart_frame.pack(fill='both', expand=True, padx=12, pady=12) fig = Figure(figsize=(7.2, 5.6), dpi=100) self.ax_cpu = fig.add_subplot(311) self.ax_mem = fig.add_subplot(312) self.ax_threads = fig.add_subplot(313) self.ax_cpu.set_title('CPU (línea)') self.ax_cpu.set_ylim(0, 100) self.ax_cpu.set_ylabel('%') self.ax_cpu.grid(True, alpha=0.25) self.line_cpu, = self.ax_cpu.plot([], [], label='CPU', color='#ff6b6b', linewidth=2) self.ax_mem.set_title('Memoria (área)') self.ax_mem.set_ylim(0, 100) self.ax_mem.set_ylabel('%') self.ax_mem.grid(True, alpha=0.2) self.line_mem, = self.ax_mem.plot([], [], color='#4ecdc4', linewidth=1.5) self.ax_threads.set_title('Hilos del proceso (barras)') self.ax_threads.set_ylabel('Hilos') self.ax_threads.grid(True, axis='y', alpha=0.2) fig.tight_layout(pad=2.2) self.chart_canvas = FigureCanvasTkAgg(fig, master=chart_frame) self.chart_canvas.get_tk_widget().pack(fill='both', expand=True) self.ax_cpu.set_xlim(0, 40) self.ax_mem.set_xlim(0, 40) self.ax_threads.set_xlim(-0.5, 39.5) self.chart_canvas.draw_idle() self._render_results_view('Monitoreo del sistema', builder) def _show_text_result(self, title: str, text: str) -> None: def builder(parent: tk.Frame) -> None: frame = tk.Frame(parent, bg='white') frame.pack(fill='both', expand=True, padx=12, pady=12) box = scrolledtext.ScrolledText(frame, height=12) box.pack(fill='both', expand=True) box.insert('1.0', text) box.config(state='disabled') self._render_results_view(title, builder) def _format_weather_summary(self, data: dict[str, Any]) -> str: main = data.get('main', {}) weather = data.get('weather', [{}])[0] wind = data.get('wind', {}) temp = main.get('temp') feels = main.get('feels_like') humidity = main.get('humidity') desc = weather.get('description', '').capitalize() wind_speed = wind.get('speed') timestamp = data.get('dt') updated = datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D' summary = [ f'Descripción : {desc or "N/D"}', f'Temperatura : {temp:.1f}°C' if temp is not None else 'Temperatura : N/D', f'Sensación : {feels:.1f}°C' if feels is not None else 'Sensación : N/D', f'Humedad : {humidity}%' if humidity is not None else 'Humedad : N/D', f'Veloc. viento: {wind_speed:.1f} m/s' if wind_speed is not None else 'Veloc. viento: N/D', f'Actualizado : {updated}' ] return '\n'.join(summary) def _get_javea_weather_snapshot(self) -> tuple[str, str, bool]: try: data = self._fetch_javea_weather() summary = self._format_weather_summary(data) timestamp = data.get('dt') status = 'Última lectura: ' status += datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D' return summary, status, False except Exception as exc: return str(exc), 'No se pudo obtener el clima', True def _show_weather_popup(self) -> None: self._log('Abriendo ventana del tiempo para Jávea') text, status, is_error = self._get_javea_weather_snapshot() if self.weather_popup and self.weather_popup.winfo_exists(): self.weather_popup.destroy() popup = tk.Toplevel(self) popup.title('Tiempo en Jávea') popup.geometry('760x500') popup.minsize(760, 500) popup.resizable(False, False) popup.configure(bg='white') popup.transient(self) popup.grab_set() self.weather_popup = popup header = tk.Label(popup, text='OpenWeather • Jávea, España', font=self.font_header, bg='white', fg=TEXT_COLOR) header.pack(pady=(12, 6)) info_label = tk.Label( popup, text=text, justify='left', font=('Consolas', 12), anchor='nw', bg='white', fg='red' if is_error else TEXT_COLOR, padx=6, pady=6 ) info_label.pack(fill='both', expand=True, padx=18, pady=8) status_label = tk.Label(popup, text=status, font=self.font_small, bg='white', fg=SUBTEXT_COLOR) status_label.pack(pady=(0, 8)) def close_popup() -> None: if popup.winfo_exists(): popup.destroy() if self.weather_popup is popup: self.weather_popup = None def refresh() -> None: new_text, new_status, new_is_error = self._get_javea_weather_snapshot() info_label.config(text=new_text, fg='red' if new_is_error else '#1f2d3d') status_label.config(text=new_status) buttons = tk.Frame(popup, bg='white') buttons.pack(pady=(4, 14)) tk.Button(buttons, text='Actualizar', command=refresh, bg=BUTTON_BG, relief='flat').pack(side='left', padx=10) tk.Button(buttons, text='Cerrar', command=close_popup, bg=BUTTON_BG, relief='flat').pack(side='left', padx=10) popup.protocol('WM_DELETE_WINDOW', close_popup) def _build_tab_navegador(self) -> None: tk.Label(self.tab_navegador, text='Abrir URL en navegador externo', bg='white').pack(pady=6) frame = tk.Frame(self.tab_navegador, bg='white') frame.pack(pady=6) self.url_entry = tk.Entry(frame, width=48) self.url_entry.insert(0, 'https://www.python.org') self.url_entry.pack(side='left', padx=4) tk.Button(frame, text='Abrir', command=lambda: self._open_web(self.url_entry.get())).pack(side='left', padx=4) def _build_tab_correos(self) -> None: tk.Label( self.tab_correos, text='Área futura para gestionar correos.' '\nDe momento, utiliza la pestaña "Bloc de notas" para escribir.', bg='white', font=('Arial', 12), justify='left' ).pack(pady=20) def _build_tab_bloc_notas(self) -> None: tk.Label(self.tab_bloc, text='Bloc de notas', bg='white', font=('Arial', 12, 'bold')).pack(pady=6) toolbar = tk.Frame(self.tab_bloc, bg='white') toolbar.pack(fill='x') tk.Button(toolbar, text='Abrir', command=self._open_text).pack(side='left', padx=4, pady=4) tk.Button(toolbar, text='Guardar', command=self._save_text).pack(side='left', padx=4, pady=4) self.editor = scrolledtext.ScrolledText(self.tab_bloc, wrap='word') self.editor.pack(fill='both', expand=True, padx=6, pady=6) def _build_tab_tareas(self) -> None: container = tk.Frame(self.tab_tareas, bg='white') container.pack(fill='both', expand=True) scraping_frame = tk.Frame(container, bg='white', padx=10, pady=10) scraping_frame.pack(side='left', fill='both', expand=True) tk.Label(scraping_frame, text='Análisis de enlaces Wallapop', bg='white', font=('Arial', 12, 'bold')).pack(pady=(12,4)) if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE): tk.Label( scraping_frame, text='Instala requests y beautifulsoup4 para habilitar el análisis de enlaces.', bg='white', fg='red', wraplength=260, justify='left' ).pack(fill='x', padx=10, pady=10) else: tk.Label( scraping_frame, text='Pulsa el botón "Analizar Wallapop" en el panel izquierdo para introducir la URL de un anuncio. ' 'Aquí verás notas generales o recordatorios.', bg='white', wraplength=260, justify='left' ).pack(fill='x', padx=10, pady=10) game_frame = tk.Frame(container, bg='white') game_frame.pack(side='left', fill='both', expand=True, padx=10) tk.Label(game_frame, text='Juego de camellos (ventana dedicada)', bg='white', font=('Arial', 12, 'bold')).pack(pady=(6,2)) tk.Label( game_frame, text='Abre el simulador en una ventana independiente para ver la carrera a pantalla completa.', wraplength=260, justify='left', bg='white' ).pack(padx=8, pady=8) tk.Button(game_frame, text='Abrir juego', command=self._open_game_window).pack(pady=6) def _build_tab_alarmas(self) -> None: self.tab_alarmas.configure(bg=PANEL_BG) card = tk.Frame(self.tab_alarmas, bg='#fef9f4', bd=0, padx=18, pady=18) card.pack(pady=16, padx=20, fill='both', expand=True) tk.Label(card, text='Programar nuevas alarmas', font=self.font_header, bg='#fef9f4', fg=TEXT_COLOR).pack(pady=(0,8)) form = tk.Frame(card, bg='#fef9f4') form.pack(pady=6) tk.Label(form, text='Minutos', bg='#fef9f4', font=self.font_body).grid(row=0, column=0, padx=6) self.alarm_minutes = tk.Spinbox(form, from_=1, to=240, width=6, justify='center') self.alarm_minutes.grid(row=0, column=1, padx=6) tk.Label(form, text='Título', bg='#fef9f4', font=self.font_body).grid(row=0, column=2, padx=6) self.alarm_title = tk.Entry(form, width=24) self.alarm_title.grid(row=0, column=3, padx=6) tk.Button(form, text='Agregar alarma', bg=BUTTON_BG, relief='flat', command=self._start_alarm).grid(row=0, column=4, padx=10) tk.Label(card, text='Alarmas activas', font=self.font_body, bg='#fef9f4', fg=SUBTEXT_COLOR).pack(pady=(18,6)) self.alarm_list = tk.Listbox(card, height=8, width=50, bd=0, highlightthickness=1, highlightbackground='#e3e5f0') self.alarm_list.pack(pady=4, fill='x') tk.Button(card, text='Cancelar seleccionada', command=self._cancel_selected_alarm, bg=BUTTON_BG, relief='flat').pack(pady=(10,4)) self.alarm_status = tk.Label(card, text='Sin alarmas programadas', bg='#fef9f4', fg=SUBTEXT_COLOR, font=self.font_small) self.alarm_status.pack(pady=4) def _build_tab_enlaces(self) -> None: tk.Label(self.tab_enlaces, text='Enlaces útiles', bg='white', font=('Arial', 12, 'bold')).pack(pady=6) for text, url in [ ('Documentación Tkinter', 'https://docs.python.org/3/library/tk.html'), ('psutil', 'https://psutil.readthedocs.io'), ('Matplotlib', 'https://matplotlib.org') ]: tk.Button(self.tab_enlaces, text=text, command=lambda u=url: self._open_web(u)).pack(pady=3) def _build_notes_panel(self) -> None: notes = tk.LabelFrame(self.center_panel, text='Panel de notas e hilos', bg='#eefee8') notes.grid(row=1, column=0, sticky='nsew', padx=6, pady=(4,6)) self.notes = scrolledtext.ScrolledText(notes, height=5) self.notes.pack(fill='both', expand=True, padx=6, pady=6) def _build_right_panel(self) -> None: right = tk.Frame(self, width=280, bg=PANEL_BG, bd=0) right.grid(row=1, column=2, sticky='nse', padx=6, pady=(50,6)) right.grid_propagate(False) tk.Label(right, text='Chat', font=('Arial', 20, 'bold'), fg='red', bg='white').pack(pady=(6,4)) self.chat_display = scrolledtext.ScrolledText(right, width=30, height=12, state='disabled') self.chat_display.pack(padx=6, pady=4) tk.Label(right, text='Mensaje', bg='white').pack(anchor='w', padx=6) self.message_entry = tk.Text(right, height=4) self.message_entry.pack(padx=6, pady=4) tk.Button(right, text='enviar', bg='#d6f2ce', command=self._send_chat).pack(pady=4) conn = tk.Frame(right, bg='white') conn.pack(pady=4) tk.Label(conn, text='Host:', bg='white').grid(row=0, column=0) self.host_entry = tk.Entry(conn, width=12) self.host_entry.insert(0, SERVER_HOST_DEFAULT) self.host_entry.grid(row=0, column=1) tk.Label(conn, text='Puerto:', bg='white').grid(row=1, column=0) self.port_entry = tk.Entry(conn, width=6) self.port_entry.insert(0, str(SERVER_PORT_DEFAULT)) self.port_entry.grid(row=1, column=1) tk.Button(conn, text='Conectar', command=self._connect_chat).grid(row=0, column=2, rowspan=2, padx=4) tk.Label(right, text='Alumnos', font=('Arial', 14, 'bold'), bg='white').pack(pady=(10,4)) for name in ('Alumno 1', 'Alumno 2', 'Alumno 3'): frame = tk.Frame(right, bg='white', bd=1, relief='groove') frame.pack(fill='x', padx=6, pady=4) tk.Label(frame, text=name, bg='white', font=('Arial', 11, 'bold')).pack(anchor='w') tk.Label(frame, text='Lorem ipsum dolor sit amet, consectetur adipiscing elit.', wraplength=220, justify='left', bg='white').pack(anchor='w') player = tk.Frame(right, bg='#fdf5f5', bd=1, relief='flat', padx=8, pady=8) player.pack(fill='x', padx=8, pady=(10,6)) tk.Label(player, text='Reproductor música', font=self.font_header, bg='#fdf5f5', fg=ACCENT_DARK).pack(pady=(0,6)) def styled_btn(parent, text, command): return tk.Button( parent, text=text, command=command, bg='#ffe3df', activebackground='#ffd2ca', fg=TEXT_COLOR, relief='flat', width=20, pady=4 ) styled_btn(player, 'Seleccionar archivo', self._select_music).pack(pady=3) action_row = tk.Frame(player, bg='#fdf5f5') action_row.pack(pady=3) styled_btn(action_row, 'Pausa', self._pause_music).pack(side='left', padx=4) styled_btn(action_row, 'Reanudar', self._resume_music).pack(side='left', padx=4) styled_btn(player, 'Quitar', self._stop_music).pack(pady=3) def _build_status_bar(self) -> None: status = tk.Frame(self, bg='#f1f1f1', bd=2, relief='ridge') status.grid(row=2, column=0, columnspan=3, sticky='ew') for idx in range(3): status.columnconfigure(idx, weight=1) tk.Label(status, text='Correos sin leer', font=('Arial', 11, 'bold'), bg='#f1f1f1').grid(row=0, column=0, padx=16, pady=6, sticky='w') self.traffic_label = tk.Label( status, text='Red: ↑ --.- KB/s ↓ --.- KB/s', font=('Arial', 11, 'bold'), bg='#f1f1f1' ) self.traffic_label.grid(row=0, column=1, padx=16, pady=6) self.clock_label = tk.Label(status, text='--:--:--', font=('Arial', 12, 'bold'), bg='#f1f1f1') self.clock_label.grid(row=0, column=2, padx=16, pady=6, sticky='e') # ------------------------ acciones ------------------------ def _open_web(self, url: str) -> None: webbrowser.open(url) def _launch_process(self, cmd: list[str]) -> None: try: subprocess.Popen(cmd) self._log(f'Proceso lanzado: {cmd}') self._show_text_result('Aplicaciones', f'Se lanzó el proceso: {" ".join(cmd)}') except Exception as exc: messagebox.showerror('Error', f'No se pudo lanzar {cmd}: {exc}') def _run_backup_script(self) -> None: source = filedialog.askdirectory(title='Selecciona la carpeta a respaldar') if not source: return destination = filedialog.askdirectory(title='Selecciona la carpeta destino para la copia') if not destination: return ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') src_name = os.path.basename(os.path.normpath(source)) or 'backup' target = os.path.join(destination, f'{src_name}_{ts}') if os.path.exists(target): messagebox.showerror('Backup', f'Ya existe la carpeta destino:\n{target}') return self._log(f'Iniciando copia de seguridad de {source} a {target}') self._show_text_result('Copias de seguridad', f'Copiando archivos...\nOrigen: {source}\nDestino: {target}') self.notebook.select(self.tab_resultados) def worker() -> None: try: copied_files, copied_bytes, skipped = self._copy_directory_with_progress(source, target) except Exception as exc: self._scraping_queue.put(('error', f'Backup: {exc}')) return summary = ( f'Copia completada correctamente.\n\n' f'Origen: {source}\nDestino: {target}\n' f'Archivos copiados: {copied_files}\n' f'Tamaño total: {copied_bytes / (1024*1024):.2f} MB' ) if skipped: summary += f"\nArchivos omitidos: {skipped} (consulta el panel de notas)" self._scraping_queue.put(('backup_success', summary, target)) threading.Thread(target=worker, daemon=True).start() def _copy_directory_with_progress(self, source: str, target: str) -> tuple[int, int, int]: copied_files = 0 copied_bytes = 0 skipped = 0 for root, dirs, files in os.walk(source): rel = os.path.relpath(root, source) dest_dir = os.path.join(target, rel) if rel != '.' else target os.makedirs(dest_dir, exist_ok=True) for file in files: src_path = os.path.join(root, file) dest_path = os.path.join(dest_dir, file) try: shutil.copy2(src_path, dest_path) copied_files += 1 try: copied_bytes += os.path.getsize(dest_path) except OSError: pass except Exception as exc: skipped += 1 self._log(f'Archivo omitido ({src_path}): {exc}') continue if copied_files % 20 == 0: self._log(f'Copia en progreso: {copied_files} archivos...') return copied_files, copied_bytes, skipped def _open_saved_file(self, path: str) -> None: if not path or not os.path.exists(path): messagebox.showwarning('Archivo', 'El archivo indicado ya no existe.') return try: if sys.platform.startswith('win'): os.startfile(path) # type: ignore[attr-defined] elif sys.platform == 'darwin': subprocess.Popen(['open', path]) else: subprocess.Popen(['xdg-open', path]) except Exception as exc: messagebox.showerror('Archivo', f'No se pudo abrir el archivo: {exc}') def _show_file_popup(self, path: str) -> None: popup = tk.Toplevel(self) popup.title('Archivo generado') popup.configure(bg='white', padx=16, pady=16) popup.resizable(False, False) popup.transient(self) popup.grab_set() tk.Label(popup, text='Se creó el archivo con los datos:', font=('Arial', 12, 'bold'), bg='white').pack(anchor='w') entry = tk.Entry(popup, width=60) entry.pack(fill='x', pady=8) entry.insert(0, path) entry.config(state='readonly') buttons = tk.Frame(popup, bg='white') buttons.pack(fill='x', pady=(8,0)) tk.Button(buttons, text='Abrir archivo', command=lambda p=path: self._open_saved_file(p)).pack(side='left', padx=4) tk.Button(buttons, text='Cerrar', command=popup.destroy).pack(side='right', padx=4) def _open_text(self) -> None: path = filedialog.askopenfilename(filetypes=[('Texto', '*.txt'), ('Todos', '*.*')]) if not path: return with open(path, 'r', encoding='utf-8', errors='ignore') as fh: self.editor.delete('1.0', 'end') self.editor.insert('1.0', fh.read()) self._log(f'Abriste {path}') def _save_text(self) -> None: path = filedialog.asksaveasfilename(defaultextension='.txt') if not path: return with open(path, 'w', encoding='utf-8') as fh: fh.write(self.editor.get('1.0', 'end')) self._log(f'Guardado en {path}') def _open_simple_scraping_popup(self) -> None: if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE): messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.') return if self.simple_scraping_popup and tk.Toplevel.winfo_exists(self.simple_scraping_popup): self.simple_scraping_popup.lift() return popup = tk.Toplevel(self) popup.title('Scraping simple') popup.configure(bg='white', padx=18, pady=18) popup.resizable(False, False) popup.transient(self) popup.grab_set() tk.Label( popup, text='Introduce una o varias URL completas (una por línea):', bg='white', font=('Arial', 11, 'bold'), justify='left' ).pack(anchor='w') url_box = scrolledtext.ScrolledText(popup, width=60, height=5) url_box.pack(pady=(8, 4)) url_box.insert('1.0', 'https://es.wallapop.com\nhttps://www.wikipedia.org') error_label = tk.Label(popup, text='', fg='red', bg='white', wraplength=420, justify='left') error_label.pack(anchor='w') btn_frame = tk.Frame(popup, bg='white') btn_frame.pack(fill='x', pady=(10, 0)) def _close_popup() -> None: if self.simple_scraping_popup: try: self.simple_scraping_popup.destroy() except Exception: pass self.simple_scraping_popup = None def _start(_: object | None = None) -> None: raw = url_box.get('1.0', 'end').strip() urls = [line.strip() for line in raw.splitlines() if line.strip()] if not urls: error_label.config(text='Añade al menos una URL completa.') return invalid = [u for u in urls if not u.startswith(('http://', 'https://'))] if invalid: error_label.config(text=f'Las URL deben empezar por http:// o https:// (revisa: {invalid[0]})') return _close_popup() self._start_simple_scraping(urls) tk.Button(btn_frame, text='Iniciar', command=_start, bg='#d6f2ce').pack(side='left', padx=4) tk.Button(btn_frame, text='Cancelar', command=_close_popup).pack(side='right', padx=4) url_box.bind('', _start) popup.bind('', _start) popup.protocol('WM_DELETE_WINDOW', _close_popup) self.simple_scraping_popup = popup url_box.focus_set() def _start_simple_scraping(self, urls: list[str]) -> None: if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE): messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.') return cleaned = [url.strip() for url in urls if url.strip()] if not cleaned: messagebox.showinfo('Scraping', 'No hay URL válidas para analizar.') return self._log(f'Scraping simple para {len(cleaned)} URL(s)') preview = 'Analizando las siguientes URL:\n' + '\n'.join(f'• {url}' for url in cleaned) self._show_text_result('Scraping simple', preview) self.notebook.select(self.tab_resultados) threading.Thread(target=self._simple_scraping_runner, args=(cleaned,), daemon=True).start() def _simple_scraping_runner(self, urls: list[str]) -> None: results: list[dict[str, str]] = [] lock = threading.Lock() threads: list[threading.Thread] = [] for url in urls: t = threading.Thread(target=self._simple_scraping_worker, args=(url, results, lock), daemon=True) t.start() threads.append(t) for thread in threads: thread.join() self._scraping_queue.put(('generic_success', urls, results)) def _simple_scraping_worker(self, url: str, results: list[dict[str, str]], lock: threading.Lock) -> None: timestamp = datetime.datetime.now().strftime('%H:%M:%S') if not (requests and BEAUTIFULSOUP_AVAILABLE): data = { 'url': url, 'error': 'Dependencias de scraping no disponibles', 'timestamp': timestamp } else: try: time.sleep(1) headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') title = soup.title.string.strip() if soup.title and soup.title.string else 'Sin título' num_links = len(soup.find_all('a')) num_images = len(soup.find_all('img')) num_paragraphs = len(soup.find_all('p')) meta_desc = '' meta_tag = soup.find('meta', attrs={'name': 'description'}) if meta_tag and meta_tag.get('content'): content = meta_tag['content'] meta_desc = content[:100] + '...' if len(content) > 100 else content data = { 'url': url, 'titulo': title, 'descripcion': meta_desc, 'num_links': num_links, 'num_imagenes': num_images, 'num_parrafos': num_paragraphs, 'longitud': len(response.text), 'status_code': response.status_code, 'timestamp': timestamp } except Exception as exc: data = { 'url': url, 'error': str(exc), 'timestamp': timestamp } with lock: results.append(data) # --- Funcionalidad de Web Scraping (Wallapop) --- def _open_wallapop_popup(self) -> None: if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE): messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para analizar enlaces.') return if self.scraping_popup and tk.Toplevel.winfo_exists(self.scraping_popup): self.scraping_popup.lift() return popup = tk.Toplevel(self) popup.title('Búsqueda Wallapop') popup.configure(bg='white', padx=18, pady=18) popup.resizable(False, False) popup.transient(self) popup.grab_set() tk.Label( popup, text='Escribe el producto que quieres buscar en Wallapop:', bg='white', font=('Arial', 11, 'bold') ).pack(anchor='w') entry = tk.Entry(popup, width=60) entry.pack(pady=(8, 4)) entry.insert(0, 'PlayStation 5') controls = tk.Frame(popup, bg='white') controls.pack(fill='x', pady=(2, 0)) tk.Label(controls, text='Resultados a mostrar:', bg='white').pack(side='left') results_spin = tk.Spinbox(controls, from_=1, to=20, width=5, justify='center') results_spin.pack(side='left', padx=(6, 0)) results_spin.delete(0, 'end') results_spin.insert(0, '5') error_label = tk.Label(popup, text='', fg='red', bg='white', wraplength=380, justify='left') error_label.pack(anchor='w') btn_frame = tk.Frame(popup, bg='white') btn_frame.pack(fill='x', pady=(10, 0)) def _close_popup() -> None: if self.scraping_popup: try: self.scraping_popup.destroy() except Exception: pass self.scraping_popup = None def _start(_: object | None = None) -> None: query = entry.get().strip() if not query: error_label.config(text='Necesitas introducir un producto a buscar.') return try: limit = max(1, min(20, int(results_spin.get()))) except ValueError: error_label.config(text='Selecciona un número válido de resultados.') return _close_popup() self._start_wallapop_search(query, limit) tk.Button(btn_frame, text='Buscar', command=_start, bg='#d6f2ce').pack(side='left', padx=4) tk.Button(btn_frame, text='Cancelar', command=_close_popup).pack(side='right', padx=4) entry.bind('', _start) popup.protocol('WM_DELETE_WINDOW', _close_popup) self.scraping_popup = popup entry.focus_set() def _start_wallapop_search(self, query: str, limit: int) -> None: if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE): messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para usar esta función.') return clean_query = query.strip() if not clean_query: messagebox.showerror('Wallapop', 'Escribe el producto que quieres buscar.') return max_results = max(1, min(20, int(limit))) self._log(f'Buscando en Wallapop: "{clean_query}" (máx. {max_results})') self._show_text_result( 'Wallapop', f'Buscando "{clean_query}" en Wallapop...\nMostrando hasta {max_results} resultados.' ) self.notebook.select(self.tab_resultados) threading.Thread( target=self._wallapop_search_worker, args=(clean_query, max_results), daemon=True ).start() def _wallapop_search_worker(self, query: str, limit: int) -> None: try: results = self._fetch_wallapop_search_results(query, limit) if not results: raise ValueError('No se encontraron anuncios para tu búsqueda.') self._scraping_queue.put(('search_success', query, results)) except Exception as exc: self._scraping_queue.put(('search_error', str(exc))) def _fetch_wallapop_search_results(self, query: str, limit: int) -> list[dict[str, str]]: errors: list[str] = [] for fetcher in (self._fetch_wallapop_search_via_api, self._fetch_wallapop_search_via_html): try: results = fetcher(query, limit) if results: return results[:limit] except Exception as exc: errors.append(str(exc)) if errors: raise ValueError('No se pudieron obtener resultados de Wallapop.\n' + '\n'.join(errors)) return [] def _fetch_wallapop_search_via_api(self, query: str, limit: int) -> list[dict[str, str]]: if not requests: return [] def _base_params() -> dict[str, object]: params: dict[str, object] = { 'keywords': query, 'source': 'search_box', 'filters_source': 'quick_filters', 'order_by': 'most_relevance', 'latitude': f'{WALLAPOP_LATITUDE:.6f}', 'longitude': f'{WALLAPOP_LONGITUDE:.6f}', 'country_code': WALLAPOP_COUNTRY_CODE, 'language': WALLAPOP_LANGUAGE } if WALLAPOP_CATEGORY_ID: params['category_id'] = WALLAPOP_CATEGORY_ID return params results: list[dict[str, str]] = [] seen_ids: set[str] = set() next_page: str | None = None attempts = 0 while len(results) < limit and attempts < 6: params = {'next_page': next_page} if next_page else _base_params() resp = requests.get( 'https://api.wallapop.com/api/v3/search', params=params, headers=WALLAPOP_HEADERS, timeout=20 ) try: resp.raise_for_status() except requests.HTTPError as exc: # type: ignore[attr-defined] if resp.status_code == 403: raise ValueError( 'Wallapop devolvió 403. Asegúrate de definir WALLAPOP_DEVICE_ID/WALLAPOP_MPID ' 'con valores obtenidos desde tu navegador.' ) from exc raise payload = resp.json() items = self._extract_wallapop_items(payload) if not items: break for entry in items: if not isinstance(entry, dict): continue item_id = entry.get('id') or entry.get('itemId') or entry.get('item_id') if not item_id: continue item_id = str(item_id) if item_id in seen_ids: continue seen_ids.add(item_id) slug = entry.get('web_slug') or entry.get('slug') or item_id seller_info = entry.get('seller') if isinstance(entry.get('seller'), dict) else None detail = self._format_wallapop_item(entry, self._build_wallapop_url(str(slug)), seller_info) results.append(detail) if len(results) >= limit: break next_page = self._extract_wallapop_next_page(payload) if not next_page: break attempts += 1 return results def _fetch_wallapop_search_via_html(self, query: str, limit: int) -> list[dict[str, str]]: if not (requests and BEAUTIFULSOUP_AVAILABLE): return [] params = {'keywords': query} resp = requests.get( 'https://es.wallapop.com/app/search', params=params, headers=WALLAPOP_HEADERS, timeout=20 ) resp.raise_for_status() soup = BeautifulSoup(resp.text, 'html.parser') script = soup.find('script', id='__NEXT_DATA__') if not script or not script.string: return [] data = json.loads(script.string) page_props = data.get('props', {}).get('pageProps', {}) items: list[dict[str, object]] = [] for key in ('searchResults', 'results', 'items'): candidate = page_props.get(key) if isinstance(candidate, list): items = candidate break if not items: initial_state = page_props.get('initialState', {}) if isinstance(initial_state, dict): for key in ('items', 'results'): candidate = initial_state.get(key) if isinstance(candidate, list): items = candidate break if not items: search_state = initial_state.get('search') if isinstance(search_state, dict): candidate = search_state.get('items') if isinstance(candidate, list): items = candidate elif isinstance(search_state.get('search'), dict): nested = search_state['search'].get('items') if isinstance(nested, list): items = nested if not items: return [] results: list[dict[str, str]] = [] seen: set[str] = set() for entry in items: if not isinstance(entry, dict): continue slug = ( entry.get('slug') or entry.get('webSlug') or entry.get('web_slug') or entry.get('shareLink') or entry.get('share_link') or entry.get('url') ) url = self._build_wallapop_url(str(slug)) if slug else '' item_id = entry.get('id') or entry.get('itemId') or entry.get('item_id') entry_key = str(item_id or url) if not entry_key or entry_key in seen: continue seen.add(entry_key) detail: dict[str, str] | None = None if url: try: detail = self._fetch_wallapop_item(url) except Exception: detail = None if not detail and item_id: try: detail = self._fetch_wallapop_item_from_api(str(item_id)) except Exception: detail = None if detail: if url and not detail.get('link'): detail['link'] = url results.append(detail) if len(results) >= limit: break return results def _extract_wallapop_items(self, payload: dict[str, object]) -> list[dict[str, object]]: items: list[dict[str, object]] = [] data_block = payload.get('data') if isinstance(data_block, dict): section = data_block.get('section') if isinstance(section, dict): section_payload = section.get('payload') if isinstance(section_payload, dict): raw_items = section_payload.get('items') or section_payload.get('section_items') if isinstance(raw_items, list): items.extend(item for item in raw_items if isinstance(item, dict)) elif isinstance(data_block.get('items'), list): items.extend(item for item in data_block['items'] if isinstance(item, dict)) if not items: web_results = payload.get('web_search_results') if isinstance(web_results, dict): sections = web_results.get('sections') if isinstance(sections, list): for section in sections: if not isinstance(section, dict): continue raw_items = section.get('items') or section.get('section_items') if isinstance(raw_items, list): items.extend(item for item in raw_items if isinstance(item, dict)) return items def _extract_wallapop_next_page(self, payload: dict[str, object]) -> str | None: meta = payload.get('meta') if isinstance(meta, dict): token = meta.get('next_page') if isinstance(token, str) and token: return token headers = payload.get('headers') if isinstance(headers, dict): token = headers.get('X-NextPage') or headers.get('x-nextpage') if isinstance(token, str) and token: return token return None def _fetch_wallapop_item(self, url: str) -> dict[str, str]: if not requests or not BEAUTIFULSOUP_AVAILABLE: raise RuntimeError('Dependencias de scraping no disponibles') resp = requests.get(url, headers=WALLAPOP_HEADERS, timeout=20) resp.raise_for_status() soup = BeautifulSoup(resp.text, 'html.parser') script = soup.find('script', id='__NEXT_DATA__') if not script or not script.string: raise ValueError('No se pudo encontrar la información del anuncio (sin datos Next.js).') data = json.loads(script.string) page_props = data.get('props', {}).get('pageProps', {}) item = page_props.get('item') or page_props.get('itemInfo', {}).get('item') if not item: raise ValueError('El formato de la página cambió y no se pudo leer el anuncio.') seller_info = page_props.get('itemSeller') if isinstance(page_props.get('itemSeller'), dict) else None return self._format_wallapop_item(item, url, seller_info) def _fetch_wallapop_item_from_api(self, item_id: str) -> dict[str, str]: if not requests: raise RuntimeError('Dependencias de scraping no disponibles') api_url = f'https://api.wallapop.com/api/v3/items/{item_id}' resp = requests.get(api_url, headers=WALLAPOP_HEADERS, timeout=15) resp.raise_for_status() data = resp.json() item = data.get('item') if isinstance(data.get('item'), dict) else data if not isinstance(item, dict): raise ValueError('Respuesta del API sin datos del artículo.') slug = item.get('web_slug') or item.get('slug') or item.get('share_link') link = self._build_wallapop_url(str(slug)) if slug else f'https://es.wallapop.com/item/{item_id}' seller_info = data.get('seller') if isinstance(data.get('seller'), dict) else item.get('seller') seller_dict = seller_info if isinstance(seller_info, dict) else None return self._format_wallapop_item(item, link, seller_dict) def _build_wallapop_url(self, slug_or_url: str) -> str: if not slug_or_url: return '' text = slug_or_url.strip() if text.startswith('http://') or text.startswith('https://'): return text text = text.strip('/') if not text.startswith('item/'): text = f'item/{text}' return f'https://es.wallapop.com/{text}' def _format_wallapop_item( self, item: dict[str, object], url: str, seller_info: dict[str, object] | None = None ) -> dict[str, str]: title_field = item.get('title') if isinstance(title_field, dict): title = title_field.get('original') or title_field.get('translated') or '(Sin título)' elif isinstance(title_field, str): title = title_field or '(Sin título)' else: title = '(Sin título)' price_text = 'Precio no disponible' price_data = item.get('price') price_cash = None if isinstance(price_data, dict): if isinstance(price_data.get('cash'), dict): price_cash = price_data['cash'] else: price_cash = price_data if isinstance(price_cash, dict): amount = price_cash.get('amount') currency = price_cash.get('currency') or 'EUR' if amount is not None: price_text = f'{amount} {currency}'.strip() elif isinstance(price_data, (int, float, str)): price_text = f'{price_data} EUR' desc_field = item.get('description') if isinstance(desc_field, dict): description = (desc_field.get('original') or desc_field.get('translated') or '').strip() elif isinstance(desc_field, str): description = desc_field.strip() else: description = '' location = item.get('location') if isinstance(item.get('location'), dict) else {} location_parts = [] if isinstance(location, dict): for part in (location.get('city'), location.get('postalCode'), location.get('regionName')): if part: location_parts.append(part) location_text = ', '.join(location_parts) if location_parts else 'Ubicación no disponible' seller_name = '' for candidate in (seller_info, item.get('seller'), item.get('user'), item.get('userInfo')): if isinstance(candidate, dict): seller_name = ( candidate.get('microName') or candidate.get('name') or candidate.get('userName') or '' ) if seller_name: break if not seller_name: seller_name = 'Vendedor no disponible' categories = ', '.join( cat.get('name') for cat in item.get('taxonomies', []) if isinstance(cat, dict) and cat.get('name') ) or 'Sin categoría' car_info = item.get('carInfo') if isinstance(item.get('carInfo'), dict) else {} car_bits: list[str] = [] if isinstance(car_info, dict): for key in ('year', 'km', 'engine', 'gearBox'): field = car_info.get(key) text = None if isinstance(field, dict): text = field.get('text') or field.get('iconText') elif isinstance(field, str): text = field if text: car_bits.append(text) extra = ', '.join(car_bits) images = item.get('images') if isinstance(item.get('images'), list) else [] image_url = '' for img in images: if not isinstance(img, dict): continue urls = img.get('urls') if isinstance(img.get('urls'), dict) else {} if isinstance(urls, dict): image_url = urls.get('medium') or urls.get('big') or urls.get('small') or '' else: image_url = img.get('url', '') if isinstance(img.get('url'), str) else '' if image_url: break return { 'title': str(title), 'price': price_text, 'description': description, 'link': url, 'location': location_text, 'seller': seller_name, 'category': categories, 'extra': extra, 'image': image_url } def _process_scraping_queue(self) -> None: if not self._running: return try: while True: status, *payload = self._scraping_queue.get_nowait() if status == 'error': message = payload[0] if payload else 'Error desconocido' self._handle_scraping_error(str(message)) elif status == 'search_success': query, data = payload self._handle_wallapop_search_success(str(query), list(data)) elif status == 'search_error': message = payload[0] if payload else 'Error desconocido' self._handle_scraping_error(str(message)) elif status == 'generic_success': urls, data = payload self._handle_generic_scraping_success(list(urls), list(data)) elif status == 'backup_success': summary, target = payload self._handle_backup_success(str(summary), str(target)) except queue.Empty: pass finally: if self._running: self.after(100, self._process_scraping_queue) def _handle_wallapop_search_success(self, query: str, results: list[dict[str, str]]) -> None: count = len(results) self._log(f'Se encontraron {count} anuncio(s) para "{query}" en Wallapop') def builder(parent: tk.Frame) -> None: frame = tk.Frame(parent, bg='white') frame.pack(fill='both', expand=True, padx=12, pady=12) summary = tk.Label( frame, text=f'{count} resultado(s) para "{query}"', font=('Arial', 14, 'bold'), bg='white', fg='#2c3e50' ) summary.pack(anchor='w', pady=(0, 6)) box = scrolledtext.ScrolledText(frame, wrap='word') box.pack(fill='both', expand=True) blocks: list[str] = [] for idx, item in enumerate(results, start=1): block_lines = [ f'{idx}. {item.get("title", "Sin título")}', f'Precio: {item.get("price", "N/D")}', f'Vendedor: {item.get("seller", "N/D")}', f'Ubicación: {item.get("location", "N/D")}', f'Categoría: {item.get("category", "N/D")}' ] extra = item.get('extra', '').strip() if extra: block_lines.append(f'Detalles: {extra}') description = (item.get('description') or '').strip() if description: block_lines.append('Descripción:') block_lines.append(description) block_lines.append(f'Enlace: {item.get("link", "N/D")}') block_lines.append('-' * 60) blocks.append('\n'.join(block_lines)) box.insert('1.0', '\n\n'.join(blocks)) box.config(state='disabled') self._render_results_view('Resultados Wallapop', builder) self.notebook.select(self.tab_resultados) try: path = self._save_wallapop_results_to_file(query, results) except Exception as exc: self._log(f'[ERROR] No se pudo guardar el archivo: {exc}') messagebox.showerror('Wallapop', f'No se pudo guardar el archivo: {exc}') return message = ( f'Se guardó el archivo con los resultados en:\n{path}\n\n' '¿Quieres abrirlo ahora?' ) if messagebox.askyesno('Wallapop', message): self._open_saved_file(path) else: self._log(f'Archivo de resultados guardado en {path}') def _handle_generic_scraping_success(self, urls: list[str], results: list[dict[str, str]]) -> None: self._log(f'Scraping simple completado para {len(results)} URL(s)') order = {url: idx for idx, url in enumerate(urls)} ordered = sorted(results, key=lambda item: order.get(item.get('url', ''), len(order))) def builder(parent: tk.Frame) -> None: frame = tk.Frame(parent, bg='white') frame.pack(fill='both', expand=True, padx=12, pady=12) summary = tk.Label( frame, text='Resultados de scraping simple', font=('Arial', 14, 'bold'), bg='white' ) summary.pack(anchor='w', pady=(0, 6)) box = scrolledtext.ScrolledText(frame, wrap='word') box.pack(fill='both', expand=True) blocks: list[str] = [] for idx, item in enumerate(ordered, start=1): lines = [f'{idx}. {item.get("url", "URL desconocida")}', f"Hora: {item.get('timestamp', 'N/D')}" ] if 'error' in item: lines.append(f"Error: {item.get('error')}") else: lines.extend([ f"Título: {item.get('titulo', 'Sin título')}", f"Descripción: {item.get('descripcion', '')}", f"Estado HTTP: {item.get('status_code', 'N/D')}", f"Longitud HTML: {item.get('longitud', 0)} caracteres", f"Links: {item.get('num_links', 0)} | Imágenes: {item.get('num_imagenes', 0)} | Párrafos: {item.get('num_parrafos', 0)}" ]) lines.append('-' * 60) blocks.append('\n'.join(lines)) box.insert('1.0', '\n\n'.join(blocks) if blocks else 'No se generaron resultados.') box.config(state='disabled') self._render_results_view('Scraping simple', builder) self.notebook.select(self.tab_resultados) def _handle_backup_success(self, summary: str, target: str) -> None: self._log('Copia de seguridad finalizada') self._show_text_result('Copias de seguridad', summary) self.notebook.select(self.tab_resultados) if messagebox.askyesno('Backup', f'{summary}\n\n¿Quieres abrir la carpeta destino?'): self._open_saved_file(target) def _save_wallapop_results_to_file(self, query: str, results: list[dict[str, str]]) -> str: timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'wallapop_search_{timestamp}.txt' path = os.path.join(tempfile.gettempdir(), filename) lines = [ f'Consulta: {query}', f'Resultados: {len(results)}', f'Generado: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}', '' ] for idx, item in enumerate(results, start=1): lines.append(f'Resultado {idx}') lines.append(f'Título: {item.get("title", "Sin título")}') lines.append(f'Precio: {item.get("price", "N/D")}') lines.append(f'Vendedor: {item.get("seller", "N/D")}') lines.append(f'Ubicación: {item.get("location", "N/D")}') lines.append(f'Categoría: {item.get("category", "N/D")}') if item.get('extra'): lines.append(f'Detalles: {item.get("extra")}') description = (item.get('description') or '').strip() if description: lines.append('Descripción:') lines.append(description) lines.append(f'Enlace: {item.get("link", "N/D")}') if item.get('image'): lines.append(f'Imagen: {item.get("image")}') lines.append('-' * 60) with open(path, 'w', encoding='utf-8') as fh: fh.write('\n'.join(lines)) return path def _handle_scraping_error(self, message: str) -> None: self._log(f'[ERROR] Wallapop: {message}') messagebox.showerror('Wallapop', message) # --- Fin Web Scraping --- def _open_game_window(self) -> None: self._render_results_view('Carrera de camellos', self._build_camel_race_view) def _build_camel_race_view(self, parent: tk.Frame) -> None: container = tk.Frame(parent, bg='white') container.pack(fill='both', expand=True) tk.Label( container, text='Juego de camellos (sincronización de hilos)', font=('Arial', 18, 'bold'), bg='white' ).pack(pady=(16, 8)) canvas = tk.Canvas(container, height=380, bg='#fdfdfd', bd=2, relief='sunken') canvas.pack(fill='both', expand=True, padx=16, pady=12) status = tk.Label(container, text='Listo para correr', font=('Arial', 14, 'bold'), bg='white', fg='#2c3e50') status.pack(pady=(0, 10)) controls = tk.Frame(container, bg='white') controls.pack(pady=(0, 10)) tk.Button( controls, text='Iniciar carrera', font=('Arial', 13, 'bold'), width=16, command=lambda: self._start_race(canvas, status) ).grid(row=0, column=0, padx=12, pady=4) tk.Button( controls, text='Volver a resultados', font=('Arial', 13), width=16, command=self._reset_results_view ).grid(row=0, column=1, padx=12, pady=4) tk.Label(controls, text='Número de camellos:', bg='white', font=('Arial', 12)).grid(row=1, column=0, pady=6) camel_spin = tk.Spinbox( controls, from_=3, to=8, width=5, justify='center', font=('Arial', 12), command=lambda: self._set_camel_count(int(camel_spin.get())) ) camel_spin.grid(row=1, column=1, pady=6) camel_spin.delete(0, 'end') camel_spin.insert(0, str(self.game_camel_count)) tk.Label(controls, text='Velocidad (1-5):', bg='white', font=('Arial', 12)).grid(row=2, column=0, pady=6) speed_scale = tk.Scale( controls, from_=1, to=5, orient='horizontal', resolution=0.5, length=180, bg='white', command=lambda value: self._set_speed_factor(float(value)) ) speed_scale.grid(row=2, column=1, pady=6) speed_scale.set(self.game_speed_factor) self.game_window_canvas = canvas self.game_window_status = status def _set_camel_count(self, value: int) -> None: self.game_camel_count = max(3, min(8, int(value))) def _set_speed_factor(self, value: float) -> None: self.game_speed_factor = max(1.0, min(5.0, float(value))) def _start_race(self, canvas=None, status_label=None) -> None: target_canvas = canvas or self.game_window_canvas if target_canvas is None: messagebox.showinfo('Juego', 'Abre el juego para iniciar la carrera.') return self._stop_game_queue_processor() target_canvas.delete('all') target_canvas.update_idletasks() width = target_canvas.winfo_width() or int(target_canvas.cget('width')) height = target_canvas.winfo_height() or int(target_canvas.cget('height')) finish = width - 40 camel_count = max(3, min(8, int(self.game_camel_count))) spacing = height / (camel_count + 1) palette = ['#ff7675', '#74b9ff', '#55efc4', '#f1c40f', '#9b59b6', '#1abc9c', '#e67e22', '#95a5a6'] racers = [] for idx in range(camel_count): y = spacing * (idx + 1) target_canvas.create_line(20, y, finish, y, dash=(3,4)) color = palette[idx % len(palette)] rect = target_canvas.create_rectangle(30, y-22, 130, y+22, fill=color) racers.append(rect) status = status_label or self.game_window_status if status: status.config(text='¡Carrera en curso!') self.game_move_queue = queue.Queue() self.game_done_event = threading.Event() self.game_finish_line = finish self.game_racer_names = [f'Camello {i+1}' for i in range(camel_count)] self.game_queue_processor_active = True self.after(30, self._process_game_queue) indices = list(range(camel_count)) random.shuffle(indices) for idx in indices: rect = racers[idx] threading.Thread(target=self._race_worker, args=(idx, rect), daemon=True).start() def _race_worker(self, index: int, rect_id: int) -> None: while True: if not self.game_queue_processor_active or not self.game_move_queue or not self.game_done_event: return if self.game_done_event.is_set(): return base_min = 3 base_max = 9 speed = max(1.0, float(self.game_speed_factor)) step = random.randint(int(base_min * speed), max(int(base_max * speed), int(base_min * speed) + 1)) self.game_move_queue.put(('move', (rect_id, step, index))) sleep_time = max(0.02, 0.08 / speed) time.sleep(random.uniform(sleep_time * 0.5, sleep_time * 1.2)) def _process_game_queue(self) -> None: if not self.game_queue_processor_active or not self.game_move_queue: return canvas = self.game_window_canvas if not canvas: return try: while True: action, payload = self.game_move_queue.get_nowait() if action == 'move': rect_id, step, idx = payload try: canvas.move(rect_id, step, 0) except Exception: continue coords = canvas.coords(rect_id) if coords and coords[2] >= self.game_finish_line and self.game_done_event and not self.game_done_event.is_set(): self.game_done_event.set() if self.game_window_status: self.game_window_status.config(text=f'{self.game_racer_names[idx]} ganó la carrera') else: continue except queue.Empty: pass if not self.game_done_event or not self.game_done_event.is_set(): if self.game_queue_processor_active: self.after(30, self._process_game_queue) else: self.game_queue_processor_active = False def _stop_game_queue_processor(self) -> None: self.game_queue_processor_active = False self.game_move_queue = None if self.game_done_event: try: self.game_done_event.set() except Exception: pass self.game_done_event = None def _start_alarm(self) -> None: try: minutes = max(1, int(self.alarm_minutes.get())) except ValueError: messagebox.showwarning('Alarmas', 'Ingresa un número válido de minutos.') return title = self.alarm_title.get().strip() or f'Alarma {self.alarm_counter}' end_time = datetime.datetime.now() + datetime.timedelta(minutes=minutes) alarm = {'id': self.alarm_counter, 'title': title, 'end': end_time} self.alarm_counter += 1 self.active_alarms.append(alarm) self.alarm_title.delete(0, 'end') self._update_alarm_list() self._log(f'Alarma "{title}" programada para las {end_time.strftime("%H:%M:%S")}') def _refresh_alarms_loop(self) -> None: if not self._running: return now = datetime.datetime.now() triggered: list[dict[str, datetime.datetime | str]] = [] for alarm in list(self.active_alarms): remaining = (alarm['end'] - now).total_seconds() # type: ignore[arg-type] if remaining <= 0: triggered.append(alarm) for alarm in triggered: self.active_alarms.remove(alarm) self._trigger_alarm(alarm) self._update_alarm_list() self.after(1000, self._refresh_alarms_loop) def _update_alarm_list(self) -> None: if not hasattr(self, 'alarm_list'): return selection = self.alarm_list.curselection() selected_id = None if selection: idx = selection[0] if idx < len(self.active_alarms): selected_id = self.active_alarms[idx]['id'] self.alarm_list.delete(0, 'end') if not self.active_alarms: self.alarm_status.config(text='Sin alarmas programadas') return now = datetime.datetime.now() for index, alarm in enumerate(self.active_alarms): remaining = max(0, int((alarm['end'] - now).total_seconds())) # type: ignore[arg-type] minutes, seconds = divmod(remaining, 60) self.alarm_list.insert('end', f"{alarm['title']} - {minutes:02d}:{seconds:02d}") if selected_id is not None and alarm['id'] == selected_id: self.alarm_list.selection_set(index) self.alarm_status.config(text=f"{len(self.active_alarms)} alarma(s) activas") def _trigger_alarm(self, alarm: dict[str, datetime.datetime | str]) -> None: self._log(f"Alarma '{alarm['title']}' activada") try: self.bell() except Exception: pass self._show_alarm_popup(str(alarm['title'])) def _show_alarm_popup(self, title: str) -> None: popup = tk.Toplevel(self) popup.title('Alarma activa') popup.configure(bg='#ffeaea', padx=20, pady=20) popup.transient(self) popup.grab_set() tk.Label(popup, text='¡Tiempo cumplido!', font=('Arial', 16, 'bold'), fg='#c0392b', bg='#ffeaea').pack(pady=6) tk.Label(popup, text=title, font=('Arial', 13), bg='#ffeaea').pack(pady=4) tk.Button(popup, text='Detener alarma', command=popup.destroy, bg='#f9dede').pack(pady=10) def _cancel_selected_alarm(self) -> None: if not self.active_alarms: messagebox.showinfo('Alarmas', 'No hay alarmas para cancelar.') return selection = self.alarm_list.curselection() if not selection: messagebox.showinfo('Alarmas', 'Selecciona una alarma en la lista.') return idx = selection[0] if idx >= len(self.active_alarms): messagebox.showwarning('Alarmas', 'La selección ya no es válida.') return alarm = self.active_alarms.pop(idx) self._update_alarm_list() self._log(f"Alarma '{alarm['title']}' cancelada") def _select_music(self) -> None: if not pygame_available: messagebox.showwarning('Música', 'Instale pygame para reproducir audio.') return path = filedialog.askopenfilename( filetypes=[ ('Audio/Video', '*.mp3 *.wav *.ogg *.mp4 *.mkv *.mov *.avi *.m4a *.webm'), ('Todos', '*.*') ] ) if not path: return self._cleanup_temp_audio() ext = os.path.splitext(path)[1].lower() direct_audio = {'.mp3', '.wav', '.ogg', '.m4a'} playable_path = path temp_path = None if ext not in direct_audio: tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav') temp_path = tmp.name tmp.close() cmd = ['ffmpeg', '-y', '-i', path, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', temp_path] try: # Usar subprocess.run para esperar la finalización de ffmpeg subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) playable_path = temp_path except FileNotFoundError: if os.path.exists(temp_path): os.unlink(temp_path) messagebox.showerror('Música', 'ffmpeg no está instalado. Instálalo para extraer el audio de videos.') return except subprocess.CalledProcessError as exc: if os.path.exists(temp_path): os.unlink(temp_path) messagebox.showerror('Música', f'No se pudo extraer el audio: {exc}') return pygame.mixer.init() pygame.mixer.music.load(playable_path) pygame.mixer.music.play(-1) self.music_temp_file = temp_path self._log(f'Reproduciendo {os.path.basename(path)}') def _stop_music(self) -> None: if pygame_available and pygame.mixer.get_init(): pygame.mixer.music.stop() self._cleanup_temp_audio() def _pause_music(self) -> None: if pygame_available and pygame.mixer.get_init(): pygame.mixer.music.pause() def _resume_music(self) -> None: if pygame_available and pygame.mixer.get_init(): pygame.mixer.music.unpause() def _cleanup_temp_audio(self) -> None: if self.music_temp_file and os.path.exists(self.music_temp_file): try: os.remove(self.music_temp_file) except OSError: pass self.music_temp_file = None def _resolve_openweather_key(self) -> str: key = (self.weather_api_key or '').strip() if not key: raise RuntimeError('Configure OPENWEATHER_API_KEY u OPENWEATHER_FALLBACK_API_KEY') return key def _fetch_weather_by_coordinates(self, lat: float, lon: float) -> dict[str, Any]: if not REQUESTS_AVAILABLE: raise RuntimeError('Instale requests para consultar el clima') key = self._resolve_openweather_key() params = { 'lat': lat, 'lon': lon, 'appid': key, 'units': 'metric', 'lang': 'es' } resp = requests.get('https://api.openweathermap.org/data/2.5/weather', params=params, timeout=8) resp.raise_for_status() data = resp.json() if 'main' not in data or 'weather' not in data: raise ValueError('Respuesta incompleta de OpenWeatherMap') return data def _fetch_weather_data(self, city_query: str) -> dict[str, Any]: if not REQUESTS_AVAILABLE: raise RuntimeError('Instale requests para consultar el clima') key = self._resolve_openweather_key() params = { 'q': city_query, 'appid': key, 'units': 'metric', 'lang': 'es' } resp = requests.get('https://api.openweathermap.org/data/2.5/weather', params=params, timeout=8) resp.raise_for_status() data = resp.json() if 'main' not in data or 'weather' not in data: raise ValueError('Respuesta incompleta de OpenWeatherMap') return data def _fetch_javea_weather(self) -> dict[str, Any]: return self._fetch_weather_by_coordinates(JAVEA_LATITUDE, JAVEA_LONGITUDE) # ------------------------ chat ------------------------ def _connect_chat(self) -> None: host = self.host_entry.get().strip() or SERVER_HOST_DEFAULT try: port = int(self.port_entry.get().strip()) except ValueError: port = SERVER_PORT_DEFAULT if self.chat_client.connect(host, port): self._enqueue_chat_message('[INFO] Conectado al servidor') def _send_chat(self) -> None: text = self.message_entry.get('1.0', 'end').strip() if not text: return if self.chat_client.send(text + '\n'): self._enqueue_chat_message('Yo: ' + text) self.message_entry.delete('1.0', 'end') else: messagebox.showwarning('Chat', 'No hay conexión activa.') def _enqueue_chat_message(self, text: str) -> None: self._chat_queue.put(text) def _chat_loop(self) -> None: while self._running: try: msg = self._chat_queue.get(timeout=0.5) except queue.Empty: continue self.chat_display.after(0, lambda m=msg: self._append_chat(m)) def _append_chat(self, text: str) -> None: self.chat_display.config(state='normal') self.chat_display.insert('end', text + '\n') self.chat_display.see('end') self.chat_display.config(state='disabled') # ------------------------ hilos auxiliares ------------------------ def _update_clock(self) -> None: if not self._running: return now = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S') self.clock_label.config(text=now) self.after(1000, self._update_clock) def _update_traffic(self) -> None: if not (self._running and psutil): return if self._traffic_last is None: try: self._traffic_last = psutil.net_io_counters() except Exception as exc: self._log(f'Tráfico: psutil no disponible ({exc})') self.traffic_label.config(text='Red: N/D') return try: current = psutil.net_io_counters() sent = (current.bytes_sent - self._traffic_last.bytes_sent) / 1024 recv = (current.bytes_recv - self._traffic_last.bytes_recv) / 1024 self._traffic_last = current self.traffic_label.config(text=f'Red: ↑ {sent:.1f} KB/s ↓ {recv:.1f} KB/s') except Exception as exc: self._log(f'Tráfico: {exc}') self.traffic_label.config(text='Red: N/D') finally: if self._running: self.after(1000, self._update_traffic) def _resource_poll_tick(self) -> None: if not (self._running and psutil): return try: cpu = psutil.cpu_percent(interval=None) mem = psutil.virtual_memory().percent except Exception as exc: self._log(f'Recursos: {exc}') if self._running: self._resource_poll_job = self.after(2000, self._resource_poll_tick) return threads = 0 if self._ps_process: try: threads = self._ps_process.num_threads() except Exception: threads = 0 self._resource_history['cpu'].append(cpu) self._resource_history['mem'].append(mem) self._resource_history['threads'].append(threads) self._resource_history['cpu'] = self._resource_history['cpu'][-40:] self._resource_history['mem'] = self._resource_history['mem'][-40:] self._resource_history['threads'] = self._resource_history['threads'][-40:] self._update_chart() if self._running: self._resource_poll_job = self.after(1000, self._resource_poll_tick) def _update_chart(self) -> None: if not ( MATPLOTLIB_AVAILABLE and self.chart_canvas and self.line_cpu and self.line_mem and self.ax_cpu and self.ax_mem ): return x = list(range(len(self._resource_history['cpu']))) self.line_cpu.set_data(x, self._resource_history['cpu']) self.line_mem.set_data(x, self._resource_history['mem']) max_x = max(40, len(x)) self.ax_cpu.set_xlim(0, max_x) self.ax_mem.set_xlim(0, max_x) self.ax_cpu.set_ylim(0, 100) self.ax_mem.set_ylim(0, 100) if self.mem_fill: try: self.mem_fill.remove() except Exception: pass if self.ax_mem and x: self.mem_fill = self.ax_mem.fill_between( x, self._resource_history['mem'], color='#4ecdc4', alpha=0.3 ) if self.ax_threads: threads = self._resource_history['threads'] if threads: if self.thread_bars and len(self.thread_bars) == len(threads): for rect, height in zip(self.thread_bars, threads): rect.set_height(height) else: if self.thread_bars: for rect in self.thread_bars: rect.remove() self.thread_bars = self.ax_threads.bar(x, threads, color='#ffa726') if x else None if threads: max_threads = max(threads) self.ax_threads.set_ylim(0, max(max_threads * 1.2, 1)) self.ax_threads.set_xlim(-0.5, max(39.5, len(x) - 0.5)) if self.chart_canvas: self.chart_canvas.draw_idle() def _log(self, text: str) -> None: if threading.current_thread() is not threading.main_thread(): self.after(0, lambda t=text: self._log(t)) return timestamp = datetime.datetime.now().strftime('%H:%M:%S') self.notes.insert('end', f'[{timestamp}] {text}\n') self.notes.see('end') def on_close(self) -> None: self._running = False if self._resource_poll_job is not None: try: self.after_cancel(self._resource_poll_job) except Exception: pass self._resource_poll_job = None self.chat_client.close() self.destroy() def main() -> None: app = DashboardApp() app.protocol('WM_DELETE_WINDOW', app.on_close) app.mainloop() if __name__ == '__main__': main()