# PSP/client.py — 1041 lines, 47 KiB, Python

import tkinter as tk
from tkinter import ttk, messagebox, simpledialog, filedialog
import threading
import time
import datetime
import subprocess
import webbrowser
import shutil
import socket
import queue
import hashlib
import json
import sys
import platform
try:
import pygame
pygame.mixer.init()
except ModuleNotFoundError:
pygame = None
try:
from cryptography.hazmat.primitives import hashes as crypto_hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding as rsa_padding
from cryptography.hazmat.backends import default_backend
except ModuleNotFoundError:
rsa = None
# Build the complete client window, wire up every panel and run the Tk main loop.
def start_client():
root = tk.Tk()
root.title("Cliente - Panel PSP (mockup)")
root.geometry("1200x800")
# Event queue: worker threads put (type, payload) tuples here for the GUI thread
event_queue = queue.Queue()
# Main grid: top bar, main content, status bar
root.columnconfigure(0, weight=0, minsize=240)
root.columnconfigure(1, weight=1)
root.columnconfigure(2, weight=0, minsize=320)
root.rowconfigure(0, weight=0) # fixed top bar
root.rowconfigure(1, weight=1) # content
root.rowconfigure(2, weight=0) # status bar
# Top bar: T1..T5 category labels plus "Configuración"
top_bar = tk.Frame(root, bg='#ffffff', height=40)
top_bar.grid(row=0, column=0, columnspan=3, sticky='ew')
# Keep the 40 px height instead of shrinking to the labels
top_bar.grid_propagate(False)
# (label text, background colour) for each top-bar entry
categorias = [
('T1. Procesos', '#1e90ff'),
('T2.Threads', '#c71585'),
('T3. Sockets', '#ff4500'),
('T4. Servicios', '#228b22'),
('T5. Seguridad', '#daa520'),
('Configuración', '#d3d3d3')
]
def seleccionar_categoria(nombre):
    """Show the clicked category name in the bottom info panel."""
    mensaje = f"Categoría seleccionada: {nombre}"
    info_label.config(text=mensaje)

# info_label is created further down; the callback only runs once the window
# is built, so the name is resolvable by then.
for posicion, (titulo, fondo) in enumerate(categorias):
    margen_izquierdo = 4 if posicion else 8
    etiqueta = tk.Label(
        top_bar,
        text=titulo,
        bg=fondo,
        fg='white',
        font=('Helvetica', 11, 'bold'),
        padx=10,
        pady=6,
        cursor='hand2',
    )
    etiqueta.pack(side='left', padx=(margen_izquierdo, 4), pady=4)
    # Bind the title as a default argument so every label reports its own name.
    etiqueta.bind('<Button-1>', lambda _evento, n=titulo: seleccionar_categoria(n))
# Main frames: left (actions), center (notebook), right (chat / students)
left = tk.Frame(root, bg="#f8f8f8")
center = tk.Frame(root, bg="#ffffff")
right = tk.Frame(root, bg="#ffffff")
# All three share grid row 1; only the center frame stretches in both directions
left.grid(row=1, column=0, sticky="nsw", padx=6, pady=(6,6))
center.grid(row=1, column=1, sticky="nsew", padx=6, pady=(6,6))
right.grid(row=1, column=2, sticky="nse", padx=6, pady=(6,6))
left.grid_propagate(False)
right.grid_propagate(False)
# LEFT: actions and lists
def section(parent, title):
    """Create a titled LabelFrame packed inside *parent* and return it."""
    marco = tk.LabelFrame(parent, text=title, padx=6, pady=6)
    marco.pack(fill="x", pady=8, padx=8)
    return marco

s_actions = section(left, "")
# --- additional functionality ---
def open_vscode():
    """Launch Visual Studio Code, or open its website when no binary is found.

    The executables 'code', 'code-insiders' and 'code-oss' are looked up on
    PATH in that order; the first hit is started.
    """
    candidatos = ('code', 'code-insiders', 'code-oss')
    ejecutable = next((ruta for ruta in map(shutil.which, candidatos) if ruta), None)
    if not ejecutable:
        webbrowser.open('https://code.visualstudio.com/')
        info_label.config(text="VS Code no encontrado: abriendo web como alternativa")
        return
    try:
        subprocess.Popen([ejecutable])
        info_label.config(text="Abriendo Visual Studio Code...")
    except Exception as error:
        messagebox.showerror("Error", f"No se pudo abrir VS Code: {error}")
def open_browser():
    """Ask for a URL and open it in the default web browser."""
    respuesta = simpledialog.askstring('Abrir URL', 'Ingresa la URL a abrir:')
    if not respuesta or not respuesta.strip():
        return
    destino = respuesta.strip()
    # Default to HTTPS when the user leaves out the scheme.
    if not destino.startswith(('http://', 'https://')):
        destino = 'https://' + destino
    try:
        webbrowser.open(destino)
        event_queue.put(('status', f'Abierto navegador: {destino}'))
    except Exception as error:
        event_queue.put(('status', f'Error: {error}'))
def buscar_google():
    """Scrape https://publicapis.dev/ in a background thread and list its links.

    Despite the name, no Google search is made: the page is downloaded with
    requests, parsed with BeautifulSoup, and every absolute link with a
    reasonable label is collected. The formatted report is posted to
    event_queue as 'scrape_result'; progress and errors go out as 'status'.
    """
    url = 'https://publicapis.dev/'

    def worker():
        event_queue.put(('status', f'Scraping APIs desde {url}...'))
        try:
            import requests
            from bs4 import BeautifulSoup
            response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
            # Do not parse an HTTP error page as if it were the catalogue.
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Collect (label, href) pairs; a set of seen labels keeps the
            # duplicate check O(1) instead of rebuilding a list per link.
            apis = []
            nombres_vistos = set()
            for link in soup.find_all('a', href=True):
                href = link.get('href', '').strip()
                texto = link.get_text().strip()
                if (href.startswith(('http://', 'https://'))
                        and 2 < len(texto) < 100
                        and texto not in nombres_vistos):
                    nombres_vistos.add(texto)
                    apis.append((texto, href))

            # Fallback: too few anchors found, look at the first link inside
            # each of the first 50 container elements.
            if len(apis) < 10:
                pares_vistos = set(apis)
                for elem in soup.find_all(['div', 'section', 'article'])[:50]:
                    elem_link = elem.find('a', href=True)
                    if elem_link is None:
                        continue
                    href = elem_link.get('href', '').strip()
                    texto = elem.get_text().strip()[:80]
                    par = (texto, href)
                    if (href.startswith(('http://', 'https://'))
                            and len(texto) > 2
                            and par not in pares_vistos):
                        pares_vistos.add(par)
                        apis.append(par)

            # Build the report: every API with its URL.
            partes = [
                f'\n{"="*70}\n',
                f'URL: {url}\n',
                f'{"="*70}\n\n',
                f'🔗 APIs ENCONTRADAS ({len(apis)})\n',
                f'{"-"*70}\n',
            ]
            if apis:
                for nombre, api_url in apis:
                    partes.append(f'{nombre}\n')
                    partes.append(f' URL: {api_url}\n\n')
            else:
                partes.append(' [No se encontraron]\n')
            partes.append(f'{"="*70}\n')

            event_queue.put(('scrape_result', ''.join(partes)))
            event_queue.put(('status', f'✓ APIs encontradas: {len(apis)}'))
        except Exception as e:
            # Missing dependencies, network failures and HTTP errors all end here.
            event_queue.put(('scrape_result', f'ERROR AL PROCESAR:\n{str(e)}'))
            event_queue.put(('status', f'✗ Error: {e}'))

    threading.Thread(target=worker, daemon=True, name='APIsScraper').start()
