Proyecto1AVApsp/app.py

2188 lines
91 KiB
Python

#!/usr/bin/env python3
"""Dashboard principal del Proyecto Global."""
from __future__ import annotations
import datetime
import json
import os
import shutil
import queue
import random
import socket
import subprocess
import sys
import tempfile
import threading
import time
import webbrowser
from dataclasses import dataclass
from typing import Any, Callable
import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext, ttk
from tkinter import font as tkfont
# -------------------- optional dependencies --------------------
# Every optional package is imported defensively: when an import fails the
# module still loads and a flag and/or a ``None`` placeholder records that the
# feature is unavailable.
try:
    import psutil  # type: ignore
except Exception:  # pragma: no cover
    psutil = None  # type: ignore
try:
    import matplotlib
    # Select the Tk backend before importing the backend-specific classes.
    matplotlib.use('TkAgg')
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
    from matplotlib.figure import Figure
    MATPLOTLIB_AVAILABLE = True
except Exception:  # pragma: no cover
    MATPLOTLIB_AVAILABLE = False
    Figure = None
    FigureCanvasTkAgg = None
try:
    import pygame  # type: ignore
    pygame_available = True
except Exception:  # pragma: no cover
    # Note: unlike psutil/requests, no ``pygame = None`` placeholder is set here.
    pygame_available = False
try:
    import requests  # type: ignore
    REQUESTS_AVAILABLE = True
except Exception:  # pragma: no cover
    requests = None  # type: ignore
    REQUESTS_AVAILABLE = False
try:
    from bs4 import BeautifulSoup
    BEAUTIFULSOUP_AVAILABLE = True
except Exception:  # pragma: no cover
    BEAUTIFULSOUP_AVAILABLE = False
# Defaults pre-filled into the chat panel's host and port entries.
SERVER_HOST_DEFAULT = '127.0.0.1'
SERVER_PORT_DEFAULT = 3333
# Default OpenWeather key supplied by the user.
# It can be overridden by exporting OPENWEATHER_API_KEY in the environment.
# NOTE(review): an API key is committed in source here; consider removing it
# from the repository and rotating it.
OPENWEATHER_FALLBACK_API_KEY = os.environ.get(
    'OPENWEATHER_FALLBACK_API_KEY',
    '431239407e3628578c83e67180cf720f'
).strip()
_OPENWEATHER_ENV_KEY = os.environ.get('OPENWEATHER_API_KEY', '').strip()
# The explicit environment key wins; an empty/unset value falls back.
OPENWEATHER_API_KEY = _OPENWEATHER_ENV_KEY or OPENWEATHER_FALLBACK_API_KEY
# Coordinates for Jávea (presumably used by the weather request; confirm
# against the code that fetches the forecast).
JAVEA_LATITUDE = 38.789166
JAVEA_LONGITUDE = 0.163055
# -------------------- visual palette --------------------
# Hex colours shared by the widgets built in DashboardApp.
PRIMARY_BG = '#f4f5fb'  # root window background
PANEL_BG = '#ffffff'  # header / centre / right panel background
SECONDARY_BG = '#eef2ff'  # left panel background
ACCENT_COLOR = '#ff6b6b'
ACCENT_DARK = '#c44569'
TEXT_COLOR = '#1f2d3d'
SUBTEXT_COLOR = '#67708a'
BUTTON_BG = '#e7ecff'
BUTTON_ACTIVE_BG = '#ffd6d1'
def _env_float(name: str, default: float) -> float:
    """Read the environment variable *name* as a finite float.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset, cannot be parsed
            as a float, or parses to a non-finite value.

    Returns:
        The parsed finite float, or *default*.
    """
    import math  # local import: the module does not import math at top level

    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = float(raw)
    except ValueError:
        return default
    # float() happily accepts 'nan' and 'inf'; such values are never a usable
    # setting (e.g. a coordinate), so treat them like an unparsable value.
    if not math.isfinite(parsed):
        return default
    return parsed
# Wallapop request settings. Each value can be overridden through the
# environment variable of the same name; the literal is the fallback.
WALLAPOP_DEVICE_ID = os.environ.get('WALLAPOP_DEVICE_ID', '48ca24ec-2e8c-4dbb-9f0b-6b4f99194626').strip()
WALLAPOP_APP_VERSION = os.environ.get('WALLAPOP_APP_VERSION', '814060').strip()
WALLAPOP_MPID = os.environ.get('WALLAPOP_MPID', '6088013701866267655').strip()
WALLAPOP_DEVICE_OS = os.environ.get('WALLAPOP_DEVICE_OS', '0').strip()
WALLAPOP_USER_AGENT = os.environ.get(
    'WALLAPOP_USER_AGENT',
    'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Mobile Safari/537.36'
).strip()
# Note: the three sec-ch-ua values are used as given (no .strip()).
WALLAPOP_SEC_CH_UA = os.environ.get('WALLAPOP_SEC_CH_UA', '"Not_A Brand";v="99", "Chromium";v="142"')
WALLAPOP_SEC_CH_UA_MOBILE = os.environ.get('WALLAPOP_SEC_CH_UA_MOBILE', '?1')
WALLAPOP_SEC_CH_UA_PLATFORM = os.environ.get('WALLAPOP_SEC_CH_UA_PLATFORM', '"Android"')
# Unparsable coordinates fall back to the defaults given here.
WALLAPOP_LATITUDE = _env_float('WALLAPOP_LATITUDE', 40.416775)
WALLAPOP_LONGITUDE = _env_float('WALLAPOP_LONGITUDE', -3.70379)
# An empty override falls back to the default for country and language.
WALLAPOP_COUNTRY_CODE = os.environ.get('WALLAPOP_COUNTRY_CODE', 'ES').strip() or 'ES'
WALLAPOP_LANGUAGE = os.environ.get('WALLAPOP_LANGUAGE', 'es').strip() or 'es'
WALLAPOP_CATEGORY_ID = os.environ.get('WALLAPOP_CATEGORY_ID', '').strip()
WALLAPOP_REFERER = os.environ.get('WALLAPOP_REFERER', 'https://es.wallapop.com/').strip()
# HTTP headers assembled from the settings above (presumably sent with the
# Wallapop requests made elsewhere in this file; confirm at the call site).
WALLAPOP_HEADERS = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'es,es-ES;q=0.9',
    'Referer': WALLAPOP_REFERER,
    'User-Agent': WALLAPOP_USER_AGENT,
    'x-deviceid': WALLAPOP_DEVICE_ID,
    'x-deviceos': WALLAPOP_DEVICE_OS,
    'deviceos': WALLAPOP_DEVICE_OS,
    'x-appversion': WALLAPOP_APP_VERSION,
    'mpid': WALLAPOP_MPID,
    'sec-ch-ua': WALLAPOP_SEC_CH_UA,
    'sec-ch-ua-mobile': WALLAPOP_SEC_CH_UA_MOBILE,
    'sec-ch-ua-platform': WALLAPOP_SEC_CH_UA_PLATFORM
}
class ChatClient:
    """Basic TCP client that hands incoming text to a callback.

    ``on_message`` receives every decoded chunk read from the server as well
    as bracketed status lines (``[ERROR] ...`` / ``[INFO] ...``). It is called
    from whichever thread produced the event: the caller of :meth:`connect`
    or the background receive thread.
    """

    def __init__(self, on_message):
        """Store the message callback; no connection is opened yet."""
        self._on_message = on_message
        self._sock: socket.socket | None = None
        # Guards _sock and _connected, which are touched from several threads.
        self._lock = threading.Lock()
        self._connected = False

    def connect(self, host: str, port: int) -> bool:
        """Connect to ``host:port`` and start the receive thread.

        Returns:
            True when connected (or already connected), False on failure; the
            failure reason is reported through the message callback.
        """
        with self._lock:
            if self._connected:
                return True
            sock = None
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((host, port))
                self._sock = sock
                self._connected = True
                threading.Thread(target=self._recv_loop, args=(sock,), daemon=True).start()
                return True
            except Exception as exc:
                # Roll back so a failed attempt neither leaks the socket nor
                # leaves the client half-connected.
                self._connected = False
                self._sock = None
                if sock is not None:
                    try:
                        sock.close()
                    except OSError:
                        pass
                self._on_message(f'[ERROR] Conexión fallida: {exc}')
                return False

    def _recv_loop(self, sock: socket.socket | None = None):
        """Read from *sock* until it closes, forwarding decoded text.

        Args:
            sock: The socket this loop owns. When omitted, the client's
                current socket is used.
        """
        import codecs  # local import: not part of the module's top-level imports

        if sock is None:
            with self._lock:
                sock = self._sock
        # Incremental decoding keeps a multi-byte character intact when it is
        # split across two recv() calls.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        try:
            while sock is not None:
                with self._lock:
                    # Stop when closed, or when a newer connection replaced ours.
                    if not self._connected or self._sock is not sock:
                        break
                data = sock.recv(4096)
                if not data:
                    break
                chunk = decoder.decode(data)
                if not chunk:
                    # Only part of a multi-byte character has arrived so far.
                    continue
                self._on_message(chunk.strip())
        except Exception as exc:
            self._on_message(f'[ERROR] {exc}')
        finally:
            with self._lock:
                # Only reset shared state if it still describes *our* socket;
                # otherwise a reconnect made meanwhile would be torn down.
                if self._sock is sock:
                    self._connected = False
                    self._sock = None
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass
            self._on_message('[INFO] Conexión cerrada')

    def send(self, text: str) -> bool:
        """Send *text* encoded as UTF-8.

        Returns:
            True when the data was handed to the socket, False when not
            connected or when sending failed (the client is then marked as
            disconnected).
        """
        with self._lock:
            if not self._connected or not self._sock:
                return False
            try:
                self._sock.sendall(text.encode('utf-8'))
                return True
            except Exception:
                self._connected = False
                return False

    def close(self):
        """Close the connection; safe to call when already closed."""
        with self._lock:
            self._connected = False
            sock, self._sock = self._sock, None
        if sock is None:
            return
        try:
            # shutdown() wakes a receive thread blocked in recv(); close()
            # alone does not reliably do so on every platform.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # peer already gone / never fully connected
        try:
            sock.close()
        except OSError:
            pass
class DashboardApp(tk.Tk):
def __init__(self) -> None:
    """Create the main window, initialise its state and start the periodic jobs.

    After building the panels this schedules ``_update_traffic`` and
    ``_resource_poll_tick`` (only when psutil is available),
    ``_process_scraping_queue`` and ``_refresh_alarms_loop``, and starts
    ``_chat_loop`` on a daemon thread.
    """
    super().__init__()
    self.title('Panel de laboratorio - Proyecto Global')
    self.geometry('1220x740')
    self.minsize(1024, 660)
    self.configure(bg=PRIMARY_BG)
    self._setup_fonts()
    self._maximize_with_borders()

    # Root grid: only the centre column and the middle row take extra space.
    stretch = (0, 1, 0)
    for position, weight in enumerate(stretch):
        self.columnconfigure(position, weight=weight)
    for position, weight in enumerate(stretch):
        self.rowconfigure(position, weight=weight)

    # Queues and counters shared with background work.
    self._running = True
    self._chat_queue: 'queue.Queue[str]' = queue.Queue()
    self._scraping_queue: 'queue.Queue[tuple[str, ...]]' = queue.Queue()
    if psutil:
        self._traffic_last = psutil.net_io_counters()
    else:
        self._traffic_last = None
    self._resource_history = {metric: [] for metric in ('cpu', 'mem', 'threads')}
    self._resource_poll_job: str | None = None
    self.chat_client = ChatClient(self._enqueue_chat_message)

    # Alarm state.
    self.alarm_counter = 1
    self.active_alarms: list[dict[str, datetime.datetime | str]] = []

    # Camel-race game state.
    self.game_window_canvas = None
    self.game_window_status = None
    self.game_move_queue: queue.Queue | None = None
    self.game_queue_processor_active = False
    self.game_done_event: threading.Event | None = None
    self.game_finish_line = 0
    self.game_racer_names: list[str] = []
    self.game_camel_count = 3
    self.game_speed_factor = 1.0

    self.music_temp_file: str | None = None
    self.weather_api_key = OPENWEATHER_API_KEY

    # Widget handles that the builders / popups fill in later.
    self.results_area: tk.Frame | None = None
    self.results_title: tk.Label | None = None
    self.scraping_popup: tk.Toplevel | None = None
    self.simple_scraping_popup: tk.Toplevel | None = None
    self.weather_popup: tk.Toplevel | None = None
    for handle in (
        'chart_canvas', 'ax_cpu', 'ax_mem', 'ax_threads',
        'line_cpu', 'line_mem', 'mem_fill', 'thread_bars',
    ):
        setattr(self, handle, None)

    # Handle on this very process, when psutil can provide one.
    self._ps_process = None
    if psutil:
        try:
            self._ps_process = psutil.Process(os.getpid())
        except Exception:
            self._ps_process = None

    for build in (
        self._build_header,
        self._build_left_panel,
        self._build_center_panel,
        self._build_right_panel,
        self._build_status_bar,
    ):
        build()
    self._update_clock()

    if psutil:
        self.after(1000, self._update_traffic)
        try:
            # First call only primes psutil's CPU measurement.
            psutil.cpu_percent(interval=None)
        except Exception:
            pass
        self._resource_poll_job = self.after(1000, self._resource_poll_tick)
    chat_thread = threading.Thread(target=self._chat_loop, daemon=True)
    chat_thread.start()
    self.after(100, self._process_scraping_queue)
    self.after(1000, self._refresh_alarms_loop)
