diff --git a/src/agentex/lib/core/services/adk/streaming.py b/src/agentex/lib/core/services/adk/streaming.py index 7215f084c..e5e2a4f99 100644 --- a/src/agentex/lib/core/services/adk/streaming.py +++ b/src/agentex/lib/core/services/adk/streaming.py @@ -166,7 +166,12 @@ def __init__(self, on_flush: Callable[[StreamTaskMessageDelta], Awaitable[object self._first_flushed = False self._closed = False self._lock = asyncio.Lock() - self._flush_signal = asyncio.Event() + # Two events so the ticker can park at zero CPU when idle: + # _wake — buffer went empty -> non-empty; the ticker should run + # _flush_now — flush immediately (first delta / size threshold / close), + # bypassing the coalescing window + self._wake = asyncio.Event() + self._flush_now = asyncio.Event() self._task: asyncio.Task[None] | None = None def start(self) -> None: @@ -177,22 +182,42 @@ async def add(self, update: StreamTaskMessageDelta) -> None: if self._closed: return async with self._lock: + was_empty = not self._buf self._buf.append(update) self._buf_chars += _delta_char_len(update.delta) if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS: self._first_flushed = True - self._flush_signal.set() + self._flush_now.set() + # Wake the (possibly parked) ticker when the buffer goes from empty + # to non-empty; it then applies the coalescing window itself. + if was_empty: + self._wake.set() async def _run(self) -> None: try: while True: - try: - await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S) - except asyncio.TimeoutError: - pass + # Park at zero CPU until there is data to flush (or close()). + # This is the key change from a fixed-interval ticker: an idle + # or orphaned buffer blocks here instead of waking every + # FLUSH_INTERVAL_S forever — the latter leaked CPU when a buffer + # outlived its stream without close() running (one spinning task + # per such stream). + await self._wake.wait() + self._wake.clear() + # First delta / size threshold / close flush immediately; + # otherwise coalesce for up to FLUSH_INTERVAL_S so consecutive + # deltas batch into a single publish. + if not self._flush_now.is_set() and not self._closed: + try: + await asyncio.wait_for(self._flush_now.wait(), timeout=self.FLUSH_INTERVAL_S) + except asyncio.TimeoutError: + pass async with self._lock: - self._flush_signal.clear() + self._flush_now.clear() drained = self._drain_locked() + # Data that arrived during the flush keeps the ticker running. + if self._buf: + self._wake.set() for u in drained: try: await self._on_flush(u) @@ -215,12 +240,17 @@ async def close(self) -> None: # producing the duplicate-tail symptom seen on the UI stream. self._closed = True if self._task is not None: - self._flush_signal.set() + # Wake the parked ticker so it sees _closed and exits after its + # next drain. + self._wake.set() + self._flush_now.set() try: await self._task except asyncio.CancelledError: - # Propagate if our caller is being cancelled; the task itself - # swallows CancelledError so this only fires on outer cancel. + # Our caller is being cancelled. Force-cancel the ticker so it + # can never be orphaned into a parked/looping task, then + # propagate the cancellation. + self._task.cancel() raise self._task = None async with self._lock: diff --git a/tests/lib/core/services/adk/test_streaming.py b/tests/lib/core/services/adk/test_streaming.py index b07c55f74..cb762fea1 100644 --- a/tests/lib/core/services/adk/test_streaming.py +++ b/tests/lib/core/services/adk/test_streaming.py @@ -303,6 +303,60 @@ async def on_flush(u: StreamTaskMessageDelta) -> None: await buf.close() +class TestCoalescingBufferIdleParks: + """Regression: the ticker must park (block on its wake event) when there is + no buffered data, instead of waking every FLUSH_INTERVAL_S. The old + fixed-interval ticker spun at 1/FLUSH_INTERVAL forever, so a buffer that + outlived its stream (orphaned, close() not run) pinned worker CPU — one + spinning task per such stream. + """ + + @staticmethod + def _count_drains(buf: CoalescingBuffer) -> list[int]: + """Instrument _drain_locked to count ticker wake/drain cycles.""" + n = [0] + orig = buf._drain_locked + + def counting() -> list[StreamTaskMessageDelta]: + n[0] += 1 + return orig() + + buf._drain_locked = counting # type: ignore[method-assign] + return n + + @pytest.mark.asyncio + async def test_idle_buffer_does_not_spin(self) -> None: + """With no data ever added, the ticker must not drain at all over many + FLUSH_INTERVAL_S windows.""" + buf = CoalescingBuffer(on_flush=AsyncMock()) + drains = self._count_drains(buf) + buf.start() + try: + # ~8 windows at FLUSH_INTERVAL_S=0.050; a polling ticker would have + # woken ~8 times. A parked ticker drains 0 times. + await asyncio.sleep(0.4) + assert drains[0] == 0, f"idle ticker woke {drains[0]}x (must park at 0)" + finally: + await buf.close() + + @pytest.mark.asyncio + async def test_orphaned_buffer_parks_after_flush(self, task_message: TaskMessage) -> None: + """A buffer whose close() never runs (orphaned on an abnormal stream + exit) must still park at zero CPU once its data is drained — not spin. + This is the exact condition that previously leaked worker CPU.""" + buf = CoalescingBuffer(on_flush=AsyncMock()) + buf.start() + try: + await buf.add(_text(task_message, "hi")) # one immediate flush + await asyncio.sleep(0.020) # let it flush and park + drains = self._count_drains(buf) + # Deliberately do NOT close — simulate an orphaned buffer. + await asyncio.sleep(0.4) + assert drains[0] == 0, f"orphaned ticker woke {drains[0]}x (must park at 0)" + finally: + await buf.close() # cleanup only + + class TestCoalescingBufferClose: @pytest.mark.asyncio async def test_close_drains_remaining_buffered_items(self, task_message: TaskMessage) -> None: