feat(openai-agents): migrate onto the unified harness surface#416
Open
declan-scale wants to merge 2 commits into
Open
feat(openai-agents): migrate onto the unified harness surface#416declan-scale wants to merge 2 commits into
declan-scale wants to merge 2 commits into
Conversation
| logger.info(f"Running harness OpenAI agent for task {params.task_id}") | ||
|
|
||
| agent = create_agent() | ||
| result = Runner.run_streamed(starting_agent=agent, input=params.user_message) |
There was a problem hiding this comment.
Multi-turn conversation history lost in 140_harness_openai tutorial
- Bug
- Each turn creates a fresh agent and passes only the latest user message to
Runner.run_streamed, so the LLM has no memory of prior turns.
- Each turn creates a fresh agent and passes only the latest user message to
- Cause
- The workflow accumulates no
_messageslist andRunHarnessAgentParamshas no history field;activities.py:47passes a bare string instead of a message list.
- The workflow accumulates no
- Fix
- Add a
_messageslist to the workflow (like 130_langgraph), include it inRunHarnessAgentParams, append the assistant reply after each turn, and pass the full list toRunner.run_streamed.
- Add a
Artifacts
Supporting artifact from the T-Rex run
- Contains supporting evidence from the run (text/markdown; charset=utf-8).
Add OpenAITurn, a HarnessTurn adapter that wraps an OpenAI Agents SDK streamed run (Runner.run_streamed) and converts its native events into the canonical StreamTaskMessage* stream via convert_openai_to_agentex_events, aggregating per-response usage into a provider-independent TurnUsage after stream exhaustion. Defensive getattr access preserves real zeros. Refactor OpenAIService.run_agent_streamed_auto_send to drive delivery, tracing, and usage through UnifiedEmitter.auto_send_turn(OpenAITurn(...)), replacing the ~270-line inline streaming loop. Guardrail tripwire handling and the RunResultStreaming return type are preserved; the created_at first-message ordering limitation under the unified path is documented. Docstring-deprecate SyncStreamingModel/SyncStreamingProvider (no runtime warning). Add unit tests for OpenAITurn + usage mapping, OpenAI conformance fixtures (module-local registry), update the streamed-auto-send activity test to the new full-message contract, and add three tutorials (sync 060, async 130, temporal 140) demonstrating OpenAITurn with yield_turn / auto_send_turn, each with an offline test. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…378) Thread the workflow-supplied created_at through UnifiedEmitter.auto_send_turn(turn, created_at=created_at) so the first agent message of the turn is stamped with the deterministic timestamp (e.g. workflow.now()) just as the original inline loop did before the unified-harness migration. The foundation (b4b8b33) wired auto_send_turn to accept and forward created_at to every streaming_task_message_context call. This commit connects the call site in run_agent_streamed_auto_send to that new parameter, restoring the behaviour that the migration comment documented as a known trade-off. Update the stale limitation comment to reflect the fix. Add test_run_agent_streamed_auto_send_forwards_created_at, which drives the activity through a fake stream with a pinned datetime and asserts every streaming context receives that datetime. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
d1c5c65 to
ab92b50
Compare
Contributor
Author
|
@greptile review |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
PR 6 of the unified-harness-surface series: migrate the OpenAI Agents SDK integration onto the shared harness surface.
Library
OpenAITurn(src/agentex/lib/adk/providers/_modules/openai_turn.py): aHarnessTurnadapter that wraps aRunner.run_streamedresult. It converts the SDK's native events into the canonicalStreamTaskMessage*stream via the existingconvert_openai_to_agentex_events, and after the stream is exhausted readsresult.raw_responsesto aggregate per-response usage into a provider-independentTurnUsage.openai_usage_to_turn_usage(usage, model)mapsagents.Usage->TurnUsagewith defensivegetattraccess so present-but-zero values (e.g. 0 output tokens on a cache hit) survive as0, notNone._aggregate_usage(raw_responses)sums usage acrossModelResponses viaUsage.add, skipping responses without usage.result=(a streamed run) orstream=(a pre-built canonical stream, for tests); raisesValueErrorif neither.coalesce_tool_requestsis a no-op kept for API parity.OpenAIService.run_agent_streamed_auto_send: replaced the ~270-line inline streaming/reasoning/span loop withUnifiedEmitter.auto_send_turn(OpenAITurn(result=result, model=model)). Guardrail tripwire handling and theRunResultStreamingreturn type are preserved. Thecreated_atfirst-message ordering limitation under the unified path is documented in a comment.OpenAITurnis imported lazily inside the method to avoid a circular import at package init.SyncStreamingModel/SyncStreamingProvider: docstring-deprecated (no runtime warning), pointing at the harness pattern.Tests
tests/lib/adk/providers/test_openai_turn.py: usage mapping (full / None / real zeros),_aggregate_usage(empty / single / multiple),eventsdriven by an injected canonical stream,usage()before/after exhaustion (including the result-backed path), and theValueErrorguard.tests/lib/core/harness/conformance/test_openai_conformance.py: text-only, tool-call, reasoning, and multi-step canonical fixtures; registers module-locally and parametrizes over its own list to avoid the cross-module global-registry hazard.tests/lib/adk/providers/test_openai_activities.py: updated the streamed-auto-send activity test to the new contract (full tool messages are posted by opening a context withinitial_contentand closing it, nostream_update).Tutorials
Three tutorials demonstrating the same
OpenAITurnacross delivery modes, each with an offline test (no server / Redis / Temporal / API key required):examples/tutorials/00_sync/060_harness_openai—UnifiedEmitter.yield_turnexamples/tutorials/10_async/00_base/130_harness_openai—UnifiedEmitter.auto_send_turnexamples/tutorials/10_async/10_temporal/140_harness_openai—auto_send_turninside a custom Temporal activityVerification
./scripts/lint— clean (ruff + pyright, 0 errors)tests/suite — 1016 passed, 1376 skipped🤖 Generated with Claude Code
Greptile Summary
This PR migrates the OpenAI Agents SDK integration onto the shared unified harness surface, replacing the ~270-line inline streaming/reasoning/span loop in
run_agent_streamed_auto_sendwithOpenAITurn+UnifiedEmitter.auto_send_turn. Three tutorial directories (sync yield, async auto-send, Temporal activity) are added alongside unit tests and conformance fixtures.OpenAITurnis a newHarnessTurnadapter that convertsRunner.run_streamedevents into the canonicalStreamTaskMessage*stream and lazily aggregates per-response token usage fromraw_responsesafter stream exhaustion.OpenAIService.run_agent_streamed_auto_sendis simplified from a hand-rolled per-item streaming context loop to a singleUnifiedEmitter.auto_send_turn(OpenAITurn(...))call; guardrail tripwire handling andRunResultStreamingreturn type are preserved.SyncStreamingModel/SyncStreamingProviderreceive docstring-level deprecation notices pointing to the harness pattern.Confidence Score: 4/5
The core streaming refactor is clean, but token usage will always be empty in TurnResult — the emitter captures it before the stream is consumed.
The ~270-line inline loop is cleanly replaced and all observable behaviour (message delivery, guardrail handling, created_at threading) is preserved. One defect stands out: UnifiedEmitter.auto_send_turn evaluates turn.usage() eagerly — before consuming turn.events — so the lazy raw_responses aggregation that OpenAITurn performs after stream exhaustion never makes it into TurnResult.usage. No existing callers in this PR read turn_result.usage, so the regression is silent today, but any downstream consumer of token usage from the auto-send path will receive null counts. The fix is a two-line change in emitter.py.
src/agentex/lib/core/harness/emitter.py (not in diff) — auto_send_turn needs to call turn.usage() after awaiting auto_send, not before.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant Caller as Activity / ACP Handler participant Emitter as UnifiedEmitter participant Turn as OpenAITurn participant AutoSend as auto_send() participant Runner as Runner.run_streamed participant Streaming as adk.streaming Caller->>Runner: run_streamed(agent, input) Runner-->>Turn: RunResultStreaming (result) Caller->>Emitter: auto_send_turn(turn, created_at) Note over Emitter,Turn: turn.usage() evaluated HERE (pre-exhaustion) Emitter->>AutoSend: "auto_send(turn.events, usage=TurnUsage(model), ...)" loop For each canonical event AutoSend->>Turn: next event via _iter_events Turn->>Runner: stream_events() Runner-->>Turn: raw OpenAI event Turn-->>AutoSend: "StreamTaskMessage*" AutoSend->>Streaming: streaming_task_message_context(...) end Note over Turn: Stream exhausted, _usage updated from raw_responses AutoSend-->>Emitter: "TurnResult(final_text, usage=stale TurnUsage)" Emitter-->>Caller: TurnResult with empty token counts%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant Caller as Activity / ACP Handler participant Emitter as UnifiedEmitter participant Turn as OpenAITurn participant AutoSend as auto_send() participant Runner as Runner.run_streamed participant Streaming as adk.streaming Caller->>Runner: run_streamed(agent, input) Runner-->>Turn: RunResultStreaming (result) Caller->>Emitter: auto_send_turn(turn, created_at) Note over Emitter,Turn: turn.usage() evaluated HERE (pre-exhaustion) Emitter->>AutoSend: "auto_send(turn.events, usage=TurnUsage(model), ...)" loop For each canonical event AutoSend->>Turn: next event via _iter_events Turn->>Runner: stream_events() Runner-->>Turn: raw OpenAI event Turn-->>AutoSend: "StreamTaskMessage*" AutoSend->>Streaming: streaming_task_message_context(...) end Note over Turn: Stream exhausted, _usage updated from raw_responses AutoSend-->>Emitter: "TurnResult(final_text, usage=stale TurnUsage)" Emitter-->>Caller: TurnResult with empty token countsComments Outside Diff (2)
src/agentex/lib/adk/providers/_modules/sync_provider.py, line 564-572 (link)The converter starts OpenAI reasoning output as
TextContent, but the shared span derivation opens reasoning spans only when the start content has typereasoning. Real OpenAI reasoning streams therefore flow through as text starts, so the unified harness never derives the reasoning span that the new conformance fixture expects.Prompt To Fix With AI
src/agentex/lib/core/services/adk/providers/openai.py, line 794 (link)previous_response_idis accepted as a parameter on line 681 but is never forwarded toRunner.run_streamedon lines 794-797. The migrated method uses only a 2-branch if/else (max_turns or not), while all three sibling methods use a 4-branch matrix that correctly forwardsprevious_response_id.Runner.run_streamedcall was simplified to 2 branches, dropping theprevious_response_idforwarding. The# noqa: ARG002annotation on line 681 suppressed the linter warning that would have caught the unused argument.run_agent_streamed(lines 632-646), forwardingprevious_response_idtoRunner.run_streamedwhen it is not None.Artifacts
Supporting artifact from the T-Rex run
Prompt To Fix All With AI
Reviews (3): Last reviewed commit: "fix(openai-agents): restore created_at d..." | Re-trigger Greptile