2188 lines
91 KiB
Python
2188 lines
91 KiB
Python
#!/usr/bin/env python3
|
|
"""Dashboard principal del Proyecto Global."""
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
import shutil
|
|
import queue
|
|
import random
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import webbrowser
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable
|
|
|
|
import tkinter as tk
|
|
from tkinter import filedialog, messagebox, scrolledtext, ttk
|
|
from tkinter import font as tkfont
|
|
|
|
# -------------------- optional dependencies --------------------
# Each third-party package is imported defensively so the module still loads
# when it is missing. Code in this file checks the fallback value (``psutil``)
# or the matching *_AVAILABLE flag before using the feature.
try:
    import psutil  # type: ignore
except Exception:  # pragma: no cover
    psutil = None  # type: ignore

try:
    import matplotlib
    # The backend is selected before the Tk canvas class is imported.
    matplotlib.use('TkAgg')
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
    from matplotlib.figure import Figure
    MATPLOTLIB_AVAILABLE = True
except Exception:  # pragma: no cover
    MATPLOTLIB_AVAILABLE = False
    Figure = None
    FigureCanvasTkAgg = None

try:
    import pygame  # type: ignore
    pygame_available = True
except Exception:  # pragma: no cover
    # NOTE(review): unlike the other blocks, the name ``pygame`` is left
    # unbound here; callers must test ``pygame_available`` first.
    pygame_available = False

try:
    import requests  # type: ignore
    REQUESTS_AVAILABLE = True
except Exception:  # pragma: no cover
    requests = None  # type: ignore
    REQUESTS_AVAILABLE = False

try:
    from bs4 import BeautifulSoup
    BEAUTIFULSOUP_AVAILABLE = True
except Exception:  # pragma: no cover
    BEAUTIFULSOUP_AVAILABLE = False
|
|
|
|
# Default host/port pre-filled in the chat connection form.
SERVER_HOST_DEFAULT = '127.0.0.1'
SERVER_PORT_DEFAULT = 3333
# Default OpenWeather key supplied by the user.
# It is overridden by exporting OPENWEATHER_API_KEY in the environment; the
# fallback value itself can be replaced with OPENWEATHER_FALLBACK_API_KEY.
# NOTE(review): a literal API key is committed in source here — consider
# rotating it and reading it from the environment only.
OPENWEATHER_FALLBACK_API_KEY = os.environ.get(
    'OPENWEATHER_FALLBACK_API_KEY',
    '431239407e3628578c83e67180cf720f'
).strip()
_OPENWEATHER_ENV_KEY = os.environ.get('OPENWEATHER_API_KEY', '').strip()
# An empty or whitespace-only OPENWEATHER_API_KEY falls through to the fallback.
OPENWEATHER_API_KEY = _OPENWEATHER_ENV_KEY or OPENWEATHER_FALLBACK_API_KEY
# Coordinates presumably used for the Jávea weather request — the fetch code
# is not visible in this part of the file.
JAVEA_LATITUDE = 38.789166
JAVEA_LONGITUDE = 0.163055
|
|
|
|
# -------------------- visual palette --------------------
# Hex colours shared by the widgets built in this module.
PRIMARY_BG = '#f4f5fb'        # root window background
PANEL_BG = '#ffffff'          # header, centre and right panels
SECONDARY_BG = '#eef2ff'      # left action panel
ACCENT_COLOR = '#ff6b6b'      # highlighted warnings
ACCENT_DARK = '#c44569'       # music player heading
TEXT_COLOR = '#1f2d3d'        # primary text
SUBTEXT_COLOR = '#67708a'     # secondary / hint text
BUTTON_BG = '#e7ecff'         # flat button background
BUTTON_ACTIVE_BG = '#ffd6d1'  # left-panel button background while pressed
|
|
|
|
|
|
def _env_float(name: str, default: float) -> float:
|
|
value = os.environ.get(name)
|
|
if value is None:
|
|
return default
|
|
try:
|
|
return float(value)
|
|
except ValueError:
|
|
return default
|
|
# Wallapop request settings. Every value can be overridden through the
# environment variable of the same name; the literals are the defaults.
WALLAPOP_DEVICE_ID = os.environ.get('WALLAPOP_DEVICE_ID', '48ca24ec-2e8c-4dbb-9f0b-6b4f99194626').strip()
WALLAPOP_APP_VERSION = os.environ.get('WALLAPOP_APP_VERSION', '814060').strip()
WALLAPOP_MPID = os.environ.get('WALLAPOP_MPID', '6088013701866267655').strip()
WALLAPOP_DEVICE_OS = os.environ.get('WALLAPOP_DEVICE_OS', '0').strip()
WALLAPOP_USER_AGENT = os.environ.get(
    'WALLAPOP_USER_AGENT',
    'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Mobile Safari/537.36'
).strip()
# The client-hint values are used verbatim (no .strip()).
WALLAPOP_SEC_CH_UA = os.environ.get('WALLAPOP_SEC_CH_UA', '"Not_A Brand";v="99", "Chromium";v="142"')
WALLAPOP_SEC_CH_UA_MOBILE = os.environ.get('WALLAPOP_SEC_CH_UA_MOBILE', '?1')
WALLAPOP_SEC_CH_UA_PLATFORM = os.environ.get('WALLAPOP_SEC_CH_UA_PLATFORM', '"Android"')
# Unparseable coordinates silently fall back to the defaults (see _env_float).
WALLAPOP_LATITUDE = _env_float('WALLAPOP_LATITUDE', 40.416775)
WALLAPOP_LONGITUDE = _env_float('WALLAPOP_LONGITUDE', -3.70379)
# An empty override falls back to the default for country and language.
WALLAPOP_COUNTRY_CODE = os.environ.get('WALLAPOP_COUNTRY_CODE', 'ES').strip() or 'ES'
WALLAPOP_LANGUAGE = os.environ.get('WALLAPOP_LANGUAGE', 'es').strip() or 'es'
WALLAPOP_CATEGORY_ID = os.environ.get('WALLAPOP_CATEGORY_ID', '').strip()
WALLAPOP_REFERER = os.environ.get('WALLAPOP_REFERER', 'https://es.wallapop.com/').strip()

# HTTP headers assembled from the settings above; presumably sent with every
# Wallapop request — the request code is not visible in this part of the file.
WALLAPOP_HEADERS = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'es,es-ES;q=0.9',
    'Referer': WALLAPOP_REFERER,
    'User-Agent': WALLAPOP_USER_AGENT,
    'x-deviceid': WALLAPOP_DEVICE_ID,
    'x-deviceos': WALLAPOP_DEVICE_OS,
    'deviceos': WALLAPOP_DEVICE_OS,
    'x-appversion': WALLAPOP_APP_VERSION,
    'mpid': WALLAPOP_MPID,
    'sec-ch-ua': WALLAPOP_SEC_CH_UA,
    'sec-ch-ua-mobile': WALLAPOP_SEC_CH_UA_MOBILE,
    'sec-ch-ua-platform': WALLAPOP_SEC_CH_UA_PLATFORM
}
|
|
|
|
|
|
|
|
class ChatClient:
    """Basic TCP chat client with a background receive thread.

    Received text and status notices are delivered through the ``on_message``
    callback given to the constructor. Connection errors are reported from
    the thread that called :meth:`connect`; everything else is reported from
    the receive thread.
    """

    def __init__(self, on_message):
        """Store the callback; no socket is opened until :meth:`connect`."""
        self._on_message = on_message
        self._sock: socket.socket | None = None
        self._lock = threading.Lock()
        self._connected = False

    @staticmethod
    def _shutdown_and_close(sock) -> None:
        """Best-effort shutdown + close of *sock*; never raises."""
        try:
            # shutdown() wakes a thread blocked in recv(); close() alone may not.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        try:
            sock.close()
        except OSError:
            pass

    def connect(self, host: str, port: int) -> bool:
        """Open a connection to ``host:port`` and start the receive thread.

        Returns ``True`` when connected (or already connected) and ``False``
        on failure, in which case an ``[ERROR]`` notice is sent to the
        callback. Never raises.
        """
        with self._lock:
            if self._connected:
                return True
            sock = None
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((host, port))
                self._sock = sock
                self._connected = True
                threading.Thread(target=self._recv_loop, args=(sock,), daemon=True).start()
                return True
            except Exception as exc:
                # Roll back and release the socket so a failed attempt does
                # not leak a file descriptor.
                self._connected = False
                self._sock = None
                if sock is not None:
                    self._shutdown_and_close(sock)
                error = exc
        # Notify outside the lock so the callback may safely call back in.
        self._on_message(f'[ERROR] Conexión fallida: {error}')
        return False

    def _recv_loop(self, sock=None):
        """Forward incoming data to the callback until the socket closes.

        *sock* is the socket this thread owns; when omitted the current
        connection is used. Binding the loop to its own socket keeps a
        late-exiting thread from tearing down a newer connection.
        """
        if sock is None:
            sock = self._sock
        try:
            while sock is not None and self._connected and self._sock is sock:
                data = sock.recv(4096)
                if not data:
                    break
                text = data.decode('utf-8', errors='replace').strip()
                self._on_message(text)
        except Exception as exc:
            self._on_message(f'[ERROR] {exc}')
        finally:
            with self._lock:
                # Only reset shared state if it still refers to our socket.
                if self._sock is sock:
                    self._connected = False
                    self._sock = None
            if sock is not None:
                self._shutdown_and_close(sock)
            self._on_message('[INFO] Conexión cerrada')

    def send(self, text: str) -> bool:
        """Send *text* encoded as UTF-8; return ``True`` on success.

        Returns ``False`` when not connected or when sending fails. A failed
        send marks the client disconnected and shuts the socket down so the
        receive thread can finish its cleanup.
        """
        with self._lock:
            sock = self._sock
            if not self._connected or sock is None:
                return False
            try:
                sock.sendall(text.encode('utf-8'))
                return True
            except Exception:
                self._connected = False
                try:
                    sock.shutdown(socket.SHUT_RDWR)
                except OSError:
                    pass
                return False

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        with self._lock:
            sock, self._sock = self._sock, None
            self._connected = False
        if sock is not None:
            self._shutdown_and_close(sock)
|
|
|
|
|
|
class DashboardApp(tk.Tk):
|
|
def __init__(self) -> None:
    """Create the main window, initialise state, build the UI and schedule the periodic jobs."""
    super().__init__()
    self.title('Panel de laboratorio - Proyecto Global')
    self.geometry('1220x740')
    self.minsize(1024, 660)
    self.configure(bg=PRIMARY_BG)
    # Fonts must exist before any builder runs: the builders read self.font_*.
    self._setup_fonts()
    self._maximize_with_borders()

    # Root grid: header (row 0), panels (row 1), status bar (row 2).
    # Only the middle column and the middle row absorb extra space.
    self.columnconfigure(0, weight=0)
    self.columnconfigure(1, weight=1)
    self.columnconfigure(2, weight=0)
    self.rowconfigure(0, weight=0)
    self.rowconfigure(1, weight=1)
    self.rowconfigure(2, weight=0)

    self._running = True
    # Queues presumably carry work from background threads to the Tk thread.
    self._chat_queue: 'queue.Queue[str]' = queue.Queue()
    self._scraping_queue: 'queue.Queue[tuple[str, ...]]' = queue.Queue()
    # Network counter snapshot taken at start-up (None without psutil).
    self._traffic_last = psutil.net_io_counters() if psutil else None
    self._resource_history = {'cpu': [], 'mem': [], 'threads': []}
    # Identifier returned by after() for the resource polling job.
    self._resource_poll_job: str | None = None

    self.chat_client = ChatClient(self._enqueue_chat_message)
    self.alarm_counter = 1
    self.active_alarms: list[dict[str, datetime.datetime | str]] = []
    # Camel game state.
    self.game_window_canvas = None
    self.game_window_status = None
    self.game_move_queue: queue.Queue | None = None
    self.game_queue_processor_active = False
    self.game_done_event: threading.Event | None = None
    self.game_finish_line = 0
    self.game_racer_names: list[str] = []
    self.game_camel_count = 3
    self.game_speed_factor = 1.0
    self.music_temp_file: str | None = None
    self.weather_api_key = OPENWEATHER_API_KEY
    # Widgets created later by the builders / popups.
    self.results_area: tk.Frame | None = None
    self.results_title: tk.Label | None = None
    self.scraping_popup: tk.Toplevel | None = None
    self.simple_scraping_popup: tk.Toplevel | None = None
    self.weather_popup: tk.Toplevel | None = None
    # Matplotlib handles; populated by _show_resource_monitor.
    self.chart_canvas = None
    self.ax_cpu = None
    self.ax_mem = None
    self.ax_threads = None
    self.line_cpu = None
    self.line_mem = None
    self.mem_fill = None
    self.thread_bars = None
    # psutil handle for this process, or None when unavailable.
    if psutil:
        try:
            self._ps_process = psutil.Process(os.getpid())
        except Exception:
            self._ps_process = None
    else:
        self._ps_process = None

    self._build_header()
    self._build_left_panel()
    self._build_center_panel()
    self._build_right_panel()
    self._build_status_bar()
    self._update_clock()

    if psutil:
        self.after(1000, self._update_traffic)
        try:
            # Return value is discarded; presumably primes psutil's CPU
            # sampling so later readings are meaningful — TODO confirm.
            psutil.cpu_percent(interval=None)
        except Exception:
            pass
    # NOTE(review): scheduled even when psutil is missing.
    self._resource_poll_job = self.after(1000, self._resource_poll_tick)
    threading.Thread(target=self._chat_loop, daemon=True).start()
    self.after(100, self._process_scraping_queue)
    self.after(1000, self._refresh_alarms_loop)