# ------------------------ UI ------------------------
def _maximize_with_borders(self) -> None:
    """Maximise the window (keeping its decorations) once the event loop is idle."""
    def _apply_zoom():
        # Window managers differ: try the '-zoomed' attribute first and fall
        # back to the 'zoomed' state when that attribute is rejected.
        try:
            self.attributes('-zoomed', True)
        except tk.TclError:
            try:
                self.state('zoomed')
            except tk.TclError:
                pass

    self.after(0, _apply_zoom)
def _setup_fonts(self) -> None:
    """Create the four shared fonts (title, header, body, small)."""
    family = 'Segoe UI'

    def build(size: int, **extra):
        return tkfont.Font(family=family, size=size, **extra)

    self.font_title = build(16, weight='bold')
    self.font_header = build(13, weight='bold')
    self.font_body = build(11)
    self.font_small = build(10)
def _build_header(self) -> None:
    """Lay out the row of coloured topic titles across the top of the window."""
    bar = tk.Frame(self, bg=PANEL_BG, bd=0, highlightthickness=0)
    bar.grid(row=0, column=0, columnspan=3, sticky='new')
    # Seven columns are given weight although only six titles are placed.
    bar.grid_columnconfigure(tuple(range(7)), weight=1)

    captions = (
        'T1. Procesos',
        'T2. Threads',
        'T3. Sockets',
        'T4. Servicios',
        'T5. Seguridad',
        'Configuración',
    )
    colours = ('#4c6ef5', '#ff6b6b', '#ffa94d', '#51cf66', '#845ef7', '#2f3545')
    for column, (caption, colour) in enumerate(zip(captions, colours)):
        tk.Label(
            bar,
            text=caption,
            fg=colour,
            bg=PANEL_BG,
            font=self.font_title,
            padx=16,
            pady=8,
        ).grid(row=0, column=column, padx=6, pady=8)
def _build_left_panel(self) -> None:
    """Build the fixed-width left sidebar with its sections and action buttons."""
    sidebar = tk.Frame(self, width=220, bg=SECONDARY_BG, bd=0, highlightthickness=0)
    sidebar.grid(row=1, column=0, sticky='nsw', padx=6, pady=(50, 6))
    sidebar.grid_propagate(False)  # keep the requested width
    self.left_panel = sidebar

    # (section heading, ((button caption, callback), ...)) in display order.
    menu = (
        ('Acciones', (
            ('Analizar Wallapop', self._open_wallapop_popup),
            ('Scraping simple', self._open_simple_scraping_popup),
            ('Navegar', lambda: self._open_web('https://www.google.com')),
            ('API Tiempo', self._show_weather_popup),
        )),
        ('Aplicaciones', (
            ('Visual Code', lambda: self._launch_process(['code'])),
            ('Camellos', self._open_game_window),
            ('App3', lambda: self._launch_process(['firefox'])),
        )),
        ('Procesos batch', (
            ('Copias de seguridad', self._run_backup_script),
        )),
    )
    for heading, entries in menu:
        self._left_section(heading)
        for caption, action in entries:
            self._left_button(caption, action)
def _left_section(self, text: str) -> None:
    """Add an upper-cased section heading to the left panel."""
    heading = tk.Label(
        self.left_panel,
        text=text.upper(),
        bg=SECONDARY_BG,
        fg=SUBTEXT_COLOR,
        font=self.font_small,
    )
    heading.pack(anchor='w', padx=16, pady=(16, 4))
def _left_button(self, text: str, command) -> None:
    """Add a flat, full-width button to the left panel that runs *command*."""
    options = dict(
        text=text,
        width=20,
        bg=BUTTON_BG,
        activebackground=BUTTON_ACTIVE_BG,
        relief='flat',
        command=command,
    )
    tk.Button(self.left_panel, **options).pack(fill='x', padx=16, pady=4)
def _build_center_panel(self) -> None:
    """Create the stretchable centre panel and fill it with the notebook and notes."""
    middle = tk.Frame(self, bg=PANEL_BG, bd=0)
    middle.grid(row=1, column=1, sticky='nsew', padx=6, pady=(50, 6))
    # Row 1 and column 0 of the panel absorb extra space.
    middle.rowconfigure(1, weight=1)
    middle.columnconfigure(0, weight=1)
    self.center_panel = middle
    for build in (self._build_notebook, self._build_notes_panel):
        build()
def _build_notebook(self) -> None:
    """Create the styled notebook, its seven tab frames and their contents."""
    theme = ttk.Style()
    theme.configure('Custom.TNotebook', background=PANEL_BG, borderwidth=0)
    theme.configure('Custom.TNotebook.Tab', padding=(16, 8), font=self.font_body)
    theme.map('Custom.TNotebook.Tab', background=[('selected', '#dde2ff')])

    book = ttk.Notebook(self.center_panel, style='Custom.TNotebook')
    book.grid(row=0, column=0, sticky='nsew', padx=4, pady=4)
    self.notebook = book

    # Frame creation order (kept distinct from the tab display order below).
    for attribute in (
        'tab_resultados', 'tab_navegador', 'tab_correos', 'tab_tareas',
        'tab_alarmas', 'tab_enlaces', 'tab_bloc',
    ):
        setattr(self, attribute, tk.Frame(book, bg='white'))

    # (frame, tab caption, content builder) in display order.
    pages = (
        (self.tab_resultados, 'Resultados', self._build_tab_resultados),
        (self.tab_navegador, 'Navegador', self._build_tab_navegador),
        (self.tab_correos, 'Correos', self._build_tab_correos),
        (self.tab_bloc, 'Bloc de notas', self._build_tab_bloc_notas),
        (self.tab_tareas, 'Tareas', self._build_tab_tareas),
        (self.tab_alarmas, 'Alarmas', self._build_tab_alarmas),
        (self.tab_enlaces, 'Enlaces', self._build_tab_enlaces),
    )
    for frame, caption, _populate in pages:
        book.add(frame, text=caption)
    for _frame, _caption, populate in pages:
        populate()
def _build_tab_resultados(self) -> None:
    """Build the results tab: title, toolbar and the area where results are drawn."""
    outer = tk.Frame(self.tab_resultados, bg=PANEL_BG)
    outer.pack(fill='both', expand=True)

    title = tk.Label(
        outer,
        text='Resultados de actividades',
        font=self.font_header,
        bg=PANEL_BG,
        fg=TEXT_COLOR,
    )
    title.pack(pady=(12, 4))
    self.results_title = title

    actions = tk.Frame(outer, bg=PANEL_BG)
    actions.pack(fill='x', padx=12, pady=(0, 4))
    monitor_btn = tk.Button(
        actions,
        text='Ver monitor del sistema',
        command=self._show_resource_monitor,
        bg=BUTTON_BG,
        relief='flat',
    )
    monitor_btn.pack(side='left', padx=(0, 8))
    clear_btn = tk.Button(
        actions,
        text='Limpiar resultados',
        command=self._reset_results_view,
        bg=BUTTON_BG,
        relief='flat',
    )
    clear_btn.pack(side='left')

    area = tk.Frame(outer, bg='#f4f7ff', bd=1, relief='solid')
    area.pack(fill='both', expand=True, padx=12, pady=(0, 12))
    self.results_area = area
    # Start with the placeholder message.
    self._reset_results_view()
def _prepare_results_area(self) -> None:
self._stop_game_queue_processor()
if not self.results_area:
return
for child in self.results_area.winfo_children():
child.destroy()
self.chart_canvas = None
self.ax_cpu = None
self.ax_mem = None
self.ax_threads = None
self.line_cpu = None
self.line_mem = None
self.mem_fill = None
self.thread_bars = None
self.game_window_canvas = None
self.game_window_status = None
def _reset_results_view(self) -> None:
    """Clear the results tab and show the default title and placeholder text."""
    self._prepare_results_area()
    title = self.results_title
    if title:
        title.config(text='Resultados de actividades')
    area = self.results_area
    if not area:
        return
    placeholder = tk.Label(
        area,
        text='Ejecute una actividad para ver aquí sus resultados más recientes.',
        bg='#f4f7ff',
        fg=SUBTEXT_COLOR,
        font=self.font_body,
        wraplength=520,
    )
    placeholder.pack(expand=True, padx=12, pady=12)
def _render_results_view(self, title: str, builder: Callable[[tk.Frame], None]) -> None:
if self.results_title:
self.results_title.config(text=f'Resultados • {title}')
self._prepare_results_area()
if not self.results_area:
return
builder(self.results_area)
def _show_resource_monitor(self) -> None:
    """Show the CPU / memory / thread charts in the results tab.

    When matplotlib or psutil is missing, an installation hint is shown
    instead of the charts.
    """
    def show_missing_dependencies(parent: tk.Frame) -> None:
        hint = tk.Label(
            parent,
            text='Instale matplotlib + psutil para habilitar el monitor del sistema.',
            fg=ACCENT_COLOR,
            bg='#f4f7ff',
            font=self.font_body,
        )
        hint.pack(fill='both', expand=True, padx=20, pady=20)

    def builder(parent: tk.Frame) -> None:
        if not MATPLOTLIB_AVAILABLE or not psutil:
            show_missing_dependencies(parent)
            return

        holder = tk.Frame(parent, bg=PANEL_BG)
        holder.pack(fill='both', expand=True, padx=12, pady=12)

        figure = Figure(figsize=(7.2, 5.6), dpi=100)
        # Three stacked subplots: CPU, memory, thread count.
        cpu_ax = self.ax_cpu = figure.add_subplot(311)
        mem_ax = self.ax_mem = figure.add_subplot(312)
        thr_ax = self.ax_threads = figure.add_subplot(313)

        cpu_ax.set_title('CPU (línea)')
        cpu_ax.set_ylim(0, 100)
        cpu_ax.set_ylabel('%')
        cpu_ax.grid(True, alpha=0.25)
        cpu_lines = cpu_ax.plot([], [], label='CPU', color='#ff6b6b', linewidth=2)
        self.line_cpu = cpu_lines[0]

        mem_ax.set_title('Memoria (área)')
        mem_ax.set_ylim(0, 100)
        mem_ax.set_ylabel('%')
        mem_ax.grid(True, alpha=0.2)
        mem_lines = mem_ax.plot([], [], color='#4ecdc4', linewidth=1.5)
        self.line_mem = mem_lines[0]

        thr_ax.set_title('Hilos del proceso (barras)')
        thr_ax.set_ylabel('Hilos')
        thr_ax.grid(True, axis='y', alpha=0.2)

        figure.tight_layout(pad=2.2)
        canvas = FigureCanvasTkAgg(figure, master=holder)
        self.chart_canvas = canvas
        canvas.get_tk_widget().pack(fill='both', expand=True)

        # Fixed x-range of 40 samples (bars are centred on integers).
        cpu_ax.set_xlim(0, 40)
        mem_ax.set_xlim(0, 40)
        thr_ax.set_xlim(-0.5, 39.5)
        canvas.draw_idle()

    self._render_results_view('Monitoreo del sistema', builder)
def _show_text_result(self, title: str, text: str) -> None:
    """Show *text* in a read-only scrolled box inside the results tab."""
    def builder(parent: tk.Frame) -> None:
        holder = tk.Frame(parent, bg='white')
        holder.pack(fill='both', expand=True, padx=12, pady=12)
        viewer = scrolledtext.ScrolledText(holder, height=12)
        viewer.pack(fill='both', expand=True)
        viewer.insert('1.0', text)
        # Disable only after inserting; a disabled Text rejects insert().
        viewer.config(state='disabled')

    self._render_results_view(title, builder)
