feat(langgraph): migrate LangGraph harness onto unified surface #417

Open
declan-scale wants to merge 9 commits into declan-scale/unified-harness-surface from declan-scale/pr5-langgraph

Conversation

declan-scale (Contributor) commented Jun 18, 2026

Summary

Migrates the LangGraph harness onto the unified harness surface introduced in PR 4 (pydantic-ai). Implements 12 tasks covering the new LangGraphTurn adapter, bespoke helper rewrites, offline integration tests, conformance fixtures, tutorial agents, and CI matrix.

New surface:

turn = LangGraphTurn(stream, model=model_name)
# Sync HTTP ACP
async for event in emitter.yield_turn(turn):
    yield event
# Async / temporal
result = await emitter.auto_send_turn(turn)

Key implementation points:

  • LangGraphTurn wraps LangGraph astream() and implements HarnessTurn (tasks 1-2)
  • stream_langgraph_events reimplemented on UnifiedEmitter (task 4)
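  • _langgraph_tracing.py create_langgraph_tracing_handler marked deprecated in the module, class, and function docstrings; the runtime warnings.warn(DeprecationWarning) was removed in the final commit so callers running under -W error are not broken (task 3)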
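  • AGX1-377 documented: LangGraph emits tool requests as StreamTaskMessageFull (not Start+Delta+Done). The SpanDeriver gap for Full events was tracked in AGX1-373; after the rebase onto unified-harness-surface, the harness tests assert that Full(ToolRequestContent) opens a tool span and Full(ToolResponseContent) closes it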
  • Usage timing: LangGraphTurn.usage() is populated via on_final_ai_message callback during event iteration; TurnResult.usage is a pre-iteration snapshot — callers should read turn.usage() after auto_send_turn returns
  • Added AsyncGenerator return type annotation to convert_langgraph_to_agentex_events and _generate_events to fix pyright inference (was treating them as coroutines)
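
The usage-timing point above can be illustrated with a small, self-contained sketch. `FakeTurn` and `FakeUsage` are hypothetical stand-ins, not the real `LangGraphTurn` or `TurnUsage`; the only behaviour assumed from this PR is that usage is filled in while the events are being iterated.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Tuple


@dataclass(frozen=True)
class FakeUsage:
    """Hypothetical stand-in for TurnUsage."""
    input_tokens: int = 0
    output_tokens: int = 0


class FakeTurn:
    """Hypothetical stand-in for LangGraphTurn: usage is set during iteration."""

    def __init__(self) -> None:
        self._usage = FakeUsage()

    async def events(self) -> AsyncIterator[str]:
        yield "start"
        yield "delta"
        # Stand-in for the on_final_ai_message callback firing mid-stream.
        self._usage = FakeUsage(input_tokens=12, output_tokens=34)
        yield "done"

    def usage(self) -> FakeUsage:
        return self._usage


async def run_turn() -> Tuple[FakeUsage, FakeUsage]:
    turn = FakeTurn()
    snapshot = turn.usage()        # read before iteration: still zero
    async for _ in turn.events():  # consume the whole stream
        pass
    return snapshot, turn.usage()  # second value is read after iteration


before, after = asyncio.run(run_turn())
print(before, after)
```

In this toy model the pre-iteration snapshot keeps its zero counts, while the value read after the stream is exhausted carries the reported tokens.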

Tests added (tasks 5-8, 219 passing):

  • test_langgraph_sync.py: 11 unit tests for convert_langgraph_to_agentex_events + deprecation
  • test_langgraph_turn.py: 19 unit tests for LangGraphTurn + langgraph_usage_to_turn_usage
  • test_langgraph_async.py: 6 characterization tests for the unified stream_langgraph_events
  • test_langgraph_sync_unified.py: 6 passthrough + span derivation tests
  • test_langgraph_conformance.py: 4 conformance fixtures (text-only, single-tool, reasoning, multi-step)
  • test_harness_langgraph_sync.py: 6 offline integration tests (yield channel)
  • test_harness_langgraph_async.py: 7 offline integration tests (auto_send channel)
  • test_harness_langgraph_temporal.py: 5 offline integration tests (temporal channel)

Tutorial agents (task 9):

  • examples/tutorials/00_sync/harness_langgraph/ (s-harness-langgraph) — sync, yield_turn
  • examples/tutorials/10_async/00_base/harness_langgraph/ (a-harness-langgraph) — async, auto_send_turn
  • examples/tutorials/10_async/10_temporal/harness_langgraph/ (at-harness-langgraph) — temporal, LangGraphPlugin + emit_langgraph_messages

CI (task 10): Enabled live-matrix job in harness-integration.yml with 3-way matrix over [sync, async, temporal] running offline LangGraph integration tests.

Test plan

  • uv run --all-packages --all-extras pytest tests/lib/core/harness/ tests/lib/adk/ -v — 219 passed
  • ./scripts/lint — 0 errors, 0 warnings (ruff + pyright)
  • Live agent smoke test (requires running AgentEx server + LLM keys)

🤖 Generated with Claude Code

Greptile Summary

This PR migrates the LangGraph harness onto the unified harness surface (LangGraphTurn + UnifiedEmitter), replacing ~180 lines of bespoke async streaming logic with a thin adapter and adding 219 tests across sync, async, temporal, and conformance suites.

  • LangGraphTurn implements the HarnessTurn protocol, delegating event generation to convert_langgraph_to_agentex_events and capturing usage via an on_final_ai_message callback.
  • stream_langgraph_events is reimplemented as a one-liner over UnifiedEmitter.auto_send_turn; the old bespoke async handler is fully removed.
  • Reasoning block bug: in _langgraph_sync.py, the StreamTaskMessageStart emitted when a reasoning model returns a "reasoning" block uses TextContent instead of ReasoningContent. ReasoningContent is not imported in the file. The conformance fixture constructs the correct events by hand but never passes them through the converter, so this goes untested and undetected.

Confidence Score: 4/5

Safe to merge for text-only and tool-calling agents; reasoning-model agents (for example o1 or o3) will emit malformed event streams until the TextContent/ReasoningContent mismatch in _langgraph_sync.py is fixed.

The core migration is solid and well-tested. The issue is in the reasoning block path of _langgraph_sync.py: when a reasoning model returns a "reasoning" typed block, the code opens the streaming context with TextContent instead of ReasoningContent. ReasoningContent is not even imported in the file. The conformance fixture constructs the correct events manually but never validates the converter's output, so the bug is invisible to the test suite. Any deployment using a reasoning model through the sync or async LangGraph harness will produce a type-mismatched event stream.

src/agentex/lib/adk/_modules/_langgraph_sync.py — the reasoning block StreamTaskMessageStart at line ~151
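
A minimal sketch of the kind of dispatch the reasoning path seems to need is shown below. The two dataclasses are hypothetical stand-ins for the AgentEx `TextContent` and `ReasoningContent` models (their real fields are not shown in this PR), and `start_content_for_block` is an illustrative helper name, not a function in `_langgraph_sync.py`.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class TextContent:
    """Hypothetical stand-in for the AgentEx TextContent model."""
    type: str = "text"


@dataclass
class ReasoningContent:
    """Hypothetical stand-in for the AgentEx ReasoningContent model."""
    type: str = "reasoning"


def start_content_for_block(block: dict) -> Union[TextContent, ReasoningContent]:
    """Pick the Start payload whose type matches the incoming block type."""
    if block.get("type") == "reasoning":
        return ReasoningContent()
    return TextContent()


reasoning_start = start_content_for_block({"type": "reasoning"})
text_start = start_content_for_block({"type": "text"})
print(reasoning_start.type, text_start.type)
```

The point of the sketch is only that the Start payload and the deltas that follow it agree on one content type.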

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/agentex/lib/adk/_modules/_langgraph_turn.py | New HarnessTurn adapter wrapping LangGraph astream(); usage capture via on_final_ai_message callback is clean; protocol conformance, model passthrough, and empty-stream cases are well-tested. |
| src/agentex/lib/adk/_modules/_langgraph_sync.py | Reasoning block StreamTaskMessageStart emits TextContent instead of ReasoningContent; ReasoningContent is not imported. No test exercises the reasoning-block path, so this bug passes the full test suite silently. |
| src/agentex/lib/adk/_modules/_langgraph_async.py | Heavily simplified by delegating to LangGraphTurn + UnifiedEmitter.auto_send_turn; bespoke async implementation removed; workflow_now_if_in_workflow correctly plumbed for Temporal created_at. |
| src/agentex/lib/adk/_modules/_langgraph_tracing.py | Deprecation is docstring-only (no runtime warnings.warn); PR description says otherwise, but the test explicitly confirms and documents the intentional choice to avoid breaking callers under -W error. |
| tests/lib/core/harness/conformance/test_langgraph_conformance.py | Fixtures manually construct correct events (ReasoningContent for reasoning) but never run them through the actual converter, so they don't catch the TextContent/ReasoningContent mismatch in _langgraph_sync.py; module docstring contradicts harness integration tests on SpanDeriver Full-event support. |
| tests/lib/core/harness/test_harness_langgraph_async.py | Comprehensive offline integration tests for auto_send_turn path; correctly documents that TurnResult.usage is a pre-iteration snapshot and directs callers to turn.usage() post-iteration. |
| .github/workflows/harness-integration.yml | Enables the previously-disabled live-matrix job with a 3-way channel matrix (sync/async/temporal); hash-pinned action references and fail-fast: false are both correct. |
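
The trade-off noted for _langgraph_tracing.py (docstring-only deprecation versus a runtime warning) can be seen in a short standard-library sketch. Both functions below are hypothetical stand-ins for `create_langgraph_tracing_handler`; `warnings.simplefilter("error")` is used here to approximate running Python with `-W error`.

```python
import warnings


def legacy_handler_with_runtime_warning() -> str:
    """Hypothetical variant that warns at call time."""
    warnings.warn(
        "prefer the unified harness surface",
        DeprecationWarning,
        stacklevel=2,
    )
    return "handler"


def legacy_handler_docstring_only() -> str:
    """Deprecated: prefer the unified harness surface (no runtime warning)."""
    return "handler"


with warnings.catch_warnings():
    # Turn every warning into an exception, as -W error would.
    warnings.simplefilter("error")
    try:
        legacy_handler_with_runtime_warning()
        runtime_variant_raised = False
    except DeprecationWarning:
        runtime_variant_raised = True
    docstring_variant_result = legacy_handler_docstring_only()

print(runtime_variant_raised, docstring_variant_result)
```

Under the error filter the warning-emitting variant raises, while the docstring-only variant keeps working.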

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Agent as Agent (acp.py)
    participant LGT as LangGraphTurn
    participant Conv as convert_langgraph_to_agentex_events
    participant Emitter as UnifiedEmitter
    participant Streaming as adk.streaming

    Agent->>LGT: "LangGraphTurn(graph.astream(), model=model)"
    Agent->>Emitter: yield_turn(turn) or auto_send_turn(turn)
    Emitter->>LGT: iterate turn.events
    LGT->>Conv: convert_langgraph_to_agentex_events(stream, on_final_ai_message)
    loop LangGraph events
        Conv-->>LGT: StreamTaskMessageStart/Delta/Done (text) or StreamTaskMessageFull (tool)
        LGT-->>Emitter: yield event
        Emitter->>Streaming: streaming_task_message_context(...)
        Note over Conv,LGT: on_final_ai_message fires for AIMessage in updates
    end
    LGT->>LGT: _usage updated via _capture()
    Emitter-->>Agent: TurnResult or async for event
    Note over Agent: turn.usage() returns post-iteration usage

Comments Outside Diff (1)

  1. src/agentex/lib/adk/_modules/_langgraph_sync.py, lines 147-153

    P1 Reasoning block StreamTaskMessageStart uses wrong content type

    When a reasoning model emits a block of type "reasoning", the code opens the stream with TextContent(type="text", ...) instead of ReasoningContent. Downstream consumers that dispatch on content.type (e.g. rendering pipelines, the SpanDeriver text-span logic) will receive a TextContent wrapper for what is actually a reasoning block, then see a ReasoningContentDelta arrive — a type mismatch that will confuse or break those consumers. ReasoningContent is also not imported in this file, confirming the intended type was never used. The conformance fixture _REASONING correctly shows ReasoningContent as the expected start content, but it constructs the events by hand and never runs them through the actual converter, so no test catches this today.

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
src/agentex/lib/adk/_modules/_langgraph_sync.py:147-153
**Reasoning block `StreamTaskMessageStart` uses wrong content type**

When a reasoning model emits a block of type `"reasoning"`, the code opens the stream with `TextContent(type="text", ...)` instead of `ReasoningContent`. Downstream consumers that dispatch on `content.type` (e.g. rendering pipelines, the `SpanDeriver` text-span logic) will receive a `TextContent` wrapper for what is actually a reasoning block, then see a `ReasoningContentDelta` arrive — a type mismatch that will confuse or break those consumers. `ReasoningContent` is also not imported in this file, confirming the intended type was never used. The conformance fixture `_REASONING` correctly shows `ReasoningContent` as the expected start content, but it constructs the events by hand and never runs them through the actual converter, so no test catches this today.

### Issue 2 of 2
tests/lib/core/harness/conformance/test_langgraph_conformance.py:29-35
**Contradictory statements about `SpanDeriver` Full-event support**

The module docstring states "The SpanDeriver does not produce tool spans from Full events today; that gap is tracked in AGX1-373", but both `test_harness_langgraph_sync.py` and `test_harness_langgraph_async.py` contain a test titled `test_tracer_produces_tool_spans_for_full_events` that asserts `SpanDeriver` *does* open and close tool spans for `Full(ToolRequestContent)` / `Full(ToolResponseContent)` events. If those tests pass, the docstring comment here is stale and will mislead future readers about the current capability.
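
The behaviour those harness tests are described as asserting (a span opened by a Full tool request and closed by the matching Full tool response) can be sketched with a toy deriver. `ToySpanDeriver` and `FullEvent` are hypothetical illustrations, not the real `SpanDeriver` or `StreamTaskMessageFull`, and pairing by `tool_call_id` is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FullEvent:
    """Hypothetical stand-in for a StreamTaskMessageFull tool event."""
    kind: str  # "tool_request" or "tool_response"
    tool_call_id: str
    name: str = ""


@dataclass
class ToySpanDeriver:
    """Toy deriver: request opens a span, matching response closes it."""
    open_spans: Dict[str, str] = field(default_factory=dict)
    closed_spans: List[str] = field(default_factory=list)

    def observe(self, event: FullEvent) -> None:
        if event.kind == "tool_request":
            self.open_spans[event.tool_call_id] = event.name
        elif event.kind == "tool_response":
            name = self.open_spans.pop(event.tool_call_id, None)
            if name is not None:
                self.closed_spans.append(name)


deriver = ToySpanDeriver()
deriver.observe(FullEvent("tool_request", "call-1", "search"))
deriver.observe(FullEvent("tool_response", "call-1"))
print(deriver.closed_spans, deriver.open_spans)
```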

Reviews (3): Last reviewed commit: "fix(langgraph): restore created_at + doc..."

Comment on lines +95 to +119
    def __init__(self, stream: Any, model: str | None = None) -> None:
        self._stream = stream
        self._model = model
        self._usage: TurnUsage = TurnUsage(model=model)

    @property
    def events(self) -> AsyncIterator[StreamTaskMessage]:
        return self._generate_events()

    async def _generate_events(self) -> AsyncGenerator[StreamTaskMessage, None]:
        def _capture(ai_msg: Any) -> None:
            usage_metadata = getattr(ai_msg, "usage_metadata", None)
            if usage_metadata is not None:
                self._usage = langgraph_usage_to_turn_usage(usage_metadata, self._model)

        async for ev in convert_langgraph_to_agentex_events(self._stream, on_final_ai_message=_capture):
            yield ev

    def usage(self) -> TurnUsage:
        """Return the usage captured from the last AIMessage in the stream.

        Valid only after ``events`` has been fully consumed.
        Returns a zero-usage ``TurnUsage`` if the model did not report usage.
        """
        return self._usage


P1 TurnResult.usage is always empty when using auto_send_turn

LangGraphTurn populates self._usage lazily via the on_final_ai_message callback, which fires during event iteration. However, UnifiedEmitter.auto_send_turn passes usage=turn.usage() as an argument to auto_send before iteration begins (Python evaluates all arguments before the call). By the time the stream is consumed and _capture updates self._usage, the pre-iteration snapshot has already been handed to TurnResult.

Concretely: every caller that reads result.usage after await emitter.auto_send_turn(turn) gets TurnUsage(model=model) — zero token counts regardless of what the model reported. The PR description documents the workaround ("callers should read turn.usage() after auto_send_turn returns"), but TurnResult.usage existing with silent stale data is a trap for every future user of this API.

The fix belongs in emitter.py: call turn.usage() after await auto_send(turn.events, ...) returns, then construct the TurnResult from the now-populated usage.
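
A hedged sketch of the difference between the eager and the deferred read follows. Every name here (`Usage`, `Result`, `Turn`, `auto_send`, and the two `auto_send_turn_*` functions) is a simplified, hypothetical stand-in; the real `emitter.py` is not shown in this PR, so this only illustrates the ordering argument, not the actual implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass(frozen=True)
class Usage:
    """Hypothetical stand-in for TurnUsage."""
    total_tokens: int = 0


@dataclass
class Result:
    """Hypothetical stand-in for TurnResult."""
    final_text: str
    usage: Usage


class Turn:
    """Hypothetical turn whose usage is only known once the stream ends."""

    def __init__(self) -> None:
        self._usage = Usage()

    async def events(self) -> AsyncIterator[str]:
        yield "hel"
        yield "lo"
        self._usage = Usage(total_tokens=5)  # runs as the stream is exhausted

    def usage(self) -> Usage:
        return self._usage


async def auto_send(events: AsyncIterator[str]) -> str:
    parts = []
    async for ev in events:
        parts.append(ev)
    return "".join(parts)


async def auto_send_turn_eager(turn: Turn) -> Result:
    usage = turn.usage()  # snapshot taken before any event is consumed
    text = await auto_send(turn.events())
    return Result(text, usage)


async def auto_send_turn_deferred(turn: Turn) -> Result:
    text = await auto_send(turn.events())
    return Result(text, turn.usage())  # read only after iteration finished


eager = asyncio.run(auto_send_turn_eager(Turn()))
deferred = asyncio.run(auto_send_turn_deferred(Turn()))
print(eager.usage, deferred.usage)
```

With the deferred ordering, the result object carries the same usage a caller would otherwise have to fetch from the turn afterwards.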


declan-scale and others added 9 commits June 18, 2026 17:03
Adds an additive on_final_ai_message=None parameter to
convert_langgraph_to_agentex_events so callers can capture AIMessage
usage_metadata without re-traversing the stream. No behavior change when
omitted. Also adds a DeprecationWarning to create_langgraph_tracing_handler
and its module docstring, pointing to the unified harness surface, and
updates the sync module docstring with the preferred unified path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Implements LangGraphTurn (HarnessTurn protocol) that wraps a LangGraph
astream() event stream and captures usage from AIMessage.usage_metadata
via the on_final_ai_message callback. Implements langgraph_usage_to_turn_usage
that maps all UsageMetadata fields (input/output/total/cache_read/reasoning)
onto the framework-agnostic TurnUsage model. Zero token counts are preserved.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pre-refactor)

Records the current bespoke behavior as a contract test. After Task 4 rewrites
the internals to use UnifiedEmitter + LangGraphTurn, these tests must still pass
to confirm behavioral parity.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…urface

Replaces the bespoke Redis-streaming loop with UnifiedEmitter.auto_send_turn(
LangGraphTurn(...)), matching the pattern established for pydantic-ai. Public
signature preserved identically. Behavioral difference: tool calls/responses
are now posted via streaming_task_message_context (not adk.messages.create),
and final_text accumulates all text across the turn. Updates the characterization
test to document these unified-surface semantics.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Verifies yield_turn(LangGraphTurn) produces identical events to direct
iteration, and documents the AGX1-377 behavior (LangGraph Full tool events
don't produce SpanDeriver spans today; cross-channel equivalence comes with
AGX1-373).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-step)

Registers LangGraph-specific conformance fixtures with the shared harness
conformance runner. Documents the AGX1-377 behavior (tool requests are Full
events, not Start+Done). Span derivation is deterministic for all 4 fixtures.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ral channels

Adds 18 offline integration tests across the three delivery channels using
fake LangGraph event streams and fake streaming backends. Documents the
AGX1-377 behavior (Full events don't produce tool spans). Notes the usage
capture timing: turn.usage() is the authoritative post-iteration value since
auto_send_turn evaluates usage eagerly before events are consumed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task 9: add 3 deployable tutorial agents that demonstrate the unified
harness surface side-by-side with the bespoke reference examples:
- examples/tutorials/00_sync/harness_langgraph/ (s-harness-langgraph)
  uses UnifiedEmitter.yield_turn(LangGraphTurn(stream))
- examples/tutorials/10_async/00_base/harness_langgraph/ (a-harness-langgraph)
  uses UnifiedEmitter.auto_send_turn(LangGraphTurn(stream))
- examples/tutorials/10_async/10_temporal/harness_langgraph/ (at-harness-langgraph)
  follows 130_langgraph pattern (LangGraphPlugin + emit_langgraph_messages)

Task 10: enable live-matrix CI job in harness-integration.yml with a
3-way matrix over [sync, async, temporal] running offline integration tests.
Also add test_harness_langgraph_*.py to PR path triggers.

Task 11 (pyright fixes): annotate convert_langgraph_to_agentex_events and
_generate_events with AsyncGenerator return types so pyright infers them as
async generators rather than coroutines. Add start_time to Span construction
in test_langgraph_sync_unified.py fake tracing backend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…racing handler (PR 5/6)

AGX1-378: wire workflow_now_if_in_workflow() into stream_langgraph_events so
Temporal callers get deterministic message timestamps, matching the pattern
used by the openai/litellm providers.

Deprecation alignment: remove runtime warnings.warn from
create_langgraph_tracing_handler (and unused import warnings) to match PR 4/6
pydantic-ai convention. Deprecation remains in docstrings on module, class,
and function. Callers under -W error are no longer broken.

Test alignment after rebase onto unified-harness-surface (b4b8b33):
- FakeStreamingModule.streaming_task_message_context in test_langgraph_async.py
  and test_pydantic_ai_async.py updated to accept **kw (foundation now passes
  created_at).
- Three "no tool spans for Full events" tests updated to assert the new
  SpanDeriver behaviour: Full(ToolRequestContent) opens a span,
  Full(ToolResponseContent) closes it.
- Two "accumulates all text" multi-step tests corrected to last-segment
  semantics (auto_send resets final_text_parts on each new Start(TextContent)).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
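
The last-segment semantics described in the final commit message (the text accumulator is reset on each new text Start) can be sketched as below. The event tuples and the function name are hypothetical simplifications of the real stream events, chosen only to show the reset behaviour.

```python
from typing import Iterable, List, Optional, Tuple

Event = Tuple[str, Optional[str]]  # (kind, payload); hypothetical event shape


def final_text_last_segment(events: Iterable[Event]) -> str:
    """Return the text of the last text segment in a multi-step turn."""
    parts: List[str] = []
    for kind, payload in events:
        if kind == "start_text":
            parts = []  # a new text segment discards the previous one
        elif kind == "delta_text" and payload is not None:
            parts.append(payload)
    return "".join(parts)


multi_step_events: List[Event] = [
    ("start_text", None),
    ("delta_text", "Let me check."),
    ("done", None),
    ("tool_full", "search"),
    ("start_text", None),
    ("delta_text", "It is "),
    ("delta_text", "sunny."),
    ("done", None),
]

final_text = final_text_last_segment(multi_step_events)
print(final_text)
```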
@declan-scale declan-scale force-pushed the declan-scale/pr5-langgraph branch from dc5c81d to 68572d5 Compare June 18, 2026 21:12
@declan-scale

Contributor Author

@greptile review
