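"""Tkinter front end that scrapes a web page and stores the results in MySQL.

Exposes a WebScraperToDB panel with connection fields, start/stop/reset
controls, and a read-only text area mirroring the rows inserted into the
scraped_data table.
"""
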
import threading
import time
import tkinter as tk
from tkinter import messagebox

import mysql.connector
import requests
from bs4 import BeautifulSoup


class WebScraperToDB:
    def __init__(self, parent):
        """
        Initialize the GUI for web scraping with database integration.

        Args:
            parent (tk.Frame): Container in which the interface is rendered.
        """
        self.parent = parent
        self.scraping_thread = None  # Thread that will run the scraping loop
        self.stop_event = threading.Event()  # Event used to stop the scraping loop

        # Database-connection section
        db_frame = tk.Frame(self.parent)
        db_frame.pack(pady=5)

        # Input fields for the MySQL connection
        tk.Label(db_frame, text="Host:").grid(row=0, column=0)
        self.host_entry = tk.Entry(db_frame)
        self.host_entry.insert(0, "localhost")  # Default host
        self.host_entry.grid(row=0, column=1)

        tk.Label(db_frame, text="User:").grid(row=1, column=0)
        self.user_entry = tk.Entry(db_frame)
        self.user_entry.insert(0, "root")  # Default user
        self.user_entry.grid(row=1, column=1)

        tk.Label(db_frame, text="Password:").grid(row=2, column=0)
        self.password_entry = tk.Entry(db_frame, show="*")  # Masked for security
        self.password_entry.grid(row=2, column=1)

        tk.Label(db_frame, text="Database:").grid(row=3, column=0)
        self.database_entry = tk.Entry(db_frame)
        self.database_entry.insert(0, "scraping_db")  # Default database
        self.database_entry.grid(row=3, column=1)

        # Button to create the database
        tk.Button(db_frame, text="Create Database", command=self.create_database).grid(row=4, column=0, columnspan=2, pady=5)

        # Scraping controls section
        control_frame = tk.Frame(self.parent)
        control_frame.pack(pady=5)

        # Input field for the URL to scrape
        tk.Label(control_frame, text="URL to Scrape:").grid(row=0, column=0)
        self.url_entry = tk.Entry(control_frame, width=50)
        self.url_entry.insert(0, "https://quotes.toscrape.com/")  # Test URL
        self.url_entry.grid(row=0, column=1)

        # Input field for the CSS selector (used with BeautifulSoup's select())
        tk.Label(control_frame, text="CSS Selector:").grid(row=1, column=0)
        self.selector_entry = tk.Entry(control_frame, width=50)
        self.selector_entry.insert(0, "h1")  # Default selector
        self.selector_entry.grid(row=1, column=1)

        # Control buttons, placed on the row below the two entry rows
        self.start_button = tk.Button(control_frame, text="Start Scraping", command=self.start_scraping)
        self.start_button.grid(row=2, column=0, pady=5)

        self.stop_button = tk.Button(control_frame, text="Stop Scraping", command=self.stop_scraping, state="disabled")
        self.stop_button.grid(row=2, column=1, pady=5)

        self.reset_button = tk.Button(control_frame, text="Reset Scraping", command=self.reset_database)
        self.reset_button.grid(row=2, column=2, pady=5)

        # Label showing the current scraping status
        self.status_label = tk.Label(self.parent, text="Status: Idle", fg="red")
        self.status_label.pack(pady=5)

        # Area displaying the scraped data
        self.scraped_data_frame = tk.Frame(self.parent)
        self.scraped_data_frame.pack(pady=5, fill="both", expand=True)

        tk.Label(self.scraped_data_frame, text="Scraped Data:").pack(anchor="w")

        self.scraped_data_text = tk.Text(self.scraped_data_frame, height=10, state="disabled")
        self.scraped_data_text.pack(fill="both", expand=True)

    def create_database(self):
        """Create the database and the table that stores the scraped data."""
        try:
            connection = mysql.connector.connect(
                host=self.host_entry.get(),
                user=self.user_entry.get(),
                password=self.password_entry.get()
            )
            cursor = connection.cursor()
            # Identifiers cannot be passed as query parameters, so the database
            # name is interpolated directly; quote it and only use trusted input.
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{self.database_entry.get()}`")
            connection.database = self.database_entry.get()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    title TEXT NOT NULL,
                    link TEXT NOT NULL,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            connection.close()
            messagebox.showinfo("Success", "Database and table created successfully.")
        except mysql.connector.Error as e:
            messagebox.showerror("Error", str(e))

    def start_scraping(self):
        """Start scraping in a separate daemon thread."""
        if self.scraping_thread and self.scraping_thread.is_alive():
            messagebox.showwarning("Warning", "Scraping is already running.")
            return
        self.stop_event.clear()
        self.scraping_thread = threading.Thread(target=self.scrape_data, daemon=True)
        self.scraping_thread.start()
        self.status_label.config(text="Status: Running...", fg="green")
        self.start_button.config(state="disabled")
        self.stop_button.config(state="normal")

    def scrape_data(self):
        """Scrape continuously and store the results in the database.

        Runs on a worker thread, so every GUI update is marshalled back to
        the main thread via after(); tkinter widgets are not thread-safe.
        """
        url = self.url_entry.get()
        selector = self.selector_entry.get()

        try:
            headers = {"User-Agent": "Mozilla/5.0"}
            connection = mysql.connector.connect(
                host=self.host_entry.get(),
                user=self.user_entry.get(),
                password=self.password_entry.get(),
                database=self.database_entry.get()
            )
            cursor = connection.cursor()

            while not self.stop_event.is_set():
                response = requests.get(url, headers=headers, timeout=10)
                soup = BeautifulSoup(response.text, "html.parser")

                # Find elements on the page using the provided CSS selector
                elements = soup.select(selector)
                if not elements:
                    self.parent.after(0, lambda: self.status_label.config(
                        text="Status: No data found.", fg="orange"))
                    time.sleep(5)  # Pause before retrying
                    continue

                for element in elements:
                    title_text = element.get_text(strip=True)
                    link = element.get("href", "No link")  # Extract the link if present

                    # Insert the scraped row into the database
                    cursor.execute("INSERT INTO scraped_data (title, link) VALUES (%s, %s)", (title_text, link))
                    connection.commit()

                    # Show the row in the GUI; default arguments bind the
                    # current loop values for the deferred call
                    def show(t=title_text, ln=link):
                        self.scraped_data_text.config(state="normal")
                        self.scraped_data_text.insert("end", f"{t} - {ln}\n")
                        self.scraped_data_text.see("end")
                        self.scraped_data_text.config(state="disabled")
                        self.status_label.config(text=f"Status: Scraping {t}...", fg="green")
                    self.parent.after(0, show)

                time.sleep(5)  # Pause between iterations

            connection.close()
            self.parent.after(0, lambda: self.status_label.config(text="Status: Idle", fg="red"))
        except Exception as e:
            self.parent.after(0, lambda err=e: messagebox.showerror("Error", str(err)))
            self.parent.after(0, lambda: self.status_label.config(text="Status: Error", fg="red"))

    def stop_scraping(self):
        """Signal the scraping thread to stop."""
        self.stop_event.set()
        self.start_button.config(state="normal")
        self.stop_button.config(state="disabled")

    def reset_database(self):
        """Delete every row from the scraped_data table."""
        try:
            connection = mysql.connector.connect(
                host=self.host_entry.get(),
                user=self.user_entry.get(),
                password=self.password_entry.get(),
                database=self.database_entry.get()
            )
            cursor = connection.cursor()
            cursor.execute("TRUNCATE TABLE scraped_data")
            connection.commit()
            connection.close()
            messagebox.showinfo("Success", "Data reset successfully.")

            # Clear the text widget
            self.scraped_data_text.config(state="normal")
            self.scraped_data_text.delete("1.0", "end")
            self.scraped_data_text.config(state="disabled")
        except Exception as e:
            messagebox.showerror("Error", str(e))
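

# Minimal usage sketch (an addition, not part of the original file): embeds the
# WebScraperToDB panel in a plain Tk window. Assumes a reachable MySQL server
# matching the credentials entered in the form.
if __name__ == "__main__":
    root = tk.Tk()
    root.title("Web Scraper to DB")

    container = tk.Frame(root)
    container.pack(fill="both", expand=True, padx=10, pady=10)

    WebScraperToDB(container)
    root.mainloop()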