import os import time import shutil import threading import multiprocessing import sqlite3 import datetime import psutil from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import asyncio # ============================================================================= # BASE DE DATOS (SQLite simple para el editor) # ============================================================================= DB_PATH = os.path.join(os.path.dirname(__file__), 'dashboard.db') def init_db(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, content TEXT)''') # Crear nota inicial si no existe c.execute('SELECT count(*) FROM notes WHERE id=1') if c.fetchone()[0] == 0: c.execute('INSERT INTO notes (id, content) VALUES (1, "")') conn.commit() conn.close() def get_note_content(): conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('SELECT content FROM notes WHERE id=1') res = c.fetchone() conn.close() return res[0] if res else "" def save_note_content_process(content): """Función que ejecutará el proceso independiente""" # Simulamos carga time.sleep(1) conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute('UPDATE notes SET content = ? WHERE id=1', (content,)) conn.commit() conn.close() print(f"[Proceso Editor] Notas guardadas: {len(content)} chars") # ============================================================================= # VARIABLES GLOBALES Y ESTADO # ============================================================================= trafico_red = { 'bajada_kbps': 0.0, 'subida_kbps': 0.0, 'total_bajada': 0, 'total_subida': 0, } # Para rastrear threads creados por nosotros mis_threads = [] # Para contar tareas activas - simplificado tareas_activas = { 'threads': 0, 'procesos': 0 } # ============================================================================= # POOL DE THREADS Y PROCESOS # ============================================================================= # Crear un pool global de threads para ejecutar tareas en paralelo sin bloqueo thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="TaskWorker") process_pool = ProcessPoolExecutor(max_workers=4) # ============================================================================= # TAREAS EN SEGUNDO PLANO (THREADS) # ============================================================================= def monitor_red_loop(): global trafico_red if not psutil: return anterior = psutil.net_io_counters() tiempo_anterior = time.time() while True: time.sleep(1) actual = psutil.net_io_counters() tiempo_actual = time.time() segundos = tiempo_actual - tiempo_anterior if segundos < 0.001: segundos = 0.001 bajada = (actual.bytes_recv - anterior.bytes_recv) / segundos / 1024 subida = (actual.bytes_sent - anterior.bytes_sent) / segundos / 1024 # Actualizar valores del diccionario existente en lugar de reasignarlo trafico_red['bajada_kbps'] = bajada trafico_red['subida_kbps'] = subida trafico_red['total_bajada'] = actual.bytes_recv trafico_red['total_subida'] = actual.bytes_sent anterior = actual tiempo_anterior = tiempo_actual # Arrancar monitor de red al importar t_red = threading.Thread(target=monitor_red_loop, daemon=True, name="MonitorRed") t_red.start() mis_threads.append(t_red) def tarea_alarma_thread(segundos, mensaje, callback_fin): """Espera y luego llama al callback (que actualizará la UI)""" time.sleep(segundos) if callback_fin: callback_fin(mensaje) # ============================================================================= # TAREAS MULTIPROCESO # ============================================================================= def tarea_backup_process(origen, destino): time.sleep(2) # Simular trabajo pesado if not os.path.exists(destino): os.makedirs(destino) nombre_archivo = f"backup_{int(time.time())}" ruta_completa = os.path.join(destino, nombre_archivo) shutil.make_archive(ruta_completa, 'zip', origen) print(f"[Proceso Backup] Creado en {ruta_completa}.zip") # ============================================================================= # SCRAPING WEB (THREAD) # ============================================================================= def tarea_scraping_thread(url, lista_resultados): """Realiza scraping de una URL y guarda el resultado en la lista compartida""" try: import requests from bs4 import BeautifulSoup print(f"[Thread Scraping] Scrapeando {url}...") # Delay mínimo para simular trabajo (hace visible el contador) time.sleep(1) # Headers para simular un navegador real headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() # Parsear HTML con BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') # Extraer información titulo = soup.title.string.strip() if soup.title else "Sin título" # Contar elementos num_links = len(soup.find_all('a')) num_imagenes = len(soup.find_all('img')) num_parrafos = len(soup.find_all('p')) # Extraer metadescripción si existe meta_desc = "" meta_tag = soup.find('meta', attrs={'name': 'description'}) if meta_tag and meta_tag.get('content'): meta_desc = meta_tag['content'][:100] + "..." if len(meta_tag['content']) > 100 else meta_tag['content'] resultado = { 'url': url, 'titulo': titulo, 'descripcion': meta_desc, 'num_links': num_links, 'num_imagenes': num_imagenes, 'num_parrafos': num_parrafos, 'longitud': len(response.text), 'status_code': response.status_code, 'timestamp': datetime.datetime.now().strftime('%H:%M:%S') } lista_resultados.append(resultado) print(f"[Thread Scraping] {titulo} - {num_links} links, {num_imagenes} imgs") except Exception as e: lista_resultados.append({ 'url': url, 'error': str(e), 'timestamp': datetime.datetime.now().strftime('%H:%M:%S') }) print(f"[Thread Scraping] Error: {e}") # ============================================================================= # REPRODUCTOR MP3 (THREAD) # ============================================================================= def tarea_reproducir_mp3_thread(archivo_mp3, lista_estado): """Reproduce un archivo MP3 usando pygame.mixer""" try: import pygame import os print("[Thread MP3] Iniciando reproducción...") print(f"[Thread MP3] Archivo: {archivo_mp3}") print(f"[Thread MP3] Existe archivo? {os.path.exists(archivo_mp3)}") # Inicializar pygame mixer pygame.mixer.init() print("[Thread MP3] pygame.mixer inicializado") # Cargar y reproducir el MP3 pygame.mixer.music.load(archivo_mp3) print("[Thread MP3] Archivo cargado, reproduciendo...") pygame.mixer.music.play() # Esperar a que termine la reproducción while pygame.mixer.music.get_busy(): time.sleep(0.1) print("[Thread MP3] Reproducción finalizada exitosamente") lista_estado.append({'status': 'completed', 'file': archivo_mp3}) # Limpiar pygame.mixer.quit() except Exception as e: lista_estado.append({'status': 'error', 'error': str(e)}) print(f"[Thread MP3] Error: {e}") try: pygame.mixer.quit() except: pass # ============================================================================= # FUNCIONES WRAPPER PARA EJECUTAR TAREAS DE FORMA NO BLOQUEANTE # ============================================================================= def ejecutar_en_thread(funcion, *args, **kwargs): """ Ejecuta una función en el pool de threads de forma no bloqueante. Retorna un Future que puede ser cancelado o monitoreado. Incrementa el contador de tareas activas. """ global tareas_activas # Incrementar ANTES de enviar al pool tareas_activas['threads'] += 1 print(f"[Contador] Threads: {tareas_activas['threads']} (+1)") # Wrapper para decrementar contador cuando termine def wrapper(): try: resultado = funcion(*args, **kwargs) print("[Thread] Completado exitosamente") return resultado except Exception as e: print(f"[Thread] Error: {e}") raise finally: tareas_activas['threads'] -= 1 print(f"[Contador] Threads: {tareas_activas['threads']} (-1)") future = thread_pool.submit(wrapper) return future def ejecutar_en_proceso(funcion, *args, **kwargs): """ Ejecuta una función en el pool de procesos de forma no bloqueante. Retorna un Future que puede ser cancelado o monitoreado. Incrementa el contador de tareas activas. IMPORTANTE: Usa un thread auxiliar para monitorear el proceso y actualizar el contador cuando termine, ya que los callbacks de ProcessPoolExecutor pueden no ejecutarse correctamente en todos los contextos. """ global tareas_activas tareas_activas['procesos'] += 1 print(f"[Contador] Procesos: {tareas_activas['procesos']} (+1)") future = process_pool.submit(funcion, *args, **kwargs) # Usamos un thread auxiliar para esperar el resultado y decrementar def monitor_proceso(): try: future.result() # Espera a que termine (bloqueante en este thread) print("[Proceso] Completado exitosamente") except Exception as e: print(f"[Proceso] Error: {e}") finally: global tareas_activas tareas_activas['procesos'] -= 1 print(f"[Contador] Procesos: {tareas_activas['procesos']} (-1)") # Lanzar thread monitor (no cuenta en el pool de threads) monitor_thread = threading.Thread(target=monitor_proceso, daemon=True, name="MonitorProceso") monitor_thread.start() return future