def _format_weather_summary(self, data: dict[str, Any]) -> str:
main = data.get('main', {})
weather = data.get('weather', [{}])[0]
wind = data.get('wind', {})
temp = main.get('temp')
feels = main.get('feels_like')
humidity = main.get('humidity')
desc = weather.get('description', '').capitalize()
wind_speed = wind.get('speed')
timestamp = data.get('dt')
updated = datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D'
summary = [
f'Descripción : {desc or "N/D"}',
f'Temperatura : {temp:.1f}°C' if temp is not None else 'Temperatura : N/D',
f'Sensación : {feels:.1f}°C' if feels is not None else 'Sensación : N/D',
f'Humedad : {humidity}%' if humidity is not None else 'Humedad : N/D',
f'Veloc. viento: {wind_speed:.1f} m/s' if wind_speed is not None else 'Veloc. viento: N/D',
f'Actualizado : {updated}'
]
return '\n'.join(summary)
def _get_javea_weather_snapshot(self) -> tuple[str, str, bool]:
try:
data = self._fetch_javea_weather()
summary = self._format_weather_summary(data)
timestamp = data.get('dt')
status = 'Última lectura: '
status += datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D'
return summary, status, False
except Exception as exc:
return str(exc), 'No se pudo obtener el clima', True
def _show_weather_popup(self) -> None:
    """Open a modal, fixed-size window with the current weather for Jávea.

    The weather is fetched before the window is created; any previous
    weather window is destroyed first. The window offers a refresh and a
    close button.
    """
    self._log('Abriendo ventana del tiempo para Jávea')
    text, status, is_error = self._get_javea_weather_snapshot()

    previous = self.weather_popup
    if previous and previous.winfo_exists():
        previous.destroy()

    window = tk.Toplevel(self)
    window.title('Tiempo en Jávea')
    window.geometry('760x500')
    window.minsize(760, 500)
    window.resizable(False, False)
    window.configure(bg='white')
    window.transient(self)
    window.grab_set()  # modal: keep input inside this window
    self.weather_popup = window

    heading = tk.Label(
        window,
        text='OpenWeather • Jávea, España',
        font=self.font_header,
        bg='white',
        fg=TEXT_COLOR,
    )
    heading.pack(pady=(12, 6))

    if is_error:
        report_colour = 'red'
    else:
        report_colour = TEXT_COLOR
    report = tk.Label(
        window,
        text=text,
        justify='left',
        font=('Consolas', 12),
        anchor='nw',
        bg='white',
        fg=report_colour,
        padx=6,
        pady=6,
    )
    report.pack(fill='both', expand=True, padx=18, pady=8)

    footer = tk.Label(window, text=status, font=self.font_small, bg='white', fg=SUBTEXT_COLOR)
    footer.pack(pady=(0, 8))

    def close_popup() -> None:
        if window.winfo_exists():
            window.destroy()
        if self.weather_popup is window:
            self.weather_popup = None

    def refresh() -> None:
        fresh_text, fresh_status, failed = self._get_javea_weather_snapshot()
        report.config(text=fresh_text, fg='red' if failed else '#1f2d3d')
        footer.config(text=fresh_status)

    button_row = tk.Frame(window, bg='white')
    button_row.pack(pady=(4, 14))
    for caption, handler in (('Actualizar', refresh), ('Cerrar', close_popup)):
        tk.Button(button_row, text=caption, command=handler, bg=BUTTON_BG, relief='flat').pack(side='left', padx=10)
    window.protocol('WM_DELETE_WINDOW', close_popup)
def _build_tab_navegador(self) -> None:
    """Build the browser tab: a URL entry and a button that opens it externally."""
    tab = self.tab_navegador
    tk.Label(tab, text='Abrir URL en navegador externo', bg='white').pack(pady=6)
    row = tk.Frame(tab, bg='white')
    row.pack(pady=6)

    entry = tk.Entry(row, width=48)
    entry.insert(0, 'https://www.python.org')
    entry.pack(side='left', padx=4)
    self.url_entry = entry

    def open_typed_url() -> None:
        self._open_web(self.url_entry.get())

    tk.Button(row, text='Abrir', command=open_typed_url).pack(side='left', padx=4)
def _build_tab_correos(self) -> None:
    """Fill the mail tab with a placeholder notice."""
    notice = (
        'Área futura para gestionar correos.'
        '\nDe momento, utiliza la pestaña "Bloc de notas" para escribir.'
    )
    label = tk.Label(
        self.tab_correos,
        text=notice,
        bg='white',
        font=('Arial', 12),
        justify='left',
    )
    label.pack(pady=20)
def _build_tab_bloc_notas(self) -> None:
    """Build the notepad tab: open/save toolbar above a word-wrapped editor."""
    tab = self.tab_bloc
    tk.Label(tab, text='Bloc de notas', bg='white', font=('Arial', 12, 'bold')).pack(pady=6)

    bar = tk.Frame(tab, bg='white')
    bar.pack(fill='x')
    for caption, handler in (('Abrir', self._open_text), ('Guardar', self._save_text)):
        tk.Button(bar, text=caption, command=handler).pack(side='left', padx=4, pady=4)

    editor = scrolledtext.ScrolledText(tab, wrap='word')
    editor.pack(fill='both', expand=True, padx=6, pady=6)
    self.editor = editor
def _build_tab_tareas(self) -> None:
    """Build the tasks tab: a Wallapop notes column and a camel-game column."""
    outer = tk.Frame(self.tab_tareas, bg='white')
    outer.pack(fill='both', expand=True)

    # Left column: Wallapop link analysis notes.
    wallapop_col = tk.Frame(outer, bg='white', padx=10, pady=10)
    wallapop_col.pack(side='left', fill='both', expand=True)
    tk.Label(
        wallapop_col,
        text='Análisis de enlaces Wallapop',
        bg='white',
        font=('Arial', 12, 'bold'),
    ).pack(pady=(12, 4))

    note_options = dict(bg='white', wraplength=260, justify='left')
    if REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE:
        note_options['text'] = (
            'Pulsa el botón "Analizar Wallapop" en el panel izquierdo para introducir la URL de un anuncio. '
            'Aquí verás notas generales o recordatorios.'
        )
    else:
        # Missing dependencies are flagged in red.
        note_options['text'] = 'Instala requests y beautifulsoup4 para habilitar el análisis de enlaces.'
        note_options['fg'] = 'red'
    tk.Label(wallapop_col, **note_options).pack(fill='x', padx=10, pady=10)

    # Right column: launcher for the camel game.
    game_col = tk.Frame(outer, bg='white')
    game_col.pack(side='left', fill='both', expand=True, padx=10)
    tk.Label(
        game_col,
        text='Juego de camellos (ventana dedicada)',
        bg='white',
        font=('Arial', 12, 'bold'),
    ).pack(pady=(6, 2))
    tk.Label(
        game_col,
        text='Abre el simulador en una ventana independiente para ver la carrera a pantalla completa.',
        wraplength=260,
        justify='left',
        bg='white',
    ).pack(padx=8, pady=8)
    tk.Button(game_col, text='Abrir juego', command=self._open_game_window).pack(pady=6)
def _build_tab_alarmas(self) -> None:
    """Build the alarms tab: creation form, active-alarm list and status line."""
    card_bg = '#fef9f4'
    self.tab_alarmas.configure(bg=PANEL_BG)

    card = tk.Frame(self.tab_alarmas, bg=card_bg, bd=0, padx=18, pady=18)
    card.pack(pady=16, padx=20, fill='both', expand=True)
    tk.Label(
        card,
        text='Programar nuevas alarmas',
        font=self.font_header,
        bg=card_bg,
        fg=TEXT_COLOR,
    ).pack(pady=(0, 8))

    # Single-row form: minutes, title, add button.
    form = tk.Frame(card, bg=card_bg)
    form.pack(pady=6)
    tk.Label(form, text='Minutos', bg=card_bg, font=self.font_body).grid(row=0, column=0, padx=6)
    minutes = tk.Spinbox(form, from_=1, to=240, width=6, justify='center')
    minutes.grid(row=0, column=1, padx=6)
    self.alarm_minutes = minutes
    tk.Label(form, text='Título', bg=card_bg, font=self.font_body).grid(row=0, column=2, padx=6)
    title_entry = tk.Entry(form, width=24)
    title_entry.grid(row=0, column=3, padx=6)
    self.alarm_title = title_entry
    tk.Button(
        form,
        text='Agregar alarma',
        bg=BUTTON_BG,
        relief='flat',
        command=self._start_alarm,
    ).grid(row=0, column=4, padx=10)

    tk.Label(
        card,
        text='Alarmas activas',
        font=self.font_body,
        bg=card_bg,
        fg=SUBTEXT_COLOR,
    ).pack(pady=(18, 6))
    listing = tk.Listbox(
        card,
        height=8,
        width=50,
        bd=0,
        highlightthickness=1,
        highlightbackground='#e3e5f0',
    )
    listing.pack(pady=4, fill='x')
    self.alarm_list = listing
    tk.Button(
        card,
        text='Cancelar seleccionada',
        command=self._cancel_selected_alarm,
        bg=BUTTON_BG,
        relief='flat',
    ).pack(pady=(10, 4))

    status = tk.Label(
        card,
        text='Sin alarmas programadas',
        bg=card_bg,
        fg=SUBTEXT_COLOR,
        font=self.font_small,
    )
    status.pack(pady=4)
    self.alarm_status = status
def _build_tab_enlaces(self) -> None:
    """Build the links tab: one button per documentation site."""
    tab = self.tab_enlaces
    tk.Label(tab, text='Enlaces útiles', bg='white', font=('Arial', 12, 'bold')).pack(pady=6)

    def opener(address: str):
        # Factory so each button keeps its own URL.
        return lambda: self._open_web(address)

    links = (
        ('Documentación Tkinter', 'https://docs.python.org/3/library/tk.html'),
        ('psutil', 'https://psutil.readthedocs.io'),
        ('Matplotlib', 'https://matplotlib.org'),
    )
    for caption, address in links:
        tk.Button(tab, text=caption, command=opener(address)).pack(pady=3)
def _build_notes_panel(self) -> None:
    """Add the labelled notes box below the notebook in the centre panel."""
    box = tk.LabelFrame(self.center_panel, text='Panel de notas e hilos', bg='#eefee8')
    box.grid(row=1, column=0, sticky='nsew', padx=6, pady=(4, 6))
    notes_view = scrolledtext.ScrolledText(box, height=5)
    notes_view.pack(fill='both', expand=True, padx=6, pady=6)
    self.notes = notes_view
def _build_right_panel(self) -> None:
    """Build the fixed-width right panel: chat, connection form, students, music."""
    side = tk.Frame(self, width=280, bg=PANEL_BG, bd=0)
    side.grid(row=1, column=2, sticky='nse', padx=6, pady=(50, 6))
    side.grid_propagate(False)  # keep the requested width

    # --- chat transcript and message box ---
    tk.Label(side, text='Chat', font=('Arial', 20, 'bold'), fg='red', bg='white').pack(pady=(6, 4))
    transcript = scrolledtext.ScrolledText(side, width=30, height=12, state='disabled')
    transcript.pack(padx=6, pady=4)
    self.chat_display = transcript
    tk.Label(side, text='Mensaje', bg='white').pack(anchor='w', padx=6)
    composer = tk.Text(side, height=4)
    composer.pack(padx=6, pady=4)
    self.message_entry = composer
    tk.Button(side, text='enviar', bg='#d6f2ce', command=self._send_chat).pack(pady=4)

    # --- connection form ---
    form = tk.Frame(side, bg='white')
    form.pack(pady=4)
    tk.Label(form, text='Host:', bg='white').grid(row=0, column=0)
    host = tk.Entry(form, width=12)
    host.insert(0, SERVER_HOST_DEFAULT)
    host.grid(row=0, column=1)
    self.host_entry = host
    tk.Label(form, text='Puerto:', bg='white').grid(row=1, column=0)
    port = tk.Entry(form, width=6)
    port.insert(0, str(SERVER_PORT_DEFAULT))
    port.grid(row=1, column=1)
    self.port_entry = port
    tk.Button(form, text='Conectar', command=self._connect_chat).grid(row=0, column=2, rowspan=2, padx=4)

    # --- student cards (placeholder text) ---
    tk.Label(side, text='Alumnos', font=('Arial', 14, 'bold'), bg='white').pack(pady=(10, 4))
    blurb = 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.'
    for number in (1, 2, 3):
        card = tk.Frame(side, bg='white', bd=1, relief='groove')
        card.pack(fill='x', padx=6, pady=4)
        tk.Label(card, text=f'Alumno {number}', bg='white', font=('Arial', 11, 'bold')).pack(anchor='w')
        tk.Label(card, text=blurb, wraplength=220, justify='left', bg='white').pack(anchor='w')

    # --- music player ---
    player_bg = '#fdf5f5'
    player = tk.Frame(side, bg=player_bg, bd=1, relief='flat', padx=8, pady=8)
    player.pack(fill='x', padx=8, pady=(10, 6))
    tk.Label(
        player,
        text='Reproductor música',
        font=self.font_header,
        bg=player_bg,
        fg=ACCENT_DARK,
    ).pack(pady=(0, 6))

    def styled_btn(parent, text, command):
        look = dict(
            bg='#ffe3df',
            activebackground='#ffd2ca',
            fg=TEXT_COLOR,
            relief='flat',
            width=20,
            pady=4,
        )
        return tk.Button(parent, text=text, command=command, **look)

    styled_btn(player, 'Seleccionar archivo', self._select_music).pack(pady=3)
    controls = tk.Frame(player, bg=player_bg)
    controls.pack(pady=3)
    for caption, handler in (('Pausa', self._pause_music), ('Reanudar', self._resume_music)):
        styled_btn(controls, caption, handler).pack(side='left', padx=4)
    styled_btn(player, 'Quitar', self._stop_music).pack(pady=3)
