FinalProjectAndresGMoran/app/email/emailClient.py

134 lines
5.2 KiB
Python

# emailClient.py
import datetime
import smtplib
import imaplib
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
import traceback
class EmailClient:
def __init__(self):
self.server_ip = "s1.ieslamar.org"
self.ports = {
"SMTP": 25,
"SMTP-S": 465,
"SMTP-SUBMISSION": 587,
"IMAP": 143,
"IMAP-S": 993
}
def enviar_correo(self, email_user, password, destinatario, asunto, mensaje):
def enviar():
try:
with smtplib.SMTP(self.server_ip, self.ports["SMTP"], local_hostname="localhost") as smtp:
smtp.login(email_user, password)
msg = MIMEMultipart()
msg["From"] = email_user
msg["To"] = destinatario
msg["Subject"] = asunto
msg["Date"] = datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
msg.attach(MIMEText(mensaje, "plain"))
smtp.sendmail(email_user, destinatario, msg.as_string())
print("Correo enviado correctamente.")
except Exception as e:
print(f"Error al enviar el correo: {e}")
traceback.print_exc()
thread = threading.Thread(target=enviar)
thread.start()
def recibir_correos(self, email_user, password, callback):
"""Recupera todos los correos (leídos y no leídos) del buzón de entrada."""
def recibir():
try:
with imaplib.IMAP4_SSL(self.server_ip, self.ports["IMAP-S"]) as mail:
mail.login(email_user, password)
mail.select("inbox")
status, mensajes = mail.search(None, 'ALL') # Obtener todos los correos
lista_mensajes = mensajes[0].split()
correos = []
for num in lista_mensajes:
# Obtener el mensaje completo junto con sus FLAGS
status, data = mail.fetch(num, '(RFC822 FLAGS)')
if not data or len(data) < 2 or not isinstance(data[0], tuple):
continue # Evitar errores si no hay datos válidos
mensaje_bytes = data[0][1]
mensaje = email.message_from_bytes(mensaje_bytes)
# Obtener las banderas del mensaje
status, flag_data = mail.fetch(num, '(FLAGS)')
flags = flag_data[0].decode() if flag_data and flag_data[0] else ""
# Determinar si está leído o no
leido = '\\Seen' in flags
correos.append({
"id": num.decode(),
"from": mensaje["From"],
"subject": mensaje["Subject"],
"date": mensaje["Date"],
"read": leido
})
callback(correos) # Enviar la lista de correos a la GUI
except Exception as e:
print(f"Error al recibir correos: {e}")
threading.Thread(target=recibir).start()
def eliminar_correo(self, email_user, password, correo_id):
def eliminar():
try:
with imaplib.IMAP4_SSL(self.server_ip, self.ports["IMAP-S"]) as mail:
mail.login(email_user, password)
mail.select("inbox")
mail.store(correo_id, "+FLAGS", "\\Deleted")
mail.expunge()
except Exception as e:
print(f"Error al eliminar el correo: {e}")
threading.Thread(target=eliminar).start()
def obtener_contenido_correo(self, email_user, password, correo_id, callback):
def obtener():
try:
with imaplib.IMAP4_SSL(self.server_ip, self.ports["IMAP-S"]) as mail:
mail.login(email_user, password)
mail.select("inbox")
status, data = mail.fetch(correo_id, '(RFC822)')
mensaje = email.message_from_bytes(data[0][1])
# Obtener el cuerpo del mensaje
cuerpo = ""
if mensaje.is_multipart():
for parte in mensaje.walk():
if parte.get_content_type() == "text/plain":
cuerpo = parte.get_payload(decode=True).decode()
else:
cuerpo = mensaje.get_payload(decode=True).decode()
# Marcar como leído
mail.store(correo_id, '+FLAGS', '\\Seen')
callback({
"from": mensaje["From"],
"subject": mensaje["Subject"],
"date": mensaje["Date"],
"body": cuerpo
})
except Exception as e:
print(f"Error al obtener el contenido del correo: {e}")
thread = threading.Thread(target=obtener)
thread.start()