Untitled
unknown
plain_text
3 months ago
1.9 kB
2
Indexable
%%writefile agent_mcp.py import asyncio import nest_asyncio from google.adk.agents.llm_agent import LlmAgent from google.genai import types from google.adk.sessions import InMemorySessionService from google.adk.runners import Runner from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters nest_asyncio.apply() async def get_agent(): tools, exit_stack = await MCPToolset.from_server( connection_params=StdioServerParameters( command="npx", args=[ "-y", "@openbnb/mcp-server-airbnb", "--ignore-robots-txt" ] ) ) root_agent = LlmAgent( name="booking_assistant", model="gemini-2.0-flash", description="Help the user with booking listings on Airbnb", instruction="You are a booking assistant agent. You need to list details from Airbnb.", tools=tools ) return root_agent, exit_stack async def main(): session_service = InMemorySessionService() session = session_service.create_session( state={}, app_name="booking_app", user_id="user_bnb" ) query = "What listings are available in Gujarat Ahmedabad for 3 days and 2 nights from 1st June to 4th June 2025?" content = types.Content(role="user", parts=[types.Part(text=query)]) root_agent, exit_stack = await get_agent() runner = Runner( app_name="booking_app", agent=root_agent, session_service=session_service ) response = runner.run_async( session_id=session.id, user_id=session.user_id, new_message=content ) async for item in response: print(item.content.parts[0].text) print("Closing MCP") await exit_stack.aclose() if __name__ == "__main__": asyncio.run(main())
Editor is loading...
Leave a Comment