Skip to content

Commit ae2d946

Browse files
committed
Adding Expertimental Notebook 11.5 with LangGraph Supervisor agent
1 parent 5f32bf8 commit ae2d946

10 files changed

+1847
-535
lines changed

06-First-RAG.ipynb

+64-61
Large diffs are not rendered by default.

09-BingChatClone.ipynb

+70-78
Large diffs are not rendered by default.

11-Smart_Agent.ipynb

+310-200
Large diffs are not rendered by default.

11.5-Smart_Agent-LangGraph.ipynb

+866
Large diffs are not rendered by default.

14-LangServe-API.ipynb

+253-108
Large diffs are not rendered by default.

apps/backend/langserve/app/server.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def get_session_history(session_id: str, user_id: str) -> CosmosDBChatMessageHis
169169

170170
chatgpt_search = ChatGPTTool(llm=llm,
171171
name="chatgpt",
172-
description="use for general questions, profile, greeting-like questions and when the questions includes the term: chatgpt",
172+
description="useful when the questions includes the term: chatgpt",
173173
verbose=False)
174174

175175
tools = [doc_search, book_search, www_search, sql_search, chatgpt_search]

common/prompts.py

+62-67
Large diffs are not rendered by default.

common/sql_checkpointer.py

+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
from langchain.pydantic_v1 import BaseModel, Field
2+
from sqlalchemy import create_engine, Column, Integer, String, LargeBinary, Table, MetaData, PrimaryKeyConstraint, select
3+
from sqlalchemy.orm import sessionmaker, Session, scoped_session
4+
from sqlalchemy.exc import SQLAlchemyError
5+
from sqlalchemy.engine import URL
6+
from sqlalchemy import Engine
7+
from typing import Iterator, Optional, Any
8+
from types import TracebackType
9+
import pickle
10+
from contextlib import AbstractContextManager, contextmanager
11+
from langchain_core.runnables import RunnableConfig
12+
from typing_extensions import Self
13+
14+
from langgraph.checkpoint.base import (
15+
BaseCheckpointSaver,
16+
Checkpoint,
17+
CheckpointAt,
18+
CheckpointTuple,
19+
Serializable,
20+
)
21+
22+
23+
metadata = MetaData()
24+
25+
# Adjusting the column type from String (which defaults to VARCHAR(max)) to a specific length
26+
checkpoints_table = Table(
27+
'checkpoints', metadata,
28+
Column('thread_id', String(255), primary_key=True), # String(255) specifies the max length
29+
Column('thread_ts', String(255), primary_key=True),
30+
Column('parent_ts', String(255)), # Optional: Specify length here if it's a commonly used field
31+
Column('checkpoint', LargeBinary), # VARBINARY(max) is fine for non-indexed columns
32+
PrimaryKeyConstraint('thread_id', 'thread_ts')
33+
)
34+
35+
class BaseCheckpointSaver(BaseModel):
36+
37+
engine: Optional[Engine] = None
38+
Session: Optional[scoped_session] = None
39+
is_setup: bool = Field(default=False)
40+
session: Any = Field(default=None)
41+
42+
class Config:
43+
arbitrary_types_allowed = True
44+
45+
class SQLAlchemyCheckpointSaver(BaseCheckpointSaver, AbstractContextManager):
46+
47+
def __init__(self, engine: Engine, *, serde: Optional[Serializable] = None, at: Optional[CheckpointAt] = None):
48+
# Call super with all expected fields by Pydantic
49+
super().__init__(serde=serde or pickle, at=at or CheckpointAt.END_OF_STEP, is_setup=False)
50+
self.engine = engine
51+
self.Session = scoped_session(sessionmaker(bind=self.engine))
52+
53+
54+
@classmethod
55+
def from_db_config(cls, db_config):
56+
db_url = URL.create(
57+
drivername=db_config['drivername'],
58+
username=db_config['username'],
59+
password=db_config['password'],
60+
host=db_config['host'],
61+
port=db_config['port'],
62+
database=db_config['database'],
63+
query=db_config['query']
64+
)
65+
engine = create_engine(db_url)
66+
return cls(engine)
67+
68+
def __enter__(self):
69+
self.session = self.Session()
70+
return self
71+
72+
def __exit__(self, exc_type, exc_value, traceback):
73+
try:
74+
if exc_type:
75+
self.session.rollback()
76+
else:
77+
self.session.commit()
78+
finally:
79+
self.Session.remove()
80+
self.session.close()
81+
82+
83+
def setup(self):
84+
if not self.is_setup:
85+
# Create all tables if they don't exist
86+
metadata.create_all(self.engine)
87+
self.is_setup = True
88+
89+
def get(self, config: RunnableConfig) -> Optional[Checkpoint]:
90+
if value := self.get_tuple(config):
91+
return value['checkpoint']
92+
93+
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
94+
print("SQLAlchemyCheckpointSaver.get_tuple properly called")
95+
with self.Session() as session:
96+
thread_id = config["configurable"].get("thread_id")
97+
thread_ts = config["configurable"].get("thread_ts")
98+
99+
query = select(checkpoints_table)
100+
if thread_ts:
101+
query = query.where(
102+
(checkpoints_table.c.thread_id == thread_id) &
103+
(checkpoints_table.c.thread_ts == thread_ts)
104+
)
105+
else:
106+
query = query.where(
107+
checkpoints_table.c.thread_id == thread_id
108+
).order_by(checkpoints_table.c.thread_ts.desc()).limit(1)
109+
110+
result = session.execute(query).fetchone()
111+
if result:
112+
# Handling both potential types of result objects
113+
if isinstance(result, tuple):
114+
# Convert tuple to dictionary using column keys if result is a tuple
115+
result = dict(zip(result.keys(), result))
116+
elif hasattr(result, '_mapping'):
117+
# Convert SQLAlchemy RowProxy to dictionary directly if available
118+
result = dict(result._mapping)
119+
120+
return {
121+
'config': config,
122+
'checkpoint': pickle.loads(result['checkpoint']),
123+
'additional_info': {
124+
"thread_id": result['thread_id'],
125+
"thread_ts": result['parent_ts'] if result['parent_ts'] else None
126+
}
127+
}
128+
return None
129+
130+
131+
132+
def list(self, config: RunnableConfig):
133+
with self.Session() as session:
134+
query = select(checkpoints_table).where(
135+
checkpoints_table.c.thread_id == config["configurable"]["thread_id"]
136+
).order_by(checkpoints_table.c.thread_ts.desc())
137+
results = session.execute(query).fetchall()
138+
139+
return [
140+
{
141+
"configurable": {
142+
"thread_id": result['thread_id'],
143+
"thread_ts": result['thread_ts']
144+
},
145+
"checkpoint": pickle.loads(result['checkpoint']),
146+
"additional_info": {
147+
"thread_id": result['thread_id'],
148+
"thread_ts": result['parent_ts'] or None
149+
}
150+
}
151+
for result in results
152+
]
153+
154+
155+
def put(self, config: RunnableConfig, checkpoint: Checkpoint):
156+
print("Attempting to connect with engine:", self.engine.url) # Check the engine URL
157+
with self.Session() as session:
158+
print("Session started for put operation.")
159+
try:
160+
session.execute(
161+
checkpoints_table.insert().values(
162+
thread_id=config["configurable"]["thread_id"],
163+
thread_ts=checkpoint["ts"],
164+
parent_ts=config["configurable"].get("thread_ts"),
165+
checkpoint=pickle.dumps(checkpoint)
166+
)
167+
)
168+
session.commit()
169+
print("Data inserted and committed successfully.")
170+
except Exception as e:
171+
print("Error during database operation:", e)
172+
session.rollback()
173+
raise
174+
finally:
175+
print("Session closed after put operation.")
176+
return {
177+
"configurable": {
178+
"thread_id": config["configurable"]["thread_id"],
179+
"thread_ts": checkpoint["ts"]
180+
}
181+
}
182+
183+
async def aget(self, config: RunnableConfig) -> Optional[Checkpoint]:
184+
return await asyncio.get_running_loop().run_in_executor(None, self.get, config)
185+
186+
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
187+
return await asyncio.get_running_loop().run_in_executor(None, self.get_tuple, config)
188+
189+
async def alist(self, config: RunnableConfig) -> Iterator[CheckpointTuple]:
190+
return await asyncio.get_running_loop().run_in_executor(None, self.list, config)
191+
192+
async def aput(self, config: RunnableConfig, checkpoint: Checkpoint) -> RunnableConfig:
193+
return await asyncio.get_running_loop().run_in_executor(None, self.put, config, checkpoint)
194+

