Untitled
unknown
python
6 months ago
10 kB
13
Indexable
"""
UiPath LangChain Agent with Tavily Integration for Financial Data Research
This agent:
1. Uses Tavily to research financial data of a given company
2. Stores the retrieved data in UiPath Storage Bucket "vibecoded-agent" inside "Shared" folder
3. Retrieves existing data from the Context Grounding Index "companies" in "Shared" folder
4. Returns comprehensive financial analysis
"""
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
import json
from langgraph.graph import StateGraph, START, END
from langchain_tavily import TavilySearch
from uipath_langchain.chat import UiPathChat
from uipath_langchain.vectorstores import ContextGroundingVectorStore
from uipath import UiPath
# Load environment variables (python-dotenv) before any client is constructed
load_dotenv()
# Configuration
BUCKET_NAME = "vibecoded-agent"  # storage bucket that receives the research JSON
FOLDER_PATH = "Shared"  # folder passed to bucket/index calls; also the blob path prefix
INDEX_NAME = "companies"  # Context Grounding index queried for historical data
# Lazy initialization for SDK and LLM (initialized on first use).
# Both stay None until get_uipath_sdk() / get_llm() populate them.
_uipath_sdk = None
_llm = None
def get_uipath_sdk():
    """Return the shared UiPath SDK client, creating it on first use.

    The client is cached in the module-level ``_uipath_sdk`` variable, so
    importing this module never constructs a client and every later call
    returns the same object.

    Returns:
        The cached ``UiPath`` instance.
    """
    global _uipath_sdk
    if _uipath_sdk is None:
        _uipath_sdk = UiPath()
    return _uipath_sdk
def get_llm():
    """Return the shared chat model, creating it on first use.

    The model is cached in the module-level ``_llm`` variable; the first call
    builds a ``UiPathChat`` for ``gpt-4o-mini-2024-07-18`` at temperature 0.3
    and every later call returns that same object.

    Returns:
        The cached ``UiPathChat`` instance.
    """
    global _llm
    if _llm is None:
        _llm = UiPathChat(model="gpt-4o-mini-2024-07-18", temperature=0.3)
    return _llm
class GraphInput(BaseModel):
    """Input model for the financial research agent."""

    # The only value a caller supplies; every other state field is derived.
    company_name: str = Field(description="Name of the company to research")
class GraphOutput(BaseModel):
    """Output model for the financial research agent.

    Built by the final graph node from the accumulated agent state.
    """

    company_name: str = Field(description="Name of the company researched")
    research_summary: str = Field(description="Summary of financial research findings")
    tavily_results: List[Dict[str, Any]] = Field(description="Raw research results from Tavily")
    historical_data: Optional[str] = Field(description="Historical data retrieved from index", default=None)
    # Holds the blob path on success, or an "Error: ..." message on failure.
    storage_location: str = Field(description="Location where data was stored in bucket")
    timestamp: str = Field(description="Timestamp of the research")
    status: str = Field(description="Status of the operation")
class AgentState(BaseModel):
    """Internal state for the agent graph.

    Only ``company_name`` is required; the remaining fields start at neutral
    defaults and are filled in by the graph nodes as the run progresses.
    """

    company_name: str
    research_summary: str = ""
    # default_factory states explicitly that each instance gets its own list.
    tavily_results: List[Dict[str, Any]] = Field(default_factory=list)
    historical_data: Optional[str] = None
    storage_location: str = ""
    timestamp: str = ""
    status: str = "initialized"
def _normalize_tavily_results(raw: Any) -> List[Dict[str, Any]]:
    """Flatten a Tavily tool response into a list of per-source dicts."""
    if raw is None:
        return []
    if isinstance(raw, dict) and isinstance(raw.get("results"), list):
        # Response envelope: the individual hits live under "results".
        raw = raw["results"]
    elif not isinstance(raw, list):
        raw = [raw]
    # Non-dict entries are wrapped so the list always matches List[Dict[str, Any]].
    return [item if isinstance(item, dict) else {"content": str(item)} for item in raw]


