Untitled

 avatar
unknown
plain_text
3 months ago
1.9 kB
2
Indexable
%%writefile agent_mcp.py
import asyncio
import nest_asyncio
from google.adk.agents.llm_agent import LlmAgent
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters

nest_asyncio.apply()


async def get_agent():
    tools, exit_stack = await MCPToolset.from_server(
        connection_params=StdioServerParameters(
            command="npx",
            args=[
                "-y",
                "@openbnb/mcp-server-airbnb",
                "--ignore-robots-txt"
            ]
        )
    )

    root_agent = LlmAgent(
        name="booking_assistant",
        model="gemini-2.0-flash",
        description="Help the user with booking listings on Airbnb",
        instruction="You are a booking assistant agent. You need to list details from Airbnb.",
        tools=tools
    )

    return root_agent, exit_stack


async def main():
    session_service = InMemorySessionService()
    session = session_service.create_session(
        state={}, app_name="booking_app", user_id="user_bnb"
    )

    query = "What listings are available in Gujarat Ahmedabad for 3 days and 2 nights from 1st June to 4th June 2025?"
    content = types.Content(role="user", parts=[types.Part(text=query)])

    root_agent, exit_stack = await get_agent()

    runner = Runner(
        app_name="booking_app",
        agent=root_agent,
        session_service=session_service
    )

    response = runner.run_async(
        session_id=session.id,
        user_id=session.user_id,
        new_message=content
    )

    async for item in response:
        print(item.content.parts[0].text)

    print("Closing MCP")
    await exit_stack.aclose()


if __name__ == "__main__":
    asyncio.run(main())
Editor is loading...
Leave a Comment