|
|
|
|
# ------------------------ UI ------------------------
|
|
def _maximize_with_borders(self) -> None:
|
|
def _apply_zoom():
|
|
# Intentar diferentes atributos para maximizar/pantalla completa en diferentes OS
|
|
for attr in ('-zoomed',):
|
|
try:
|
|
self.attributes(attr, True)
|
|
return
|
|
except tk.TclError:
|
|
continue
|
|
try:
|
|
self.state('zoomed')
|
|
except tk.TclError:
|
|
pass
|
|
|
|
self.after(0, _apply_zoom)
|
|
|
|
def _setup_fonts(self) -> None:
    """Create the four shared fonts (title, header, body, small)."""
    def make(size: int, **extra):
        # All fonts share the same family and differ only in size/weight.
        return tkfont.Font(family='Segoe UI', size=size, **extra)

    self.font_title = make(16, weight='bold')
    self.font_header = make(13, weight='bold')
    self.font_body = make(11)
    self.font_small = make(10)
|
|
|
|
def _build_header(self) -> None:
    """Build the top bar: one coloured caption per course unit plus settings."""
    bar = tk.Frame(self, bg=PANEL_BG, bd=0, highlightthickness=0)
    bar.grid(row=0, column=0, columnspan=3, sticky='new')
    bar.grid_columnconfigure(tuple(range(7)), weight=1)

    captions = (
        'T1. Procesos',
        'T2. Threads',
        'T3. Sockets',
        'T4. Servicios',
        'T5. Seguridad',
        'Configuración',
    )
    colours = ('#4c6ef5', '#ff6b6b', '#ffa94d', '#51cf66', '#845ef7', '#2f3545')
    for column, (caption, colour) in enumerate(zip(captions, colours)):
        tk.Label(
            bar,
            text=caption,
            fg=colour,
            bg=PANEL_BG,
            font=self.font_title,
            padx=16,
            pady=8,
        ).grid(row=0, column=column, padx=6, pady=8)
|
|
|
|
def _build_left_panel(self) -> None:
    """Build the left sidebar with its grouped action buttons."""
    sidebar = tk.Frame(self, width=220, bg=SECONDARY_BG, bd=0, highlightthickness=0)
    sidebar.grid(row=1, column=0, sticky='nsw', padx=6, pady=(50,6))
    sidebar.grid_propagate(False)
    # _left_section/_left_button attach their widgets to self.left_panel.
    self.left_panel = sidebar

    # (section heading, ((button caption, command), ...)) in display order.
    menu = (
        ('Acciones', (
            ('Analizar Wallapop', self._open_wallapop_popup),
            ('Scraping simple', self._open_simple_scraping_popup),
            ('Navegar', lambda: self._open_web('https://www.google.com')),
            ('API Tiempo', self._show_weather_popup),
        )),
        ('Aplicaciones', (
            ('Visual Code', lambda: self._launch_process(['code'])),
            ('Camellos', self._open_game_window),
            ('App3', lambda: self._launch_process(['firefox'])),
        )),
        ('Procesos batch', (
            ('Copias de seguridad', self._run_backup_script),
        )),
    )
    for heading, entries in menu:
        self._left_section(heading)
        for caption, action in entries:
            self._left_button(caption, action)
|
|
|
|
def _left_section(self, text: str) -> None:
    """Add an upper-cased section heading to the left panel."""
    heading = tk.Label(
        self.left_panel,
        text=text.upper(),
        bg=SECONDARY_BG,
        fg=SUBTEXT_COLOR,
        font=self.font_small,
    )
    heading.pack(anchor='w', padx=16, pady=(16,4))
|
|
|
|
def _left_button(self, text: str, command) -> None:
    """Add a flat, full-width button to the left panel that runs *command*."""
    options = {
        'text': text,
        'width': 20,
        'bg': BUTTON_BG,
        'activebackground': BUTTON_ACTIVE_BG,
        'relief': 'flat',
        'command': command,
    }
    tk.Button(self.left_panel, **options).pack(fill='x', padx=16, pady=4)
|
|
|
|
def _build_center_panel(self) -> None:
    """Build the central column: the tabbed notebook above the notes panel."""
    middle = tk.Frame(self, bg=PANEL_BG, bd=0)
    middle.grid(row=1, column=1, sticky='nsew', padx=6, pady=(50,6))
    # Row 1 (the notes panel) and the single column take the extra space.
    middle.rowconfigure(1, weight=1)
    middle.columnconfigure(0, weight=1)
    self.center_panel = middle

    for build in (self._build_notebook, self._build_notes_panel):
        build()
|
|
|
|
def _build_notebook(self) -> None:
    """Create the styled notebook, its seven tab frames, and fill each tab."""
    theme = ttk.Style()
    theme.configure('Custom.TNotebook', background=PANEL_BG, borderwidth=0)
    theme.configure('Custom.TNotebook.Tab', padding=(16, 8), font=self.font_body)
    theme.map('Custom.TNotebook.Tab', background=[('selected', '#dde2ff')])

    book = ttk.Notebook(self.center_panel, style='Custom.TNotebook')
    book.grid(row=0, column=0, sticky='nsew', padx=4, pady=4)
    self.notebook = book

    # Frames are created in this order; they are added to the notebook below
    # in display order, which differs.
    for attr in (
        'tab_resultados',
        'tab_navegador',
        'tab_correos',
        'tab_tareas',
        'tab_alarmas',
        'tab_enlaces',
        'tab_bloc',
    ):
        setattr(self, attr, tk.Frame(book, bg='white'))

    # (frame, tab caption, content builder) in display order.
    pages = (
        (self.tab_resultados, 'Resultados', self._build_tab_resultados),
        (self.tab_navegador, 'Navegador', self._build_tab_navegador),
        (self.tab_correos, 'Correos', self._build_tab_correos),
        (self.tab_bloc, 'Bloc de notas', self._build_tab_bloc_notas),
        (self.tab_tareas, 'Tareas', self._build_tab_tareas),
        (self.tab_alarmas, 'Alarmas', self._build_tab_alarmas),
        (self.tab_enlaces, 'Enlaces', self._build_tab_enlaces),
    )
    for frame, caption, _ in pages:
        book.add(frame, text=caption)
    for _, _, build in pages:
        build()
|
|
|
|
def _build_tab_resultados(self) -> None:
    """Build the results tab: heading, toolbar and the replaceable results area."""
    shell = tk.Frame(self.tab_resultados, bg=PANEL_BG)
    shell.pack(fill='both', expand=True)

    heading = tk.Label(
        shell,
        text='Resultados de actividades',
        font=self.font_header,
        bg=PANEL_BG,
        fg=TEXT_COLOR,
    )
    heading.pack(pady=(12, 4))
    self.results_title = heading

    actions = tk.Frame(shell, bg=PANEL_BG)
    actions.pack(fill='x', padx=12, pady=(0, 4))
    flat = {'bg': BUTTON_BG, 'relief': 'flat'}
    monitor_btn = tk.Button(actions, text='Ver monitor del sistema', command=self._show_resource_monitor, **flat)
    monitor_btn.pack(side='left', padx=(0, 8))
    clear_btn = tk.Button(actions, text='Limpiar resultados', command=self._reset_results_view, **flat)
    clear_btn.pack(side='left')

    area = tk.Frame(shell, bg='#f4f7ff', bd=1, relief='solid')
    area.pack(fill='both', expand=True, padx=12, pady=(0, 12))
    self.results_area = area

    # Start with the placeholder message.
    self._reset_results_view()
|
|
|
|
def _prepare_results_area(self) -> None:
|
|
self._stop_game_queue_processor()
|
|
if not self.results_area:
|
|
return
|
|
for child in self.results_area.winfo_children():
|
|
child.destroy()
|
|
self.chart_canvas = None
|
|
self.ax_cpu = None
|
|
self.ax_mem = None
|
|
self.ax_threads = None
|
|
self.line_cpu = None
|
|
self.line_mem = None
|
|
self.mem_fill = None
|
|
self.thread_bars = None
|
|
self.game_window_canvas = None
|
|
self.game_window_status = None
|
|
|
|
def _reset_results_view(self) -> None:
|
|
self._prepare_results_area()
|
|
if self.results_title:
|
|
self.results_title.config(text='Resultados de actividades')
|
|
if self.results_area:
|
|
tk.Label(
|
|
self.results_area,
|
|
text='Ejecute una actividad para ver aquí sus resultados más recientes.',
|
|
bg='#f4f7ff',
|
|
fg=SUBTEXT_COLOR,
|
|
font=self.font_body,
|
|
wraplength=520
|
|
).pack(expand=True, padx=12, pady=12)
|
|
|
|
def _render_results_view(self, title: str, builder: Callable[[tk.Frame], None]) -> None:
|
|
if self.results_title:
|
|
self.results_title.config(text=f'Resultados • {title}')
|
|
self._prepare_results_area()
|
|
if not self.results_area:
|
|
return
|
|
builder(self.results_area)
|
|
|
|
def _show_resource_monitor(self) -> None:
|
|
def builder(parent: tk.Frame) -> None:
|
|
if not (MATPLOTLIB_AVAILABLE and psutil):
|
|
tk.Label(
|
|
parent,
|
|
text='Instale matplotlib + psutil para habilitar el monitor del sistema.',
|
|
fg=ACCENT_COLOR,
|
|
bg='#f4f7ff',
|
|
font=self.font_body
|
|
).pack(fill='both', expand=True, padx=20, pady=20)
|
|
return
|
|
chart_frame = tk.Frame(parent, bg=PANEL_BG)
|
|
chart_frame.pack(fill='both', expand=True, padx=12, pady=12)
|
|
fig = Figure(figsize=(7.2, 5.6), dpi=100)
|
|
self.ax_cpu = fig.add_subplot(311)
|
|
self.ax_mem = fig.add_subplot(312)
|
|
self.ax_threads = fig.add_subplot(313)
|
|
|
|
self.ax_cpu.set_title('CPU (línea)')
|
|
self.ax_cpu.set_ylim(0, 100)
|
|
self.ax_cpu.set_ylabel('%')
|
|
self.ax_cpu.grid(True, alpha=0.25)
|
|
self.line_cpu, = self.ax_cpu.plot([], [], label='CPU', color='#ff6b6b', linewidth=2)
|
|
|
|
self.ax_mem.set_title('Memoria (área)')
|
|
self.ax_mem.set_ylim(0, 100)
|
|
self.ax_mem.set_ylabel('%')
|
|
self.ax_mem.grid(True, alpha=0.2)
|
|
self.line_mem, = self.ax_mem.plot([], [], color='#4ecdc4', linewidth=1.5)
|
|
|
|
self.ax_threads.set_title('Hilos del proceso (barras)')
|
|
self.ax_threads.set_ylabel('Hilos')
|
|
self.ax_threads.grid(True, axis='y', alpha=0.2)
|
|
|
|
fig.tight_layout(pad=2.2)
|
|
self.chart_canvas = FigureCanvasTkAgg(fig, master=chart_frame)
|
|
self.chart_canvas.get_tk_widget().pack(fill='both', expand=True)
|
|
self.ax_cpu.set_xlim(0, 40)
|
|
self.ax_mem.set_xlim(0, 40)
|
|
self.ax_threads.set_xlim(-0.5, 39.5)
|
|
self.chart_canvas.draw_idle()
|
|
|
|
self._render_results_view('Monitoreo del sistema', builder)
|
|
|
|
def _show_text_result(self, title: str, text: str) -> None:
|
|
def builder(parent: tk.Frame) -> None:
|
|
frame = tk.Frame(parent, bg='white')
|
|
frame.pack(fill='both', expand=True, padx=12, pady=12)
|
|
box = scrolledtext.ScrolledText(frame, height=12)
|
|
box.pack(fill='both', expand=True)
|
|
box.insert('1.0', text)
|
|
box.config(state='disabled')
|
|
|
|
self._render_results_view(title, builder)
|
|
|
|
def _format_weather_summary(self, data: dict[str, Any]) -> str:
|
|
main = data.get('main', {})
|
|
weather = data.get('weather', [{}])[0]
|
|
wind = data.get('wind', {})
|
|
temp = main.get('temp')
|
|
feels = main.get('feels_like')
|
|
humidity = main.get('humidity')
|
|
desc = weather.get('description', '').capitalize()
|
|
wind_speed = wind.get('speed')
|
|
timestamp = data.get('dt')
|
|
updated = datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D'
|
|
summary = [
|
|
f'Descripción : {desc or "N/D"}',
|
|
f'Temperatura : {temp:.1f}°C' if temp is not None else 'Temperatura : N/D',
|
|
f'Sensación : {feels:.1f}°C' if feels is not None else 'Sensación : N/D',
|
|
f'Humedad : {humidity}%' if humidity is not None else 'Humedad : N/D',
|
|
f'Veloc. viento: {wind_speed:.1f} m/s' if wind_speed is not None else 'Veloc. viento: N/D',
|
|
f'Actualizado : {updated}'
|
|
]
|
|
return '\n'.join(summary)
|
|
|
|
def _get_javea_weather_snapshot(self) -> tuple[str, str, bool]:
|
|
try:
|
|
data = self._fetch_javea_weather()
|
|
summary = self._format_weather_summary(data)
|
|
timestamp = data.get('dt')
|
|
status = 'Última lectura: '
|
|
status += datetime.datetime.fromtimestamp(timestamp).strftime('%d/%m %H:%M') if timestamp else 'N/D'
|
|
return summary, status, False
|
|
except Exception as exc:
|
|
return str(exc), 'No se pudo obtener el clima', True
|
|
|
|
def _show_weather_popup(self) -> None:
    """Open a modal popup with the current weather for Jávea.

    Any previous weather popup is destroyed first. The popup offers
    "Actualizar" (re-fetch and redraw) and "Cerrar". Errors are shown in
    red in place of the summary.
    """
    self._log('Abriendo ventana del tiempo para Jávea')
    # NOTE(review): the snapshot is obtained on the Tk thread, here and in
    # refresh(); the UI waits for it to return.
    text, status, is_error = self._get_javea_weather_snapshot()

    if self.weather_popup and self.weather_popup.winfo_exists():
        self.weather_popup.destroy()

    popup = tk.Toplevel(self)
    popup.title('Tiempo en Jávea')
    popup.geometry('760x500')
    popup.minsize(760, 500)
    popup.resizable(False, False)
    popup.configure(bg='white')
    popup.transient(self)
    popup.grab_set()
    self.weather_popup = popup

    header = tk.Label(popup, text='OpenWeather • Jávea, España', font=self.font_header, bg='white', fg=TEXT_COLOR)
    header.pack(pady=(12, 6))
    info_label = tk.Label(
        popup,
        text=text,
        justify='left',
        font=('Consolas', 12),
        anchor='nw',
        bg='white',
        fg='red' if is_error else TEXT_COLOR,
        padx=6,
        pady=6
    )
    info_label.pack(fill='both', expand=True, padx=18, pady=8)
    status_label = tk.Label(popup, text=status, font=self.font_small, bg='white', fg=SUBTEXT_COLOR)
    status_label.pack(pady=(0, 8))

    def close_popup() -> None:
        if popup.winfo_exists():
            popup.destroy()
        # Only clear the attribute if it still refers to this popup.
        if self.weather_popup is popup:
            self.weather_popup = None

    def refresh() -> None:
        new_text, new_status, new_is_error = self._get_javea_weather_snapshot()
        # The labels are gone if the popup was destroyed meanwhile.
        if not popup.winfo_exists():
            return
        # Same colour constant as the initial render, not a duplicated hex.
        info_label.config(text=new_text, fg='red' if new_is_error else TEXT_COLOR)
        status_label.config(text=new_status)

    buttons = tk.Frame(popup, bg='white')
    buttons.pack(pady=(4, 14))
    tk.Button(buttons, text='Actualizar', command=refresh, bg=BUTTON_BG, relief='flat').pack(side='left', padx=10)
    tk.Button(buttons, text='Cerrar', command=close_popup, bg=BUTTON_BG, relief='flat').pack(side='left', padx=10)

    # Closing through the window manager goes through the same cleanup.
    popup.protocol('WM_DELETE_WINDOW', close_popup)
