# Untitled
#
# avatar
# unknown
# python
# 6 months ago
# 10 kB
# 13
# Indexable
"""
UiPath LangChain Agent with Tavily Integration for Financial Data Research

This agent:
1. Uses Tavily to research financial data of a given company
2. Stores the retrieved data in UiPath Storage Bucket "vibecoded-agent" inside "Shared" folder
3. Retrieves existing data from the Context Grounding Index "companies" in "Shared" folder
4. Returns comprehensive financial analysis
"""

from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
import json

from langgraph.graph import StateGraph, START, END
from langchain_tavily import TavilySearch
from uipath_langchain.chat import UiPathChat
from uipath_langchain.vectorstores import ContextGroundingVectorStore
from uipath import UiPath

# Load environment variables
# Runs at import time, before any of the lazily created clients below exist.
# NOTE(review): presumably the Tavily / UiPath credentials come from .env — confirm.
load_dotenv()

# Configuration
# Storage bucket that store_in_bucket() uploads the research JSON to.
BUCKET_NAME = "vibecoded-agent"
# Folder passed to both the bucket upload and the index lookup; it is also
# used as the leading segment of the uploaded blob's path.
FOLDER_PATH = "Shared"
# Context Grounding index queried by retrieve_historical_data().
INDEX_NAME = "companies"

# Lazy initialization for SDK and LLM (initialized on first use)
# Both stay None until get_uipath_sdk() / get_llm() are first called; those
# accessors then cache the created instance here for every later call.
_uipath_sdk = None
_llm = None


def get_uipath_sdk():
    """Return the shared UiPath SDK client, creating it on the first call.

    The instance is cached in the module-level ``_uipath_sdk`` so every
    later call hands back the same object.
    """
    global _uipath_sdk
    if _uipath_sdk is not None:
        return _uipath_sdk
    _uipath_sdk = UiPath()
    return _uipath_sdk


def get_llm():
    """Return the shared chat model, building it the first time it is needed.

    The instance is cached in the module-level ``_llm`` so the model client
    is only constructed once per process.
    """
    global _llm
    if _llm is not None:
        return _llm
    _llm = UiPathChat(model="gpt-4o-mini-2024-07-18", temperature=0.3)
    return _llm


class GraphInput(BaseModel):
    """Input model for the financial research agent"""
    # Registered as the graph's input schema (StateGraph(..., input=GraphInput));
    # main() builds it from the caller's dict via GraphInput(**input_data).
    # NOTE(review): pydantic is expected to surface the class docstring and the
    # Field descriptions in the generated schema — keep them stable; confirm.

    # Required (no default). Interpolated into the Tavily query, the index
    # query, the LLM prompt and the stored blob's file name.
    company_name: str = Field(description="Name of the company to research")


class GraphOutput(BaseModel):
    """Output model for the financial research agent"""
    # Registered as the graph's output schema and constructed by
    # store_in_bucket(), the last node, from the accumulated AgentState.

    # Echo of the requested company name.
    company_name: str = Field(description="Name of the company researched")
    # LLM-written analysis, or "Error generating summary: ..." when the LLM call failed.
    research_summary: str = Field(description="Summary of financial research findings")
    # Populated by research_company(); an empty list when the search failed.
    tavily_results: List[Dict[str, Any]] = Field(description="Raw research results from Tavily")
    # Text assembled from the index documents, a "no data" notice, or an error
    # message — retrieve_historical_data() always writes one of the three.
    historical_data: Optional[str] = Field(description="Historical data retrieved from index", default=None)
    # Blob path inside the bucket on success, "Error: <message>" on upload failure.
    storage_location: str = Field(description="Location where data was stored in bucket")
    # ISO-8601 string produced with datetime.now().isoformat() (local, naive time).
    timestamp: str = Field(description="Timestamp of the research")
    # One of the status strings the nodes assign, e.g. "completed" or "storage_failed".
    status: str = Field(description="Status of the operation")


class AgentState(BaseModel):
    """Internal state for the agent graph"""
    # Shared state schema of the StateGraph: every node receives an instance,
    # mutates the fields it owns and returns it.

    # Only required field; supplied through GraphInput.
    company_name: str
    # Written by analyze_and_summarize().
    research_summary: str = ""
    # Written by research_company().
    # NOTE(review): a literal [] default — pydantic is expected to copy it per
    # instance rather than share it; Field(default_factory=list) would make
    # that explicit. Confirm before relying on it.
    tavily_results: List[Dict[str, Any]] = []
    # Written by retrieve_historical_data().
    historical_data: Optional[str] = None
    # Written by store_in_bucket().
    storage_location: str = ""
    # Written by research_company(); also used to build the blob file name.
    timestamp: str = ""
    # Progress marker; starts as "initialized" and is updated by
    # research_company() and store_in_bucket().
    status: str = "initialized"