common/utils.py

+25-19
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,10 @@ def get_answer(llm: AzureChatOpenAI,
363363

364364
class SearchInput(BaseModel):
365365
query: str = Field(description="should be a search query")
366+
return_direct: bool = Field(
367+
description="Whether or the result of this should be returned directly to the user without you seeing what it is",
368+
default=False,
369+
)
366370

367371
class GetDocSearchResults_Tool(BaseTool):
368372
name = "docsearch"
@@ -375,7 +379,7 @@ class GetDocSearchResults_Tool(BaseTool):
375379
sas_token: str = ""
376380

377381
def _run(
378-
self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None
382+
self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None
379383
) -> str:
380384

381385
retriever = CustomAzureSearchRetriever(indexes=self.indexes, topK=self.k, reranker_threshold=self.reranker_th,
@@ -385,7 +389,7 @@ def _run(
385389
return results
386390

387391
async def _arun(
388-
self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None
392+
self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None
389393
) -> str:
390394
"""Use the tool asynchronously."""
391395

@@ -424,15 +428,15 @@ def __init__(self, **data):
424428
self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=self.verbose, callback_manager=self.callbacks, handle_parsing_errors=True)
425429

426430

427-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
431+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
428432
try:
429433
result = self.agent_executor.invoke({"question": query})
430434
return result['output']
431435
except Exception as e:
432436
print(e)
433437
return str(e) # Return an empty string or some error indicator
434438

435-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
439+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
436440
try:
437441
result = await self.agent_executor.ainvoke({"question": query})
438442
return result['output']
@@ -465,7 +469,7 @@ def __init__(self, **data):
465469
callback_manager=self.callbacks,
466470
)
467471

468-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
472+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
469473
try:
470474
# Use the initialized agent_executor to invoke the query
471475
result = self.agent_executor.invoke(query)
@@ -474,7 +478,7 @@ def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = No
474478
print(e)
475479
return str(e) # Return an error indicator
476480

477-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
481+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
478482
# Note: Implementation assumes the agent_executor and its methods support async operations
479483
try:
480484
# Use the initialized agent_executor to asynchronously invoke the query
@@ -528,7 +532,7 @@ def get_db_config(self):
528532
'query': {'driver': 'ODBC Driver 17 for SQL Server'}
529533
}
530534

