nicegui_app/services.py

290 lines
10 KiB
Python

import os
import time
import shutil
import threading
import multiprocessing
import sqlite3
import datetime
import psutil
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
# =============================================================================
# BASE DE DATOS (SQLite simple para el editor)
# =============================================================================
DB_PATH = os.path.join(os.path.dirname(__file__), 'dashboard.db')
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, content TEXT)''')
# Crear nota inicial si no existe
c.execute('SELECT count(*) FROM notes WHERE id=1')
if c.fetchone()[0] == 0:
c.execute('INSERT INTO notes (id, content) VALUES (1, "")')
conn.commit()
conn.close()
def get_note_content():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('SELECT content FROM notes WHERE id=1')
res = c.fetchone()
conn.close()
return res[0] if res else ""
def save_note_content_process(content):
"""Función que ejecutará el proceso independiente"""
# Simulamos carga
time.sleep(1)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE notes SET content = ? WHERE id=1', (content,))
conn.commit()
conn.close()
print(f"[Proceso Editor] Notas guardadas: {len(content)} chars")
# =============================================================================
# VARIABLES GLOBALES Y ESTADO
# =============================================================================
trafico_red = {
'bajada_kbps': 0.0,
'subida_kbps': 0.0,
'total_bajada': 0,
'total_subida': 0,
}
# Para rastrear threads creados por nosotros
mis_threads = []
# Para contar tareas activas - simplificado
tareas_activas = {
'threads': 0,
'procesos': 0
}
# =============================================================================
# POOL DE THREADS Y PROCESOS
# =============================================================================
# Crear un pool global de threads para ejecutar tareas en paralelo sin bloqueo
thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="TaskWorker")
process_pool = ProcessPoolExecutor(max_workers=4)
# =============================================================================
# TAREAS EN SEGUNDO PLANO (THREADS)
# =============================================================================
def monitor_red_loop():
global trafico_red
if not psutil: return
anterior = psutil.net_io_counters()
tiempo_anterior = time.time()
while True:
time.sleep(1)
actual = psutil.net_io_counters()
tiempo_actual = time.time()
segundos = tiempo_actual - tiempo_anterior
if segundos < 0.001: segundos = 0.001
bajada = (actual.bytes_recv - anterior.bytes_recv) / segundos / 1024
subida = (actual.bytes_sent - anterior.bytes_sent) / segundos / 1024
# Actualizar valores del diccionario existente en lugar de reasignarlo
trafico_red['bajada_kbps'] = bajada
trafico_red['subida_kbps'] = subida
trafico_red['total_bajada'] = actual.bytes_recv
trafico_red['total_subida'] = actual.bytes_sent
anterior = actual
tiempo_anterior = tiempo_actual
# Arrancar monitor de red al importar
t_red = threading.Thread(target=monitor_red_loop, daemon=True, name="MonitorRed")
t_red.start()
mis_threads.append(t_red)
def tarea_alarma_thread(segundos, mensaje, callback_fin):
"""Espera y luego llama al callback (que actualizará la UI)"""
time.sleep(segundos)
if callback_fin:
callback_fin(mensaje)
# =============================================================================
# TAREAS MULTIPROCESO
# =============================================================================
def tarea_backup_process(origen, destino):
time.sleep(2) # Simular trabajo pesado
if not os.path.exists(destino):
os.makedirs(destino)
nombre_archivo = f"backup_{int(time.time())}"
ruta_completa = os.path.join(destino, nombre_archivo)
shutil.make_archive(ruta_completa, 'zip', origen)
print(f"[Proceso Backup] Creado en {ruta_completa}.zip")
# =============================================================================
# SCRAPING WEB (THREAD)
# =============================================================================
def tarea_scraping_thread(url, lista_resultados):
"""Realiza scraping de una URL y guarda el resultado en la lista compartida"""
try:
import requests
from bs4 import BeautifulSoup
print(f"[Thread Scraping] Scrapeando {url}...")
# Delay mínimo para simular trabajo (hace visible el contador)
time.sleep(1)
# Headers para simular un navegador real
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
# Parsear HTML con BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
# Extraer información
titulo = soup.title.string.strip() if soup.title else "Sin título"
# Contar elementos
num_links = len(soup.find_all('a'))
num_imagenes = len(soup.find_all('img'))
num_parrafos = len(soup.find_all('p'))
# Extraer metadescripción si existe
meta_desc = ""
meta_tag = soup.find('meta', attrs={'name': 'description'})
if meta_tag and meta_tag.get('content'):
meta_desc = meta_tag['content'][:100] + "..." if len(meta_tag['content']) > 100 else meta_tag['content']
resultado = {
'url': url,
'titulo': titulo,
'descripcion': meta_desc,
'num_links': num_links,
'num_imagenes': num_imagenes,
'num_parrafos': num_parrafos,
'longitud': len(response.text),
'status_code': response.status_code,
'timestamp': datetime.datetime.now().strftime('%H:%M:%S')
}
lista_resultados.append(resultado)
print(f"[Thread Scraping] {titulo} - {num_links} links, {num_imagenes} imgs")
except Exception as e:
lista_resultados.append({
'url': url,
'error': str(e),
'timestamp': datetime.datetime.now().strftime('%H:%M:%S')
})
print(f"[Thread Scraping] Error: {e}")
# =============================================================================
# REPRODUCTOR MP3 (THREAD)
# =============================================================================
def tarea_reproducir_mp3_thread(archivo_mp3, lista_estado):
"""Reproduce un archivo MP3 usando pygame.mixer"""
try:
import pygame
import os
print("[Thread MP3] Iniciando reproducción...")
print(f"[Thread MP3] Archivo: {archivo_mp3}")
print(f"[Thread MP3] Existe archivo? {os.path.exists(archivo_mp3)}")
# Inicializar pygame mixer
pygame.mixer.init()
print("[Thread MP3] pygame.mixer inicializado")
# Cargar y reproducir el MP3
pygame.mixer.music.load(archivo_mp3)
print("[Thread MP3] Archivo cargado, reproduciendo...")
pygame.mixer.music.play()
# Esperar a que termine la reproducción
while pygame.mixer.music.get_busy():
time.sleep(0.1)
print("[Thread MP3] Reproducción finalizada exitosamente")
lista_estado.append({'status': 'completed', 'file': archivo_mp3})
# Limpiar
pygame.mixer.quit()
except Exception as e:
lista_estado.append({'status': 'error', 'error': str(e)})
print(f"[Thread MP3] Error: {e}")
try:
pygame.mixer.quit()
except:
pass
# =============================================================================
# FUNCIONES WRAPPER PARA EJECUTAR TAREAS DE FORMA NO BLOQUEANTE
# =============================================================================
def ejecutar_en_thread(funcion, *args, **kwargs):
"""
Ejecuta una función en el pool de threads de forma no bloqueante.
Retorna un Future que puede ser cancelado o monitoreado.
Incrementa el contador de tareas activas.
"""
global tareas_activas
# Incrementar ANTES de enviar al pool
tareas_activas['threads'] += 1
print(f"[Contador] Threads: {tareas_activas['threads']} (+1)")
# Wrapper para decrementar contador cuando termine
def wrapper():
try:
resultado = funcion(*args, **kwargs)
print("[Thread] Completado exitosamente")
return resultado
except Exception as e:
print(f"[Thread] Error: {e}")
raise
finally:
tareas_activas['threads'] -= 1
print(f"[Contador] Threads: {tareas_activas['threads']} (-1)")
future = thread_pool.submit(wrapper)
return future
def ejecutar_en_proceso(funcion, *args, **kwargs):
"""
Ejecuta una función en el pool de procesos de forma no bloqueante.
Retorna un Future que puede ser cancelado o monitoreado.
Incrementa el contador de tareas activas.
IMPORTANTE: Usa un thread auxiliar para monitorear el proceso y actualizar
el contador cuando termine, ya que los callbacks de ProcessPoolExecutor
pueden no ejecutarse correctamente en todos los contextos.
"""
global tareas_activas
tareas_activas['procesos'] += 1
print(f"[Contador] Procesos: {tareas_activas['procesos']} (+1)")
future = process_pool.submit(funcion, *args, **kwargs)
# Usamos un thread auxiliar para esperar el resultado y decrementar
def monitor_proceso():
try:
future.result() # Espera a que termine (bloqueante en este thread)
print("[Proceso] Completado exitosamente")
except Exception as e:
print(f"[Proceso] Error: {e}")
finally:
global tareas_activas
tareas_activas['procesos'] -= 1
print(f"[Contador] Procesos: {tareas_activas['procesos']} (-1)")
# Lanzar thread monitor (no cuenta en el pool de threads)
monitor_thread = threading.Thread(target=monitor_proceso, daemon=True, name="MonitorProceso")
monitor_thread.start()
return future