def _build_status_bar(self) -> None:
    """Build the bottom status bar: mail notice, network traffic and clock."""
    bar_bg = '#f1f1f1'
    bar = tk.Frame(self, bg=bar_bg, bd=2, relief='ridge')
    bar.grid(row=2, column=0, columnspan=3, sticky='ew')
    for column in (0, 1, 2):
        bar.columnconfigure(column, weight=1)

    bold_11 = ('Arial', 11, 'bold')
    mail = tk.Label(bar, text='Correos sin leer', font=bold_11, bg=bar_bg)
    mail.grid(row=0, column=0, padx=16, pady=6, sticky='w')

    traffic = tk.Label(bar, text='Red: ↑ --.- KB/s ↓ --.- KB/s', font=bold_11, bg=bar_bg)
    traffic.grid(row=0, column=1, padx=16, pady=6)
    self.traffic_label = traffic

    clock = tk.Label(bar, text='--:--:--', font=('Arial', 12, 'bold'), bg=bar_bg)
    clock.grid(row=0, column=2, padx=16, pady=6, sticky='e')
    self.clock_label = clock
# ------------------------ acciones ------------------------
def _open_web(self, url: str) -> None:
    """Open *url* in the system web browser.

    Surrounding whitespace is removed first. A blank value (for instance an
    emptied URL entry) is reported with a warning dialog instead of being
    handed to the browser, and errors raised by the ``webbrowser`` module are
    shown in an error dialog rather than escaping into the Tk callback.

    Args:
        url: Address to open.
    """
    target = (url or '').strip()
    if not target:
        messagebox.showwarning('Navegador', 'Introduce una URL para abrir.')
        return
    try:
        webbrowser.open(target)
    except webbrowser.Error as exc:
        messagebox.showerror('Navegador', f'No se pudo abrir el navegador: {exc}')
def _launch_process(self, cmd: list[str]) -> None:
    """Start *cmd* as a child process and report the launch.

    Success is written to the log and the results tab; any failure is shown
    in an error dialog.
    """
    try:
        subprocess.Popen(cmd)
        self._log(f'Proceso lanzado: {cmd}')
        command_line = " ".join(cmd)
        self._show_text_result('Aplicaciones', f'Se lanzó el proceso: {command_line}')
    except Exception as exc:
        messagebox.showerror('Error', f'No se pudo lanzar {cmd}: {exc}')
def _run_backup_script(self) -> None:
    """Ask for a source and a destination folder and copy the source in the background.

    The copy goes into ``<destination>/<source name>_<timestamp>`` and runs on
    a daemon thread; its outcome is posted to the scraping queue as an
    ``'error'`` or ``'backup_success'`` message.
    """
    source = filedialog.askdirectory(title='Selecciona la carpeta a respaldar')
    if not source:
        return
    destination = filedialog.askdirectory(title='Selecciona la carpeta destino para la copia')
    if not destination:
        return

    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    # normpath drops a trailing separator so basename is not empty for 'dir/'.
    folder = os.path.basename(os.path.normpath(source)) or 'backup'
    target = os.path.join(destination, f'{folder}_{stamp}')
    if os.path.exists(target):
        messagebox.showerror('Backup', f'Ya existe la carpeta destino:\n{target}')
        return

    self._log(f'Iniciando copia de seguridad de {source} a {target}')
    self._show_text_result('Copias de seguridad', f'Copiando archivos...\nOrigen: {source}\nDestino: {target}')
    self.notebook.select(self.tab_resultados)

    def worker() -> None:
        try:
            count, size, omitted = self._copy_directory_with_progress(source, target)
        except Exception as exc:
            self._scraping_queue.put(('error', f'Backup: {exc}'))
            return
        report = [
            'Copia completada correctamente.',
            '',
            f'Origen: {source}',
            f'Destino: {target}',
            f'Archivos copiados: {count}',
            f'Tamaño total: {size / (1024*1024):.2f} MB',
        ]
        if omitted:
            report.append(f"Archivos omitidos: {omitted} (consulta el panel de notas)")
        self._scraping_queue.put(('backup_success', '\n'.join(report), target))

    threading.Thread(target=worker, daemon=True).start()
def _copy_directory_with_progress(self, source: str, target: str) -> tuple[int, int, int]:
copied_files = 0
copied_bytes = 0
skipped = 0
for root, dirs, files in os.walk(source):
rel = os.path.relpath(root, source)
dest_dir = os.path.join(target, rel) if rel != '.' else target
os.makedirs(dest_dir, exist_ok=True)
for file in files:
src_path = os.path.join(root, file)
dest_path = os.path.join(dest_dir, file)
try:
shutil.copy2(src_path, dest_path)
copied_files += 1
try:
copied_bytes += os.path.getsize(dest_path)
except OSError:
pass
except Exception as exc:
skipped += 1
self._log(f'Archivo omitido ({src_path}): {exc}')
continue
if copied_files % 20 == 0:
self._log(f'Copia en progreso: {copied_files} archivos...')
return copied_files, copied_bytes, skipped
def _open_saved_file(self, path: str) -> None:
    """Open *path* with the platform's default application.

    A missing path produces a warning dialog; a failure to launch the opener
    produces an error dialog.
    """
    if not (path and os.path.exists(path)):
        messagebox.showwarning('Archivo', 'El archivo indicado ya no existe.')
        return
    platform = sys.platform
    try:
        if platform.startswith('win'):
            os.startfile(path)  # type: ignore[attr-defined]
        else:
            # macOS ships 'open'; other platforms are assumed to have xdg-open.
            opener = 'open' if platform == 'darwin' else 'xdg-open'
            subprocess.Popen([opener, path])
    except Exception as exc:
        messagebox.showerror('Archivo', f'No se pudo abrir el archivo: {exc}')
def _show_file_popup(self, path: str) -> None:
    """Show a modal dialog with the path of a generated file and an open button."""
    dialog = tk.Toplevel(self)
    dialog.title('Archivo generado')
    dialog.configure(bg='white', padx=16, pady=16)
    dialog.resizable(False, False)
    dialog.transient(self)
    dialog.grab_set()

    tk.Label(
        dialog,
        text='Se creó el archivo con los datos:',
        font=('Arial', 12, 'bold'),
        bg='white',
    ).pack(anchor='w')

    path_field = tk.Entry(dialog, width=60)
    path_field.pack(fill='x', pady=8)
    path_field.insert(0, path)
    # Read-only still lets the user select and copy the path.
    path_field.config(state='readonly')

    def open_file() -> None:
        self._open_saved_file(path)

    row = tk.Frame(dialog, bg='white')
    row.pack(fill='x', pady=(8, 0))
    tk.Button(row, text='Abrir archivo', command=open_file).pack(side='left', padx=4)
    tk.Button(row, text='Cerrar', command=dialog.destroy).pack(side='right', padx=4)
def _open_text(self) -> None:
    """Ask for a text file and load it into the notepad editor.

    The file is read completely before the editor is touched, so a failed
    read leaves the current editor content intact; the failure is reported
    in an error dialog. Undecodable bytes are dropped (``errors='ignore'``).
    """
    path = filedialog.askopenfilename(filetypes=[('Texto', '*.txt'), ('Todos', '*.*')])
    if not path:
        return
    try:
        with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
            content = fh.read()
    except OSError as exc:
        messagebox.showerror('Bloc de notas', f'No se pudo abrir el archivo: {exc}')
        return
    self.editor.delete('1.0', 'end')
    self.editor.insert('1.0', content)
    self._log(f'Abriste {path}')
def _save_text(self) -> None:
    """Ask for a destination and write the notepad content to it as UTF-8.

    Write failures are reported in an error dialog instead of escaping into
    the Tk callback.
    """
    path = filedialog.asksaveasfilename(defaultextension='.txt')
    if not path:
        return
    # 'end-1c' leaves out the newline Tk always appends after the last
    # character; with plain 'end' every open/save cycle grows the file.
    content = self.editor.get('1.0', 'end-1c')
    try:
        with open(path, 'w', encoding='utf-8') as fh:
            fh.write(content)
    except OSError as exc:
        messagebox.showerror('Bloc de notas', f'No se pudo guardar el archivo: {exc}')
        return
    self._log(f'Guardado en {path}')
def _open_simple_scraping_popup(self) -> None:
    """Open (or raise) the modal dialog that collects URLs for simple scraping.

    The dialog takes one URL per line. Keyboard handling:

    * Ctrl+Return inside the URL box starts the analysis.
    * Plain Return inside the URL box only inserts a new line, so several
      URLs can be typed.
    * Plain Return anywhere else in the dialog starts the analysis.

    Requires requests and beautifulsoup4; otherwise an error dialog is shown.
    """
    if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE):
        messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.')
        return
    if self.simple_scraping_popup and tk.Toplevel.winfo_exists(self.simple_scraping_popup):
        self.simple_scraping_popup.lift()
        return
    popup = tk.Toplevel(self)
    popup.title('Scraping simple')
    popup.configure(bg='white', padx=18, pady=18)
    popup.resizable(False, False)
    popup.transient(self)
    popup.grab_set()
    tk.Label(
        popup,
        text='Introduce una o varias URL completas (una por línea):',
        bg='white',
        font=('Arial', 11, 'bold'),
        justify='left'
    ).pack(anchor='w')
    url_box = scrolledtext.ScrolledText(popup, width=60, height=5)
    url_box.pack(pady=(8, 4))
    url_box.insert('1.0', 'https://es.wallapop.com\nhttps://www.wikipedia.org')
    error_label = tk.Label(popup, text='', fg='red', bg='white', wraplength=420, justify='left')
    error_label.pack(anchor='w')
    btn_frame = tk.Frame(popup, bg='white')
    btn_frame.pack(fill='x', pady=(10, 0))

    def _close_popup() -> None:
        if self.simple_scraping_popup:
            try:
                self.simple_scraping_popup.destroy()
            except Exception:
                pass
        self.simple_scraping_popup = None

    def _start(_: object | None = None) -> None:
        raw = url_box.get('1.0', 'end').strip()
        urls = [line.strip() for line in raw.splitlines() if line.strip()]
        if not urls:
            error_label.config(text='Añade al menos una URL completa.')
            return
        invalid = [u for u in urls if not u.startswith(('http://', 'https://'))]
        if invalid:
            error_label.config(text=f'Las URL deben empezar por http:// o https:// (revisa: {invalid[0]})')
            return
        _close_popup()
        self._start_simple_scraping(urls)

    def _on_control_return(event: object) -> str:
        _start(event)
        # 'break' stops the remaining bindings (the Text class binding and the
        # dialog-level <Return>) from also handling this key press.
        return 'break'

    def _on_popup_return(event) -> None:
        # Bindings on a Toplevel also fire for its children. A plain Return
        # typed in the URL box must only add a line, not start the analysis.
        if str(event.widget) == str(url_box):
            return
        _start(event)

    tk.Button(btn_frame, text='Iniciar', command=_start, bg='#d6f2ce').pack(side='left', padx=4)
    tk.Button(btn_frame, text='Cancelar', command=_close_popup).pack(side='right', padx=4)
    url_box.bind('<Control-Return>', _on_control_return)
    popup.bind('<Return>', _on_popup_return)
    popup.protocol('WM_DELETE_WINDOW', _close_popup)
    self.simple_scraping_popup = popup
    url_box.focus_set()
def _start_simple_scraping(self, urls: list[str]) -> None:
    """Kick off the simple scraper for *urls* on a background thread.

    Blank entries are discarded. A preview listing the URLs is shown in the
    results tab, and the actual work is handed to
    ``_simple_scraping_runner`` on a daemon thread.
    """
    dependencies_ready = REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE
    if not dependencies_ready:
        messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.')
        return

    targets = []
    for candidate in urls:
        stripped = candidate.strip()
        if stripped:
            targets.append(stripped)
    if not targets:
        messagebox.showinfo('Scraping', 'No hay URL válidas para analizar.')
        return

    self._log(f'Scraping simple para {len(targets)} URL(s)')
    listing = '\n'.join(targets)
    self._show_text_result('Scraping simple', 'Analizando las siguientes URL:\n' + listing)
    self.notebook.select(self.tab_resultados)

    runner = threading.Thread(
        target=self._simple_scraping_runner,
        args=(targets,),
        daemon=True,
    )
    runner.start()
