diff --git a/examples/tutorials/00_sync/060_harness_openai/.dockerignore b/examples/tutorials/00_sync/060_harness_openai/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/00_sync/060_harness_openai/Dockerfile b/examples/tutorials/00_sync/060_harness_openai/Dockerfile new file mode 100644 index 000000000..1bd4f4860 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 00_sync/060_harness_openai/pyproject.toml /app/060_harness_openai/pyproject.toml +COPY 00_sync/060_harness_openai/README.md /app/060_harness_openai/README.md + +WORKDIR /app/060_harness_openai + +# Copy the project code +COPY 00_sync/060_harness_openai/project /app/060_harness_openai/project + +# Copy the test files +COPY 00_sync/060_harness_openai/tests /app/060_harness_openai/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install 
--system .[dev] + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=s060-harness-openai + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/00_sync/060_harness_openai/README.md b/examples/tutorials/00_sync/060_harness_openai/README.md new file mode 100644 index 000000000..e22e9aa8b --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/README.md @@ -0,0 +1,35 @@ +# Sync OpenAI Agents on the unified harness surface + +A sync (HTTP) Agentex agent that runs the OpenAI Agents SDK and delivers its +output through the **unified harness surface**. + +## What this demonstrates + +The OpenAI Agents SDK produces native streaming events. This tutorial wraps a +`Runner.run_streamed` result in an `OpenAITurn` — the provider -> canonical +`StreamTaskMessage*` adapter — and forwards the canonical stream to the frontend +via `UnifiedEmitter.yield_turn`. The same `OpenAITurn` flows unchanged through +`auto_send_turn` in the async (`130_harness_openai`) and temporal +(`140_harness_openai`) variants; only the delivery method differs. 
+ +```python +result = Runner.run_streamed(starting_agent=agent, input=user_message) +turn = OpenAITurn(result=result, model="gpt-4o") +emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, parent_span_id=parent_span_id) +async for event in emitter.yield_turn(turn): + yield event +``` + +## Run it + +```bash +agentex agents run --manifest manifest.yaml +``` + +## Test it + +The offline test exercises the harness wiring without a server or API key: + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/00_sync/060_harness_openai/manifest.yaml b/examples/tutorials/00_sync/060_harness_openai/manifest.yaml new file mode 100644 index 000000000..4967c1f8d --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../ + include_paths: + - 00_sync/060_harness_openai + - test_utils + dockerfile: 00_sync/060_harness_openai/Dockerfile + dockerignore: 00_sync/060_harness_openai/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: sync + name: s060-harness-openai + description: A sync OpenAI Agents SDK agent on the unified harness surface + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "s060-harness-openai" + description: "A sync OpenAI Agents SDK agent on the unified harness surface" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: 
"2Gi" diff --git a/examples/tutorials/00_sync/060_harness_openai/project/__init__.py b/examples/tutorials/00_sync/060_harness_openai/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/00_sync/060_harness_openai/project/acp.py b/examples/tutorials/00_sync/060_harness_openai/project/acp.py new file mode 100644 index 000000000..caaa0b132 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/project/acp.py @@ -0,0 +1,87 @@ +"""ACP handler for the sync OpenAI Agents harness tutorial. + +This is the API layer. It runs the OpenAI Agents SDK via ``Runner.run_streamed``, +wraps the streamed run in an ``OpenAITurn`` (the provider -> canonical +``StreamTaskMessage*`` adapter), and forwards the canonical stream to the +Agentex frontend via ``UnifiedEmitter.yield_turn`` — the same harness surface +used by the async and temporal variants of this tutorial. +""" + +from __future__ import annotations + +import os +from typing import AsyncGenerator + +from dotenv import load_dotenv + +load_dotenv() + +from agents import Runner + +from agentex.lib import adk +from project.agent import MODEL_NAME, create_agent +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +# LiteLLM proxy auth: copy LITELLM_API_KEY to OPENAI_API_KEY for OpenAI client +# compatibility, so the same example works behind the Scale LiteLLM gateway. 
+_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = _litellm_key + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create(acp_type="sync") + +_agent = None + + +def get_agent(): + """Get or create the OpenAI Agents SDK agent instance.""" + global _agent + if _agent is None: + _agent = create_agent() + return _agent + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle incoming messages, streaming tokens and tool calls via the harness.""" + agent = get_agent() + task_id = params.task.id + user_message = params.content.content + logger.info(f"Processing message for task {task_id}") + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + result = Runner.run_streamed(starting_agent=agent, input=user_message) + turn = OpenAITurn(result=result, model=MODEL_NAME) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + async for event in emitter.yield_turn(turn): + yield event diff --git a/examples/tutorials/00_sync/060_harness_openai/project/agent.py b/examples/tutorials/00_sync/060_harness_openai/project/agent.py new file mode 100644 index 000000000..3611012fe --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/project/agent.py @@ -0,0 +1,47 @@ +"""OpenAI Agents SDK agent definition for the harness tutorial. + +The agent is the boundary between this module and the API layer (acp.py). 
+The OpenAI Agents SDK runs its own tool-call loop internally; acp.py wraps a +``Runner.run_streamed`` result with ``OpenAITurn`` so it flows through the +unified harness surface. +""" + +from __future__ import annotations + +from datetime import datetime + +from agents import Agent, function_tool, set_tracing_disabled + +from project.tools import get_weather + +# Disable the openai-agents SDK's native tracer so it doesn't ship traces to +# api.openai.com (the key may be a gateway/proxy key). Agentex tracing still +# runs via the harness + tracing manager configured in acp.py. +set_tracing_disabled(True) + +MODEL_NAME = "gpt-4o" +INSTRUCTIONS = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use the weather tool when the user asks about the weather +- Always report the real tool output back to the user +""" + + +@function_tool +def weather(city: str) -> str: + """Get the current weather for a city.""" + return get_weather(city) + + +def _render_instructions(_run_context: object, _agent: object) -> str: + """Return INSTRUCTIONS stamped with the current local date and time. + + Takes the two positional arguments the OpenAI Agents SDK passes to a + callable ``instructions`` value; neither is needed to render the prompt. + """ + return INSTRUCTIONS.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + + +def create_agent() -> Agent: + """Build and return the OpenAI Agents SDK agent with the weather tool. + + ``instructions`` is passed as a callable rather than a pre-formatted string. + acp.py builds this agent once and reuses it, so a string formatted here would + keep reporting the time of that first build for every later message. + """ + return Agent( + name="Harness OpenAI Assistant", + model=MODEL_NAME, + # Rendered by the SDK when it needs the system prompt, not at build time. + instructions=_render_instructions, + tools=[weather], + ) diff --git a/examples/tutorials/00_sync/060_harness_openai/project/tools.py b/examples/tutorials/00_sync/060_harness_openai/project/tools.py new file mode 100644 index 000000000..b03aa7c31 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/project/tools.py @@ -0,0 +1,19 @@ +"""Tool definitions for the OpenAI Agents harness tutorial. + +The bare function lives here so it's easy to unit-test; it's wrapped as an +OpenAI Agents SDK ``function_tool`` in ``project.agent``. +""" + +from __future__ import annotations + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. 
+ + Returns: + A string describing the weather conditions. + """ + return f"The weather in {city} is sunny and 72°F" diff --git a/examples/tutorials/00_sync/060_harness_openai/pyproject.toml b/examples/tutorials/00_sync/060_harness_openai/pyproject.toml new file mode 100644 index 000000000..39cceb8f2 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "s060-harness-openai" +version = "0.1.0" +description = "A sync OpenAI Agents SDK agent on the unified harness surface" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "openai-agents", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/00_sync/060_harness_openai/tests/test_agent.py b/examples/tutorials/00_sync/060_harness_openai/tests/test_agent.py new file mode 100644 index 000000000..960b232b7 --- /dev/null +++ b/examples/tutorials/00_sync/060_harness_openai/tests/test_agent.py @@ -0,0 +1,48 @@ +"""Offline test for the sync OpenAI Agents harness tutorial. + +This test does NOT require a running Agentex server or an OpenAI API key. It +verifies the harness wiring this tutorial demonstrates: an ``OpenAITurn`` built +from an injected canonical ``StreamTaskMessage*`` stream, forwarded through +``UnifiedEmitter.yield_turn`` (the sync HTTP ACP delivery path), passes the +events through unchanged. 
+ +To run: ``pytest tests/test_agent.py -v`` +""" + +from __future__ import annotations + +import pytest + +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn + + +async def _canonical_stream(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_yield_turn_forwards_canonical_stream(): + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + # trace_id=None disables tracing, so no Agentex server is needed. 
+ emitter = UnifiedEmitter(task_id="task-1", trace_id=None, parent_span_id=None) + + out = [e async for e in emitter.yield_turn(turn)] + assert out == events + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/.dockerignore b/examples/tutorials/10_async/00_base/130_harness_openai/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/Dockerfile b/examples/tutorials/10_async/00_base/130_harness_openai/Dockerfile new file mode 100644 index 000000000..a31c89a31 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 10_async/00_base/130_harness_openai/pyproject.toml /app/130_harness_openai/pyproject.toml +COPY 10_async/00_base/130_harness_openai/README.md /app/130_harness_openai/README.md + +WORKDIR /app/130_harness_openai + +# Copy the project code +COPY 
10_async/00_base/130_harness_openai/project /app/130_harness_openai/project + +# Copy the test files +COPY 10_async/00_base/130_harness_openai/tests /app/130_harness_openai/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install --system .[dev] pytest-asyncio httpx + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=ab130-harness-openai + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/README.md b/examples/tutorials/10_async/00_base/130_harness_openai/README.md new file mode 100644 index 000000000..ac439e4ed --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/README.md @@ -0,0 +1,33 @@ +# Async OpenAI Agents on the unified harness surface + +An async (Redis-streaming) Agentex agent that runs the OpenAI Agents SDK and +delivers its output through the **unified harness surface**. + +## What this demonstrates + +Same `OpenAITurn` adapter as the sync tutorial (`060_harness_openai`), but the +async ACP pushes the turn to the task stream via +`UnifiedEmitter.auto_send_turn` instead of yielding over HTTP. `auto_send_turn` +returns a `TurnResult` with the accumulated final text and normalized usage. 
+ +```python +result = Runner.run_streamed(starting_agent=agent, input=user_message) +turn = OpenAITurn(result=result, model="gpt-4o") +emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, parent_span_id=parent_span_id) +turn_result = await emitter.auto_send_turn(turn) +``` + +## Run it + +```bash +agentex agents run --manifest manifest.yaml +``` + +## Test it + +The offline test exercises the auto-send delivery path with an injected fake +streaming backend (no server, Redis, or API key required): + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/manifest.yaml b/examples/tutorials/10_async/00_base/130_harness_openai/manifest.yaml new file mode 100644 index 000000000..7e67675fa --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/00_base/130_harness_openai + - test_utils + dockerfile: 10_async/00_base/130_harness_openai/Dockerfile + dockerignore: 10_async/00_base/130_harness_openai/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: async + name: ab130-harness-openai + description: An async OpenAI Agents SDK agent on the unified harness surface + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "ab130-harness-openai" + description: "An async OpenAI Agents SDK agent on the unified harness 
surface" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/project/__init__.py b/examples/tutorials/10_async/00_base/130_harness_openai/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/project/acp.py b/examples/tutorials/10_async/00_base/130_harness_openai/project/acp.py new file mode 100644 index 000000000..fcd10cc62 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/project/acp.py @@ -0,0 +1,98 @@ +"""ACP handler for the async OpenAI Agents harness tutorial. + +Uses the async ACP model with Redis streaming instead of HTTP yields. The +OpenAI Agents SDK run is wrapped in an ``OpenAITurn`` and pushed to the task +stream via ``UnifiedEmitter.auto_send_turn`` — the async/temporal delivery path +of the unified harness surface. ``auto_send_turn`` returns a ``TurnResult`` +carrying the accumulated final text and normalized usage. 
+""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from agents import Runner + +from agentex.lib import adk +from project.agent import MODEL_NAME, create_agent +from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.types.fastacp import AsyncACPConfig +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = _litellm_key + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create( + acp_type="async", + config=AsyncACPConfig(type="base"), +) + +_agent = None + + +def get_agent(): + global _agent + if _agent is None: + _agent = create_agent() + return _agent + + +@acp.on_task_create +async def handle_task_create(params: CreateTaskParams): + logger.info(f"Task created: {params.task.id}") + + +@acp.on_task_event_send +async def handle_task_event_send(params: SendEventParams): + """Handle each user message: run the agent and auto-send its turn.""" + agent = get_agent() + task_id = params.task.id + user_message = params.event.content.content + + logger.info(f"Processing message for task {task_id}") + + # Echo the user's message into the task history. 
+ await adk.messages.create(task_id=task_id, content=params.event.content) + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + result = Runner.run_streamed(starting_agent=agent, input=user_message) + turn = OpenAITurn(result=result, model=MODEL_NAME) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + turn_result = await emitter.auto_send_turn(turn) + if turn_span: + turn_span.output = {"final_output": turn_result.final_text} + + +@acp.on_task_cancel +async def handle_task_canceled(params: CancelTaskParams): + logger.info(f"Task canceled: {params.task.id}") diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/project/agent.py b/examples/tutorials/10_async/00_base/130_harness_openai/project/agent.py new file mode 100644 index 000000000..5b83c5aab --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/project/agent.py @@ -0,0 +1,43 @@ +"""OpenAI Agents SDK agent definition for the async harness tutorial. + +Identical agent shape to the sync tutorial (060). The only difference is the +delivery path in acp.py: the async ACP uses ``UnifiedEmitter.auto_send_turn`` +(Redis streaming) instead of yielding events over an HTTP response. +""" + +from __future__ import annotations + +from datetime import datetime + +from agents import Agent, function_tool, set_tracing_disabled + +from project.tools import get_weather + +set_tracing_disabled(True) + +MODEL_NAME = "gpt-4o" +INSTRUCTIONS = """You are a helpful AI assistant with access to tools. 
+ +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use the weather tool when the user asks about the weather +- Always report the real tool output back to the user +""" + + +@function_tool +def weather(city: str) -> str: + """Get the current weather for a city.""" + return get_weather(city) + + +def _render_instructions(_run_context: object, _agent: object) -> str: + """Return INSTRUCTIONS stamped with the current local date and time. + + Takes the two positional arguments the OpenAI Agents SDK passes to a + callable ``instructions`` value; neither is needed to render the prompt. + """ + return INSTRUCTIONS.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + + +def create_agent() -> Agent: + """Build and return the OpenAI Agents SDK agent with the weather tool. + + ``instructions`` is passed as a callable rather than a pre-formatted string. + acp.py builds this agent once and reuses it, so a string formatted here would + keep reporting the time of that first build for every later message. + """ + return Agent( + name="Harness OpenAI Assistant", + model=MODEL_NAME, + # Rendered by the SDK when it needs the system prompt, not at build time. + instructions=_render_instructions, + tools=[weather], + ) diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/project/tools.py b/examples/tutorials/10_async/00_base/130_harness_openai/project/tools.py new file mode 100644 index 000000000..d2e5468c9 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/project/tools.py @@ -0,0 +1,15 @@ +"""Tool definitions for the async OpenAI Agents harness tutorial.""" + +from __future__ import annotations + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. 
+ """ + return f"The weather in {city} is sunny and 72°F" diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/pyproject.toml b/examples/tutorials/10_async/00_base/130_harness_openai/pyproject.toml new file mode 100644 index 000000000..c05e8c1c6 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ab130-harness-openai" +version = "0.1.0" +description = "An async OpenAI Agents SDK agent on the unified harness surface" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "openai-agents", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/10_async/00_base/130_harness_openai/tests/test_agent.py b/examples/tutorials/10_async/00_base/130_harness_openai/tests/test_agent.py new file mode 100644 index 000000000..ceb95dbab --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_harness_openai/tests/test_agent.py @@ -0,0 +1,77 @@ +"""Offline test for the async OpenAI Agents harness tutorial. + +This test does NOT require a running Agentex server, Redis, or an OpenAI API +key. It verifies the async delivery path this tutorial demonstrates: an +``OpenAITurn`` built from an injected canonical stream, pushed through +``UnifiedEmitter.auto_send_turn`` with an injected fake streaming backend, +returns the accumulated final text. 
+ +To run: ``pytest tests/test_agent.py -v`` +""" + +from __future__ import annotations + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn + + +class _FakeCtx: + def __init__(self, initial_content): + self.task_message = TaskMessage(id="m-1", task_id="task-1", content=initial_content) + + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + pass + + async def stream_update(self, update): + return update + + +class _FakeStreaming: + def streaming_task_message_context(self, task_id, initial_content, **_kwargs): # noqa: ARG002 + return _FakeCtx(initial_content) + + +async def _canonical_stream(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_auto_send_turn_returns_final_text(): + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hel")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="lo")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task-1", + trace_id=None, + parent_span_id=None, + streaming=_FakeStreaming(), + ) + + result = await emitter.auto_send_turn(turn) + assert result.final_text == "Hello" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/.dockerignore 
b/examples/tutorials/10_async/10_temporal/140_harness_openai/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/Dockerfile b/examples/tutorials/10_async/10_temporal/140_harness_openai/Dockerfile new file mode 100644 index 000000000..c107e3269 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/Dockerfile @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/10_temporal/140_harness_openai/pyproject.toml /app/140_harness_openai/pyproject.toml +COPY 10_async/10_temporal/140_harness_openai/README.md /app/140_harness_openai/README.md + +WORKDIR /app/140_harness_openai + +COPY 10_async/10_temporal/140_harness_openai/project /app/140_harness_openai/project +COPY 10_async/10_temporal/140_harness_openai/tests /app/140_harness_openai/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=at140-harness-openai + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we 
deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/README.md b/examples/tutorials/10_async/10_temporal/140_harness_openai/README.md new file mode 100644 index 000000000..0415ae225 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/README.md @@ -0,0 +1,41 @@ +# Temporal OpenAI Agents on the unified harness surface + +A Temporal-backed Agentex agent that runs the OpenAI Agents SDK and delivers its +output through the **unified harness surface**. + +## What this demonstrates + +LLM calls are non-deterministic, so they can't run directly in a Temporal +workflow. This tutorial keeps the workflow (`project/workflow.py`) +deterministic and delegates each turn to a custom activity +(`project/activities.py`). The activity uses the SAME `OpenAITurn` adapter as +the sync (`060_harness_openai`) and async (`130_harness_openai`) variants, and +delivers via `UnifiedEmitter.auto_send_turn` — which is designed to run inside +an activity (it writes streaming side effects to Redis and returns the final +text + usage). + +```python +# inside the activity: +result = Runner.run_streamed(starting_agent=agent, input=user_message) +turn = OpenAITurn(result=result, model="gpt-4o") +emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=parent_span_id) +turn_result = await emitter.auto_send_turn(turn) +return turn_result.final_text +``` + +## Run it + +```bash +agentex agents run --manifest manifest.yaml +``` + +This starts both the ACP HTTP server and the Temporal worker. 
+ +## Test it + +The offline test exercises the activity's delivery path with an injected fake +streaming backend (no server, Temporal, Redis, or API key required): + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/environments.yaml b/examples/tutorials/10_async/10_temporal/140_harness_openai/environments.yaml new file mode 100644 index 000000000..f90511911 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/environments.yaml @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. 
+# namespace: "team-example-tutorial" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/manifest.yaml b/examples/tutorials/10_async/10_temporal/140_harness_openai/manifest.yaml new file mode 100644 index 000000000..64a943438 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/manifest.yaml @@ -0,0 +1,62 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/10_temporal/140_harness_openai + - test_utils + dockerfile: 10_async/10_temporal/140_harness_openai/Dockerfile + dockerignore: 10_async/10_temporal/140_harness_openai/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: at140-harness-openai + description: A Temporal-backed OpenAI Agents SDK agent on the unified harness surface + + temporal: + enabled: true + workflows: + - name: at140-harness-openai + queue_name: at140_harness_openai_queue + + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: 
account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "at140-harness-openai" + description: "A Temporal-backed OpenAI Agents SDK agent on the unified harness surface" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/project/__init__.py b/examples/tutorials/10_async/10_temporal/140_harness_openai/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/project/acp.py b/examples/tutorials/10_async/10_temporal/140_harness_openai/project/acp.py new file mode 100644 index 000000000..6076835ba --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/project/acp.py @@ -0,0 +1,33 @@ +"""ACP server for the Temporal OpenAI Agents harness tutorial. + +Thin by design: with ``acp_type="async"`` + ``TemporalACPConfig``, FastACP +auto-wires task/create, task/event/send, and task/cancel onto the workflow. +The agent logic lives in ``project/workflow.py`` (deterministic) and +``project/activities.py`` (the harness-backed LLM run), executed by the worker +in ``project/run_worker.py``. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +# LiteLLM proxy auth: copy LITELLM_API_KEY to OPENAI_API_KEY for OpenAI client +# compatibility, so the same example works behind the Scale LiteLLM gateway. 
+_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key and not os.environ.get("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = _litellm_key + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + ), +) diff --git a/examples/tutorials/10_async/10_temporal/140_harness_openai/project/activities.py b/examples/tutorials/10_async/10_temporal/140_harness_openai/project/activities.py new file mode 100644 index 000000000..398a488ed --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_harness_openai/project/activities.py @@ -0,0 +1,55 @@ +"""Custom Temporal activity that runs the OpenAI agent on the harness surface. + +LLM calls are non-deterministic, so they must run inside a Temporal activity +rather than directly in the workflow. This activity runs the OpenAI Agents SDK +via ``Runner.run_streamed``, wraps the result in an ``OpenAITurn``, and pushes +the canonical stream to the task stream via ``UnifiedEmitter.auto_send_turn``. + +``auto_send`` (which backs ``auto_send_turn``) is explicitly designed to be +called from inside an activity: it writes streaming side effects to Redis and +returns the accumulated final text + normalized usage. 
class HarnessActivities:
    """Groups the Temporal activity that runs the OpenAI agent through the harness.

    The worker instantiates this class and registers the bound
    ``run_harness_openai_agent`` method as an activity.
    """

    @activity.defn(name=RUN_HARNESS_AGENT_ACTIVITY)
    async def run_harness_openai_agent(self, params: RunHarnessAgentParams) -> str:
        """Execute a single agent turn and stream its output to the task.

        Args:
            params: Task id, the user's message, and optional tracing ids
                (``trace_id`` / ``parent_span_id``) forwarded to the emitter.

        Returns:
            The ``final_text`` of the result produced by
            ``UnifiedEmitter.auto_send_turn``.
        """
        logger.info(f"Running harness OpenAI agent for task {params.task_id}")

        # Start the streamed run first; the turn adapter wraps the run result.
        streamed_run = Runner.run_streamed(
            starting_agent=create_agent(),
            input=params.user_message,
        )
        harness_turn = OpenAITurn(result=streamed_run, model=MODEL_NAME)

        # The emitter owns delivery (and tracing ids) for this task.
        delivery = UnifiedEmitter(
            task_id=params.task_id,
            trace_id=params.trace_id,
            parent_span_id=params.parent_span_id,
        )
        outcome = await delivery.auto_send_turn(harness_turn)
        return outcome.final_text
def create_agent() -> Agent:
    """Construct the tutorial's OpenAI Agents SDK agent.

    The instructions template is filled with the current local time
    (``datetime.now()``) each time this is called, so every call yields an
    agent whose prompt carries a fresh timestamp.

    Returns:
        An ``Agent`` named "Harness OpenAI Assistant" using ``MODEL_NAME`` and
        the ``weather`` tool.
    """
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    system_prompt = INSTRUCTIONS.format(timestamp=now_text)
    return Agent(
        name="Harness OpenAI Assistant",
        model=MODEL_NAME,
        instructions=system_prompt,
        tools=[weather],
    )
def get_weather(city: str) -> str:
    """Return a canned weather report for ``city``.

    No lookup is performed: every city receives the same "sunny and 72°F"
    report with the city name interpolated as given.

    Args:
        city: City name to embed in the report.

    Returns:
        A one-sentence weather report mentioning ``city``.
    """
    report = f"The weather in {city} is sunny and 72°F"
    return report
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
class At140HarnessOpenaiWorkflow(BaseWorkflow):
    """Long-running workflow that runs each turn through the harness activity.

    ``on_task_create`` is the workflow entry point and blocks until
    ``complete_task_signal`` is received. Each user event arrives through the
    ``RECEIVE_EVENT`` signal and is handled by ``on_task_event_send``, which
    delegates the LLM run to the ``RUN_HARNESS_AGENT_ACTIVITY`` activity.
    """

    def __init__(self):
        super().__init__(display_name=environment_variables.AGENT_NAME)
        # Set to True by ``complete_task_signal``; ``on_task_create`` waits on it.
        self._complete_task = False
        # Number of user events received so far; used to label tracing spans.
        self._turn_number = 0

    @workflow.signal(name=SignalName.RECEIVE_EVENT)
    async def on_task_event_send(self, params: SendEventParams) -> None:
        """Handle a user message: echo it, then run the harness activity durably.

        Args:
            params: The signal payload carrying the task and the incoming event.
                The event content is read via ``params.event.content.content``,
                so it is assumed to be text content.
        """
        logger.info(f"Received task event: {params.task.id}")
        self._turn_number += 1
        # Snapshot the turn number before the first ``await``: another signal
        # handler may run while this one is suspended and bump the shared
        # counter, which would otherwise mislabel this turn's span.
        turn_number = self._turn_number

        # Echo the user's message so it shows up in the UI as a chat bubble.
        await adk.messages.create(task_id=params.task.id, content=params.event.content)

        async with adk.tracing.span(
            trace_id=params.task.id,
            task_id=params.task.id,
            name=f"Turn {turn_number}",
            input={"message": params.event.content.content},
        ) as span:
            # The activity performs the non-deterministic LLM run and streams
            # its output; only the final text comes back into the workflow.
            final_text = await workflow.execute_activity(
                RUN_HARNESS_AGENT_ACTIVITY,
                RunHarnessAgentParams(
                    task_id=params.task.id,
                    user_message=params.event.content.content,
                    trace_id=params.task.id,
                    parent_span_id=span.id if span else None,
                ),
                start_to_close_timeout=timedelta(minutes=5),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            if span:
                span.output = {"final_output": final_text}

    @workflow.run
    async def on_task_create(self, params: CreateTaskParams) -> str:
        """Workflow entry point — keep the conversation alive for incoming signals.

        Posts a greeting message that includes the task's creation params, then
        waits (without timeout) until ``complete_task_signal`` flips the flag.

        Returns:
            The literal string ``"Task completed"`` once the workflow is released.
        """
        logger.info(f"Task created: {params.task.id}")

        await adk.messages.create(
            task_id=params.task.id,
            content=TextContent(
                author="agent",
                content=(
                    f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n"
                    f"Send me a message and I'll respond using an OpenAI Agents SDK agent "
                    f"delivered through the unified harness surface."
                ),
            ),
        )

        await workflow.wait_condition(lambda: self._complete_task, timeout=None)
        return "Task completed"

    @workflow.signal
    async def complete_task_signal(self) -> None:
        """Graceful workflow shutdown signal."""
        logger.info("Received complete_task signal")
        self._complete_task = True
It verifies the delivery path the harness activity uses: an +``OpenAITurn`` built from an injected canonical stream, pushed through +``UnifiedEmitter.auto_send_turn`` with an injected fake streaming backend, +returns the accumulated final text (which the activity returns to the workflow). + +To run: ``pytest tests/test_agent.py -v`` +""" + +from __future__ import annotations + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn + + +class _FakeCtx: + def __init__(self, initial_content): + self.task_message = TaskMessage(id="m-1", task_id="task-1", content=initial_content) + + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + pass + + async def stream_update(self, update): + return update + + +class _FakeStreaming: + def streaming_task_message_context(self, task_id, initial_content, **_kwargs): # noqa: ARG002 + return _FakeCtx(initial_content) + + +async def _canonical_stream(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_activity_delivery_returns_final_text(): + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="72")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="F")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task-1", + trace_id=None, + 
def openai_usage_to_turn_usage(usage: Usage | None, model: str | None) -> TurnUsage:
    """Map an ``agents.Usage`` to a harness-independent ``TurnUsage``.

    Fields are read with ``getattr(..., None)`` so a usage object that lacks
    an attribute yields ``None`` for it rather than raising. Token counts are
    passed through as-is, so a present zero stays zero; only
    ``num_llm_calls`` falls back to ``0`` when ``requests`` is missing.

    Args:
        usage: Aggregated usage for the run, or ``None`` when none is known.
        model: Model name to record on the result.

    Returns:
        A ``TurnUsage`` carrying ``model`` and whatever counts were available;
        only ``model`` is set when ``usage`` is ``None``.
    """
    if usage is None:
        return TurnUsage(model=model)

    # Detail objects may be absent; getattr on ``None`` below then yields None.
    input_details = getattr(usage, "input_tokens_details", None)
    output_details = getattr(usage, "output_tokens_details", None)

    return TurnUsage(
        model=model,
        num_llm_calls=getattr(usage, "requests", None) or 0,
        input_tokens=getattr(usage, "input_tokens", None),
        cached_input_tokens=getattr(input_details, "cached_tokens", None),
        output_tokens=getattr(usage, "output_tokens", None),
        reasoning_tokens=getattr(output_details, "reasoning_tokens", None),
        total_tokens=getattr(usage, "total_tokens", None),
    )


def _aggregate_usage(raw_responses: list[ModelResponse]) -> Usage | None:
    """Sum the per-response ``Usage`` across a run's ``ModelResponse`` list.

    Args:
        raw_responses: The run's model responses; entries without a ``usage``
            attribute (or with ``usage`` set to ``None``) are skipped.

    Returns:
        A new ``Usage`` accumulated via ``Usage.add``, or ``None`` when no
        response carries usage, so the caller can emit a usage object with
        only the model name set.
    """
    total: Usage | None = None
    for response in raw_responses:
        resp_usage = getattr(response, "usage", None)
        if resp_usage is None:
            continue
        if total is None:
            # Created lazily so "no usage at all" stays distinguishable (None).
            total = Usage()
        total.add(resp_usage)
    return total


class OpenAITurn:
    """A single OpenAI Agents SDK turn adapted to the ``HarnessTurn`` protocol.

    Construct with exactly one of:
    - ``result``: a ``RunResultStreaming`` from ``Runner.run_streamed``. Its
      ``stream_events()`` is converted to the canonical stream, and after the
      stream is exhausted ``raw_responses`` is read to compute usage.
    - ``stream``: a pre-built async iterator of canonical ``StreamTaskMessage``
      events (bypasses ``convert_openai_to_agentex_events``). Useful for tests
      and for callers that have already produced canonical events. Usage stays
      at ``TurnUsage(model=...)`` because there is no run to read usage from.

    Supplying neither or both raises ``ValueError``.

    ``coalesce_tool_requests`` is accepted for API parity with other provider
    turns but is a no-op for OpenAI: the OpenAI converter already emits a single
    ``Full(ToolRequestContent)`` per tool call rather than streamed argument
    deltas, so there is nothing to coalesce.
    """

    def __init__(
        self,
        result: RunResultStreaming | None = None,
        model: str | None = None,
        stream: AsyncIterator[StreamTaskMessage] | None = None,
        coalesce_tool_requests: bool = False,  # noqa: ARG002 - API parity, no-op for OpenAI
    ) -> None:
        """Store the event source and initialise usage to a model-only value.

        Raises:
            ValueError: If neither ``result`` nor ``stream`` is given, or if
                both are given.
        """
        if result is None and stream is None:
            raise ValueError("OpenAITurn requires either `result` or `stream`")
        if result is not None and stream is not None:
            # ``stream`` would win in ``_iter_events`` and ``result`` (with its
            # usage) would be silently ignored; reject the ambiguous call.
            raise ValueError("OpenAITurn accepts only one of `result` or `stream`, not both")
        self._result = result
        self._model = model
        self._stream = stream
        self._usage: TurnUsage = TurnUsage(model=model)

    @property
    def events(self) -> AsyncIterator[StreamTaskMessage]:
        """Canonical event stream for this turn.

        Each access creates a new generator over the same underlying source,
        so callers should read this once and iterate it to exhaustion.
        """
        return self._iter_events()

    async def _iter_events(self) -> AsyncIterator[StreamTaskMessage]:
        """Yield canonical events, then record usage when backed by a run."""
        if self._stream is not None:
            # Pre-built canonical stream: forward as-is; usage stays model-only.
            async for event in self._stream:
                yield event
            return

        result = self._result
        assert result is not None  # guaranteed by __init__
        async for event in convert_openai_to_agentex_events(result.stream_events()):
            yield event

        # Stream is exhausted: the run has finished and raw_responses is now
        # populated, so usage can be aggregated and normalized.
        try:
            raw_responses: list[Any] = list(getattr(result, "raw_responses", None) or [])
            aggregated = _aggregate_usage(raw_responses)
            self._usage = openai_usage_to_turn_usage(aggregated, self._model)
        except Exception as exc:  # pragma: no cover - defensive: never break delivery on usage
            logger.warning(f"Failed to aggregate OpenAI usage: {exc}")
            self._usage = TurnUsage(model=self._model)

    def usage(self) -> TurnUsage:
        """Normalized turn usage. Valid only after ``events`` is exhausted."""
        return self._usage
+ + .. deprecated:: + Prefer the unified harness surface for new OpenAI Agents integrations: + wrap a ``Runner.run_streamed`` result in + ``agentex.lib.adk.providers._modules.openai_turn.OpenAITurn`` and drive + delivery + tracing through ``UnifiedEmitter`` (see the + ``060_harness_openai`` / ``130_harness_openai`` / ``140_harness_openai`` + tutorials). This per-model tracing wrapper predates the harness and is + retained only for backwards compatibility; it will be removed in a + future release. No runtime warning is emitted. + """ - def __init__(self, original_model: Model, trace_id: str | None = None, parent_span_id: str | None = None, tracer: AsyncTracer | None = None): + def __init__( + self, + original_model: Model, + trace_id: str | None = None, + parent_span_id: str | None = None, + tracer: AsyncTracer | None = None, + ): """Initialize with the original OpenAI model to wrap. Args: original_model: The OpenAI model instance to wrap @@ -147,7 +168,7 @@ async def get_response( } # Only add conversation_id if the model supports it - if hasattr(self.original_model, 'supports_conversation_id'): + if hasattr(self.original_model, "supports_conversation_id"): kwargs["conversation_id"] = conversation_id response = await self.original_model.get_response(**kwargs) @@ -158,12 +179,12 @@ async def get_response( final_output = None # Extract final output text from response - response_final_output = getattr(response, 'final_output', None) + response_final_output = getattr(response, "final_output", None) if response_final_output: final_output = response_final_output # Extract items from the response output - response_output = getattr(response, 'output', None) + response_output = getattr(response, "output", None) if response_output: output_items = response_output if isinstance(response_output, list) else [response_output] @@ -174,12 +195,12 @@ async def get_response( new_items.append(item_dict) # Extract final_output from message type if available - if item_dict.get('type') 
== 'message' and not final_output: - content = item_dict.get('content', []) + if item_dict.get("type") == "message" and not final_output: + content = item_dict.get("content", []) if content and isinstance(content, list): for content_part in content: - if isinstance(content_part, dict) and 'text' in content_part: - final_output = content_part['text'] + if isinstance(content_part, dict) and "text" in content_part: + final_output = content_part["text"] break except Exception as e: logger.warning(f"Failed to serialize item in get_response: {e}") @@ -207,7 +228,7 @@ async def get_response( } # Only add conversation_id if the model supports it - if hasattr(self.original_model, 'supports_conversation_id'): + if hasattr(self.original_model, "supports_conversation_id"): kwargs["conversation_id"] = conversation_id return await self.original_model.get_response(**kwargs) @@ -266,7 +287,7 @@ async def stream_response( } # Only add conversation_id if the model supports it - if hasattr(self.original_model, 'supports_conversation_id'): + if hasattr(self.original_model, "supports_conversation_id"): stream_kwargs["conversation_id"] = conversation_id # Get the stream response from the original model and yield each event @@ -277,11 +298,11 @@ async def stream_response( final_response_text = "" async for event in stream_response: - event_type = getattr(event, 'type', 'no-type') + event_type = getattr(event, "type", "no-type") # Handle response.output_item.done events which contain completed items - if event_type == 'response.output_item.done': - item = getattr(event, 'item', None) + if event_type == "response.output_item.done": + item = getattr(event, "item", None) if item is not None: try: item_dict = _serialize_item(item) @@ -289,12 +310,12 @@ async def stream_response( new_items.append(item_dict) # Update final_response_text from message type if available - if item_dict.get('type') == 'message': - content = item_dict.get('content', []) + if item_dict.get("type") == "message": + 
content = item_dict.get("content", []) if content and isinstance(content, list): for content_part in content: - if isinstance(content_part, dict) and 'text' in content_part: - final_response_text = content_part['text'] + if isinstance(content_part, dict) and "text" in content_part: + final_response_text = content_part["text"] break except Exception as e: logger.warning(f"Failed to serialize item in stream_response: {e}") @@ -326,7 +347,7 @@ async def stream_response( } # Only add conversation_id if the model supports it - if hasattr(self.original_model, 'supports_conversation_id'): + if hasattr(self.original_model, "supports_conversation_id"): stream_kwargs["conversation_id"] = conversation_id # Get the stream response from the original model and yield each event @@ -336,8 +357,17 @@ async def stream_response( async for event in stream_response: yield event + class SyncStreamingProvider(OpenAIProvider): - """Simple OpenAI provider wrapper that adds logging to streaming and supports tracing.""" + """Simple OpenAI provider wrapper that adds logging to streaming and supports tracing. + + .. deprecated:: + Prefer the unified harness surface for new OpenAI Agents integrations + (see :class:`SyncStreamingModel` and the ``OpenAITurn`` + + ``UnifiedEmitter`` pattern). This provider wrapper predates the harness + and is retained only for backwards compatibility; it will be removed in + a future release. No runtime warning is emitted. + """ def __init__(self, trace_id: str | None = None, parent_span_id: str | None = None, *args, **kwargs): """Initialize the provider with tracing support. 
@@ -405,6 +435,7 @@ def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, An if tool_call_item.arguments: if isinstance(tool_call_item.arguments, str): import json + tool_arguments = json.loads(tool_call_item.arguments) if tool_call_item.arguments else {} else: tool_arguments = tool_call_item.arguments @@ -418,6 +449,7 @@ def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, An arguments = tool_call_item.arguments if isinstance(arguments, str): import json + tool_arguments = json.loads(arguments) if arguments else {} elif arguments is None: tool_arguments = {} @@ -466,11 +498,11 @@ def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any) async def convert_openai_to_agentex_events(stream_response): """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support. - + This is an enhanced version of the base converter that includes support for: - Reasoning content deltas (for o1 models) - Reasoning summary deltas (for o1 models) - + Args: stream_response: An async iterator of OpenAI streaming events Yields: @@ -488,8 +520,8 @@ async def convert_openai_to_agentex_events(stream_response): event_count += 1 # Check for raw response events which contain the actual OpenAI streaming events - if hasattr(event, 'type') and event.type == 'raw_response_event': - if hasattr(event, 'data'): + if hasattr(event, "type") and event.type == "raw_response_event": + if hasattr(event, "data"): raw_event = event.data # Check for ResponseOutputItemAddedEvent which signals a new message starting @@ -504,7 +536,7 @@ async def convert_openai_to_agentex_events(stream_response): if item_id in item_id_to_index: # Get the message type to decide whether to send done event message_type = item_id_to_type.get(item_id, "text") - + # Don't send done events for reasoning content/summary # They just end with their last delta if message_type not in ("reasoning_content", "reasoning_summary"): @@ -608,7 
+640,7 @@ async def convert_openai_to_agentex_events(stream_response): # Check if this is a text delta event from OpenAI elif isinstance(raw_event, ResponseTextDeltaEvent): # Check if this event has an item_id - item_id = getattr(raw_event, 'item_id', None) + item_id = getattr(raw_event, "item_id", None) # If this is a new item_id we haven't seen, it's a new message if item_id and item_id not in item_id_to_index: @@ -647,13 +679,13 @@ async def convert_openai_to_agentex_events(stream_response): ) yield delta_message - elif hasattr(event, 'type') and event.type == 'run_item_stream_event': + elif hasattr(event, "type") and event.type == "run_item_stream_event": # Skip reasoning_item events - they're handled via raw_response_event above - if hasattr(event, 'item') and event.item.type == 'reasoning_item': + if hasattr(event, "item") and event.item.type == "reasoning_item": continue # Check for tool_call_item type (this is when a tool is being called) - elif hasattr(event, 'item') and event.item.type == 'tool_call_item': + elif hasattr(event, "item") and event.item.type == "tool_call_item": # Extract tool call information using the helper method call_id, tool_name, tool_arguments = _extract_tool_call_info(event.item.raw_item) tool_map[call_id] = tool_name @@ -671,7 +703,7 @@ async def convert_openai_to_agentex_events(stream_response): ) # Check for tool_call_output_item type (this is when a tool returns output) - elif hasattr(event, 'item') and event.item.type == 'tool_call_output_item': + elif hasattr(event, "item") and event.item.type == "tool_call_output_item": # Extract tool response information using the helper method call_id, tool_name, content = _extract_tool_response_info(tool_map, event.item.raw_item) tool_response_content = ToolResponseContent( @@ -687,4 +719,3 @@ async def convert_openai_to_agentex_events(stream_response): index=message_index, content=tool_response_content, ) - diff --git a/src/agentex/lib/core/services/adk/providers/openai.py 
b/src/agentex/lib/core/services/adk/providers/openai.py index 75e507d8a..d7c2ca126 100644 --- a/src/agentex/lib/core/services/adk/providers/openai.py +++ b/src/agentex/lib/core/services/adk/providers/openai.py @@ -14,15 +14,8 @@ from agents.guardrail import InputGuardrail, OutputGuardrail from agents.exceptions import InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered from openai.types.responses import ( - ResponseCompletedEvent, - ResponseTextDeltaEvent, - ResponseFunctionToolCall, ResponseFunctionWebSearch, - ResponseOutputItemDoneEvent, ResponseCodeInterpreterToolCall, - ResponseReasoningSummaryPartDoneEvent, - ResponseReasoningSummaryPartAddedEvent, - ResponseReasoningSummaryTextDeltaEvent, ) # Local imports @@ -31,24 +24,14 @@ from agentex.lib.utils.mcp import redact_mcp_server_params from agentex.lib.utils.temporal import heartbeat_if_in_workflow from agentex.lib.core.tracing.tracer import AsyncTracer -from agentex.types.task_message_delta import ( - TextDelta, - ReasoningSummaryDelta, -) -from agentex.types.task_message_update import ( - StreamTaskMessageFull, - StreamTaskMessageDelta, -) +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import StreamTaskMessageFull from agentex.types.task_message_content import ( TextContent, - ReasoningContent, ToolRequestContent, ToolResponseContent, ) -from agentex.lib.core.services.adk.streaming import ( - StreamingService, - StreamingTaskMessageContext, -) +from agentex.lib.core.services.adk.streaming import StreamingService logger = logging.make_logger(__name__) @@ -733,8 +716,6 @@ async def run_agent_streamed_auto_send( if self.agentex_client is None: raise ValueError("Agentex client must be provided for auto_send methods") - tool_call_map: dict[str, ResponseFunctionToolCall] = {} - if self.tracer is None: raise RuntimeError("Tracer not initialized - ensure tracer is provided to OpenAIService") trace = self.tracer.trace(trace_id) @@ -761,12 +742,13 @@ 
async def run_agent_streamed_auto_send( ) as span: heartbeat_if_in_workflow("run agent streamed auto send") - # Consume the workflow-supplied created_at on the FIRST message - # opened by this activity (whichever streaming context opens first - # for this turn). That's the message that races the workflow's - # user-echo at the server. Subsequent messages in the same turn are - # separated by network/processing latency and rely on the server's - # wall clock. + # AGX1-378 restored: created_at is now threaded through + # UnifiedEmitter.auto_send_turn -> auto_send -> every + # streaming_task_message_context call, so the first agent message of + # the turn is stamped with the workflow-supplied timestamp (e.g. + # workflow.now()) just as the original inline loop did. + # The dispenser is still used below for guardrail-rejection messages, + # which open their own streaming contexts directly. _take_created_at = _make_created_at_dispenser(created_at) async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers: @@ -809,198 +791,28 @@ async def run_agent_streamed_auto_send( else: result = Runner.run_streamed(starting_agent=agent, input=input_list) - item_id_to_streaming_context: dict[str, StreamingTaskMessageContext] = {} - unclosed_item_ids: set[str] = set() - # Simple string to accumulate reasoning summary - current_reasoning_summary: str = "" + # Migrate onto the unified harness surface: wrap the streamed run + # as an OpenAITurn (provider -> canonical StreamTaskMessage* + # adapter) and let UnifiedEmitter.auto_send_turn drive delivery + + # tracing + usage. The previous ~270-line inline loop that hand- + # rolled per-item streaming contexts, reasoning handling, and + # span derivation now lives in the shared harness modules. + # Imported lazily: openai_turn pulls in agentex.lib.adk, which + # imports this service module, so an eager import would create a + # circular import at package init. 
+ from agentex.lib.adk.providers._modules.openai_turn import OpenAITurn + + turn = OpenAITurn(result=result, model=model) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=self.tracer, + streaming=self.streaming_service, + ) try: - # Process streaming events with TaskMessage creation - async for event in result.stream_events(): - heartbeat_if_in_workflow("processing stream event with auto send") - - if event.type == "run_item_stream_event": - if event.item.type == "tool_call_item": - tool_call_item = event.item.raw_item - - # Extract tool call information using the helper method - call_id, tool_name, tool_arguments = self._extract_tool_call_info(tool_call_item) - tool_call_map[call_id] = tool_call_item - - tool_request_content = ToolRequestContent( - author="agent", - tool_call_id=call_id, - name=tool_name, - arguments=tool_arguments, - ) - - # Create tool request using streaming context (immediate completion) - async with self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_request_content, - created_at=_take_created_at(), - ) as streaming_context: - # The message has already been persisted, but we still need to send an upda - await streaming_context.stream_update( - update=StreamTaskMessageFull( - parent_task_message=streaming_context.task_message, - content=tool_request_content, - type="full", - ), - ) - - elif event.item.type == "tool_call_output_item": - tool_output_item = event.item.raw_item - - # Extract tool response information using the helper method - call_id, tool_name, content = self._extract_tool_response_info( - tool_call_map, tool_output_item - ) - - tool_response_content = ToolResponseContent( - author="agent", - tool_call_id=call_id, - name=tool_name, - content=content, - ) - - # Create tool response using streaming context (immediate completion) - async with self.streaming_service.streaming_task_message_context( - task_id=task_id, - 
initial_content=tool_response_content, - created_at=_take_created_at(), - ) as streaming_context: - # The message has already been persisted, but we still need to send an update - await streaming_context.stream_update( - update=StreamTaskMessageFull( - parent_task_message=streaming_context.task_message, - content=tool_response_content, - type="full", - ), - ) - - elif event.type == "raw_response_event": - if isinstance(event.data, ResponseTextDeltaEvent): - # Handle text delta - item_id = event.data.item_id - - # Check if we already have a streaming context for this item - if item_id not in item_id_to_streaming_context: - # Create a new streaming context for this item - streaming_context = self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=TextContent( - author="agent", - content="", - ), - created_at=_take_created_at(), - ) - # Open the streaming context - item_id_to_streaming_context[item_id] = await streaming_context.open() - unclosed_item_ids.add(item_id) - else: - streaming_context = item_id_to_streaming_context[item_id] - - # Stream the delta through the streaming service - await streaming_context.stream_update( - update=StreamTaskMessageDelta( - parent_task_message=streaming_context.task_message, - delta=TextDelta(text_delta=event.data.delta, type="text"), - type="delta", - ), - ) - # Reasoning step one: new summary part added - elif isinstance(event.data, ResponseReasoningSummaryPartAddedEvent): - # We need to create a new streaming context for this reasoning item - item_id = event.data.item_id - - # Reset the reasoning summary string - current_reasoning_summary = "" - - streaming_context = self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=ReasoningContent( - author="agent", - summary=[], - content=[], - type="reasoning", - style="active", - ), - created_at=_take_created_at(), - ) - - # Replace the existing streaming context (if it exists) - # Why do we replace? 
Cause all the reasoning parts use the same item_id! - item_id_to_streaming_context[item_id] = await streaming_context.open() - unclosed_item_ids.add(item_id) - - # Reasoning step two: handling summary text delta - elif isinstance(event.data, ResponseReasoningSummaryTextDeltaEvent): - # Accumulate the delta into the string - current_reasoning_summary += event.data.delta - streaming_context = item_id_to_streaming_context[item_id] - - # Stream the summary delta through the streaming service - await streaming_context.stream_update( - update=StreamTaskMessageDelta( - parent_task_message=streaming_context.task_message, - delta=ReasoningSummaryDelta( - summary_index=event.data.summary_index, - summary_delta=event.data.delta, - type="reasoning_summary", - ), - type="delta", - ), - ) - - # Reasoning step three: handling summary text done, closing the streaming context - elif isinstance(event.data, ResponseReasoningSummaryPartDoneEvent): - # Handle reasoning summary text completion - streaming_context = item_id_to_streaming_context[item_id] - - # Create the complete reasoning content with the accumulated summary - complete_reasoning_content = ReasoningContent( - author="agent", - summary=[current_reasoning_summary], - content=[], - type="reasoning", - style="static", - ) - - # Send a full message update with the complete reasoning content - await streaming_context.stream_update( - update=StreamTaskMessageFull( - parent_task_message=streaming_context.task_message, - content=complete_reasoning_content, - type="full", - ), - ) - - await streaming_context.close() - unclosed_item_ids.discard(item_id) - - elif isinstance(event.data, ResponseOutputItemDoneEvent): - # Handle item completion - item_id = event.data.item.id - - # Finish the streaming context (sends DONE event and updates message) - if item_id in item_id_to_streaming_context: - streaming_context = item_id_to_streaming_context[item_id] - await streaming_context.close() - if item_id in unclosed_item_ids: - 
unclosed_item_ids.remove(item_id) - - elif isinstance(event.data, ResponseCompletedEvent): - # All items complete, finish all remaining streaming contexts for this session - # Create a copy to avoid modifying set during iteration - remaining_items = list(unclosed_item_ids) - for item_id in remaining_items: - if ( - item_id in unclosed_item_ids and item_id in item_id_to_streaming_context - ): # Check if still unclosed - streaming_context = item_id_to_streaming_context[item_id] - await streaming_context.close() - unclosed_item_ids.discard(item_id) + await emitter.auto_send_turn(turn, created_at=created_at) except InputGuardrailTripwireTriggered as e: # Handle guardrail trigger by sending a rejection message @@ -1080,18 +892,6 @@ async def run_agent_streamed_auto_send( # Re-raise to let the activity handle it raise - finally: - # Cleanup: ensure all streaming contexts for this session are properly finished - # Create a copy to avoid modifying set during iteration - remaining_items = list(unclosed_item_ids) - for item_id in remaining_items: - if ( - item_id in unclosed_item_ids and item_id in item_id_to_streaming_context - ): # Check if still unclosed - streaming_context = item_id_to_streaming_context[item_id] - await streaming_context.close() - unclosed_item_ids.discard(item_id) - if span: span.output = { "new_items": [ diff --git a/tests/lib/adk/providers/test_openai_activities.py b/tests/lib/adk/providers/test_openai_activities.py index c933b6ce4..05094556d 100644 --- a/tests/lib/adk/providers/test_openai_activities.py +++ b/tests/lib/adk/providers/test_openai_activities.py @@ -335,23 +335,28 @@ async def mock_stream_events(): expected_params.tools = [CodeInterpreterTool(tool_config={"type": "code_interpreter"})] self._assert_starting_agent_params(starting_agent, expected_params) - # Verify streaming context received tool request and response updates - # Should have been called twice - once for tool request, once for response - assert 
mock_streaming_context.stream_update.call_count == 2 + # Under the unified harness, the OpenAI events are converted to canonical + # StreamTaskMessageFull events and auto_send posts each full tool message + # by opening a streaming context with the content as initial_content and + # closing it (no stream_update). So assert on the opened contents. + opened = mock_streaming_context.opened_contents + tool_contents = [ + c + for c in opened + if getattr(c, "type", None) in ("tool_request", "tool_response") + ] + assert len(tool_contents) == 2 - # First call should be tool request - first_call = mock_streaming_context.stream_update.call_args_list[0] - first_update = first_call[1]["update"] # keyword argument - assert hasattr(first_update, "content") - assert first_update.content.name == "code_interpreter" - assert first_update.content.tool_call_id == "code_interpreter_call_123" + # First opened context is the tool request. + first = tool_contents[0] + assert first.type == "tool_request" + assert first.name == "code_interpreter" + assert first.tool_call_id == "code_interpreter_call_123" - # Second call should be tool response - second_call = mock_streaming_context.stream_update.call_args_list[1] - second_update = second_call[1]["update"] # keyword argument - assert hasattr(second_update, "content") - assert second_update.content.name == "code_interpreter_call" - assert second_update.content.tool_call_id == "code_interpreter_call_123" + # Second opened context is the tool response. 
+ second = tool_contents[1] + assert second.type == "tool_response" + assert second.tool_call_id == "code_interpreter_call_123" def _create_mock_tracer(self): """Helper method to create a properly mocked tracer with async context manager support.""" @@ -613,6 +618,48 @@ def _assert_tools_conversion(self, starting_agent, tools_case, _original_tools): else: raise ValueError(f"Unknown tools_case: {tools_case}") + @patch("agents.Runner.run_streamed") + async def test_run_agent_streamed_auto_send_forwards_created_at(self, mock_runner_run_streamed): + """created_at is forwarded to every streaming context opened by auto_send_turn (AGX1-378).""" + from datetime import datetime, timezone + + from agentex.lib.core.temporal.activities.adk.providers.openai_activities import ( + RunAgentStreamedAutoSendParams, + ) + + deterministic_ts = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc) + + mock_streaming_result = self._create_streaming_result_mock() + + async def _no_events(): + return + yield # make it an async generator + + mock_streaming_result.stream_events = _no_events + mock_runner_run_streamed.return_value = mock_streaming_result + + mock_tracer = self._create_mock_tracer() + openai_service, openai_activities, env = self._create_test_setup(mock_tracer) + mock_ctx, recorded_created_ats = self._setup_streaming_service_mocks_with_created_at(openai_service) + + params = RunAgentStreamedAutoSendParams( + input_list=[{"role": "user", "content": "hello"}], + mcp_server_params=[], + agent_name="test_agent", + agent_instructions="You are a helpful assistant", + trace_id="test-trace-id", + parent_span_id="test-span-id", + task_id="test-task-id", + created_at=deterministic_ts, + ) + + await env.run(openai_activities.run_agent_streamed_auto_send, params) + + assert all(ts == deterministic_ts for ts in recorded_created_ats), ( + f"Expected all streaming contexts to receive created_at={deterministic_ts!r}, " + f"got: {recorded_created_ats!r}" + ) + def 
_setup_streaming_service_mocks(self, openai_service): """Helper method to setup streaming service mocks for run_agent_auto_send.""" from unittest.mock import AsyncMock @@ -635,21 +682,64 @@ def _setup_streaming_service_mocks(self, openai_service): mock_streaming_context.task_message = mock_task_message mock_streaming_context.stream_update = AsyncMock() + # Record the initial_content passed to each opened streaming context. + # The unified harness auto_send path posts full tool messages by opening + # a context with initial_content and closing it (no stream_update), so + # assertions inspect the opened contents rather than stream_update calls. + opened_contents: list = [] + # Create a proper async context manager mock from contextlib import asynccontextmanager from unittest.mock import AsyncMock @asynccontextmanager - async def mock_streaming_context_manager(*_args, **_kwargs): + async def mock_streaming_context_manager(*_args, **kwargs): + if "initial_content" in kwargs: + opened_contents.append(kwargs["initial_content"]) yield mock_streaming_context mock_streaming_service.streaming_task_message_context = mock_streaming_context_manager + # Expose the recorded contents on the returned context mock for assertions. 
+ mock_streaming_context.opened_contents = opened_contents openai_service.streaming_service = mock_streaming_service openai_service.agentex_client = mock_agentex_client return mock_streaming_context + def _setup_streaming_service_mocks_with_created_at(self, openai_service): + """Like _setup_streaming_service_mocks but also records every created_at kwarg.""" + from contextlib import asynccontextmanager + from unittest.mock import AsyncMock + + from agentex.types.task_message import TaskMessage + + mock_streaming_service = AsyncMock() + mock_agentex_client = AsyncMock() + + mock_streaming_context = AsyncMock() + mock_task_message = Mock(spec=TaskMessage) + mock_task_message.id = "test-task-message-id" + mock_task_message.task_id = "test-task-id" + mock_task_message.content = {"type": "text", "content": "test"} + mock_streaming_context.task_message = mock_task_message + mock_streaming_context.stream_update = AsyncMock() + + recorded_created_ats: list = [] + + @asynccontextmanager + async def mock_ctx_manager(*_args, **kwargs): + recorded_created_ats.append(kwargs.get("created_at")) + yield mock_streaming_context + + mock_streaming_service.streaming_task_message_context = mock_ctx_manager + mock_streaming_context.opened_contents = [] + + openai_service.streaming_service = mock_streaming_service + openai_service.agentex_client = mock_agentex_client + + return mock_streaming_context, recorded_created_ats + def _create_code_interpreter_tool_call_mock(self, call_id="code_interpreter_call_123"): """Helper to create ResponseCodeInterpreterToolCall mock objects.""" return ResponseCodeInterpreterToolCall( @@ -680,6 +770,9 @@ def _create_streaming_result_mock(self, final_output="Code executed successfully mock_streaming_result = Mock(spec=RunResultStreaming) mock_streaming_result.final_output = final_output mock_streaming_result.new_items = [] + # OpenAITurn reads raw_responses after stream exhaustion to aggregate + # usage; provide an empty list so usage normalizes to 
model-only. + mock_streaming_result.raw_responses = [] mock_streaming_result.final_input_list = [ {"role": "user", "content": "Run some Python code"}, {"role": "assistant", "content": final_output}, diff --git a/tests/lib/adk/providers/test_openai_turn.py b/tests/lib/adk/providers/test_openai_turn.py new file mode 100644 index 000000000..023b0ed4e --- /dev/null +++ b/tests/lib/adk/providers/test_openai_turn.py @@ -0,0 +1,246 @@ +"""Tests for OpenAITurn and its usage mapping. + +OpenAITurn adapts an OpenAI Agents SDK streamed run onto the harness +``HarnessTurn`` protocol. These tests cover: +- ``openai_usage_to_turn_usage`` (full usage, None, real zeros) +- ``_aggregate_usage`` (empty, single, multiple ModelResponses) +- ``OpenAITurn.events`` driven by an injected canonical stream (bypassing the + OpenAI->canonical converter), plus ``usage()`` before/after exhaustion +- the ``ValueError`` guard when neither ``result`` nor ``stream`` is supplied +""" + +import types as _types + +import pytest +from agents.usage import Usage +from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails + +from agentex.types.text_content import TextContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) + + +def _import_target(): + from agentex.lib.adk.providers._modules.openai_turn import ( + OpenAITurn, + _aggregate_usage, + openai_usage_to_turn_usage, + ) + + return OpenAITurn, _aggregate_usage, openai_usage_to_turn_usage + + +# --------------------------------------------------------------------------- +# openai_usage_to_turn_usage +# --------------------------------------------------------------------------- + + +def test_usage_mapping_full(): + _, _, openai_usage_to_turn_usage = _import_target() + usage = Usage( + requests=3, + input_tokens=100, + input_tokens_details=InputTokensDetails(cached_tokens=20), + 
output_tokens=50, + output_tokens_details=OutputTokensDetails(reasoning_tokens=10), + total_tokens=150, + ) + turn_usage = openai_usage_to_turn_usage(usage, model="gpt-4o") + + assert turn_usage.model == "gpt-4o" + assert turn_usage.num_llm_calls == 3 + assert turn_usage.input_tokens == 100 + assert turn_usage.cached_input_tokens == 20 + assert turn_usage.output_tokens == 50 + assert turn_usage.reasoning_tokens == 10 + assert turn_usage.total_tokens == 150 + + +def test_usage_mapping_none_usage(): + _, _, openai_usage_to_turn_usage = _import_target() + turn_usage = openai_usage_to_turn_usage(None, model="gpt-4o") + + assert turn_usage.model == "gpt-4o" + assert turn_usage.num_llm_calls == 0 + assert turn_usage.input_tokens is None + assert turn_usage.output_tokens is None + assert turn_usage.total_tokens is None + + +def test_usage_mapping_real_zeros_are_preserved(): + # A cache hit can legitimately produce 0 output tokens; a present-but-zero + # value must survive as 0, not be coerced to None. 
+ _, _, openai_usage_to_turn_usage = _import_target() + usage = Usage( + requests=1, + input_tokens=0, + input_tokens_details=InputTokensDetails(cached_tokens=0), + output_tokens=0, + output_tokens_details=OutputTokensDetails(reasoning_tokens=0), + total_tokens=0, + ) + turn_usage = openai_usage_to_turn_usage(usage, model="m") + + assert turn_usage.input_tokens == 0 + assert turn_usage.cached_input_tokens == 0 + assert turn_usage.output_tokens == 0 + assert turn_usage.reasoning_tokens == 0 + assert turn_usage.total_tokens == 0 + assert turn_usage.num_llm_calls == 1 + + +# --------------------------------------------------------------------------- +# _aggregate_usage +# --------------------------------------------------------------------------- + + +def _resp(usage): + return _types.SimpleNamespace(usage=usage) + + +def test_aggregate_usage_empty(): + _, _aggregate_usage, _ = _import_target() + assert _aggregate_usage([]) is None + + +def test_aggregate_usage_single(): + _, _aggregate_usage, _ = _import_target() + usage = Usage(requests=1, input_tokens=10, output_tokens=5, total_tokens=15) + total = _aggregate_usage([_resp(usage)]) + + assert total is not None + assert total.requests == 1 + assert total.input_tokens == 10 + assert total.output_tokens == 5 + assert total.total_tokens == 15 + + +def test_aggregate_usage_multiple(): + _, _aggregate_usage, _ = _import_target() + u1 = Usage( + requests=1, + input_tokens=10, + input_tokens_details=InputTokensDetails(cached_tokens=2), + output_tokens=5, + output_tokens_details=OutputTokensDetails(reasoning_tokens=1), + total_tokens=15, + ) + u2 = Usage( + requests=2, + input_tokens=20, + input_tokens_details=InputTokensDetails(cached_tokens=3), + output_tokens=7, + output_tokens_details=OutputTokensDetails(reasoning_tokens=4), + total_tokens=27, + ) + # A response without usage must be skipped, not crash the aggregation. 
+ total = _aggregate_usage([_resp(u1), _resp(None), _resp(u2)]) + + assert total is not None + assert total.requests == 3 + assert total.input_tokens == 30 + assert total.output_tokens == 12 + assert total.total_tokens == 42 + assert total.input_tokens_details.cached_tokens == 5 + assert total.output_tokens_details.reasoning_tokens == 5 + + +# --------------------------------------------------------------------------- +# OpenAITurn.events / usage / construction +# --------------------------------------------------------------------------- + + +async def _canonical_stream(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_turn_events_forwards_injected_stream(): + OpenAITurn, _, _ = _import_target() + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + + out = [e async for e in turn.events] + assert out == events + + +@pytest.mark.asyncio +async def test_turn_usage_before_and_after_exhaustion_with_injected_stream(): + OpenAITurn, _, _ = _import_target() + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + + # Before exhaustion: usage carries only the model name. + before = turn.usage() + assert before.model == "gpt-4o" + assert before.input_tokens is None + + async for _ in turn.events: + pass + + # With an injected stream there is no run to read usage from, so usage + # stays model-only after exhaustion. 
+ after = turn.usage() + assert after.model == "gpt-4o" + assert after.input_tokens is None + + +@pytest.mark.asyncio +async def test_turn_usage_populated_from_result_after_exhaustion(): + OpenAITurn, _, _ = _import_target() + + canonical = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDone(type="done", index=0), + ] + + class _FakeResult: + def __init__(self): + self.raw_responses = [ + _resp(Usage(requests=1, input_tokens=8, output_tokens=4, total_tokens=12)), + ] + + def stream_events(self): + # OpenAITurn passes this to convert_openai_to_agentex_events; we + # monkeypatch that converter below so this can yield canonical events. + return _canonical_stream(canonical) + + import agentex.lib.adk.providers._modules.openai_turn as mod + + async def _passthrough(stream): + async for e in stream: + yield e + + original = mod.convert_openai_to_agentex_events + mod.convert_openai_to_agentex_events = _passthrough + try: + turn = OpenAITurn(result=_FakeResult(), model="gpt-4o") + out = [e async for e in turn.events] + finally: + mod.convert_openai_to_agentex_events = original + + assert out == canonical + usage = turn.usage() + assert usage.model == "gpt-4o" + assert usage.num_llm_calls == 1 + assert usage.input_tokens == 8 + assert usage.output_tokens == 4 + assert usage.total_tokens == 12 + + +def test_turn_requires_result_or_stream(): + OpenAITurn, _, _ = _import_target() + with pytest.raises(ValueError, match="either"): + OpenAITurn() diff --git a/tests/lib/core/harness/conformance/test_openai_conformance.py b/tests/lib/core/harness/conformance/test_openai_conformance.py new file mode 100644 index 000000000..a07713546 --- /dev/null +++ b/tests/lib/core/harness/conformance/test_openai_conformance.py @@ -0,0 +1,169 @@ +"""OpenAI conformance fixtures for the shared harness span-derivation engine. 
+ +The cross-channel guarantee is that yield-delivery and auto-send observe the +SAME canonical ``StreamTaskMessage*`` stream, so span derivation over that +stream must be deterministic and idempotent regardless of channel. These +fixtures express the canonical sequences an OpenAI turn produces (text, +tool-call, reasoning, and a combined multi-step turn) and assert that property. + +Registry hazard (see conformance/runner.py): ``_REGISTRY`` is process-global and +collection order across modules is not guaranteed. To stay deterministic this +module keeps its OWN fixture list and parametrizes over THAT list, rather than +over ``all_fixtures()``. It still calls ``register()`` so the cross-module +conformance suite can see these fixtures too. +""" + +from __future__ import annotations + +import pytest + +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta, ReasoningSummaryDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + +from .runner import Fixture, register, derive_all + +_OPENAI_FIXTURES: list[Fixture] = [] + + +def _add(fixture: Fixture) -> None: + """Register both module-locally (for parametrization) and globally.""" + _OPENAI_FIXTURES.append(fixture) + register(fixture) + + +# Text-only turn: start -> deltas -> done. No spans are derived from plain text. 
+_TEXT_ONLY_EVENTS = [
+    StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")),
+    StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hel")),
+    StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="lo")),
+    StreamTaskMessageDone(type="done", index=0),
+]
+_add(Fixture(name="openai-text-only", events=_TEXT_ONLY_EVENTS))
+
+# Tool round: the OpenAI converter reports the call as one
+# Full(ToolRequestContent) and its result as one Full(ToolResponseContent);
+# the pair shares a tool_call_id. This follows the tool path of
+# convert_openai_to_agentex_events.
+_WEATHER_REQUEST = ToolRequestContent(
+    type="tool_request",
+    author="agent",
+    tool_call_id="call_1",
+    name="get_weather",
+    arguments={"city": "SF"},
+)
+_WEATHER_RESPONSE = ToolResponseContent(
+    type="tool_response",
+    author="agent",
+    tool_call_id="call_1",
+    name="get_weather",
+    content="72F",
+)
+_TOOL_CALL_EVENTS = [
+    StreamTaskMessageFull(type="full", index=0, content=_WEATHER_REQUEST),
+    StreamTaskMessageFull(type="full", index=1, content=_WEATHER_RESPONSE),
+]
+_add(Fixture(name="openai-tool-call", events=_TOOL_CALL_EVENTS))
+
+# Reasoning-only turn: a ReasoningContent start, one summary delta, then done.
+# Span derivation is expected to open the reasoning span at the Start and to
+# close it at the Done carrying the same index.
+_EMPTY_REASONING = ReasoningContent(type="reasoning", author="agent", summary=[], content=[], style="active")
+_THINKING_DELTA = ReasoningSummaryDelta(type="reasoning_summary", summary_index=0, summary_delta="thinking")
+_REASONING_EVENTS = [
+    StreamTaskMessageStart(type="start", index=0, content=_EMPTY_REASONING),
+    StreamTaskMessageDelta(type="delta", index=0, delta=_THINKING_DELTA),
+    StreamTaskMessageDone(type="done", index=0),
+]
+_add(Fixture(name="openai-reasoning", events=_REASONING_EVENTS))
+
+# Multi-step turn: reasoning, then a tool round, then the final answer text.
+_MULTI_STEP_EVENTS = [
+    # Index 0: the reasoning summary (start, one delta, done).
+    StreamTaskMessageStart(
+        type="start",
+        index=0,
+        content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[], style="active"),
+    ),
+    StreamTaskMessageDelta(
+        type="delta",
+        index=0,
+        delta=ReasoningSummaryDelta(type="reasoning_summary", summary_index=0, summary_delta="plan"),
+    ),
+    StreamTaskMessageDone(type="done", index=0),
+    # Indexes 1 and 2: the tool request and its response, sharing tool_call_id "call_2".
+    StreamTaskMessageFull(
+        type="full",
+        index=1,
+        content=ToolRequestContent(
+            type="tool_request",
+            author="agent",
+            tool_call_id="call_2",
+            name="search",
+            arguments={"q": "x"},
+        ),
+    ),
+    StreamTaskMessageFull(
+        type="full",
+        index=2,
+        content=ToolResponseContent(
+            type="tool_response",
+            author="agent",
+            tool_call_id="call_2",
+            name="search",
+            content="result",
+        ),
+    ),
+    # Index 3: the final answer text (start, one delta, done).
+    StreamTaskMessageStart(type="start", index=3, content=TextContent(type="text", author="agent", content="")),
+    StreamTaskMessageDelta(type="delta", index=3, delta=TextDelta(type="text", text_delta="done")),
+    StreamTaskMessageDone(type="done", index=3),
+]
+_add(Fixture(name="openai-multi-step", events=_MULTI_STEP_EVENTS))
+
+
+@pytest.mark.parametrize("fixture", _OPENAI_FIXTURES, ids=lambda fx: fx.name)
+def test_openai_span_derivation_is_deterministic(fixture):
+    """Derive signals twice from one fixture's events and require equal results.
+
+    Equal output for equal input is what lets yield-delivery and auto-send
+    agree, because both channels observe this same canonical stream.
+    """
+    first_pass = derive_all(fixture.events)
+    second_pass = derive_all(fixture.events)
+    assert first_pass == second_pass