ProyectoPSP/scraping_logic.py

147 lines
6.8 KiB
Python

import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
import json
from tkinter import messagebox
import time
import requests
import threading
import os
# Archivos para persistencia y depuración
SCRAPING_FILE = "amazon_productos_resultados.json"
DEBUG_HTML_FILE = "amazon_debugging_output.html"
# URL BASE de búsqueda de Amazon
BASE_URL = "https://www.amazon.es/s?k={search_term}&ref=nb_sb_noss"
def run_scraper_async(search_term, root_app):
"""
Función de punto de entrada para el hilo de Python.
Lanza el bucle asíncrono de Playwright.
"""
try:
asyncio.run(scrape_products_playwright(search_term, root_app))
except Exception as e:
error_message = f"Error al iniciar el runtime de Playwright: {e}"
root_app.after(0, root_app.update_activity_status, error_message)
root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term)
async def scrape_products_playwright(search_term, root_app):
"""
Realiza el scraping usando Playwright con estrategia de espera por tiempo fijo
y extracción basada en la estructura de etiquetas.
"""
search_term_formatted = requests.utils.quote(search_term)
url = BASE_URL.format(search_term=search_term_formatted)
product_data = []
try:
async with async_playwright() as p:
# 1. Configuración de Lanzamiento de Navegador (Visible para depuración)
browser = await p.firefox.launch(headless=True, slow_mo=100)
# 2. Configuración del Contexto de Navegación
context = await browser.new_context(
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0',
locale='es-ES',
viewport={'width': 1920, 'height': 1080},
accept_downloads=False
)
page = await context.new_page()
# Navegar
root_app.after(0, root_app.update_activity_status, f"Playwright: Navegando a {url}")
await page.goto(url, timeout=90000, wait_until='domcontentloaded')
# 3. ACEPTAR COOKIES 🍪
try:
cookie_acceptor_selector = '#sp-cc-accept'
await page.wait_for_selector(cookie_acceptor_selector, timeout=10000)
await page.click(cookie_acceptor_selector)
except Exception:
pass
# 4. ESPERA FIJA Y CONFIRMACIÓN
root_app.after(0, root_app.update_activity_status, "Playwright: Espera forzada (10s) para carga dinámica...")
await page.wait_for_timeout(10000)
await page.wait_for_selector('#search', timeout=5000)
root_app.after(0, root_app.update_activity_status, "Playwright: Contenido cargado. Extrayendo datos...")
content = await page.content()
await browser.close()
# --- DEPURACIÓN: GUARDAR EL HTML CARGADO ---
with open(DEBUG_HTML_FILE, 'w', encoding='utf-8') as f:
f.write(content)
# -------------------------------------------
# --- COMIENZA EL PARSEO CON EXTRACCIÓN TOLERANTE ---
soup = BeautifulSoup(content, 'html.parser')
# Contenedor de producto estable
product_listings = soup.find_all('div', attrs={'data-component-type': 's-search-result'})
if not product_listings:
return [{"error": f"No se encontraron listados. El contenedor 's-search-result' no fue encontrado. Posible cambio de selector principal."}]
for listing in product_listings:
try:
# 1. Nombre del Producto (NOMBRE)
# Buscamos el h2 que contiene el título.
title_h2_tag = listing.find('h2')
# Luego buscamos el primer span dentro de él (que es el texto del título)
nombre = title_h2_tag.find('span').get_text(strip=True) if title_h2_tag and title_h2_tag.find('span') else "N/D"
# 2. Precio (PRECIO)
# Buscamos en el área de precio y luego buscamos el span a-offscreen (el precio real)
price_container = listing.find('span', class_='a-price')
precio_tag = price_container.find('span', class_='a-offscreen') if price_container else None
precio = precio_tag.get_text(strip=True) if precio_tag else "N/D"
# 3. Vendedor/Marca (VENDEDOR)
# Buscamos una etiqueta que contenga la palabra 'marca' o 'vendido por'
vendedor = "Marca/Vendedor (N/D)"
# Intentamos usar el aria-label si está disponible, es la fuente más limpia
h2_tag_for_vendor = listing.find('h2')
if h2_tag_for_vendor and 'aria-label' in h2_tag_for_vendor.attrs:
vendedor = h2_tag_for_vendor['aria-label'].split(',')[0]
# 4. URL de la Imagen (IMAGEN_URL)
image_tag = listing.find('img', class_='s-image')
imagen_url = image_tag['src'] if image_tag and 'src' in image_tag.attrs else "No URL de imagen"
if nombre == "N/D" or precio == "N/D" or 'Sponsored' in nombre:
continue
product_data.append({
"nombre": nombre,
"precio": precio,
"vendedor": vendedor,
"imagen_url": imagen_url
})
except Exception as e:
# Este error es normal si el listado es un anuncio o elemento atípico.
# El log de errores solo aparece en la consola de depuración.
# print(f"Error al procesar listado de producto: {e}")
pass # Omitir listados que causan error
# Guardar en JSON y mostrar resultados
with open(SCRAPING_FILE, 'w', encoding='utf-8') as f:
json.dump(product_data, f, indent=4, ensure_ascii=False)
root_app.after(0, root_app._display_scraping_results, product_data, search_term)
except Exception as e:
error_message = f"Error crítico durante el scraping: {e}"
root_app.after(0, root_app.update_activity_status, error_message)
root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term)
def start_playwright_scraper(search_term, root_app):
"""
Lanzador principal que crea un hilo de Python para ejecutar Playwright.
"""
threading.Thread(target=run_scraper_async, args=(search_term, root_app), daemon=True).start()