diff --git a/examples/tutorials/00_sync/030_langgraph/.dockerignore b/examples/tutorials/00_sync/030_langgraph/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/00_sync/030_langgraph/Dockerfile b/examples/tutorials/00_sync/030_langgraph/Dockerfile new file mode 100644 index 000000000..ed7172f0d --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 00_sync/030_langgraph/pyproject.toml /app/030_langgraph/pyproject.toml +COPY 00_sync/030_langgraph/README.md /app/030_langgraph/README.md + +WORKDIR /app/030_langgraph + +# Copy the project code +COPY 00_sync/030_langgraph/project /app/030_langgraph/project + +# Copy the test files +COPY 00_sync/030_langgraph/tests /app/030_langgraph/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install --system .[dev] + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test 
environment variables +ENV AGENT_NAME=s030-langgraph + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/00_sync/030_langgraph/README.md b/examples/tutorials/00_sync/030_langgraph/README.md new file mode 100644 index 000000000..e5b1db0f7 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/README.md @@ -0,0 +1,48 @@ +# Tutorial 030: Sync LangGraph Agent + +This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx with: +- Tool calling (ReAct pattern) +- Streaming token output +- Multi-turn conversation memory via AgentEx checkpointer +- Tracing integration + +## Graph Structure + +![Graph](graph.png) + +## Key Concepts + +### Sync ACP +The sync ACP model uses HTTP request/response for communication. The `@acp.on_message_send` handler receives a message and yields streaming events back to the client. + +### LangGraph Integration +- **StateGraph**: Defines the agent's state machine with `AgentState` (message history) +- **ToolNode**: Automatically executes tool calls from the LLM +- **tools_condition**: Routes between tool execution and final response +- **Checkpointer**: Uses AgentEx's HTTP checkpointer for cross-request memory + +### Streaming +The agent streams tokens as they're generated using `convert_langgraph_to_agentex_events()`, which converts LangGraph's stream events into AgentEx `TaskMessageUpdate` events. 
+ +## Files + +| File | Description | +|------|-------------| +| `project/acp.py` | ACP server and message handler | +| `project/graph.py` | LangGraph state graph definition | +| `project/tools.py` | Tool definitions (weather example) | +| `tests/test_agent.py` | Integration tests | +| `manifest.yaml` | Agent configuration | + +## Running Locally + +```bash +# From this directory +agentex agents run +``` + +## Running Tests + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/00_sync/030_langgraph/graph.png b/examples/tutorials/00_sync/030_langgraph/graph.png new file mode 100644 index 000000000..16d22a1e7 Binary files /dev/null and b/examples/tutorials/00_sync/030_langgraph/graph.png differ diff --git a/examples/tutorials/00_sync/030_langgraph/manifest.yaml b/examples/tutorials/00_sync/030_langgraph/manifest.yaml new file mode 100644 index 000000000..bfe005626 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../ + include_paths: + - 00_sync/030_langgraph + - test_utils + dockerfile: 00_sync/030_langgraph/Dockerfile + dockerignore: 00_sync/030_langgraph/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: sync + name: s030-langgraph + description: A sync LangGraph agent with tool calling and streaming + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "s030-langgraph" + 
description: "A sync LangGraph agent with tool calling and streaming" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/00_sync/030_langgraph/project/__init__.py b/examples/tutorials/00_sync/030_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/00_sync/030_langgraph/project/acp.py b/examples/tutorials/00_sync/030_langgraph/project/acp.py new file mode 100644 index 000000000..517a00322 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/project/acp.py @@ -0,0 +1,94 @@ +""" +ACP (Agent Communication Protocol) handler for Agentex. + +This is the API layer — it manages the graph lifecycle and streams +tokens and tool calls from the LangGraph graph to the Agentex frontend. +""" + +from __future__ import annotations + +import os +from typing import AsyncGenerator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from project.graph import create_graph +from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +# Register the Agentex tracing processor so spans are shipped to the backend +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + )) 
+# Create ACP server +acp = FastACP.create(acp_type="sync") + +# Compiled graph (lazy-initialized on first request) +_graph = None + + +async def get_graph(): + """Get or create the compiled graph instance.""" + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle incoming messages from Agentex, streaming tokens and tool calls.""" + graph = await get_graph() + + thread_id = params.task.id + user_message = params.content.content + + logger.info(f"Processing message for thread {thread_id}") + + async with adk.tracing.span( + trace_id=thread_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + callback = create_langgraph_tracing_handler( + trace_id=thread_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={ + "configurable": {"thread_id": thread_id}, + "callbacks": [callback], + }, + stream_mode=["messages", "updates"], + ) + + final_text = "" + async for event in convert_langgraph_to_agentex_events(stream): + # Accumulate text deltas for span output + delta = getattr(event, "delta", None) + if isinstance(delta, TextDelta) and delta.text_delta: + final_text += delta.text_delta + yield event + + if turn_span: + turn_span.output = {"final_output": final_text} diff --git a/examples/tutorials/00_sync/030_langgraph/project/graph.py b/examples/tutorials/00_sync/030_langgraph/project/graph.py new file mode 100644 index 000000000..53728cd58 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/project/graph.py @@ -0,0 +1,73 @@ +""" +LangGraph graph definition. + +Defines the state, nodes, edges, and compiles the graph. 
+The compiled graph is the boundary between this module and the API layer. +""" + +from __future__ import annotations + +from typing import Any, Annotated +from datetime import datetime +from typing_extensions import TypedDict + +from langgraph.graph import START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode, tools_condition +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS +from agentex.lib.adk import create_checkpointer + +MODEL_NAME = "gpt-5" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer. + + Returns: + A compiled LangGraph StateGraph ready for invocation. 
""" + llm = ChatOpenAI( + model=MODEL_NAME, + reasoning={"effort": "high", "summary": "auto"}, + ) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format( + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition) + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/examples/tutorials/00_sync/030_langgraph/project/tools.py b/examples/tutorials/00_sync/030_langgraph/project/tools.py new file mode 100644 index 000000000..1b402a906 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/project/tools.py @@ -0,0 +1,32 @@ +""" +Tool definitions for the LangGraph agent. + +Add your custom tools here. Each tool should be a function decorated with @tool +or created using the Tool class. +""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. 
Input should be a city name.", +) + +# Export all tools as a list +TOOLS = [weather_tool] diff --git a/examples/tutorials/00_sync/030_langgraph/pyproject.toml b/examples/tutorials/00_sync/030_langgraph/pyproject.toml new file mode 100644 index 000000000..fc9f99971 --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "s030-langgraph" +version = "0.1.0" +description = "A sync LangGraph agent with tool calling and streaming" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/00_sync/030_langgraph/tests/test_agent.py b/examples/tutorials/00_sync/030_langgraph/tests/test_agent.py new file mode 100644 index 000000000..36fcf418f --- /dev/null +++ b/examples/tutorials/00_sync/030_langgraph/tests/test_agent.py @@ -0,0 +1,169 @@ +""" +Tests for the sync LangGraph agent. + +This test suite validates: +- Non-streaming message sending with tool-calling LangGraph agent +- Streaming message sending with token-by-token output + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. Set the AGENTEX_API_BASE_URL environment variable if not using default +3. 
Run: pytest test_agent.py -v + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: s030-langgraph) +""" + +import os + +import pytest +from test_utils.sync import validate_text_in_string, collect_streaming_response + +from agentex import Agentex +from agentex.types import TextContent, TextContentParam +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest, ParamsSendMessageRequest +from agentex.lib.sdk.fastacp.base.base_acp_server import uuid + +# Configuration from environment variables +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "s030-langgraph") + + +@pytest.fixture +def client(): + """Create an AgentEx client instance for testing.""" + return Agentex(base_url=AGENTEX_API_BASE_URL) + + +@pytest.fixture +def agent_name(): + """Return the agent name for testing.""" + return AGENT_NAME + + +@pytest.fixture +def agent_id(client, agent_name): + """Retrieve the agent ID based on the agent name.""" + agents = client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingMessages: + """Test non-streaming message sending with LangGraph agent.""" + + def test_send_simple_message(self, client: Agentex, agent_name: str): + """Test sending a simple message and receiving a response.""" + response = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="Hello! 
What can you help me with?", + type="text", + ) + ), + ) + result = response.result + assert result is not None + assert len(result) >= 1 + + def test_tool_calling(self, client: Agentex, agent_name: str): + """Test that the agent can use tools (e.g., weather tool).""" + response = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What's the weather in San Francisco?", + type="text", + ) + ), + ) + result = response.result + assert result is not None + assert len(result) >= 1 + + def test_multiturn_conversation(self, client: Agentex, agent_name: str, agent_id: str): + """Test multi-turn conversation with memory via LangGraph checkpointer.""" + task_response = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + # First message + response1 = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="My name is Alice. 
Remember that.", + type="text", + ), + task_id=task.id, + ), + ) + assert response1.result is not None + + # Second message - agent should remember the name + response2 = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What is my name?", + type="text", + ), + task_id=task.id, + ), + ) + assert response2.result is not None + for message in response2.result: + if isinstance(message.content, TextContent): + validate_text_in_string("alice", message.content.content.lower()) + + +class TestStreamingMessages: + """Test streaming message sending with LangGraph agent.""" + + def test_stream_simple_message(self, client: Agentex, agent_name: str): + """Test streaming a simple message response.""" + stream = client.agents.send_message_stream( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="Tell me a short joke.", + type="text", + ) + ), + ) + + aggregated_content, chunks = collect_streaming_response(stream) + + assert aggregated_content is not None + assert len(chunks) > 1, "No chunks received in streaming response." + + def test_stream_tool_calling(self, client: Agentex, agent_name: str): + """Test streaming with tool calls.""" + stream = client.agents.send_message_stream( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What's the weather in New York?", + type="text", + ) + ), + ) + + aggregated_content, chunks = collect_streaming_response(stream) + + assert aggregated_content is not None + assert len(chunks) > 0, "No chunks received in streaming response." 
+ + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/00_base/100_langgraph/.dockerignore b/examples/tutorials/10_async/00_base/100_langgraph/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/10_async/00_base/100_langgraph/Dockerfile b/examples/tutorials/10_async/00_base/100_langgraph/Dockerfile new file mode 100644 index 000000000..c2e4b464c --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 10_async/00_base/100_langgraph/pyproject.toml /app/100_langgraph/pyproject.toml +COPY 10_async/00_base/100_langgraph/README.md /app/100_langgraph/README.md + +WORKDIR /app/100_langgraph + +# Copy the project code +COPY 10_async/00_base/100_langgraph/project /app/100_langgraph/project + +# Copy the test files +COPY 10_async/00_base/100_langgraph/tests /app/100_langgraph/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install 
the required Python packages with dev dependencies +RUN uv pip install --system .[dev] pytest-asyncio httpx + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=ab100-langgraph + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/10_async/00_base/100_langgraph/README.md b/examples/tutorials/10_async/00_base/100_langgraph/README.md new file mode 100644 index 000000000..6f6c6a36b --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/README.md @@ -0,0 +1,51 @@ +# Tutorial 100: Async LangGraph Agent + +This tutorial demonstrates how to build an **asynchronous** LangGraph agent on AgentEx with: +- Task-based event handling via Redis +- Tool calling (ReAct pattern) +- Multi-turn conversation memory via AgentEx checkpointer +- Tracing integration + +## Graph Structure + +![Graph](graph.png) + +## Sync vs Async: Key Differences + +| Aspect | Sync (Tutorial 030) | Async (This Tutorial) | +|--------|--------------------|-----------------------| +| **ACP Type** | `sync` | `async` | +| **Handler** | `@acp.on_message_send` | `@acp.on_task_event_send` | +| **Response** | HTTP streaming (yields) | Redis streaming | +| **Message Echo** | Implicit | Explicit (`adk.messages.create`) | +| **Streaming Helper** | `convert_langgraph_to_agentex_events()` | `stream_langgraph_events()` | +| **Extra Handlers** | None | `on_task_create`, `on_task_cancel` | + +### When to use Async? 
+- Long-running tasks that may exceed HTTP timeout +- Agents that need to push updates asynchronously +- Multi-step workflows where the client polls for results +- Production agents that need reliable message delivery via Redis + +## Files + +| File | Description | +|------|-------------| +| `project/acp.py` | ACP server with async event handlers | +| `project/graph.py` | LangGraph state graph definition | +| `project/tools.py` | Tool definitions (weather example) | +| `tests/test_agent.py` | Integration tests | +| `manifest.yaml` | Agent configuration | + +## Running Locally + +```bash +# From this directory +agentex agents run +``` + +## Running Tests + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/00_base/100_langgraph/graph.png b/examples/tutorials/10_async/00_base/100_langgraph/graph.png new file mode 100644 index 000000000..16d22a1e7 Binary files /dev/null and b/examples/tutorials/10_async/00_base/100_langgraph/graph.png differ diff --git a/examples/tutorials/10_async/00_base/100_langgraph/manifest.yaml b/examples/tutorials/10_async/00_base/100_langgraph/manifest.yaml new file mode 100644 index 000000000..1b0b5d490 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/00_base/100_langgraph + - test_utils + dockerfile: 10_async/00_base/100_langgraph/Dockerfile + dockerignore: 10_async/00_base/100_langgraph/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: async + name: ab100-langgraph + description: An async LangGraph agent with tool calling and Redis streaming + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: 
sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "ab100-langgraph" + description: "An async LangGraph agent with tool calling and Redis streaming" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/00_base/100_langgraph/project/__init__.py b/examples/tutorials/10_async/00_base/100_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/00_base/100_langgraph/project/acp.py b/examples/tutorials/10_async/00_base/100_langgraph/project/acp.py new file mode 100644 index 000000000..2585fefd6 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/project/acp.py @@ -0,0 +1,94 @@ +""" +ACP handler for async LangGraph agent. + +Uses the async ACP model with Redis streaming instead of HTTP yields. 
+""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from project.graph import create_graph +from agentex.lib.adk import stream_langgraph_events, create_langgraph_tracing_handler +from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.types.fastacp import AsyncACPConfig +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + )) + +acp = FastACP.create( + acp_type="async", + config=AsyncACPConfig(type="base"), +) + +_graph = None + + +async def get_graph(): + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_task_event_send +async def handle_task_event_send(params: SendEventParams): + """Handle incoming events, streaming tokens and tool calls via Redis.""" + graph = await get_graph() + task_id = params.task.id + user_message = params.event.content.content + + logger.info(f"Processing message for thread {task_id}") + + # Echo the user's message + await adk.messages.create(task_id=task_id, content=params.event.content) + + async with adk.tracing.span( + trace_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + callback = create_langgraph_tracing_handler( + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={ + "configurable": 
{"thread_id": task_id}, + "callbacks": [callback], + }, + stream_mode=["messages", "updates"], + ) + + final_output = await stream_langgraph_events(stream, task_id) + + if turn_span: + turn_span.output = {"final_output": final_output} + + +@acp.on_task_create +async def handle_task_create(params: CreateTaskParams): + logger.info(f"Task created: {params.task.id}") + + +@acp.on_task_cancel +async def handle_task_canceled(params: CancelTaskParams): + logger.info(f"Task canceled: {params.task.id}") diff --git a/examples/tutorials/10_async/00_base/100_langgraph/project/graph.py b/examples/tutorials/10_async/00_base/100_langgraph/project/graph.py new file mode 100644 index 000000000..af6e31313 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/project/graph.py @@ -0,0 +1,68 @@ +""" +LangGraph graph definition. + +Defines the state, nodes, edges, and compiles the graph. +""" + +from __future__ import annotations + +from typing import Any, Annotated +from datetime import datetime +from typing_extensions import TypedDict + +from langgraph.graph import START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode, tools_condition +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS +from agentex.lib.adk import create_checkpointer + +MODEL_NAME = "gpt-5" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. 
+ +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer.""" + llm = ChatOpenAI( + model=MODEL_NAME, + reasoning={"effort": "high", "summary": "auto"}, + ) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format( + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition) + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/examples/tutorials/10_async/00_base/100_langgraph/project/tools.py b/examples/tutorials/10_async/00_base/100_langgraph/project/tools.py new file mode 100644 index 000000000..1b402a906 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/project/tools.py @@ -0,0 +1,32 @@ +""" +Tool definitions for the LangGraph agent. + +Add your custom tools here. Each tool should be a function decorated with @tool +or created using the Tool class. +""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. 
+ + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + # TODO: Replace with actual weather API call + return f"The weather in {city} is sunny and 72°F" + + +# Define tools +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +# Export all tools as a list +TOOLS = [weather_tool] diff --git a/examples/tutorials/10_async/00_base/100_langgraph/pyproject.toml b/examples/tutorials/10_async/00_base/100_langgraph/pyproject.toml new file mode 100644 index 000000000..fecbc6149 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ab100-langgraph" +version = "0.1.0" +description = "An async LangGraph agent with tool calling and Redis streaming" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/10_async/00_base/100_langgraph/tests/test_agent.py b/examples/tutorials/10_async/00_base/100_langgraph/tests/test_agent.py new file mode 100644 index 000000000..948db1558 --- /dev/null +++ b/examples/tutorials/10_async/00_base/100_langgraph/tests/test_agent.py @@ -0,0 +1,123 @@ +""" +Tests for the async LangGraph agent. + +This test suite validates: +- Non-streaming event sending and polling +- Streaming event sending + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. 
Set the AGENTEX_API_BASE_URL environment variable if not using default
+3. Run: pytest test_agent.py -v
+
+Configuration:
+- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003)
+- AGENT_NAME: Name of the agent to test (default: ab100-langgraph)
+"""
+
+import os
+import uuid
+
+import pytest
+import pytest_asyncio
+
+from agentex import AsyncAgentex
+from agentex.types import TextContentParam
+from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
+
+# Configuration from environment variables
+AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
+AGENT_NAME = os.environ.get("AGENT_NAME", "ab100-langgraph")
+
+
+@pytest_asyncio.fixture
+async def client():
+    """Create an AsyncAgentex client instance for testing."""
+    client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL)
+    yield client
+    await client.close()
+
+
+@pytest.fixture
+def agent_name():
+    """Return the agent name for testing."""
+    return AGENT_NAME
+
+
+@pytest_asyncio.fixture
+async def agent_id(client, agent_name):
+    """Retrieve the agent ID based on the agent name."""
+    agents = await client.agents.list()
+    for agent in agents:
+        if agent.name == agent_name:
+            return agent.id
+    raise ValueError(f"Agent with name {agent_name} not found.")
+
+
+class TestNonStreamingEvents:
+    """Test non-streaming event sending and polling."""
+
+    @pytest.mark.asyncio
+    async def test_send_event(self, client: AsyncAgentex, agent_id: str):
+        """Test sending an event to the async LangGraph agent."""
+        task_response = await client.agents.create_task(
+            agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)
+        )
+        task = task_response.result
+        assert task is not None
+
+        event_content = TextContentParam(
+            type="text",
+            author="user",
+            content="Hello! 
What can you help me with?", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + @pytest.mark.asyncio + async def test_tool_calling(self, client: AsyncAgentex, agent_id: str): + """Test that the agent can use tools (e.g., weather tool).""" + task_response = await client.agents.create_task( + agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex) + ) + task = task_response.result + assert task is not None + + event_content = TextContentParam( + type="text", + author="user", + content="What's the weather in San Francisco?", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + +class TestStreamingEvents: + """Test streaming event sending.""" + + @pytest.mark.asyncio + async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): + """Test sending an event and streaming the response.""" + task_response = await client.agents.create_task( + agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex) + ) + task = task_response.result + assert task is not None + + event_content = TextContentParam( + type="text", + author="user", + content="Tell me a short joke.", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/src/agentex/lib/cli/commands/init.py b/src/agentex/lib/cli/commands/init.py index 1654f48c5..63f610f69 100644 --- a/src/agentex/lib/cli/commands/init.py +++ b/src/agentex/lib/cli/commands/init.py @@ -25,8 +25,10 @@ class TemplateType(str, Enum): TEMPORAL = "temporal" TEMPORAL_OPENAI_AGENTS = "temporal-openai-agents" DEFAULT = "default" + DEFAULT_LANGGRAPH = "default-langgraph" SYNC = "sync" SYNC_OPENAI_AGENTS = "sync-openai-agents" + SYNC_LANGGRAPH = "sync-langgraph" def render_template( @@ -58,8 +60,10 @@ def create_project_structure( 
TemplateType.TEMPORAL: ["acp.py", "workflow.py", "run_worker.py"], TemplateType.TEMPORAL_OPENAI_AGENTS: ["acp.py", "workflow.py", "run_worker.py", "activities.py"], TemplateType.DEFAULT: ["acp.py"], + TemplateType.DEFAULT_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], TemplateType.SYNC: ["acp.py"], TemplateType.SYNC_OPENAI_AGENTS: ["acp.py"], + TemplateType.SYNC_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], }[template_type] # Create project/code files @@ -155,7 +159,7 @@ def validate_agent_name(text: str) -> bool | str: template_type = questionary.select( "What type of template would you like to create?", choices=[ - {"name": "Async - ACP Only", "value": TemplateType.DEFAULT}, + {"name": "Async - ACP Only", "value": "async_submenu"}, {"name": "Async - Temporal", "value": "temporal_submenu"}, {"name": "Sync ACP", "value": "sync_submenu"}, ], @@ -163,8 +167,18 @@ def validate_agent_name(text: str) -> bool | str: if not template_type: return - # If Temporal was selected, show sub-menu for Temporal variants - if template_type == "temporal_submenu": + # If a submenu was selected, show sub-menu for variants + if template_type == "async_submenu": + template_type = questionary.select( + "Which Async template would you like to use?", + choices=[ + {"name": "Basic Async ACP", "value": TemplateType.DEFAULT}, + {"name": "Async ACP + LangGraph", "value": TemplateType.DEFAULT_LANGGRAPH}, + ], + ).ask() + if not template_type: + return + elif template_type == "temporal_submenu": template_type = questionary.select( "Which Temporal template would you like to use?", choices=[ @@ -180,6 +194,7 @@ def validate_agent_name(text: str) -> bool | str: choices=[ {"name": "Basic Sync ACP", "value": TemplateType.SYNC}, {"name": "Sync ACP + OpenAI Agents SDK (Recommended)", "value": TemplateType.SYNC_OPENAI_AGENTS}, + {"name": "Sync ACP + LangGraph", "value": TemplateType.SYNC_LANGGRAPH}, ], ).ask() if not template_type: diff --git 
a/src/agentex/lib/cli/templates/default-langgraph/.dockerignore.j2 b/src/agentex/lib/cli/templates/default-langgraph/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/default-langgraph/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/default-langgraph/Dockerfile-uv.j2 new file mode 100644 index 000000000..2ac5be7d2 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/Dockerfile-uv.j2 @@ -0,0 +1,42 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the pyproject.toml file to optimize caching +COPY {{ project_path_from_build_root }}/pyproject.toml /app/{{ project_path_from_build_root }}/pyproject.toml + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages using uv +RUN uv pip install --system . 
+
+# Copy the project code
+COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project
+
+# Set environment variables
+ENV PYTHONPATH=/app
+
+# Run the agent using uvicorn
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/src/agentex/lib/cli/templates/default-langgraph/Dockerfile.j2 b/src/agentex/lib/cli/templates/default-langgraph/Dockerfile.j2
new file mode 100644
index 000000000..0395caf74
--- /dev/null
+++ b/src/agentex/lib/cli/templates/default-langgraph/Dockerfile.j2
@@ -0,0 +1,42 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    htop \
+    vim \
+    curl \
+    tar \
+    python3-dev \
+    postgresql-client \
+    build-essential \
+    libpq-dev \
+    gcc \
+    cmake \
+    netcat-openbsd \
+    nodejs \
+    npm \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+# Copy just the requirements file to optimize caching
+COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt
+
+WORKDIR /app/{{ project_path_from_build_root }}
+
+# Install the required Python packages
+RUN uv pip install --system -r requirements.txt
+
+# Copy the project code
+COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project
+
+# Set environment variables
+ENV PYTHONPATH=/app
+
+# Run the agent using uvicorn
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/src/agentex/lib/cli/templates/default-langgraph/README.md.j2 b/src/agentex/lib/cli/templates/default-langgraph/README.md.j2
new file mode 100644
index 000000000..30f7e1706
--- /dev/null
+++ b/src/agentex/lib/cli/templates/default-langgraph/README.md.j2
@@ -0,0 +1,85 @@
+# {{ agent_name }} - AgentEx Async LangGraph Agent + +This template builds an **asynchronous** LangGraph agent on AgentEx with: +- Task-based event handling via Redis +- Tool calling (ReAct pattern) +- Multi-turn conversation memory via AgentEx checkpointer +- Tracing integration + +## Graph Structure + +``` +START --> agent --> [has tool calls?] --> tools --> agent + --> [no tool calls?] --> END +``` + +## Sync vs Async + +| Aspect | Sync | Async (This Template) | +|--------|------|-----------------------| +| **ACP Type** | `sync` | `async` | +| **Handler** | `@acp.on_message_send` | `@acp.on_task_event_send` | +| **Response** | HTTP streaming (yields) | Redis streaming | +| **Message Echo** | Implicit | Explicit (`adk.messages.create`) | +| **Streaming Helper** | `convert_langgraph_to_agentex_events()` | `stream_langgraph_events()` | + +### When to use Async? +- Long-running tasks that may exceed HTTP timeout +- Agents that need to push updates asynchronously +- Multi-step workflows where the client polls for results +- Production agents that need reliable message delivery via Redis + +## Running the Agent + +```bash +agentex agents run --manifest manifest.yaml +``` + +## Project Structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ ├── acp.py # ACP server with async event handlers +│ ├── graph.py # LangGraph state graph definition +│ └── tools.py # Tool definitions +├── Dockerfile +├── manifest.yaml +├── dev.ipynb +{% if use_uv %} +└── pyproject.toml +{% else %} +└── requirements.txt +{% endif %} +``` + +## Development + +### 1. Add Your Own Tools +Edit `project/tools.py` to define custom tools: + +```python +from langchain_core.tools import Tool + +def my_tool(query: str) -> str: + """Your tool description.""" + return "result" + +my_tool = Tool(name="my_tool", func=my_tool, description="...") +TOOLS = [my_tool] +``` + +### 2. Customize the Graph +Edit `project/graph.py` to modify the model, system prompt, or graph structure. + +### 3. 
Configure Credentials +Set your OpenAI API key: +1. In `manifest.yaml` under `env.OPENAI_API_KEY` +2. Or export: `export OPENAI_API_KEY=...` +3. Or create a `.env` file in the project directory + +### 4. Run Locally +```bash +export ENVIRONMENT=development && agentex agents run --manifest manifest.yaml +``` diff --git a/src/agentex/lib/cli/templates/default-langgraph/dev.ipynb.j2 b/src/agentex/lib/cli/templates/default-langgraph/dev.ipynb.j2 new file mode 100644 index 000000000..d3a68303f --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/dev.ipynb.j2 @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. 
For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " 
print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/default-langgraph/environments.yaml.j2 b/src/agentex/lib/cli/templates/default-langgraph/environments.yaml.j2 new file mode 100644 index 000000000..f802776f0 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/environments.yaml.j2 @@ -0,0 +1,57 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. 
However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal: + enabled: false + + diff --git a/src/agentex/lib/cli/templates/default-langgraph/manifest.yaml.j2 b/src/agentex/lib/cli/templates/default-langgraph/manifest.yaml.j2 new file mode 100644 index 000000000..a614bdcd1 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/manifest.yaml.j2 @@ -0,0 +1,120 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. 
The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + +# Agent Configuration +# ----------------- +agent: + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: {{ description }} + + # Temporal workflow configuration + # Set enabled: true to use Temporal workflows for long-running tasks + temporal: + enabled: false + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common 
credentials include: + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: + OPENAI_API_KEY: "" # Set your OpenAI API key + # OPENAI_BASE_URL: "" + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret names + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 new file mode 100644 index 000000000..00122a726 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 @@ -0,0 +1,92 @@ +""" +ACP handler for async LangGraph agent. + +Uses the async ACP model with Redis streaming instead of HTTP yields. 
+""" + +from dotenv import load_dotenv + +load_dotenv() +import os + +import agentex.lib.adk as adk +from agentex.lib.adk import create_langgraph_tracing_handler, stream_langgraph_events +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.types.fastacp import AsyncACPConfig +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger + +from project.graph import create_graph + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + )) + +acp = FastACP.create( + acp_type="async", + config=AsyncACPConfig(type="base"), +) + +_graph = None + + +async def get_graph(): + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_task_event_send +async def handle_task_event_send(params: SendEventParams): + """Handle incoming events, streaming tokens and tool calls via Redis.""" + graph = await get_graph() + task_id = params.task.id + user_message = params.event.content.content + + logger.info(f"Processing message for thread {task_id}") + + # Echo the user's message + await adk.messages.create(task_id=task_id, content=params.event.content) + + async with adk.tracing.span( + trace_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + callback = create_langgraph_tracing_handler( + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={ + "configurable": {"thread_id": task_id}, + "callbacks": 
[callback], + }, + stream_mode=["messages", "updates"], + ) + + final_output = await stream_langgraph_events(stream, task_id) + + if turn_span: + turn_span.output = {"final_output": final_output} + + +@acp.on_task_create +async def handle_task_create(params: CreateTaskParams): + logger.info(f"Task created: {params.task.id}") + + +@acp.on_task_cancel +async def handle_task_canceled(params: CancelTaskParams): + logger.info(f"Task canceled: {params.task.id}") diff --git a/src/agentex/lib/cli/templates/default-langgraph/project/graph.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/project/graph.py.j2 new file mode 100644 index 000000000..b7fd2d6bd --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/project/graph.py.j2 @@ -0,0 +1,63 @@ +""" +LangGraph graph definition. + +Defines the state, nodes, edges, and compiles the graph. +""" + +from datetime import datetime +from typing import Annotated, Any + +from agentex.lib.adk import create_checkpointer +from langchain_core.messages import SystemMessage +from langchain_openai import ChatOpenAI +from langgraph.graph import START, StateGraph +from langgraph.graph.message import add_messages +from langgraph.prebuilt import ToolNode, tools_condition +from typing_extensions import TypedDict + +from project.tools import TOOLS + +MODEL_NAME = "gpt-4o" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. 
+ +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer.""" + llm = ChatOpenAI(model=MODEL_NAME) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format( + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition, "tools") + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/src/agentex/lib/cli/templates/default-langgraph/project/tools.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/project/tools.py.j2 new file mode 100644 index 000000000..1b402a906 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/project/tools.py.j2 @@ -0,0 +1,32 @@ +""" +Tool definitions for the LangGraph agent. + +Add your custom tools here. Each tool should be a function decorated with @tool +or created using the Tool class. +""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. 
+ + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + # TODO: Replace with actual weather API call + return f"The weather in {city} is sunny and 72°F" + + +# Define tools +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +# Export all tools as a list +TOOLS = [weather_tool] diff --git a/src/agentex/lib/cli/templates/default-langgraph/pyproject.toml.j2 b/src/agentex/lib/cli/templates/default-langgraph/pyproject.toml.j2 new file mode 100644 index 000000000..3c752f025 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/pyproject.toml.j2 @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/default-langgraph/requirements.txt.j2 b/src/agentex/lib/cli/templates/default-langgraph/requirements.txt.j2 new file mode 100644 index 000000000..4a148e901 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/requirements.txt.j2 @@ -0,0 +1,10 @@ +# Install agentex-sdk from local path +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# LangGraph and LangChain +langgraph +langchain-openai +python-dotenv diff --git a/src/agentex/lib/cli/templates/default-langgraph/test_agent.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/test_agent.py.j2 new file mode 100644 index 000000000..ee71f177c --- 
/dev/null +++ b/src/agentex/lib/cli/templates/default-langgraph/test_agent.py.j2 @@ -0,0 +1,147 @@ +""" +Sample tests for AgentEx ACP agent. + +This test suite demonstrates how to test the main AgentEx API functions: +- Non-streaming event sending and polling +- Streaming event sending + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. Set the AGENTEX_API_BASE_URL environment variable if not using default +3. Run: pytest test_agent.py -v + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: {{ agent_name }}) +""" + +import os +import uuid +import asyncio +import pytest +import pytest_asyncio +from agentex import AsyncAgentex +from agentex.types import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest +from agentex.types.text_content_param import TextContentParam +from test_utils.async_utils import ( + poll_for_agent_response, + send_event_and_poll_yielding, + stream_agent_response, + validate_text_in_response, + poll_messages, +) + + +# Configuration from environment variables +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "{{ agent_name }}") + + +@pytest_asyncio.fixture +async def client(): + """Create an AsyncAgentex client instance for testing.""" + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + """Return the agent name for testing.""" + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + """Retrieve the agent ID based on the agent name.""" + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """Test non-streaming 
event sending and polling."""
+
+    @pytest.mark.asyncio
+    async def test_send_event_and_poll(self, client: AsyncAgentex, agent_name: str, agent_id: str):
+        """Test sending an event and polling for the response."""
+        # TODO: Create a task for this conversation
+        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
+        # task = task_response.result
+        # assert task is not None
+
+        # TODO: Poll for the initial task creation message (if your agent sends one)
+        # async for message in poll_messages(
+        #     client=client,
+        #     task_id=task.id,
+        #     timeout=30,
+        #     sleep_interval=1.0,
+        # ):
+        #     assert isinstance(message, TaskMessage)
+        #     if message.content and message.content.type == "text" and message.content.author == "agent":
+        #         # Check for your expected initial message
+        #         assert "expected initial text" in message.content.content
+        #         break
+
+        # TODO: Send an event and poll for response using the yielding helper function
+        # user_message = "Your test message here"
+        # async for message in send_event_and_poll_yielding(
+        #     client=client,
+        #     agent_id=agent_id,
+        #     task_id=task.id,
+        #     user_message=user_message,
+        #     timeout=30,
+        #     sleep_interval=1.0,
+        # ):
+        #     assert isinstance(message, TaskMessage)
+        #     if message.content and message.content.type == "text" and message.content.author == "agent":
+        #         # Check for your expected response
+        #         assert "expected response text" in message.content.content
+        #         break
+        pass
+
+
+class TestStreamingEvents:
+    """Test streaming event sending."""
+
+    @pytest.mark.asyncio
+    async def test_send_event_and_stream(self, client: AsyncAgentex, agent_name: str, agent_id: str):
+        """Test sending an event and streaming the response."""
+        # TODO: Create a task for this conversation
+        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
+        # task = task_response.result
+        # assert task is not None
+
+        # user_message = "Your test 
message here" + + # # Collect events from stream + # all_events = [] + + # async def collect_stream_events(): + # async for event in stream_agent_response( + # client=client, + # task_id=task.id, + # timeout=30, + # ): + # all_events.append(event) + + # # Start streaming task + # stream_task = asyncio.create_task(collect_stream_events()) + + # # Send the event + # event_content = TextContentParam(type="text", author="user", content=user_message) + # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) + + # # Wait for streaming to complete + # await stream_task + + # # TODO: Add your validation here + # assert len(all_events) > 0, "No events received in streaming response" + pass + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/src/agentex/lib/cli/templates/sync-langgraph/.dockerignore.j2 b/src/agentex/lib/cli/templates/sync-langgraph/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile-uv.j2 new file mode 100644 index 000000000..2ac5be7d2 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile-uv.j2 @@ -0,0 +1,42 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + 
build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the pyproject.toml file to optimize caching +COPY {{ project_path_from_build_root }}/pyproject.toml /app/{{ project_path_from_build_root }}/pyproject.toml + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages using uv +RUN uv pip install --system . + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + +# Set environment variables +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile.j2 b/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile.j2 new file mode 100644 index 000000000..4d9f41d45 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/Dockerfile.j2 @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + node \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root 
}}/project /app/{{ project_path_from_build_root }}/project + + +# Set environment variables +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-langgraph/README.md.j2 b/src/agentex/lib/cli/templates/sync-langgraph/README.md.j2 new file mode 100644 index 000000000..ef57f5874 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/README.md.j2 @@ -0,0 +1,83 @@ +# {{ agent_name }} - AgentEx Sync LangGraph Agent + +This template builds a **synchronous** LangGraph agent on AgentEx with: +- Tool calling (ReAct pattern) +- Streaming token output +- Multi-turn conversation memory via AgentEx checkpointer +- Tracing integration + +## Graph Structure + +``` +START --> agent --> [has tool calls?] --> tools --> agent + --> [no tool calls?] --> END +``` + +## Running the Agent + +```bash +agentex agents run --manifest manifest.yaml +``` + +## Project Structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ ├── acp.py # ACP server and message handler +│ ├── graph.py # LangGraph state graph definition +│ └── tools.py # Tool definitions +├── Dockerfile +├── manifest.yaml +├── dev.ipynb +{% if use_uv %} +└── pyproject.toml +{% else %} +└── requirements.txt +{% endif %} +``` + +## Key Concepts + +### Sync ACP with LangGraph +The sync ACP model uses HTTP request/response. The `@acp.on_message_send` handler receives a message and yields streaming events from the LangGraph graph back to the client. 
+ +### LangGraph Integration +- **StateGraph**: Defines the agent's state machine with `AgentState` (message history) +- **ToolNode**: Automatically executes tool calls from the LLM +- **tools_condition**: Routes between tool execution and final response +- **Checkpointer**: Uses AgentEx's HTTP checkpointer for cross-request memory + +### Streaming +Tokens are streamed as they're generated using `convert_langgraph_to_agentex_events()`, which converts LangGraph's stream events into AgentEx `TaskMessageUpdate` events. + +## Development + +### 1. Add Your Own Tools +Edit `project/tools.py` to define custom tools: + +```python +from langchain_core.tools import Tool + +def my_tool(query: str) -> str: + """Your tool description.""" + return "result" + +my_tool = Tool(name="my_tool", func=my_tool, description="...") +TOOLS = [my_tool] +``` + +### 2. Customize the Graph +Edit `project/graph.py` to modify the model, system prompt, or graph structure. + +### 3. Configure Credentials +Set your OpenAI API key: +1. In `manifest.yaml` under `env.OPENAI_API_KEY` +2. Or export: `export OPENAI_API_KEY=...` +3. Or create a `.env` file in the project directory + +### 4. 
Run Locally +```bash +export ENVIRONMENT=development && agentex agents run --manifest manifest.yaml +``` diff --git a/src/agentex/lib/cli/templates/sync-langgraph/dev.ipynb.j2 b/src/agentex/lib/cli/templates/sync-langgraph/dev.ipynb.j2 new file mode 100644 index 000000000..d8c10a65a --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/dev.ipynb.j2 @@ -0,0 +1,167 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. 
The server will create the task for you.\n", + "\n", + "# import uuid\n", + "\n", + "# TASK_ID = str(uuid.uuid4())[:8]\n", + "\n", + "# rpc_response = client.agents.rpc_by_name(\n", + "# agent_name=AGENT_NAME,\n", + "# method=\"task/create\",\n", + "# params={\n", + "# \"name\": f\"{TASK_ID}-task\",\n", + "# \"params\": {}\n", + "# }\n", + "# )\n", + "\n", + "# task = rpc_response.result\n", + "# print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Test non streaming response\n", + "from agentex.types import TextContent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_message(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": False\n", + " }\n", + ")\n", + "\n", + "if not rpc_response or not rpc_response.result:\n", + " raise ValueError(\"No result in response\")\n", + "\n", + "# Extract and print just the text content from the response\n", + "for task_message in rpc_response.result:\n", + " content = task_message.content\n", + " if isinstance(content, TextContent):\n", + " text = content.content\n", + " print(text)\n" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "79688331", + "metadata": {}, + "outputs": [], + "source": [ + "# Test streaming response\n", + "from agentex.types.task_message_update import StreamTaskMessageDelta, StreamTaskMessageFull\n", + "from agentex.types.text_delta import TextDelta\n", + "\n", + "\n", + "# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n", + "# - StreamTaskMessageStart: \n", + "# - An indicator that a streaming message was started, doesn't contain any useful content\n", + "# - StreamTaskMessageDelta: \n", + "# - A delta of a streaming message, contains the text delta to aggregate\n", + "# - StreamTaskMessageDone: \n", + "# - An indicator that a streaming message was done, doesn't contain any useful content\n", + "# - StreamTaskMessageFull: \n", + "# - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n", + "\n", + "# Whenn processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, ToolRequestDelta, or ToolResponseDelta, you can process them as well\n", + "# Whenn processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "for agent_rpc_response_chunk in client.agents.send_message_stream(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": True\n", + " }\n", + "):\n", + " # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n", + " task_message_update = agent_rpc_response_chunk.result\n", + " # Print oly the text deltas as they arrive or any full messages\n", + " if isinstance(task_message_update, StreamTaskMessageDelta):\n", + " delta = task_message_update.delta\n", + " if isinstance(delta, TextDelta):\n", + " 
print(delta.text_delta, end=\"\", flush=True)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in streaming message.\")\n", + " elif isinstance(task_message_update, StreamTaskMessageFull):\n", + " content = task_message_update.content\n", + " if isinstance(content, TextContent):\n", + " print(content.content)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in full message.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5e7e042", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/sync-langgraph/environments.yaml.j2 b/src/agentex/lib/cli/templates/sync-langgraph/environments.yaml.j2 new file mode 100644 index 000000000..73924abdd --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/environments.yaml.j2 @@ -0,0 +1,53 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. 
+ +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + diff --git a/src/agentex/lib/cli/templates/sync-langgraph/manifest.yaml.j2 b/src/agentex/lib/cli/templates/sync-langgraph/manifest.yaml.j2 new file mode 100644 index 000000000..ae62f3033 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/manifest.yaml.j2 @@ -0,0 +1,117 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. 
All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + +# Agent Configuration +# ----------------- +agent: + acp_type: sync + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: {{ description }} + + # Temporal workflow configuration + # Set enabled: true to 
use Temporal workflows for long-running tasks + temporal: + enabled: false + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: + OPENAI_API_KEY: "" # Set your OpenAI API key + # OPENAI_BASE_URL: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret names + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 new file mode 100644 index 000000000..6e9a60340 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 @@ -0,0 +1,93 @@ +""" +ACP (Agent Communication Protocol) handler for Agentex. + +This is the API layer — it manages the graph lifecycle and streams +tokens and tool calls from the LangGraph graph to the Agentex frontend. 
+""" + +from typing import AsyncGenerator + +import agentex.lib.adk as adk +from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.types.task_message_content import TaskMessageContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import TaskMessageUpdate +from dotenv import load_dotenv + +load_dotenv() +import os + +from project.graph import create_graph + +logger = make_logger(__name__) + +# Register the Agentex tracing processor so spans are shipped to the backend +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + )) + +# Create ACP server +acp = FastACP.create(acp_type="sync") + +# Compiled graph (lazy-initialized on first request) +_graph = None + + +async def get_graph(): + """Get or create the compiled graph instance.""" + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle incoming messages from Agentex, streaming tokens and tool calls.""" + graph = await get_graph() + + thread_id = params.task.id + user_message = params.content.content + + logger.info(f"Processing message for thread {thread_id}") + + async with adk.tracing.span( + trace_id=thread_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as 
turn_span: + callback = create_langgraph_tracing_handler( + trace_id=thread_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={ + "configurable": {"thread_id": thread_id}, + "callbacks": [callback], + }, + stream_mode=["messages", "updates"], + ) + + final_text = "" + async for event in convert_langgraph_to_agentex_events(stream): + # Accumulate text deltas for span output + delta = getattr(event, "delta", None) + if isinstance(delta, TextDelta) and delta.text_delta: + final_text += delta.text_delta + yield event + + if turn_span: + turn_span.output = {"final_output": final_text} diff --git a/src/agentex/lib/cli/templates/sync-langgraph/project/graph.py.j2 b/src/agentex/lib/cli/templates/sync-langgraph/project/graph.py.j2 new file mode 100644 index 000000000..8b1f6297f --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/project/graph.py.j2 @@ -0,0 +1,68 @@ +""" +LangGraph graph definition. + +Defines the state, nodes, edges, and compiles the graph. +The compiled graph is the boundary between this module and the API layer. +""" + +from datetime import datetime +from typing import Annotated, Any + +from agentex.lib.adk import create_checkpointer +from langchain_core.messages import SystemMessage +from langchain_openai import ChatOpenAI +from langgraph.graph import START, StateGraph +from langgraph.graph.message import add_messages +from langgraph.prebuilt import ToolNode, tools_condition +from typing_extensions import TypedDict + +from project.tools import TOOLS + +MODEL_NAME = "gpt-4o" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. 
+ +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer. + + Returns: + A compiled LangGraph StateGraph ready for invocation. + """ + llm = ChatOpenAI(model=MODEL_NAME) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format( + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition, "tools") + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/src/agentex/lib/cli/templates/sync-langgraph/project/tools.py.j2 b/src/agentex/lib/cli/templates/sync-langgraph/project/tools.py.j2 new file mode 100644 index 000000000..1b402a906 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/project/tools.py.j2 @@ -0,0 +1,32 @@ +""" +Tool definitions for the LangGraph agent. + +Add your custom tools here. Each tool should be a function decorated with @tool +or created using the Tool class. 
+""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + # TODO: Replace with actual weather API call + return f"The weather in {city} is sunny and 72°F" + + +# Define tools +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +# Export all tools as a list +TOOLS = [weather_tool] diff --git a/src/agentex/lib/cli/templates/sync-langgraph/pyproject.toml.j2 b/src/agentex/lib/cli/templates/sync-langgraph/pyproject.toml.j2 new file mode 100644 index 000000000..3c752f025 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/pyproject.toml.j2 @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/sync-langgraph/requirements.txt.j2 b/src/agentex/lib/cli/templates/sync-langgraph/requirements.txt.j2 new file mode 100644 index 000000000..4a148e901 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/requirements.txt.j2 @@ -0,0 +1,10 @@ +# Install agentex-sdk from local path +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# LangGraph and LangChain +langgraph +langchain-openai +python-dotenv diff --git a/src/agentex/lib/cli/templates/sync-langgraph/test_agent.py.j2 
b/src/agentex/lib/cli/templates/sync-langgraph/test_agent.py.j2 new file mode 100644 index 000000000..7de4684f4 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-langgraph/test_agent.py.j2 @@ -0,0 +1,70 @@ +""" +Sample tests for AgentEx ACP agent. + +This test suite demonstrates how to test the main AgentEx API functions: +- Non-streaming message sending +- Streaming message sending +- Task creation via RPC + +To run these tests: +1. Make sure the agent is running (via docker-compose or `agentex agents run`) +2. Set the AGENTEX_API_BASE_URL environment variable if not using default +3. Run: pytest test_agent.py -v + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: {{ agent_name }}) +""" + +import os +import pytest +from agentex import Agentex + + +# Configuration from environment variables +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "{{ agent_name }}") + + +@pytest.fixture +def client(): + """Create an AgentEx client instance for testing.""" + return Agentex(base_url=AGENTEX_API_BASE_URL) + + +@pytest.fixture +def agent_name(): + """Return the agent name for testing.""" + return AGENT_NAME + + +@pytest.fixture +def agent_id(client, agent_name): + """Retrieve the agent ID based on the agent name.""" + agents = client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingMessages: + """Test non-streaming message sending.""" + + def test_send_message(self, client: Agentex, _agent_name: str): + """Test sending a message and receiving a response.""" + # TODO: Fill in the test based on what data your agent is expected to handle + ... 
+ + +class TestStreamingMessages: + """Test streaming message sending.""" + + def test_send_stream_message(self, client: Agentex, _agent_name: str): + """Test streaming a message and aggregating deltas.""" + # TODO: Fill in the test based on what data your agent is expected to handle + ... + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])