|
|
|
|
def _build_tab_navegador(self) -> None:
    """Build the browser tab: a URL entry and a button that opens it externally."""
    tab = self.tab_navegador
    tk.Label(tab, text='Abrir URL en navegador externo', bg='white').pack(pady=6)

    row = tk.Frame(tab, bg='white')
    row.pack(pady=6)

    address = tk.Entry(row, width=48)
    self.url_entry = address
    address.insert(0, 'https://www.python.org')
    address.pack(side='left', padx=4)

    def open_current() -> None:
        # The entry is read at click time, not at build time.
        self._open_web(self.url_entry.get())

    tk.Button(row, text='Abrir', command=open_current).pack(side='left', padx=4)
|
|
|
|
def _build_tab_correos(self) -> None:
    """Fill the mail tab with a placeholder message."""
    message = (
        'Área futura para gestionar correos.'
        '\nDe momento, utiliza la pestaña "Bloc de notas" para escribir.'
    )
    placeholder = tk.Label(
        self.tab_correos,
        text=message,
        bg='white',
        font=('Arial', 12),
        justify='left',
    )
    placeholder.pack(pady=20)
|
|
|
|
def _build_tab_bloc_notas(self) -> None:
    """Build the notepad tab: title, open/save toolbar and the text editor."""
    tab = self.tab_bloc
    tk.Label(tab, text='Bloc de notas', bg='white', font=('Arial', 12, 'bold')).pack(pady=6)

    bar = tk.Frame(tab, bg='white')
    bar.pack(fill='x')
    for caption, action in (('Abrir', self._open_text), ('Guardar', self._save_text)):
        tk.Button(bar, text=caption, command=action).pack(side='left', padx=4, pady=4)

    editor = scrolledtext.ScrolledText(tab, wrap='word')
    self.editor = editor
    editor.pack(fill='both', expand=True, padx=6, pady=6)
|
|
|
|
def _build_tab_tareas(self) -> None:
    """Build the tasks tab: Wallapop analysis hint on the left, game launcher on the right."""
    outer = tk.Frame(self.tab_tareas, bg='white')
    outer.pack(fill='both', expand=True)

    # --- left column: link analysis ---
    links_col = tk.Frame(outer, bg='white', padx=10, pady=10)
    links_col.pack(side='left', fill='both', expand=True)
    tk.Label(links_col, text='Análisis de enlaces Wallapop', bg='white', font=('Arial', 12, 'bold')).pack(pady=(12,4))

    hint_opts = {'bg': 'white', 'wraplength': 260, 'justify': 'left'}
    if REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE:
        hint_opts['text'] = (
            'Pulsa el botón "Analizar Wallapop" en el panel izquierdo para introducir la URL de un anuncio. '
            'Aquí verás notas generales o recordatorios.'
        )
    else:
        # Missing dependencies: show the install notice in red.
        hint_opts['text'] = 'Instala requests y beautifulsoup4 para habilitar el análisis de enlaces.'
        hint_opts['fg'] = 'red'
    tk.Label(links_col, **hint_opts).pack(fill='x', padx=10, pady=10)

    # --- right column: camel game ---
    game_col = tk.Frame(outer, bg='white')
    game_col.pack(side='left', fill='both', expand=True, padx=10)
    tk.Label(game_col, text='Juego de camellos (ventana dedicada)', bg='white', font=('Arial', 12, 'bold')).pack(pady=(6,2))
    blurb = tk.Label(
        game_col,
        text='Abre el simulador en una ventana independiente para ver la carrera a pantalla completa.',
        wraplength=260,
        justify='left',
        bg='white',
    )
    blurb.pack(padx=8, pady=8)
    tk.Button(game_col, text='Abrir juego', command=self._open_game_window).pack(pady=6)
|
|
|
|
def _build_tab_alarmas(self) -> None:
    """Build the alarms tab: a form to add an alarm, the active list and a status line."""
    self.tab_alarmas.configure(bg=PANEL_BG)

    # Card that holds every widget of the tab.
    card = tk.Frame(self.tab_alarmas, bg='#fef9f4', bd=0, padx=18, pady=18)
    card.pack(pady=16, padx=20, fill='both', expand=True)
    tk.Label(card, text='Programar nuevas alarmas', font=self.font_header, bg='#fef9f4', fg=TEXT_COLOR).pack(pady=(0,8))

    # Single-row form: minutes spinbox, title entry, add button.
    form = tk.Frame(card, bg='#fef9f4')
    form.pack(pady=6)
    tk.Label(form, text='Minutos', bg='#fef9f4', font=self.font_body).grid(row=0, column=0, padx=6)
    # The spinbox offers values from 1 to 240.
    self.alarm_minutes = tk.Spinbox(form, from_=1, to=240, width=6, justify='center')
    self.alarm_minutes.grid(row=0, column=1, padx=6)
    tk.Label(form, text='Título', bg='#fef9f4', font=self.font_body).grid(row=0, column=2, padx=6)
    self.alarm_title = tk.Entry(form, width=24)
    self.alarm_title.grid(row=0, column=3, padx=6)
    tk.Button(form, text='Agregar alarma', bg=BUTTON_BG, relief='flat', command=self._start_alarm).grid(row=0, column=4, padx=10)

    # Active alarms list with a cancel button and a status label below it.
    tk.Label(card, text='Alarmas activas', font=self.font_body, bg='#fef9f4', fg=SUBTEXT_COLOR).pack(pady=(18,6))
    self.alarm_list = tk.Listbox(card, height=8, width=50, bd=0, highlightthickness=1, highlightbackground='#e3e5f0')
    self.alarm_list.pack(pady=4, fill='x')
    tk.Button(card, text='Cancelar seleccionada', command=self._cancel_selected_alarm, bg=BUTTON_BG, relief='flat').pack(pady=(10,4))
    self.alarm_status = tk.Label(card, text='Sin alarmas programadas', bg='#fef9f4', fg=SUBTEXT_COLOR, font=self.font_small)
    self.alarm_status.pack(pady=4)
|
|
|
|
def _build_tab_enlaces(self) -> None:
    """Build the links tab: one button per reference site, opened in the browser."""
    tab = self.tab_enlaces
    tk.Label(tab, text='Enlaces útiles', bg='white', font=('Arial', 12, 'bold')).pack(pady=6)

    bookmarks = {
        'Documentación Tkinter': 'https://docs.python.org/3/library/tk.html',
        'psutil': 'https://psutil.readthedocs.io',
        'Matplotlib': 'https://matplotlib.org',
    }
    for caption, address in bookmarks.items():
        # Bind the address as a default so each button keeps its own URL.
        tk.Button(tab, text=caption, command=lambda u=address: self._open_web(u)).pack(pady=3)
|
|
|
|
def _build_notes_panel(self) -> None:
    """Build the labelled notes box shown under the notebook."""
    box = tk.LabelFrame(self.center_panel, text='Panel de notas e hilos', bg='#eefee8')
    box.grid(row=1, column=0, sticky='nsew', padx=6, pady=(4,6))

    log_view = scrolledtext.ScrolledText(box, height=5)
    self.notes = log_view
    log_view.pack(fill='both', expand=True, padx=6, pady=6)
|
|
|
|
def _build_right_panel(self) -> None:
    """Build the right column: chat, connection form, student cards and music player."""
    right = tk.Frame(self, width=280, bg=PANEL_BG, bd=0)
    right.grid(row=1, column=2, sticky='nse', padx=6, pady=(50,6))
    # Keep the fixed 280 px width regardless of the children's size.
    right.grid_propagate(False)

    # --- chat transcript (read-only) ---
    tk.Label(right, text='Chat', font=('Arial', 20, 'bold'), fg='red', bg='white').pack(pady=(6,4))
    self.chat_display = scrolledtext.ScrolledText(right, width=30, height=12, state='disabled')
    self.chat_display.pack(padx=6, pady=4)

    # --- message composer ---
    tk.Label(right, text='Mensaje', bg='white').pack(anchor='w', padx=6)
    self.message_entry = tk.Text(right, height=4)
    self.message_entry.pack(padx=6, pady=4)
    tk.Button(right, text='enviar', bg='#d6f2ce', command=self._send_chat).pack(pady=4)

    # --- connection form, pre-filled with the default host and port ---
    conn = tk.Frame(right, bg='white')
    conn.pack(pady=4)
    tk.Label(conn, text='Host:', bg='white').grid(row=0, column=0)
    self.host_entry = tk.Entry(conn, width=12)
    self.host_entry.insert(0, SERVER_HOST_DEFAULT)
    self.host_entry.grid(row=0, column=1)
    tk.Label(conn, text='Puerto:', bg='white').grid(row=1, column=0)
    self.port_entry = tk.Entry(conn, width=6)
    self.port_entry.insert(0, str(SERVER_PORT_DEFAULT))
    self.port_entry.grid(row=1, column=1)
    tk.Button(conn, text='Conectar', command=self._connect_chat).grid(row=0, column=2, rowspan=2, padx=4)

    # --- student cards with placeholder text ---
    tk.Label(right, text='Alumnos', font=('Arial', 14, 'bold'), bg='white').pack(pady=(10,4))
    for name in ('Alumno 1', 'Alumno 2', 'Alumno 3'):
        frame = tk.Frame(right, bg='white', bd=1, relief='groove')
        frame.pack(fill='x', padx=6, pady=4)
        tk.Label(frame, text=name, bg='white', font=('Arial', 11, 'bold')).pack(anchor='w')
        tk.Label(frame, text='Lorem ipsum dolor sit amet, consectetur adipiscing elit.', wraplength=220, justify='left', bg='white').pack(anchor='w')

    # --- music player controls ---
    player = tk.Frame(right, bg='#fdf5f5', bd=1, relief='flat', padx=8, pady=8)
    player.pack(fill='x', padx=8, pady=(10,6))
    tk.Label(player, text='Reproductor música', font=self.font_header, bg='#fdf5f5', fg=ACCENT_DARK).pack(pady=(0,6))

    def styled_btn(parent, text, command):
        # Shared look for every player button; the caller packs the result.
        return tk.Button(
            parent,
            text=text,
            command=command,
            bg='#ffe3df',
            activebackground='#ffd2ca',
            fg=TEXT_COLOR,
            relief='flat',
            width=20,
            pady=4
        )

    styled_btn(player, 'Seleccionar archivo', self._select_music).pack(pady=3)
    action_row = tk.Frame(player, bg='#fdf5f5')
    action_row.pack(pady=3)
    styled_btn(action_row, 'Pausa', self._pause_music).pack(side='left', padx=4)
    styled_btn(action_row, 'Reanudar', self._resume_music).pack(side='left', padx=4)
    styled_btn(player, 'Quitar', self._stop_music).pack(pady=3)