# Alarm
def configurar_alarma():
    """Ask for a delay in minutes and start a background countdown.

    The countdown thread posts ('alarm_progress', seconds_left) roughly once
    a second and a final ('alarm', '¡Alarma!') event to event_queue.
    """
    minutos = simpledialog.askinteger('Alarma', 'Minutos hasta alarma', minvalue=1, maxvalue=720)
    if not minutos:
        return

    def cuenta_atras():
        limite = time.time() + minutos * 60
        restante = int(limite - time.time())
        while restante > 0:
            event_queue.put(('alarm_progress', restante))
            time.sleep(1)
            restante = int(limite - time.time())
        event_queue.put(('alarm', '¡Alarma!'))

    threading.Thread(target=cuenta_atras, daemon=True).start()

# Action buttons, packed top to bottom in this order.
for texto_boton, color_boton, accion in (
    ('Programar Alarma', '#ffe0ff', configurar_alarma),
    ("Navegar (URL)", "#dff0d8", open_browser),
    ("Buscar API Google", "#dff0d8", buscar_google),
):
    tk.Button(s_actions, text=texto_boton, bg=color_boton, width=24, command=accion).pack(pady=6)
# Launch an external command with parameters
def launch_command():
    """Prompt for a command line and start it through the system shell."""
    cmd = simpledialog.askstring("Lanzar comando", "Introduce el comando to ejecutar (ej: firefox https://google.com):".replace(" to ", " a "))
    if cmd:
        try:
            # NOTE(review): shell=True hands the typed text verbatim to the
            # shell. That is the point of this button, but it must never be
            # fed text that does not come from the local operator.
            subprocess.Popen(cmd, shell=True)
            info_label.config(text=f"Lanzado: {cmd}")
        except Exception as error:
            messagebox.showerror("Error", f"No se pudo ejecutar el comando: {error}")

tk.Button(
    s_actions,
    text="Lanzar aplicación (con parámetros)",
    bg="#e8f4ff",
    width=30,
    command=launch_command,
).pack(pady=6)
# Run a PowerShell script (.ps1) when pwsh/powershell is available
def run_powershell_script():
    """Let the user pick a .ps1 file and run it with pwsh or powershell."""
    path = filedialog.askopenfilename(title="Selecciona script PowerShell (.ps1)", filetypes=[("PowerShell", "*.ps1" )])
    if not path:
        return
    interprete = shutil.which('pwsh')
    if not interprete:
        interprete = shutil.which('powershell')
    if not interprete:
        messagebox.showwarning("pwsh no encontrado", "No se encontró 'pwsh' ni 'powershell' en PATH. En Linux puedes instalar PowerShell Core (pwsh) o ejecutar el script en Windows.")
        return
    try:
        subprocess.Popen([interprete, '-File', path])
        info_label.config(text=f"Ejecutando script PowerShell: {path}")
    except Exception as error:
        messagebox.showerror("Error", f"Fallo al ejecutar el script: {error}")

tk.Button(
    s_actions,
    text="Ejecutar .ps1 (PowerShell)",
    bg="#ffeedd",
    width=30,
    command=run_powershell_script,
).pack(pady=6)
# "Aplicaciones" section: application launcher buttons
s_apps = section(left, "Aplicaciones")
tk.Button(s_apps, text="Visual Code", bg="#e6f7ff", width=24, command=open_vscode).pack(pady=4)
# App2 and App3 are created without a command, so clicking them does nothing
tk.Button(s_apps, text="App2", bg="#e6f7ff", width=24).pack(pady=4)
tk.Button(s_apps, text="App3", bg="#e6f7ff", width=24).pack(pady=4)
# Resource monitor and editor
def open_text_editor():
    """Open a minimal notepad window with 'Abrir' and 'Guardar' buttons.

    Files are read and written as UTF-8 (undecodable bytes are dropped on
    read). I/O failures are shown in an error dialog.
    """
    ed = tk.Toplevel(root)
    ed.title("Editor - Notepad simple")
    ed.geometry("700x500")
    text = tk.Text(ed)
    text.pack(fill='both', expand=True)

    def save_file():
        p = filedialog.asksaveasfilename(defaultextension='.txt')
        if not p:
            return
        try:
            with open(p, 'w', encoding='utf-8') as f:
                # 'end-1c' leaves out the newline Tk always keeps after the
                # last character; plain 'end' grows the file by one line on
                # every save.
                f.write(text.get('1.0', 'end-1c'))
        except OSError as e:
            messagebox.showerror("Error", f"No se pudo guardar el fichero: {e}")
            return
        info_label.config(text=f"Fichero guardado: {p}")

    def open_file():
        p = filedialog.askopenfilename()
        if not p:
            return
        try:
            with open(p, 'r', encoding='utf-8', errors='ignore') as f:
                contenido = f.read()
        except OSError as e:
            messagebox.showerror("Error", f"No se pudo abrir el fichero: {e}")
            return
        # Replace the buffer only after the read succeeded.
        text.delete('1.0', 'end')
        text.insert('1.0', contenido)
        info_label.config(text=f"Fichero abierto: {p}")

    btns = tk.Frame(ed)
    btns.pack(fill='x')
    tk.Button(btns, text='Abrir', command=open_file).pack(side='left')
    tk.Button(btns, text='Guardar', command=save_file).pack(side='left')

