ProyectoPHP/system_utils.py

963 lines
38 KiB
Python

# system_utils.py
import tkinter as tk
from tkinter import messagebox, simpledialog, filedialog, ttk
import datetime
import webbrowser
import subprocess
import csv
import threading
import time
import psutil
import os
import platform
import uuid
import pygame
import json
import random
# Importaciones directas de módulos (Acceso con el prefijo del módulo)
import config
import monitor_manager
# Inicializar pygame mixer
try:
pygame.mixer.init()
except Exception as e:
print(f"ADVERTENCIA: No se pudo iniciar pygame.mixer: {e}")
# --- Constante para la comunicación de progreso ---
PROGRESS_FILE = os.path.join(config.BASE_DIR, "task_progress.txt")
# ===============================================
# Log y Soporte
# ===============================================
def log_event(message):
"""Escribe un mensaje en el log del sistema."""
if config.system_log and config.system_log.winfo_exists():
timestamp = datetime.datetime.now().strftime("[%H:%M:%S] ")
config.system_log.config(state=tk.NORMAL)
config.system_log.insert(tk.END, timestamp + message + "\n")
config.system_log.see(tk.END)
config.system_log.config(state=tk.DISABLED)
def bytes_a_human_readable(n):
"""Convierte bytes a KB, MB, GB, etc."""
symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
prefix = {}
for i, s in enumerate(symbols):
prefix[s] = 1 << (i + 1) * 10
for s in reversed(symbols):
if n >= prefix[s]:
value = float(n) / prefix[s]
return f'{value:.2f} {s}iB'
return f'{n} B'
# ===============================================
# Lógica de Persistencia de Alarmas
# ===============================================
def guardar_alarmas():
"""Guarda la lista de alarmas en un archivo JSON."""
# 1. Preparar datos para serializar: Convertir objetos datetime a strings ISO
data_to_save = {}
# Encontramos el último ID usado
last_id = 0
for uid, data in config.alarmas_programadas.items():
data_to_save[uid] = {
'time_str': data['time'].isoformat(), # Convertir datetime a string
'active': data['active'],
'message': data['message'],
'sound_file': data['sound_file']
}
last_id = max(last_id, uid)
final_data = {
"last_id": last_id,
"alarms": data_to_save
}
try:
os.makedirs(os.path.dirname(config.ALARM_SAVE_FILE), exist_ok=True)
with open(config.ALARM_SAVE_FILE, 'w', encoding='utf-8') as f:
json.dump(final_data, f, indent=4)
log_event("Datos de alarma guardados con éxito.")
except Exception as e:
log_event(f"ERROR: No se pudieron guardar las alarmas: {e}")
def cargar_alarmas(treeview_alarmas=None, root=None):
"""Carga las alarmas desde el archivo JSON y las añade al modelo y al Treeview."""
if not os.path.exists(config.ALARM_SAVE_FILE):
return
try:
with open(config.ALARM_SAVE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
config.alarma_counter = data.get("last_id", 0)
alarm_data = data.get("alarms", {})
for uid_str, data in alarm_data.items():
uid = int(uid_str)
# 1. Convertir string ISO de vuelta a objeto datetime
target_time = datetime.datetime.fromisoformat(data['time_str'])
# 2. Re-agregar al modelo
config.alarmas_programadas[uid] = {
'time': target_time,
'active': data['active'],
'message': data['message'],
'sound_file': data['sound_file']
}
# 3. Si se proporciona Treeview, actualizar la interfaz
if treeview_alarmas:
status = "ACTIVA" if data['active'] else "INACTIVA"
treeview_alarmas.insert('', tk.END, values=(
uid,
target_time.strftime('%H:%M'),
data['message'],
status,
target_time.strftime('%Y-%m-%d')
), tags=('active',) if data['active'] else ('inactive',))
# Si la alarma está activa, aseguramos que el hilo de verificación inicie
if data['active'] and root and not hasattr(agregar_alarma, 'hilo_activo'):
threading.Thread(target=lambda: verificar_alarma(root, treeview_alarmas), daemon=True).start()
setattr(agregar_alarma, 'hilo_activo', True)
log_event(f"Se cargaron {len(alarm_data)} alarmas.")
except Exception as e:
log_event(f"ERROR: Fallo al cargar el archivo de alarmas: {e}")
config.alarmas_programadas = {} # Limpiar modelo en caso de fallo
# ===============================================
# Funcionalidades de Alarma (Modificadas para Pygame)
# ===============================================
def verificar_alarma(root, treeview_alarmas):
"""
Función del hilo secundario que verifica la hora actual contra la hora objetivo.
"""
while config.monitor_running:
alarmas_a_chequear = list(config.alarmas_programadas.items())
now = datetime.datetime.now()
for alarma_id, data in alarmas_a_chequear:
if data['active']:
target = data['time']
# Comparamos hora y minuto
if now.hour == target.hour and now.minute == target.minute and now.second < 2:
# Alarma alcanzada!
root.after(0, lambda: notificar_alarma_alcanzada(alarma_id, data, treeview_alarmas))
# Desactivamos la alarma en el modelo para que no se repita
data['active'] = False
guardar_alarmas() # Guardar después de desactivar
time.sleep(1) # Chequeamos cada segundo
def notificar_alarma_alcanzada(alarma_id, data, treeview_alarmas):
"""Muestra la notificación visual, reproduce el sonido, actualiza el Treeview y registra el evento."""
# Detenemos música antes de empezar a sonar la alarma
detener_mp3()
# 1. Reproducir Sonido (Usando pygame)
sound_file = data['sound_file'] # Usamos el archivo almacenado en el modelo
if sound_file and os.path.exists(sound_file):
try:
if pygame.mixer.music.get_busy():
pygame.mixer.music.stop()
pygame.mixer.music.load(sound_file)
pygame.mixer.music.set_volume(config.alarma_volumen)
pygame.mixer.music.play(-1) # Reproducir en bucle (-1)
config.alarma_sonando = True
log_event(f"Sonido de alarma '{os.path.basename(sound_file)}' iniciado.")
except Exception as e:
log_event(f"ERROR al reproducir sonido con pygame: {e}")
else:
log_event("ADVERTENCIA: Archivo de sonido no encontrado o no válido.")
# 2. Notificación visual
msg = f"¡Alarma programada alcanzada!\nTarea: {data['message']}"
log_event(f"ALARMA ACTIVADA: '{data['message']}' a las {data['time'].strftime('%H:%M:%S')}")
messagebox.showinfo("ALARMA", msg)
# 3. Actualizar el Treeview para mostrar 'INACTIVA'
for item in treeview_alarmas.get_children():
values = treeview_alarmas.item(item, 'values')
if values and values[0] == str(alarma_id):
treeview_alarmas.item(item, values=(alarma_id, values[1], values[2], "INACTIVA", values[4]), tags=('inactive',))
break
guardar_alarmas() # Guardar después de actualizar la interfaz
def detener_sonido_alarma():
"""Detiene la reproducción de sonido de la alarma usando pygame."""
if config.alarma_sonando and pygame.mixer.music.get_busy():
pygame.mixer.music.stop()
config.alarma_sonando = False
log_event("Reproducción de alarma detenida.")
elif config.alarma_sonando:
config.alarma_sonando = False
log_event("Estado de alarma limpiado. No estaba sonando.")
def ajustar_volumen_alarma(nuevo_volumen_str):
"""Ajusta el volumen de la alarma (rango 0.0 a 1.0) desde el slider (0-100)."""
try:
vol_int = int(float(nuevo_volumen_str))
vol = vol_int / 100.0 # Convertir de 0-100 a 0.0-1.0
# 1. Actualizar la variable de configuración para futuras alarmas/música
config.alarma_volumen = vol
# 2. Si hay algo sonando, ajustar inmediatamente el volumen
if pygame.mixer.music.get_busy():
pygame.mixer.music.set_volume(vol)
log_event(f"Volumen de reproducción ajustado a {vol:.2f}.")
except Exception as e:
log_event(f"Error al ajustar volumen: {e}")
def agregar_alarma(root, hora_str, minuto_str, tarea, treeview_alarmas, window_to_close=None, sound_file=None):
"""
Añade una nueva alarma a la lista (usando ID autonumérico) y actualiza el Treeview.
Acepta el argumento 'sound_file' para la ruta del sonido seleccionado.
"""
try:
hora = int(hora_str)
minuto = int(minuto_str)
tarea = tarea.strip()
if not tarea:
tarea = "Alarma sin descripción"
# Cálculo de la hora objetivo
now = datetime.datetime.now()
target_time = datetime.datetime(now.year, now.month, now.day, hora, minuto, 0)
if target_time < now:
target_time += datetime.timedelta(days=1)
# 1. Generar ID autonumérico y único
config.alarma_counter += 1
alarma_id = config.alarma_counter
# 2. Añadir al modelo
config.alarmas_programadas[alarma_id] = {
'time': target_time,
'active': True,
'message': tarea,
'sound_file': sound_file if sound_file else config.ALERTA_SOUND_FILE # Usar el seleccionado
}
# 3. Añadir al Treeview
treeview_alarmas.insert('', tk.END, values=(
alarma_id,
target_time.strftime('%H:%M'),
tarea,
"ACTIVA",
target_time.strftime('%Y-%m-%d')
), tags=('active',))
# Configuramos los tags visuales
treeview_alarmas.tag_configure('active', background='#FFCCCC') # Fondo rojo claro si está activa
treeview_alarmas.tag_configure('inactive', background='#DDDDDD')
log_event(f"Nueva alarma ({alarma_id}) programada: {tarea}")
# Guardar inmediatamente después de agregar
guardar_alarmas()
# 4. Iniciar el hilo de verificación si no está activo
if not hasattr(agregar_alarma, 'hilo_activo'):
threading.Thread(target=lambda: verificar_alarma(root, treeview_alarmas), daemon=True).start()
setattr(agregar_alarma, 'hilo_activo', True)
# 5. Cerrar la ventana flotante si existe
if window_to_close:
window_to_close.destroy()
except ValueError:
messagebox.showerror("Error de entrada", "Asegúrate de seleccionar una hora y minuto válidos (00-23, 00-59).")
def toggle_alarma(treeview_alarmas):
"""Activa o desactiva la alarma seleccionada."""
selected_item = treeview_alarmas.focus()
if not selected_item:
messagebox.showwarning("Advertencia", "Selecciona una alarma de la lista.")
return
alarma_id = int(treeview_alarmas.item(selected_item, 'values')[0])
data = config.alarmas_programadas.get(alarma_id)
if data:
data['active'] = not data['active']
new_status = "ACTIVA" if data['active'] else "INACTIVA"
# Actualizar Treeview y tags visuales
treeview_alarmas.item(selected_item, values=(alarma_id, data['time'].strftime('%H:%M'), data['message'], new_status, data['time'].strftime('%Y-%m-%d')), tags=('active',) if data['active'] else ('inactive',))
log_event(f"Alarma {alarma_id} toggled a {new_status}.")
guardar_alarmas() # Guardar después del toggle
def eliminar_alarma(treeview_alarmas):
"""Elimina la alarma seleccionada del modelo y del Treeview."""
selected_item = treeview_alarmas.focus()
if not selected_item:
messagebox.showwarning("Advertencia", "Selecciona una alarma para eliminar.")
return
alarma_id = int(treeview_alarmas.item(selected_item, 'values')[0])
if messagebox.askyesno("Confirmar Eliminación", f"¿Estás seguro de que deseas eliminar la alarma con ID {alarma_id}?"):
# 1. Eliminar del modelo
if alarma_id in config.alarmas_programadas:
del config.alarmas_programadas[alarma_id]
# 2. Eliminar del Treeview
treeview_alarmas.delete(selected_item)
log_event(f"Alarma {alarma_id} eliminada.")
guardar_alarmas() # Guardar después de la eliminación
def modificar_alarma_existente(root, alarma_id, hora_str, minuto_str, tarea, treeview_alarmas, window_to_close, sound_file):
"""
Sobreescribe los datos de una alarma existente en el modelo y actualiza el Treeview.
"""
try:
data = config.alarmas_programadas.get(alarma_id)
if not data:
messagebox.showerror("Error", "Alarma no encontrada en el sistema.")
return
hora = int(hora_str)
minuto = int(minuto_str)
tarea = tarea.strip() or "Alarma sin descripción"
# 1. Recalcular la hora objetivo
now = datetime.datetime.now()
target_time = datetime.datetime(now.year, now.month, now.day, hora, minuto, 0)
if target_time < now:
target_time += datetime.timedelta(days=1)
# 2. Actualizar el modelo (manteniendo el estado 'active' actual)
data['time'] = target_time
data['message'] = tarea
data['sound_file'] = sound_file
# 3. Actualizar el Treeview
new_status = "ACTIVA" if data['active'] else "INACTIVA"
# Encontramos el item en el Treeview para actualizarlo
for item in treeview_alarmas.get_children():
if treeview_alarmas.item(item, 'values')[0] == str(alarma_id):
treeview_alarmas.item(item, values=(
alarma_id,
target_time.strftime('%H:%M'),
tarea,
new_status,
target_time.strftime('%Y-%m-%d')
), tags=('active',) if data['active'] else ('inactive',))
break
log_event(f"Alarma {alarma_id} modificada y re-programada para {target_time.strftime('%H:%M')}.")
guardar_alarmas()
window_to_close.destroy()
except ValueError:
messagebox.showerror("Error de entrada", "Asegúrate de seleccionar una hora y minuto válidos.")
except Exception as e:
log_event(f"ERROR al modificar alarma: {e}")
messagebox.showerror("Error", f"Error al modificar la alarma: {e}")
def seleccionar_archivo_alarma(root, label_archivo):
"""Abre un diálogo para seleccionar un archivo de sonido de la carpeta alarmas."""
try:
# Asegurar que la carpeta exista antes de usarla como directorio inicial
os.makedirs(config.ALARM_FOLDER, exist_ok=True)
except Exception as e:
log_event(f"ERROR: No se pudo crear la carpeta de alarmas: {e}")
messagebox.showerror("Error", f"No se pudo crear el directorio de alarmas: {e}")
try:
archivo_seleccionado = filedialog.askopenfilename(
initialdir=config.ALARM_FOLDER,
title="Seleccionar Sonido de Alarma",
filetypes=(("Archivos de Audio", "*.wav *.mp3"), ("Todos los archivos", "*.*")),
parent=root
)
if archivo_seleccionado:
# 1. Almacenar la ruta completa en la variable de configuración global
config.ALERTA_SOUND_FILE = archivo_seleccionado
# 2. Actualizar el Label en la UI para mostrar el nombre del archivo
label_archivo.config(text=os.path.basename(archivo_seleccionado))
log_event(f"Sonido seleccionado: {os.path.basename(archivo_seleccionado)}")
return archivo_seleccionado
else:
log_event("Selección de sonido cancelada.")
return config.ALERTA_SOUND_FILE
except Exception as e:
log_event(f"ERROR al iniciar el diálogo de selección de archivo: {e}")
messagebox.showerror("Error", "No se pudo iniciar el diálogo de selección de archivo.")
return config.ALERTA_SOUND_FILE
# ===============================================
# Funcionalidades de Reproducción de Música (NUEVAS)
# ===============================================
def seleccionar_mp3(root, label_archivo):
"""Abre un diálogo para seleccionar un archivo MP3 o WAV."""
try:
# Directorio inicial
initial_dir = os.path.join(config.BASE_DIR, "data")
os.makedirs(initial_dir, exist_ok=True)
archivo_seleccionado = filedialog.askopenfilename(
initialdir=initial_dir,
title="Seleccionar Archivo de Música",
filetypes=(("Archivos de Audio", "*.mp3 *.wav"), ("Todos los archivos", "*.*")),
parent=root
)
if archivo_seleccionado:
config.current_music_file = archivo_seleccionado
label_archivo.config(text=os.path.basename(archivo_seleccionado))
log_event(f"Música seleccionada: {os.path.basename(archivo_seleccionado)}")
return archivo_seleccionado
else:
log_event("Selección de música cancelada.")
return None
except Exception as e:
log_event(f"ERROR al iniciar el diálogo de selección de música: {e}")
messagebox.showerror("Error", "No se pudo iniciar el diálogo de selección de archivo.")
return None
def reproducir_mp3(root):
"""Carga y reproduce el archivo seleccionado, asegurando que no haya conflictos con la alarma."""
if not config.current_music_file or not os.path.exists(config.current_music_file):
messagebox.showwarning("Advertencia", "Por favor, selecciona un archivo de música primero.")
return
# Detener cualquier reproducción previa (alarma o música)
detener_sonido_alarma() # Detiene la alarma si está sonando
detener_mp3() # Detiene la música si está sonando
try:
pygame.mixer.music.load(config.current_music_file)
# Reutilizamos la variable de volumen global, ya que Pygame Mixer tiene un solo canal de control de volumen
pygame.mixer.music.set_volume(config.alarma_volumen)
pygame.mixer.music.play(-1) # Reproducir en bucle
config.music_sonando = True
log_event(f"Reproducción de música iniciada: {os.path.basename(config.current_music_file)}")
except pygame.error as e:
log_event(f"ERROR de Pygame al reproducir música: {e}")
messagebox.showerror("Error de Reproducción", f"No se pudo reproducir el archivo. Asegúrate de que sea compatible con Pygame. ({e})")
except Exception as e:
log_event(f"ERROR inesperado al reproducir música: {e}")
def detener_mp3():
"""Detiene la reproducción de música si está activa."""
if hasattr(config, 'music_sonando') and config.music_sonando and pygame.mixer.music.get_busy():
pygame.mixer.music.stop()
config.music_sonando = False
log_event("Reproducción de música detenida.")
elif hasattr(config, 'music_sonando') and config.music_sonando:
config.music_sonando = False
log_event("Estado de música limpiado. No estaba sonando.")
def ajustar_volumen_mp3(nuevo_volumen_str):
"""Ajusta el volumen de Pygame (rango 0.0 a 1.0) desde el slider (0-100)."""
# Reutiliza la misma lógica que ajustar_volumen_alarma, ya que Pygame Mixer solo tiene un volumen maestro.
ajustar_volumen_alarma(nuevo_volumen_str)
# ===============================================
# Funcionalidades Externas (Existentes)
# ===============================================
def lanzar_url(url):
"""Abre una URL en el navegador y registra el evento."""
try:
webbrowser.open_new_tab(url)
log_event(f"Lanzando URL: {url}")
except Exception as e:
log_event(f"Error al intentar abrir la URL {url}: {e}")
def ejecutar_script_en_hilo(status_label, root):
"""Ejecuta un script de Bash en un hilo separado con ProgressBar."""
def run_script():
if not root.winfo_exists(): return
# 1. PREPARACIÓN E INICIO DE PROGRESSBAR
if os.path.exists(config.PROGRESS_FILE): os.remove(config.PROGRESS_FILE)
if config.progress_bar:
root.after(0, config.progress_bar.config, {"value": 0, "maximum": 100})
root.after(0, config.progress_bar.start, 20)
status_label.after(0, status_label.config, {"text": "Ejecutando Backup...", "bg": "orange"})
log_event("Iniciando copia de seguridad (backup)...")
try:
command = ["bash", config.SCRIPT_PATH, config.BASE_DIR]
result = subprocess.run(command, capture_output=True, text=True, check=False)
if not root.winfo_exists(): return # Doble chequeo después de subprocess
# 2. FINALIZACIÓN Y LIMPIEZA
if config.progress_bar:
root.after(0, config.progress_bar.stop)
root.after(0, config.progress_bar.config, {"value": 100 if result.returncode == 0 else 0})
if result.returncode == 0:
if root.winfo_exists(): root.after(0, status_label.config, {"text": "Backup: OK", "bg": "green"})
log_event("Copia de seguridad finalizada con éxito.")
else:
if root.winfo_exists(): root.after(0, status_label.config, {"text": f"Backup: ERROR ({result.returncode})", "bg": "red"})
log_event(f"ERROR en la copia de seguridad. Código: {result.returncode}. Verifique la terminal.")
except Exception as e:
if config.progress_bar: root.after(0, config.progress_bar.stop)
if root.winfo_exists(): root.after(0, status_label.config, {"text": f"Error desconocido: {str(e)}", "bg": "red"})
log_event(f"Error desconocido en backup: {str(e)}")
finally:
if os.path.exists(config.PROGRESS_FILE): os.remove(config.PROGRESS_FILE)
# 3. INICIO DEL HILO DE TAREA Y EL HILO MONITOR
threading.Thread(target=run_script, daemon=True).start()
threading.Thread(target=lambda: read_progress_pipe(root), daemon=True).start()
def read_progress_pipe(root):
"""Hilo que monitorea un archivo temporal para actualizar la ProgressBar."""
if not config.progress_bar: return
while config.monitor_running:
if not root.winfo_exists(): break
try:
if os.path.exists(config.PROGRESS_FILE):
with open(config.PROGRESS_FILE, 'r') as f:
content = f.read().strip()
if content.isdigit():
value = int(content)
root.after(0, config.progress_bar.config, {"value": value})
root.after(0, config.progress_bar.update)
elif content.startswith("STATUS:"):
log_event(content[7:])
except Exception as e:
pass
time.sleep(0.1)
def update_time(status_bar, root):
"""Función que actualiza la hora y el día de la semana en un label (Hilo)."""
while config.monitor_running:
now = datetime.datetime.now()
day_of_week = now.strftime("%A")
time_str = now.strftime("%H:%M:%S")
date_str = now.strftime("%Y-%m-%d")
label_text = f"{day_of_week}, {date_str} - {time_str}"
if root.winfo_exists():
root.after(1000, lambda: status_bar.winfo_exists() and status_bar.config(text=label_text))
else:
break
time.sleep(1)
def manejar_registro_csv(status_label):
"""Inicia o detiene la escritura de datos de recursos a un archivo CSV."""
if config.registro_csv_activo:
config.registro_csv_activo = False
status_label.config(text="Registro: Detenido", bg="gray")
log_event("Registro de historial de recursos detenido.")
else:
config.registro_csv_activo = True
status_label.config(text="Registro: ACTIVO", bg="gold")
log_event(f"Registro de historial de recursos iniciado en: {config.archivo_registro_csv}")
try:
with open(config.archivo_registro_csv, mode='a', newline='') as file:
writer = csv.writer(file)
if file.tell() == 0:
writer.writerow(['Timestamp', 'CPU_Total (%)', 'RAM_Total (%)', 'Net_Sent (KB/s)', 'Net_Recv (KB/s)'])
except Exception as e:
log_event(f"ERROR: No se pudo iniciar el registro CSV: {e}")
config.registro_csv_activo = False
status_label.config(text="Registro: ERROR", bg="red")
return config.registro_csv_activo
# ===============================================
# Funciones del Editor de Texto (Existentes)
# ===============================================
def nuevo_archivo():
"""Limpia el contenido actual del editor de texto."""
if not config.editor_texto:
log_event("ERROR: Editor de texto no inicializado.")
return
config.editor_texto.delete("1.0", tk.END)
log_event("Editor de texto limpiado. Nuevo archivo iniciado.")
def abrir_carpeta_especifica(ruta, nombre):
"""Abre una carpeta específica del proyecto en el explorador de archivos."""
try:
os.makedirs(ruta, exist_ok=True)
# Usamos subprocess.Popen para ser compatible con más sistemas
if platform.system() == "Windows":
subprocess.Popen(['explorer', ruta])
elif platform.system() == "Darwin": # macOS
subprocess.Popen(['open', ruta])
else: # Asume Linux/Unix (usa xdg-open)
subprocess.Popen(['xdg-open', ruta])
log_event(f"Carpeta '{nombre}' abierta: {ruta}")
except Exception as e:
log_event(f"ERROR al abrir la carpeta '{nombre}': {e}")
messagebox.showerror("Error", f"No se pudo abrir el directorio {nombre}: {e}")
def abrir_carpeta_notas():
"""Función wrapper para abrir la carpeta de notas."""
abrir_carpeta_especifica(config.NOTES_FOLDER, "Notas")
def abrir_archivo(root):
"""Muestra un diálogo para seleccionar y abrir un archivo .txt de la carpeta de notas."""
if not config.editor_texto:
log_event("ERROR: Editor de texto no inicializado.")
return
# Usar la nueva carpeta de notas
ruta_notas = config.NOTES_FOLDER
# Asegurar que la carpeta exista antes de abrir el diálogo
os.makedirs(ruta_notas, exist_ok=True)
archivo_seleccionado = filedialog.askopenfilename(
initialdir=ruta_notas,
title="Abrir Archivo de Notas",
filetypes=(("Archivos de texto", "*.txt"), ("Todos los archivos", "*.*")),
parent=root
)
if not archivo_seleccionado:
log_event("Apertura de archivo de notas cancelada.")
return
try:
with open(archivo_seleccionado, 'r', encoding='utf-8') as f:
contenido = f.read()
config.editor_texto.delete("1.0", tk.END)
config.editor_texto.insert("1.0", contenido)
nombre = os.path.basename(archivo_seleccionado)
log_event(f"Archivo de notas cargado con éxito: {nombre}")
messagebox.showinfo("Abierto", f"Archivo '{nombre}' cargado con éxito.")
except Exception as e:
log_event(f"ERROR al abrir o leer el archivo de notas: {e}")
messagebox.showerror("Error de Lectura", f"No se pudo leer el archivo seleccionado: {e}")
def guardar_texto(root):
"""
Pide al usuario un nombre de archivo y guarda el contenido del editor
en la carpeta Proyecto/data/notas/ como un archivo .txt.
"""
if not config.editor_texto:
log_event("ERROR: Editor de texto no inicializado.")
return
try:
nombre_archivo = simpledialog.askstring(
"Guardar Archivo",
"Introduce el nombre del archivo (ej: notas)",
parent=root
)
except Exception as e:
log_event(f"ERROR al iniciar diálogo de guardado: {e}")
return
if not nombre_archivo:
log_event("Guardado cancelado por el usuario.")
return
ruta_notas = config.NOTES_FOLDER # Usar la nueva carpeta de notas
if not nombre_archivo.lower().endswith('.txt'):
nombre_archivo += '.txt'
ruta_completa = os.path.join(ruta_notas, nombre_archivo)
contenido = config.editor_texto.get("1.0", tk.END)
try:
os.makedirs(ruta_notas, exist_ok=True)
with open(ruta_completa, 'w', encoding='utf-8') as f:
f.write(contenido)
log_event(f"Archivo de notas guardado con éxito: {ruta_completa}")
messagebox.showinfo("Guardado", f"Archivo guardado como:\n{nombre_archivo}")
except PermissionError:
error_msg = f"ERROR: Permiso denegado al escribir en {ruta_completa}."
log_event(error_msg)
messagebox.showerror("Error de Permiso", error_msg)
except Exception as e:
error_msg = f"ERROR al guardar {nombre_archivo}: {e}"
log_event(error_msg)
messagebox.showerror("Error de Escritura", error_msg)
# ===============================================
# Funciones de Web Scraping Adicionales
# ===============================================
def detener_scraping():
"""Detiene la ejecución del hilo de scraping si está activo."""
if config.scraping_running:
config.scraping_running = False
if config.scraping_progress_bar:
config.scraping_progress_bar.stop()
log_event("🛑 Proceso de Web Scrapear solicitado para detenerse.")
# Opcional: Escribir un mensaje de detención en el área de salida
if config.scraping_output_text:
config.scraping_output_text.insert(tk.END, "\n--- PROCESO CANCELADO POR EL USUARIO ---\n")
else:
log_event("El proceso de Web Scrapear no está actualmente activo.")
def guardar_scraping(contenido, root):
"""
Guarda el contenido del ScrolledText de scraping en un archivo TXT
en la carpeta Proyecto/data/scraping/.
"""
if not contenido or contenido.strip() == "Resultado de la Extracción:":
messagebox.showwarning("Advertencia", "El área de resultados está vacía.")
return
# Pedir un nombre de archivo
try:
nombre_base = datetime.datetime.now().strftime("scraping_%Y%m%d_%H%M%S")
nombre_archivo = simpledialog.askstring(
"Guardar Resultado",
f"Introduce el nombre del archivo (predeterminado: {nombre_base})",
initialvalue=nombre_base,
parent=root
)
except Exception as e:
log_event(f"ERROR al iniciar diálogo de guardado de scraping: {e}")
return
if not nombre_archivo:
log_event("Guardado de scraping cancelado por el usuario.")
return
# Rutas
ruta_scrapping = config.SCRAPING_FOLDER
if not nombre_archivo.lower().endswith('.txt'):
nombre_archivo += '.txt'
ruta_completa = os.path.join(ruta_scrapping, nombre_archivo)
try:
# Asegurar que la carpeta exista
os.makedirs(ruta_scrapping, exist_ok=True)
with open(ruta_completa, 'w', encoding='utf-8') as f:
f.write(contenido)
log_event(f"Resultado de scraping guardado con éxito: {ruta_completa}")
messagebox.showinfo("Guardado", f"Resultado de scraping guardado como:\n{nombre_archivo}")
except Exception as e:
error_msg = f"ERROR al guardar el resultado de scraping: {e}"
log_event(error_msg)
messagebox.showerror("Error de Escritura", error_msg)
def abrir_archivo_scraping_config(root):
"""
Abre un diálogo para seleccionar un archivo JSON de configuración de scraping
y lo carga en config.scraping_config_data.
"""
try:
# 1. Asegurar que la carpeta exista
os.makedirs(config.SCRAPING_CONFIG_FOLDER, exist_ok=True)
# 2. Abrir diálogo de selección
archivo_seleccionado = filedialog.askopenfilename(
initialdir=config.SCRAPING_CONFIG_FOLDER,
title="Cargar Configuración de Scraping (JSON)",
filetypes=(("Archivos JSON", "*.json"), ("Todos los archivos", "*.*")),
parent=root
)
if not archivo_seleccionado:
log_event("Carga de configuración de scraping cancelada.")
return
# 3. Cargar el JSON
with open(archivo_seleccionado, 'r', encoding='utf-8') as f:
data = json.load(f)
# 4. Validar estructura mínima (opcional, pero buena práctica)
required_keys = ['type', 'selector']
if not all(key in data for key in required_keys):
messagebox.showerror("Error de Configuración", f"El archivo JSON debe contener al menos las claves: {', '.join(required_keys)}.")
log_event("ERROR: Configuración JSON incompleta.")
return
# 5. Guardar en la configuración global
config.scraping_config_data = data
# 6. Actualizar campos de la UI
file_name = os.path.basename(archivo_seleccionado)
# Actualizar URL de la interfaz si el JSON tiene 'url'
if 'url' in data and config.scraping_url_input:
config.scraping_url_input.set(data['url'])
# Actualizar Label del archivo cargado
if config.scraping_config_file_label:
config.scraping_config_file_label.config(text=f"Config: [{file_name}]")
log_event(f"Configuración de scraping '{file_name}' cargada con éxito.")
messagebox.showinfo("Éxito", f"Configuración de '{file_name}' cargada. Presiona 'Iniciar Scrapear'.")
except FileNotFoundError:
log_event("ERROR: Archivo de configuración no encontrado.")
messagebox.showerror("Error", "Archivo no encontrado.")
except json.JSONDecodeError:
log_event("ERROR: El archivo no es un JSON válido.")
messagebox.showerror("Error", "El archivo cargado no es un formato JSON válido.")
except Exception as e:
log_event(f"ERROR al cargar la configuración de scraping: {e}")
messagebox.showerror("Error", f"Fallo al cargar la configuración: {e}")
# ===============================================
# Funcionalidad de Juegos (NUEVO)
# ===============================================
def simular_juego_camellos(root):
"""
Simula una carrera de camellos con actualizaciones Thread-Safe,
mostrando la simulación en una nueva ventana Toplevel.
"""
# 1. Chequeo de juego activo
if hasattr(config, 'juego_window') and config.juego_window and config.juego_window.winfo_exists():
messagebox.showwarning("Advertencia", "Ya hay un juego activo. Ciérralo para iniciar uno nuevo.")
return
# 2. Inicializar la ventana de juego
juego_window = tk.Toplevel(root)
juego_window.title("Carrera de Camellos 🐫 (Thread-Safe)")
juego_window.geometry("550x300")
juego_window.resizable(False, False)
config.juego_window = juego_window
config.juego_running = True
# Variables de estado del juego
posiciones = {'Camello A': 0, 'Camello B': 0, 'Camello C': 0}
meta = 35
# 3. Widgets de la UI
tk.Label(juego_window, text="¡Iniciando Carrera!", font=('Helvetica', 14, 'bold')).pack(pady=10)
track_frame = tk.Frame(juego_window)
track_frame.pack(padx=20, pady=10, fill='x')
# Labels para la pista
labels = {}
for i, camello in enumerate(posiciones.keys()):
tk.Label(track_frame, text=f"{camello}:", font=('Helvetica', 10, 'bold')).grid(row=i, column=0, sticky='w', padx=5, pady=5)
# Usamos un font monoespaciado para que la barra se vea bien
labels[camello] = tk.Label(track_frame, text="🐫", anchor='w', width=45, font=('Courier', 10), bg='lightgray', relief='sunken')
labels[camello].grid(row=i, column=1, sticky='ew', padx=5, pady=5)
resultado_label = tk.Label(juego_window, text="En curso...", font=('Helvetica', 12))
resultado_label.pack(pady=10)
def cerrar_juego():
"""Función que maneja el cierre de la ventana del juego."""
config.juego_running = False
juego_window.destroy()
log_event("Juego de camellos cerrado.")
juego_window.protocol("WM_DELETE_WINDOW", cerrar_juego)
def avanzar_carrera():
"""Lógica de avance de la carrera (simulación)."""
if not config.juego_running or not juego_window.winfo_exists():
return
ganador = None
for camello in posiciones.keys():
if posiciones[camello] < meta:
# Avance aleatorio (1-3 pasos)
avance = random.randint(1, 3)
posiciones[camello] += avance
# Actualizar la representación visual (Thread-Safe)
bar = " " * min(posiciones[camello], meta)
# Para la visualización, el camello siempre está al final de la barra
labels[camello].config(text=bar + "🐫" + " " * (meta - posiciones[camello]) + " | META")
if posiciones[camello] >= meta:
ganador = camello
break
if ganador:
resultado_label.config(text=f"¡{ganador} ha ganado la carrera!", fg='green')
config.juego_running = False
log_event(f"Carrera de camellos finalizada. Ganador: {ganador}")
# Botón para cerrar
ttk.Button(juego_window, text="Cerrar Juego", command=cerrar_juego).pack(pady=10)
elif config.juego_running:
# Re-programar el siguiente paso (Thread-Safe)
juego_window.after(300, avanzar_carrera)
log_event("Simulación de Carrera de Camellos iniciada.")
juego_window.after(100, avanzar_carrera) # Iniciar la simulación