|
|
|
|
def _build_status_bar(self) -> None:
    """Build the bottom bar: mail caption, network traffic label and clock."""
    bar_bg = '#f1f1f1'
    bar = tk.Frame(self, bg=bar_bg, bd=2, relief='ridge')
    bar.grid(row=2, column=0, columnspan=3, sticky='ew')
    # Three equally weighted columns.
    for column in (0, 1, 2):
        bar.columnconfigure(column, weight=1)

    bold_11 = ('Arial', 11, 'bold')
    mail_caption = tk.Label(bar, text='Correos sin leer', font=bold_11, bg=bar_bg)
    mail_caption.grid(row=0, column=0, padx=16, pady=6, sticky='w')

    traffic = tk.Label(bar, text='Red: ↑ --.- KB/s ↓ --.- KB/s', font=bold_11, bg=bar_bg)
    self.traffic_label = traffic
    traffic.grid(row=0, column=1, padx=16, pady=6)

    clock = tk.Label(bar, text='--:--:--', font=('Arial', 12, 'bold'), bg=bar_bg)
    self.clock_label = clock
    clock.grid(row=0, column=2, padx=16, pady=6, sticky='e')
|
|
|
|
# ------------------------ acciones ------------------------
|
|
def _open_web(self, url: str) -> None:
|
|
webbrowser.open(url)
|
|
|
|
def _launch_process(self, cmd: list[str]) -> None:
    """Start *cmd* as a detached child process and report it in the UI.

    Success is logged and shown in the results tab. Any exception is shown
    in an error dialog instead of propagating.
    """
    try:
        subprocess.Popen(cmd)
        self._log(f'Proceso lanzado: {cmd}')
        command_line = " ".join(cmd)
        self._show_text_result('Aplicaciones', f'Se lanzó el proceso: {command_line}')
    except Exception as exc:
        messagebox.showerror('Error', f'No se pudo lanzar {cmd}: {exc}')
|
|
|
|
def _run_backup_script(self) -> None:
    """Ask for a source and a destination folder and copy the source in the background.

    The copy is written to ``<destination>/<source name>_<timestamp>``.
    The outcome is posted to ``self._scraping_queue`` as either
    ``('error', message)`` or ``('backup_success', summary, target)``.
    """
    source = filedialog.askdirectory(title='Selecciona la carpeta a respaldar')
    if not source:
        return
    destination = filedialog.askdirectory(title='Selecciona la carpeta destino para la copia')
    if not destination:
        return

    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    # normpath drops a trailing separator so basename is not empty.
    folder_name = os.path.basename(os.path.normpath(source)) or 'backup'
    target = os.path.join(destination, f'{folder_name}_{stamp}')

    if os.path.exists(target):
        messagebox.showerror('Backup', f'Ya existe la carpeta destino:\n{target}')
        return

    self._log(f'Iniciando copia de seguridad de {source} a {target}')
    self._show_text_result('Copias de seguridad', f'Copiando archivos...\nOrigen: {source}\nDestino: {target}')
    self.notebook.select(self.tab_resultados)

    def worker() -> None:
        try:
            n_files, n_bytes, n_skipped = self._copy_directory_with_progress(source, target)
        except Exception as exc:
            self._scraping_queue.put(('error', f'Backup: {exc}'))
            return
        report_lines = [
            'Copia completada correctamente.',
            '',
            f'Origen: {source}',
            f'Destino: {target}',
            f'Archivos copiados: {n_files}',
            f'Tamaño total: {n_bytes / (1024*1024):.2f} MB',
        ]
        if n_skipped:
            report_lines.append(f"Archivos omitidos: {n_skipped} (consulta el panel de notas)")
        self._scraping_queue.put(('backup_success', '\n'.join(report_lines), target))

    # Daemon thread: the copy does not keep the application alive on exit.
    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _copy_directory_with_progress(self, source: str, target: str) -> tuple[int, int, int]:
|
|
copied_files = 0
|
|
copied_bytes = 0
|
|
skipped = 0
|
|
for root, dirs, files in os.walk(source):
|
|
rel = os.path.relpath(root, source)
|
|
dest_dir = os.path.join(target, rel) if rel != '.' else target
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
for file in files:
|
|
src_path = os.path.join(root, file)
|
|
dest_path = os.path.join(dest_dir, file)
|
|
try:
|
|
shutil.copy2(src_path, dest_path)
|
|
copied_files += 1
|
|
try:
|
|
copied_bytes += os.path.getsize(dest_path)
|
|
except OSError:
|
|
pass
|
|
except Exception as exc:
|
|
skipped += 1
|
|
self._log(f'Archivo omitido ({src_path}): {exc}')
|
|
continue
|
|
if copied_files % 20 == 0:
|
|
self._log(f'Copia en progreso: {copied_files} archivos...')
|
|
return copied_files, copied_bytes, skipped
|
|
|
|
def _open_saved_file(self, path: str) -> None:
    """Open *path* with the operating system's default application.

    Warns when the path is empty or no longer exists, and reports any
    launcher failure in an error dialog.
    """
    if not (path and os.path.exists(path)):
        messagebox.showwarning('Archivo', 'El archivo indicado ya no existe.')
        return
    platform_name = sys.platform
    try:
        if platform_name.startswith('win'):
            os.startfile(path)  # type: ignore[attr-defined]
        else:
            # macOS ships `open`; every other platform is assumed to have xdg-open.
            opener = 'open' if platform_name == 'darwin' else 'xdg-open'
            subprocess.Popen([opener, path])
    except Exception as open_error:
        messagebox.showerror('Archivo', f'No se pudo abrir el archivo: {open_error}')
|
|
|
|
def _show_file_popup(self, path: str) -> None:
    """Show a modal dialog with *path* in a read-only field and an open button."""
    dialog = tk.Toplevel(self)
    dialog.title('Archivo generado')
    dialog.configure(bg='white', padx=16, pady=16)
    dialog.resizable(False, False)
    dialog.transient(self)
    dialog.grab_set()

    heading = tk.Label(
        dialog,
        text='Se creó el archivo con los datos:',
        font=('Arial', 12, 'bold'),
        bg='white'
    )
    heading.pack(anchor='w')

    path_field = tk.Entry(dialog, width=60)
    path_field.pack(fill='x', pady=8)
    path_field.insert(0, path)
    # Read-only keeps the text selectable for copying but not editable.
    path_field.config(state='readonly')

    def _open_requested() -> None:
        self._open_saved_file(path)

    button_row = tk.Frame(dialog, bg='white')
    button_row.pack(fill='x', pady=(8,0))
    open_button = tk.Button(button_row, text='Abrir archivo', command=_open_requested)
    open_button.pack(side='left', padx=4)
    close_button = tk.Button(button_row, text='Cerrar', command=dialog.destroy)
    close_button.pack(side='right', padx=4)
|
|
|
|
def _open_text(self) -> None:
    """Load a user-selected text file into the editor widget.

    The file is read completely before the editor is touched, so a failed
    read leaves the current editor content intact. Undecodable bytes are
    dropped (``errors='ignore'``). I/O failures are shown in an error dialog
    instead of escaping from the Tk callback.
    """
    path = filedialog.askopenfilename(filetypes=[('Texto', '*.txt'), ('Todos', '*.*')])
    if not path:
        return
    try:
        with open(path, 'r', encoding='utf-8', errors='ignore') as fh:
            content = fh.read()
    except OSError as exc:
        messagebox.showerror('Archivo', f'No se pudo abrir el archivo: {exc}')
        return
    self.editor.delete('1.0', 'end')
    self.editor.insert('1.0', content)
    self._log(f'Abriste {path}')
|
|
|
|
def _save_text(self) -> None:
    """Write the editor content to a user-selected file as UTF-8.

    The text is read up to ``end-1c`` because a Tk Text widget always
    reports one extra trailing newline at ``end``; saving that newline made
    files grow by one line on every open/save round trip. I/O failures are
    shown in an error dialog instead of escaping from the Tk callback.
    """
    path = filedialog.asksaveasfilename(defaultextension='.txt')
    if not path:
        return
    content = self.editor.get('1.0', 'end-1c')
    try:
        with open(path, 'w', encoding='utf-8') as fh:
            fh.write(content)
    except OSError as exc:
        messagebox.showerror('Archivo', f'No se pudo guardar el archivo: {exc}')
        return
    self._log(f'Guardado en {path}')
|
|
|
|
def _open_simple_scraping_popup(self) -> None:
    """Open the modal dialog that collects the URLs for the simple scraper.

    Only one dialog exists at a time; a second call raises the existing one.
    URLs are entered one per line. Plain Return inside the URL box only adds
    a new line; Control-Return in the box, Return anywhere else in the
    dialog, or the "Iniciar" button starts the scraping.
    """
    if not (REQUESTS_AVAILABLE and BEAUTIFULSOUP_AVAILABLE):
        messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.')
        return
    if self.simple_scraping_popup and self.simple_scraping_popup.winfo_exists():
        self.simple_scraping_popup.lift()
        return

    popup = tk.Toplevel(self)
    popup.title('Scraping simple')
    popup.configure(bg='white', padx=18, pady=18)
    popup.resizable(False, False)
    popup.transient(self)
    popup.grab_set()

    tk.Label(
        popup,
        text='Introduce una o varias URL completas (una por línea):',
        bg='white',
        font=('Arial', 11, 'bold'),
        justify='left'
    ).pack(anchor='w')

    url_box = scrolledtext.ScrolledText(popup, width=60, height=5)
    url_box.pack(pady=(8, 4))
    url_box.insert('1.0', 'https://es.wallapop.com\nhttps://www.wikipedia.org')

    error_label = tk.Label(popup, text='', fg='red', bg='white', wraplength=420, justify='left')
    error_label.pack(anchor='w')

    btn_frame = tk.Frame(popup, bg='white')
    btn_frame.pack(fill='x', pady=(10, 0))

    def _close_popup() -> None:
        if self.simple_scraping_popup:
            try:
                self.simple_scraping_popup.destroy()
            except Exception:
                # The window may already be gone; closing is best effort.
                pass
            self.simple_scraping_popup = None

    def _start(_: object | None = None) -> None:
        raw = url_box.get('1.0', 'end').strip()
        urls = [line.strip() for line in raw.splitlines() if line.strip()]
        if not urls:
            error_label.config(text='Añade al menos una URL completa.')
            return
        invalid = [u for u in urls if not u.startswith(('http://', 'https://'))]
        if invalid:
            error_label.config(text=f'Las URL deben empezar por http:// o https:// (revisa: {invalid[0]})')
            return
        _close_popup()
        self._start_simple_scraping(urls)

    def _on_control_return(_event: object) -> str:
        _start()
        # 'break' stops the Text class binding (newline insertion) and the
        # toplevel <Return> binding from also handling this key press.
        return 'break'

    def _on_return(event: object) -> None:
        # Toplevel bindings also receive events from child widgets, so a
        # plain Return typed in the multi-line box must be ignored here or
        # the user could never enter a second URL.
        if getattr(event, 'widget', None) is url_box:
            return
        _start()

    tk.Button(btn_frame, text='Iniciar', command=_start, bg='#d6f2ce').pack(side='left', padx=4)
    tk.Button(btn_frame, text='Cancelar', command=_close_popup).pack(side='right', padx=4)
    url_box.bind('<Control-Return>', _on_control_return)
    popup.bind('<Return>', _on_return)
    popup.protocol('WM_DELETE_WINDOW', _close_popup)
    self.simple_scraping_popup = popup
    url_box.focus_set()
|
|
|
|
def _start_simple_scraping(self, urls: list[str]) -> None:
    """Validate *urls*, show a preview and start the scraping runner thread."""
    if not REQUESTS_AVAILABLE or not BEAUTIFULSOUP_AVAILABLE:
        messagebox.showerror('Scraping', 'Instala requests y beautifulsoup4 para usar esta función.')
        return

    cleaned: list[str] = []
    for candidate in urls:
        trimmed = candidate.strip()
        if trimmed:
            cleaned.append(trimmed)
    if not cleaned:
        messagebox.showinfo('Scraping', 'No hay URL válidas para analizar.')
        return

    self._log(f'Scraping simple para {len(cleaned)} URL(s)')
    bullet_lines = [f'• {url}' for url in cleaned]
    preview = 'Analizando las siguientes URL:\n' + '\n'.join(bullet_lines)
    self._show_text_result('Scraping simple', preview)
    self.notebook.select(self.tab_resultados)

    runner = threading.Thread(target=self._simple_scraping_runner, args=(cleaned,), daemon=True)
    runner.start()
|
|
|
|
def _simple_scraping_runner(self, urls: list[str]) -> None:
|
|
results: list[dict[str, str]] = []
|
|
lock = threading.Lock()
|
|
threads: list[threading.Thread] = []
|
|
for url in urls:
|
|
t = threading.Thread(target=self._simple_scraping_worker, args=(url, results, lock), daemon=True)
|
|
t.start()
|
|
threads.append(t)
|
|
for thread in threads:
|
|
thread.join()
|
|
self._scraping_queue.put(('generic_success', urls, results))
|
|
|
|
def _simple_scraping_worker(self, url: str, results: list[dict[str, str]], lock: threading.Lock) -> None:
    """Fetch *url*, summarise its HTML and append one record to *results*.

    The record always carries ``url`` and ``timestamp``; it holds either the
    page statistics or an ``error`` message. *lock* guards the shared list.
    """
    timestamp = datetime.datetime.now().strftime('%H:%M:%S')
    if requests and BEAUTIFULSOUP_AVAILABLE:
        try:
            time.sleep(1)
            response = requests.get(
                url,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
                },
                timeout=10
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            page_title = 'Sin título'
            if soup.title and soup.title.string:
                page_title = soup.title.string.strip()

            link_count = len(soup.find_all('a'))
            image_count = len(soup.find_all('img'))
            paragraph_count = len(soup.find_all('p'))

            summary = ''
            meta_tag = soup.find('meta', attrs={'name': 'description'})
            if meta_tag and meta_tag.get('content'):
                content = meta_tag['content']
                # Keep the description short: at most 100 characters plus an ellipsis.
                summary = content if len(content) <= 100 else content[:100] + '...'

            data = {
                'url': url,
                'titulo': page_title,
                'descripcion': summary,
                'num_links': link_count,
                'num_imagenes': image_count,
                'num_parrafos': paragraph_count,
                'longitud': len(response.text),
                'status_code': response.status_code,
                'timestamp': timestamp
            }
        except Exception as exc:
            data = {
                'url': url,
                'error': str(exc),
                'timestamp': timestamp
            }
    else:
        data = {
            'url': url,
            'error': 'Dependencias de scraping no disponibles',
            'timestamp': timestamp
        }
    with lock:
        results.append(data)