tk.Button(s_apps, text="Editor texto", bg="#f0e6ff", width=24, command=open_text_editor).pack(pady=6)
# File hash
def hash_archivo():
    """Pick a file and compute its SHA-256 digest in a background thread.

    The digest is posted to event_queue as a 'hash' event; progress and
    errors are posted as 'status'.
    """
    path = filedialog.askopenfilename(title='Selecciona archivo para hash')
    if not path:
        return

    def calcular():
        event_queue.put(('status', f'Calculando SHA256 de {path}'))
        try:
            resumen = hashlib.sha256()
            with open(path, 'rb') as archivo:
                # Read in 8 KiB pieces so large files are not loaded whole.
                while True:
                    bloque = archivo.read(8192)
                    if not bloque:
                        break
                    resumen.update(bloque)
            event_queue.put(('hash', f'{path}\nSHA256: {resumen.hexdigest()}'))
            event_queue.put(('status', 'Hash completado'))
        except Exception as error:
            event_queue.put(('status', f'Error hash: {error}'))

    threading.Thread(target=calcular, daemon=True).start()

tk.Button(s_apps, text='Hash archivo', bg='#e6ffe6', width=24, command=hash_archivo).pack(pady=4)
def open_resource_monitor():
    """Open a window that plots CPU %, memory % and total thread count.

    Drawing uses a plain Tk Canvas (no Pillow needed). A sample is taken once
    a second with psutil and the last `maxlen` samples are plotted. The
    refresh stops by itself once the window has been destroyed.
    """
    try:
        import psutil
    except ModuleNotFoundError:
        messagebox.showerror("Dependencia falta", "Instala 'psutil' para monitor de recursos: pip install psutil")
        return
    win = tk.Toplevel(root)
    win.title('Monitor recursos (Canvas)')
    width, height = 600, 360
    cv = tk.Canvas(win, width=width, height=height, bg='white')
    cv.pack(fill='both', expand=True)
    # Sample series, oldest first
    cpu_data = []
    mem_data = []
    thr_data = []
    maxlen = 120

    def draw_axes():
        cv.delete('axis')
        cv.create_rectangle(50, 20, width-20, height-20, outline='#444', tags='axis')
        for i in range(6):
            y = 20 + (height-40)*i/5
            cv.create_line(50, y, width-20, y, fill='#eee', tags='axis')
        cv.create_text(10, 20, text='100%', anchor='nw', tags='axis')
        cv.create_text(10, height-40, text='0%', anchor='nw', tags='axis')
        cv.create_text(width-120, 10, text='CPU( rojo ) MEM( verde ) HILOS( azul )', anchor='nw', tags='axis')

    def scale_y(val, series_max=100):
        # Map 0..series_max to canvas space; guard against an all-zero series.
        if series_max <= 0:
            series_max = 1
        return 20 + (height-40)*(1 - val/series_max)

    def draw_series():
        cv.delete('series')
        # The thread series is scaled to its own maximum, not to 100.
        thr_max = max(thr_data) if thr_data else 1

        def draw_line(data, color, yscale_max):
            if len(data) < 2:
                return
            step_x = (width-70) / (maxlen-1)
            start_index = max(0, len(data)-maxlen)
            pts = [
                (50 + step_x*idx, scale_y(val, yscale_max))
                for idx, val in enumerate(data[start_index:])
            ]
            for (x0, y0), (x1, y1) in zip(pts, pts[1:]):
                cv.create_line(x0, y0, x1, y1, fill=color, width=2, tags='series')

        draw_line(cpu_data, 'red', 100)
        draw_line(mem_data, 'green', 100)
        draw_line(thr_data, 'blue', thr_max)
        if thr_data:
            cv.create_text(width-180, height-25, text=f'Threads max: {thr_max}', anchor='nw', tags='series', fill='blue')

    def count_threads():
        # Processes can exit or deny access between listing and querying;
        # skip those instead of aborting the whole refresh.
        total = 0
        for proc in psutil.process_iter():
            try:
                total += proc.num_threads()
            except psutil.Error:
                continue
        return total

    def update():
        try:
            cpu = psutil.cpu_percent(interval=None)
            mem = psutil.virtual_memory().percent
            thr = count_threads()
            cpu_data.append(cpu)
            mem_data.append(mem)
            thr_data.append(thr)
            # Keep only the newest `maxlen` samples.
            cpu_data[:] = cpu_data[-maxlen:]
            mem_data[:] = mem_data[-maxlen:]
            thr_data[:] = thr_data[-maxlen:]
            draw_axes()
            draw_series()
            cv.create_text(60, height-25, text=f'CPU {cpu:.1f}% MEM {mem:.1f}% HILOS {thr}', anchor='nw', fill='#222', tags='series')
        except tk.TclError:
            # The canvas is gone (window closed): stop rescheduling.
            return
        win.after(1000, update)

    draw_axes()
    update()

tk.Button(left, text="Monitor recursos (gráficas)", bg="#fff2cc", width=30, command=open_resource_monitor).pack(pady=6)
s_batch = section(left, "Procesos batch")
def realizar_backup():
    """Archive a directory chosen by the user in a background thread.

    On Windows the archive is a .zip made with PowerShell's Compress-Archive;
    elsewhere it is a .tar.gz made with tar. The outcome is posted to
    event_queue as a 'status' event, and success is only reported when the
    external command exits with code 0.
    """
    src = filedialog.askdirectory(title='Directorio origen a respaldar')
    if not src:
        return
    dest = filedialog.asksaveasfilename(title='Archivo destino backup', defaultextension='.zip' if platform.system()=='Windows' else '.tar.gz')
    if not dest:
        return

    def worker():
        event_queue.put(('status', f'Iniciando backup de {src} -> {dest}'))
        if platform.system() == 'Windows':
            exe = shutil.which('powershell') or shutil.which('pwsh')
            if not exe:
                event_queue.put(('status','PowerShell no encontrado'))
                return
            # Inside a PowerShell single-quoted string a quote is written as
            # two quotes; without this a path containing ' breaks the command.
            src_ps = src.replace("'", "''")
            dest_ps = dest.replace("'", "''")
            cmd = [exe, '-Command', f"Compress-Archive -Path '{src_ps}' -DestinationPath '{dest_ps}' -Force"]
            mensaje_ok = 'Backup completado (Windows)'
        else:
            # Linux/Unix: tar -czf dest -C parent basename
            import os
            parent = os.path.dirname(src)
            base = os.path.basename(src)
            cmd = ['tar','-czf',dest,'-C',parent,base]
            mensaje_ok = 'Backup completado (tar.gz)'
        try:
            resultado = subprocess.run(cmd, timeout=600)
        except Exception as e:
            event_queue.put(('status', f'Error backup: {e}'))
            return
        # subprocess.run does not raise on a non-zero exit unless check=True.
        if resultado.returncode != 0:
            event_queue.put(('status', f'Error backup: código de salida {resultado.returncode}'))
        else:
            event_queue.put(('status', mensaje_ok))

    threading.Thread(target=worker, daemon=True).start()