531-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
535+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
532536
try:
533537
# Use the initialized agent_executor to invoke the query
534538
result = self.agent_executor.invoke(query)
@@ -537,7 +541,7 @@ def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = No
537541
print(e)
538542
return str(e) # Return an error indicator
539543

540-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
544+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
541545
# Note: Implementation assumes the agent_executor and its methods support async operations
542546
try:
543547
# Use the initialized agent_executor to asynchronously invoke the query
@@ -567,15 +571,15 @@ def __init__(self, **data):
567571
output_parser = StrOutputParser()
568572
self.chatgpt_chain = CHATGPT_PROMPT | self.llm | output_parser
569573

570-
def _run(self, query: str) -> str:
574+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
571575
try:
572576
response = self.chatgpt_chain.invoke({"question": query})
573577
return response
574578
except Exception as e:
575579
print(e)
576580
return str(e) # Return an error indicator
577581

578-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
582+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
579583
"""Implement the tool to be used asynchronously."""
580584
try:
581585
response = await self.chatgpt_chain.ainvoke({"question": query})
@@ -595,14 +599,14 @@ class GetBingSearchResults_Tool(BaseTool):
595599

596600
k: int = 5
597601

598-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
602+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
599603
bing = BingSearchAPIWrapper(k=self.k)
600604
try:
601605
return bing.results(query,num_results=self.k)
602606
except:
603607
return "No Results Found"
604608