|
|
|
|
# --- Funcionalidad de Web Scraping (Wallapop) ---
|
|
def _open_wallapop_popup(self) -> None:
    """Open the modal dialog that asks for a Wallapop query and a result limit.

    Only one dialog exists at a time; a second call raises the existing one.
    """
    if not REQUESTS_AVAILABLE or not BEAUTIFULSOUP_AVAILABLE:
        messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para analizar enlaces.')
        return

    existing = self.scraping_popup
    if existing and tk.Toplevel.winfo_exists(existing):
        existing.lift()
        return

    dialog = tk.Toplevel(self)
    dialog.title('Búsqueda Wallapop')
    dialog.configure(bg='white', padx=18, pady=18)
    dialog.resizable(False, False)
    dialog.transient(self)
    dialog.grab_set()

    prompt = tk.Label(
        dialog,
        text='Escribe el producto que quieres buscar en Wallapop:',
        bg='white',
        font=('Arial', 11, 'bold')
    )
    prompt.pack(anchor='w')
    query_entry = tk.Entry(dialog, width=60)
    query_entry.pack(pady=(8, 4))
    query_entry.insert(0, 'PlayStation 5')

    limit_row = tk.Frame(dialog, bg='white')
    limit_row.pack(fill='x', pady=(2, 0))
    tk.Label(limit_row, text='Resultados a mostrar:', bg='white').pack(side='left')
    limit_spin = tk.Spinbox(limit_row, from_=1, to=20, width=5, justify='center')
    limit_spin.pack(side='left', padx=(6, 0))
    # Replace the spinbox's initial value with the default of 5 results.
    limit_spin.delete(0, 'end')
    limit_spin.insert(0, '5')

    error_label = tk.Label(dialog, text='', fg='red', bg='white', wraplength=380, justify='left')
    error_label.pack(anchor='w')

    button_row = tk.Frame(dialog, bg='white')
    button_row.pack(fill='x', pady=(10, 0))

    def _close_popup() -> None:
        if self.scraping_popup:
            try:
                self.scraping_popup.destroy()
            except Exception:
                pass
            self.scraping_popup = None

    def _start(_: object | None = None) -> None:
        query = query_entry.get().strip()
        if not query:
            error_label.config(text='Necesitas introducir un producto a buscar.')
            return
        try:
            requested = int(limit_spin.get())
        except ValueError:
            error_label.config(text='Selecciona un número válido de resultados.')
            return
        limit = max(1, min(20, requested))
        _close_popup()
        self._start_wallapop_search(query, limit)

    search_button = tk.Button(button_row, text='Buscar', command=_start, bg='#d6f2ce')
    search_button.pack(side='left', padx=4)
    cancel_button = tk.Button(button_row, text='Cancelar', command=_close_popup)
    cancel_button.pack(side='right', padx=4)
    query_entry.bind('<Return>', _start)
    dialog.protocol('WM_DELETE_WINDOW', _close_popup)
    self.scraping_popup = dialog
    query_entry.focus_set()
|
|
|
|
def _start_wallapop_search(self, query: str, limit: int) -> None:
    """Validate the query, announce the search and run it in a worker thread.

    *limit* is clamped to the range 1-20 before being passed on.
    """
    if not REQUESTS_AVAILABLE or not BEAUTIFULSOUP_AVAILABLE:
        messagebox.showerror('Wallapop', 'Instala requests y beautifulsoup4 para usar esta función.')
        return

    clean_query = query.strip()
    if not clean_query:
        messagebox.showerror('Wallapop', 'Escribe el producto que quieres buscar.')
        return

    max_results = min(20, int(limit))
    max_results = max(1, max_results)
    self._log(f'Buscando en Wallapop: "{clean_query}" (máx. {max_results})')
    announcement = f'Buscando "{clean_query}" en Wallapop...\nMostrando hasta {max_results} resultados.'
    self._show_text_result('Wallapop', announcement)
    self.notebook.select(self.tab_resultados)

    search_thread = threading.Thread(
        target=self._wallapop_search_worker,
        args=(clean_query, max_results),
        daemon=True
    )
    search_thread.start()
|
|
|
|
def _wallapop_search_worker(self, query: str, limit: int) -> None:
|
|
try:
|
|
results = self._fetch_wallapop_search_results(query, limit)
|
|
if not results:
|
|
raise ValueError('No se encontraron anuncios para tu búsqueda.')
|
|
self._scraping_queue.put(('search_success', query, results))
|
|
except Exception as exc:
|
|
self._scraping_queue.put(('search_error', str(exc)))
|
|
|
|
def _fetch_wallapop_search_results(self, query: str, limit: int) -> list[dict[str, str]]:
|
|
errors: list[str] = []
|
|
for fetcher in (self._fetch_wallapop_search_via_api, self._fetch_wallapop_search_via_html):
|
|
try:
|
|
results = fetcher(query, limit)
|
|
if results:
|
|
return results[:limit]
|
|
except Exception as exc:
|
|
errors.append(str(exc))
|
|
if errors:
|
|
raise ValueError('No se pudieron obtener resultados de Wallapop.\n' + '\n'.join(errors))
|
|
return []
|
|
|
|
def _fetch_wallapop_search_via_api(self, query: str, limit: int) -> list[dict[str, str]]:
    """Query the Wallapop v3 search API, following pagination tokens.

    Stops at *limit* unique listings, when a page has no items, when no
    next-page token is returned, or after six page transitions.

    Raises:
        ValueError: On HTTP 403, with a hint about the device identifiers.
        requests.HTTPError: On any other HTTP error status.
    """
    if not requests:
        return []

    def _first_truthy(mapping: dict, keys: tuple[str, ...]) -> object:
        # Mirrors `a.get(k1) or a.get(k2) or ...` for the given keys.
        value = None
        for key in keys:
            value = mapping.get(key)
            if value:
                break
        return value

    def _base_params() -> dict[str, object]:
        first_page: dict[str, object] = {
            'keywords': query,
            'source': 'search_box',
            'filters_source': 'quick_filters',
            'order_by': 'most_relevance',
            'latitude': f'{WALLAPOP_LATITUDE:.6f}',
            'longitude': f'{WALLAPOP_LONGITUDE:.6f}',
            'country_code': WALLAPOP_COUNTRY_CODE,
            'language': WALLAPOP_LANGUAGE
        }
        if WALLAPOP_CATEGORY_ID:
            first_page['category_id'] = WALLAPOP_CATEGORY_ID
        return first_page

    results: list[dict[str, str]] = []
    seen_ids: set[str] = set()
    next_page: str | None = None
    for _page_transition in range(6):
        if len(results) >= limit:
            break
        # Follow-up pages are requested with the pagination token alone.
        params = {'next_page': next_page} if next_page else _base_params()
        resp = requests.get(
            'https://api.wallapop.com/api/v3/search',
            params=params,
            headers=WALLAPOP_HEADERS,
            timeout=20
        )
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:  # type: ignore[attr-defined]
            if resp.status_code == 403:
                raise ValueError(
                    'Wallapop devolvió 403. Asegúrate de definir WALLAPOP_DEVICE_ID/WALLAPOP_MPID '
                    'con valores obtenidos desde tu navegador.'
                ) from exc
            raise
        payload = resp.json()
        items = self._extract_wallapop_items(payload)
        if not items:
            break
        for entry in items:
            if not isinstance(entry, dict):
                continue
            raw_id = _first_truthy(entry, ('id', 'itemId', 'item_id'))
            if not raw_id:
                continue
            item_id = str(raw_id)
            if item_id in seen_ids:
                continue
            seen_ids.add(item_id)
            slug = entry.get('web_slug') or entry.get('slug') or item_id
            seller = entry.get('seller')
            seller_info = seller if isinstance(seller, dict) else None
            link = self._build_wallapop_url(str(slug))
            results.append(self._format_wallapop_item(entry, link, seller_info))
            if len(results) >= limit:
                break
        next_page = self._extract_wallapop_next_page(payload)
        if not next_page:
            break
    return results
|
|
|
|
def _fetch_wallapop_search_via_html(self, query: str, limit: int) -> list[dict[str, str]]:
    """Read search results from the ``__NEXT_DATA__`` JSON of the search page.

    Each listing found there is resolved to a full record, first through its
    item page and then through the item API. Listings that cannot be
    resolved are dropped.
    """
    if not (requests and BEAUTIFULSOUP_AVAILABLE):
        return []
    resp = requests.get(
        'https://es.wallapop.com/app/search',
        params={'keywords': query},
        headers=WALLAPOP_HEADERS,
        timeout=20
    )
    resp.raise_for_status()
    script = BeautifulSoup(resp.text, 'html.parser').find('script', id='__NEXT_DATA__')
    if not script or not script.string:
        return []
    page_props = json.loads(script.string).get('props', {}).get('pageProps', {})

    def _first_list(mapping: dict, keys: tuple[str, ...]) -> list:
        # The first key holding a list wins, even when that list is empty.
        for key in keys:
            candidate = mapping.get(key)
            if isinstance(candidate, list):
                return candidate
        return []

    items: list[dict[str, object]] = _first_list(page_props, ('searchResults', 'results', 'items'))
    if not items:
        initial_state = page_props.get('initialState', {})
        if isinstance(initial_state, dict):
            items = _first_list(initial_state, ('items', 'results'))
            if not items:
                search_state = initial_state.get('search')
                if isinstance(search_state, dict):
                    direct = search_state.get('items')
                    if isinstance(direct, list):
                        items = direct
                    elif isinstance(search_state.get('search'), dict):
                        nested = search_state['search'].get('items')
                        if isinstance(nested, list):
                            items = nested
    if not items:
        return []

    slug_keys = ('slug', 'webSlug', 'web_slug', 'shareLink', 'share_link', 'url')
    results: list[dict[str, str]] = []
    seen: set[str] = set()
    for entry in items:
        if not isinstance(entry, dict):
            continue
        slug = None
        for key in slug_keys:
            slug = entry.get(key)
            if slug:
                break
        url = self._build_wallapop_url(str(slug)) if slug else ''
        item_id = entry.get('id') or entry.get('itemId') or entry.get('item_id')
        entry_key = str(item_id or url)
        if not entry_key or entry_key in seen:
            continue
        seen.add(entry_key)

        detail: dict[str, str] | None = None
        if url:
            try:
                detail = self._fetch_wallapop_item(url)
            except Exception:
                detail = None
        if not detail and item_id:
            # Second chance: the item API, keyed by the listing id.
            try:
                detail = self._fetch_wallapop_item_from_api(str(item_id))
            except Exception:
                detail = None
        if not detail:
            continue
        if url and not detail.get('link'):
            detail['link'] = url
        results.append(detail)
        if len(results) >= limit:
            break
    return results
|
|
|
|
def _extract_wallapop_items(self, payload: dict[str, object]) -> list[dict[str, object]]:
|
|
items: list[dict[str, object]] = []
|
|
data_block = payload.get('data')
|
|
if isinstance(data_block, dict):
|
|
section = data_block.get('section')
|
|
if isinstance(section, dict):
|
|
section_payload = section.get('payload')
|
|
if isinstance(section_payload, dict):
|
|
raw_items = section_payload.get('items') or section_payload.get('section_items')
|
|
if isinstance(raw_items, list):
|
|
items.extend(item for item in raw_items if isinstance(item, dict))
|
|
elif isinstance(data_block.get('items'), list):
|
|
items.extend(item for item in data_block['items'] if isinstance(item, dict))
|
|
if not items:
|
|
web_results = payload.get('web_search_results')
|
|
if isinstance(web_results, dict):
|
|
sections = web_results.get('sections')
|
|
if isinstance(sections, list):
|
|
for section in sections:
|
|
if not isinstance(section, dict):
|
|
continue
|
|
raw_items = section.get('items') or section.get('section_items')
|
|
if isinstance(raw_items, list):
|
|
items.extend(item for item in raw_items if isinstance(item, dict))
|
|
return items
|
|
|
|
def _extract_wallapop_next_page(self, payload: dict[str, object]) -> str | None:
|
|
meta = payload.get('meta')
|
|
if isinstance(meta, dict):
|
|
token = meta.get('next_page')
|
|
if isinstance(token, str) and token:
|
|
return token
|
|
headers = payload.get('headers')
|
|
if isinstance(headers, dict):
|
|
token = headers.get('X-NextPage') or headers.get('x-nextpage')
|
|
if isinstance(token, str) and token:
|
|
return token
|
|
return None
|
|
|
|
def _fetch_wallapop_item(self, url: str) -> dict[str, str]:
    """Download a listing page and format the item found in its ``__NEXT_DATA__``.

    Raises:
        RuntimeError: If requests or BeautifulSoup is unavailable.
        ValueError: If the page carries no ``__NEXT_DATA__`` script or no item.
    """
    if not (requests and BEAUTIFULSOUP_AVAILABLE):
        raise RuntimeError('Dependencias de scraping no disponibles')

    resp = requests.get(url, headers=WALLAPOP_HEADERS, timeout=20)
    resp.raise_for_status()
    script = BeautifulSoup(resp.text, 'html.parser').find('script', id='__NEXT_DATA__')
    if not script or not script.string:
        raise ValueError('No se pudo encontrar la información del anuncio (sin datos Next.js).')

    page_props = json.loads(script.string).get('props', {}).get('pageProps', {})
    item = page_props.get('item') or page_props.get('itemInfo', {}).get('item')
    if not item:
        raise ValueError('El formato de la página cambió y no se pudo leer el anuncio.')

    seller = page_props.get('itemSeller')
    seller_info = seller if isinstance(seller, dict) else None
    return self._format_wallapop_item(item, url, seller_info)
