-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebagent.py
189 lines (152 loc) · 7.43 KB
/
webagent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import os
from singleton_decorator import singleton
from lm_mock import LM_Mock
from vlm_mock import VLM_Mock
from factory import Factory
import ast
from typing import Any, Tuple
from PIL import Image
import mylog
import time
import srt2txt
from emailsender import EMailSender
# Module-level logger for this file, obtained through the project's mylog helper.
log = mylog.getLogger(__name__)
class WebAgent:
    """AI agent that drives a web browser step by step to complete a task.

    Every step takes a browser screenshot, lets the vision model describe it,
    asks the language model which tool to use next and then performs the
    matching click. The chain implemented here is: open the page -> options
    button -> "Herunterladen" -> "Untertitel" -> "Ja und herunterladen" ->
    summarize the downloaded file and e-mail the result.

    NOTE(review): the collaborators below are class attributes, so they are
    created when the class body executes and are shared by all instances.
    """

    factory = Factory()
    lm = factory.provide_lm()
    vlm = factory.provide_vlm()
    webpage = factory.provide_browser()
    debug = False
    ocr = factory.provide_ocr()
    email_sender = EMailSender()
    target_audience = ""
    # Default so that report_error() works even if start() has not run yet.
    user_email = None
    # Seconds to wait after opening a page before the first screenshot.
    page_load_wait = 5
    # Divisor applied to screenshot coordinates before clicking.
    # NOTE(review): presumably compensates for a 2x device pixel ratio - confirm.
    click_scale = 2.0

    def start(self, task_prompt: str, user_profile: dict):
        """Ask the LM how to begin the task and dispatch the tool it selects.

        Args:
            task_prompt: Task description handed to ``lm.start_agent``.
            user_profile: Mapping read for the optional keys ``"profile"``
                (audience of the summary) and ``"email"`` (recipient of the
                result or error mail).
        """
        self.target_audience = user_profile.get("profile", "(unknown)")
        self.user_email = user_profile.get("email")

        next_step = self.lm.start_agent(task_prompt)
        tool_name, tool_args = self._parse_tool_call(next_step)
        if tool_name == "get_bundestag_transcript":
            if tool_args:
                self.get_bundestag_transcript(tool_args[0])
            else:
                # Previously this case was dropped without any feedback.
                self.report_error("LM requested get_bundestag_transcript without a URL at start()")
        else:
            self._report_tool_failure(tool_name, tool_args, "start()")

    def get_bundestag_transcript(self, url: str):
        """Open ``url`` and click the options button if the LM asks for it."""
        log.debug(f"Opening website {url}")
        self.webpage.open(url)
        # TODO: replace the fixed delay with an explicit page-ready check.
        time.sleep(self.page_load_wait)
        shot, desc = self._observe("/tmp/screenshot.png")
        tool_name, tool_args = self._decide(self.lm.get_bundestag_transcript(url, desc))
        if tool_name == "find_options_button":
            self._click_first(
                self.vlm.scan_for_button(shot, "options"),
                "Found options button!",
                "Options/share button not found",
                self.get_dl_btn,
            )
        else:
            self._report_tool_failure(tool_name, tool_args, "get_bundestag_transcript()")

    def get_dl_btn(self):
        """Click the "Herunterladen" entry if the LM asks for the download button."""
        shot, desc = self._observe("/tmp/after_click.png")
        tool_name, tool_args = self._decide(self.lm.get_dl_btn(desc))
        if tool_name == "find_download_button":
            self._click_first(
                self.ocr.scan_for_text(shot, "Herunterladen"),
                "Found downloads button!",
                "Download button not found",
                self.get_subtitles_btn,
            )
        else:
            self._report_tool_failure(tool_name, tool_args, "get_dl_btn()")

    def get_subtitles_btn(self):
        """Click the "Untertitel" entry if the LM asks for the subtitles button."""
        shot, desc = self._observe("/tmp/after_click.png")
        tool_name, tool_args = self._decide(self.lm.get_subtitles_btn(desc))
        if tool_name == "find_subtitles_button":
            self._click_first(
                self.ocr.scan_for_text(shot, "Untertitel"),
                "Found subtitles button!",
                "Subtitles button not found",
                self.get_confirm_btn,
            )
        else:
            self._report_tool_failure(tool_name, tool_args, "get_subtitles_btn()")

    def get_confirm_btn(self):
        """Click "Ja und herunterladen" to confirm the download.

        The LM is expected to answer with ``find_download_button`` here as
        well; that is the tool name this step checks for.
        """
        shot, desc = self._observe("/tmp/after_click.png")
        tool_name, tool_args = self._decide(self.lm.get_confirm_btn(desc))
        if tool_name == "find_download_button":
            self._click_first(
                self.ocr.scan_for_text(shot, "Ja und herunterladen"),
                "Found confirm button!",
                "Confirm button not found",
                self.summarize,
            )
        else:
            self._report_tool_failure(tool_name, tool_args, "get_confirm_btn()")

    def summarize(self):
        """Summarize the latest download for the target audience and mail it.

        Files whose name ends in ``.srt`` are converted with ``srt2txt`` first.
        The summary is printed; it is e-mailed only when a recipient address
        and both AWS credential variables are set.
        """
        dl_fn, payload = self.webpage.get_latest_download()
        # NOTE(review): assumes the browser wrapper yields a None file name
        # when nothing was downloaded - confirm against its implementation.
        if dl_fn is None:
            self.report_error("No downloaded file available to summarize")
            return
        if dl_fn.endswith(".srt"):
            payload = srt2txt.process(payload)
        resp = self.lm.summarize_for_audience(payload, self.target_audience)
        print(f"summarization result: {resp}")
        if self._can_send_email():
            self.email_sender.send_email(self.user_email, "Meeting summary", resp)
        else:
            log.warning("not sending E-Mail because no address or no AWS access is configured")

    def report_error(self, error_msg: str):
        """Log ``error_msg`` and, if e-mail is configured, notify the user.

        The mail body is generic; the detailed message only goes to the log.
        """
        log.error(f"Cannot operate webpage: {error_msg}")
        if self._can_send_email():
            self.email_sender.send_email(
                self.user_email, "Error", "An error has occured. Please check the logs."
            )

    def _can_send_email(self) -> bool:
        """Return True when a recipient and both AWS credential variables are set."""
        return bool(
            self.user_email
            and os.getenv("AWS_ACCESS_KEY_ID")
            and os.getenv("AWS_SECRET_ACCESS_KEY")
        )

    def _observe(self, debug_path: str):
        """Take a screenshot (saved to ``debug_path`` in debug mode) and describe it.

        Returns:
            Tuple ``(screenshot, description)`` where the description comes
            from ``vlm.desc_en``.
        """
        shot = self.webpage.screenshot()
        if self.debug:
            shot.save(debug_path)
        desc = self.vlm.desc_en(shot)
        log.debug(f"Screenshot described as '{desc}', determining next step")
        return shot, desc

    def _decide(self, next_step: str) -> Tuple[Any, Any]:
        """Parse the LM answer into ``(tool_name, tool_args)`` and log the choice."""
        tool_name, tool_args = self._parse_tool_call(next_step)
        log.debug(f"Tool to use: '{tool_name}', args: {tool_args}")
        return tool_name, tool_args

    def _click_first(self, matches, found_msg: str, missing_msg: str, then):
        """Click the first match and continue with ``then``; report if none was found."""
        if not matches:
            self.report_error(missing_msg)
            return
        log.info(f"{found_msg} {matches}")
        self._browser_click(matches[0])
        then()

    def _report_tool_failure(self, tool_name, tool_args, where: str):
        """Report an LM-signalled error or an unexpected tool request at ``where``."""
        if tool_name == "report_error_to_user":
            self.report_error(tool_args[0] if tool_args else "Unknown error")
        else:
            self.report_error(f"LM requested unknown tool {tool_name} at {where}")

    def _parse_tool_call(self, call_str: str) -> Tuple[Any, Any]:
        """Extract tool name and positional literal arguments from an LM answer.

        The answer is parsed as Python source and the first call node yielded
        by ``ast.walk`` is used. Keyword arguments are ignored.

        Args:
            call_str: Text such as ``tool_name("arg")``.

        Returns:
            ``(tool_name, tool_args)`` with ``tool_args`` as a list, or
            ``(None, None)`` when the text is not a string, is not valid
            Python, contains no call, or has non-literal arguments.
        """
        if not isinstance(call_str, str):
            log.warning(f"Cannot parse LM tool call, got {call_str!r}")
            return (None, None)
        try:
            tree = ast.parse(call_str.strip())
        except (SyntaxError, ValueError) as err:
            log.warning(f"Cannot parse LM tool call {call_str!r}: {err}")
            return (None, None)

        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            if isinstance(func, ast.Name):
                tool_name = func.id
            elif isinstance(func, ast.Attribute):
                # e.g. ``tools.find_options_button()``: use the final name.
                tool_name = func.attr
            else:
                break
            try:
                tool_args = [ast.literal_eval(arg) for arg in node.args]
            except (ValueError, TypeError, SyntaxError) as err:
                log.warning(f"Non-literal argument in LM tool call {call_str!r}: {err}")
                break
            return (tool_name, tool_args)
        return (None, None)

    def _browser_click(self, elem: Tuple[int, int, int, int]):
        """Click the center of a bounding box found in a screenshot.

        Args:
            elem: Box whose center is computed as the midpoint of
                ``elem[0]``/``elem[2]`` and of ``elem[1]``/``elem[3]``.

        The center is divided by ``click_scale`` before being passed to
        ``webpage.click``.
        """
        center_x = elem[0] + ((elem[2] - elem[0]) / 2.0)
        center_y = elem[1] + ((elem[3] - elem[1]) / 2.0)
        self.webpage.click((center_x / self.click_scale, center_y / self.click_scale))