From 5b92e1c5ad4b190f995f79e13858cd3ee1740c62 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Thu, 18 Jun 2026 15:24:16 -0400 Subject: [PATCH 1/3] fix(harness): assert cross-channel (yield vs auto-send) conformance equivalence [AGX1-373] Rebased on the pyright-clean foundation. Includes @override on _RecordingTracer.handle and relative conformance imports so the whole-repo pyright (scripts/lint) passes. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/lib/core/harness/conformance/runner.py | 388 +++++++++++++++++- .../harness/conformance/test_conformance.py | 147 ++++++- 2 files changed, 524 insertions(+), 11 deletions(-) diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py index 81a74860c..e992c26de 100644 --- a/tests/lib/core/harness/conformance/runner.py +++ b/tests/lib/core/harness/conformance/runner.py @@ -1,8 +1,22 @@ """Shared conformance engine: every harness tap registers fixtures here. -A fixture is (name, list[StreamTaskMessage]). The runner asserts that span -derivation over the events is identical regardless of delivery channel, which is -the cross-channel guarantee from the spec. +A fixture is (name, list[StreamTaskMessage]). The runner asserts two things: + +1. **Cross-channel logical equivalence**: yield_events and auto_send produce the + same *logical* sequence of delivered message contents. "Logical" means we + normalise away the streaming-envelope difference: + - yield channel delivers StreamTaskMessageFull(ToolResponseContent) verbatim. + - auto_send channel delivers the same tool-response by opening a streaming + context with the full content and closing it immediately (Start+Done on the + wire), not a Full event. + Both reduce to the same LogicalDelivery(type, identity) tuple; the conformance + test compares those normalised sequences. + +2. 
**Span signal equivalence**: each channel is driven with its own recording + tracer that captures every SpanSignal it actually receives in handle(); the + two channels' recorded signal lists must be identical. Comparing what each + channel genuinely emitted (rather than re-deriving from the events) catches a + regression where a channel skips deriver.observe() for some event type. Registry shared-state hazard: `_REGISTRY` is process-global. Every `test_*.py` module that calls `register()` at import time contributes to it, so a module @@ -12,13 +26,40 @@ module should register and parametrize over its OWN fixtures (e.g. keep a module-local list it both registers and parametrizes), rather than relying on cross-module global accumulation via `all_fixtures()`. + +Design decision — Full-message handling in auto_send +---------------------------------------------------- +auto_send posts a StreamTaskMessageFull (tool_request or tool_response) by +opening a streaming context with the full content and closing it immediately, +rather than calling adk.messages.create. This open+close approach is retained +because: + - StreamingTaskMessageContext.close() persists initial_content when no deltas + have been streamed, so the message IS correctly persisted. + - It mirrors the pattern already used by the real _langgraph_async.py harness, + keeping behavioural parity. + - Switching to adk.messages.create would require an additional injectable + dependency, adding surface area for no observable benefit. +The conformance test treats this as an ACCEPTABLE envelope difference: at the +logical-content level, Full(ToolResponseContent) from yield and +Start(content)+Done from auto_send are equivalent. The recorded span signals are +identical because both adapters drive the same SpanDeriver.observe() call +sequence and forward every signal to their tracer. 
""" from __future__ import annotations +import types as _types +from typing import Any, NamedTuple, override from dataclasses import dataclass +from agentex.types.task_message import TaskMessage from agentex.lib.core.harness.types import SpanSignal, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) from agentex.lib.core.harness.span_derivation import SpanDeriver @@ -46,3 +87,344 @@ def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]: out.extend(d.observe(e)) out.extend(d.flush()) return out + + +# --------------------------------------------------------------------------- +# Logical delivery normalisation +# --------------------------------------------------------------------------- + + +class LogicalDelivery(NamedTuple): + """A single logically-delivered message, channel-agnostic. + + `content_type` is the .type of the content (e.g. "text", "reasoning", + "tool_request", "tool_response"). `identity` is a frozenset of key=value + pairs that uniquely identify the content (e.g. tool_call_id for tool + messages, or index for text/reasoning). + """ + + content_type: str + identity: frozenset[tuple[str, Any]] + + +def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDelivery]: + """Extract logical deliveries from the yield channel's event list. + + The yield channel forwards events verbatim. A logical delivery is: + - A Full event (tool_request / tool_response): content delivered as-is. + - A Start + ... + Done sequence for text/reasoning content. 
+ """ + deliveries: list[LogicalDelivery] = [] + # Track which indices had a Start so we can pair with Done + started: dict[int, Any] = {} # index -> initial content + + for event in events: + if isinstance(event, StreamTaskMessageStart): + if event.index is not None: + started[event.index] = event.content + elif isinstance(event, StreamTaskMessageDone): + if event.index is not None and event.index in started: + content = started.pop(event.index) + ctype = getattr(content, "type", None) or "" + if ctype in ("text", "reasoning"): + # Identify text by index, reasoning by index + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset({("index", event.index)}), + ) + ) + # tool_request Start+Done just means the span opens; the message + # itself is delivered via Full (ToolRequestContent Full), so we + # don't emit a delivery here for Start(tool_request)+Done. + elif isinstance(event, StreamTaskMessageFull): + content = event.content + ctype = getattr(content, "type", None) or "" + if ctype == "tool_response": + from agentex.types.tool_response_content import ToolResponseContent + + if isinstance(content, ToolResponseContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + ) + ) + elif ctype == "tool_request": + from agentex.types.tool_request_content import ToolRequestContent + + if isinstance(content, ToolRequestContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + ) + ) + + return deliveries + + +# --------------------------------------------------------------------------- +# Fake streaming backend for auto_send conformance runner +# --------------------------------------------------------------------------- + + +class _FakeCtx: + """Mirrors StreamingTaskMessageContext: __aenter__ opens, close() 
closes.""" + + def __init__(self, sink: list[Any], content_type: str, initial_content: Any) -> None: + self.sink = sink + self.content_type = content_type + self.task_message = TaskMessage( + id="msg-conformance", + task_id="conformance-task", + content=initial_content, + ) + + async def __aenter__(self) -> "_FakeCtx": + self.sink.append(("open", self.content_type, self.task_message.content)) + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update: Any) -> Any: + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + """Fake streaming backend; records every context lifecycle event.""" + + def __init__(self) -> None: + self.sink: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.sink.append(("ctx", ctype, initial_content)) + return _FakeCtx(self.sink, ctype, initial_content) + + +class _FakeTracing: + """Minimal tracing backend: records started/ended span names + outputs.""" + + def __init__(self) -> None: + self.started: list[str] = [] + self.ended: list[Any] = [] + + async def start_span( + self, + *, + trace_id: str, + name: str, + input: Any = None, + parent_id: Any = None, + data: Any = None, + task_id: Any = None, + ) -> Any: + self.started.append(name) + return _types.SimpleNamespace() + + async def end_span(self, *, trace_id: str, span: Any) -> None: + self.ended.append(getattr(span, "output", None)) + + +class _RecordingTracer(SpanTracer): + """SpanTracer that records every SpanSignal it actually receives. 
+ + Each delivery channel calls `tracer.handle(signal)` for every signal it + derives from the stream, so `received_signals` captures what the channel + genuinely emitted — not a re-derivation. Comparing the two channels' + recorded lists catches regressions where a channel skips + `deriver.observe(event)` for some event type. + """ + + def __init__(self, tracing: Any) -> None: + super().__init__( + trace_id="conformance-trace", + parent_span_id="conformance-parent", + tracing=tracing, + ) + self.received_signals: list[SpanSignal] = [] + + @override + async def handle(self, signal: SpanSignal) -> None: + self.received_signals.append(signal) + await super().handle(signal) + + +async def _gen(events: list[StreamTaskMessage]): # type: ignore[return] + for e in events: + yield e + + +def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: + """Extract logical deliveries from the auto_send fake streaming sink. + + Each context lifecycle in the sink looks like: + ("ctx", ctype, content) -- context created + ("open", ctype, content) -- context __aenter__ + [("update", delta), ...] -- optional deltas + ("close", ctype) -- context closed + + A logical delivery corresponds to each open+close pair. For text/reasoning + we identify by index embedded in the content; for tool messages we use + tool_call_id + name. 
+ """ + deliveries: list[LogicalDelivery] = [] + # Pair up opens by scanning the sink in order + open_idx = 0 + while open_idx < len(sink): + entry = sink[open_idx] + if entry[0] == "ctx": + ctype: str = entry[1] + content: Any = entry[2] + # Find the matching open (should be right after ctx) + # and close for this ctype + found_open = False + for j in range(open_idx + 1, len(sink)): + if sink[j][0] == "open" and sink[j][1] == ctype and not found_open: + found_open = True + elif sink[j][0] == "close" and sink[j][1] == ctype and found_open: + # Matched: emit logical delivery + if ctype in ("text", "reasoning"): + # For text/reasoning, we use the index from the event + # which auto_send doesn't track directly. However the + # conformance test sends a single stream so the order + # of text/reasoning deliveries IS meaningful; use + # a positional counter derived from the sink scan. + # We identify text/reasoning by counting how many text/ + # reasoning ctx entries appeared before this one. + count = sum(1 for k in range(open_idx) if sink[k][0] == "ctx" and sink[k][1] == ctype) + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset({("seq", count)}), + ) + ) + elif ctype == "tool_response": + from agentex.types.tool_response_content import ToolResponseContent + + if isinstance(content, ToolResponseContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + ) + ) + elif ctype == "tool_request": + from agentex.types.tool_request_content import ToolRequestContent + + if isinstance(content, ToolRequestContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + ) + ) + open_idx = j + 1 + break + else: + open_idx += 1 + else: + open_idx += 1 + + return deliveries + + +def _yield_text_reasoning_seq(deliveries: 
list[LogicalDelivery]) -> list[LogicalDelivery]: + """Re-key text/reasoning deliveries from index-based to seq-based identity. + + The yield channel uses event.index as identity; auto_send uses a sequential + counter. To compare across channels, normalise both to sequential position + within each content type. + """ + result: list[LogicalDelivery] = [] + counts: dict[str, int] = {} + for d in deliveries: + if d.content_type in ("text", "reasoning"): + seq = counts.get(d.content_type, 0) + counts[d.content_type] = seq + 1 + result.append( + LogicalDelivery( + content_type=d.content_type, + identity=frozenset({("seq", seq)}), + ) + ) + else: + result.append(d) + return result + + +async def run_cross_channel_conformance( + fixture: Fixture, +) -> tuple[list[LogicalDelivery], list[LogicalDelivery], list[SpanSignal], list[SpanSignal]]: + """Run both channels over a fixture; return (yield_deliveries, auto_deliveries, + yield_spans, auto_spans). + + The caller asserts yield_deliveries == auto_deliveries and + yield_spans == auto_spans. The span signals are the ones each channel's + tracer ACTUALLY recorded while delivering (not a re-derivation), so a + regression where a channel skips deriver.observe() for some event type is + caught. 
+ """ + from agentex.lib.core.harness.auto_send import auto_send + from agentex.lib.core.harness.yield_delivery import yield_events + + # --- yield channel --- + tracer_yield = _RecordingTracer(tracing=_FakeTracing()) + yield_out = [e async for e in yield_events(_gen(fixture.events), tracer=tracer_yield)] + + # Span signals the yield channel actually emitted to its tracer + yield_spans = tracer_yield.received_signals + + # Logical deliveries from yield output + yield_deliveries = _yield_text_reasoning_seq(_yield_logical_deliveries(yield_out)) + + # --- auto_send channel --- + tracer_auto = _RecordingTracer(tracing=_FakeTracing()) + fake_streaming = _FakeStreaming() + await auto_send( + _gen(fixture.events), + task_id="conformance-task", + tracer=tracer_auto, + streaming=fake_streaming, + ) + + # Span signals the auto_send channel actually emitted to its tracer + auto_spans = tracer_auto.received_signals + + # Logical deliveries from what the streaming backend received + auto_deliveries = _auto_send_logical_deliveries(fake_streaming.sink) + + return yield_deliveries, auto_deliveries, yield_spans, auto_spans diff --git a/tests/lib/core/harness/conformance/test_conformance.py b/tests/lib/core/harness/conformance/test_conformance.py index d9eec1c15..eb6adb80a 100644 --- a/tests/lib/core/harness/conformance/test_conformance.py +++ b/tests/lib/core/harness/conformance/test_conformance.py @@ -1,16 +1,60 @@ +"""Cross-channel conformance tests: yield_events vs auto_send. + +What is asserted +---------------- +For each fixture the conformance runner drives BOTH delivery channels and +verifies two guarantees: + +1. **Logical-delivery equivalence**: the sequence of logically-delivered + messages is identical across channels. "Logical" normalises away the + streaming-envelope difference: + - yield channel delivers StreamTaskMessageFull(ToolResponseContent) as-is. 
+ - auto_send delivers the same tool-response by opening a streaming context + with the full content and closing it immediately. + Both collapse to LogicalDelivery(content_type, identity) tuples that compare + equal. + +2. **Span signal equivalence**: both channels feed the same pure SpanDeriver + over the same event sequence, so the derived span signals must be identical. + +What is NOT asserted +-------------------- +Raw wire-level event shapes are NOT compared (that would fail by design: the +Full vs Start+Done envelope difference is a documented, acceptable choice in +auto_send — see runner.py for the rationale). +""" + +from __future__ import annotations + import pytest +from agentex.types.text_delta import TextDelta +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent from agentex.types.task_message_update import ( StreamTaskMessageDone, StreamTaskMessageFull, + StreamTaskMessageDelta, StreamTaskMessageStart, ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta -from .runner import Fixture, register, derive_all, all_fixtures +from .runner import ( + Fixture, + register, + derive_all, + all_fixtures, + run_cross_channel_conformance, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- -register( +_FIXTURES: list[Fixture] = [ + # fixture 1: single tool call (the canonical builtin example) Fixture( name="builtin-single-tool", events=[ @@ -30,14 +74,101 @@ ), ), ], + ), + # fixture 2: streaming text — exercises the text start/delta/done path + Fixture( + name="streaming-text", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + 
StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hello"), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta=" world"), + ), + StreamTaskMessageDone(type="done", index=0), + ], + ), + # fixture 3: reasoning block — exercises reasoning span open/close + delivery + Fixture( + name="reasoning-block", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=["Thinking..."], + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta="step 1", + ), + ), + StreamTaskMessageDone(type="done", index=0), + ], + ), +] + +# Register all fixtures for backward-compatible use via all_fixtures() +for _f in _FIXTURES: + register(_f) + + +# --------------------------------------------------------------------------- +# Cross-channel conformance: logical equivalence + span equivalence +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("fixture", _FIXTURES, ids=lambda f: f.name) +@pytest.mark.asyncio +async def test_cross_channel_equivalence(fixture: Fixture) -> None: + """Assert that yield_events and auto_send produce equivalent logical + deliveries and identical span signals for every fixture. + + This is the real cross-channel guarantee: the two delivery adapters + agree on WHAT was delivered (logical content) and HOW spans were derived, + even though their streaming-envelope shapes differ (Full vs Start+Done for + tool messages). + + The span signals are the ones each channel's tracer ACTUALLY recorded while + delivering, not a re-derivation, so a regression where one channel skips + deriver.observe() for some event type is caught here. 
+ """ + yield_deliveries, auto_deliveries, yield_spans, auto_spans = await run_cross_channel_conformance(fixture) + + assert yield_deliveries == auto_deliveries, ( + f"[{fixture.name}] logical deliveries differ:\n yield: {yield_deliveries}\n auto_send: {auto_deliveries}" ) -) + assert yield_spans == auto_spans, ( + f"[{fixture.name}] span signals differ:\n yield: {yield_spans}\n auto_send: {auto_spans}" + ) + + +# --------------------------------------------------------------------------- +# Backward-compatible determinism test (kept for regression coverage) +# --------------------------------------------------------------------------- @pytest.mark.parametrize("fixture", all_fixtures(), ids=lambda f: f.name) -def test_span_derivation_is_deterministic(fixture): - """Exercises the cross-channel guarantee: yield and auto-send observe the - same event stream, so span derivation must be deterministic/idempotent.""" - # Deriving twice over the same events yields identical signals (the property - # that makes yield vs auto-send equivalent, since both observe the same stream). +def test_span_derivation_is_deterministic(fixture: Fixture) -> None: + """Span derivation over the same event list is idempotent. + + Retained as a lightweight regression guard. The primary cross-channel + guarantee is asserted in test_cross_channel_equivalence above. + """ assert derive_all(fixture.events) == derive_all(fixture.events) From 9903c503392b057cc41996dd9fa18c0532e947cd Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Thu, 18 Jun 2026 16:03:21 -0400 Subject: [PATCH 2/3] =?UTF-8?q?test(harness):=20address=20greptile=20?= =?UTF-8?q?=E2=80=94=20strengthen=20conformance=20payload=20comparison?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add `payload: str` field to LogicalDelivery (NamedTuple, default ""). 
- _yield_logical_deliveries: track TextDelta / ReasoningContentDelta accumulation per-index; include "".join(deltas) as payload for text/ reasoning deliveries. Include json.dumps(arguments, sort_keys=True) as payload for tool_request; str(content) for tool_response. - _auto_send_logical_deliveries: collect ("update", delta) entries from the _FakeCtx sink between open and close; extract TextDelta / ReasoningContentDelta text and accumulate. Carry same tool payload fields. - _yield_text_reasoning_seq: forward payload through when re-keying index → seq. - All 35 harness tests pass; ruff + pyright clean. Co-Authored-By: Claude Sonnet 4.6 --- tests/lib/core/harness/conformance/runner.py | 74 +++++++++++++++----- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py index e992c26de..10af164e3 100644 --- a/tests/lib/core/harness/conformance/runner.py +++ b/tests/lib/core/harness/conformance/runner.py @@ -9,8 +9,15 @@ - auto_send channel delivers the same tool-response by opening a streaming context with the full content and closing it immediately (Start+Done on the wire), not a Full event. - Both reduce to the same LogicalDelivery(type, identity) tuple; the conformance - test compares those normalised sequences. + Both reduce to the same LogicalDelivery(type, identity, payload) tuple; the + conformance test compares those normalised sequences. + + `payload` carries the content that callers actually consume: + - text/reasoning: the accumulated concatenated delta string + - tool_request: the arguments dict (JSON-sorted) + - tool_response: the content value (str) + This catches a channel that delivers the right structural shape but corrupts + or drops the payload. 2. 
**Span signal equivalence**: each channel is driven with its own recording tracer that captures every SpanSignal it actually receives in handle(); the @@ -48,18 +55,22 @@ from __future__ import annotations +import json import types as _types from typing import Any, NamedTuple, override from dataclasses import dataclass +from agentex.types.text_delta import TextDelta from agentex.types.task_message import TaskMessage from agentex.lib.core.harness.types import SpanSignal, StreamTaskMessage from agentex.lib.core.harness.tracer import SpanTracer from agentex.types.task_message_update import ( StreamTaskMessageDone, StreamTaskMessageFull, + StreamTaskMessageDelta, StreamTaskMessageStart, ) +from agentex.types.reasoning_content_delta import ReasoningContentDelta from agentex.lib.core.harness.span_derivation import SpanDeriver @@ -100,11 +111,16 @@ class LogicalDelivery(NamedTuple): `content_type` is the .type of the content (e.g. "text", "reasoning", "tool_request", "tool_response"). `identity` is a frozenset of key=value pairs that uniquely identify the content (e.g. tool_call_id for tool - messages, or index for text/reasoning). + messages, or index for text/reasoning). `payload` is a stable string + representation of the content callers actually consume: + - text/reasoning: accumulated delta string + - tool_request: JSON-sorted arguments + - tool_response: str(content) """ content_type: str identity: frozenset[tuple[str, Any]] + payload: str = "" def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDelivery]: @@ -113,30 +129,47 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe The yield channel forwards events verbatim. A logical delivery is: - A Full event (tool_request / tool_response): content delivered as-is. - A Start + ... + Done sequence for text/reasoning content. 
+ + The `payload` field captures the content callers consume: accumulated delta + text for text/reasoning, arguments for tool_request, and response body for + tool_response. This ensures a channel that delivers the right structure but + corrupts the payload is caught. """ deliveries: list[LogicalDelivery] = [] # Track which indices had a Start so we can pair with Done started: dict[int, Any] = {} # index -> initial content + # Accumulate delta text per index + accumulated: dict[int, list[str]] = {} # index -> list of delta strings for event in events: if isinstance(event, StreamTaskMessageStart): if event.index is not None: started[event.index] = event.content + accumulated[event.index] = [] + elif isinstance(event, StreamTaskMessageDelta): + if event.index is not None and event.delta is not None: + if isinstance(event.delta, TextDelta) and event.delta.text_delta: + accumulated.setdefault(event.index, []).append(event.delta.text_delta) + elif isinstance(event.delta, ReasoningContentDelta) and event.delta.content_delta: + accumulated.setdefault(event.index, []).append(event.delta.content_delta) elif isinstance(event, StreamTaskMessageDone): if event.index is not None and event.index in started: content = started.pop(event.index) + deltas = accumulated.pop(event.index, []) ctype = getattr(content, "type", None) or "" if ctype in ("text", "reasoning"): - # Identify text by index, reasoning by index deliveries.append( LogicalDelivery( content_type=ctype, identity=frozenset({("index", event.index)}), + payload="".join(deltas), ) ) # tool_request Start+Done just means the span opens; the message # itself is delivered via Full (ToolRequestContent Full), so we # don't emit a delivery here for Start(tool_request)+Done. + # AGX1-377: once auto_send handles the streamed tool-request shape, + # this suppression will be removed and the delivery counted here. 
elif isinstance(event, StreamTaskMessageFull): content = event.content ctype = getattr(content, "type", None) or "" @@ -153,6 +186,7 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe ("name", content.name), } ), + payload=str(content.content), ) ) elif ctype == "tool_request": @@ -168,6 +202,7 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe ("name", content.name), } ), + payload=json.dumps(content.arguments, sort_keys=True), ) ) @@ -284,42 +319,42 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: Each context lifecycle in the sink looks like: ("ctx", ctype, content) -- context created ("open", ctype, content) -- context __aenter__ - [("update", delta), ...] -- optional deltas + [("update", delta), ...] -- optional deltas (StreamTaskMessageDelta) ("close", ctype) -- context closed A logical delivery corresponds to each open+close pair. For text/reasoning - we identify by index embedded in the content; for tool messages we use - tool_call_id + name. + we identify by sequential position and accumulate the delta payload; for + tool messages we use tool_call_id + name and capture arguments/content. 
""" deliveries: list[LogicalDelivery] = [] - # Pair up opens by scanning the sink in order open_idx = 0 while open_idx < len(sink): entry = sink[open_idx] if entry[0] == "ctx": ctype: str = entry[1] content: Any = entry[2] - # Find the matching open (should be right after ctx) - # and close for this ctype found_open = False + delta_parts: list[str] = [] for j in range(open_idx + 1, len(sink)): if sink[j][0] == "open" and sink[j][1] == ctype and not found_open: found_open = True + elif found_open and sink[j][0] == "update": + # Accumulate delta content from StreamTaskMessageDelta + update = sink[j][1] + if isinstance(update, StreamTaskMessageDelta) and update.delta is not None: + if isinstance(update.delta, TextDelta) and update.delta.text_delta: + delta_parts.append(update.delta.text_delta) + elif isinstance(update.delta, ReasoningContentDelta) and update.delta.content_delta: + delta_parts.append(update.delta.content_delta) elif sink[j][0] == "close" and sink[j][1] == ctype and found_open: - # Matched: emit logical delivery + # Matched open+close: emit logical delivery with payload if ctype in ("text", "reasoning"): - # For text/reasoning, we use the index from the event - # which auto_send doesn't track directly. However the - # conformance test sends a single stream so the order - # of text/reasoning deliveries IS meaningful; use - # a positional counter derived from the sink scan. - # We identify text/reasoning by counting how many text/ - # reasoning ctx entries appeared before this one. 
count = sum(1 for k in range(open_idx) if sink[k][0] == "ctx" and sink[k][1] == ctype) deliveries.append( LogicalDelivery( content_type=ctype, identity=frozenset({("seq", count)}), + payload="".join(delta_parts), ) ) elif ctype == "tool_response": @@ -335,6 +370,7 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: ("name", content.name), } ), + payload=str(content.content), ) ) elif ctype == "tool_request": @@ -350,6 +386,7 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: ("name", content.name), } ), + payload=json.dumps(content.arguments, sort_keys=True), ) ) open_idx = j + 1 @@ -379,6 +416,7 @@ def _yield_text_reasoning_seq(deliveries: list[LogicalDelivery]) -> list[Logical LogicalDelivery( content_type=d.content_type, identity=frozenset({("seq", seq)}), + payload=d.payload, ) ) else: From 2e820c719cb9e7843d1eb8a01892d371366f9860 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Thu, 18 Jun 2026 17:08:44 -0400 Subject: [PATCH 3/3] =?UTF-8?q?test(harness):=20propagate=20AGX1-377/378?= =?UTF-8?q?=20fix=20into=20conformance=20=E2=80=94=20un-suppress=20streame?= =?UTF-8?q?d=20tool-request=20delivery,=20include=20initial=5Fcontent=20in?= =?UTF-8?q?=20payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove the Start(tool_request)+Done suppression in _yield_logical_deliveries: auto_send now delivers streamed tool-request messages (AGX1-377 fix), so both channels emit a LogicalDelivery for a streamed tool_request. The cross-channel assertion verifies delivery on both sides. - Include StreamTaskMessageStart.content in payload comparison for text and reasoning types: TextContent.content is prepended to accumulated deltas; ReasoningContent.summary items are prepended. This catches a channel that drops initial_content or reasoning summary (Greptile id 3438655533, P1). _auto_send_logical_deliveries mirrors the same seeding from ctx initial_content. 
- Add "streamed-tool-request" fixture (Start + Done, no Full) to confirm delivery on both channels under the new auto_send behaviour. - Update "streaming-text" fixture to use non-empty initial_content ("Init") so the initial_content seeding is actually exercised by the test. - Update module/docstring comments that referenced the AGX1-377 suppression. Co-Authored-By: Claude Sonnet 4.6 --- tests/lib/core/harness/conformance/runner.py | 98 ++++++++++++++----- .../harness/conformance/test_conformance.py | 55 +++++++++-- 2 files changed, 121 insertions(+), 32 deletions(-) diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py index 10af164e3..84e84fa51 100644 --- a/tests/lib/core/harness/conformance/runner.py +++ b/tests/lib/core/harness/conformance/runner.py @@ -13,11 +13,12 @@ conformance test compares those normalised sequences. `payload` carries the content that callers actually consume: - - text/reasoning: the accumulated concatenated delta string - - tool_request: the arguments dict (JSON-sorted) + - text: initial_content.content prepended, then accumulated delta string + - reasoning: initial_content.summary joined, then accumulated delta string + - tool_request: the arguments dict (JSON-sorted), from Start content - tool_response: the content value (str) - This catches a channel that delivers the right structural shape but corrupts - or drops the payload. + This catches a channel that delivers the right structural shape but corrupts, + drops, or omits initial_content (including reasoning summary) or payload. 2. **Span signal equivalence**: each channel is driven with its own recording tracer that captures every SpanSignal it actually receives in handle(); the @@ -51,6 +52,13 @@ Start(content)+Done from auto_send are equivalent. The recorded span signals are identical because both adapters drive the same SpanDeriver.observe() call sequence and forward every signal to their tracer. 
+ +AGX1-377 fix: auto_send now DELIVERS streamed tool-request messages (Start+Done) +instead of dropping them. The conformance normaliser previously suppressed the +delivery for Start(tool_request)+Done on the yield channel to match auto_send's +old drop behaviour. That suppression is now removed: both channels produce a +LogicalDelivery for a streamed tool_request, and the cross-channel assertion +verifies it is delivered on both. """ from __future__ import annotations @@ -113,9 +121,11 @@ class LogicalDelivery(NamedTuple): pairs that uniquely identify the content (e.g. tool_call_id for tool messages, or index for text/reasoning). `payload` is a stable string representation of the content callers actually consume: - - text/reasoning: accumulated delta string - - tool_request: JSON-sorted arguments - - tool_response: str(content) + - text: initial_content.content prepended to accumulated delta strings + - reasoning: initial_content.summary joined, prepended to accumulated + reasoning-content delta strings + - tool_request: JSON-sorted arguments from Start content + - tool_response: str(content) from Full event """ content_type: str @@ -128,24 +138,39 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe The yield channel forwards events verbatim. A logical delivery is: - A Full event (tool_request / tool_response): content delivered as-is. - - A Start + ... + Done sequence for text/reasoning content. - - The `payload` field captures the content callers consume: accumulated delta - text for text/reasoning, arguments for tool_request, and response body for - tool_response. This ensures a channel that delivers the right structure but - corrupts the payload is caught. + - A Start + ... + Done sequence for text/reasoning/tool_request content. 
+ + The `payload` field captures the content callers consume: + - text: initial_content.content (from Start) prepended to accumulated deltas + - reasoning: initial_content.summary joined (from Start) prepended to + accumulated reasoning-content deltas (this catches a channel that drops + the summary) + - tool_request: JSON-sorted arguments from the Start content (AGX1-377: now + delivered on both channels, no longer suppressed) + - tool_response: str(content) from Full event """ + from agentex.types.text_content import TextContent + from agentex.types.reasoning_content import ReasoningContent + from agentex.types.tool_request_content import ToolRequestContent + deliveries: list[LogicalDelivery] = [] # Track which indices had a Start so we can pair with Done started: dict[int, Any] = {} # index -> initial content - # Accumulate delta text per index + # Accumulate delta text per index (seed with initial_content text if present) accumulated: dict[int, list[str]] = {} # index -> list of delta strings for event in events: if isinstance(event, StreamTaskMessageStart): if event.index is not None: started[event.index] = event.content - accumulated[event.index] = [] + # Seed accumulator with initial_content so a channel that drops + # initial_content but delivers deltas correctly will fail. 
+ seed: list[str] = [] + if isinstance(event.content, TextContent) and event.content.content: + seed = [event.content.content] + elif isinstance(event.content, ReasoningContent) and event.content.summary: + seed = list(event.content.summary) + accumulated[event.index] = seed elif isinstance(event, StreamTaskMessageDelta): if event.index is not None and event.delta is not None: if isinstance(event.delta, TextDelta) and event.delta.text_delta: @@ -165,11 +190,22 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe payload="".join(deltas), ) ) - # tool_request Start+Done just means the span opens; the message - # itself is delivered via Full (ToolRequestContent Full), so we - # don't emit a delivery here for Start(tool_request)+Done. - # AGX1-377: once auto_send handles the streamed tool-request shape, - # this suppression will be removed and the delivery counted here. + elif ctype == "tool_request" and isinstance(content, ToolRequestContent): + # AGX1-377 fix: auto_send now delivers streamed tool-request + # messages. Emit a delivery here so the cross-channel + # assertion verifies it is present on both channels. + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + payload=json.dumps(content.arguments, sort_keys=True), + ) + ) elif isinstance(event, StreamTaskMessageFull): content = event.content ctype = getattr(content, "type", None) or "" @@ -323,9 +359,17 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: ("close", ctype) -- context closed A logical delivery corresponds to each open+close pair. For text/reasoning - we identify by sequential position and accumulate the delta payload; for - tool messages we use tool_call_id + name and capture arguments/content. 
+ we identify by sequential position and build the payload by prepending the + initial_content text (TextContent.content) or summary (ReasoningContent.summary) + to accumulated deltas. This matches _yield_logical_deliveries so a channel + that drops initial_content or reasoning summary fails the comparison. + For tool messages we use tool_call_id + name and capture arguments/content. """ + from agentex.types.text_content import TextContent + from agentex.types.reasoning_content import ReasoningContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + deliveries: list[LogicalDelivery] = [] open_idx = 0 while open_idx < len(sink): @@ -335,6 +379,12 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: content: Any = entry[2] found_open = False delta_parts: list[str] = [] + # Seed delta_parts with initial_content so payload comparison + # catches a channel that drops initial_content but delivers deltas. 
+ if isinstance(content, TextContent) and content.content: + delta_parts = [content.content] + elif isinstance(content, ReasoningContent) and content.summary: + delta_parts = list(content.summary) for j in range(open_idx + 1, len(sink)): if sink[j][0] == "open" and sink[j][1] == ctype and not found_open: found_open = True @@ -358,8 +408,6 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: ) ) elif ctype == "tool_response": - from agentex.types.tool_response_content import ToolResponseContent - if isinstance(content, ToolResponseContent): deliveries.append( LogicalDelivery( @@ -374,8 +422,6 @@ def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: ) ) elif ctype == "tool_request": - from agentex.types.tool_request_content import ToolRequestContent - if isinstance(content, ToolRequestContent): deliveries.append( LogicalDelivery( diff --git a/tests/lib/core/harness/conformance/test_conformance.py b/tests/lib/core/harness/conformance/test_conformance.py index eb6adb80a..627e5b587 100644 --- a/tests/lib/core/harness/conformance/test_conformance.py +++ b/tests/lib/core/harness/conformance/test_conformance.py @@ -11,8 +11,9 @@ - yield channel delivers StreamTaskMessageFull(ToolResponseContent) as-is. - auto_send delivers the same tool-response by opening a streaming context with the full content and closing it immediately. - Both collapse to LogicalDelivery(content_type, identity) tuples that compare - equal. + Both collapse to LogicalDelivery(content_type, identity, payload) tuples + that compare equal. The payload includes initial_content (TextContent.content + and ReasoningContent.summary) so a channel that drops initial content fails. 2. **Span signal equivalence**: both channels feed the same pure SpanDeriver over the same event sequence, so the derived span signals must be identical. 
@@ -22,6 +23,12 @@ Raw wire-level event shapes are NOT compared (that would fail by design: the Full vs Start+Done envelope difference is a documented, acceptable choice in auto_send — see runner.py for the rationale). + +AGX1-377 fix: auto_send now delivers streamed tool-request messages. The +suppression that previously prevented the yield normaliser from emitting a +LogicalDelivery for Start(tool_request)+Done is removed. Both channels now +produce a delivery for streamed tool_request, verified by the +"streamed-tool-request" fixture. """ from __future__ import annotations @@ -54,7 +61,8 @@ # --------------------------------------------------------------------------- _FIXTURES: list[Fixture] = [ - # fixture 1: single tool call (the canonical builtin example) + # fixture 1: single tool call — tool_request delivered via Full (classic path) + # plus a tool_response via Full. Both channels should deliver both. Fixture( name="builtin-single-tool", events=[ @@ -75,14 +83,16 @@ ), ], ), - # fixture 2: streaming text — exercises the text start/delta/done path + # fixture 2: streaming text — exercises the text start/delta/done path. + # Uses non-empty initial_content so the payload comparison catches a channel + # that drops StreamTaskMessageStart.content (Greptile id 3438655533, P1). Fixture( name="streaming-text", events=[ StreamTaskMessageStart( type="start", index=0, - content=TextContent(type="text", author="agent", content=""), + content=TextContent(type="text", author="agent", content="Init"), ), StreamTaskMessageDelta( type="delta", @@ -97,7 +107,9 @@ StreamTaskMessageDone(type="done", index=0), ], ), - # fixture 3: reasoning block — exercises reasoning span open/close + delivery + # fixture 3: reasoning block — exercises reasoning span open/close + delivery. + # ReasoningContent.summary is included in the payload so a channel that drops + # the reasoning-summary fails (Greptile id 3438655533, P1).
Fixture( name="reasoning-block", events=[ @@ -122,6 +134,37 @@ StreamTaskMessageDone(type="done", index=0), ], ), + # fixture 4: streamed tool_request (AGX1-377 fix) — tool_request delivered + # via Start+Done (no Full). auto_send now delivers this instead of dropping + # it. Both channels must produce a LogicalDelivery for this fixture. + Fixture( + name="streamed-tool-request", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="tr-1", + name="Read", + arguments={"path": "/tmp/foo"}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="tr-1", + name="Read", + content="file contents", + ), + ), + ], + ), ] # Register all fixtures for backward-compatible use via all_fixtures()