def _simple_scraping_runner(self, urls: list[str]) -> None:
results: list[dict[str, str]] = []
lock = threading.Lock()
threads: list[threading.Thread] = []
for url in urls:
t = threading.Thread(target=self._simple_scraping_worker, args=(url, results, lock), daemon=True)
t.start()
threads.append(t)
for thread in threads:
thread.join()
self._scraping_queue.put(('generic_success', urls, results))
def _simple_scraping_worker(self, url: str, results: list[dict[str, str]], lock: threading.Lock) -> None:
    """Fetch one URL, summarise its HTML and append the summary to *results*.

    On success the record holds the page title, a meta description cut to
    100 characters, counts of ``<a>``, ``<img>`` and ``<p>`` tags, the HTML
    length and the HTTP status. Any exception (or missing dependencies)
    yields a record with an ``'error'`` key instead. The append to
    *results* happens under *lock*.
    """
    stamp = datetime.datetime.now().strftime('%H:%M:%S')
    if requests and BEAUTIFULSOUP_AVAILABLE:
        try:
            # Fixed one-second pause before every request.
            time.sleep(1)
            response = requests.get(
                url,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
                },
                timeout=10,
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            if soup.title and soup.title.string:
                page_title = soup.title.string.strip()
            else:
                page_title = 'Sin título'

            link_count = len(soup.find_all('a'))
            image_count = len(soup.find_all('img'))
            paragraph_count = len(soup.find_all('p'))

            summary = ''
            description_tag = soup.find('meta', attrs={'name': 'description'})
            if description_tag and description_tag.get('content'):
                full_text = description_tag['content']
                if len(full_text) > 100:
                    summary = full_text[:100] + '...'
                else:
                    summary = full_text

            record = {
                'url': url,
                'titulo': page_title,
                'descripcion': summary,
                'num_links': link_count,
                'num_imagenes': image_count,
                'num_parrafos': paragraph_count,
                'longitud': len(response.text),
                'status_code': response.status_code,
                'timestamp': stamp,
            }
        except Exception as exc:
            # Every failure is reported as data so the other URLs still finish.
            record = {'url': url, 'error': str(exc), 'timestamp': stamp}
    else:
        record = {
            'url': url,
            'error': 'Dependencias de scraping no disponibles',
            'timestamp': stamp,
        }
    with lock:
        results.append(record)
# --- Funcionalidad de Web Scraping (Wallapop) ---
def _open_wallapop_popup(self) -> None:
    """Show the modal dialog that collects a Wallapop query and result count.

    If the dialog is already open it is only raised. On confirmation the
    result count is clamped to 1-20, the dialog is closed and
    ``_start_wallapop_search`` is called.
    """
    if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE):
        messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para analizar enlaces.')
        return
    if self.scraping_popup and tk.Toplevel.winfo_exists(self.scraping_popup):
        self.scraping_popup.lift()
        return

    dialog = tk.Toplevel(self)
    dialog.title('Búsqueda Wallapop')
    dialog.configure(bg='white', padx=18, pady=18)
    dialog.resizable(False, False)
    dialog.transient(self)
    dialog.grab_set()

    heading = tk.Label(
        dialog,
        text='Escribe el producto que quieres buscar en Wallapop:',
        bg='white',
        font=('Arial', 11, 'bold'),
    )
    heading.pack(anchor='w')

    query_entry = tk.Entry(dialog, width=60)
    query_entry.pack(pady=(8, 4))
    query_entry.insert(0, 'PlayStation 5')

    options_row = tk.Frame(dialog, bg='white')
    options_row.pack(fill='x', pady=(2, 0))
    tk.Label(options_row, text='Resultados a mostrar:', bg='white').pack(side='left')
    count_spin = tk.Spinbox(options_row, from_=1, to=20, width=5, justify='center')
    count_spin.pack(side='left', padx=(6, 0))
    # Replace the spinbox's initial value with the default of 5.
    count_spin.delete(0, 'end')
    count_spin.insert(0, '5')

    feedback = tk.Label(dialog, text='', fg='red', bg='white', wraplength=380, justify='left')
    feedback.pack(anchor='w')
    button_row = tk.Frame(dialog, bg='white')
    button_row.pack(fill='x', pady=(10, 0))

    def dismiss() -> None:
        """Destroy the dialog (best effort) and forget the reference."""
        if self.scraping_popup:
            try:
                self.scraping_popup.destroy()
            except Exception:
                pass
        self.scraping_popup = None

    def submit(_event=None) -> None:
        """Validate the form and launch the search."""
        query = query_entry.get().strip()
        if not query:
            feedback.config(text='Necesitas introducir un producto a buscar.')
            return
        try:
            requested = int(count_spin.get())
        except ValueError:
            feedback.config(text='Selecciona un número válido de resultados.')
            return
        limit = max(1, min(20, requested))
        dismiss()
        self._start_wallapop_search(query, limit)

    tk.Button(button_row, text='Buscar', command=submit, bg='#d6f2ce').pack(side='left', padx=4)
    tk.Button(button_row, text='Cancelar', command=dismiss).pack(side='right', padx=4)
    query_entry.bind('<Return>', submit)
    dialog.protocol('WM_DELETE_WINDOW', dismiss)
    self.scraping_popup = dialog
    query_entry.focus_set()
def _start_wallapop_search(self, query: str, limit: int) -> None:
    """Validate the query and run the Wallapop search on a daemon thread.

    *limit* is converted with ``int`` and clamped to 1-20. A progress
    message is shown in the results tab before the worker starts.
    """
    if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE):
        messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para usar esta función.')
        return

    search_terms = query.strip()
    if not search_terms:
        messagebox.showerror('Wallapop', 'Escribe el producto que quieres buscar.')
        return

    capped = min(20, int(limit))
    capped = max(1, capped)

    self._log(f'Buscando en Wallapop: "{search_terms}" (máx. {capped})')
    progress = f'Buscando "{search_terms}" en Wallapop...\nMostrando hasta {capped} resultados.'
    self._show_text_result('Wallapop', progress)
    self.notebook.select(self.tab_resultados)

    worker = threading.Thread(
        target=self._wallapop_search_worker,
        args=(search_terms, capped),
        daemon=True,
    )
    worker.start()
def _wallapop_search_worker(self, query: str, limit: int) -> None:
try:
results = self._fetch_wallapop_search_results(query, limit)
if not results:
raise ValueError('No se encontraron anuncios para tu búsqueda.')
self._scraping_queue.put(('search_success', query, results))
except Exception as exc:
self._scraping_queue.put(('search_error', str(exc)))
def _fetch_wallapop_search_results(self, query: str, limit: int) -> list[dict[str, str]]:
errors: list[str] = []
for fetcher in (self._fetch_wallapop_search_via_api, self._fetch_wallapop_search_via_html):
try:
results = fetcher(query, limit)
if results:
return results[:limit]
except Exception as exc:
errors.append(str(exc))
if errors:
raise ValueError('No se pudieron obtener resultados de Wallapop.\n' + '\n'.join(errors))
return []
def _fetch_wallapop_search_via_api(self, query: str, limit: int) -> list[dict[str, str]]:
    """Query the Wallapop search API, following pagination, up to *limit* items.

    At most six pages are requested. Items are de-duplicated by id and
    formatted with ``_format_wallapop_item``. Returns an empty list when
    ``requests`` is unavailable.

    Raises:
        ValueError: when the API answers 403 (message explains the
            WALLAPOP_DEVICE_ID/WALLAPOP_MPID requirement).
        requests.HTTPError: for any other HTTP error status.
    """
    if not requests:
        return []

    collected: list[dict[str, str]] = []
    known_ids: set[str] = set()
    page_token = None

    for _ in range(6):
        if len(collected) >= limit:
            break

        if page_token:
            # Follow-up pages are addressed only by their continuation token.
            params: dict[str, object] = {'next_page': page_token}
        else:
            params = {
                'keywords': query,
                'source': 'search_box',
                'filters_source': 'quick_filters',
                'order_by': 'most_relevance',
                'latitude': f'{WALLAPOP_LATITUDE:.6f}',
                'longitude': f'{WALLAPOP_LONGITUDE:.6f}',
                'country_code': WALLAPOP_COUNTRY_CODE,
                'language': WALLAPOP_LANGUAGE,
            }
            if WALLAPOP_CATEGORY_ID:
                params['category_id'] = WALLAPOP_CATEGORY_ID

        response = requests.get(
            'https://api.wallapop.com/api/v3/search',
            params=params,
            headers=WALLAPOP_HEADERS,
            timeout=20,
        )
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:  # type: ignore[attr-defined]
            if response.status_code != 403:
                raise
            raise ValueError(
                'Wallapop devolvió 403. Asegúrate de definir WALLAPOP_DEVICE_ID/WALLAPOP_MPID '
                'con valores obtenidos desde tu navegador.'
            ) from exc

        payload = response.json()
        page_items = self._extract_wallapop_items(payload)
        if not page_items:
            break

        for entry in page_items:
            if not isinstance(entry, dict):
                continue
            raw_id = entry.get('id') or entry.get('itemId') or entry.get('item_id')
            if not raw_id:
                continue
            item_id = str(raw_id)
            if item_id in known_ids:
                continue
            known_ids.add(item_id)

            # Fall back to the id when the entry carries no slug.
            slug = entry.get('web_slug') or entry.get('slug') or item_id
            seller = entry.get('seller')
            if not isinstance(seller, dict):
                seller = None
            link = self._build_wallapop_url(str(slug))
            collected.append(self._format_wallapop_item(entry, link, seller))
            if len(collected) >= limit:
                break

        page_token = self._extract_wallapop_next_page(payload)
        if not page_token:
            break

    return collected
def _fetch_wallapop_search_via_html(self, query: str, limit: int) -> list[dict[str, str]]:
    """Search Wallapop by parsing the JSON embedded in the HTML search page.

    The listing entries are looked up in the ``__NEXT_DATA__`` script under
    several alternative keys. Each entry is then resolved to full details,
    first through its item page and, failing that, through the item API.

    Returns:
        Up to *limit* formatted items; an empty list when dependencies are
        missing, the script tag is absent or no entries are found.

    Raises:
        Whatever ``raise_for_status`` / ``json.loads`` raise for the search
        page itself; per-item failures are swallowed.
    """
    if not (requests and BEAUTIFULSOUP_AVAILABLE):
        return []
    params = {'keywords': query}
    resp = requests.get(
        'https://es.wallapop.com/app/search',
        params=params,
        headers=WALLAPOP_HEADERS,
        timeout=20
    )
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    script = soup.find('script', id='__NEXT_DATA__')
    if not script or not script.string:
        return []
    data = json.loads(script.string)
    page_props = data.get('props', {}).get('pageProps', {})
    items: list[dict[str, object]] = []
    # First choice: a list stored directly under pageProps.
    for key in ('searchResults', 'results', 'items'):
        candidate = page_props.get(key)
        if isinstance(candidate, list):
            items = candidate
            break
    if not items:
        # Second choice: the same kind of list nested under initialState.
        initial_state = page_props.get('initialState', {})
        if isinstance(initial_state, dict):
            for key in ('items', 'results'):
                candidate = initial_state.get(key)
                if isinstance(candidate, list):
                    items = candidate
                    break
            if not items:
                # Last choice: initialState.search.items or
                # initialState.search.search.items.
                search_state = initial_state.get('search')
                if isinstance(search_state, dict):
                    candidate = search_state.get('items')
                    if isinstance(candidate, list):
                        items = candidate
                    elif isinstance(search_state.get('search'), dict):
                        nested = search_state['search'].get('items')
                        if isinstance(nested, list):
                            items = nested
    if not items:
        return []
    results: list[dict[str, str]] = []
    seen: set[str] = set()
    for entry in items:
        if not isinstance(entry, dict):
            continue
        slug = (
            entry.get('slug')
            or entry.get('webSlug')
            or entry.get('web_slug')
            or entry.get('shareLink')
            or entry.get('share_link')
            or entry.get('url')
        )
        url = self._build_wallapop_url(str(slug)) if slug else ''
        item_id = entry.get('id') or entry.get('itemId') or entry.get('item_id')
        # De-duplicate by id when available, otherwise by URL.
        entry_key = str(item_id or url)
        if not entry_key or entry_key in seen:
            continue
        seen.add(entry_key)
        detail: dict[str, str] | None = None
        if url:
            try:
                detail = self._fetch_wallapop_item(url)
            except Exception:
                # Best effort: fall through to the API lookup below.
                detail = None
        if not detail and item_id:
            try:
                detail = self._fetch_wallapop_item_from_api(str(item_id))
            except Exception:
                # Neither source worked; this entry is skipped.
                detail = None
        if detail:
            if url and not detail.get('link'):
                detail['link'] = url
            results.append(detail)
        if len(results) >= limit:
            break
    return results