tk.Button(s_batch, text="Copias de seguridad", bg="#fff0d6", width=24, command=realizar_backup).pack(pady=6)
# CENTER: large notebook plus the bottom info panel
center.rowconfigure(0, weight=1)
center.rowconfigure(1, weight=0)
center.columnconfigure(0, weight=1)
notebook = ttk.Notebook(center)
notebook.grid(row=0, column=0, sticky="nsew", padx=6, pady=6)
# store text widgets so we can update a specific tab programmatically
tab_texts = {}
tab_names = ["Resultados", "Navegador", "Correos", "Tareas", "Alarmas", "Enlaces", "Servicios", "Seguridad"]
# One tab per name; some tabs get a row of buttons above their text area
for name in tab_names:
f = ttk.Frame(notebook)
notebook.add(f, text=name)
# Add the buttons BEFORE the text area so they stay visible
if name == 'Navegador':
tk.Label(f, text='Herramientas de navegación:', font=('Arial', 10, 'bold')).pack(anchor='w', padx=6, pady=4)
nav_frame = tk.Frame(f)
nav_frame.pack(anchor='w', padx=6, pady=4)
def abrir_url_navegador():
    """Prompt for a URL and open it in the system browser (Navegador tab)."""
    entrada = simpledialog.askstring('Abrir URL', 'Ingresa la URL a abrir:')
    direccion = entrada.strip() if entrada else ''
    if not direccion:
        return
    # Assume HTTPS when no scheme was typed.
    tiene_esquema = direccion.startswith(('http://', 'https://'))
    if not tiene_esquema:
        direccion = 'https://' + direccion
    try:
        webbrowser.open(direccion)
        event_queue.put(('status', f'Abriendo: {direccion}'))
    except Exception as error:
        event_queue.put(('status', f'Error: {error}'))
def iniciar_scraping():
    """Ask for a URL and extract e-mails, phone-like numbers and headings.

    Download and parsing run in a background thread. The formatted report is
    posted to event_queue as 'scrape_result'; progress and errors are posted
    as 'status'. Nothing happens when the dialog is cancelled or left empty.
    """
    url = simpledialog.askstring('Web Scraping', 'Ingresa la URL a escanear:')
    if not url or not url.strip():
        return
    url = url.strip()
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    def seccion(titulo, elementos, limite):
        # Render one report section: header with the real total, at most
        # `limite` items, and a note saying how many were left out.
        lineas = [f'{titulo} ({len(elementos)})\n', f'{"-"*70}\n']
        if elementos:
            lineas.extend(f' {item}\n' for item in elementos[:limite])
            if len(elementos) > limite:
                lineas.append(f'\n ... y {len(elementos)-limite} más\n')
        else:
            lineas.append(' [No se encontraron]\n')
        lineas.append('\n')
        return ''.join(lineas)

    def worker():
        event_queue.put(('status', f'Scraping: {url}...'))
        try:
            import re
            import requests
            from bs4 import BeautifulSoup
            response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
            # Do not scrape an HTTP error page as if it were the target.
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            # E-mail addresses, de-duplicated and sorted
            emails = sorted(set(re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', response.text)))
            # Phone-like digit groups; matches shorter than 9 characters are dropped
            telefonos = sorted(
                t for t in set(re.findall(r'(?:\+?\d{1,3}[-.]?)?\(?\d{2,4}\)?[-.]?\d{2,4}[-.]?\d{2,4}|\d{9,}', response.text))
                if len(t) >= 9
            )
            # Short texts from headings, bold elements and links
            nombres = set()
            for elem in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'b', 'strong', 'a']):
                texto = elem.get_text().strip()
                if 2 <= len(texto) <= 50:
                    nombres.add(texto)
            # Keep the full list so the header and the "y N más" note show
            # the real total; only the display is capped.
            nombres = sorted(nombres)

            resultado = ''.join([
                f'\n{"="*70}\n',
                f'URL: {url}\n',
                f'{"="*70}\n\n',
                seccion('📧 EMAILS', emails, 15),
                seccion('📞 TELÉFONOS', telefonos, 15),
                seccion('👥 TÍTULOS/NOMBRES', nombres, 20),
                f'{"="*70}\n',
            ])
            event_queue.put(('scrape_result', resultado))
            event_queue.put(('status', f'✓ Completado: {len(emails)} emails, {len(telefonos)} telefonos'))
        except Exception as e:
            # Missing dependencies, network failures and HTTP errors all end here.
            event_queue.put(('scrape_result', f'ERROR AL PROCESAR:\n{str(e)}'))
            event_queue.put(('status', f'✗ Error: {e}'))

    threading.Thread(target=worker, daemon=True, name='Scraper').start()
# Buttons of the "Navegador" tab
tk.Button(nav_frame, text='🌐 Abrir URL', bg='#87CEEB', font=('Arial', 10, 'bold'), command=abrir_url_navegador).pack(side='left', padx=2)
tk.Button(nav_frame, text='🔍 Extraer Datos', bg='#FFB6C1', font=('Arial', 10, 'bold'), command=iniciar_scraping).pack(side='left', padx=2)
# The lambdas below defer name lookup to click time: iniciar_carrera, the
# servicio_* functions, consumir_api, generar_rsa and aes_cifrar are defined
# further down in start_client.
if name == 'Tareas':
btn_frame = tk.Frame(f)
btn_frame.pack(anchor='w', padx=6, pady=4)
tk.Button(btn_frame, text='🏁 Iniciar Carrera de Camellos', bg='#90EE90', font=('Arial', 10, 'bold'),
command=lambda: iniciar_carrera(tab_texts['Tareas'])).pack(side='left', padx=2)
if name == 'Servicios':
srv_frame = tk.Frame(f)
srv_frame.pack(anchor='w', padx=6, pady=4)
tk.Button(srv_frame, text='POP3 listar', command=lambda: servicio_pop3()).pack(side='left', padx=2)
tk.Button(srv_frame, text='SMTP enviar', command=lambda: servicio_smtp()).pack(side='left', padx=2)
tk.Button(srv_frame, text='FTP listar', command=lambda: servicio_ftp()).pack(side='left', padx=2)
tk.Button(srv_frame, text='HTTP GET', command=lambda: consumir_api('https://httpbin.org/get')).pack(side='left', padx=2)
if name == 'Seguridad':
sec_frame = tk.Frame(f)
sec_frame.pack(anchor='w', padx=6, pady=4)
tk.Button(sec_frame, text='Generar RSA', command=lambda: generar_rsa()).pack(side='left', padx=2)
tk.Button(sec_frame, text='AES Cifrar', command=lambda: aes_cifrar()).pack(side='left', padx=2)
# Text area goes after the buttons; it is registered in tab_texts by tab name
txt = tk.Text(f)
txt.insert("1.0", f"{name} - área de contenido\n\n")
txt.pack(fill="both", expand=True, padx=6, pady=6)
tab_texts[name] = txt
# Bottom info panel: fixed-height strip whose label shows status messages
info = tk.Frame(center, bg="#f7fff0", height=120)
info.grid(row=1, column=0, sticky="ew", padx=6, pady=(0,6))
info.grid_propagate(False)
info_label = tk.Label(info, text="Panel para notas informativas y mensajes sobre la ejecución de los hilos.", bg="#f7fff0", anchor="w")
info_label.pack(fill='both', expand=True, padx=8, pady=8)
# RIGHT: chat and student list
chat_box = tk.LabelFrame(right, text="Chat", padx=6, pady=6)
chat_box.pack(fill="x", padx=8, pady=(8,4))
tk.Label(chat_box, text="Mensaje").pack(anchor="w")
msg = tk.Text(chat_box, height=6)
msg.pack(fill="x", pady=4)
# NOTE(review): this "enviar" button has no command; a working "Enviar mensaje"
# button is added to chat_box further down in this function
tk.Button(chat_box, text="enviar", bg="#cfe8cf").pack(pady=(0,6))
students = tk.LabelFrame(right, text="Alumnos", padx=6, pady=6)
students.pack(fill="both", expand=True, padx=8, pady=(4,8))
# Three placeholder student cards with filler text
for i in range(1, 4):
s = tk.Frame(students)
s.pack(fill="x", pady=6)
tk.Label(s, text=f"Alumno {i}", font=("Helvetica", 13, "bold")).pack(anchor="w")
tk.Label(s, text="Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod.", wraplength=280, justify="left").pack(anchor="w")
# Music player
music_state = dict(path=None, thread=None, playing=False, stopping=False)

