fix correo: lambda closure en except, SMTP auth opcional, requirements con versiones

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Olivia Mestre Llobell 2026-02-19 18:46:40 +01:00
parent 479696b783
commit f343c8c5f6
4 changed files with 247 additions and 25 deletions

10
.gitignore vendored
View File

@ -1 +1,11 @@
/.read.md /.read.md
# Entorno virtual
venv/
# Bytecode Python
__pycache__/
*.py[cod]
# Variables de entorno (credenciales)
.env

210
logica/correo.py Normal file
View File

@ -0,0 +1,210 @@
# Módulo: logica/correo.py
import imaplib
import smtplib
import email
import ssl
import os
from email.mime.text import MIMEText
from email.header import decode_header
from dotenv import load_dotenv, set_key
ENV_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env')
class CorreoClient:
"""Cliente de correo IMAP/SMTP con persistencia de credenciales."""
# Puertos por defecto según modo de cifrado
PUERTOS = {
"none": {"imap": 143, "smtp": 25},
"ssl": {"imap": 993, "smtp": 465},
"starttls": {"imap": 143, "smtp": 587},
}
def __init__(self):
self.imap = None
self.server = ""
self.imap_port = 143
self.smtp_port = 25
self.user = ""
self.password = ""
self.security = "none" # "none", "ssl", "starttls"
# ------------------------------------------------------------------
# Configuración persistente (.env)
# ------------------------------------------------------------------
def cargar_config(self):
"""Lee credenciales del archivo .env. Devuelve dict con los valores."""
load_dotenv(ENV_PATH, override=True)
return {
"server": os.getenv("MAIL_SERVER", ""),
"imap_port": os.getenv("MAIL_IMAP_PORT", "143"),
"smtp_port": os.getenv("MAIL_SMTP_PORT", "25"),
"user": os.getenv("MAIL_USER", ""),
"password": os.getenv("MAIL_PASSWORD", ""),
"security": os.getenv("MAIL_SECURITY", "none"),
}
def guardar_config(self, server, imap_port, smtp_port, user, password, security):
"""Escribe credenciales al archivo .env."""
if not os.path.exists(ENV_PATH):
with open(ENV_PATH, "w") as f:
f.write("")
set_key(ENV_PATH, "MAIL_SERVER", server)
set_key(ENV_PATH, "MAIL_IMAP_PORT", str(imap_port))
set_key(ENV_PATH, "MAIL_SMTP_PORT", str(smtp_port))
set_key(ENV_PATH, "MAIL_USER", user)
set_key(ENV_PATH, "MAIL_PASSWORD", password)
set_key(ENV_PATH, "MAIL_SECURITY", security)
# ------------------------------------------------------------------
# Conexión IMAP
# ------------------------------------------------------------------
def conectar(self, server, imap_port, user, password, security="none"):
"""Conecta al servidor IMAP y hace login.
security: "none", "ssl" o "starttls"
"""
self.server = server
self.imap_port = int(imap_port)
self.user = user
self.password = password
self.security = security
if security == "ssl":
ctx = ssl.create_default_context()
self.imap = imaplib.IMAP4_SSL(server, self.imap_port, ssl_context=ctx)
elif security == "starttls":
self.imap = imaplib.IMAP4(server, self.imap_port)
self.imap.starttls(ssl.create_default_context())
else:
self.imap = imaplib.IMAP4(server, self.imap_port)
self.imap.login(user, password)
def desconectar(self):
"""Cierra la conexión IMAP."""
if self.imap:
try:
self.imap.logout()
except Exception:
pass
self.imap = None
# ------------------------------------------------------------------
# Bandeja de entrada
# ------------------------------------------------------------------
def obtener_bandeja(self):
"""Lista correos del INBOX. Devuelve lista de dicts con uid, de, asunto, fecha."""
self.imap.select("INBOX")
status, data = self.imap.uid("search", None, "ALL")
if status != "OK":
return []
uids = data[0].split()
correos = []
for uid in reversed(uids[-100:]): # Últimos 100, más recientes primero
status, msg_data = self.imap.uid("fetch", uid, "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])")
if status != "OK" or not msg_data[0]:
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
de = self._decode_header(msg.get("From", ""))
asunto = self._decode_header(msg.get("Subject", "(Sin asunto)"))
fecha = msg.get("Date", "")
correos.append({
"uid": uid.decode(),
"de": de,
"asunto": asunto,
"fecha": fecha,
})
return correos
def leer_correo(self, uid):
"""Lee el contenido completo de un correo por UID. Devuelve texto plano."""
self.imap.select("INBOX")
status, msg_data = self.imap.uid("fetch", uid.encode(), "(RFC822)")
if status != "OK" or not msg_data[0]:
return "(No se pudo leer el correo)"
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
body = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
body += payload.decode(charset, errors="replace")
else:
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace")
return body if body else "(Correo sin contenido de texto)"
# ------------------------------------------------------------------
# Envío SMTP
# ------------------------------------------------------------------
def enviar_correo(self, smtp_server, smtp_port, from_addr, password, to, subject, body, security="none"):
"""Envía un email vía SMTP.
security: "none", "ssl" o "starttls"
"""
msg = MIMEText(body, "plain", "utf-8")
msg["From"] = from_addr
msg["To"] = to
msg["Subject"] = subject
if security == "ssl":
ctx = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, int(smtp_port), context=ctx) as smtp:
smtp.ehlo()
if smtp.has_extn("AUTH") and password:
smtp.login(from_addr, password)
smtp.send_message(msg)
elif security == "starttls":
with smtplib.SMTP(smtp_server, int(smtp_port)) as smtp:
smtp.ehlo()
smtp.starttls(context=ssl.create_default_context())
smtp.ehlo()
if smtp.has_extn("AUTH") and password:
smtp.login(from_addr, password)
smtp.send_message(msg)
else:
with smtplib.SMTP(smtp_server, int(smtp_port)) as smtp:
smtp.ehlo()
if smtp.has_extn("AUTH") and password:
try:
smtp.login(from_addr, password)
except smtplib.SMTPAuthenticationError:
pass # Servidor interno: anuncia AUTH pero no lo requiere
smtp.send_message(msg)
# ------------------------------------------------------------------
# Utilidades
# ------------------------------------------------------------------
@staticmethod
def _decode_header(value):
"""Decodifica cabeceras MIME."""
if not value:
return ""
parts = decode_header(value)
decoded = []
for part, charset in parts:
if isinstance(part, bytes):
decoded.append(part.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(part)
return " ".join(decoded)

View File

@ -1,23 +1,23 @@
beautifulsoup4 beautifulsoup4==4.14.3
bs4 bs4==0.0.2
certifi certifi==2026.1.4
charset-normalizer charset-normalizer==3.4.4
contourpy contourpy==1.3.3
cycler cycler==0.12.1
fonttools fonttools==4.61.1
idna idna==3.11
kiwisolver kiwisolver==1.4.9
matplotlib matplotlib==3.10.8
numpy numpy==2.4.1
packaging packaging==25.0
pillow pillow==12.1.0
psutil psutil==7.2.1
pyparsing pyparsing==3.3.1
python-dateutil python-dateutil==2.9.0.post0
python-vlc python-dotenv==1.2.1
requests python-vlc==3.0.21203
six requests==2.32.5
soupsieve six==1.17.0
typing_extensions soupsieve==2.8.2
urllib3 typing_extensions==4.15.0
python-dotenv urllib3==2.6.3

View File

@ -196,7 +196,8 @@ class CorreosPanel(ttk.Frame):
correos = self.cliente.obtener_bandeja() correos = self.cliente.obtener_bandeja()
self.root.after(0, lambda: self._conexion_exitosa(correos, user)) self.root.after(0, lambda: self._conexion_exitosa(correos, user))
except Exception as e: except Exception as e:
self.root.after(0, lambda: self._conexion_fallida(str(e))) msg = str(e)
self.root.after(0, lambda m=msg: self._conexion_fallida(m))
threading.Thread(target=tarea, daemon=True).start() threading.Thread(target=tarea, daemon=True).start()
@ -393,7 +394,8 @@ class CorreosPanel(ttk.Frame):
) )
self.root.after(0, self._envio_exitoso) self.root.after(0, self._envio_exitoso)
except Exception as e: except Exception as e:
self.root.after(0, lambda: self._envio_fallido(str(e))) msg = str(e)
self.root.after(0, lambda m=msg: self._envio_fallido(m))
threading.Thread(target=tarea, daemon=True).start() threading.Thread(target=tarea, daemon=True).start()