def _extract_wallapop_items(self, payload: dict[str, object]) -> list[dict[str, object]]:
items: list[dict[str, object]] = []
data_block = payload.get('data')
if isinstance(data_block, dict):
section = data_block.get('section')
if isinstance(section, dict):
section_payload = section.get('payload')
if isinstance(section_payload, dict):
raw_items = section_payload.get('items') or section_payload.get('section_items')
if isinstance(raw_items, list):
items.extend(item for item in raw_items if isinstance(item, dict))
elif isinstance(data_block.get('items'), list):
items.extend(item for item in data_block['items'] if isinstance(item, dict))
if not items:
web_results = payload.get('web_search_results')
if isinstance(web_results, dict):
sections = web_results.get('sections')
if isinstance(sections, list):
for section in sections:
if not isinstance(section, dict):
continue
raw_items = section.get('items') or section.get('section_items')
if isinstance(raw_items, list):
items.extend(item for item in raw_items if isinstance(item, dict))
return items
def _extract_wallapop_next_page(self, payload: dict[str, object]) -> str | None:
meta = payload.get('meta')
if isinstance(meta, dict):
token = meta.get('next_page')
if isinstance(token, str) and token:
return token
headers = payload.get('headers')
if isinstance(headers, dict):
token = headers.get('X-NextPage') or headers.get('x-nextpage')
if isinstance(token, str) and token:
return token
return None
def _fetch_wallapop_item(self, url: str) -> dict[str, str]:
    """Download a Wallapop listing page and format the item embedded in it.

    The item is read from the JSON inside the ``__NEXT_DATA__`` script tag
    (``pageProps.item`` or ``pageProps.itemInfo.item``).

    Raises:
        RuntimeError: if requests or BeautifulSoup are unavailable.
        ValueError: if the script tag or the item entry cannot be found.
    """
    if not (requests and BEAUTIFULSOUP_AVAILABLE):
        raise RuntimeError('Dependencias de scraping no disponibles')

    response = requests.get(url, headers=WALLAPOP_HEADERS, timeout=20)
    response.raise_for_status()
    document = BeautifulSoup(response.text, 'html.parser')
    next_data = document.find('script', id='__NEXT_DATA__')
    if not next_data or not next_data.string:
        raise ValueError('No se pudo encontrar la información del anuncio (sin datos Next.js).')

    parsed = json.loads(next_data.string)
    page_props = parsed.get('props', {}).get('pageProps', {})
    item = page_props.get('item')
    if not item:
        item = page_props.get('itemInfo', {}).get('item')
    if not item:
        raise ValueError('El formato de la página cambió y no se pudo leer el anuncio.')

    seller = page_props.get('itemSeller')
    if not isinstance(seller, dict):
        seller = None
    return self._format_wallapop_item(item, url, seller)
def _fetch_wallapop_item_from_api(self, item_id: str) -> dict[str, str]:
    """Fetch one listing from the Wallapop items API and format it.

    The item is taken from the response's ``item`` key when that is a dict,
    otherwise the response itself is used. The link is built from the
    item's slug, falling back to ``https://es.wallapop.com/item/<item_id>``.

    Raises:
        RuntimeError: if ``requests`` is unavailable.
        ValueError: if the response holds no item dict.
    """
    if not requests:
        raise RuntimeError('Dependencias de scraping no disponibles')

    response = requests.get(
        f'https://api.wallapop.com/api/v3/items/{item_id}',
        headers=WALLAPOP_HEADERS,
        timeout=15,
    )
    response.raise_for_status()
    data = response.json()

    nested = data.get('item')
    item = nested if isinstance(nested, dict) else data
    if not isinstance(item, dict):
        raise ValueError('Respuesta del API sin datos del artículo.')

    slug = item.get('web_slug') or item.get('slug') or item.get('share_link')
    if slug:
        link = self._build_wallapop_url(str(slug))
    else:
        link = f'https://es.wallapop.com/item/{item_id}'

    # Prefer the top-level seller block; fall back to the one inside the item.
    seller = data.get('seller')
    if not isinstance(seller, dict):
        seller = item.get('seller')
    if not isinstance(seller, dict):
        seller = None
    return self._format_wallapop_item(item, link, seller)
def _build_wallapop_url(self, slug_or_url: str) -> str:
if not slug_or_url:
return ''
text = slug_or_url.strip()
if text.startswith('http://') or text.startswith('https://'):
return text
text = text.strip('/')
if not text.startswith('item/'):
text = f'item/{text}'
return f'https://es.wallapop.com/{text}'
def _format_wallapop_item(
self,
item: dict[str, object],
url: str,
seller_info: dict[str, object] | None = None
) -> dict[str, str]:
title_field = item.get('title')
if isinstance(title_field, dict):
title = title_field.get('original') or title_field.get('translated') or '(Sin título)'
elif isinstance(title_field, str):
title = title_field or '(Sin título)'
else:
title = '(Sin título)'
price_text = 'Precio no disponible'
price_data = item.get('price')
price_cash = None
if isinstance(price_data, dict):
if isinstance(price_data.get('cash'), dict):
price_cash = price_data['cash']
else:
price_cash = price_data
if isinstance(price_cash, dict):
amount = price_cash.get('amount')
currency = price_cash.get('currency') or 'EUR'
if amount is not None:
price_text = f'{amount} {currency}'.strip()
elif isinstance(price_data, (int, float, str)):
price_text = f'{price_data} EUR'
desc_field = item.get('description')
if isinstance(desc_field, dict):
description = (desc_field.get('original') or desc_field.get('translated') or '').strip()
elif isinstance(desc_field, str):
description = desc_field.strip()
else:
description = ''
location = item.get('location') if isinstance(item.get('location'), dict) else {}
location_parts = []
if isinstance(location, dict):
for part in (location.get('city'), location.get('postalCode'), location.get('regionName')):
if part:
location_parts.append(part)
location_text = ', '.join(location_parts) if location_parts else 'Ubicación no disponible'
seller_name = ''
for candidate in (seller_info, item.get('seller'), item.get('user'), item.get('userInfo')):
if isinstance(candidate, dict):
seller_name = (
candidate.get('microName')
or candidate.get('name')
or candidate.get('userName')
or ''
)
if seller_name:
break
if not seller_name:
seller_name = 'Vendedor no disponible'
categories = ', '.join(
cat.get('name')
for cat in item.get('taxonomies', [])
if isinstance(cat, dict) and cat.get('name')
) or 'Sin categoría'
car_info = item.get('carInfo') if isinstance(item.get('carInfo'), dict) else {}
car_bits: list[str] = []
if isinstance(car_info, dict):
for key in ('year', 'km', 'engine', 'gearBox'):
field = car_info.get(key)
text = None
if isinstance(field, dict):
text = field.get('text') or field.get('iconText')
elif isinstance(field, str):
text = field
if text:
car_bits.append(text)
extra = ', '.join(car_bits)
images = item.get('images') if isinstance(item.get('images'), list) else []
image_url = ''
for img in images:
if not isinstance(img, dict):
continue
urls = img.get('urls') if isinstance(img.get('urls'), dict) else {}
if isinstance(urls, dict):
image_url = urls.get('medium') or urls.get('big') or urls.get('small') or ''
else:
image_url = img.get('url', '') if isinstance(img.get('url'), str) else ''
if image_url:
break
return {
'title': str(title),
'price': price_text,
'description': description,
'link': url,
'location': location_text,
'seller': seller_name,
'category': categories,
'extra': extra,
'image': image_url
}
def _process_scraping_queue(self) -> None:
if not self._running:
return
try:
while True:
status, *payload = self._scraping_queue.get_nowait()
if status == 'error':
message = payload[0] if payload else 'Error desconocido'
self._handle_scraping_error(str(message))
elif status == 'search_success':
query, data = payload
self._handle_wallapop_search_success(str(query), list(data))
elif status == 'search_error':
message = payload[0] if payload else 'Error desconocido'
self._handle_scraping_error(str(message))
elif status == 'generic_success':
urls, data = payload
self._handle_generic_scraping_success(list(urls), list(data))
elif status == 'backup_success':
summary, target = payload
self._handle_backup_success(str(summary), str(target))
except queue.Empty:
pass
finally:
if self._running:
self.after(100, self._process_scraping_queue)
def _handle_wallapop_search_success(self, query: str, results: list[dict[str, str]]) -> None:
    """Render the Wallapop results, save them to a file and offer to open it.

    The results are shown as read-only text in the results tab. They are
    then written with ``_save_wallapop_results_to_file``; a save failure is
    logged and reported, otherwise the user is asked whether to open the
    saved file.
    """
    total = len(results)
    self._log(f'Se encontraron {total} anuncio(s) para "{query}" en Wallapop')

    def describe(position: int, ad: dict) -> str:
        """Build the text block for one listing."""
        rows = [
            f'{position}. {ad.get("title", "Sin título")}',
            f'Precio: {ad.get("price", "N/D")}',
            f'Vendedor: {ad.get("seller", "N/D")}',
            f'Ubicación: {ad.get("location", "N/D")}',
            f'Categoría: {ad.get("category", "N/D")}',
        ]
        details = ad.get('extra', '').strip()
        if details:
            rows.append(f'Detalles: {details}')
        body = (ad.get('description') or '').strip()
        if body:
            rows.extend(['Descripción:', body])
        rows.append(f'Enlace: {ad.get("link", "N/D")}')
        rows.append('-' * 60)
        return '\n'.join(rows)

    def builder(parent: tk.Frame) -> None:
        """Populate *parent* with a heading and the read-only result text."""
        frame = tk.Frame(parent, bg='white')
        frame.pack(fill='both', expand=True, padx=12, pady=12)
        heading = tk.Label(
            frame,
            text=f'{total} resultado(s) para "{query}"',
            font=('Arial', 14, 'bold'),
            bg='white',
            fg='#2c3e50',
        )
        heading.pack(anchor='w', pady=(0, 6))
        box = scrolledtext.ScrolledText(frame, wrap='word')
        box.pack(fill='both', expand=True)
        listing = '\n\n'.join(
            describe(position, ad) for position, ad in enumerate(results, start=1)
        )
        box.insert('1.0', listing)
        box.config(state='disabled')

    self._render_results_view('Resultados Wallapop', builder)
    self.notebook.select(self.tab_resultados)

    try:
        path = self._save_wallapop_results_to_file(query, results)
    except Exception as exc:
        self._log(f'[ERROR] No se pudo guardar el archivo: {exc}')
        messagebox.showerror('Wallapop', f'No se pudo guardar el archivo: {exc}')
        return

    question = (
        f'Se guardó el archivo con los resultados en:\n{path}\n\n'
        '¿Quieres abrirlo ahora?'
    )
    if not messagebox.askyesno('Wallapop', question):
        self._log(f'Archivo de resultados guardado en {path}')
        return
    self._open_saved_file(path)
def _handle_generic_scraping_success(self, urls: list[str], results: list[dict[str, str]]) -> None:
self._log(f'Scraping simple completado para {len(results)} URL(s)')
order = {url: idx for idx, url in enumerate(urls)}
ordered = sorted(results, key=lambda item: order.get(item.get('url', ''), len(order)))
def builder(parent: tk.Frame) -> None:
frame = tk.Frame(parent, bg='white')
frame.pack(fill='both', expand=True, padx=12, pady=12)
summary = tk.Label(
frame,
text='Resultados de scraping simple',
font=('Arial', 14, 'bold'),
bg='white'
)
summary.pack(anchor='w', pady=(0, 6))
box = scrolledtext.ScrolledText(frame, wrap='word')
box.pack(fill='both', expand=True)
blocks: list[str] = []
for idx, item in enumerate(ordered, start=1):
lines = [f'{idx}. {item.get("url", "URL desconocida")}', f"Hora: {item.get('timestamp', 'N/D')}" ]
if 'error' in item:
lines.append(f"Error: {item.get('error')}")
else:
lines.extend([
f"Título: {item.get('titulo', 'Sin título')}",
f"Descripción: {item.get('descripcion', '')}",
f"Estado HTTP: {item.get('status_code', 'N/D')}",
f"Longitud HTML: {item.get('longitud', 0)} caracteres",
f"Links: {item.get('num_links', 0)} | Imágenes: {item.get('num_imagenes', 0)} | Párrafos: {item.get('num_parrafos', 0)}"
])
lines.append('-' * 60)
blocks.append('\n'.join(lines))
box.insert('1.0', '\n\n'.join(blocks) if blocks else 'No se generaron resultados.')
box.config(state='disabled')
self._render_results_view('Scraping simple', builder)
self.notebook.select(self.tab_resultados)
def _handle_backup_success(self, summary: str, target: str) -> None:
    """Report a finished backup and offer to open the destination.

    The summary is logged, shown in the results tab and repeated in a
    yes/no dialog; answering yes passes *target* to ``_open_saved_file``.
    """
    self._log('Copia de seguridad finalizada')
    self._show_text_result('Copias de seguridad', summary)
    self.notebook.select(self.tab_resultados)
    question = f'{summary}\n\n¿Quieres abrir la carpeta destino?'
    wants_to_open = messagebox.askyesno('Backup', question)
    if wants_to_open:
        self._open_saved_file(target)
def _save_wallapop_results_to_file(self, query: str, results: list[dict[str, str]]) -> str:
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'wallapop_search_{timestamp}.txt'
path = os.path.join(tempfile.gettempdir(), filename)
lines = [
f'Consulta: {query}',
f'Resultados: {len(results)}',
f'Generado: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}',
''
]
for idx, item in enumerate(results, start=1):
lines.append(f'Resultado {idx}')
lines.append(f'Título: {item.get("title", "Sin título")}')
lines.append(f'Precio: {item.get("price", "N/D")}')
lines.append(f'Vendedor: {item.get("seller", "N/D")}')
lines.append(f'Ubicación: {item.get("location", "N/D")}')
lines.append(f'Categoría: {item.get("category", "N/D")}')
if item.get('extra'):
lines.append(f'Detalles: {item.get("extra")}')
description = (item.get('description') or '').strip()
if description:
lines.append('Descripción:')
lines.append(description)
lines.append(f'Enlace: {item.get("link", "N/D")}')
if item.get('image'):
lines.append(f'Imagen: {item.get("image")}')
lines.append('-' * 60)
with open(path, 'w', encoding='utf-8') as fh:
fh.write('\n'.join(lines))
return path
def _handle_scraping_error(self, message: str) -> None:
    """Log a scraping failure and show it to the user in an error dialog."""
    log_entry = f'[ERROR] Wallapop: {message}'
    self._log(log_entry)
    messagebox.showerror('Wallapop', message)