def seleccionar_musica():
    """Let the user choose an audio file and remember it in music_state."""
    elegido = filedialog.askopenfilename(
        title='Seleccionar audio',
        filetypes=[('Audio','*.wav *.mp3 *.ogg'), ('Todos','*.*')],
    )
    if not elegido:
        return
    music_state['path'] = elegido
    info_label.config(text=f'Audio: {elegido}')
def reproducir_musica():
    """Play the selected audio file with pygame in a background thread.

    Prompts for a file when none is selected yet. Does nothing besides a
    status message when playback is already running.
    """
    if pygame is None:
        messagebox.showwarning('Audio', 'Instala pygame: pip install pygame')
        return
    path = music_state.get('path')
    if not path:
        # No file chosen yet: ask now and re-read the selection.
        seleccionar_musica()
        path = music_state.get('path')
        if not path:
            return
    if pygame.mixer.music.get_busy():
        event_queue.put(('status', 'Ya reproduciendo'))
        return
    music_state['stopping'] = False

    def reproducir():
        event_queue.put(('status', f'Reproduciendo {path}'))
        try:
            mezclador = pygame.mixer.music
            mezclador.load(path)
            mezclador.play()
            music_state['playing'] = True
            # Poll until playback ends or a stop was requested.
            while True:
                if not mezclador.get_busy() or music_state['stopping']:
                    break
                time.sleep(0.1)
            if not music_state['stopping']:
                event_queue.put(('status', 'Audio terminado'))
            music_state['playing'] = False
        except Exception as error:
            event_queue.put(('status', f'Error audio: {error}'))
            music_state['playing'] = False

    hilo = threading.Thread(target=reproducir, daemon=True)
    music_state['thread'] = hilo
    hilo.start()
def detener_musica():
    """Stop the current playback, if any, and report the outcome."""
    try:
        sonando = pygame and pygame.mixer.music.get_busy()
        if not sonando:
            event_queue.put(('status', 'No hay audio reproduciéndose'))
            return
        # Tell the playback thread the stop was requested on purpose.
        music_state['stopping'] = True
        pygame.mixer.music.stop()
        event_queue.put(('status', 'Audio detenido'))
    except Exception as error:
        event_queue.put(('status', f'Error al detener: {error}'))

music_box = tk.LabelFrame(right, text='Reproductor música', padx=4, pady=4)
music_box.pack(fill='x', padx=8, pady=(0,8))
for rotulo, accion_audio in (
    ('Seleccionar', seleccionar_musica),
    ('Play', reproducir_musica),
    ('Stop', detener_musica),
):
    tk.Button(music_box, text=rotulo, command=accion_audio).pack(side='left', padx=2)
# Chat client
chat_client = {'sock': None}
def conectar_chat():
    """Connect to a TCP chat server and start a background receiver thread.

    Host and port are asked through dialogs. Received data is posted to
    event_queue as 'chat' events. When the connection ends the socket is
    closed and chat_client['sock'] is reset to None.
    """
    if chat_client['sock']:
        messagebox.showinfo('Chat','Ya conectado')
        return
    host = simpledialog.askstring('Host','Host chat', initialvalue='127.0.0.1')
    if not host:
        # Cancelled: do not go on to ask for the port.
        return
    port = simpledialog.askinteger('Puerto','Puerto', initialvalue=3333)
    if not port:
        return
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((host, port))
    except Exception as e:
        # Release the descriptor of a socket that never connected.
        if s is not None:
            s.close()
        messagebox.showerror('Chat', f'Error conexión: {e}')
        return
    chat_client['sock'] = s
    event_queue.put(('status', f'Chat conectado {host}:{port}'))

    def receptor():
        try:
            while True:
                data = s.recv(1024)
                if not data:
                    # Empty read: the peer closed the connection.
                    break
                event_queue.put(('chat', data.decode(errors='ignore')))
        except Exception as e:
            event_queue.put(('status', f'Error chat: {e}'))
        finally:
            s.close()
            chat_client['sock'] = None
            event_queue.put(('status','Chat desconectado'))

    threading.Thread(target=receptor, daemon=True).start()
def enviar_chat():
    """Send the text typed in the chat box over the open chat socket.

    The box is cleared only after the whole message was handed to the socket.
    """
    texto = msg.get('1.0','end').strip()
    if not texto:
        return
    s = chat_client.get('sock')
    if not s:
        messagebox.showwarning('Chat','No conectado')
        return
    try:
        # sendall keeps writing until every byte is sent; send() may stop
        # after a partial write and silently truncate the message.
        s.sendall(texto.encode())
        msg.delete('1.0','end')
    except Exception as e:
        event_queue.put(('status', f'Error envío: {e}'))