async def research_company(state: AgentState) -> AgentState:
    """Node 1: research company financial data using Tavily.

    Runs one advanced Tavily search (up to 5 results) for the company and
    stores the per-source hits in ``state.tavily_results``.

    Args:
        state: Current agent state; ``company_name`` is read.

    Returns:
        The same state object with ``tavily_results``, ``timestamp`` and
        ``status`` updated. ``status`` is ``"research_completed"`` on success
        and ``"research_failed"`` (with an empty result list) on any error.
    """
    print(f"[Research] Starting Tavily research for: {state.company_name}")

    # Stamp the run up front so a failed search still carries a timestamp.
    state.timestamp = datetime.now().isoformat()

    # Construct comprehensive search query
    search_query = (
        f"{state.company_name} financial data revenue earnings "
        "stock performance quarterly results"
    )

    try:
        # Built inside the try so a construction failure (for example a
        # missing API key) is reported as "research_failed" rather than
        # aborting the whole graph.
        tavily_tool = TavilySearch(
            max_results=5,
            search_depth="advanced",
            include_answer=True,
            include_raw_content=False,
        )
        # Await the async variant so this coroutine does not block the loop.
        raw = await tavily_tool.ainvoke({"query": search_query})

        # NOTE(review): the tool appears to report some failures as an
        # {"error": ...} dict instead of raising - confirm against the
        # installed langchain-tavily version.
        if isinstance(raw, dict) and "error" in raw and "results" not in raw:
            raise RuntimeError(f"Tavily returned an error: {raw['error']}")

        state.tavily_results = _normalize_tavily_results(raw)
        state.status = "research_completed"
        print(f"[Research] Found {len(state.tavily_results)} results from Tavily")
    except Exception as e:
        print(f"[Research] Error during Tavily search: {e}")
        state.tavily_results = []
        state.status = "research_failed"
    return state
async def retrieve_historical_data(state: AgentState) -> AgentState:
    """Node 2: retrieve historical data from the Context Grounding index.

    Queries the ``INDEX_NAME`` index in ``FOLDER_PATH`` for up to three
    documents about the company and joins them into one text block.

    Args:
        state: Current agent state; ``company_name`` is read.

    Returns:
        The same state object with ``historical_data`` set to the joined
        documents, to a "no data" message, or to an error message if the
        lookup raised.
    """
    print(f"[Retrieval] Searching index '{INDEX_NAME}' for historical data on: {state.company_name}")
    try:
        # Initialize Context Grounding Vector Store
        vector_store = ContextGroundingVectorStore(
            index_name=INDEX_NAME,
            folder_path=FOLDER_PATH,
            uipath_sdk=get_uipath_sdk(),
        )
        # Search for relevant historical documents; the async variant keeps
        # this coroutine from blocking the event loop.
        query = f"financial data for {state.company_name}"
        docs = await vector_store.asimilarity_search(query=query, k=3)
        if docs:
            # Combine retrieved documents, each prefixed with its source.
            state.historical_data = "\n\n".join(
                f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.page_content}"
                for doc in docs
            )
            print(f"[Retrieval] Found {len(docs)} historical documents")
        else:
            state.historical_data = "No historical data found in index."
            print("[Retrieval] No historical documents found")
    except Exception as e:
        # Best effort: a retrieval failure is recorded, not raised, so the
        # analysis step can still run on the Tavily results alone.
        print(f"[Retrieval] Error retrieving from index: {e}")
        state.historical_data = f"Error retrieving historical data: {str(e)}"
    return state
def _tavily_source_pairs(results):
    """Yield ``(url, content)`` for every usable entry in the Tavily results."""
    for item in results or []:
        if isinstance(item, dict) and isinstance(item.get("results"), list):
            # A whole Tavily response stored as one entry: unpack its hits.
            yield from _tavily_source_pairs(item["results"])
        elif isinstance(item, dict):
            yield item.get("url", "N/A"), item.get("content", "")
        else:
            # Plain text or other payloads have no URL; keep the text.
            yield "N/A", str(item)


async def analyze_and_summarize(state: AgentState) -> AgentState:
    """Node 3: use the LLM to analyze and summarize all collected data.

    Builds a prompt from the Tavily results and the historical data, then
    asks the chat model for a structured financial summary.

    Args:
        state: Current agent state; ``company_name``, ``tavily_results`` and
            ``historical_data`` are read.

    Returns:
        The same state object with ``research_summary`` set to the model's
        answer, or to an error message if the model call raised.
    """
    print("[Analysis] Generating comprehensive financial summary using LLM")

    # Prepare context from Tavily results. The helper tolerates wrapped or
    # non-dict entries so malformed results cannot crash this node.
    tavily_context = "\n\n".join(
        f"Source {position}: {url}\n{content}"
        for position, (url, content) in enumerate(
            _tavily_source_pairs(state.tavily_results), start=1
        )
    )

    # Create comprehensive prompt
    prompt = f"""You are a financial analyst. Analyze the following information about {state.company_name} and provide a comprehensive financial summary.
RECENT RESEARCH (from Tavily):
{tavily_context or 'No recent research available'}
HISTORICAL DATA (from company index):
{state.historical_data or 'No historical data available'}
Please provide:
1. A concise executive summary
2. Key financial metrics and trends
3. Recent developments
4. Comparison with historical data (if available)
Keep the analysis professional, factual, and well-structured."""

    try:
        # Generate summary using LLM
        response = await get_llm().ainvoke(prompt)
        content = response.content
        # research_summary is a str field; coerce any non-string content.
        state.research_summary = content if isinstance(content, str) else str(content)
        print("[Analysis] Summary generated successfully")
    except Exception as e:
        print(f"[Analysis] Error generating summary: {e}")
        state.research_summary = f"Error generating summary: {str(e)}"
    return state