# --- Fin Web Scraping ---
def _open_game_window(self) -> None:
self._render_results_view('Carrera de camellos', self._build_camel_race_view)
def _build_camel_race_view(self, parent: tk.Frame) -> None:
    """Build the camel race UI (canvas, status label and controls) in *parent*.

    The created canvas and status label are stored on the instance as
    ``game_window_canvas`` and ``game_window_status``.
    """
    root_frame = tk.Frame(parent, bg='white')
    root_frame.pack(fill='both', expand=True)

    heading = tk.Label(
        root_frame,
        text='Juego de camellos (sincronización de hilos)',
        font=('Arial', 18, 'bold'),
        bg='white',
    )
    heading.pack(pady=(16, 8))

    track = tk.Canvas(root_frame, height=380, bg='#fdfdfd', bd=2, relief='sunken')
    track.pack(fill='both', expand=True, padx=16, pady=12)

    status_label = tk.Label(
        root_frame,
        text='Listo para correr',
        font=('Arial', 14, 'bold'),
        bg='white',
        fg='#2c3e50',
    )
    status_label.pack(pady=(0, 10))

    panel = tk.Frame(root_frame, bg='white')
    panel.pack(pady=(0, 10))

    def launch_race() -> None:
        self._start_race(track, status_label)

    start_button = tk.Button(
        panel,
        text='Iniciar carrera',
        font=('Arial', 13, 'bold'),
        width=16,
        command=launch_race,
    )
    start_button.grid(row=0, column=0, padx=12, pady=4)

    back_button = tk.Button(
        panel,
        text='Volver a resultados',
        font=('Arial', 13),
        width=16,
        command=self._reset_results_view,
    )
    back_button.grid(row=0, column=1, padx=12, pady=4)

    tk.Label(panel, text='Número de camellos:', bg='white', font=('Arial', 12)).grid(row=1, column=0, pady=6)

    def on_camel_count_changed() -> None:
        # Invoked through the spinbox's ``command`` option.
        self._set_camel_count(int(count_spin.get()))

    count_spin = tk.Spinbox(
        panel,
        from_=3,
        to=8,
        width=5,
        justify='center',
        font=('Arial', 12),
        command=on_camel_count_changed,
    )
    count_spin.grid(row=1, column=1, pady=6)
    # Show the currently configured camel count.
    count_spin.delete(0, 'end')
    count_spin.insert(0, str(self.game_camel_count))

    tk.Label(panel, text='Velocidad (1-5):', bg='white', font=('Arial', 12)).grid(row=2, column=0, pady=6)

    def on_speed_changed(value) -> None:
        self._set_speed_factor(float(value))

    speed_slider = tk.Scale(
        panel,
        from_=1,
        to=5,
        orient='horizontal',
        resolution=0.5,
        length=180,
        bg='white',
        command=on_speed_changed,
    )
    speed_slider.grid(row=2, column=1, pady=6)
    speed_slider.set(self.game_speed_factor)

    self.game_window_canvas = track
    self.game_window_status = status_label
def _set_camel_count(self, value: int) -> None:
self.game_camel_count = max(3, min(8, int(value)))
def _set_speed_factor(self, value: float) -> None:
self.game_speed_factor = max(1.0, min(5.0, float(value)))
def _start_race(self, canvas=None, status_label=None) -> None:
target_canvas = canvas or self.game_window_canvas
if target_canvas is None:
messagebox.showinfo('Juego', 'Abre el juego para iniciar la carrera.')
return
self._stop_game_queue_processor()
target_canvas.delete('all')
target_canvas.update_idletasks()
width = target_canvas.winfo_width() or int(target_canvas.cget('width'))
height = target_canvas.winfo_height() or int(target_canvas.cget('height'))
finish = width - 40
camel_count = max(3, min(8, int(self.game_camel_count)))
spacing = height / (camel_count + 1)
palette = ['#ff7675', '#74b9ff', '#55efc4', '#f1c40f', '#9b59b6', '#1abc9c', '#e67e22', '#95a5a6']
racers = []
for idx in range(camel_count):
y = spacing * (idx + 1)
target_canvas.create_line(20, y, finish, y, dash=(3,4))
color = palette[idx % len(palette)]
rect = target_canvas.create_rectangle(30, y-22, 130, y+22, fill=color)
racers.append(rect)
status = status_label or self.game_window_status
if status:
status.config(text='¡Carrera en curso!')
self.game_move_queue = queue.Queue()
self.game_done_event = threading.Event()
self.game_finish_line = finish
self.game_racer_names = [f'Camello {i+1}' for i in range(camel_count)]
self.game_queue_processor_active = True
self.after(30, self._process_game_queue)
indices = list(range(camel_count))
random.shuffle(indices)
for idx in indices:
rect = racers[idx]
threading.Thread(target=self._race_worker, args=(idx, rect), daemon=True).start()
def _race_worker(self, index: int, rect_id: int) -> None:
while True:
if not self.game_queue_processor_active or not self.game_move_queue or not self.game_done_event:
return
if self.game_done_event.is_set():
return
base_min = 3
base_max = 9
speed = max(1.0, float(self.game_speed_factor))
step = random.randint(int(base_min * speed), max(int(base_max * speed), int(base_min * speed) + 1))
self.game_move_queue.put(('move', (rect_id, step, index)))
sleep_time = max(0.02, 0.08 / speed)
time.sleep(random.uniform(sleep_time * 0.5, sleep_time * 1.2))
def _process_game_queue(self) -> None:
if not self.game_queue_processor_active or not self.game_move_queue:
return
canvas = self.game_window_canvas
if not canvas:
return
try:
while True:
action, payload = self.game_move_queue.get_nowait()
if action == 'move':
rect_id, step, idx = payload
try:
canvas.move(rect_id, step, 0)
except Exception:
continue
coords = canvas.coords(rect_id)
if coords and coords[2] >= self.game_finish_line and self.game_done_event and not self.game_done_event.is_set():
self.game_done_event.set()
if self.game_window_status:
self.game_window_status.config(text=f'{self.game_racer_names[idx]} ganó la carrera')
else:
continue
except queue.Empty:
pass
if not self.game_done_event or not self.game_done_event.is_set():
if self.game_queue_processor_active:
self.after(30, self._process_game_queue)
else:
self.game_queue_processor_active = False
def _stop_game_queue_processor(self) -> None:
self.game_queue_processor_active = False
self.game_move_queue = None
if self.game_done_event:
try:
self.game_done_event.set()
except Exception:
pass
self.game_done_event = None
def _start_alarm(self) -> None:
try:
minutes = max(1, int(self.alarm_minutes.get()))
except ValueError:
messagebox.showwarning('Alarmas', 'Ingresa un número válido de minutos.')
return
title = self.alarm_title.get().strip() or f'Alarma {self.alarm_counter}'
end_time = datetime.datetime.now() + datetime.timedelta(minutes=minutes)
alarm = {'id': self.alarm_counter, 'title': title, 'end': end_time}
self.alarm_counter += 1
self.active_alarms.append(alarm)
self.alarm_title.delete(0, 'end')
self._update_alarm_list()
self._log(f'Alarma "{title}" programada para las {end_time.strftime("%H:%M:%S")}')
def _refresh_alarms_loop(self) -> None:
if not self._running:
return
now = datetime.datetime.now()
triggered: list[dict[str, datetime.datetime | str]] = []
for alarm in list(self.active_alarms):
remaining = (alarm['end'] - now).total_seconds() # type: ignore[arg-type]
if remaining <= 0:
triggered.append(alarm)
for alarm in triggered:
self.active_alarms.remove(alarm)
self._trigger_alarm(alarm)
self._update_alarm_list()
self.after(1000, self._refresh_alarms_loop)
def _update_alarm_list(self) -> None:
if not hasattr(self, 'alarm_list'):
return
selection = self.alarm_list.curselection()
selected_id = None
if selection:
idx = selection[0]
if idx < len(self.active_alarms):
selected_id = self.active_alarms[idx]['id']
self.alarm_list.delete(0, 'end')
if not self.active_alarms:
self.alarm_status.config(text='Sin alarmas programadas')
return
now = datetime.datetime.now()
for index, alarm in enumerate(self.active_alarms):
remaining = max(0, int((alarm['end'] - now).total_seconds())) # type: ignore[arg-type]
minutes, seconds = divmod(remaining, 60)
self.alarm_list.insert('end', f"{alarm['title']} - {minutes:02d}:{seconds:02d}")
if selected_id is not None and alarm['id'] == selected_id:
self.alarm_list.selection_set(index)
self.alarm_status.config(text=f"{len(self.active_alarms)} alarma(s) activas")
def _trigger_alarm(self, alarm: dict[str, datetime.datetime | str]) -> None:
self._log(f"Alarma '{alarm['title']}' activada")
try:
self.bell()
except Exception:
pass
self._show_alarm_popup(str(alarm['title']))
def _show_alarm_popup(self, title: str) -> None:
    """Open a window that grabs input and announces that alarm *title* fired."""
    background = '#ffeaea'
    window = tk.Toplevel(self)
    window.title('Alarma activa')
    window.configure(bg=background, padx=20, pady=20)
    window.transient(self)
    window.grab_set()

    banner = tk.Label(
        window,
        text='¡Tiempo cumplido!',
        font=('Arial', 16, 'bold'),
        fg='#c0392b',
        bg=background,
    )
    banner.pack(pady=6)
    name_label = tk.Label(window, text=title, font=('Arial', 13), bg=background)
    name_label.pack(pady=4)
    stop_button = tk.Button(window, text='Detener alarma', command=window.destroy, bg='#f9dede')
    stop_button.pack(pady=10)