tk.Button(chat_box, text='Conectar', bg='#ddeeff', command=conectar_chat).pack(pady=(0,4))
tk.Button(chat_box, text='Enviar mensaje', bg='#cfe8cf', command=enviar_chat).pack(pady=(0,6))
# Sockets section (TCP/UDP servers) added to the left panel
s_sockets = section(left, 'Sockets Locales')
tcp_state = {'thread': None, 'stop': False, 'sock': None}
udp_state = {'thread': None, 'stop': False, 'sock': None}
def start_tcp_server():
    """Start a TCP server on a user-chosen port in a background thread.

    Every accepted connection is served by manejar_tcp_cliente in its own
    thread. The accept loop wakes up once a second to check
    tcp_state['stop'], which stop_tcp_server sets.
    """
    if tcp_state['thread'] and tcp_state['thread'].is_alive():
        info_label.config(text='TCP Server ya iniciado')
        return
    port = simpledialog.askinteger('TCP Server','Puerto', initialvalue=5555)
    if not port:
        return

    def worker():
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            srv.bind(('0.0.0.0', port))
            srv.listen(5)
            # Short accept timeout so the stop flag is polled regularly.
            srv.settimeout(1.0)
            tcp_state['sock'] = srv
            # Announce only once the port is really bound.
            event_queue.put(('status', f'TCP Server escuchando {port}'))
            while not tcp_state['stop']:
                try:
                    c, addr = srv.accept()
                except socket.timeout:
                    continue
                event_queue.put(('status', f'Nueva conexión TCP {addr}'))
                # Pass the connection as arguments: a lambda would read c and
                # addr late and could see the next iteration's values.
                threading.Thread(target=manejar_tcp_cliente, args=(c, addr), daemon=True).start()
        except Exception as e:
            # Covers bind/listen failures (port in use, no permission) too.
            event_queue.put(('status', f'Error server TCP: {e}'))
        finally:
            srv.close()
            tcp_state['sock'] = None
            tcp_state['stop'] = False
            event_queue.put(('status','TCP Server detenido'))

    tcp_state['stop'] = False
    t = threading.Thread(target=worker, daemon=True)
    tcp_state['thread'] = t
    t.start()
def manejar_tcp_cliente(c, addr):
    """Serve one TCP client: report each received chunk and answer b'ACK'."""
    try:
        # recv() returning b'' means the client closed the connection.
        for recibido in iter(lambda: c.recv(1024), b''):
            event_queue.put(('status', f'TCP {addr} -> {recibido[:40]!r}'))
            c.send(b'ACK')
    except Exception as error:
        event_queue.put(('status', f'Error cliente TCP {addr}: {error}'))
    finally:
        c.close()

def stop_tcp_server():
    """Ask the TCP accept loop to finish on its next timeout check."""
    tcp_state.update(stop=True)
def start_udp_server():
    """Start a UDP server on a user-chosen port in a background thread.

    Every datagram is reported as a 'status' event and answered with b'ACK'.
    The receive loop wakes up once a second to check udp_state['stop'],
    which stop_udp_server sets.
    """
    if udp_state['thread'] and udp_state['thread'].is_alive():
        info_label.config(text='UDP Server ya iniciado')
        return
    port = simpledialog.askinteger('UDP Server','Puerto', initialvalue=5556)
    if not port:
        return

    def worker():
        srv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            srv.bind(('0.0.0.0', port))
            # Short receive timeout so the stop flag is polled regularly.
            srv.settimeout(1.0)
            udp_state['sock'] = srv
            # Announce only once the port is really bound.
            event_queue.put(('status', f'UDP Server escuchando {port}'))
            while not udp_state['stop']:
                try:
                    data, addr = srv.recvfrom(1024)
                except socket.timeout:
                    continue
                event_queue.put(('status', f'UDP {addr} -> {data[:40]!r}'))
                srv.sendto(b'ACK', addr)
        except Exception as e:
            # Covers bind failures (port in use, no permission) too.
            event_queue.put(('status', f'Error server UDP: {e}'))
        finally:
            srv.close()
            udp_state['sock'] = None
            udp_state['stop'] = False
            event_queue.put(('status','UDP Server detenido'))

    udp_state['stop'] = False
    t = threading.Thread(target=worker, daemon=True)
    udp_state['thread'] = t
    t.start()
def stop_udp_server():
    """Ask the UDP receive loop to finish on its next timeout check."""
    udp_state.update(stop=True)

# Start/stop buttons, packed top to bottom in this order.
for rotulo_socket, color_socket, accion_socket in (
    ('Start TCP', '#e0ffe0', start_tcp_server),
    ('Stop TCP', '#ffe0e0', stop_tcp_server),
    ('Start UDP', '#e0ffe0', start_udp_server),
    ('Stop UDP', '#ffe0e0', stop_udp_server),
):
    tk.Button(s_sockets, text=rotulo_socket, bg=color_socket, command=accion_socket).pack(pady=2, fill='x')
# Camel race with improved synchronisation
race_state = {'running': False, 'winner': None, 'lock': threading.Lock(), 'condition': threading.Condition()}
def iniciar_carrera(text_widget):
    """Run a five-camel race, one thread per camel.

    Runners advance 1-3 steps at a time under race_state['lock'] and post
    'race_update' events; the first to reach the goal posts 'race_end'.
    A monitor thread clears race_state['running'] when the race is over or
    after a 30 s safety timeout. Only one race can run at a time.

    text_widget is accepted from the caller but not used here: all drawing
    is driven by the events posted to event_queue.
    """
    import random

    # Refuse to start while another race is still running.
    with race_state['lock']:
        if race_state['running']:
            event_queue.put(('status', 'Ya hay una carrera en curso'))
            return
        race_state['running'] = True
        race_state['winner'] = None
    corredores = 5
    posiciones = [0] * corredores
    meta = 50
    ganador_declarado = threading.Event()

    def corredor(i):
        nombre = f"Camello {i+1}"
        try:
            while not ganador_declarado.is_set():
                # Only one runner advances at a time.
                with race_state['lock']:
                    # Someone may have won while this runner waited for the lock.
                    if ganador_declarado.is_set():
                        break
                    posiciones[i] += random.randint(1, 3)
                    event_queue.put(('race_update', (i, posiciones[i], meta)))
                    if posiciones[i] >= meta and not ganador_declarado.is_set():
                        ganador_declarado.set()
                        race_state['winner'] = i
                        event_queue.put(('race_end', i))
                        break
                # Pause outside the lock so the other runners get a turn.
                time.sleep(random.uniform(0.1, 0.2))
        except Exception as e:
            event_queue.put(('status', f'Error en {nombre}: {e}'))

    # Start all runners.
    event_queue.put(('race_start', corredores))
    for i in range(corredores):
        threading.Thread(target=corredor, args=(i,), daemon=True, name=f'Corredor-{i+1}').start()

    def monitor_finalizacion():
        terminada = ganador_declarado.wait(timeout=30)  # safety timeout
        if not terminada:
            # Stop the runners; otherwise they would keep posting updates
            # while a new race is allowed to start.
            ganador_declarado.set()
        time.sleep(0.5)  # let the last events be processed
        with race_state['lock']:
            race_state['running'] = False
            if race_state['winner'] is None:
                event_queue.put(('status', 'Carrera terminada (timeout)'))

    threading.Thread(target=monitor_finalizacion, daemon=True, name='Monitor-Carrera').start()
