224 lines
8.1 KiB
Python
224 lines
8.1 KiB
Python
# Módulo: logica/correo.py
|
|
|
|
import imaplib
|
|
import smtplib
|
|
import email
|
|
import ssl
|
|
import os
|
|
from email.mime.text import MIMEText
|
|
from email.header import decode_header
|
|
from dotenv import load_dotenv, set_key
|
|
|
|
ENV_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env')
|
|
|
|
|
|
class CorreoClient:
|
|
"""Cliente de correo IMAP/SMTP con persistencia de credenciales."""
|
|
|
|
# Puertos por defecto según modo de cifrado
|
|
PUERTOS = {
|
|
"none": {"imap": 143, "smtp": 25},
|
|
"ssl": {"imap": 993, "smtp": 465},
|
|
"starttls": {"imap": 143, "smtp": 587},
|
|
}
|
|
|
|
def __init__(self):
|
|
self.imap = None
|
|
self.server = ""
|
|
self.imap_port = 143
|
|
self.smtp_port = 25
|
|
self.user = ""
|
|
self.password = ""
|
|
self.security = "none" # "none", "ssl", "starttls"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Configuración persistente (.env)
|
|
# ------------------------------------------------------------------
|
|
|
|
def cargar_config(self):
|
|
"""Lee credenciales del archivo .env. Devuelve dict con los valores."""
|
|
load_dotenv(ENV_PATH, override=True)
|
|
return {
|
|
"server": os.getenv("MAIL_SERVER", ""),
|
|
"imap_port": os.getenv("MAIL_IMAP_PORT", "143"),
|
|
"smtp_port": os.getenv("MAIL_SMTP_PORT", "25"),
|
|
"user": os.getenv("MAIL_USER", ""),
|
|
"password": os.getenv("MAIL_PASSWORD", ""),
|
|
"security": os.getenv("MAIL_SECURITY", "none"),
|
|
}
|
|
|
|
def guardar_config(self, server, imap_port, smtp_port, user, password, security):
|
|
"""Escribe credenciales al archivo .env."""
|
|
if not os.path.exists(ENV_PATH):
|
|
with open(ENV_PATH, "w") as f:
|
|
f.write("")
|
|
set_key(ENV_PATH, "MAIL_SERVER", server)
|
|
set_key(ENV_PATH, "MAIL_IMAP_PORT", str(imap_port))
|
|
set_key(ENV_PATH, "MAIL_SMTP_PORT", str(smtp_port))
|
|
set_key(ENV_PATH, "MAIL_USER", user)
|
|
set_key(ENV_PATH, "MAIL_PASSWORD", password)
|
|
set_key(ENV_PATH, "MAIL_SECURITY", security)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Conexión IMAP
|
|
# ------------------------------------------------------------------
|
|
|
|
def conectar(self, server, imap_port, user, password, security="none"):
|
|
"""Conecta al servidor IMAP y hace login.
|
|
security: "none", "ssl" o "starttls"
|
|
"""
|
|
self.server = server
|
|
self.imap_port = int(imap_port)
|
|
self.user = user
|
|
self.password = password
|
|
self.security = security
|
|
|
|
if security == "ssl":
|
|
ctx = ssl.create_default_context()
|
|
self.imap = imaplib.IMAP4_SSL(server, self.imap_port, ssl_context=ctx)
|
|
elif security == "starttls":
|
|
self.imap = imaplib.IMAP4(server, self.imap_port)
|
|
self.imap.starttls(ssl.create_default_context())
|
|
else:
|
|
self.imap = imaplib.IMAP4(server, self.imap_port)
|
|
|
|
self.imap.login(user, password)
|
|
|
|
def desconectar(self):
|
|
"""Cierra la conexión IMAP."""
|
|
if self.imap:
|
|
try:
|
|
self.imap.logout()
|
|
except Exception:
|
|
pass
|
|
self.imap = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Bandeja de entrada
|
|
# ------------------------------------------------------------------
|
|
|
|
def obtener_bandeja(self):
|
|
"""Lista correos del INBOX. Devuelve lista de dicts con uid, de, asunto, fecha."""
|
|
self.imap.noop() # Fuerza sincronización con el servidor antes de leer
|
|
self.imap.select("INBOX")
|
|
status, data = self.imap.uid("search", None, "ALL")
|
|
if status != "OK":
|
|
return []
|
|
|
|
uids = data[0].split()
|
|
correos = []
|
|
|
|
for uid in reversed(uids[-100:]): # Últimos 100, más recientes primero
|
|
status, msg_data = self.imap.uid("fetch", uid, "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])")
|
|
if status != "OK" or not msg_data[0]:
|
|
continue
|
|
|
|
info_raw = msg_data[0][0].decode("utf-8", errors="replace")
|
|
leido = "\\Seen" in info_raw
|
|
|
|
raw = msg_data[0][1]
|
|
msg = email.message_from_bytes(raw)
|
|
|
|
de = self._decode_header(msg.get("From", ""))
|
|
asunto = self._decode_header(msg.get("Subject", "(Sin asunto)"))
|
|
fecha = msg.get("Date", "")
|
|
|
|
correos.append({
|
|
"uid": uid.decode(),
|
|
"de": de,
|
|
"asunto": asunto,
|
|
"fecha": fecha,
|
|
"leido": leido,
|
|
})
|
|
|
|
return correos
|
|
|
|
def marcar_leido(self, uid):
|
|
"""Marca un correo como leído en el servidor."""
|
|
try:
|
|
self.imap.select("INBOX")
|
|
self.imap.uid("store", uid.encode(), "+FLAGS", "\\Seen")
|
|
except Exception:
|
|
pass
|
|
|
|
def leer_correo(self, uid):
|
|
"""Lee el contenido completo de un correo por UID. Devuelve texto plano."""
|
|
self.imap.select("INBOX")
|
|
status, msg_data = self.imap.uid("fetch", uid.encode(), "(RFC822)")
|
|
if status != "OK" or not msg_data[0]:
|
|
return "(No se pudo leer el correo)"
|
|
|
|
raw = msg_data[0][1]
|
|
msg = email.message_from_bytes(raw)
|
|
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body += payload.decode(charset, errors="replace")
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
|
|
return body if body else "(Correo sin contenido de texto)"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Envío SMTP
|
|
# ------------------------------------------------------------------
|
|
|
|
def enviar_correo(self, smtp_server, smtp_port, from_addr, password, to, subject, body, security="none"):
|
|
"""Envía un email vía SMTP.
|
|
security: "none", "ssl" o "starttls"
|
|
"""
|
|
msg = MIMEText(body, "plain", "utf-8")
|
|
msg["From"] = from_addr
|
|
msg["To"] = to
|
|
msg["Subject"] = subject
|
|
|
|
if security == "ssl":
|
|
ctx = ssl.create_default_context()
|
|
with smtplib.SMTP_SSL(smtp_server, int(smtp_port), context=ctx) as smtp:
|
|
smtp.ehlo()
|
|
if smtp.has_extn("AUTH") and password:
|
|
smtp.login(from_addr, password)
|
|
smtp.send_message(msg)
|
|
elif security == "starttls":
|
|
with smtplib.SMTP(smtp_server, int(smtp_port)) as smtp:
|
|
smtp.ehlo()
|
|
smtp.starttls(context=ssl.create_default_context())
|
|
smtp.ehlo()
|
|
if smtp.has_extn("AUTH") and password:
|
|
smtp.login(from_addr, password)
|
|
smtp.send_message(msg)
|
|
else:
|
|
with smtplib.SMTP(smtp_server, int(smtp_port)) as smtp:
|
|
smtp.ehlo()
|
|
if smtp.has_extn("AUTH") and password:
|
|
try:
|
|
smtp.login(from_addr, password)
|
|
except smtplib.SMTPAuthenticationError:
|
|
pass # Servidor interno: anuncia AUTH pero no lo requiere
|
|
smtp.send_message(msg)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Utilidades
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _decode_header(value):
|
|
"""Decodifica cabeceras MIME."""
|
|
if not value:
|
|
return ""
|
|
parts = decode_header(value)
|
|
decoded = []
|
|
for part, charset in parts:
|
|
if isinstance(part, bytes):
|
|
decoded.append(part.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(part)
|
|
return " ".join(decoded)
|