async def store_in_bucket(state: AgentState) -> GraphOutput:
    """Node 4: store the research results in the UiPath Storage Bucket.

    Serializes the collected data to JSON and uploads it to ``BUCKET_NAME``
    under ``FOLDER_PATH`` as ``<company>_<timestamp>.json``.

    Args:
        state: Final agent state holding everything gathered by earlier nodes.

    Returns:
        A ``GraphOutput`` mirroring the state. ``status`` is ``"completed"``
        with the blob path in ``storage_location`` on success, or
        ``"storage_failed"`` with an ``"Error: ..."`` message otherwise.
    """
    import asyncio
    import re

    print(f"[Storage] Storing research data in bucket '{BUCKET_NAME}'")

    # An earlier node may have left the timestamp empty; without one, every
    # such run for a company would map to the same blob name.
    if not state.timestamp:
        state.timestamp = datetime.now().isoformat()

    # Prepare data to store
    research_data = {
        "company_name": state.company_name,
        "timestamp": state.timestamp,
        "research_summary": state.research_summary,
        "tavily_results": state.tavily_results,
        "historical_data": state.historical_data,
    }

    # Create filename with timestamp. The company name is caller-supplied, so
    # anything other than word characters, '.' and '-' (spaces, slashes, ...)
    # becomes '_' and cannot alter the blob path.
    timestamp_safe = state.timestamp.replace(":", "-").replace(".", "-")
    company_safe = re.sub(r"[^\w.-]", "_", state.company_name) or "unknown_company"
    blob_file_path = f"{FOLDER_PATH}/{company_safe}_{timestamp_safe}.json"

    try:
        # default=str is deliberate: this is a best-effort archive, and one
        # non-JSON value in the raw results should not abort the upload.
        json_content = json.dumps(research_data, indent=2, default=str)
        # Run the blocking upload in a worker thread to keep the loop free.
        await asyncio.to_thread(
            get_uipath_sdk().buckets.upload,
            name=BUCKET_NAME,
            blob_file_path=blob_file_path,
            content=json_content,
            content_type="application/json",
            folder_path=FOLDER_PATH,
        )
        state.storage_location = blob_file_path
        state.status = "completed"
        print(f"[Storage] Data successfully stored at: {blob_file_path}")
    except Exception as e:
        print(f"[Storage] Error storing data in bucket: {e}")
        state.storage_location = f"Error: {str(e)}"
        state.status = "storage_failed"

    # Return final output
    return GraphOutput(
        company_name=state.company_name,
        research_summary=state.research_summary,
        tavily_results=state.tavily_results,
        historical_data=state.historical_data,
        storage_location=state.storage_location,
        timestamp=state.timestamp,
        status=state.status,
    )
# Assemble the LangGraph pipeline (runs at import time so that `graph` is
# available as a module-level attribute).
print("[Agent] Building LangGraph agent...")
builder = StateGraph(AgentState, input=GraphInput, output=GraphOutput)

# Stages in execution order; each node is registered under its function name.
_stages = (
    research_company,
    retrieve_historical_data,
    analyze_and_summarize,
    store_in_bucket,
)
for _stage in _stages:
    builder.add_node(_stage.__name__, _stage)

# Sequential workflow: START -> each stage in turn -> END.
_route = [START, *(stage.__name__ for stage in _stages), END]
for _src, _dst in zip(_route, _route[1:]):
    builder.add_edge(_src, _dst)

# Compile the graph
graph = builder.compile()
print("[Agent] LangGraph agent compiled successfully!")
# Classic entrypoint for backward compatibility
def main(input_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Run the research graph synchronously and return its output.

    Args:
        input_data: Keyword values for ``GraphInput`` (``company_name``).
            When ``None``, ``{"company_name": "Apple Inc"}`` is used.

    Returns:
        The graph result; if it exposes ``model_dump`` the dumped dict is
        returned, otherwise the result is returned unchanged.
    """
    import asyncio

    if input_data is None:
        input_data = {"company_name": "Apple Inc"}
    print(f"\n[Main] Starting financial research for: {input_data.get('company_name', 'Unknown')}")

    # Run the graph synchronously by driving the coroutine on a fresh loop.
    result = asyncio.run(graph.ainvoke(GraphInput(**input_data)))
    return result.model_dump() if hasattr(result, 'model_dump') else result
if __name__ == "__main__":
    # Test the agent locally: one run for a fixed company, then print a
    # short report (the summary is cut to its first 500 characters).
    test_result = main({"company_name": "Microsoft"})
    print("\n" + "="*80)
    print("RESEARCH COMPLETE")
    print("="*80)
    print(f"Company: {test_result['company_name']}")
    print(f"Status: {test_result['status']}")
    print(f"Storage Location: {test_result['storage_location']}")
    print(f"\nSummary:\n{test_result['research_summary'][:500]}...")
Editor is loading...
Leave a Comment