def _cancel_selected_alarm(self) -> None:
if not self.active_alarms:
messagebox.showinfo('Alarmas', 'No hay alarmas para cancelar.')
return
selection = self.alarm_list.curselection()
if not selection:
messagebox.showinfo('Alarmas', 'Selecciona una alarma en la lista.')
return
idx = selection[0]
if idx >= len(self.active_alarms):
messagebox.showwarning('Alarmas', 'La selección ya no es válida.')
return
alarm = self.active_alarms.pop(idx)
self._update_alarm_list()
self._log(f"Alarma '{alarm['title']}' cancelada")
def _select_music(self) -> None:
    """Let the user pick a media file and loop its audio through pygame.

    Files whose extension is not one of ``.mp3``, ``.wav``, ``.ogg`` or
    ``.m4a`` are first converted with ``ffmpeg`` into a temporary WAV file.
    The path of that file is stored in ``self.music_temp_file`` so it can be
    deleted later by ``_cleanup_temp_audio``.

    The previously playing track (and its temporary file) is only torn down
    once the new file is ready to be loaded. If pygame cannot initialise the
    mixer or play the file, the new temporary file is removed and an error
    dialog is shown instead of letting the exception escape.
    """
    if not pygame_available:
        messagebox.showwarning('Música', 'Instale pygame para reproducir audio.')
        return
    path = filedialog.askopenfilename(
        filetypes=[
            ('Audio/Video', '*.mp3 *.wav *.ogg *.mp4 *.mkv *.mov *.avi *.m4a *.webm'),
            ('Todos', '*.*'),
        ]
    )
    if not path:
        return

    def discard(candidate):
        # Best-effort removal of a temp file that will never be played.
        if candidate and os.path.exists(candidate):
            try:
                os.unlink(candidate)
            except OSError:
                pass

    ext = os.path.splitext(path)[1].lower()
    direct_audio = {'.mp3', '.wav', '.ogg', '.m4a'}
    playable_path = path
    temp_path = None
    if ext not in direct_audio:
        # delete=False: the file must outlive this handle so ffmpeg can
        # write to it and pygame can read it afterwards.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        temp_path = tmp.name
        tmp.close()
        cmd = ['ffmpeg', '-y', '-i', path, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', temp_path]
        try:
            # subprocess.run waits for ffmpeg to finish before continuing.
            # NOTE(review): this blocks the calling thread for the whole
            # conversion; if this is invoked from a Tk callback the window
            # will not respond meanwhile.
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            playable_path = temp_path
        except FileNotFoundError:
            discard(temp_path)
            messagebox.showerror('Música', 'ffmpeg no está instalado. Instálalo para extraer el audio de videos.')
            return
        except subprocess.CalledProcessError as exc:
            discard(temp_path)
            messagebox.showerror('Música', f'No se pudo extraer el audio: {exc}')
            return

    try:
        pygame.mixer.init()
        # Stop and release the previous track before deleting its temp file:
        # a file still held open by the mixer cannot be removed on Windows.
        pygame.mixer.music.stop()
        unload = getattr(pygame.mixer.music, 'unload', None)  # pygame >= 2.0
        if unload is not None:
            unload()
        self._cleanup_temp_audio()
        pygame.mixer.music.load(playable_path)
        pygame.mixer.music.play(-1)  # -1 loops the track indefinitely
    except pygame.error as exc:
        # Unsupported format, missing audio device, etc.: do not leak the
        # freshly converted file.
        discard(temp_path)
        messagebox.showerror('Música', f'No se pudo reproducir el audio: {exc}')
        return
    self.music_temp_file = temp_path
    self._log(f'Reproduciendo {os.path.basename(path)}')
def _stop_music(self) -> None:
    """Stop playback and delete the temporary audio file, if there is one.

    When the mixer is initialised the current track is stopped and, where
    pygame offers ``music.unload()`` (pygame >= 2.0), unloaded so the file
    is no longer held open. Without that step the removal done by
    ``_cleanup_temp_audio`` can fail on platforms that refuse to delete open
    files, leaving the temporary WAV behind.
    """
    if pygame_available and pygame.mixer.get_init():
        pygame.mixer.music.stop()
        unload = getattr(pygame.mixer.music, 'unload', None)
        if unload is not None:
            unload()
    self._cleanup_temp_audio()
def _pause_music(self) -> None:
    """Pause playback; does nothing unless pygame's mixer is initialised."""
    mixer_ready = pygame_available and pygame.mixer.get_init()
    if not mixer_ready:
        return
    pygame.mixer.music.pause()
def _resume_music(self) -> None:
    """Resume paused playback; does nothing unless pygame's mixer is initialised."""
    mixer_ready = pygame_available and pygame.mixer.get_init()
    if not mixer_ready:
        return
    pygame.mixer.music.unpause()
def _cleanup_temp_audio(self) -> None:
if self.music_temp_file and os.path.exists(self.music_temp_file):
try:
os.remove(self.music_temp_file)
except OSError:
pass
self.music_temp_file = None
def _resolve_openweather_key(self) -> str:
key = (self.weather_api_key or '').strip()
if not key:
raise RuntimeError('Configure OPENWEATHER_API_KEY u OPENWEATHER_FALLBACK_API_KEY')
return key
def _fetch_weather_by_coordinates(self, lat: float, lon: float) -> dict[str, Any]:
    """Fetch the current weather for a latitude/longitude from OpenWeatherMap.

    The request asks for metric units and Spanish descriptions and uses an
    8 second timeout.

    Args:
        lat: Latitude sent as the ``lat`` query parameter.
        lon: Longitude sent as the ``lon`` query parameter.

    Returns:
        The decoded JSON payload.

    Raises:
        RuntimeError: if ``requests`` is not installed or no API key is set.
        ValueError: if the payload lacks the ``'main'`` or ``'weather'`` key.
        Exception: whatever ``requests`` raises for network/HTTP failures
            (``raise_for_status`` is called on the response).
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError('Instale requests para consultar el clima')
    api_key = self._resolve_openweather_key()
    query = dict(lat=lat, lon=lon, appid=api_key, units='metric', lang='es')
    response = requests.get(
        'https://api.openweathermap.org/data/2.5/weather',
        params=query,
        timeout=8,
    )
    response.raise_for_status()
    payload = response.json()
    if not ('main' in payload and 'weather' in payload):
        raise ValueError('Respuesta incompleta de OpenWeatherMap')
    return payload
def _fetch_weather_data(self, city_query: str) -> dict[str, Any]:
    """Fetch the current weather for a city query from OpenWeatherMap.

    The request asks for metric units and Spanish descriptions and uses an
    8 second timeout.

    Args:
        city_query: Text sent as the ``q`` query parameter.

    Returns:
        The decoded JSON payload.

    Raises:
        RuntimeError: if ``requests`` is not installed or no API key is set.
        ValueError: if the payload lacks the ``'main'`` or ``'weather'`` key.
        Exception: whatever ``requests`` raises for network/HTTP failures
            (``raise_for_status`` is called on the response).
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError('Instale requests para consultar el clima')
    api_key = self._resolve_openweather_key()
    query = dict(q=city_query, appid=api_key, units='metric', lang='es')
    response = requests.get(
        'https://api.openweathermap.org/data/2.5/weather',
        params=query,
        timeout=8,
    )
    response.raise_for_status()
    payload = response.json()
    if not ('main' in payload and 'weather' in payload):
        raise ValueError('Respuesta incompleta de OpenWeatherMap')
    return payload
def _fetch_javea_weather(self) -> dict[str, Any]:
    """Fetch the current weather at the module's ``JAVEA_*`` coordinates."""
    javea_weather = self._fetch_weather_by_coordinates(JAVEA_LATITUDE, JAVEA_LONGITUDE)
    return javea_weather
# ------------------------ chat ------------------------
def _connect_chat(self) -> None:
host = self.host_entry.get().strip() or SERVER_HOST_DEFAULT
try:
port = int(self.port_entry.get().strip())
except ValueError:
port = SERVER_PORT_DEFAULT
if self.chat_client.connect(host, port):
self._enqueue_chat_message('[INFO] Conectado al servidor')
def _send_chat(self) -> None:
    """Send the text typed in the message box through the chat client.

    Blank input is ignored. When ``chat_client.send`` reports success the
    message is echoed to the chat view prefixed with ``"Yo: "`` and the input
    box is cleared; otherwise a warning dialog is shown and the text is kept.
    """
    body = self.message_entry.get('1.0', 'end').strip()
    if not body:
        return
    delivered = self.chat_client.send(f'{body}\n')
    if not delivered:
        messagebox.showwarning('Chat', 'No hay conexión activa.')
        return
    self._enqueue_chat_message(f'Yo: {body}')
    self.message_entry.delete('1.0', 'end')
def _enqueue_chat_message(self, text: str) -> None:
self._chat_queue.put(text)
def _chat_loop(self) -> None:
while self._running:
try:
msg = self._chat_queue.get(timeout=0.5)
except queue.Empty:
continue
self.chat_display.after(0, lambda m=msg: self._append_chat(m))
def _append_chat(self, text: str) -> None:
self.chat_display.config(state='normal')
self.chat_display.insert('end', text + '\n')
self.chat_display.see('end')
self.chat_display.config(state='disabled')
# ------------------------ hilos auxiliares ------------------------
def _update_clock(self) -> None:
if not self._running:
return
now = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
self.clock_label.config(text=now)
self.after(1000, self._update_clock)
def _update_traffic(self) -> None:
    """Refresh the network label with KB sent/received since the last tick.

    Requires ``psutil``; returns immediately when it is missing or the app is
    shutting down. If the very first counter read fails, the label shows
    "N/D" and polling is not scheduled again. Afterwards the method re-arms
    itself every second while ``self._running`` is true, even when a single
    sample fails.
    """
    if not self._running or not psutil:
        return

    if self._traffic_last is None:
        # First tick: take the baseline the deltas are measured against.
        try:
            self._traffic_last = psutil.net_io_counters()
        except Exception as exc:
            self._log(f'Tráfico: psutil no disponible ({exc})')
            self.traffic_label.config(text='Red: N/D')
            return

    try:
        snapshot = psutil.net_io_counters()
        baseline = self._traffic_last
        sent_kb = (snapshot.bytes_sent - baseline.bytes_sent) / 1024
        recv_kb = (snapshot.bytes_recv - baseline.bytes_recv) / 1024
        self._traffic_last = snapshot
        self.traffic_label.config(text=f'Red: ↑ {sent_kb:.1f} KB/s ↓ {recv_kb:.1f} KB/s')
    except Exception as exc:
        self._log(f'Tráfico: {exc}')
        self.traffic_label.config(text='Red: N/D')
    finally:
        # Keep polling regardless of whether this sample succeeded.
        if self._running:
            self.after(1000, self._update_traffic)
def _resource_poll_tick(self) -> None:
    """Sample CPU, memory and thread count, store them and redraw the chart.

    Requires ``psutil``; returns immediately when it is missing or the app is
    shutting down. Each history series in ``self._resource_history`` keeps
    only its 40 most recent samples. The method re-arms itself through
    ``after`` (1 s normally, 2 s after a failed sample) and stores the job id
    in ``self._resource_poll_job``.
    """
    if not self._running or not psutil:
        return

    try:
        cpu_pct = psutil.cpu_percent(interval=None)
        mem_pct = psutil.virtual_memory().percent
    except Exception as exc:
        self._log(f'Recursos: {exc}')
        if self._running:
            # Sampling failed: retry later with a longer delay.
            self._resource_poll_job = self.after(2000, self._resource_poll_tick)
        return

    thread_count = 0
    if self._ps_process:
        try:
            thread_count = self._ps_process.num_threads()
        except Exception:
            # Thread count is optional; fall back to zero.
            thread_count = 0

    samples = {'cpu': cpu_pct, 'mem': mem_pct, 'threads': thread_count}
    for series_name, value in samples.items():
        series = self._resource_history[series_name]
        series.append(value)
        self._resource_history[series_name] = series[-40:]

    self._update_chart()
    if self._running:
        self._resource_poll_job = self.after(1000, self._resource_poll_tick)
def _update_chart(self) -> None:
    """Push the resource history into the matplotlib artists and redraw.

    Does nothing unless matplotlib is available and the canvas, both line
    artists and both CPU/memory axes exist. The CPU and memory lines are
    updated in place, the memory fill is recreated on every call, and the
    thread bars are updated in place when their count matches the history
    length or rebuilt otherwise.
    """
    chart_ready = (
        MATPLOTLIB_AVAILABLE
        and self.chart_canvas
        and self.line_cpu
        and self.line_mem
        and self.ax_cpu
        and self.ax_mem
    )
    if not chart_ready:
        return

    history = self._resource_history
    xs = list(range(len(history['cpu'])))
    self.line_cpu.set_data(xs, history['cpu'])
    self.line_mem.set_data(xs, history['mem'])

    right_edge = max(40, len(xs))
    for axis in (self.ax_cpu, self.ax_mem):
        axis.set_xlim(0, right_edge)
    for axis in (self.ax_cpu, self.ax_mem):
        axis.set_ylim(0, 100)

    # The filled area cannot be updated in place here, so drop the old one
    # (best-effort) and draw a new one.
    if self.mem_fill:
        try:
            self.mem_fill.remove()
        except Exception:
            pass
    if self.ax_mem and xs:
        self.mem_fill = self.ax_mem.fill_between(
            xs,
            history['mem'],
            color='#4ecdc4',
            alpha=0.3
        )

    thread_axis = self.ax_threads
    if thread_axis:
        counts = history['threads']
        if counts:
            bars = self.thread_bars
            if bars and len(bars) == len(counts):
                # Same number of bars: just adjust their heights.
                for bar, value in zip(bars, counts):
                    bar.set_height(value)
            else:
                if bars:
                    for bar in bars:
                        bar.remove()
                self.thread_bars = thread_axis.bar(xs, counts, color='#ffa726') if xs else None
            # Leave 20% headroom above the tallest bar, with a minimum of 1.
            thread_axis.set_ylim(0, max(max(counts) * 1.2, 1))
        thread_axis.set_xlim(-0.5, max(39.5, len(xs) - 0.5))

    if self.chart_canvas:
        self.chart_canvas.draw_idle()
def _log(self, text: str) -> None:
if threading.current_thread() is not threading.main_thread():
self.after(0, lambda t=text: self._log(t))
return
timestamp = datetime.datetime.now().strftime('%H:%M:%S')
self.notes.insert('end', f'[{timestamp}] {text}\n')
self.notes.see('end')
def on_close(self) -> None:
    """Shut the application down: stop timers, audio and chat, then destroy.

    Steps, in order:
      1. Clear ``self._running`` so the periodic callbacks and worker loops
         stop rescheduling themselves.
      2. Cancel the pending resource-poll ``after`` job, if any.
      3. Stop music playback, which also deletes the temporary audio file
         created for converted media so it is not left behind on exit.
      4. Close the chat client.
      5. Destroy the window; this happens even if closing the chat client
         raises, in which case the exception still propagates afterwards.
    """
    self._running = False
    if self._resource_poll_job is not None:
        try:
            self.after_cancel(self._resource_poll_job)
        except Exception:
            # Best-effort: the job may already have run or been cancelled.
            pass
        self._resource_poll_job = None
    try:
        self._stop_music()
    except Exception as exc:
        # Audio teardown must never keep the window from closing.
        print(f'Música: no se pudo detener la reproducción ({exc})', file=sys.stderr)
    try:
        self.chat_client.close()
    finally:
        self.destroy()
def main() -> None:
    """Create the dashboard window, hook its close handler and run Tk's loop."""
    dashboard = DashboardApp()
    # Route the window manager's close button through on_close.
    dashboard.protocol('WM_DELETE_WINDOW', dashboard.on_close)
    dashboard.mainloop()


if __name__ == '__main__':
    main()