# Service placeholders
def servicio_pop3():
    """Report that POP3 listing is not implemented yet."""
    aviso = 'POP3 placeholder (implementación futura)'
    event_queue.put(('status', aviso))

def servicio_smtp():
    """Report that SMTP sending is not implemented yet."""
    aviso = 'SMTP placeholder (implementación futura)'
    event_queue.put(('status', aviso))

def servicio_ftp():
    """Report that FTP listing is not implemented yet."""
    aviso = 'FTP placeholder (implementación futura)'
    event_queue.put(('status', aviso))
# Advanced security
def generar_rsa():
    """Generate a 2048-bit RSA key in the background and show its public half.

    The PEM-encoded public key is posted to event_queue as a 'hash' event.
    Only a status message is posted when the cryptography package is missing.
    """
    def generar():
        if rsa is None:
            event_queue.put(('status','Instala cryptography para RSA'))
            return
        event_queue.put(('status','Generando claves RSA...'))
        try:
            clave_privada = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend(),
            )
            clave_publica = clave_privada.public_key()
            pem = clave_publica.public_bytes(
                serialization.Encoding.PEM,
                serialization.PublicFormat.SubjectPublicKeyInfo,
            )
            event_queue.put(('hash', f'Llave pública RSA:\n{pem.decode()}'))
            event_queue.put(('status','RSA generado'))
        except Exception as error:
            event_queue.put(('status', f'Error RSA: {error}'))

    threading.Thread(target=generar, daemon=True).start()
def aes_cifrar():
    """Encrypt a user-supplied text with a freshly generated Fernet key.

    Key and ciphertext are posted together to event_queue as a 'hash' event.
    """
    texto = simpledialog.askstring('AES','Texto a cifrar')
    if not texto:
        return

    def cifrar():
        try:
            from cryptography.fernet import Fernet
        except ModuleNotFoundError:
            event_queue.put(('status','Instala cryptography para AES/Fernet'))
            return
        clave = Fernet.generate_key()
        cifrador = Fernet(clave)
        cifrado = cifrador.encrypt(texto.encode())
        event_queue.put(('hash', f'AES(Fernet)\nKey: {clave.decode()}\nCT: {cifrado.decode()}'))
        event_queue.put(('status','Texto cifrado'))

    threading.Thread(target=cifrar, daemon=True).start()
# STATUS BAR
status = tk.Frame(root, bd=1, relief="sunken")
status.grid(row=2, column=0, columnspan=3, sticky="ew")
# Five equally weighted columns, one per indicator.
for columna in range(5):
    status.columnconfigure(columna, weight=1)

lbl_mail = tk.Label(status, text="Correos sin leer", anchor="w", padx=6)
lbl_temp = tk.Label(status, text="Temperatura local", anchor="w")
lbl_net = tk.Label(status, text="Net 0 KB/s in / 0 KB/s out", anchor="w")
lbl_dt = tk.Label(status, text="Fecha Día y Hora", anchor="e")
lbl_alarm = tk.Label(status, text='Alarma: --', anchor='w')

# Left to right: mail, temperature, network, date/time, alarm.
lbl_mail.grid(row=0, column=0, sticky="w")
lbl_temp.grid(row=0, column=1, sticky="w")
lbl_net.grid(row=0, column=2, sticky="w")
lbl_dt.grid(row=0, column=3, sticky="e", padx=6)
lbl_alarm.grid(row=0, column=4, sticky='w')
# Refresh the date/time label once a second from the Tk event loop
def updater(lbl):
    """Write the current weekday, date and time into *lbl* and reschedule.

    The refresh is driven by lbl.after() instead of a worker thread: widgets
    are then only touched from the thread that runs the Tk main loop, and
    nothing calls into Tk before mainloop() has started. The cycle ends when
    the widget no longer exists.
    """
    now = datetime.datetime.now()
    lbl_text = f"{now.strftime('%A')}, {now.strftime('%Y-%m-%d %H:%M:%S')}"
    try:
        lbl.config(text=lbl_text)
        lbl.after(1000, updater, lbl)
    except tk.TclError:
        # The label was destroyed: stop refreshing.
        return

updater(lbl_dt)
# Network I/O monitor (KB/s)
def net_io_runner(lbl):
    """Show the network receive/send rate in *lbl*, refreshed every second.

    Rates come from the difference between two psutil.net_io_counters()
    samples divided by the elapsed time. Sampling is scheduled with
    lbl.after(), so the label is only touched from the Tk main-loop thread.
    When psutil is missing the label shows an install hint instead.
    """
    try:
        import psutil
    except ModuleNotFoundError:
        lbl.config(text="Instala psutil para monitor red (pip install psutil)")
        return
    # Previous sample, kept in a dict so the nested function can update it.
    muestra = {'contadores': psutil.net_io_counters(), 'instante': time.time()}

    def refrescar():
        cur = psutil.net_io_counters()
        now = time.time()
        dt = now - muestra['instante']
        if dt <= 0:
            # Clock did not advance (or went backwards): avoid dividing by zero.
            dt = 1
        sent_k = (cur.bytes_sent - muestra['contadores'].bytes_sent) / 1024.0 / dt
        recv_k = (cur.bytes_recv - muestra['contadores'].bytes_recv) / 1024.0 / dt
        muestra['contadores'] = cur
        muestra['instante'] = now
        try:
            lbl.config(text=f"Net {recv_k:.1f} KB/s in / {sent_k:.1f} KB/s out")
            lbl.after(1000, refrescar)
        except tk.TclError:
            # The label was destroyed: stop sampling.
            return

    lbl.after(1000, refrescar)

