ProyectoPSP/email_logic.py

112 lines
4.3 KiB
Python

import smtplib
import imaplib
import email
import os
import re
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
# Host that serves both SMTP (sending) and IMAP (reading) for this module.
EMAIL_SERVER_IP = "10.10.0.101"
# Port passed to smtplib.SMTP when sending mail.
SMTP_PORT = 25
# Port passed to imaplib.IMAP4 when reading or storing mail.
IMAP_PORT = 143
# Directory (relative to the current working directory) where downloaded
# attachments are written.
TEMP_DIR = "temp_attachments"

# exist_ok=True avoids the check-then-create race of a separate
# os.path.exists() test followed by os.makedirs().
os.makedirs(TEMP_DIR, exist_ok=True)
def clean_content(text):
    """Strip HTML markup from *text* so it reads as plain text.

    Steps, in order:
      1. Remove ``<script>`` / ``<style>`` elements together with their content.
      2. Turn ``<br>`` tags into newlines.
      3. Drop every remaining tag.
      4. Decode HTML character references (named and numeric).

    Args:
        text: The raw message body; may contain HTML. ``None`` or an empty
            string yields an empty result.

    Returns:
        The cleaned text with leading/trailing whitespace removed.
    """
    import html  # local import: the module header does not import ``html``

    if not text:
        return ""

    text = re.sub(r'<(script|style).*?>.*?</\1>', '', text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
    text = re.sub(r'<.*?>', '', text)

    # Map the non-breaking-space entity to a regular space (html.unescape
    # would otherwise produce U+00A0), then decode every other entity.
    # Decoding happens after tag stripping so that escaped markup such as
    # "&lt;b&gt;" survives as literal text instead of being removed as a tag.
    text = text.replace("&nbsp;", " ")
    text = html.unescape(text)
    return text.strip()
def send_email_logic(sender, receiver, subject, body, password, attachment_path=None):
    """Send an email over SMTP and store a copy in the server's 'Sent' folder.

    Args:
        sender: Address used for the ``From`` header and as the IMAP login
            name when saving the copy.
        receiver: Address used for the ``To`` header.
        subject: Subject line.
        body: Plain-text message body.
        password: IMAP password for *sender*; only used to save the copy
            (the SMTP session itself is not authenticated).
        attachment_path: Optional path of a file to attach. It is ignored
            when empty or when the file does not exist.

    Returns:
        ``(True, "Enviado")`` once the SMTP delivery succeeded, or
        ``(False, error_message)`` if building or sending the message failed.
        A failure while saving the copy to 'Sent' does not change the result.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = receiver
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        if attachment_path and os.path.exists(attachment_path):
            filename = os.path.basename(attachment_path)
            part = MIMEBase("application", "octet-stream")
            with open(attachment_path, "rb") as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # Let the email package build the parameter: it quotes names
            # containing spaces and RFC 2231-encodes non-ASCII names, which
            # a hand-formatted "filename= ..." string does not.
            part.add_header("Content-Disposition", "attachment", filename=filename)
            msg.attach(part)

        with smtplib.SMTP(EMAIL_SERVER_IP, SMTP_PORT, timeout=10) as server:
            server.send_message(msg)

        # Saving the copy is best-effort: the message has already been
        # delivered, so a problem here must not be reported as a send failure.
        try:
            # The context manager logs out even if login/append raises, so
            # the connection is not leaked.
            with imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT) as mail:
                mail.login(sender, password)
                imap_date = imaplib.Time2Internaldate(time.time())
                mail.append('Sent', None, imap_date, msg.as_bytes())
        except Exception:
            # Deliberately ignored (see above). ``Exception`` rather than a
            # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
            pass

        return True, "Enviado"
    except Exception as e:
        return False, str(e)
def _decode_mime_header(raw, default):
    """Return header value *raw* as readable text, or *default* if it is missing."""
    from email.errors import MessageError
    from email.header import decode_header, make_header

    if raw is None:
        return default
    try:
        # Turns RFC 2047 encoded-words ("=?utf-8?B?...?=") into plain text.
        return str(make_header(decode_header(raw)))
    except (LookupError, UnicodeError, ValueError, MessageError):
        # Unknown charset or malformed encoding: fall back to the raw value.
        return str(raw)


def fetch_emails_logic(user, password):
    """Return a summary of the most recent messages in the inbox.

    Args:
        user: IMAP login name.
        password: IMAP password.

    Returns:
        ``(True, results)`` where *results* is a list of dicts with the keys
        ``'id'``, ``'sender'`` and ``'subject'`` for at most the last 15
        message ids returned by the server, in reverse of the server's order;
        or ``(False, error_message)`` on failure.
    """
    try:
        results = []
        # The context manager logs out on every exit path, including errors.
        with imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT) as mail:
            mail.login(user, password)
            mail.select("inbox")
            _, messages = mail.search(None, 'ALL')
            mail_ids = messages[0].split() if messages and messages[0] else []
            for i in mail_ids[-15:]:
                # Only the two headers shown in the list are needed. PEEK
                # avoids downloading whole messages (attachments included)
                # and avoids flagging them as \Seen just by listing them.
                _, data = mail.fetch(i, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])')
                for part in data:
                    if isinstance(part, tuple):
                        msg = email.message_from_bytes(part[1])
                        results.append({
                            'id': i.decode(),
                            'sender': _decode_mime_header(msg.get('from'), 'Desconocido'),
                            'subject': _decode_mime_header(msg.get('subject'), 'Sin asunto'),
                        })
        return True, results[::-1]
    except Exception as e:
        return False, str(e)
def fetch_email_full_data(user, password, email_id):
    """Download one inbox message: its text body plus any attached files.

    Text parts (``text/plain`` / ``text/html`` without a filename) are
    concatenated and passed through ``clean_content``. Parts that carry a
    filename are written into ``TEMP_DIR``.

    Args:
        user: IMAP login name.
        password: IMAP password.
        email_id: Message sequence number as returned by the inbox listing.

    Returns:
        ``(True, {"body": str, "images": [paths], "files": [paths]})`` on
        success, where ``images`` holds the saved parts whose content type
        starts with ``image/`` and ``files`` every other saved part; or
        ``(False, error_message)`` on failure.
    """
    try:
        # The context manager logs out on every exit path, including errors.
        with imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT) as mail:
            mail.login(user, password)
            mail.select("inbox")
            _, data = mail.fetch(str(email_id), '(RFC822)')

        os.makedirs(TEMP_DIR, exist_ok=True)
        text_chunks, images, files = [], [], []
        for part_data in data:
            if not isinstance(part_data, tuple):
                continue
            msg = email.message_from_bytes(part_data[1])
            for part in msg.walk():
                payload = part.get_payload(decode=True)
                if payload is None:
                    # Container parts (multipart/*, message/rfc822) have no
                    # payload of their own; walk() visits their children.
                    continue
                ctype = part.get_content_type()
                fname = part.get_filename()
                if ctype in ("text/plain", "text/html") and not fname:
                    # NOTE(review): a multipart/alternative message contributes
                    # both its plain and HTML rendering here — confirm intended.
                    charset = part.get_content_charset() or "utf-8"
                    try:
                        text_chunks.append(payload.decode(charset, errors='replace'))
                    except LookupError:
                        # Charset name unknown to Python: fall back to UTF-8.
                        text_chunks.append(payload.decode("utf-8", errors='replace'))
                elif fname:
                    # The filename comes from the (untrusted) message. Keep
                    # only its final component so a name such as
                    # "../../x" or "C:\\x" cannot escape TEMP_DIR.
                    safe_name = os.path.basename(fname.replace("\\", "/"))
                    if safe_name in ("", ".", ".."):
                        safe_name = "attachment"
                    fpath = os.path.join(TEMP_DIR, safe_name)
                    with open(fpath, "wb") as f:
                        f.write(payload)
                    if ctype.startswith("image/"):
                        images.append(fpath)
                    else:
                        files.append(fpath)

        content = "".join(text_chunks)
        return True, {"body": clean_content(content), "images": images, "files": files}
    except Exception as e:
        return False, str(e)