import tkinter as tk
from tkinter import Frame, Button, Label, Entry, Listbox, StringVar, messagebox

import mysql.connector
import requests
from bs4 import BeautifulSoup

from app.widgets.abc import ThreadedTab


class WebScrapingTab(ThreadedTab):

    def __init__(self, root: Frame | tk.Tk, stop_event, **kwargs):
        # Initialize the attributes this class needs before calling the base class
        self.url = StringVar()
        self.data = []
        self.conn = None  # The connection is established afterwards
        super().__init__(root, stop_event, **kwargs)  # Call the ThreadedTab constructor
        self.conn = self.create_database()  # Create or connect to the database

    def build(self):
        # Main frame
        self.scraping_frame = Frame(self)
        self.scraping_frame.pack(fill="both", expand=True)

        # Input field for URL
        Label(self.scraping_frame, text="Enter URL:", font=("Arial", 12)).pack(pady=5)
        Entry(self.scraping_frame, textvariable=self.url, font=("Arial", 12), width=50).pack(pady=5)

        # Buttons for actions
        Button(self.scraping_frame, text="Scrape", command=self.scrape_website).pack(pady=5)
        Button(self.scraping_frame, text="View Data", command=self.view_data).pack(pady=5)

        # Listbox to display scraped data
        self.data_listbox = Listbox(self.scraping_frame, font=("Arial", 10), width=80, height=20)
        self.data_listbox.pack(pady=10)

    def create_database(self):
        # Connect to the MySQL database
        conn = mysql.connector.connect(
            host="127.0.0.1",
            user="santipy",
            password="1234",
            database="scraping_db",
        )
        cursor = conn.cursor()

        # Create the table if it does not exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scraped_data (
                id INT AUTO_INCREMENT PRIMARY KEY,
                title VARCHAR(255),
                link TEXT
            )
        """)
        conn.commit()
        return conn
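
    # Hardcoded credentials work for a local demo; below is a minimal sketch of
    # reading them from the environment instead (the variable names MYSQL_HOST,
    # MYSQL_USER, MYSQL_PASSWORD and MYSQL_DB are assumptions, not part of this
    # app):
    #
    #     import os
    #     conn = mysql.connector.connect(
    #         host=os.environ.get("MYSQL_HOST", "127.0.0.1"),
    #         user=os.environ["MYSQL_USER"],
    #         password=os.environ["MYSQL_PASSWORD"],
    #         database=os.environ.get("MYSQL_DB", "scraping_db"),
    #     )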

    def save_to_database(self):
        # Batch-insert all scraped (title, link) pairs
        cursor = self.conn.cursor()
        query = "INSERT INTO scraped_data (title, link) VALUES (%s, %s)"
        cursor.executemany(query, self.data)
        self.conn.commit()

    def scrape_website(self):
        url = self.url.get()
        if not url:
            messagebox.showwarning("Warning", "Please enter a URL.")
            return

        try:
            # A timeout keeps the request from blocking the UI indefinitely
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            messagebox.showerror("Error", f"Failed to fetch URL: {e}")
            return

        soup = BeautifulSoup(response.text, "html.parser")
        items = soup.select("h2 a")  # Modify selector based on website structure

        self.data = [(item.get_text(strip=True), item.get("href")) for item in items]

        if self.data:
            self.save_to_database()
            messagebox.showinfo("Success", f"Scraped {len(self.data)} items and saved to database.")
        else:
            messagebox.showinfo("No Data", "No data found on the page.")

        self.update_listbox()
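
    # The "h2 a" selector above is only a default; each site needs its own. For
    # example, for a hypothetical blog whose posts are marked up as
    # <h2 class="post-title"><a href="...">...</a></h2>, the call would become:
    #
    #     items = soup.select("h2.post-title a")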

    def update_listbox(self):
        self.data_listbox.delete(0, "end")
        for title, link in self.data:
            self.data_listbox.insert("end", f"Title: {title} | Link: {link}")

    def view_data(self):
        # Show everything stored in the database so far
        cursor = self.conn.cursor()
        cursor.execute("SELECT title, link FROM scraped_data")
        rows = cursor.fetchall()

        self.data_listbox.delete(0, "end")
        for title, link in rows:
            self.data_listbox.insert("end", f"Title: {title} | Link: {link}")

    def task(self):
        # Placeholder for any background task
        pass
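

# A minimal usage sketch, assuming ThreadedTab only needs a parent widget and a
# threading.Event and that the tab is itself a packable Frame (the real app in
# app.widgets may wire this differently):
#
#     import threading
#
#     if __name__ == "__main__":
#         root = tk.Tk()
#         stop_event = threading.Event()
#         tab = WebScrapingTab(root, stop_event)
#         tab.pack(fill="both", expand=True)
#         root.protocol("WM_DELETE_WINDOW", lambda: (stop_event.set(), root.destroy()))
#         root.mainloop()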