605-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
609+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
606610
bing = BingSearchAPIWrapper(k=self.k)
607611
loop = asyncio.get_event_loop()
608612
try:
@@ -635,7 +639,9 @@ def __init__(self, **data):
635639
description="useful to fetch the content of a url"
636640
)
637641

638-
tools = [GetBingSearchResults_Tool(k=self.k), web_fetch_tool]
642+
tools = [GetBingSearchResults_Tool(k=self.k)]
643+
# tools = [GetBingSearchResults_Tool(k=self.k), web_fetch_tool] # Uncomment if using GPT-4
644+
639645
agent = create_openai_tools_agent(self.llm, tools, BINGSEARCH_PROMPT)
640646

641647
self.agent_executor = AgentExecutor(agent=agent, tools=tools,
@@ -656,15 +662,15 @@ def fetch_web_page(self, url: str) -> str:
656662
response = requests.get(url, headers=HEADERS)
657663
return self.parse_html(response.content)
658664

659-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
665+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
660666
try:
661667
response = self.agent_executor.invoke({"question": query})
662668
return response['output']
663669
except Exception as e:
664670
print(e)
665671
return str(e) # Return an error indicator
666672

667-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
673+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
668674
"""Implements the tool to be used asynchronously."""
669675
try:
670676
response = await self.agent_executor.ainvoke({"question": query})
@@ -701,7 +707,7 @@ def __init__(self, **data):
701707
limit_to_domains=self.limit_to_domains
702708
)
703709

704-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
710+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
705711
try:
706712
# Optionally sleep to avoid possible TPM rate limits
707713
sleep(2)
@@ -711,7 +717,7 @@ def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = No
711717

712718
return response
713719

714-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
720+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
715721
"""Use the tool asynchronously."""
716722
loop = asyncio.get_event_loop()
717723
try:
@@ -757,7 +763,7 @@ def __init__(self, **data):
757763
return_intermediate_steps=True,
758764
callback_manager=self.callbacks)
759765

760-
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
766+
def _run(self, query: str, return_direct = False, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
761767
try:
762768
# Use the initialized agent_executor to invoke the query
763769
response = self.agent_executor.invoke({"question":query})
@@ -766,7 +772,7 @@ def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = No
766772
print(e)
767773
return str(e) # Return an error indicator
768774

769-
async def _arun(self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
775+
async def _arun(self, query: str, return_direct = False, run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
770776
# Note: Implementation assumes the agent_executor and its methods support async operations
771777
try:
772778
# Use the initialized agent_executor to asynchronously invoke the query

credentials.env

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ AZURE_COSMOSDB_NAME="ENTER YOUR VALUE"
3131
AZURE_COSMOSDB_CONTAINER_NAME="ENTER YOUR VALUE"
3232
AZURE_COMOSDB_CONNECTION_STRING="ENTER YOUR VALUE" # Find this in the Keys section
3333
BOT_ID="ENTER YOUR VALUE" # This is the name of your bot service created in Notebook 12
34-
BOT_SERVICE_DIRECT_LINE_SECRET="ENTER YOUR VALUE" # Find this in Azure Bot Service -> Channels -> Direct Line
34+
BOT_SERVICE_DIRECT_LINE_SECRET="ENTER YOUR VALUE" # Find this in Azure Bot Service -> Channels -> Direct Line
35+

0 commit comments

Comments
 (0)