def _normalize_tavily_results(raw: Any) -> List[Dict[str, Any]]:
    """Flatten a Tavily tool response into a list of per-source result dicts.

    Args:
        raw: Whatever the Tavily tool returned. langchain-tavily's
            ``TavilySearch`` is documented to return the whole response as one
            dict whose ``results`` key holds the individual hits; lists and
            other shapes are still accepted.

    Returns:
        A list of dicts, one per hit. A dict without a ``results`` list is
        kept as a single entry; non-dict items are wrapped as
        ``{"content": str(item)}`` so downstream ``.get`` calls stay safe.

    Raises:
        RuntimeError: If the response is a dict carrying an ``error`` key.
    """
    if raw is None:
        return []
    if isinstance(raw, dict):
        if "error" in raw:
            # The tool reports some failures in-band instead of raising
            raise RuntimeError(f"Tavily returned an error: {raw['error']}")
        hits = raw.get("results")
        raw = hits if isinstance(hits, list) else [raw]
    elif not isinstance(raw, list):
        raw = [raw]
    return [
        item if isinstance(item, dict) else {"content": str(item)}
        for item in raw
    ]


async def research_company(state: AgentState) -> AgentState:
    """
    Node 1: Research company financial data using Tavily

    Runs one advanced Tavily search for the company and stores the individual
    hits in ``state.tavily_results``.

    Args:
        state: Current graph state; ``company_name`` is read, while
            ``tavily_results``, ``timestamp`` and ``status`` are written.

    Returns:
        The same state object. ``status`` is ``"research_completed"`` on
        success and ``"research_failed"`` (with an empty result list) when the
        tool cannot be created, the search raises, or Tavily reports an error.
    """
    print(f"[Research] Starting Tavily research for: {state.company_name}")

    # Stamp the run up front so a failed search still leaves a usable
    # timestamp for the blob file name and the final output.
    state.timestamp = datetime.now().isoformat()

    # Construct comprehensive search query
    search_query = f"{state.company_name} financial data revenue earnings stock performance quarterly results"

    try:
        # Built inside the try so a construction failure is reported as
        # "research_failed" instead of aborting the whole graph.
        tavily_tool = TavilySearch(
            max_results=5,
            search_depth="advanced",
            include_answer=True,
            include_raw_content=False,
        )

        # Await the async entry point so the search does not block the event loop
        raw_results = await tavily_tool.ainvoke({"query": search_query})

        # Keep only the per-source hits; later nodes read 'url' / 'content' from each
        state.tavily_results = _normalize_tavily_results(raw_results)
        state.status = "research_completed"

        print(f"[Research] Found {len(state.tavily_results)} results from Tavily")

    except Exception as e:
        print(f"[Research] Error during Tavily search: {e}")
        state.tavily_results = []
        state.status = "research_failed"

    return state


async def retrieve_historical_data(state: AgentState) -> AgentState:
    """
    Node 2: Retrieve historical data from Context Grounding Index

    Queries the index named by ``INDEX_NAME`` (in ``FOLDER_PATH``) for up to
    three documents about the company and joins them into one text block.

    Args:
        state: Current graph state; ``company_name`` is read and
            ``historical_data`` is written.

    Returns:
        The same state object. ``historical_data`` holds the joined documents,
        a "no data" notice when nothing matched, or an error message when the
        lookup failed — this node never raises.
    """
    print(f"[Retrieval] Searching index '{INDEX_NAME}' for historical data on: {state.company_name}")

    try:
        # Initialize Context Grounding Vector Store
        vector_store = ContextGroundingVectorStore(
            index_name=INDEX_NAME,
            folder_path=FOLDER_PATH,
            uipath_sdk=get_uipath_sdk(),
        )

        # Search for relevant historical documents. The async variant is
        # awaited so the lookup does not block the event loop this coroutine
        # runs on (the LLM node already awaits its call the same way).
        query = f"financial data for {state.company_name}"
        docs = await vector_store.asimilarity_search(query=query, k=3)

        if docs:
            # Combine retrieved documents, labelling each with its source
            state.historical_data = "\n\n".join(
                f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.page_content}"
                for doc in docs
            )
            print(f"[Retrieval] Found {len(docs)} historical documents")
        else:
            state.historical_data = "No historical data found in index."
            print("[Retrieval] No historical documents found")

    except Exception as e:
        # Best effort: a broken index must not stop the rest of the pipeline
        print(f"[Retrieval] Error retrieving from index: {e}")
        state.historical_data = f"Error retrieving historical data: {str(e)}"

    return state


