112 lines
4.3 KiB
Python
112 lines
4.3 KiB
Python
import smtplib
|
|
import imaplib
|
|
import email
|
|
import os
|
|
import re
|
|
import time
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
|
|
EMAIL_SERVER_IP = "10.10.0.101"
|
|
SMTP_PORT = 25
|
|
IMAP_PORT = 143
|
|
TEMP_DIR = "temp_attachments"
|
|
|
|
if not os.path.exists(TEMP_DIR):
|
|
os.makedirs(TEMP_DIR)
|
|
|
|
def clean_content(text):
|
|
"""Limpia el contenido de etiquetas HTML para mejorar la lectura."""
|
|
text = re.sub(r'<(script|style).*?>.*?</\1>', '', text, flags=re.DOTALL | re.IGNORECASE)
|
|
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
|
|
text = re.sub(r'<.*?>', '', text)
|
|
entities = {" ": " ", "á": "á", "é": "é", "í": "í", "ó": "ó", "ú": "ú", "ñ": "ñ"}
|
|
for ent, char in entities.items():
|
|
text = text.replace(ent, char)
|
|
return text.strip()
|
|
|
|
def send_email_logic(sender, receiver, subject, body, password, attachment_path=None):
|
|
"""Envía el correo por SMTP y guarda copia en la carpeta 'Sent' del servidor."""
|
|
try:
|
|
msg = MIMEMultipart()
|
|
msg['From'] = sender
|
|
msg['To'] = receiver
|
|
msg['Subject'] = subject
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
if attachment_path and os.path.exists(attachment_path):
|
|
filename = os.path.basename(attachment_path)
|
|
with open(attachment_path, "rb") as attachment:
|
|
part = MIMEBase("application", "octet-stream")
|
|
part.set_payload(attachment.read())
|
|
encoders.encode_base64(part)
|
|
part.add_header("Content-Disposition", f"attachment; filename= {filename}")
|
|
msg.attach(part)
|
|
|
|
with smtplib.SMTP(EMAIL_SERVER_IP, SMTP_PORT, timeout=10) as server:
|
|
server.send_message(msg)
|
|
|
|
try:
|
|
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
|
|
mail.login(sender, password)
|
|
imap_date = imaplib.Time2Internaldate(time.time())
|
|
mail.append('Sent', None, imap_date, msg.as_bytes())
|
|
mail.logout()
|
|
except: pass
|
|
|
|
return True, "Enviado"
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
def fetch_emails_logic(user, password):
|
|
"""Obtiene la lista de correos de la bandeja de entrada."""
|
|
try:
|
|
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
|
|
mail.login(user, password)
|
|
mail.select("inbox")
|
|
_, messages = mail.search(None, 'ALL')
|
|
mail_ids = messages[0].split()
|
|
results = []
|
|
for i in mail_ids[-15:]:
|
|
_, data = mail.fetch(i, '(RFC822)')
|
|
for part in data:
|
|
if isinstance(part, tuple):
|
|
msg = email.message_from_bytes(part[1])
|
|
results.append({
|
|
'id': i.decode(),
|
|
'sender': msg.get('from', 'Desconocido'),
|
|
'subject': msg.get('subject', 'Sin asunto')
|
|
})
|
|
mail.logout()
|
|
return True, results[::-1]
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
def fetch_email_full_data(user, password, email_id):
|
|
"""Descarga el cuerpo del mensaje y gestiona adjuntos/imágenes."""
|
|
try:
|
|
mail = imaplib.IMAP4(EMAIL_SERVER_IP, IMAP_PORT)
|
|
mail.login(user, password)
|
|
mail.select("inbox")
|
|
_, data = mail.fetch(str(email_id), '(RFC822)')
|
|
content, images, files = "", [], []
|
|
for part_data in data:
|
|
if isinstance(part_data, tuple):
|
|
msg = email.message_from_bytes(part_data[1])
|
|
for part in msg.walk():
|
|
ctype = part.get_content_type()
|
|
fname = part.get_filename()
|
|
if ctype in ["text/plain", "text/html"] and not fname:
|
|
content += part.get_payload(decode=True).decode(errors='replace')
|
|
elif fname:
|
|
fpath = os.path.join(TEMP_DIR, fname)
|
|
with open(fpath, "wb") as f:
|
|
f.write(part.get_payload(decode=True))
|
|
if ctype.startswith("image/"): images.append(fpath)
|
|
else: files.append(fpath)
|
|
mail.logout()
|
|
return True, {"body": clean_content(content), "images": images, "files": files}
|
|
except Exception as e:
|
|
return False, str(e) |