import asyncio from playwright.async_api import async_playwright from bs4 import BeautifulSoup import json from tkinter import messagebox import time import requests import threading import os # Archivos para persistencia y depuración SCRAPING_FILE = "amazon_productos_resultados.json" DEBUG_HTML_FILE = "amazon_debugging_output.html" # URL BASE de búsqueda de Amazon BASE_URL = "https://www.amazon.es/s?k={search_term}&ref=nb_sb_noss" def run_scraper_async(search_term, root_app): """ Función de punto de entrada para el hilo de Python. Lanza el bucle asíncrono de Playwright. """ try: asyncio.run(scrape_products_playwright(search_term, root_app)) except Exception as e: error_message = f"Error al iniciar el runtime de Playwright: {e}" root_app.after(0, root_app.update_activity_status, error_message) root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term) async def scrape_products_playwright(search_term, root_app): """ Realiza el scraping usando Playwright con estrategia de espera por tiempo fijo y extracción basada en la estructura de etiquetas. """ search_term_formatted = requests.utils.quote(search_term) url = BASE_URL.format(search_term=search_term_formatted) product_data = [] try: async with async_playwright() as p: # 1. Configuración de Lanzamiento de Navegador (Visible para depuración) browser = await p.firefox.launch(headless=True, slow_mo=100) # 2. Configuración del Contexto de Navegación context = await browser.new_context( user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0', locale='es-ES', viewport={'width': 1920, 'height': 1080}, accept_downloads=False ) page = await context.new_page() # Navegar root_app.after(0, root_app.update_activity_status, f"Playwright: Navegando a {url}") await page.goto(url, timeout=90000, wait_until='domcontentloaded') # 3. ACEPTAR COOKIES 🍪 try: cookie_acceptor_selector = '#sp-cc-accept' await page.wait_for_selector(cookie_acceptor_selector, timeout=10000) await page.click(cookie_acceptor_selector) except Exception: pass # 4. ESPERA FIJA Y CONFIRMACIÓN root_app.after(0, root_app.update_activity_status, "Playwright: Espera forzada (10s) para carga dinámica...") await page.wait_for_timeout(10000) await page.wait_for_selector('#search', timeout=5000) root_app.after(0, root_app.update_activity_status, "Playwright: Contenido cargado. Extrayendo datos...") content = await page.content() await browser.close() # --- DEPURACIÓN: GUARDAR EL HTML CARGADO --- with open(DEBUG_HTML_FILE, 'w', encoding='utf-8') as f: f.write(content) # ------------------------------------------- # --- COMIENZA EL PARSEO CON EXTRACCIÓN TOLERANTE --- soup = BeautifulSoup(content, 'html.parser') # Contenedor de producto estable product_listings = soup.find_all('div', attrs={'data-component-type': 's-search-result'}) if not product_listings: return [{"error": f"No se encontraron listados. El contenedor 's-search-result' no fue encontrado. Posible cambio de selector principal."}] for listing in product_listings: try: # 1. Nombre del Producto (NOMBRE) # Buscamos el h2 que contiene el título. title_h2_tag = listing.find('h2') # Luego buscamos el primer span dentro de él (que es el texto del título) nombre = title_h2_tag.find('span').get_text(strip=True) if title_h2_tag and title_h2_tag.find('span') else "N/D" # 2. Precio (PRECIO) # Buscamos en el área de precio y luego buscamos el span a-offscreen (el precio real) price_container = listing.find('span', class_='a-price') precio_tag = price_container.find('span', class_='a-offscreen') if price_container else None precio = precio_tag.get_text(strip=True) if precio_tag else "N/D" # 3. Vendedor/Marca (VENDEDOR) # Buscamos una etiqueta que contenga la palabra 'marca' o 'vendido por' vendedor = "Marca/Vendedor (N/D)" # Intentamos usar el aria-label si está disponible, es la fuente más limpia h2_tag_for_vendor = listing.find('h2') if h2_tag_for_vendor and 'aria-label' in h2_tag_for_vendor.attrs: vendedor = h2_tag_for_vendor['aria-label'].split(',')[0] # 4. URL de la Imagen (IMAGEN_URL) image_tag = listing.find('img', class_='s-image') imagen_url = image_tag['src'] if image_tag and 'src' in image_tag.attrs else "No URL de imagen" if nombre == "N/D" or precio == "N/D" or 'Sponsored' in nombre: continue product_data.append({ "nombre": nombre, "precio": precio, "vendedor": vendedor, "imagen_url": imagen_url }) except Exception as e: # Este error es normal si el listado es un anuncio o elemento atípico. # El log de errores solo aparece en la consola de depuración. # print(f"Error al procesar listado de producto: {e}") pass # Omitir listados que causan error # Guardar en JSON y mostrar resultados with open(SCRAPING_FILE, 'w', encoding='utf-8') as f: json.dump(product_data, f, indent=4, ensure_ascii=False) root_app.after(0, root_app._display_scraping_results, product_data, search_term) except Exception as e: error_message = f"Error crítico durante el scraping: {e}" root_app.after(0, root_app.update_activity_status, error_message) root_app.after(0, root_app._display_scraping_results, [{"error": error_message}], search_term) def start_playwright_scraper(search_term, root_app): """ Lanzador principal que crea un hilo de Python para ejecutar Playwright. """ threading.Thread(target=run_scraper_async, args=(search_term, root_app), daemon=True).start()