|
|
import asyncio
|
|
|
import atexit
|
|
|
import logging
|
|
|
import os
|
|
|
import platform
|
|
|
import signal
|
|
|
import subprocess
|
|
|
import time
|
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
|
|
from playwright.async_api import (
|
|
|
BrowserContext,
|
|
|
Download,
|
|
|
Page,
|
|
|
Playwright,
|
|
|
async_playwright,
|
|
|
)
|
|
|
|
|
|
from .playwright_controller import PlaywrightController
|
|
|
|
|
|
|
|
|
class BrowserBB:
    """Manages browser instance, context, and page lifecycle.

    Three start-up modes are supported, selected in :meth:`init`:

    * ``use_browser_base=True`` -- connect over CDP to a remote BrowserBase
      session (always Chromium).
    * ``browser_data_dir is None`` -- launch a fresh local browser of the
      type named by ``browser_channel``.
    * otherwise -- launch a persistent Chromium context backed by
      ``browser_data_dir``.

    Call :meth:`init` before using :attr:`page` / :attr:`context`, and
    :meth:`close` when done.
    """

    def __init__(
        self,
        viewport_height: int,
        viewport_width: int,
        headless: bool,
        page_script_path: str | None,
        browser_channel: str = "firefox",
        browser_data_dir: str | None = None,
        downloads_folder: str | None = None,
        to_resize_viewport: bool = True,
        single_tab_mode: bool = True,
        animate_actions: bool = False,
        use_browser_base: bool = False,
        logger: Optional[logging.Logger] = None,
    ):
        """Validate the configuration and build the page controller.

        Args:
            viewport_height: Page viewport height in pixels (positive int).
            viewport_width: Page viewport width in pixels (positive int).
            headless: Whether local browsers are launched headless. When
                False, an Xvfb display is started first (Linux only).
            page_script_path: Path of the init script injected into the
                page. ``None`` selects ``page_script.js`` next to this
                module.
            browser_channel: One of ``"chromium"``, ``"firefox"``,
                ``"webkit"``; used for the non-persistent local browser.
            browser_data_dir: User-data directory; when given (and
                BrowserBase is not used) a persistent Chromium context is
                launched.
            downloads_folder: Forwarded to ``PlaywrightController``.
            to_resize_viewport: Forwarded to ``PlaywrightController``.
            single_tab_mode: When True, newly opened tabs are closed and
                their URL is loaded in the main page instead.
            animate_actions: Forwarded to ``PlaywrightController``.
            use_browser_base: Connect to a BrowserBase session instead of
                launching a local browser.
            logger: Logger to use; defaults to ``"browser_manager"``.

        Raises:
            ValueError: If a viewport dimension is not a positive int.
            AssertionError: If ``headless``, ``page_script_path`` or
                ``browser_channel`` is invalid.
        """
        self.headless = headless
        self.browser_channel = browser_channel
        self.browser_data_dir = browser_data_dir
        self.downloads_folder = downloads_folder
        self.to_resize_viewport = to_resize_viewport
        self.animate_actions = animate_actions
        self.single_tab_mode = single_tab_mode
        self.use_browser_base = use_browser_base
        self.logger = logger or logging.getLogger("browser_manager")

        self._viewport_height = viewport_height
        self._viewport_width = viewport_width

        if not isinstance(self._viewport_width, int) or self._viewport_width <= 0:
            raise ValueError(
                f"Error: Browser_manager.Browser: Invalid viewport width: {self._viewport_width}. Must be a positive integer."
            )
        if not isinstance(self._viewport_height, int) or self._viewport_height <= 0:
            raise ValueError(
                f"Error: Browser_manager.Browser:Invalid viewport height: {self._viewport_height}. Must be a positive integer."
            )
        # NOTE: the remaining checks are asserts (kept so that the exception
        # type seen by existing callers is unchanged); they are skipped when
        # Python runs with -O.
        assert isinstance(self.headless, bool), (
            f"Error: Browser_manager.Browser: headless must be a boolean, got {type(self.headless)}"
        )

        # Fall back to the script shipped next to this module.
        if page_script_path is None:
            page_script_path = os.path.join(
                os.path.abspath(os.path.dirname(__file__)), "page_script.js"
            )
        self.page_script_path = page_script_path
        assert isinstance(page_script_path, str), (
            f"Error: Browser_manager.Browser: page_script_path must be a string, got {type(self.page_script_path)}"
        )
        assert os.path.exists(self.page_script_path), (
            f"Error: Browser_manager.Browser: page_script_path does not exist: {self.page_script_path}"
        )

        assert isinstance(self.browser_channel, str) and (
            self.browser_channel in ["chromium", "firefox", "webkit"]
        ), (
            f"Error: Browser_manager.Browser: browser_channel must be one of ['chromium', 'firefox', 'webkit'], got {self.browser_channel}"
        )

        # Runtime handles; populated by init() and cleared by close().
        self._playwright: Playwright | None = None
        self._context: BrowserContext | None = None
        self._page: Page | None = None
        self.browser = None
        self.session = None  # BrowserBase session, if any
        self.bb = None  # BrowserBase client, if any
        self.xvfb_process = None
        self.shared_data_point = None

        # The event is *set* while no captcha is being solved, so waiters
        # pass straight through in the normal case.
        self._captcha_event = asyncio.Event()
        self._captcha_event.set()
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks: set = set()

        self._download_handler: Callable[[Download], None] | None = None
        self._captcha_solved_callback: Callable[[bool], None] | None = None

        self._playwright_controller = PlaywrightController(
            animate_actions=self.animate_actions,
            downloads_folder=self.downloads_folder,
            viewport_width=self._viewport_width,
            viewport_height=self._viewport_height,
            _download_handler=self._download_handler,
            to_resize_viewport=self.to_resize_viewport,
            single_tab_mode=self.single_tab_mode,
            logger=self.logger,
        )

    def set_download_handler(self, handler: Callable[[Download], None]) -> None:
        """Set the download handler for the browser.

        The handler is stored here and on the controller. It is attached to
        the page's ``"download"`` event during :meth:`init`, so it must be
        set before :meth:`init` to be registered on the main page.
        """
        self._download_handler = handler
        self._playwright_controller._download_handler = handler

    def set_captcha_solved_callback(self, callback: Callable[[bool], None]) -> None:
        """Set callback to be called when captcha status changes.

        NOTE(review): the callback is only stored; nothing in this class
        invokes it.
        """
        self._captcha_solved_callback = callback

    async def init(
        self,
        start_page: str,
        shared_data_point=None,
    ) -> None:
        """Initialize the browser, context, and page.

        Args:
            start_page: URL the main page navigates to once set up.
            shared_data_point: Optional object whose
                ``set_encountered_captcha(True)`` is called when a
                BrowserBase captcha solve starts.
        """
        self._playwright = await async_playwright().start()
        self.shared_data_point = shared_data_point

        if self.use_browser_base:
            await self._init_browser_base(self.shared_data_point)
        elif self.browser_data_dir is None:
            await self._init_regular_browser(channel=self.browser_channel)
        else:
            await self._init_persistent_browser()

        await self._setup_common_browser_features(start_page)

    async def _init_browser_base(self, shared_data_point) -> None:
        """Initialize BrowserBase connection, defaults to chromium.

        Reads ``BROWSERBASE_API_KEY`` and ``BROWSERBASE_PROJECT_ID`` from the
        environment, creates a session (retrying every 10 seconds while rate
        limited), connects to it over CDP and installs a console listener
        that pauses/resumes :meth:`wait_for_captcha_resolution`.
        """
        # Imported lazily so the dependency is only needed in this mode.
        import browserbase
        from browserbase import Browserbase

        self.logger.info("Initializing BrowserBase session...")
        self.bb = Browserbase(api_key=os.environ["BROWSERBASE_API_KEY"])

        while True:
            try:
                self.session = self.bb.sessions.create(
                    project_id=os.environ["BROWSERBASE_PROJECT_ID"],
                    proxies=True,
                    browser_settings={"advanced_stealth": True},
                    keep_alive=True,
                    timeout=7200,
                    region="us-east-1",
                )
                break
            except browserbase.RateLimitError:
                self.logger.warning(
                    "Rate limit exceeded while trying to create BrowserBase session. Retrying in 10 seconds..."
                )
                await asyncio.sleep(10)

        assert self.session.id is not None
        assert self.session.status == "RUNNING", (
            f"Session status is {self.session.status}"
        )

        chromium = self._playwright.chromium
        self.browser = await chromium.connect_over_cdp(self.session.connect_url)
        self.logger.info(
            f"Connected to BrowserBase session: https://browserbase.com/sessions/{self.session.id}"
        )

        # Reuse the context/page the remote session already provides.
        self._context = self.browser.contexts[0]
        assert len(self._context.pages) == 1
        self._page = self._context.pages[0]

        async def delayed_resume() -> None:
            """Release captcha waiters once the page has settled."""
            try:
                await asyncio.sleep(3)
                if self._page is not None:
                    await self._page.wait_for_load_state("networkidle")
            except Exception as e:
                self.logger.warning(f"Error while resuming after captcha: {e}")
            finally:
                # Always release waiters; otherwise a load-state timeout
                # would block wait_for_captcha_resolution() forever.
                self._captcha_event.set()

        def handle_console(msg):
            """Handle captcha detection and solving."""
            if msg.text == "browserbase-solving-started":
                self.logger.info("Captcha Solving In Progress!!")
                if shared_data_point:
                    shared_data_point.set_encountered_captcha(True)
                self._captcha_event.clear()
            elif msg.text == "browserbase-solving-finished":
                self.logger.info("Captcha Solving Completed!!")
                # Keep a reference so the task cannot be garbage-collected
                # before it finishes.
                task = asyncio.create_task(delayed_resume())
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)

        self._context.on("console", handle_console)
        self._page.on("console", handle_console)

    async def _init_regular_browser(self, channel: str = "chromium") -> None:
        """Initialize regular browser according to the specified channel.

        Raises:
            ValueError: If ``channel`` is not chromium, firefox or webkit.
        """
        # Validate before starting Xvfb so a bad channel leaves nothing behind.
        if channel not in ("chromium", "firefox", "webkit"):
            raise ValueError(
                f"Unsupported browser channel: {channel}. Supported channels are 'chromium', 'firefox', and 'webkit'."
            )

        if not self.headless:
            self.start_xvfb()

        launch_args: Dict[str, Any] = {"headless": self.headless}
        browser_type = getattr(self._playwright, channel)
        self.browser = await browser_type.launch(**launch_args)

        self._context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
        )

        self._page = await self._context.new_page()

    async def _init_persistent_browser(self) -> None:
        """Initialize persistent browser with data directory.

        Always launches Chromium; ``browser_channel`` is not consulted here.
        """
        if not self.headless:
            self.start_xvfb()

        launch_args: Dict[str, Any] = {"headless": self.headless}
        self._context = await self._playwright.chromium.launch_persistent_context(
            self.browser_data_dir, **launch_args
        )
        self._page = await self._context.new_page()

    async def _setup_common_browser_features(self, start_page: str) -> None:
        """Set up features common to all browser types.

        Applies the default timeout, registers the page with the controller,
        wires the single-tab and download handlers, sizes the viewport,
        injects the init script and navigates to ``start_page``.
        """
        assert self._context is not None
        assert self._page is not None

        self._context.set_default_timeout(60000)
        await self._playwright_controller.on_new_page(self._page)

        if self.single_tab_mode:
            # self._page is looked up when the event fires, so the handler
            # always targets the current main page.
            self._context.on(
                "page", lambda new_pg: self._handle_new_page_safe(new_pg, self._page)
            )

        if self._download_handler:
            self._page.on("download", self._download_handler)

        await self._page.set_viewport_size(
            {"width": self._viewport_width, "height": self._viewport_height}
        )

        await self._page.add_init_script(path=self.page_script_path)

        await self._page.goto(start_page)
        await self._page.wait_for_load_state()

    async def _handle_new_page_safe(self, new_pg: Page, main_page: Page) -> None:
        """Safely handle new pages in single tab mode.

        Closes the new tab and loads its URL in ``main_page`` instead. Any
        error is logged as a warning and swallowed, since this runs as an
        event handler.
        """
        try:
            await new_pg.wait_for_load_state("domcontentloaded")

            if new_pg == main_page or new_pg.url == main_page.url:
                self.logger.info("New tab is same as current page, not closing.")
                return

            new_url = new_pg.url
            await new_pg.close()
            await self._playwright_controller.visit_page(main_page, new_url)
        except Exception as e:
            self.logger.warning(f"Error in handle_new_page_safe: {e}")

    def start_xvfb(self) -> None:
        """Start Xvfb virtual display server (Linux only).

        Does nothing on other platforms, or when an Xvfb process started by
        this instance is still running. Sets ``DISPLAY`` for the current
        process and registers :meth:`stop_xvfb` to run at interpreter exit.
        """
        if platform.system() != "Linux":
            return

        # Avoid spawning a second server (and a second atexit hook).
        if self.xvfb_process is not None and self.xvfb_process.poll() is None:
            return

        display_num = 99
        self.xvfb_process = subprocess.Popen(
            ["Xvfb", f":{display_num}", "-screen", "0", "1280x1024x24", "-ac"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        os.environ["DISPLAY"] = f":{display_num}"

        # Give the server a moment to come up before a browser connects.
        time.sleep(1)

        # Make sure the process is cleaned up even if close() is never called.
        atexit.register(self.stop_xvfb)

    def stop_xvfb(self) -> None:
        """Stop the Xvfb process if it's running."""
        if self.xvfb_process:
            self.xvfb_process.send_signal(signal.SIGTERM)
            self.xvfb_process.wait()
            self.xvfb_process = None
            # Drop the exit hook so it no longer keeps this instance alive.
            atexit.unregister(self.stop_xvfb)

    async def wait_for_captcha_resolution(self) -> None:
        """Wait for captcha to be resolved if one is being solved."""
        await self._captcha_event.wait()

    @property
    def page(self) -> Page | None:
        """Get the current page."""
        return self._page

    @page.setter
    def page(self, value):
        """Replace the current page."""
        self._page = value

    @property
    def context(self) -> BrowserContext | None:
        """Get the browser context."""
        return self._context

    @property
    def playwright_controller(self):
        """Get the playwright controller."""
        return self._playwright_controller

    async def close(self) -> None:
        """Close the browser and clean up resources.

        Tears down in dependency order: page, context, browser (releasing
        the BrowserBase session first, if any), then the Playwright driver,
        and finally Xvfb. Later steps still run when an earlier one raises;
        the exception is then propagated to the caller.
        """
        self.logger.info("Closing browser...")

        try:
            try:
                if self._page is not None:
                    page, self._page = self._page, None
                    await page.close()

                if self._context is not None:
                    context, self._context = self._context, None
                    await context.close()

                if self.browser:
                    browser, self.browser = self.browser, None
                    if self.use_browser_base and self.session:
                        self.bb.sessions.update(
                            self.session.id,
                            status="REQUEST_RELEASE",
                            project_id=os.environ["BROWSERBASE_PROJECT_ID"],
                        )
                    await browser.close()
            finally:
                # The driver must outlive every object created through it,
                # so it is stopped last.
                if self._playwright is not None:
                    playwright, self._playwright = self._playwright, None
                    await playwright.stop()
        finally:
            if not self.headless:
                self.stop_xvfb()
|
|
|
|