feat(sink): produce Upsert chunk in compaction for SinkType::Upsert #23581
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
          
Pull Request Overview
This PR renames StreamChunkCompactor to StreamChunkUpsertCompactor and modifies the compaction logic to produce chunks in upsert format by removing UpdateDelete operations and rewriting UpdateInsert operations as Insert operations. This change minimizes unnecessary Delete operations to external systems and aligns better with SinkType::Upsert semantics.
Key Changes:
- Renamed `StreamChunkCompactor` to `StreamChunkUpsertCompactor` and `compact_chunk_inline` to `into_upsert_compacted_chunk`
- Modified compaction methods to produce upsert format chunks using new `Record::into_upsert()` method
- Added `Kind` type parameter to `ChangeBuffer::into_chunk` and `into_chunks` methods to support both upsert and retract formats
- Updated `StreamSink` to correctly derive and set stream kind based on sink type
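As a rough illustration of the upsert conversion named in the key changes, the sketch below uses a simplified stand-in `Record` enum. The variant and field names are assumptions made for this example and may not match the definitions in src/common/src/array/stream_record.rs.

```rust
/// Simplified stand-in for a stream record (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

impl<R> Record<R> {
    /// Convert the record into upsert form: an `Update` drops its old row
    /// and becomes an `Insert` of the new row. `Insert` and `Delete` are
    /// returned unchanged.
    pub fn into_upsert(self) -> Record<R> {
        match self {
            Record::Update { new_row, .. } => Record::Insert { new_row },
            other => other,
        }
    }
}
```

With this shape, a downstream system that overwrites by key only ever sees the new row of an update, so no separate delete is issued for the old row.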
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description | 
|---|---|
| src/stream/src/executor/sink.rs | Updates sink executor to use renamed StreamChunkUpsertCompactor and into_upsert_compacted_chunk | 
| src/stream/src/executor/mview/materialize.rs | Updates materialize executor to explicitly use RETRACT kind for change buffer | 
| src/stream/src/common/upsert_compact.rs | Renames compactor class, updates methods to produce upsert format chunks, and improves documentation | 
| src/stream/src/common/mod.rs | Renames module from compact_chunk to upsert_compact | 
| src/stream/src/common/change_buffer.rs | Adds Kind type parameter to support both upsert and retract formats in chunk conversion | 
| src/frontend/src/optimizer/plan_node/stream_sink.rs | Updates sink type derivation logic and explicitly sets stream kind based on sink type | 
| src/common/src/array/stream_record.rs | Adds into_upsert() method to convert Update records to Insert format | 
| src/common/src/array/stream_chunk_builder.rs | Refactors append_record to use helper method for cleaner code | 
I realized that there's an exception:
    
Addressed by introducing
    
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Extend `StreamChunkCompactor` to allow specifying `OutputKind`, which decides whether to produce chunks in
- `Retract` format, which converts an `Update` record to a pair of `U-` and `U+` rows
- `Upsert` format, which only keeps the new row of an `Update` record and converts it to an `Insert` row

Dispatch different `OutputKind` based on whether the `SinkType` is `Upsert` or `Retract` (introduced in last PR: #23593).

Previously...
- The rows of an `Update` might end up non-consecutive, forcing us to rewrite it into 2 operations (`Delete` and `Insert`) (Unexpected Op in output of StreamChunkCompactor into_compacted_chunks #23540)
- `Update` operation in the optimal way

This led to unnecessary `Delete` operations sent to the external system, along with I/O overhead and a temporarily inconsistent state.

Directly producing chunks in upsert format in compaction minimizes `Delete` operations as much as possible. This approach also better aligns with the semantics of `SinkType::Upsert`. If the sink explicitly requires the old value for updates, typically `DEBEZIUM`, it should be marked `SinkType::Retract` instead.

By using upsert format in sink-into-table, we also fix #22579, since the update won't be considered as a pair of `Delete` and `Insert` when handling table conflict.

Checklist
Documentation
Release note
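
To make the difference between the two output formats in the description concrete, here is a minimal sketch of key-based compaction that emits either format. Only the name `OutputKind` and its two variants come from the description; `Op`, `Row`, `Record`, and `compact` are hypothetical simplifications and not the actual `StreamChunkCompactor` code.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op { Insert, Delete, UpdateDelete, UpdateInsert }

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutputKind { Retract, Upsert }

/// Hypothetical row: (primary key, value).
pub type Row = (i32, &'static str);

/// Net change buffered per key (simplified stand-in).
enum Record {
    Insert { new: Row },
    Delete { old: Row },
    Update { old: Row, new: Row },
}

/// Fold all changes per key, then emit them in the requested format.
pub fn compact(input: &[(Op, Row)], kind: OutputKind) -> Vec<(Op, Row)> {
    let mut buffer: BTreeMap<i32, Record> = BTreeMap::new();
    for &(op, row) in input {
        let next = match (buffer.remove(&row.0), op) {
            (None, Op::Insert | Op::UpdateInsert) => Some(Record::Insert { new: row }),
            (None, Op::Delete | Op::UpdateDelete) => Some(Record::Delete { old: row }),
            // Insert followed by delete cancels out.
            (Some(Record::Insert { .. }), Op::Delete | Op::UpdateDelete) => None,
            // Delete followed by insert is a net update.
            (Some(Record::Delete { old }), Op::Insert | Op::UpdateInsert) => {
                Some(Record::Update { old, new: row })
            }
            (Some(Record::Update { old, .. }), Op::Delete | Op::UpdateDelete) => {
                Some(Record::Delete { old })
            }
            // Inconsistent inputs: this sketch just keeps the latest row.
            (Some(Record::Insert { .. }), Op::Insert | Op::UpdateInsert) => {
                Some(Record::Insert { new: row })
            }
            (Some(Record::Update { old, .. }), Op::Insert | Op::UpdateInsert) => {
                Some(Record::Update { old, new: row })
            }
            (Some(Record::Delete { old }), Op::Delete | Op::UpdateDelete) => {
                Some(Record::Delete { old })
            }
        };
        if let Some(record) = next {
            buffer.insert(row.0, record);
        }
    }

    let mut out = Vec::new();
    for record in buffer.into_values() {
        match (record, kind) {
            (Record::Insert { new }, _) => out.push((Op::Insert, new)),
            (Record::Delete { old }, _) => out.push((Op::Delete, old)),
            // Retract: an update becomes a consecutive U-/U+ pair.
            (Record::Update { old, new }, OutputKind::Retract) => {
                out.push((Op::UpdateDelete, old));
                out.push((Op::UpdateInsert, new));
            }
            // Upsert: only the new row is kept, emitted as an Insert.
            (Record::Update { new, .. }, OutputKind::Upsert) => out.push((Op::Insert, new)),
        }
    }
    out
}
```

In this sketch the same buffered update yields two rows under `OutputKind::Retract` and a single `Insert` under `OutputKind::Upsert`, which is the reduction in `Delete` traffic the description is aiming for.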