net_io_runner(lbl_net)
# Consume a REST API and show a detailed report
def consumir_api(url='https://datatracker.ietf.org/doc/html/rfc6749'):
    """GET *url* in a background thread and post a formatted report.

    The report (status line, up to 10 headers and a body preview that depends
    on the Content-Type) is posted to event_queue as an 'api' event; progress
    and errors are posted as 'status'.
    """
    separador = "="*80

    def volcar_json(r, salida):
        # Pretty-print JSON: first 5 items of a list, or 3000 chars otherwise.
        try:
            data = r.json()
            if not isinstance(data, list):
                salida.append(json.dumps(data, indent=2)[:3000])
                return
            salida.append(f"Array with {len(data)} elements:\n")
            salida.append(json.dumps(data[:5], indent=2))
            if len(data) > 5:
                salida.append(f"\n... and {len(data) - 5} more items")
        except Exception as error:
            salida.append(f"JSON parse error: {error}\n{r.text[:2000]}")

    def volcar_html(r, salida, parser_cls):
        # Summarise HTML: title, first 5 non-empty paragraphs, tag count.
        try:
            soup = parser_cls(r.text, 'html.parser')
            title = soup.find('title')
            if title:
                salida.append(f"Title: {title.get_text()}\n")
            for parrafo in soup.find_all('p')[:5]:
                contenido = parrafo.get_text().strip()
                if contenido:
                    salida.append(f"{contenido}\n")
            salida.append(f"\n[HTML document with {len(soup.find_all())} tags]")
        except Exception:
            salida.append(r.text[:2000])

    def worker():
        event_queue.put(('status', f'GET {url}'))
        try:
            import requests
            from bs4 import BeautifulSoup
            r = requests.get(url, timeout=10)
            cabeceras = r.headers
            # Summary block followed by the first ten response headers
            output = [
                separador,
                f"URL: {url}",
                f"Status: {r.status_code} {r.reason}",
                f"Content-Type: {cabeceras.get('Content-Type', 'N/A')}",
                f"Content-Length: {cabeceras.get('Content-Length', 'N/A')}",
                f"Server: {cabeceras.get('Server', 'N/A')}",
                separador,
                "\nHEADERS:",
            ]
            output.extend(f" {k}: {v}" for k, v in list(cabeceras.items())[:10])
            output.append("\n" + separador)
            output.append("RESPONSE BODY:\n")
            # Choose the body preview from the declared content type
            content_type = cabeceras.get('Content-Type', '').lower()
            if 'application/json' in content_type:
                volcar_json(r, output)
            elif 'text/html' in content_type:
                volcar_html(r, output, BeautifulSoup)
            else:
                # Plain text or anything else
                output.append(r.text[:3000])
            event_queue.put(('api', '\n'.join(output)))
            event_queue.put(('status', f'✓ Respuesta {r.status_code} - {len(r.content)} bytes'))
        except Exception as error:
            event_queue.put(('api', f'ERROR: {str(error)}'))
            event_queue.put(('status', f'✗ Error API: {error}'))

    threading.Thread(target=worker, daemon=True).start()
# NOTE(review): the original comment here announced an "API" button for the
# Navegador tab, but no such button is created at this point.
# Queue pump: applies the events posted by worker threads to the widgets
def pump_queue():
    """Drain event_queue, apply every event to the GUI and reschedule itself.

    Events are (type, payload) tuples. Runs every 200 ms in the Tk main-loop
    thread. The next run is scheduled even when a handler raises, so one bad
    event cannot freeze all later GUI updates.
    """
    ancho_barra = 40
    try:
        while True:
            tipo, payload = event_queue.get_nowait()
            if tipo == 'status':
                info_label.config(text=payload)
            elif tipo == 'scrape':
                tab_texts['Resultados'].insert('end', payload + '\n---\n')
            elif tipo == 'hash':
                tab_texts['Resultados'].insert('end', payload + '\n')
            elif tipo == 'chat':
                tab_texts['Resultados'].insert('end', f'[CHAT] {payload}\n')
            elif tipo == 'alarm_progress':
                lbl_alarm.config(text=f'Alarma: {payload}s')
            elif tipo == 'alarm':
                lbl_alarm.config(text='Alarma disparada')
                messagebox.showinfo('Alarma', '¡Tiempo cumplido!')
            elif tipo == 'api':
                tab_texts['Navegador'].insert('end', payload + '\n')
            elif tipo == 'race_start':
                num_corredores = payload
                tareas = tab_texts['Tareas']
                tareas.delete('1.0', 'end')
                # One coloured tag per camel
                colores = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A', '#98D8C8']
                for i in range(num_corredores):
                    tareas.tag_config(f'camello_{i}', foreground=colores[i % len(colores)], font=('Courier', 10, 'bold'))
                tareas.insert('end', f'🏁 CARRERA INICIADA - {num_corredores} camellos 🏁\n', 'title')
                tareas.insert('end', '='*60 + '\n\n')
                for i in range(num_corredores):
                    # Repeat only the bar glyph; the original multiplied the
                    # whole 'Camello n: [░' prefix by 40.
                    tareas.insert('end', f'Camello {i+1}: [' + '░'*ancho_barra + f'] 0/{num_corredores*10}\n', f'camello_{i}')
            elif tipo == 'race_update':
                idx, pos, meta = payload
                # A camel can overshoot the goal, so cap the filled part.
                progreso = min(ancho_barra, int((pos / meta) * ancho_barra))
                # The fill glyphs were empty strings in the source; '█'/'░'
                # are used to match the '░' of the initial bar.
                barra = '█' * progreso + '░' * (ancho_barra - progreso)
                # Camel rows start on text line 4 (title, rule, blank line).
                line_num = idx + 4
                tareas = tab_texts['Tareas']
                try:
                    inicio = tareas.index(f'{line_num}.0')
                    fin = tareas.index(f'{line_num}.end')
                    tareas.delete(inicio, fin)
                    nueva_linea = f'Camello {idx+1}: [{barra}] {pos}/{meta}'
                    tareas.insert(inicio, nueva_linea, f'camello_{idx}')
                except tk.TclError:
                    # Best effort: the row may not exist (tab was cleared).
                    pass
            elif tipo == 'race_end':
                g = payload
                tareas = tab_texts['Tareas']
                tareas.insert('end', '\n' + '='*60 + '\n')
                tareas.insert('end', f'🏆 ¡GANADOR: CAMELLO {g+1}! 🏆\n')
                tareas.insert('end', '='*60 + '\n')
                event_queue.put(('status', f'Carrera finalizada - Ganó Camello {g+1}'))
            elif tipo == 'scrape_result':
                tab_texts['Navegador'].delete('1.0', 'end')
                tab_texts['Navegador'].insert('end', payload)
    except queue.Empty:
        pass
    finally:
        try:
            root.after(200, pump_queue)
        except tk.TclError:
            # The window is already gone; nothing left to schedule.
            pass

root.after(200, pump_queue)
root.mainloop()
# Start the GUI only when this file is run as a script, not when imported.
if __name__ == '__main__':
start_client()