def _format_tavily_context(results: Any) -> str:
    """Render stored Tavily results as the text block used in the LLM prompt.

    Accepts both a flat list of hit dicts and entries that are a whole Tavily
    response (a dict with a nested ``results`` list and optional ``answer``);
    the latter are unpacked so their hits are not lost. Non-dict entries are
    rendered as plain text instead of raising.

    Returns:
        The sections joined by blank lines, or a short placeholder when there
        is nothing to show.
    """
    sections: List[str] = []
    hits: List[Any] = []

    for entry in results or []:
        if isinstance(entry, dict) and isinstance(entry.get("results"), list):
            # Whole response stored as one entry: surface its answer, unpack its hits
            answer = entry.get("answer")
            if answer:
                sections.append(f"Tavily answer: {answer}")
            hits.extend(entry["results"])
        else:
            hits.append(entry)

    for position, hit in enumerate(hits, start=1):
        if isinstance(hit, dict):
            sections.append(
                f"Source {position}: {hit.get('url', 'N/A')}\n{hit.get('content', '')}"
            )
        else:
            sections.append(f"Source {position}: N/A\n{hit}")

    return "\n\n".join(sections) if sections else "No recent research results available"


async def analyze_and_summarize(state: AgentState) -> AgentState:
    """
    Node 3: Use LLM to analyze and summarize all collected data

    Builds one prompt from the Tavily results and the historical data and
    asks the shared chat model for a structured financial summary.

    Args:
        state: Current graph state; ``company_name``, ``tavily_results`` and
            ``historical_data`` are read and ``research_summary`` is written.

    Returns:
        The same state object. ``research_summary`` holds the model's answer,
        or "Error generating summary: ..." when the LLM call failed — this
        node never raises.
    """
    print("[Analysis] Generating comprehensive financial summary using LLM")

    # Prepare context from Tavily results (tolerates nested or malformed entries)
    tavily_context = _format_tavily_context(state.tavily_results)

    # Create comprehensive prompt
    prompt = f"""You are a financial analyst. Analyze the following information about {state.company_name} and provide a comprehensive financial summary.

RECENT RESEARCH (from Tavily):
{tavily_context}

HISTORICAL DATA (from company index):
{state.historical_data or 'No historical data available'}

Please provide:
1. A concise executive summary
2. Key financial metrics and trends
3. Recent developments
4. Comparison with historical data (if available)

Keep the analysis professional, factual, and well-structured."""

    try:
        # Generate summary using LLM
        response = await get_llm().ainvoke(prompt)
        content = response.content
        # research_summary is declared as str; coerce anything else to text
        state.research_summary = content if isinstance(content, str) else str(content)
        print("[Analysis] Summary generated successfully")

    except Exception as e:
        print(f"[Analysis] Error generating summary: {e}")
        state.research_summary = f"Error generating summary: {str(e)}"

    return state


def _safe_blob_stem(name: str) -> str:
    """Turn a free-form company name into a single safe file-name segment.

    Every character other than letters, digits, ``-``, ``_`` and ``.`` becomes
    ``_`` (so spaces still map to underscores and ``/`` can no longer create
    extra path segments); leading/trailing dots and underscores are stripped.
    Falls back to ``"unknown_company"`` when nothing usable is left.
    """
    cleaned = "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in name)
    return cleaned.strip("._") or "unknown_company"


