Lógica del email

This commit is contained in:
mireya 2026-02-27 10:58:36 +00:00
parent 16b0003820
commit 384c99cb4a
1 changed files with 112 additions and 0 deletions

112
email_logic.py Normal file
View File

@ -0,0 +1,112 @@
import smtplib
import imaplib
import email
import os
import re
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
EMAIL_SERVER_IP = "10.10.0.101"
SMTP_PORT = 25
IMAP_PORT = 143
TEMP_DIR = "temp_attachments"
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)
def clean_content(text):
"""Limpia el contenido de etiquetas HTML para mejorar la lectura."""
text = re.sub(r'<(script|style).*?>.*?</\1>', '', text, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
text = re.sub(r'<.*?>', '', text)
entities = {"&nbsp;": " ", "&aacute;": "á", "&eacute;": "é", "&iacute;": "í", "&oacute;": "ó", "&uacute;": "ú", "&ntilde;": "ñ"}
for ent, char in entities.items():
text = text.replace(ent, char)
return text.strip()
def send_email_logic(sender, receiver, subject, body, password, attachment_path=None):
"""Envía el correo por SMTP y guarda copia en la carpeta 'Sent' del servidor."""
try:
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = receiver
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
if attachment_path and os.path.exists(attachment_path):
filename = os.path.basename(attachment_path)
with open(attachment_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename= {filename}")
msg.attach(part)
with smtplib.SMTP(EMAIL_SERVER_IP, SMTP_PORT, timeout=10) as server:
server.send_message(msg)
try:
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
mail.login(sender, password)
imap_date = imaplib.Time2Internaldate(time.time())
mail.append('Sent', None, imap_date, msg.as_bytes())
mail.logout()
except: pass
return True, "Enviado"
except Exception as e:
return False, str(e)
def fetch_emails_logic(user, password):
"""Obtiene la lista de correos de la bandeja de entrada."""
try:
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
mail.login(user, password)
mail.select("inbox")
_, messages = mail.search(None, 'ALL')
mail_ids = messages[0].split()
results = []
for i in mail_ids[-15:]:
_, data = mail.fetch(i, '(RFC822)')
for part in data:
if isinstance(part, tuple):
msg = email.message_from_bytes(part[1])
results.append({
'id': i.decode(),
'sender': msg.get('from', 'Desconocido'),
'subject': msg.get('subject', 'Sin asunto')
})
mail.logout()
return True, results[::-1]
except Exception as e:
return False, str(e)
def fetch_email_full_data(user, password, email_id):
"""Descarga el cuerpo del mensaje y gestiona adjuntos/imágenes."""
try:
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
mail.login(user, password)
mail.select("inbox")
_, data = mail.fetch(str(email_id), '(RFC822)')
content, images, files = "", [], []
for part_data in data:
if isinstance(part_data, tuple):
msg = email.message_from_bytes(part_data[1])
for part in msg.walk():
ctype = part.get_content_type()
fname = part.get_filename()
if ctype in ["text/plain", "text/html"] and not fname:
content += part.get_payload(decode=True).decode(errors='replace')
elif fname:
fpath = os.path.join(TEMP_DIR, fname)
with open(fpath, "wb") as f:
f.write(part.get_payload(decode=True))
if ctype.startswith("image/"): images.append(fpath)
else: files.append(fpath)
mail.logout()
return True, {"body": clean_content(content), "images": images, "files": files}
except Exception as e:
return False, str(e)