fix(streaming): CoalescingBuffer leaks CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever#418
Draft
eberki-scale wants to merge 1 commit into
Summary
CoalescingBuffer (agentex/lib/core/services/adk/streaming.py) runs a per-instance background ticker that wakes every FLUSH_INTERVAL_S (50 ms) whether or not there is buffered data. Because one buffer + ticker is created per streaming task-message context, any buffer that outlives its stream without a clean close() becomes an orphaned task that polls at 20 Hz forever. In a long-lived worker that handles many streaming tasks, these accumulate and ratchet CPU up until a core is saturated, with no memory growth and no log output, clearing only on process restart.
Symptoms
Root cause
When the buffer is empty and _closed is False, the wait_for times out every 50 ms, drains nothing, and loops: a permanent 20 Hz busy-loop. The loop only exits on _closed, which is set by close(). If close() never runs, or is interrupted by cancellation while awaiting the ticker, the ticker is orphaned and spins indefinitely. With N orphaned/idle buffers, the event loop spends most of its time arming/cancelling TimerHandles.
Evidence (py-spy on an affected worker)
CPU usage grew roughly linearly with the number of streamed tasks since the last restart; terminating the in-flight tasks/workflows did not release it (the leak lives in the worker process's asyncio tasks, independent of task/workflow state).
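The polling pattern described under Root cause can be reproduced in isolation. The sketch below is not the agentex code: PollingTicker, its wakeups counter, and count_idle_wakeups are hypothetical names used only for illustration, and only the 50 ms interval and the wait_for-with-timeout shape come from the description above. It shows that such a loop keeps waking at roughly 1/FLUSH_INTERVAL_S even though nothing is ever buffered.

```python
import asyncio

FLUSH_INTERVAL_S = 0.05  # 50 ms, the interval stated in the summary


class PollingTicker:
    """Hypothetical stand-in for the old ticker loop (not the agentex code)."""

    def __init__(self) -> None:
        self._flush_signal = asyncio.Event()
        self._closed = False
        self.wakeups = 0  # illustration only: counts loop iterations

    async def run(self) -> None:
        while not self._closed:
            try:
                # Arms a timer on every iteration, even when nothing is buffered.
                await asyncio.wait_for(
                    self._flush_signal.wait(), timeout=FLUSH_INTERVAL_S
                )
            except asyncio.TimeoutError:
                pass  # timed out with nothing to drain
            self._flush_signal.clear()
            self.wakeups += 1


async def count_idle_wakeups(duration_s: float) -> int:
    """Run an idle ticker that is never closed and report how often it woke."""
    ticker = PollingTicker()
    task = asyncio.create_task(ticker.run())
    await asyncio.sleep(duration_s)
    task.cancel()  # the only way to stop it, since close() is never called
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticker.wakeups


if __name__ == "__main__":
    print(asyncio.run(count_idle_wakeups(0.5)))
```

One such task is cheap; the cost described above comes from many of them accumulating in a long-lived worker, each arming and cancelling a timer on every iteration.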
Fix
Replace the single _flush_signal + fixed-interval wait with two events so the ticker can park at zero CPU when idle:
- _wake: set by add() only when the buffer goes empty → non-empty. _run blocks on await self._wake.wait() when idle (no polling).
- _flush_now: set on first delta / size threshold / close → immediate flush, bypassing the coalescing delay.
_run parks on _wake; on wake it flushes immediately if _flush_now is set, otherwise coalesces for up to FLUSH_INTERVAL_S, drains, re-arms _wake if data arrived during the flush, and exits on _closed. close() additionally force-cancels the ticker if close() itself is cancelled, so it can never be orphaned on the cancellation path.
Behaviour preserved: first-delta-immediate flush (latency-critical), the MAX_BUFFERED_CHARS early flush, the FLUSH_INTERVAL_S coalescing window for trailing partials, and the exactly-once final drain on close(). The only difference: an idle or orphaned buffer parks instead of polling, and under light streaming (buffer empties between deltas) a delta may flush slightly sooner; coalescing under sustained streaming is unchanged.
Tests
New TestCoalescingBufferIdleParks:
- test_idle_buffer_does_not_spin: no data added → 0 drain cycles over ~8 windows (was ~8).
- test_orphaned_buffer_parks_after_flush: buffer flushed then never closed → 0 drain cycles afterward.
Both fail against the old code (~8 drain cycles) and pass against the fix.
Existing streaming suite unchanged: 33 passed (tests/lib/core/services/adk/test_streaming.py).
Risk
Low. Hot-path semantics (first-flush, size/time coalescing, exactly-once close drain) are covered by the existing 31 tests and unchanged. The change is local to CoalescingBuffer.
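For reviewers who want to reason about the two-event design described under Fix without opening the diff, here is a minimal sketch of the idea. It is an assumption-laden illustration, not the code in this PR: ParkingBuffer, sink, cycles, demo, and the MAX_BUFFERED_CHARS value are hypothetical, and only the names _wake, _flush_now, _run, add(), close(), and FLUSH_INTERVAL_S are taken from the description.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

FLUSH_INTERVAL_S = 0.05
MAX_BUFFERED_CHARS = 64  # hypothetical value, for illustration only


class ParkingBuffer:
    """Hypothetical sketch of the two-event design; must be built inside a running loop."""

    def __init__(self, sink: Callable[[str], Awaitable[None]]) -> None:
        self._sink = sink
        self._parts: List[str] = []
        self._size = 0
        self._first = True
        self._closed = False
        self._wake = asyncio.Event()       # buffer went empty -> non-empty
        self._flush_now = asyncio.Event()  # skip the coalescing delay
        self.cycles = 0                    # illustration only: drain cycles
        self._task = asyncio.create_task(self._run())

    def add(self, delta: str) -> None:
        was_empty = not self._parts
        self._parts.append(delta)
        self._size += len(delta)
        if self._first or self._size >= MAX_BUFFERED_CHARS:
            self._first = False
            self._flush_now.set()
        if was_empty:
            self._wake.set()

    async def _drain(self) -> None:
        if self._parts:
            text = "".join(self._parts)
            self._parts.clear()
            self._size = 0
            await self._sink(text)

    async def _run(self) -> None:
        while True:
            await self._wake.wait()  # parked when idle: no timer is armed
            self._wake.clear()
            if self._closed:
                return  # close() performs the final drain
            self.cycles += 1
            if not self._flush_now.is_set():
                try:  # coalesce for up to one interval
                    await asyncio.wait_for(self._flush_now.wait(), FLUSH_INTERVAL_S)
                except asyncio.TimeoutError:
                    pass
            self._flush_now.clear()
            await self._drain()
            if self._parts:  # data arrived during the flush
                self._wake.set()

    async def close(self) -> None:
        self._closed = True
        self._flush_now.set()
        self._wake.set()
        try:
            await self._task
        finally:
            # Returns False and does nothing if the ticker already finished;
            # if close() is being cancelled, this makes sure the ticker is too.
            self._task.cancel()
        await self._drain()


async def demo() -> Tuple[int, List[str]]:
    flushed: List[str] = []

    async def sink(text: str) -> None:
        flushed.append(text)

    buf = ParkingBuffer(sink)
    await asyncio.sleep(8 * FLUSH_INTERVAL_S)  # idle for ~8 windows
    idle_cycles = buf.cycles
    buf.add("hel")             # first delta: flushed without delay
    await asyncio.sleep(0.01)  # let the ticker run
    buf.add("lo")              # trailing partial, drained by close()
    await buf.close()
    return idle_cycles, flushed
```

In this sketch an idle buffer costs nothing because the only await with a timeout happens after _wake has fired, so a buffer that is never closed simply stays parked on the event.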