|
|
|
|
def _fetch_wallapop_item_from_api(self, item_id: str) -> dict[str, str]:
    """Fetch one listing from the Wallapop v3 items API and format it.

    Raises:
        RuntimeError: If requests is unavailable.
        ValueError: If the response holds no item dictionary.
    """
    if not requests:
        raise RuntimeError('Dependencias de scraping no disponibles')

    resp = requests.get(
        f'https://api.wallapop.com/api/v3/items/{item_id}',
        headers=WALLAPOP_HEADERS,
        timeout=15
    )
    resp.raise_for_status()
    data = resp.json()

    # The item is either wrapped under 'item' or is the response body itself.
    wrapped = data.get('item')
    item = wrapped if isinstance(wrapped, dict) else data
    if not isinstance(item, dict):
        raise ValueError('Respuesta del API sin datos del artículo.')

    slug = item.get('web_slug') or item.get('slug') or item.get('share_link')
    if slug:
        link = self._build_wallapop_url(str(slug))
    else:
        link = f'https://es.wallapop.com/item/{item_id}'

    top_level_seller = data.get('seller')
    seller_info = top_level_seller if isinstance(top_level_seller, dict) else item.get('seller')
    if not isinstance(seller_info, dict):
        seller_info = None
    return self._format_wallapop_item(item, link, seller_info)
|
|
|
|
def _build_wallapop_url(self, slug_or_url: str) -> str:
|
|
if not slug_or_url:
|
|
return ''
|
|
text = slug_or_url.strip()
|
|
if text.startswith('http://') or text.startswith('https://'):
|
|
return text
|
|
text = text.strip('/')
|
|
if not text.startswith('item/'):
|
|
text = f'item/{text}'
|
|
return f'https://es.wallapop.com/{text}'
|
|
|
|
def _format_wallapop_item(
|
|
self,
|
|
item: dict[str, object],
|
|
url: str,
|
|
seller_info: dict[str, object] | None = None
|
|
) -> dict[str, str]:
|
|
title_field = item.get('title')
|
|
if isinstance(title_field, dict):
|
|
title = title_field.get('original') or title_field.get('translated') or '(Sin título)'
|
|
elif isinstance(title_field, str):
|
|
title = title_field or '(Sin título)'
|
|
else:
|
|
title = '(Sin título)'
|
|
|
|
price_text = 'Precio no disponible'
|
|
price_data = item.get('price')
|
|
price_cash = None
|
|
if isinstance(price_data, dict):
|
|
if isinstance(price_data.get('cash'), dict):
|
|
price_cash = price_data['cash']
|
|
else:
|
|
price_cash = price_data
|
|
if isinstance(price_cash, dict):
|
|
amount = price_cash.get('amount')
|
|
currency = price_cash.get('currency') or 'EUR'
|
|
if amount is not None:
|
|
price_text = f'{amount} {currency}'.strip()
|
|
elif isinstance(price_data, (int, float, str)):
|
|
price_text = f'{price_data} EUR'
|
|
|
|
desc_field = item.get('description')
|
|
if isinstance(desc_field, dict):
|
|
description = (desc_field.get('original') or desc_field.get('translated') or '').strip()
|
|
elif isinstance(desc_field, str):
|
|
description = desc_field.strip()
|
|
else:
|
|
description = ''
|
|
|
|
location = item.get('location') if isinstance(item.get('location'), dict) else {}
|
|
location_parts = []
|
|
if isinstance(location, dict):
|
|
for part in (location.get('city'), location.get('postalCode'), location.get('regionName')):
|
|
if part:
|
|
location_parts.append(part)
|
|
location_text = ', '.join(location_parts) if location_parts else 'Ubicación no disponible'
|
|
|
|
seller_name = ''
|
|
for candidate in (seller_info, item.get('seller'), item.get('user'), item.get('userInfo')):
|
|
if isinstance(candidate, dict):
|
|
seller_name = (
|
|
candidate.get('microName')
|
|
or candidate.get('name')
|
|
or candidate.get('userName')
|
|
or ''
|
|
)
|
|
if seller_name:
|
|
break
|
|
if not seller_name:
|
|
seller_name = 'Vendedor no disponible'
|
|
|
|
categories = ', '.join(
|
|
cat.get('name')
|
|
for cat in item.get('taxonomies', [])
|
|
if isinstance(cat, dict) and cat.get('name')
|
|
) or 'Sin categoría'
|
|
|
|
car_info = item.get('carInfo') if isinstance(item.get('carInfo'), dict) else {}
|
|
car_bits: list[str] = []
|
|
if isinstance(car_info, dict):
|
|
for key in ('year', 'km', 'engine', 'gearBox'):
|
|
field = car_info.get(key)
|
|
text = None
|
|
if isinstance(field, dict):
|
|
text = field.get('text') or field.get('iconText')
|
|
elif isinstance(field, str):
|
|
text = field
|
|
if text:
|
|
car_bits.append(text)
|
|
extra = ', '.join(car_bits)
|
|
|
|
images = item.get('images') if isinstance(item.get('images'), list) else []
|
|
image_url = ''
|
|
for img in images:
|
|
if not isinstance(img, dict):
|
|
continue
|
|
urls = img.get('urls') if isinstance(img.get('urls'), dict) else {}
|
|
if isinstance(urls, dict):
|
|
image_url = urls.get('medium') or urls.get('big') or urls.get('small') or ''
|
|
else:
|
|
image_url = img.get('url', '') if isinstance(img.get('url'), str) else ''
|
|
if image_url:
|
|
break
|
|
|
|
return {
|
|
'title': str(title),
|
|
'price': price_text,
|
|
'description': description,
|
|
'link': url,
|
|
'location': location_text,
|
|
'seller': seller_name,
|
|
'category': categories,
|
|
'extra': extra,
|
|
'image': image_url
|
|
}
|
|
|
|
def _process_scraping_queue(self) -> None:
|
|
if not self._running:
|
|
return
|
|
try:
|
|
while True:
|
|
status, *payload = self._scraping_queue.get_nowait()
|
|
if status == 'error':
|
|
message = payload[0] if payload else 'Error desconocido'
|
|
self._handle_scraping_error(str(message))
|
|
elif status == 'search_success':
|
|
query, data = payload
|
|
self._handle_wallapop_search_success(str(query), list(data))
|
|
elif status == 'search_error':
|
|
message = payload[0] if payload else 'Error desconocido'
|
|
self._handle_scraping_error(str(message))
|
|
elif status == 'generic_success':
|
|
urls, data = payload
|
|
self._handle_generic_scraping_success(list(urls), list(data))
|
|
elif status == 'backup_success':
|
|
summary, target = payload
|
|
self._handle_backup_success(str(summary), str(target))
|
|
except queue.Empty:
|
|
pass
|
|
finally:
|
|
if self._running:
|
|
self.after(100, self._process_scraping_queue)
|
|
|
|
def _handle_wallapop_search_success(self, query: str, results: list[dict[str, str]]) -> None:
    """Render the Wallapop results, save them to a file and offer to open it."""
    count = len(results)
    self._log(f'Se encontraron {count} anuncio(s) para "{query}" en Wallapop')

    def _describe(position: int, item: dict[str, str]) -> str:
        # One text block per listing, closed by a dashed separator line.
        lines = [
            f'{position}. {item.get("title", "Sin título")}',
            f'Precio: {item.get("price", "N/D")}',
            f'Vendedor: {item.get("seller", "N/D")}',
            f'Ubicación: {item.get("location", "N/D")}',
            f'Categoría: {item.get("category", "N/D")}'
        ]
        extra = item.get('extra', '').strip()
        if extra:
            lines.append(f'Detalles: {extra}')
        description = (item.get('description') or '').strip()
        if description:
            lines += ['Descripción:', description]
        lines += [f'Enlace: {item.get("link", "N/D")}', '-' * 60]
        return '\n'.join(lines)

    def builder(parent: tk.Frame) -> None:
        frame = tk.Frame(parent, bg='white')
        frame.pack(fill='both', expand=True, padx=12, pady=12)
        heading = tk.Label(
            frame,
            text=f'{count} resultado(s) para "{query}"',
            font=('Arial', 14, 'bold'),
            bg='white',
            fg='#2c3e50'
        )
        heading.pack(anchor='w', pady=(0, 6))
        box = scrolledtext.ScrolledText(frame, wrap='word')
        box.pack(fill='both', expand=True)
        blocks = [_describe(idx, item) for idx, item in enumerate(results, start=1)]
        box.insert('1.0', '\n\n'.join(blocks))
        box.config(state='disabled')

    self._render_results_view('Resultados Wallapop', builder)
    self.notebook.select(self.tab_resultados)

    try:
        path = self._save_wallapop_results_to_file(query, results)
    except Exception as exc:
        self._log(f'[ERROR] No se pudo guardar el archivo: {exc}')
        messagebox.showerror('Wallapop', f'No se pudo guardar el archivo: {exc}')
        return

    question = f'Se guardó el archivo con los resultados en:\n{path}\n\n' + '¿Quieres abrirlo ahora?'
    if not messagebox.askyesno('Wallapop', question):
        self._log(f'Archivo de resultados guardado en {path}')
        return
    self._open_saved_file(path)
|
|
|
|
def _handle_generic_scraping_success(self, urls: list[str], results: list[dict[str, str]]) -> None:
    """Show the simple-scraping results, ordered like the requested *urls*."""
    self._log(f'Scraping simple completado para {len(results)} URL(s)')
    # Worker threads finish in any order; restore the order the user typed.
    position_of = dict(zip(urls, range(len(urls))))
    unknown_rank = len(position_of)
    ordered = sorted(results, key=lambda record: position_of.get(record.get('url', ''), unknown_rank))

    def _describe(number: int, record: dict[str, str]) -> str:
        lines = [
            f'{number}. {record.get("url", "URL desconocida")}',
            f"Hora: {record.get('timestamp', 'N/D')}"
        ]
        if 'error' in record:
            lines.append(f"Error: {record.get('error')}")
        else:
            lines += [
                f"Título: {record.get('titulo', 'Sin título')}",
                f"Descripción: {record.get('descripcion', '')}",
                f"Estado HTTP: {record.get('status_code', 'N/D')}",
                f"Longitud HTML: {record.get('longitud', 0)} caracteres",
                f"Links: {record.get('num_links', 0)} | Imágenes: {record.get('num_imagenes', 0)} | Párrafos: {record.get('num_parrafos', 0)}"
            ]
        lines.append('-' * 60)
        return '\n'.join(lines)

    def builder(parent: tk.Frame) -> None:
        frame = tk.Frame(parent, bg='white')
        frame.pack(fill='both', expand=True, padx=12, pady=12)
        heading = tk.Label(
            frame,
            text='Resultados de scraping simple',
            font=('Arial', 14, 'bold'),
            bg='white'
        )
        heading.pack(anchor='w', pady=(0, 6))
        box = scrolledtext.ScrolledText(frame, wrap='word')
        box.pack(fill='both', expand=True)
        blocks = [_describe(idx, record) for idx, record in enumerate(ordered, start=1)]
        if blocks:
            box.insert('1.0', '\n\n'.join(blocks))
        else:
            box.insert('1.0', 'No se generaron resultados.')
        box.config(state='disabled')

    self._render_results_view('Scraping simple', builder)
    self.notebook.select(self.tab_resultados)
|
|
|
|
def _handle_backup_success(self, summary: str, target: str) -> None:
    """Show the backup summary and offer to open the destination folder."""
    self._log('Copia de seguridad finalizada')
    self._show_text_result('Copias de seguridad', summary)
    self.notebook.select(self.tab_resultados)
    question = f'{summary}\n\n¿Quieres abrir la carpeta destino?'
    wants_to_open = messagebox.askyesno('Backup', question)
    if wants_to_open:
        self._open_saved_file(target)
|
|
|
|
def _save_wallapop_results_to_file(self, query: str, results: list[dict[str, str]]) -> str:
|
|
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
filename = f'wallapop_search_{timestamp}.txt'
|
|
path = os.path.join(tempfile.gettempdir(), filename)
|
|
lines = [
|
|
f'Consulta: {query}',
|
|
f'Resultados: {len(results)}',
|
|
f'Generado: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}',
|
|
''
|
|
]
|
|
for idx, item in enumerate(results, start=1):
|
|
lines.append(f'Resultado {idx}')
|
|
lines.append(f'Título: {item.get("title", "Sin título")}')
|
|
lines.append(f'Precio: {item.get("price", "N/D")}')
|
|
lines.append(f'Vendedor: {item.get("seller", "N/D")}')
|
|
lines.append(f'Ubicación: {item.get("location", "N/D")}')
|
|
lines.append(f'Categoría: {item.get("category", "N/D")}')
|
|
if item.get('extra'):
|
|
lines.append(f'Detalles: {item.get("extra")}')
|
|
description = (item.get('description') or '').strip()
|
|
if description:
|
|
lines.append('Descripción:')
|
|
lines.append(description)
|
|
lines.append(f'Enlace: {item.get("link", "N/D")}')
|
|
if item.get('image'):
|
|
lines.append(f'Imagen: {item.get("image")}')
|
|
lines.append('-' * 60)
|
|
with open(path, 'w', encoding='utf-8') as fh:
|
|
fh.write('\n'.join(lines))
|
|
return path
|
|
|
|
def _handle_scraping_error(self, message: str) -> None:
    """Log *message* as an error and show it in an error dialog."""
    log_entry = f'[ERROR] Wallapop: {message}'
    self._log(log_entry)
    messagebox.showerror('Wallapop', message)
