feat(hooks): add BeforeStreamChunkEvent for stream chunk interception#1761
feat(hooks): add BeforeStreamChunkEvent for stream chunk interception#1761fede-kamel wants to merge 2 commits intostrands-agents:mainfrom
Conversation
d24b10f to
686f8f8
Compare
|
@mkmeral 👍 |
Integration Test ResultsRebased on latest
What was tested
Unit testsAll 108 unit tests pass after rebase (hooks, event_loop, streaming). |
|
Hey @fede-kamel — solid work here, really well-iterated PR. The use cases (content redaction, monitoring, chunk filtering) are all valid. However, we'd like to defer this for now due to the recently merged state machine design (0005) from @pgrayy. The writable Also worth noting: TS currently only has read-only @pgrayy — would love your take on whether this fits middleware or hooks in the 0005 design. 🤖 AI agent response on behalf of @mkmeral. Strands Agents. Feedback welcome! |
|
Hey @mkmeral @pgrayy — thanks for the thorough feedback. Totally understand. I went through the 0005 design and I can see how chunk interception (writable Happy to defer this PR. We can keep it as a reference alongside the linked issue (#1762), since the use case (content redaction, chunk filtering, stream monitoring) is still genuine. Once the middleware layer lands, we can revisit implementing this through that path. Is there a rough timeline for when middleware support might be available? Just so we know when to circle back. |
…ption Add BeforeStreamChunkEvent hook that fires BEFORE each stream chunk is processed, enabling true interception capabilities: - Monitor streaming progress in real-time - Modify chunk content before processing (affects final message) - Skip chunks entirely by setting skip=True (excluded from final message) - Implement content transformation (e.g., redaction, translation) Implementation details: - Added ChunkInterceptor callback type to streaming.py - Modified process_stream() to invoke interceptor BEFORE processing - Modified stream_messages() to accept chunk_interceptor parameter - event_loop.py creates interceptor that invokes BeforeStreamChunkEvent When skip=True: - The chunk is not processed at all - No events (ModelStreamChunkEvent, TextStreamEvent) are yielded - The chunk does not contribute to the final message When chunk is modified: - The modified chunk is used for all downstream processing - TextStreamEvent will contain the modified text - The final message will contain the modified content
686f8f8 to
dc8c2aa
Compare
Exercises all three hook capabilities (capture, modify, skip) against OpenAI gpt-4o-mini to confirm the feature works end-to-end with a live streaming provider.
Rebased & validated against real modelRebased on latest Live validation with OpenAI
|
Motivation
There's currently no way to intercept stream chunks before they're processed. Hooks fire after processing, so modifications don't affect the TextStreamEvent or final message content.
The current hook lifecycle has a gap during streaming:
BeforeModelCallEventfires before chunks exist.AfterModelCallEventfires after chunks have already been streamed to the user. Neither allows chunk interception.This adds
BeforeStreamChunkEventthat fires before each chunk is processed, allowing:skip=True(excludes from all events and final message)invocation_statefor contextResolves #1760
Use Cases
skip=True)Public API Changes
New event:
BeforeStreamChunkEventProperties:
chunk(writable) - the raw stream chunkskip(writable) - set True to exclude chunk from processinginvocation_state(read-only) - context dict from invocationReproduction and Demonstration
Issue reproduction (main branch): https://gist.github.com/fede-kamel/9d93429a3d6d7676906e41eb60890d46
Working demonstration (feature branch): https://gist.github.com/fede-kamel/a6a78d8d35eb9707d80c2327cd17b462
Type of Change
New feature
Testing
hatch run prepare(lint, format, tests all pass)Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.