Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions src/agentex/lib/core/services/adk/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ def __init__(self, on_flush: Callable[[StreamTaskMessageDelta], Awaitable[object
self._first_flushed = False
self._closed = False
self._lock = asyncio.Lock()
self._flush_signal = asyncio.Event()
# Two events so the ticker can park at zero CPU when idle:
# _wake — buffer went empty -> non-empty; the ticker should run
# _flush_now — flush immediately (first delta / size threshold / close),
# bypassing the coalescing window
self._wake = asyncio.Event()
self._flush_now = asyncio.Event()
self._task: asyncio.Task[None] | None = None

def start(self) -> None:
Expand All @@ -177,22 +182,42 @@ async def add(self, update: StreamTaskMessageDelta) -> None:
if self._closed:
return
async with self._lock:
was_empty = not self._buf
self._buf.append(update)
self._buf_chars += _delta_char_len(update.delta)
if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS:
self._first_flushed = True
self._flush_signal.set()
self._flush_now.set()
# Wake the (possibly parked) ticker when the buffer goes from empty
# to non-empty; it then applies the coalescing window itself.
if was_empty:
self._wake.set()

async def _run(self) -> None:
try:
while True:
try:
await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S)
except asyncio.TimeoutError:
pass
# Park at zero CPU until there is data to flush (or close()).
# This is the key change from a fixed-interval ticker: an idle
# or orphaned buffer blocks here instead of waking every
# FLUSH_INTERVAL_S forever — the latter leaked CPU when a buffer
# outlived its stream without close() running (one spinning task
# per such stream).
await self._wake.wait()
self._wake.clear()
# First delta / size threshold / close flush immediately;
# otherwise coalesce for up to FLUSH_INTERVAL_S so consecutive
# deltas batch into a single publish.
if not self._flush_now.is_set() and not self._closed:
try:
await asyncio.wait_for(self._flush_now.wait(), timeout=self.FLUSH_INTERVAL_S)
except asyncio.TimeoutError:
pass
async with self._lock:
self._flush_signal.clear()
self._flush_now.clear()
drained = self._drain_locked()
# Data that arrived during the flush keeps the ticker running.
if self._buf:
self._wake.set()
for u in drained:
try:
await self._on_flush(u)
Expand All @@ -215,12 +240,17 @@ async def close(self) -> None:
# producing the duplicate-tail symptom seen on the UI stream.
self._closed = True
if self._task is not None:
self._flush_signal.set()
# Wake the parked ticker so it sees _closed and exits after its
# next drain.
self._wake.set()
self._flush_now.set()
try:
await self._task
except asyncio.CancelledError:
# Propagate if our caller is being cancelled; the task itself
# swallows CancelledError so this only fires on outer cancel.
# Our caller is being cancelled. Force-cancel the ticker so it
# can never be orphaned into a parked/looping task, then
# propagate the cancellation.
self._task.cancel()
raise
self._task = None
async with self._lock:
Expand Down
54 changes: 54 additions & 0 deletions tests/lib/core/services/adk/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,60 @@ async def on_flush(u: StreamTaskMessageDelta) -> None:
await buf.close()


class TestCoalescingBufferIdleParks:
"""Regression: the ticker must park (block on its wake event) when there is
no buffered data, instead of waking every FLUSH_INTERVAL_S. The old
fixed-interval ticker spun at 1/FLUSH_INTERVAL forever, so a buffer that
outlived its stream (orphaned, close() not run) pinned worker CPU — one
spinning task per such stream.
"""

@staticmethod
def _count_drains(buf: CoalescingBuffer) -> list[int]:
"""Instrument _drain_locked to count ticker wake/drain cycles."""
n = [0]
orig = buf._drain_locked

def counting() -> list[StreamTaskMessageDelta]:
n[0] += 1
return orig()

buf._drain_locked = counting # type: ignore[method-assign]
return n

@pytest.mark.asyncio
async def test_idle_buffer_does_not_spin(self) -> None:
"""With no data ever added, the ticker must not drain at all over many
FLUSH_INTERVAL_S windows."""
buf = CoalescingBuffer(on_flush=AsyncMock())
drains = self._count_drains(buf)
buf.start()
try:
# ~8 windows at FLUSH_INTERVAL_S=0.050; a polling ticker would have
# woken ~8 times. A parked ticker drains 0 times.
await asyncio.sleep(0.4)
assert drains[0] == 0, f"idle ticker woke {drains[0]}x (must park at 0)"
finally:
await buf.close()

@pytest.mark.asyncio
async def test_orphaned_buffer_parks_after_flush(self, task_message: TaskMessage) -> None:
"""A buffer whose close() never runs (orphaned on an abnormal stream
exit) must still park at zero CPU once its data is drained — not spin.
This is the exact condition that previously leaked worker CPU."""
buf = CoalescingBuffer(on_flush=AsyncMock())
buf.start()
try:
await buf.add(_text(task_message, "hi")) # one immediate flush
await asyncio.sleep(0.020) # let it flush and park
drains = self._count_drains(buf)
# Deliberately do NOT close — simulate an orphaned buffer.
await asyncio.sleep(0.4)
assert drains[0] == 0, f"orphaned ticker woke {drains[0]}x (must park at 0)"
finally:
await buf.close() # cleanup only


class TestCoalescingBufferClose:
@pytest.mark.asyncio
async def test_close_drains_remaining_buffered_items(self, task_message: TaskMessage) -> None:
Expand Down
Loading