|
|
|
|
# --- Fin Web Scraping ---
|
|
|
|
|
|
|
|
def _open_game_window(self) -> None:
|
|
self._render_results_view('Carrera de camellos', self._build_camel_race_view)
|
|
|
|
def _build_camel_race_view(self, parent: tk.Frame) -> None:
    """Build the camel race UI (canvas, status label and controls) in *parent*.

    The canvas and the status label are stored on the instance so a race can
    be started later without passing them explicitly.
    """
    container = tk.Frame(parent, bg='white')
    container.pack(fill='both', expand=True)

    heading = tk.Label(
        container,
        text='Juego de camellos (sincronización de hilos)',
        font=('Arial', 18, 'bold'),
        bg='white'
    )
    heading.pack(pady=(16, 8))

    canvas = tk.Canvas(container, height=380, bg='#fdfdfd', bd=2, relief='sunken')
    canvas.pack(fill='both', expand=True, padx=16, pady=12)

    status = tk.Label(container, text='Listo para correr', font=('Arial', 14, 'bold'), bg='white', fg='#2c3e50')
    status.pack(pady=(0, 10))

    controls = tk.Frame(container, bg='white')
    controls.pack(pady=(0, 10))

    def _run_race() -> None:
        self._start_race(canvas, status)

    start_button = tk.Button(
        controls,
        text='Iniciar carrera',
        font=('Arial', 13, 'bold'),
        width=16,
        command=_run_race
    )
    start_button.grid(row=0, column=0, padx=12, pady=4)
    back_button = tk.Button(
        controls,
        text='Volver a resultados',
        font=('Arial', 13),
        width=16,
        command=self._reset_results_view
    )
    back_button.grid(row=0, column=1, padx=12, pady=4)

    def _apply_camel_count() -> None:
        # Reads the spinbox created just below; only runs once it exists.
        self._set_camel_count(int(camel_spin.get()))

    tk.Label(controls, text='Número de camellos:', bg='white', font=('Arial', 12)).grid(row=1, column=0, pady=6)
    camel_spin = tk.Spinbox(
        controls,
        from_=3,
        to=8,
        width=5,
        justify='center',
        font=('Arial', 12),
        command=_apply_camel_count
    )
    camel_spin.grid(row=1, column=1, pady=6)
    camel_spin.delete(0, 'end')
    camel_spin.insert(0, str(self.game_camel_count))

    def _apply_speed(value: str) -> None:
        self._set_speed_factor(float(value))

    tk.Label(controls, text='Velocidad (1-5):', bg='white', font=('Arial', 12)).grid(row=2, column=0, pady=6)
    speed_scale = tk.Scale(
        controls,
        from_=1,
        to=5,
        orient='horizontal',
        resolution=0.5,
        length=180,
        bg='white',
        command=_apply_speed
    )
    speed_scale.grid(row=2, column=1, pady=6)
    speed_scale.set(self.game_speed_factor)

    self.game_window_canvas = canvas
    self.game_window_status = status
|
|
|
|
def _set_camel_count(self, value: int) -> None:
|
|
self.game_camel_count = max(3, min(8, int(value)))
|
|
|
|
def _set_speed_factor(self, value: float) -> None:
|
|
self.game_speed_factor = max(1.0, min(5.0, float(value)))
|
|
|
|
def _start_race(self, canvas=None, status_label=None) -> None:
    """Reset the track and launch one worker thread per camel.

    Uses *canvas* (or ``self.game_window_canvas``) as the track. Any race in
    progress is stopped, the canvas is cleared, one dashed lane and one
    coloured rectangle are drawn per camel, the queue/event state is rebuilt,
    the queue processor is scheduled and the worker threads are started in
    shuffled order.
    """
    track = canvas or self.game_window_canvas
    if track is None:
        messagebox.showinfo('Juego', 'Abre el juego para iniciar la carrera.')
        return

    self._stop_game_queue_processor()
    track.delete('all')
    track.update_idletasks()

    # winfo_* may report 0; fall back to the configured size in that case.
    track_width = track.winfo_width() or int(track.cget('width'))
    track_height = track.winfo_height() or int(track.cget('height'))
    finish_x = track_width - 40
    total = max(3, min(8, int(self.game_camel_count)))
    lane_gap = track_height / (total + 1)
    colours = ['#ff7675', '#74b9ff', '#55efc4', '#f1c40f', '#9b59b6', '#1abc9c', '#e67e22', '#95a5a6']

    racer_ids = []
    for lane in range(total):
        lane_y = lane_gap * (lane + 1)
        track.create_line(20, lane_y, finish_x, lane_y, dash=(3, 4))
        fill_colour = colours[lane % len(colours)]
        racer_ids.append(
            track.create_rectangle(30, lane_y - 22, 130, lane_y + 22, fill=fill_colour)
        )

    banner = status_label or self.game_window_status
    if banner:
        banner.config(text='¡Carrera en curso!')

    # Fresh synchronisation objects for this race.
    self.game_move_queue = queue.Queue()
    self.game_done_event = threading.Event()
    self.game_finish_line = finish_x
    self.game_racer_names = [f'Camello {number}' for number in range(1, total + 1)]
    self.game_queue_processor_active = True
    self.after(30, self._process_game_queue)

    # Start the workers in random order.
    start_order = list(range(total))
    random.shuffle(start_order)
    for lane in start_order:
        worker = threading.Thread(
            target=self._race_worker,
            args=(lane, racer_ids[lane]),
            daemon=True,
        )
        worker.start()
|
|
|
|
def _race_worker(self, index: int, rect_id: int) -> None:
|
|
while True:
|
|
if not self.game_queue_processor_active or not self.game_move_queue or not self.game_done_event:
|
|
return
|
|
if self.game_done_event.is_set():
|
|
return
|
|
base_min = 3
|
|
base_max = 9
|
|
speed = max(1.0, float(self.game_speed_factor))
|
|
step = random.randint(int(base_min * speed), max(int(base_max * speed), int(base_min * speed) + 1))
|
|
self.game_move_queue.put(('move', (rect_id, step, index)))
|
|
sleep_time = max(0.02, 0.08 / speed)
|
|
time.sleep(random.uniform(sleep_time * 0.5, sleep_time * 1.2))
|
|
|
|
def _process_game_queue(self) -> None:
|
|
if not self.game_queue_processor_active or not self.game_move_queue:
|
|
return
|
|
canvas = self.game_window_canvas
|
|
if not canvas:
|
|
return
|
|
try:
|
|
while True:
|
|
action, payload = self.game_move_queue.get_nowait()
|
|
if action == 'move':
|
|
rect_id, step, idx = payload
|
|
try:
|
|
canvas.move(rect_id, step, 0)
|
|
except Exception:
|
|
continue
|
|
coords = canvas.coords(rect_id)
|
|
if coords and coords[2] >= self.game_finish_line and self.game_done_event and not self.game_done_event.is_set():
|
|
self.game_done_event.set()
|
|
if self.game_window_status:
|
|
self.game_window_status.config(text=f'{self.game_racer_names[idx]} ganó la carrera')
|
|
else:
|
|
continue
|
|
except queue.Empty:
|
|
pass
|
|
if not self.game_done_event or not self.game_done_event.is_set():
|
|
if self.game_queue_processor_active:
|
|
self.after(30, self._process_game_queue)
|
|
else:
|
|
self.game_queue_processor_active = False
|
|
|
|
def _stop_game_queue_processor(self) -> None:
|
|
self.game_queue_processor_active = False
|
|
self.game_move_queue = None
|
|
if self.game_done_event:
|
|
try:
|
|
self.game_done_event.set()
|
|
except Exception:
|
|
pass
|
|
self.game_done_event = None
|
|
|
|
def _start_alarm(self) -> None:
    """Schedule a new alarm from the minutes and title input widgets.

    The minutes value is floored at 1; a non-integer value shows a warning
    and aborts. An empty title falls back to ``Alarma <counter>``. The alarm
    is appended to ``self.active_alarms``, the title field is cleared, the
    list is refreshed and the event is logged.
    """
    try:
        minutes = max(1, int(self.alarm_minutes.get()))
    except ValueError:
        messagebox.showwarning('Alarmas', 'Ingresa un número válido de minutos.')
        return

    alarm_id = self.alarm_counter
    label = self.alarm_title.get().strip()
    if not label:
        label = f'Alarma {alarm_id}'
    deadline = datetime.datetime.now() + datetime.timedelta(minutes=minutes)

    self.alarm_counter = alarm_id + 1
    self.active_alarms.append({'id': alarm_id, 'title': label, 'end': deadline})
    self.alarm_title.delete(0, 'end')
    self._update_alarm_list()

    when = deadline.strftime('%H:%M:%S')
    self._log(f'Alarma "{label}" programada para las {when}')
|
|
|
|
def _refresh_alarms_loop(self) -> None:
|
|
if not self._running:
|
|
return
|
|
now = datetime.datetime.now()
|
|
triggered: list[dict[str, datetime.datetime | str]] = []
|
|
for alarm in list(self.active_alarms):
|
|
remaining = (alarm['end'] - now).total_seconds() # type: ignore[arg-type]
|
|
if remaining <= 0:
|
|
triggered.append(alarm)
|
|
for alarm in triggered:
|
|
self.active_alarms.remove(alarm)
|
|
self._trigger_alarm(alarm)
|
|
self._update_alarm_list()
|
|
self.after(1000, self._refresh_alarms_loop)
|
|
|
|
def _update_alarm_list(self) -> None:
|
|
if not hasattr(self, 'alarm_list'):
|
|
return
|
|
selection = self.alarm_list.curselection()
|
|
selected_id = None
|
|
if selection:
|
|
idx = selection[0]
|
|
if idx < len(self.active_alarms):
|
|
selected_id = self.active_alarms[idx]['id']
|
|
|
|
self.alarm_list.delete(0, 'end')
|
|
if not self.active_alarms:
|
|
self.alarm_status.config(text='Sin alarmas programadas')
|
|
return
|
|
now = datetime.datetime.now()
|
|
for index, alarm in enumerate(self.active_alarms):
|
|
remaining = max(0, int((alarm['end'] - now).total_seconds())) # type: ignore[arg-type]
|
|
minutes, seconds = divmod(remaining, 60)
|
|
self.alarm_list.insert('end', f"{alarm['title']} - {minutes:02d}:{seconds:02d}")
|
|
if selected_id is not None and alarm['id'] == selected_id:
|
|
self.alarm_list.selection_set(index)
|
|
self.alarm_status.config(text=f"{len(self.active_alarms)} alarma(s) activas")
|
|
|
|
def _trigger_alarm(self, alarm: dict[str, datetime.datetime | str]) -> None:
    """Log that *alarm* fired, ring the bell (best effort) and show its popup."""
    title = alarm['title']
    self._log(f"Alarma '{title}' activada")
    try:
        self.bell()
    except Exception:
        # A failing bell must not prevent the popup from appearing.
        pass
    self._show_alarm_popup(str(title))
|
|
|
|
def _show_alarm_popup(self, title: str) -> None:
    """Open a transient window announcing the fired alarm named *title*.

    The window takes the input grab and offers a single button that destroys it.
    """
    backdrop = '#ffeaea'
    dialog = tk.Toplevel(self)
    dialog.title('Alarma activa')
    dialog.configure(bg=backdrop, padx=20, pady=20)
    dialog.transient(self)
    dialog.grab_set()

    heading = tk.Label(dialog, text='¡Tiempo cumplido!', font=('Arial', 16, 'bold'), fg='#c0392b', bg=backdrop)
    heading.pack(pady=6)
    caption = tk.Label(dialog, text=title, font=('Arial', 13), bg=backdrop)
    caption.pack(pady=4)
    dismiss = tk.Button(dialog, text='Detener alarma', command=dialog.destroy, bg='#f9dede')
    dismiss.pack(pady=10)
|
|
|
|
def _cancel_selected_alarm(self) -> None:
    """Remove the alarm selected in the listbox, informing the user when that is not possible."""
    alarms = self.active_alarms
    if not alarms:
        messagebox.showinfo('Alarmas', 'No hay alarmas para cancelar.')
        return

    picked = self.alarm_list.curselection()
    if not picked:
        messagebox.showinfo('Alarmas', 'Selecciona una alarma en la lista.')
        return

    row = picked[0]
    if row >= len(alarms):
        # The listbox row no longer maps to an alarm.
        messagebox.showwarning('Alarmas', 'La selección ya no es válida.')
        return

    cancelled = alarms.pop(row)
    self._update_alarm_list()
    self._log(f"Alarma '{cancelled['title']}' cancelada")
