147 lines
6.8 KiB
Python
147 lines
6.8 KiB
Python
import asyncio
|
|
from playwright.async_api import async_playwright
|
|
from bs4 import BeautifulSoup
|
|
import json
|
|
from tkinter import messagebox
|
|
import time
|
|
import requests
|
|
import threading
|
|
import os
|
|
|
|
# Archivos para persistencia y depuración
|
|
SCRAPING_FILE = "amazon_productos_resultados.json"
|
|
DEBUG_HTML_FILE = "amazon_debugging_output.html"
|
|
# URL BASE de búsqueda de Amazon
|
|
BASE_URL = "https://www.amazon.es/s?k={search_term}&ref=nb_sb_noss"
|
|
|
|
|
|
def run_scraper_async(search_term, root_app):
|
|
"""
|
|
Función de punto de entrada para el hilo de Python.
|
|
Lanza el bucle asíncrono de Playwright.
|
|
"""
|
|
try:
|
|
asyncio.run(scrape_products_playwright(search_term, root_app))
|
|
except Exception as e:
|
|
error_message = f"Error al iniciar el runtime de Playwright: {e}"
|
|
root_app.after(0, root_app.update_activity_status, error_message)
|
|
root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term)
|
|
|
|
|
|
async def scrape_products_playwright(search_term, root_app):
|
|
"""
|
|
Realiza el scraping usando Playwright con estrategia de espera por tiempo fijo
|
|
y extracción basada en la estructura de etiquetas.
|
|
"""
|
|
search_term_formatted = requests.utils.quote(search_term)
|
|
url = BASE_URL.format(search_term=search_term_formatted)
|
|
product_data = []
|
|
|
|
try:
|
|
async with async_playwright() as p:
|
|
# 1. Configuración de Lanzamiento de Navegador (Visible para depuración)
|
|
browser = await p.firefox.launch(headless=True, slow_mo=100)
|
|
|
|
# 2. Configuración del Contexto de Navegación
|
|
context = await browser.new_context(
|
|
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0',
|
|
locale='es-ES',
|
|
viewport={'width': 1920, 'height': 1080},
|
|
accept_downloads=False
|
|
)
|
|
page = await context.new_page()
|
|
|
|
# Navegar
|
|
root_app.after(0, root_app.update_activity_status, f"Playwright: Navegando a {url}")
|
|
await page.goto(url, timeout=90000, wait_until='domcontentloaded')
|
|
|
|
|
|
# 3. ACEPTAR COOKIES 🍪
|
|
try:
|
|
cookie_acceptor_selector = '#sp-cc-accept'
|
|
await page.wait_for_selector(cookie_acceptor_selector, timeout=10000)
|
|
await page.click(cookie_acceptor_selector)
|
|
except Exception:
|
|
pass
|
|
|
|
# 4. ESPERA FIJA Y CONFIRMACIÓN
|
|
root_app.after(0, root_app.update_activity_status, "Playwright: Espera forzada (10s) para carga dinámica...")
|
|
await page.wait_for_timeout(10000)
|
|
|
|
await page.wait_for_selector('#search', timeout=5000)
|
|
|
|
root_app.after(0, root_app.update_activity_status, "Playwright: Contenido cargado. Extrayendo datos...")
|
|
content = await page.content()
|
|
await browser.close()
|
|
|
|
# --- DEPURACIÓN: GUARDAR EL HTML CARGADO ---
|
|
with open(DEBUG_HTML_FILE, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
# -------------------------------------------
|
|
|
|
# --- COMIENZA EL PARSEO CON EXTRACCIÓN TOLERANTE ---
|
|
soup = BeautifulSoup(content, 'html.parser')
|
|
# Contenedor de producto estable
|
|
product_listings = soup.find_all('div', attrs={'data-component-type': 's-search-result'})
|
|
|
|
if not product_listings:
|
|
return [{"error": f"No se encontraron listados. El contenedor 's-search-result' no fue encontrado. Posible cambio de selector principal."}]
|
|
|
|
for listing in product_listings:
|
|
try:
|
|
# 1. Nombre del Producto (NOMBRE)
|
|
# Buscamos el h2 que contiene el título.
|
|
title_h2_tag = listing.find('h2')
|
|
# Luego buscamos el primer span dentro de él (que es el texto del título)
|
|
nombre = title_h2_tag.find('span').get_text(strip=True) if title_h2_tag and title_h2_tag.find('span') else "N/D"
|
|
|
|
# 2. Precio (PRECIO)
|
|
# Buscamos en el área de precio y luego buscamos el span a-offscreen (el precio real)
|
|
price_container = listing.find('span', class_='a-price')
|
|
precio_tag = price_container.find('span', class_='a-offscreen') if price_container else None
|
|
precio = precio_tag.get_text(strip=True) if precio_tag else "N/D"
|
|
|
|
# 3. Vendedor/Marca (VENDEDOR)
|
|
# Buscamos una etiqueta que contenga la palabra 'marca' o 'vendido por'
|
|
vendedor = "Marca/Vendedor (N/D)"
|
|
# Intentamos usar el aria-label si está disponible, es la fuente más limpia
|
|
h2_tag_for_vendor = listing.find('h2')
|
|
if h2_tag_for_vendor and 'aria-label' in h2_tag_for_vendor.attrs:
|
|
vendedor = h2_tag_for_vendor['aria-label'].split(',')[0]
|
|
|
|
# 4. URL de la Imagen (IMAGEN_URL)
|
|
image_tag = listing.find('img', class_='s-image')
|
|
imagen_url = image_tag['src'] if image_tag and 'src' in image_tag.attrs else "No URL de imagen"
|
|
|
|
if nombre == "N/D" or precio == "N/D" or 'Sponsored' in nombre:
|
|
continue
|
|
|
|
product_data.append({
|
|
"nombre": nombre,
|
|
"precio": precio,
|
|
"vendedor": vendedor,
|
|
"imagen_url": imagen_url
|
|
})
|
|
except Exception as e:
|
|
# Este error es normal si el listado es un anuncio o elemento atípico.
|
|
# El log de errores solo aparece en la consola de depuración.
|
|
# print(f"Error al procesar listado de producto: {e}")
|
|
pass # Omitir listados que causan error
|
|
|
|
# Guardar en JSON y mostrar resultados
|
|
with open(SCRAPING_FILE, 'w', encoding='utf-8') as f:
|
|
json.dump(product_data, f, indent=4, ensure_ascii=False)
|
|
|
|
root_app.after(0, root_app._display_scraping_results, product_data, search_term)
|
|
|
|
except Exception as e:
|
|
error_message = f"Error crítico durante el scraping: {e}"
|
|
root_app.after(0, root_app.update_activity_status, error_message)
|
|
root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term)
|
|
|
|
|
|
def start_playwright_scraper(search_term, root_app):
|
|
"""
|
|
Lanzador principal que crea un hilo de Python para ejecutar Playwright.
|
|
"""
|
|
threading.Thread(target=run_scraper_async, args=(search_term, root_app), daemon=True).start() |