|
|
import ast
|
|
|
import asyncio
|
|
|
import io
|
|
|
import json
|
|
|
import logging
|
|
|
import os
|
|
|
from typing import Any, Dict, List, Tuple
|
|
|
from urllib.parse import quote_plus
|
|
|
|
|
|
from openai import AsyncOpenAI
|
|
|
from PIL import Image
|
|
|
from playwright.async_api import BrowserContext, Download, Page
|
|
|
from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential
|
|
|
|
|
|
from ._prompts import get_computer_use_system_prompt
|
|
|
from .browser.playwright_controller import PlaywrightController
|
|
|
from .types import (
|
|
|
AssistantMessage,
|
|
|
FunctionCall,
|
|
|
ImageObj,
|
|
|
LLMMessage,
|
|
|
ModelResponse,
|
|
|
SystemMessage,
|
|
|
UserMessage,
|
|
|
WebSurferEvent,
|
|
|
message_to_openai_format,
|
|
|
)
|
|
|
from .utils import get_trimmed_url
|
|
|
|
|
|
|
|
|
class FaraAgent:
|
|
|
DEFAULT_START_PAGE = "https://www.bing.com/"
|
|
|
|
|
|
MLM_PROCESSOR_IM_CFG = {
|
|
|
"min_pixels": 3136,
|
|
|
"max_pixels": 12845056,
|
|
|
"patch_size": 14,
|
|
|
"merge_size": 2,
|
|
|
}
|
|
|
|
|
|
SCREENSHOT_TOKENS = 1105
|
|
|
USER_MESSAGE = "Here is the next screenshot. Think about what to do next."
|
|
|
MAX_URL_LENGTH = 100
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
browser_manager: Any,
|
|
|
client_config: dict,
|
|
|
downloads_folder: str | None = None,
|
|
|
start_page: str | None = "about:blank",
|
|
|
animate_actions: bool = False,
|
|
|
single_tab_mode: bool = True,
|
|
|
max_n_images: int = 3,
|
|
|
fn_call_template: str = "default",
|
|
|
model_call_timeout: int = 20,
|
|
|
max_rounds: int = 10,
|
|
|
save_screenshots: bool = False,
|
|
|
logger: logging.Logger | None = None,
|
|
|
):
|
|
|
self.downloads_folder = downloads_folder
|
|
|
if not os.path.exists(self.downloads_folder or "") and self.downloads_folder:
|
|
|
os.makedirs(self.downloads_folder)
|
|
|
self.single_tab_mode = single_tab_mode
|
|
|
self.start_page = start_page or self.DEFAULT_START_PAGE
|
|
|
self.animate_actions = animate_actions
|
|
|
self.browser_manager = browser_manager
|
|
|
self.client_config = client_config
|
|
|
self.max_n_images = max_n_images
|
|
|
self.fn_call_template = fn_call_template
|
|
|
self.model_call_timeout = model_call_timeout
|
|
|
self.max_rounds = max_rounds
|
|
|
self.max_url_chars = self.MAX_URL_LENGTH
|
|
|
if save_screenshots and self.downloads_folder is None:
|
|
|
assert False, "downloads_folder must be set if save_screenshots is True"
|
|
|
self.save_screenshots = save_screenshots
|
|
|
self._facts = []
|
|
|
self._task_summary = None
|
|
|
self._num_actions = 0
|
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
|
self._mlm_width = 1440
|
|
|
self._mlm_height = 900
|
|
|
self.viewport_height = 900
|
|
|
self.viewport_width = 1440
|
|
|
self.include_input_text_key_args = True
|
|
|
|
|
|
def _download_handler(download: Download) -> None:
|
|
|
self._last_download = download
|
|
|
|
|
|
self._download_handler = _download_handler
|
|
|
self.did_initialize = False
|
|
|
|
|
|
|
|
|
self._openai_client: AsyncOpenAI | None = None
|
|
|
self._chat_history: List[LLMMessage] = []
|
|
|
|
|
|
async def initialize(self) -> None:
|
|
|
if self.did_initialize:
|
|
|
return
|
|
|
self._last_download = None
|
|
|
self._prior_metadata_hash = None
|
|
|
|
|
|
|
|
|
self._openai_client = AsyncOpenAI(
|
|
|
api_key=self.client_config.get("api_key"),
|
|
|
base_url=self.client_config.get("base_url"),
|
|
|
default_headers=self.client_config.get("default_headers"),
|
|
|
)
|
|
|
|
|
|
|
|
|
self.browser_manager.set_download_handler(self._download_handler)
|
|
|
|
|
|
|
|
|
await self.browser_manager.init(self.start_page)
|
|
|
self.did_initialize = True
|
|
|
|
|
|
@property
|
|
|
def _page(self) -> Page | None:
|
|
|
"""Get the current page from browser manager."""
|
|
|
return self.browser_manager.page if self.browser_manager else None
|
|
|
|
|
|
@_page.setter
|
|
|
def _page(self, value):
|
|
|
if self.browser_manager:
|
|
|
self.browser_manager.page = value
|
|
|
else:
|
|
|
raise ValueError("Browser manager is not initialized. Cannot set page.")
|
|
|
|
|
|
@property
|
|
|
def context(self) -> BrowserContext | None:
|
|
|
"""Get the browser context from browser manager."""
|
|
|
return self.browser_manager.context if self.browser_manager else None
|
|
|
|
|
|
@property
|
|
|
def _playwright_controller(self) -> PlaywrightController | None:
|
|
|
"""Get the playwright controller from browser manager."""
|
|
|
return (
|
|
|
self.browser_manager.playwright_controller if self.browser_manager else None
|
|
|
)
|
|
|
|
|
|
async def wait_for_captcha_with_timeout(
|
|
|
self, timeout_seconds=300
|
|
|
):
|
|
|
"""Wait for captcha to be solved with timeout"""
|
|
|
try:
|
|
|
await asyncio.wait_for(
|
|
|
self.browser_manager.wait_for_captcha_resolution(),
|
|
|
timeout=timeout_seconds,
|
|
|
)
|
|
|
return True
|
|
|
except asyncio.TimeoutError:
|
|
|
self.logger.warning(f"Captcha timeout after {timeout_seconds} seconds!")
|
|
|
|
|
|
self.browser_manager._captcha_event.set()
|
|
|
return False
|
|
|
|
|
|
@retry(
|
|
|
stop=stop_after_attempt(5),
|
|
|
wait=wait_exponential(multiplier=5.0, min=5.0, max=60),
|
|
|
before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
|
|
|
reraise=True,
|
|
|
)
|
|
|
async def _make_model_call(
|
|
|
self,
|
|
|
history: List[LLMMessage],
|
|
|
extra_create_args: Dict[str, Any] | None = None,
|
|
|
) -> ModelResponse:
|
|
|
"""Make a model call using OpenAI client"""
|
|
|
openai_messages = [message_to_openai_format(msg) for msg in history]
|
|
|
request_params = {
|
|
|
"model": self.client_config.get("model", "gpt-4o"),
|
|
|
"messages": openai_messages,
|
|
|
}
|
|
|
if extra_create_args:
|
|
|
request_params.update(extra_create_args)
|
|
|
|
|
|
response = await self._openai_client.chat.completions.create(**request_params)
|
|
|
content = response.choices[0].message.content
|
|
|
usage = {}
|
|
|
if response.usage:
|
|
|
usage = {
|
|
|
"prompt_tokens": response.usage.prompt_tokens,
|
|
|
"completion_tokens": response.usage.completion_tokens,
|
|
|
"total_tokens": response.usage.total_tokens,
|
|
|
}
|
|
|
return ModelResponse(content=content, usage=usage)
|
|
|
|
|
|
def remove_screenshot_from_message(self, msg: List[Dict[str, Any]] | Any) -> Any:
|
|
|
"""Remove the screenshot from the message content."""
|
|
|
if isinstance(msg.content, list):
|
|
|
new_content = []
|
|
|
for c in msg.content:
|
|
|
if not isinstance(c, ImageObj):
|
|
|
new_content.append(c)
|
|
|
msg.content = new_content
|
|
|
elif isinstance(msg.content, ImageObj):
|
|
|
msg = None
|
|
|
return msg
|
|
|
|
|
|
def maybe_remove_old_screenshots(
|
|
|
self, history: List[LLMMessage], includes_current: bool = False
|
|
|
) -> List[LLMMessage]:
|
|
|
"""Remove old screenshots from the chat history. Assuming we have not yet added the current screenshot message.
|
|
|
|
|
|
Note: Original user messages (marked with is_original=True) have their TEXT preserved,
|
|
|
but their images may be removed if we exceed max_n_images. Boilerplate messages can be
|
|
|
completely removed.
|
|
|
"""
|
|
|
if self.max_n_images <= 0:
|
|
|
return history
|
|
|
|
|
|
max_n_images = self.max_n_images if includes_current else self.max_n_images - 1
|
|
|
new_history: List[LLMMessage] = []
|
|
|
n_images = 0
|
|
|
for i in range(len(history) - 1, -1, -1):
|
|
|
msg = history[i]
|
|
|
|
|
|
is_original_user_message = isinstance(msg, UserMessage) and getattr(
|
|
|
msg, "is_original", False
|
|
|
)
|
|
|
|
|
|
if i == 0 and n_images >= max_n_images:
|
|
|
|
|
|
msg = self.remove_screenshot_from_message(msg)
|
|
|
if msg is None:
|
|
|
continue
|
|
|
|
|
|
if isinstance(msg.content, list):
|
|
|
|
|
|
has_image = False
|
|
|
for c in msg.content:
|
|
|
if isinstance(c, ImageObj):
|
|
|
has_image = True
|
|
|
break
|
|
|
if has_image:
|
|
|
if n_images < max_n_images:
|
|
|
new_history.append(msg)
|
|
|
elif is_original_user_message:
|
|
|
|
|
|
msg = self.remove_screenshot_from_message(msg)
|
|
|
if msg is not None:
|
|
|
new_history.append(msg)
|
|
|
n_images += 1
|
|
|
else:
|
|
|
new_history.append(msg)
|
|
|
elif isinstance(msg.content, ImageObj):
|
|
|
if n_images < max_n_images:
|
|
|
new_history.append(msg)
|
|
|
n_images += 1
|
|
|
else:
|
|
|
new_history.append(msg)
|
|
|
|
|
|
new_history = new_history[::-1]
|
|
|
|
|
|
return new_history
|
|
|
|
|
|
async def _get_scaled_screenshot(self) -> Image.Image:
|
|
|
"""Get current screenshot and scale it for the model."""
|
|
|
screenshot = await self._playwright_controller.get_screenshot(self._page)
|
|
|
screenshot = Image.open(io.BytesIO(screenshot))
|
|
|
_, scaled_screenshot = self._get_system_message(screenshot)
|
|
|
return scaled_screenshot
|
|
|
|
|
|
def _get_system_message(
|
|
|
self, screenshot: ImageObj | Image.Image
|
|
|
) -> Tuple[List[SystemMessage], Image.Image]:
|
|
|
system_prompt_info = get_computer_use_system_prompt(
|
|
|
screenshot,
|
|
|
self.MLM_PROCESSOR_IM_CFG,
|
|
|
include_input_text_key_args=self.include_input_text_key_args,
|
|
|
fn_call_template=self.fn_call_template,
|
|
|
)
|
|
|
self._mlm_width, self._mlm_height = system_prompt_info["im_size"]
|
|
|
scaled_screenshot = screenshot.resize((self._mlm_width, self._mlm_height))
|
|
|
|
|
|
system_message = []
|
|
|
for msg in system_prompt_info["conversation"]:
|
|
|
tmp_content = ""
|
|
|
for content in msg["content"]:
|
|
|
tmp_content += content["text"]
|
|
|
|
|
|
system_message.append(SystemMessage(content=tmp_content))
|
|
|
|
|
|
return system_message, scaled_screenshot
|
|
|
|
|
|
def _parse_thoughts_and_action(self, message: str) -> Tuple[str, Dict[str, Any]]:
|
|
|
try:
|
|
|
tmp = message.split("<tool_call>\n")
|
|
|
thoughts = tmp[0].strip()
|
|
|
action_text = tmp[1].split("\n</tool_call>")[0]
|
|
|
try:
|
|
|
action = json.loads(action_text)
|
|
|
except json.decoder.JSONDecodeError:
|
|
|
self.logger.error(f"Invalid action text: {action_text}")
|
|
|
action = ast.literal_eval(action_text)
|
|
|
|
|
|
return thoughts, action
|
|
|
except Exception as e:
|
|
|
self.logger.error(
|
|
|
f"Error parsing thoughts and action: {message}", exc_info=True
|
|
|
)
|
|
|
raise e
|
|
|
|
|
|
def convert_resized_coords_to_original(
|
|
|
self, coords: List[float], rsz_w: int, rsz_h: int, og_w: int, og_h: int
|
|
|
) -> List[float]:
|
|
|
scale_x = og_w / rsz_w
|
|
|
scale_y = og_h / rsz_h
|
|
|
return [coords[0] * scale_x, coords[1] * scale_y]
|
|
|
|
|
|
def proc_coords(
|
|
|
self,
|
|
|
coords: List[float] | None,
|
|
|
im_w: int,
|
|
|
im_h: int,
|
|
|
og_im_w: int | None = None,
|
|
|
og_im_h: int | None = None,
|
|
|
) -> List[float] | None:
|
|
|
if not coords:
|
|
|
return coords
|
|
|
|
|
|
if og_im_w is None:
|
|
|
og_im_w = im_w
|
|
|
if og_im_h is None:
|
|
|
og_im_h = im_h
|
|
|
|
|
|
tgt_x, tgt_y = coords
|
|
|
return self.convert_resized_coords_to_original(
|
|
|
[tgt_x, tgt_y], im_w, im_h, og_im_w, og_im_h
|
|
|
)
|
|
|
|
|
|
async def run(self, user_message: str) -> Tuple:
|
|
|
"""Run the agent with a user message."""
|
|
|
|
|
|
await self.initialize()
|
|
|
|
|
|
|
|
|
assert self._page is not None, "Page should be initialized"
|
|
|
|
|
|
|
|
|
scaled_screenshot = await self._get_scaled_screenshot()
|
|
|
|
|
|
if self.save_screenshots:
|
|
|
await self._playwright_controller.get_screenshot(
|
|
|
self._page,
|
|
|
path=os.path.join(
|
|
|
self.downloads_folder, f"screenshot{self._num_actions}.png"
|
|
|
),
|
|
|
)
|
|
|
|
|
|
self._chat_history.append(
|
|
|
UserMessage(
|
|
|
content=[ImageObj.from_pil(scaled_screenshot), user_message],
|
|
|
is_original=True,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
all_actions = []
|
|
|
all_observations = []
|
|
|
final_answer = "<no_answer>"
|
|
|
is_stop_action = False
|
|
|
for i in range(self.max_rounds):
|
|
|
is_first_round = i == 0
|
|
|
if not self.browser_manager._captcha_event.is_set():
|
|
|
self.logger.info("Waiting 60s for captcha to finish...")
|
|
|
captcha_solved = await self.wait_for_captcha_with_timeout(60)
|
|
|
if (
|
|
|
not captcha_solved
|
|
|
and not self.browser_manager._captcha_event.is_set()
|
|
|
):
|
|
|
raise RuntimeError(
|
|
|
"Captcha timed out, unable to proceed with web surfing."
|
|
|
)
|
|
|
|
|
|
function_call, raw_response = await self.generate_model_call(
|
|
|
is_first_round, scaled_screenshot if is_first_round else None
|
|
|
)
|
|
|
assert isinstance(raw_response, str)
|
|
|
all_actions.append(raw_response)
|
|
|
thoughts, action_dict = self._parse_thoughts_and_action(raw_response)
|
|
|
action_args = action_dict.get("arguments", {})
|
|
|
action = action_args["action"]
|
|
|
self.logger.info(
|
|
|
f"\nThought #{i + 1}: {thoughts}\nAction #{i + 1}: executing tool '{action}' with arguments {json.dumps(action_args)}"
|
|
|
)
|
|
|
|
|
|
(
|
|
|
is_stop_action,
|
|
|
new_screenshot,
|
|
|
action_description,
|
|
|
) = await self.execute_action(function_call)
|
|
|
all_observations.append(action_description)
|
|
|
self.logger.info(f"Observation#{i + 1}: {action_description}")
|
|
|
if is_stop_action:
|
|
|
final_answer = thoughts
|
|
|
break
|
|
|
return final_answer, all_actions, all_observations
|
|
|
|
|
|
async def generate_model_call(
|
|
|
self, is_first_round: bool, first_screenshot: Image.Image | None = None
|
|
|
) -> Tuple[List[FunctionCall], str]:
|
|
|
history = self.maybe_remove_old_screenshots(self._chat_history)
|
|
|
|
|
|
screenshot_for_system = first_screenshot
|
|
|
if not is_first_round:
|
|
|
|
|
|
scaled_screenshot = await self._get_scaled_screenshot()
|
|
|
screenshot_for_system = scaled_screenshot
|
|
|
|
|
|
text_prompt = self.USER_MESSAGE
|
|
|
curr_url = await self._playwright_controller.get_page_url(self._page)
|
|
|
trimmed_url = get_trimmed_url(curr_url, max_len=self.max_url_chars)
|
|
|
text_prompt = f"Current URL: {trimmed_url}\n" + text_prompt
|
|
|
|
|
|
curr_message = UserMessage(
|
|
|
content=[ImageObj.from_pil(scaled_screenshot), text_prompt]
|
|
|
)
|
|
|
self._chat_history.append(curr_message)
|
|
|
history.append(curr_message)
|
|
|
|
|
|
|
|
|
system_message, _ = self._get_system_message(screenshot_for_system)
|
|
|
history = system_message + history
|
|
|
response = await self._make_model_call(
|
|
|
history, extra_create_args={"temperature": 0}
|
|
|
)
|
|
|
message = response.content
|
|
|
|
|
|
self._chat_history.append(AssistantMessage(content=message))
|
|
|
thoughts, action = self._parse_thoughts_and_action(message)
|
|
|
action["arguments"]["thoughts"] = thoughts
|
|
|
|
|
|
function_call = [FunctionCall(id="dummy", **action)]
|
|
|
return function_call, message
|
|
|
|
|
|
async def execute_action(
|
|
|
self,
|
|
|
function_call: List[FunctionCall],
|
|
|
) -> Tuple[bool, bytes, str]:
|
|
|
name = function_call[0].name
|
|
|
args = function_call[0].arguments
|
|
|
action_description = ""
|
|
|
assert self._page is not None
|
|
|
self.logger.debug(
|
|
|
WebSurferEvent(
|
|
|
source="FaraAgent",
|
|
|
url=await self._playwright_controller.get_page_url(self._page),
|
|
|
action=name,
|
|
|
arguments=args,
|
|
|
message=f"{name}( {json.dumps(args)} )",
|
|
|
)
|
|
|
)
|
|
|
if "coordinate" in args:
|
|
|
args["coordinate"] = self.proc_coords(
|
|
|
args["coordinate"],
|
|
|
self._mlm_width,
|
|
|
self._mlm_height,
|
|
|
self.viewport_width,
|
|
|
self.viewport_height,
|
|
|
)
|
|
|
|
|
|
is_stop_action = False
|
|
|
|
|
|
if args["action"] == "visit_url":
|
|
|
url = str(args["url"])
|
|
|
action_description = f"I typed '{url}' into the browser address bar."
|
|
|
|
|
|
if url.startswith(("https://", "http://", "file://", "about:")):
|
|
|
(
|
|
|
reset_prior_metadata,
|
|
|
reset_last_download,
|
|
|
) = await self._playwright_controller.visit_page(self._page, url)
|
|
|
|
|
|
elif " " in url:
|
|
|
(
|
|
|
reset_prior_metadata,
|
|
|
reset_last_download,
|
|
|
) = await self._playwright_controller.visit_page(
|
|
|
self._page,
|
|
|
f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH",
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
(
|
|
|
reset_prior_metadata,
|
|
|
reset_last_download,
|
|
|
) = await self._playwright_controller.visit_page(
|
|
|
self._page, "https://" + url
|
|
|
)
|
|
|
if reset_last_download and self._last_download is not None:
|
|
|
self._last_download = None
|
|
|
if reset_prior_metadata and self._prior_metadata_hash is not None:
|
|
|
self._prior_metadata_hash = None
|
|
|
elif args["action"] == "history_back":
|
|
|
action_description = "I clicked the browser back button."
|
|
|
await self._playwright_controller.back(self._page)
|
|
|
elif args["action"] == "web_search":
|
|
|
query = args.get("query")
|
|
|
action_description = f"I typed '{query}' into the browser search bar."
|
|
|
encoded_query = quote_plus(query)
|
|
|
(
|
|
|
reset_prior_metadata,
|
|
|
reset_last_download,
|
|
|
) = await self._playwright_controller.visit_page(
|
|
|
self._page, f"https://www.bing.com/search?q={encoded_query}&FORM=QBLH"
|
|
|
)
|
|
|
if reset_last_download and self._last_download is not None:
|
|
|
self._last_download = None
|
|
|
if reset_prior_metadata and self._prior_metadata_hash is not None:
|
|
|
self._prior_metadata_hash = None
|
|
|
elif args["action"] == "scroll":
|
|
|
pixels = int(args.get("pixels", 0))
|
|
|
if pixels > 0:
|
|
|
action_description = "I scrolled up one page in the browser."
|
|
|
await self._playwright_controller.page_up(self._page)
|
|
|
elif pixels < 0:
|
|
|
action_description = "I scrolled down one page in the browser."
|
|
|
await self._playwright_controller.page_down(self._page)
|
|
|
elif args["action"] == "keypress" or args["action"] == "key":
|
|
|
keys = args.get("keys", [])
|
|
|
action_description = f"I pressed the following keys: {keys}"
|
|
|
await self._playwright_controller.keypress(self._page, keys)
|
|
|
elif args["action"] == "hover" or args["action"] == "mouse_move":
|
|
|
if "coordinate" in args:
|
|
|
tgt_x, tgt_y = args["coordinate"]
|
|
|
await self._playwright_controller.hover_coords(self._page, tgt_x, tgt_y)
|
|
|
|
|
|
elif args["action"] == "sleep" or args["action"] == "wait":
|
|
|
duration = args.get("duration", 3.0)
|
|
|
duration = args.get("time", duration)
|
|
|
action_description = (
|
|
|
"I am waiting a short period of time before taking further action."
|
|
|
)
|
|
|
await self._playwright_controller.sleep(self._page, duration)
|
|
|
elif args["action"] == "click" or args["action"] == "left_click":
|
|
|
if "coordinate" in args:
|
|
|
tgt_x, tgt_y = args["coordinate"]
|
|
|
action_description = f"I clicked at coordinates ({tgt_x}, {tgt_y})."
|
|
|
new_page_tentative = await self._playwright_controller.click_coords(
|
|
|
self._page, tgt_x, tgt_y
|
|
|
)
|
|
|
|
|
|
if new_page_tentative is not None:
|
|
|
self._page = new_page_tentative
|
|
|
self._prior_metadata_hash = None
|
|
|
|
|
|
elif args["action"] == "input_text" or args["action"] == "type":
|
|
|
text_value = str(args.get("text", args.get("text_value")))
|
|
|
action_description = f"I typed '{text_value}'."
|
|
|
press_enter = args.get("press_enter", True)
|
|
|
delete_existing_text = args.get("delete_existing_text", False)
|
|
|
|
|
|
if "coordinate" in args:
|
|
|
tgt_x, tgt_y = args["coordinate"]
|
|
|
new_page_tentative = await self._playwright_controller.fill_coords(
|
|
|
self._page,
|
|
|
tgt_x,
|
|
|
tgt_y,
|
|
|
text_value,
|
|
|
press_enter=press_enter,
|
|
|
delete_existing_text=delete_existing_text,
|
|
|
)
|
|
|
if new_page_tentative is not None:
|
|
|
self._page = new_page_tentative
|
|
|
self._prior_metadata_hash = None
|
|
|
|
|
|
elif args["action"] == "pause_and_memorize_fact":
|
|
|
fact = str(args.get("fact"))
|
|
|
self._facts.append(fact)
|
|
|
action_description = f"I memorized the following fact: {fact}"
|
|
|
elif args["action"] == "stop" or args["action"] == "terminate":
|
|
|
action_description = args.get("thoughts")
|
|
|
is_stop_action = True
|
|
|
|
|
|
else:
|
|
|
raise ValueError(f"Unknown tool: {args['action']}")
|
|
|
|
|
|
await self._playwright_controller.wait_for_load_state(self._page)
|
|
|
await self._playwright_controller.sleep(self._page, 3)
|
|
|
|
|
|
|
|
|
self._num_actions += 1
|
|
|
if self.save_screenshots:
|
|
|
new_screenshot = await self._playwright_controller.get_screenshot(
|
|
|
self._page,
|
|
|
path=os.path.join(
|
|
|
self.downloads_folder, f"screenshot{self._num_actions}.png"
|
|
|
),
|
|
|
)
|
|
|
else:
|
|
|
new_screenshot = await self._playwright_controller.get_screenshot(
|
|
|
self._page
|
|
|
)
|
|
|
return is_stop_action, new_screenshot, action_description
|
|
|
|
|
|
async def close(self) -> None:
|
|
|
"""
|
|
|
Close the browser and the page.
|
|
|
Should be called when the agent is no longer needed.
|
|
|
"""
|
|
|
if self._page is not None:
|
|
|
self._page = None
|
|
|
await self.browser_manager.close()
|
|
|
|