|
|
|
|
def _select_music(self) -> None:
    """Ask for a media file and loop it through pygame's mixer.

    Files whose extension is not in the direct-audio set are first converted
    to a temporary WAV with ffmpeg. The temp path is remembered in
    ``self.music_temp_file`` so it can be removed later. Every failure
    (pygame missing, ffmpeg missing or failing, mixer unable to play the
    file) is reported in a message box and leaves no temp file behind.
    """
    if not pygame_available:
        messagebox.showwarning('Música', 'Instale pygame para reproducir audio.')
        return
    path = filedialog.askopenfilename(
        filetypes=[
            ('Audio/Video', '*.mp3 *.wav *.ogg *.mp4 *.mkv *.mov *.avi *.m4a *.webm'),
            ('Todos', '*.*')
        ]
    )
    if not path:
        return

    # Drop the temp file left by the previous selection, if any.
    self._cleanup_temp_audio()

    ext = os.path.splitext(path)[1].lower()
    direct_audio = {'.mp3', '.wav', '.ogg', '.m4a'}
    playable_path = path
    temp_path = None

    def discard_temp() -> None:
        """Best-effort removal of the temp WAV created for this selection."""
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass

    if ext not in direct_audio:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        temp_path = tmp.name
        tmp.close()
        cmd = ['ffmpeg', '-y', '-i', path, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', temp_path]
        try:
            # subprocess.run blocks until ffmpeg has finished.
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except FileNotFoundError:
            discard_temp()
            messagebox.showerror('Música', 'ffmpeg no está instalado. Instálalo para extraer el audio de videos.')
            return
        except subprocess.CalledProcessError as exc:
            discard_temp()
            messagebox.showerror('Música', f'No se pudo extraer el audio: {exc}')
            return
        playable_path = temp_path

    try:
        pygame.mixer.init()
        pygame.mixer.music.load(playable_path)
        pygame.mixer.music.play(-1)
    except pygame.error as exc:
        # The temp file is not yet tracked in self.music_temp_file, so it has
        # to be removed here or it would be leaked.
        discard_temp()
        messagebox.showerror('Música', f'No se pudo reproducir el archivo: {exc}')
        return

    self.music_temp_file = temp_path
    self._log(f'Reproduciendo {os.path.basename(path)}')
|
|
|
|
def _stop_music(self) -> None:
    """Stop playback when the mixer is initialised, then remove any temp audio file."""
    mixer_ready = pygame_available and pygame.mixer.get_init()
    if mixer_ready:
        pygame.mixer.music.stop()
    self._cleanup_temp_audio()
|
|
|
|
def _pause_music(self) -> None:
    """Pause playback; a no-op when pygame is missing or the mixer is not initialised."""
    if not pygame_available:
        return
    if pygame.mixer.get_init():
        pygame.mixer.music.pause()
|
|
|
|
def _resume_music(self) -> None:
    """Resume paused playback; a no-op when pygame is missing or the mixer is not initialised."""
    if not pygame_available:
        return
    if pygame.mixer.get_init():
        pygame.mixer.music.unpause()
|
|
|
|
def _cleanup_temp_audio(self) -> None:
|
|
if self.music_temp_file and os.path.exists(self.music_temp_file):
|
|
try:
|
|
os.remove(self.music_temp_file)
|
|
except OSError:
|
|
pass
|
|
self.music_temp_file = None
|
|
|
|
def _resolve_openweather_key(self) -> str:
|
|
key = (self.weather_api_key or '').strip()
|
|
if not key:
|
|
raise RuntimeError('Configure OPENWEATHER_API_KEY u OPENWEATHER_FALLBACK_API_KEY')
|
|
return key
|
|
|
|
def _fetch_weather_by_coordinates(self, lat: float, lon: float) -> dict[str, Any]:
    """Query OpenWeatherMap's current-weather endpoint for a latitude/longitude pair.

    The request asks for metric units and Spanish descriptions, with an
    8-second timeout.

    Raises:
        RuntimeError: If ``requests`` is unavailable or no API key is configured.
        ValueError: If the response lacks the ``main`` or ``weather`` section.
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError('Instale requests para consultar el clima')
    query = dict(
        lat=lat,
        lon=lon,
        appid=self._resolve_openweather_key(),
        units='metric',
        lang='es',
    )
    response = requests.get('https://api.openweathermap.org/data/2.5/weather', params=query, timeout=8)
    response.raise_for_status()
    payload = response.json()
    if not ('main' in payload and 'weather' in payload):
        raise ValueError('Respuesta incompleta de OpenWeatherMap')
    return payload
|
|
|
|
def _fetch_weather_data(self, city_query: str) -> dict[str, Any]:
    """Query OpenWeatherMap's current-weather endpoint for a city search string.

    The request asks for metric units and Spanish descriptions, with an
    8-second timeout.

    Raises:
        RuntimeError: If ``requests`` is unavailable or no API key is configured.
        ValueError: If the response lacks the ``main`` or ``weather`` section.
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError('Instale requests para consultar el clima')
    query = dict(
        q=city_query,
        appid=self._resolve_openweather_key(),
        units='metric',
        lang='es',
    )
    response = requests.get('https://api.openweathermap.org/data/2.5/weather', params=query, timeout=8)
    response.raise_for_status()
    payload = response.json()
    if not ('main' in payload and 'weather' in payload):
        raise ValueError('Respuesta incompleta de OpenWeatherMap')
    return payload
|
|
|
|
def _fetch_javea_weather(self) -> dict[str, Any]:
    """Fetch current weather for the fixed JAVEA_LATITUDE / JAVEA_LONGITUDE coordinates."""
    coordinates = (JAVEA_LATITUDE, JAVEA_LONGITUDE)
    return self._fetch_weather_by_coordinates(*coordinates)
|
|
|
|
# ------------------------ chat ------------------------
|
|
def _connect_chat(self) -> None:
    """Connect the chat client using the host/port fields, falling back to the defaults.

    An empty host uses ``SERVER_HOST_DEFAULT``; a non-integer port uses
    ``SERVER_PORT_DEFAULT``. A success notice is queued for the chat view
    when the client reports that it connected.
    """
    typed_host = self.host_entry.get().strip()
    host = typed_host if typed_host else SERVER_HOST_DEFAULT
    try:
        port = int(self.port_entry.get().strip())
    except ValueError:
        port = SERVER_PORT_DEFAULT
    connected = self.chat_client.connect(host, port)
    if connected:
        self._enqueue_chat_message('[INFO] Conectado al servidor')
|
|
|
|
def _send_chat(self) -> None:
    """Send the text in the message box through the chat client.

    Blank input is ignored. On success the message is echoed to the chat view
    and the input box is cleared; otherwise a warning is shown.
    """
    outgoing = self.message_entry.get('1.0', 'end').strip()
    if not outgoing:
        return
    if not self.chat_client.send(outgoing + '\n'):
        messagebox.showwarning('Chat', 'No hay conexión activa.')
        return
    self._enqueue_chat_message('Yo: ' + outgoing)
    self.message_entry.delete('1.0', 'end')
|
|
|
|
def _enqueue_chat_message(self, text: str) -> None:
|
|
self._chat_queue.put(text)
|
|
|
|
def _chat_loop(self) -> None:
|
|
while self._running:
|
|
try:
|
|
msg = self._chat_queue.get(timeout=0.5)
|
|
except queue.Empty:
|
|
continue
|
|
self.chat_display.after(0, lambda m=msg: self._append_chat(m))
|
|
|
|
def _append_chat(self, text: str) -> None:
    """Append *text* as a new line to the chat display and scroll to the end.

    The widget is switched to ``normal`` for the insert and back to ``disabled`` afterwards.
    """
    view = self.chat_display
    view.config(state='normal')
    view.insert('end', text + '\n')
    view.see('end')
    view.config(state='disabled')
|
|
|
|
# ------------------------ hilos auxiliares ------------------------
|
|
def _update_clock(self) -> None:
|
|
if not self._running:
|
|
return
|
|
now = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
|
|
self.clock_label.config(text=now)
|
|
self.after(1000, self._update_clock)
|
|
|
|
def _update_traffic(self) -> None:
|
|
if not (self._running and psutil):
|
|
return
|
|
if self._traffic_last is None:
|
|
try:
|
|
self._traffic_last = psutil.net_io_counters()
|
|
except Exception as exc:
|
|
self._log(f'Tráfico: psutil no disponible ({exc})')
|
|
self.traffic_label.config(text='Red: N/D')
|
|
return
|
|
try:
|
|
current = psutil.net_io_counters()
|
|
sent = (current.bytes_sent - self._traffic_last.bytes_sent) / 1024
|
|
recv = (current.bytes_recv - self._traffic_last.bytes_recv) / 1024
|
|
self._traffic_last = current
|
|
self.traffic_label.config(text=f'Red: ↑ {sent:.1f} KB/s ↓ {recv:.1f} KB/s')
|
|
except Exception as exc:
|
|
self._log(f'Tráfico: {exc}')
|
|
self.traffic_label.config(text='Red: N/D')
|
|
finally:
|
|
if self._running:
|
|
self.after(1000, self._update_traffic)
|
|
|
|
def _resource_poll_tick(self) -> None:
|
|
if not (self._running and psutil):
|
|
return
|
|
try:
|
|
cpu = psutil.cpu_percent(interval=None)
|
|
mem = psutil.virtual_memory().percent
|
|
except Exception as exc:
|
|
self._log(f'Recursos: {exc}')
|
|
if self._running:
|
|
self._resource_poll_job = self.after(2000, self._resource_poll_tick)
|
|
return
|
|
threads = 0
|
|
if self._ps_process:
|
|
try:
|
|
threads = self._ps_process.num_threads()
|
|
except Exception:
|
|
threads = 0
|
|
self._resource_history['cpu'].append(cpu)
|
|
self._resource_history['mem'].append(mem)
|
|
self._resource_history['threads'].append(threads)
|
|
self._resource_history['cpu'] = self._resource_history['cpu'][-40:]
|
|
self._resource_history['mem'] = self._resource_history['mem'][-40:]
|
|
self._resource_history['threads'] = self._resource_history['threads'][-40:]
|
|
self._update_chart()
|
|
if self._running:
|
|
self._resource_poll_job = self.after(1000, self._resource_poll_tick)
|
|
|
|
def _update_chart(self) -> None:
    """Redraw the CPU and memory lines, the memory fill and the thread bars from the history.

    Does nothing unless matplotlib is available and the canvas, both line
    artists and both axes exist.
    """
    chart_ready = (
        MATPLOTLIB_AVAILABLE
        and self.chart_canvas
        and self.line_cpu
        and self.line_mem
        and self.ax_cpu
        and self.ax_mem
    )
    if not chart_ready:
        return

    history = self._resource_history
    positions = list(range(len(history['cpu'])))
    self.line_cpu.set_data(positions, history['cpu'])
    self.line_mem.set_data(positions, history['mem'])

    # Keep at least a 40-sample window on the x axis; percentages span 0..100.
    right_edge = max(40, len(positions))
    self.ax_cpu.set_xlim(0, right_edge)
    self.ax_mem.set_xlim(0, right_edge)
    self.ax_cpu.set_ylim(0, 100)
    self.ax_mem.set_ylim(0, 100)

    # Replace the previous fill artist with one for the current data.
    stale_fill = self.mem_fill
    if stale_fill:
        try:
            stale_fill.remove()
        except Exception:
            pass
    if self.ax_mem and positions:
        self.mem_fill = self.ax_mem.fill_between(
            positions,
            history['mem'],
            color='#4ecdc4',
            alpha=0.3
        )

    thread_axis = self.ax_threads
    if thread_axis:
        counts = history['threads']
        if counts:
            bars = self.thread_bars
            if bars and len(bars) == len(counts):
                # Same number of bars: only their heights need updating.
                for bar, count in zip(bars, counts):
                    bar.set_height(count)
            else:
                if bars:
                    for bar in bars:
                        bar.remove()
                self.thread_bars = thread_axis.bar(positions, counts, color='#ffa726') if positions else None
            thread_axis.set_ylim(0, max(max(counts) * 1.2, 1))
        thread_axis.set_xlim(-0.5, max(39.5, len(positions) - 0.5))

    if self.chart_canvas:
        self.chart_canvas.draw_idle()
|
|
|
|
def _log(self, text: str) -> None:
    """Append a timestamped line to the notes widget.

    When called from a thread other than the main one, the call is
    re-dispatched through ``self.after`` instead of touching the widget directly.
    """
    on_main_thread = threading.current_thread() is threading.main_thread()
    if not on_main_thread:
        self.after(0, lambda t=text: self._log(t))
        return
    stamp = datetime.datetime.now().strftime('%H:%M:%S')
    notes = self.notes
    notes.insert('end', f'[{stamp}] {text}\n')
    notes.see('end')
|
|
|
|
def on_close(self) -> None:
    """Shut the dashboard down: stop background activity, release resources, destroy the window.

    Clears ``self._running`` so the periodic loops stop rescheduling, cancels
    the pending resource-poll job, stops the camel race, stops music playback
    (which also removes the temporary audio file) and closes the chat client.
    The window is destroyed even if one of those steps raises.
    """
    self._running = False
    pending_job = self._resource_poll_job
    if pending_job is not None:
        try:
            self.after_cancel(pending_job)
        except Exception:
            # The job may already have run or been cancelled.
            pass
        self._resource_poll_job = None
    try:
        # Race workers exit once their done-event is set.
        self._stop_game_queue_processor()
        # Without this the temp WAV made for video playback outlives the app.
        self._stop_music()
        self.chat_client.close()
    finally:
        self.destroy()
|
|
|
|
|
|
def main() -> None:
    """Create the dashboard window, register its close handler and run the Tk event loop."""
    dashboard = DashboardApp()
    dashboard.protocol('WM_DELETE_WINDOW', dashboard.on_close)
    dashboard.mainloop()


# Run the dashboard only when the file is executed as a script.
if __name__ == '__main__':
    main()