async def store_in_bucket(state: AgentState) -> GraphOutput:
    """
    Node 4: Store the research results in UiPath Storage Bucket

    Serialises the collected research to JSON, uploads it to ``BUCKET_NAME``
    and converts the final state into the graph's output model.

    Args:
        state: Current graph state; all research fields are read, while
            ``storage_location``, ``status`` and (only if still empty)
            ``timestamp`` are written.

    Returns:
        A ``GraphOutput`` built from the state. ``storage_location`` is the
        blob path on success or "Error: ..." on failure. ``status`` becomes
        "storage_failed" when the upload fails; otherwise it becomes
        "completed" unless an earlier node already recorded a ``*_failed``
        status, which is kept so the failure stays visible to the caller.
    """
    import asyncio  # local import, mirroring main(); not imported at module top level

    print(f"[Storage] Storing research data in bucket '{BUCKET_NAME}'")

    # An earlier node may have left the timestamp empty; fill it in so the
    # file name and the returned record are never blank.
    if not state.timestamp:
        state.timestamp = datetime.now().isoformat()

    # Prepare data to store
    research_data = {
        "company_name": state.company_name,
        "timestamp": state.timestamp,
        "research_summary": state.research_summary,
        "tavily_results": state.tavily_results,
        "historical_data": state.historical_data,
    }

    # Create filename with timestamp; the company name is caller-supplied, so
    # it is reduced to a safe single path segment first.
    timestamp_safe = state.timestamp.replace(":", "-").replace(".", "-")
    blob_file_path = f"{FOLDER_PATH}/{_safe_blob_stem(state.company_name)}_{timestamp_safe}.json"

    try:
        # Convert to JSON
        json_content = json.dumps(research_data, indent=2)

        # Upload to bucket. The SDK call is synchronous, so it runs in a
        # worker thread to keep the event loop free.
        await asyncio.to_thread(
            get_uipath_sdk().buckets.upload,
            name=BUCKET_NAME,
            blob_file_path=blob_file_path,
            content=json_content,
            content_type="application/json",
            folder_path=FOLDER_PATH,
        )

        state.storage_location = blob_file_path
        # Do not mask a failure recorded by an earlier node
        if not state.status.endswith("_failed"):
            state.status = "completed"
        print(f"[Storage] Data successfully stored at: {blob_file_path}")

    except Exception as e:
        print(f"[Storage] Error storing data in bucket: {e}")
        state.storage_location = f"Error: {str(e)}"
        state.status = "storage_failed"

    # Return final output
    return GraphOutput(
        company_name=state.company_name,
        research_summary=state.research_summary,
        tavily_results=state.tavily_results,
        historical_data=state.historical_data,
        storage_location=state.storage_location,
        timestamp=state.timestamp,
        status=state.status,
    )


# Build the LangGraph
print("[Agent] Building LangGraph agent...")

builder = StateGraph(AgentState, input=GraphInput, output=GraphOutput)

# Register the nodes; each one is keyed by the name of the coroutine behind it
for _node in (
    research_company,
    retrieve_historical_data,
    analyze_and_summarize,
    store_in_bucket,
):
    builder.add_node(_node.__name__, _node)

# Chain the nodes into one linear pipeline running from START to END
_pipeline = (
    START,
    "research_company",
    "retrieve_historical_data",
    "analyze_and_summarize",
    "store_in_bucket",
    END,
)
for _src, _dst in zip(_pipeline, _pipeline[1:]):
    builder.add_edge(_src, _dst)

# Compile the graph into the runnable object exposed by this module
graph = builder.compile()

print("[Agent] LangGraph agent compiled successfully!")


# Classic entrypoint for backward compatibility
def main(input_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Synchronous wrapper that runs the compiled graph once.

    Args:
        input_data: Keyword arguments for ``GraphInput``; when omitted, a
            default request for "Apple Inc" is used.

    Returns:
        The graph result, dumped to a plain dict when it exposes
        ``model_dump`` and returned unchanged otherwise.
    """
    import asyncio

    payload = {"company_name": "Apple Inc"} if input_data is None else input_data

    print(f"\n[Main] Starting financial research for: {payload.get('company_name', 'Unknown')}")

    # Drive the async graph to completion from synchronous code
    outcome = asyncio.run(graph.ainvoke(GraphInput(**payload)))

    if hasattr(outcome, 'model_dump'):
        return outcome.model_dump()
    return outcome


if __name__ == "__main__":
    # Local smoke test: research one company and print the key output fields
    test_result = main({"company_name": "Microsoft"})
    banner = "=" * 80
    print("\n" + banner)
    print("RESEARCH COMPLETE")
    print(banner)
    for label, key in (
        ("Company", "company_name"),
        ("Status", "status"),
        ("Storage Location", "storage_location"),
    ):
        print(f"{label}: {test_result[key]}")
    # Only the first 500 characters of the summary are shown
    print(f"\nSummary:\n{test_result['research_summary'][:500]}...")
# Editor is loading...
# Leave a Comment