From 1b2856ede8abe74be8a9a687986260d87c36f753 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Oct 2025 09:12:54 +0800 Subject: [PATCH 1/5] refactor(query): add hybrid hash join --- .../src/physical_plans/physical_hash_join.rs | 3 +- .../new_hash_join/grace/grace_join.rs | 15 +- .../new_hash_join/grace/grace_memory.rs | 30 +- .../new_hash_join/grace/grace_state.rs | 2 + .../transforms/new_hash_join/grace/mod.rs | 1 + .../new_hash_join/hash_join_factory.rs | 106 +++++-- .../new_hash_join/hybrid/hybrid_join.rs | 294 +++++++++++++++--- .../new_hash_join/hybrid/hybrid_state.rs | 69 +++- .../transforms/new_hash_join/hybrid/mod.rs | 3 + .../transforms/new_hash_join/join.rs | 4 +- src/query/settings/src/settings_default.rs | 8 +- .../settings/src/settings_getter_setter.rs | 4 + 12 files changed, 460 insertions(+), 79 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 64d15be1f7743..b2ee069c89d0d 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -445,7 +445,7 @@ impl HashJoin { build_input.clone(), probe_input.clone(), joined_output.clone(), - factory.create_hash_join(self.join_type.clone(), 0)?, + factory.create_hash_join(0)?, stage_sync_barrier.clone(), self.projections.clone(), rf_desc.clone(), @@ -489,6 +489,7 @@ impl HashJoin { .collect::>(); Ok(HashJoinFactory::create( + self.join_type.clone(), ctx.ctx.clone(), ctx.func_ctx.clone(), DataBlock::choose_hash_method_with_types(&hash_key_types)?, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index 97f5fdaf6cf43..7df1eaa9ed0e7 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -174,7 +174,14 @@ impl Join for GraceHashJoin { self.stage = RestoreStage::RestoreBuild; *self.state.restore_partition.as_mut() = None; - self.memory_hash_join.reset_memory(); + + // The first arrival in each partition clears the global state. + let mut reset_global_state = false; + std::mem::swap( + &mut reset_global_state, + self.state.reset_global_state.as_mut(), + ); + self.memory_hash_join.reset_memory(reset_global_state); Ok(Some(Box::new(EmptyJoinStream))) }, } @@ -223,6 +230,8 @@ impl GraceHashJoin { let locked = self.state.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); + *self.state.reset_global_state.as_mut() = true; + if self.state.restore_partition.is_none() { let Some((id, data)) = self.state.build_row_groups.as_mut().pop_first() else { let Some((id, data)) = self.state.probe_row_groups.as_mut().pop_first() else { @@ -367,6 +376,10 @@ impl GraceHashJoin { Ok(()) } + + pub fn into_inner(self) -> T { + self.memory_hash_join + } } pub enum RestoreStage { diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs index a0ad98b17f19e..1167c992dec14 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_memory.rs @@ -14,6 +14,8 @@ use std::sync::PoisonError; +use databend_common_expression::DataBlock; + use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; use crate::pipelines::processors::transforms::BasicHashJoinState; use crate::pipelines::processors::transforms::HashJoinHashTable; @@ -21,7 +23,8 @@ use crate::pipelines::processors::transforms::InnerHashJoin; use crate::pipelines::processors::transforms::Join; pub trait GraceMemoryJoin: Join { - fn reset_memory(&mut self); + fn reset_memory(&mut self, reset_global: bool); + fn take_memory_chunks(&self) -> Vec; } fn reset_basic_state(state: &BasicHashJoinState) { @@ -56,15 +59,32 @@ fn reset_basic_state(state: &BasicHashJoinState) { } impl GraceMemoryJoin for InnerHashJoin { - fn reset_memory(&mut self) { + fn reset_memory(&mut self, reset_global: bool) { self.performance_context.clear(); - reset_basic_state(&self.basic_state); + + if reset_global { + reset_basic_state(&self.basic_state); + } + } + + fn take_memory_chunks(&self) -> Vec { + let locked = self.basic_state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + std::mem::take(self.basic_state.chunks.as_mut()) } } impl GraceMemoryJoin for OuterLeftHashJoin { - fn reset_memory(&mut self) { + fn reset_memory(&mut self, reset_global: bool) { self.performance_context.clear(); - reset_basic_state(&self.basic_state); + if reset_global { + reset_basic_state(&self.basic_state); + } + } + + fn take_memory_chunks(&self) -> Vec { + let locked = self.basic_state.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + std::mem::take(self.basic_state.chunks.as_mut()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs index f6a41102ec54e..c9a15395c9231 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_state.rs @@ -33,6 +33,7 @@ pub struct GraceHashJoinState { pub mutex: Mutex<()>, pub ctx: Arc, pub finished: CStyleCell, + pub reset_global_state: CStyleCell, pub restore_partition: CStyleCell>, pub restore_build_queue: CStyleCell>, pub restore_probe_queue: CStyleCell>, @@ -60,6 +61,7 @@ impl GraceHashJoinState { restore_build_queue: CStyleCell::new(VecDeque::new()), restore_probe_queue: CStyleCell::new(VecDeque::new()), restore_partition: CStyleCell::new(None), + reset_global_state: CStyleCell::new(true), }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs index 0ee237fcf9890..973456e9cf658 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/mod.rs @@ -17,4 +17,5 @@ mod grace_memory; mod grace_state; pub use grace_join::GraceHashJoin; +pub use grace_memory::*; pub use grace_state::*; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs index bc7aad3e4a36f..f8e17b9b16a20 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs @@ -24,11 +24,15 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::FunctionContext; use databend_common_expression::HashMethodKind; +use databend_common_pipeline_transforms::MemorySettings; use databend_common_sql::plans::JoinType; +use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin; use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; +use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoin; +use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoinState; use crate::pipelines::processors::transforms::BasicHashJoinState; use crate::pipelines::processors::transforms::GraceHashJoin; use crate::pipelines::processors::transforms::InnerHashJoin; @@ -38,16 +42,19 @@ use crate::sessions::QueryContext; pub struct HashJoinFactory { mutex: Mutex<()>, + join_type: JoinType, ctx: Arc, desc: Arc, hash_method: HashMethodKind, function_ctx: FunctionContext, grace_state: CStyleCell>>, basic_state: CStyleCell>>, + hybrid_state: CStyleCell>>, } impl HashJoinFactory { pub fn create( + join_type: JoinType, ctx: Arc, function_ctx: FunctionContext, method: HashMethodKind, @@ -56,14 +63,44 @@ impl HashJoinFactory { Arc::new(HashJoinFactory { ctx, desc, + join_type, function_ctx, hash_method: method, mutex: Mutex::new(()), grace_state: CStyleCell::new(HashMap::new()), basic_state: CStyleCell::new(HashMap::new()), + hybrid_state: CStyleCell::new(HashMap::new()), }) } + pub fn create_hybrid_state(self: &Arc, id: usize) -> Result> { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + match self.hybrid_state.as_mut().entry(id) { + Entry::Occupied(v) => match v.get().upgrade() { + Some(v) => Ok(v), + None => Err(ErrorCode::Internal(format!( + "Error state: The level {} hybrid hash state has been destroyed.", + id + ))), + }, + Entry::Vacant(v) => { + let hybrid_state = HybridHashJoinState::create( + self.ctx.clone(), + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + id, + self.clone(), + ); + + v.insert(Arc::downgrade(&hybrid_state)); + Ok(hybrid_state) + } + } + } + pub fn create_grace_state(self: &Arc, id: usize) -> Result> { let locked = self.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); @@ -118,34 +155,61 @@ impl HashJoinFactory { self.grace_state.as_mut().remove(&id); } - pub fn create_hash_join(self: &Arc, typ: JoinType, id: usize) -> Result> { + pub fn remove_hybrid_state(&self, id: usize) { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + self.hybrid_state.as_mut().remove(&id); + } + + pub fn create_hash_join(self: &Arc, id: usize) -> Result> { let settings = self.ctx.get_settings(); if settings.get_force_join_data_spill()? { - return self.create_grace_join(typ, id); + return self.create_grace_join(id); } - match typ { - JoinType::Inner => Ok(Box::new(InnerHashJoin::create( - &self.ctx, - self.function_ctx.clone(), - self.hash_method.clone(), - self.desc.clone(), - self.create_basic_state(id)?, - )?)), - JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create( - &self.ctx, - self.function_ctx.clone(), - self.hash_method.clone(), - self.desc.clone(), - self.create_basic_state(id)?, - )?)), + let max_level = settings.get_max_grace_hash_join_level()?; + let memory_settings = MemorySettings::from_join_settings(&self.ctx)?; + + match self.join_type { + JoinType::Inner => { + let inner_hash_join = InnerHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(HybridHashJoin::create( + inner_hash_join, + memory_settings.clone(), + self.create_hybrid_state(id)?, + max_level, + ))) + } + JoinType::Left => { + let outer_left_hash_join = OuterLeftHashJoin::create( + &self.ctx, + self.function_ctx.clone(), + self.hash_method.clone(), + self.desc.clone(), + self.create_basic_state(id)?, + )?; + + Ok(Box::new(HybridHashJoin::create( + outer_left_hash_join, + memory_settings.clone(), + self.create_hybrid_state(id)?, + max_level, + ))) + } _ => unreachable!(), } } - pub fn create_grace_join(self: &Arc, typ: JoinType, id: usize) -> Result> { - match typ { + pub fn create_grace_join(self: &Arc, id: usize) -> Result> { + match self.join_type { JoinType::Inner => { let inner_hash_join = InnerHashJoin::create( &self.ctx, @@ -160,7 +224,7 @@ impl HashJoinFactory { self.function_ctx.clone(), self.hash_method.clone(), self.desc.clone(), - self.create_grace_state(id + 1)?, + self.create_grace_state(id)?, inner_hash_join, 0, )?)) @@ -179,7 +243,7 @@ impl HashJoinFactory { self.function_ctx.clone(), self.hash_method.clone(), self.desc.clone(), - self.create_grace_state(id + 1)?, + self.create_grace_state(id)?, left_hash_join, 0, )?)) diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs index 64b8eaef12663..8d5ca64986bef 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs @@ -12,55 +12,257 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::PoisonError; + +use databend_common_base::base::ProgressValues; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; use databend_common_pipeline_transforms::MemorySettings; +use crate::pipelines::processors::transforms::new_hash_join::grace::GraceMemoryJoin; +use crate::pipelines::processors::transforms::new_hash_join::hybrid::hybrid_state::HybridHashJoinState; +use crate::pipelines::processors::transforms::new_hash_join::join::JoinStream; +use crate::pipelines::processors::transforms::GraceHashJoin; use crate::pipelines::processors::transforms::Join; +use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::RuntimeFiltersDesc; + +#[derive(Default)] +enum HybridJoinVariant { + #[default] + Swaping, + Memory(Box), + GraceHashJoin(Box), +} -#[allow(dead_code)] -struct MemoryHashJoin { - inner: Box, +pub struct HybridHashJoin { + variant: HybridJoinVariant, memory_setting: MemorySettings, + state: Arc, + max_level: usize, + _marker: std::marker::PhantomData, +} + +unsafe impl Send for HybridHashJoin {} + +unsafe impl Sync for HybridHashJoin {} + +impl Join for HybridHashJoin { + fn add_block(&mut self, data: Option) -> Result<()> { + match &mut self.variant { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(memory) => { + let finalized = data.is_none(); + + memory.add_block(data)?; + + if self.state.level < self.max_level + && (self.memory_setting.check_spill() + || self.state.has_spilled.load(Ordering::Acquire)) + { + self.state.has_spilled.store(true, Ordering::Release); + + memory.add_block(None)?; + self.transform_memory_data(finalized)?; + } + } + HybridJoinVariant::GraceHashJoin(grace) => { + while let Some((_, data_block)) = self.state.steal_transform_task() { + grace.add_block(Some(data_block))?; + } + + grace.add_block(data)?; + } + }; + + Ok(()) + } + + fn final_build(&mut self) -> Result> { + if self.state.level < self.max_level && self.state.has_spilled.load(Ordering::Acquire) { + self.convert_to_grace_join()? + } + + match &mut self.variant { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(v) => v.final_build(), + HybridJoinVariant::GraceHashJoin(grace) => grace.final_build(), + } + } + + fn build_runtime_filter(&self, desc: &RuntimeFiltersDesc) -> Result { + match &self.variant { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(v) => v.build_runtime_filter(desc), + HybridJoinVariant::GraceHashJoin(grace) => grace.build_runtime_filter(desc), + } + } + + fn probe_block(&mut self, data: DataBlock) -> Result> { + match &mut self.variant { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(v) => v.probe_block(data), + HybridJoinVariant::GraceHashJoin(grace) => grace.probe_block(data), + } + } + + fn final_probe(&mut self) -> Result>> { + match &mut self.variant { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(v) => v.final_probe(), + HybridJoinVariant::GraceHashJoin(grace) => grace.final_probe(), + } + } +} + +impl GraceMemoryJoin for HybridHashJoin { + fn reset_memory(&mut self, reset_global: bool) { + match std::mem::take(&mut self.variant) { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(memory) => { + let any_box: Box = memory; + let mut downcast_type_join = any_box.downcast::().expect("wrong memory type"); + + downcast_type_join.reset_memory(reset_global); + self.variant = HybridJoinVariant::Memory(downcast_type_join); + } + + HybridJoinVariant::GraceHashJoin(grace) => { + let any_box: Box = grace; + + match any_box.downcast::>>() { + Ok(hybrid_inner) => { + let mut memory_inner = hybrid_inner.into_inner().into_inner(); + memory_inner.reset_memory(reset_global); + self.variant = HybridJoinVariant::Memory(memory_inner); + } + Err(any_box) => { + let inner_grace = any_box + .downcast::>() + .expect("downcast grace hash join error"); + + let mut memory_inner = Box::new(inner_grace.into_inner()); + memory_inner.reset_memory(reset_global); + self.variant = HybridJoinVariant::Memory(memory_inner); + } + } + } + } + } + + fn take_memory_chunks(&self) -> Vec { + unreachable!() + } +} + +impl HybridHashJoin { + pub fn create( + inner: T, + memory_setting: MemorySettings, + state: Arc, + max_level: usize, + ) -> Self { + HybridHashJoin { + state, + memory_setting, + variant: HybridJoinVariant::Memory(Box::new(inner)), + max_level, + _marker: Default::default(), + } + } + + fn into_inner(mut self) -> Box { + match std::mem::take(&mut self.variant) { + HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Memory(memory) => { + let any_box: Box = memory; + any_box.downcast::().expect("wrong memory type") + } + HybridJoinVariant::GraceHashJoin(grace) => { + let any_box: Box = grace; + + match any_box.downcast::>>() { + Ok(hybrid_inner) => hybrid_inner.into_inner().into_inner(), + Err(any_box) => { + let inner_grace = any_box + .downcast::>() + .expect("downcast grace hash join error"); + Box::new(inner_grace.into_inner()) + } + } + } + } + } + + fn transform_memory_data(&mut self, finalized: bool) -> Result<()> { + self.variant = match std::mem::take(&mut self.variant) { + HybridJoinVariant::Swaping => HybridJoinVariant::Swaping, + HybridJoinVariant::GraceHashJoin(v) => HybridJoinVariant::GraceHashJoin(v), + HybridJoinVariant::Memory(memory_join) => { + let any_box: Box = memory_join; + let memory_join = any_box.downcast::().expect("wrong memory type"); + let take_memory_chunks = memory_join.take_memory_chunks(); + + if !take_memory_chunks.is_empty() { + let self_locked = self.state.mutex.lock(); + let _self_locked = self_locked.unwrap_or_else(PoisonError::into_inner); + self.state.spills_queue.as_mut().extend(take_memory_chunks); + } + + let mut grace_hash_join = self.create_grace_hash_join()?; + + while let Some((_, data_block)) = self.state.steal_transform_task() { + if !data_block.is_empty() { + grace_hash_join.add_block(Some(data_block))?; + } + } + + if finalized { + grace_hash_join.add_block(None)?; + } + + HybridJoinVariant::GraceHashJoin(Box::new(grace_hash_join)) + } + }; + + Ok(()) + } + + fn convert_to_grace_join(&mut self) -> Result<()> { + self.variant = match std::mem::take(&mut self.variant) { + HybridJoinVariant::Swaping => HybridJoinVariant::Swaping, + HybridJoinVariant::GraceHashJoin(grace) => HybridJoinVariant::GraceHashJoin(grace), + HybridJoinVariant::Memory(_) => { + HybridJoinVariant::GraceHashJoin(Box::new(self.create_grace_hash_join()?)) + } + }; + Ok(()) + } + + fn create_grace_hash_join(&mut self) -> Result>> { + let child = self.state.factory.create_hash_join(self.state.level + 1)?; + let any_child: Box = child; + + let downcast_child = any_child + .downcast::>() + .expect("wrong memory type"); + + let grace_state = self + .state + .factory + .create_grace_state(self.state.level + 1)?; + + GraceHashJoin::create( + self.state.ctx.clone(), + self.state.function_ctx.clone(), + self.state.hash_method_kind.clone(), + self.state.desc.clone(), + grace_state, + Box::into_inner(downcast_child), + self.state.level * 4, + ) + } } -// pub struct HybridHashJoin { -// inner: Box, -// memory_settings: MemorySettings, -// -// is_memory: bool, -// state_factory: Arc, -// } -// -// impl Join for HybridHashJoin { -// fn add_block(&mut self, data: Option) -> Result<()> { -// self.inner.add_block(data)?; -// -// // if self.is_memory -// if let HybridHashJoin::Memory(memory) = self { -// if memory.memory_setting.check_spill() { -// // memory.inner.reset_memory()?; -// } -// } -// -// Ok(()) -// } -// -// fn final_build(&mut self) -> Result> { -// match self { -// HybridHashJoin::Memory(memory) => memory.inner.final_build(), -// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.final_build(), -// } -// } -// -// fn probe_block(&mut self, data: DataBlock) -> Result> { -// match self { -// HybridHashJoin::Memory(memory) => memory.inner.probe_block(data), -// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.probe_block(data), -// } -// } -// -// fn final_probe(&mut self) -> Result>> { -// match self { -// HybridHashJoin::Memory(memory) => memory.inner.final_probe(), -// HybridHashJoin::GraceHashJoin(grace_hash_join) => grace_hash_join.final_probe(), -// } -// } -// } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs index 159e8ef8674c9..7ddc30acb58d0 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs @@ -12,9 +12,72 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState; +use std::collections::VecDeque; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::PoisonError; + +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::HashMethodKind; + +use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell; +use crate::pipelines::processors::transforms::HashJoinFactory; +use crate::pipelines::processors::HashJoinDesc; +use crate::sessions::QueryContext; #[allow(dead_code)] -pub struct HybridState { - grace_state: GraceHashJoinState, +pub struct HybridHashJoinState { + pub mutex: Mutex<()>, + + pub spills_queue: CStyleCell>, + + pub has_spilled: AtomicBool, + pub level: usize, + pub factory: Arc, + + pub ctx: Arc, + pub function_ctx: FunctionContext, + pub hash_method_kind: HashMethodKind, + pub desc: Arc, +} + +impl HybridHashJoinState { + pub fn create( + ctx: Arc, + function_context: FunctionContext, + hash_method_kind: HashMethodKind, + desc: Arc, + level: usize, + factory: Arc, + ) -> Arc { + Arc::new(HybridHashJoinState { + ctx, + desc, + level, + factory, + hash_method_kind, + mutex: Mutex::new(()), + spills_queue: CStyleCell::new(VecDeque::new()), + has_spilled: AtomicBool::new(false), + function_ctx: function_context, + }) + } + + pub fn steal_transform_task(&self) -> Option<(bool, DataBlock)> { + let locked = self.mutex.lock(); + let _locked = locked.unwrap_or_else(PoisonError::into_inner); + + match self.spills_queue.as_mut().pop_front() { + None => None, + Some(v) => Some((self.spills_queue.is_empty(), v)), + } + } +} + +impl Drop for HybridHashJoinState { + fn drop(&mut self) { + self.factory.remove_hybrid_state(self.level) + } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs index 4af53c0b88c44..75f4d39f10a0a 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/mod.rs @@ -14,3 +14,6 @@ mod hybrid_join; mod hybrid_state; + +pub use hybrid_join::*; +pub use hybrid_state::*; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs index d1a44c0992b28..f8715109d7762 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/join.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use databend_common_base::base::ProgressValues; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -23,7 +25,7 @@ pub trait JoinStream: Send + Sync { fn next(&mut self) -> Result>; } -pub trait Join: Send + Sync + 'static { +pub trait Join: Any + Send + Sync + 'static { fn add_block(&mut self, data: Option) -> Result<()>; fn final_build(&mut self) -> Result>; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 4adbef031093d..090f5c913571d 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1490,7 +1490,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), - + ("max_grace_hash_join_level", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "The maximum recursion depth of Grace Hash Join, each level of recursion adds 16 times the number of partitions compared to the previous level.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=16)), + }), ("s3_storage_class", DefaultSettingValue { value: { let storage_class = Self::extract_s3_storage_class_config(&global_conf).unwrap_or_default(); diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 6d67c19e2967b..a98b4af199f38 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1097,6 +1097,10 @@ impl Settings { Ok(self.try_get_u64("enable_experimental_new_join")? == 1) } + pub fn get_max_grace_hash_join_level(&self) -> Result { + Ok(self.try_get_u64("max_grace_hash_join_level")? as usize) + } + pub fn get_s3_storage_class(&self) -> Result { let s3_storage_class_setting = self.try_get_string("s3_storage_class")?; S3StorageClass::from_str(&s3_storage_class_setting).map_err(|e| { From f18434a6313b681ac424c9e4200414854b0712c4 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Oct 2025 09:41:11 +0800 Subject: [PATCH 2/5] refactor(query): add hybrid hash join --- .../transforms/new_hash_join/hybrid/hybrid_state.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs index 7ddc30acb58d0..d7d2c59c48a5f 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_state.rs @@ -69,10 +69,10 @@ impl HybridHashJoinState { let locked = self.mutex.lock(); let _locked = locked.unwrap_or_else(PoisonError::into_inner); - match self.spills_queue.as_mut().pop_front() { - None => None, - Some(v) => Some((self.spills_queue.is_empty(), v)), - } + self.spills_queue + .as_mut() + .pop_front() + .map(|v| (self.spills_queue.is_empty(), v)) } } From 752ba6e34861a73c8204d838a26ffebce5238990 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Oct 2025 10:09:20 +0800 Subject: [PATCH 3/5] refactor(query): add hybrid hash join --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 090f5c913571d..e004a3a2abaa4 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1484,7 +1484,7 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("enable_experimental_new_join", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "Enables the experimental new join implement", mode: SettingMode::Both, scope: SettingScope::Both, From b80c8dca0de1d0ebfc2df567f1bf3aa24e52bd51 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Oct 2025 10:55:55 +0800 Subject: [PATCH 4/5] refactor(query): add hybrid hash join --- .../new_hash_join/hybrid/hybrid_join.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs index 8d5ca64986bef..e809b26d3f3b7 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs @@ -33,7 +33,7 @@ use crate::pipelines::processors::transforms::RuntimeFiltersDesc; #[derive(Default)] enum HybridJoinVariant { #[default] - Swaping, + Swapping, Memory(Box), GraceHashJoin(Box), } @@ -53,7 +53,7 @@ unsafe impl Sync for HybridHashJoin {} impl Join for HybridHashJoin { fn add_block(&mut self, data: Option) -> Result<()> { match &mut self.variant { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(memory) => { let finalized = data.is_none(); @@ -87,7 +87,7 @@ impl Join for HybridHashJoin { } match &mut self.variant { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(v) => v.final_build(), HybridJoinVariant::GraceHashJoin(grace) => grace.final_build(), } @@ -95,7 +95,7 @@ impl Join for HybridHashJoin { fn build_runtime_filter(&self, desc: &RuntimeFiltersDesc) -> Result { match &self.variant { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(v) => v.build_runtime_filter(desc), HybridJoinVariant::GraceHashJoin(grace) => grace.build_runtime_filter(desc), } @@ -103,7 +103,7 @@ impl Join for HybridHashJoin { fn probe_block(&mut self, data: DataBlock) -> Result> { match &mut self.variant { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(v) => v.probe_block(data), HybridJoinVariant::GraceHashJoin(grace) => grace.probe_block(data), } @@ -111,7 +111,7 @@ impl Join for HybridHashJoin { fn final_probe(&mut self) -> Result>> { match &mut self.variant { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(v) => v.final_probe(), HybridJoinVariant::GraceHashJoin(grace) => grace.final_probe(), } @@ -121,7 +121,7 @@ impl Join for HybridHashJoin { impl GraceMemoryJoin for HybridHashJoin { fn reset_memory(&mut self, reset_global: bool) { match std::mem::take(&mut self.variant) { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(memory) => { let any_box: Box = memory; let mut downcast_type_join = any_box.downcast::().expect("wrong memory type"); @@ -176,7 +176,7 @@ impl HybridHashJoin { fn into_inner(mut self) -> Box { match std::mem::take(&mut self.variant) { - HybridJoinVariant::Swaping => unreachable!(), + HybridJoinVariant::Swapping => unreachable!(), HybridJoinVariant::Memory(memory) => { let any_box: Box = memory; any_box.downcast::().expect("wrong memory type") @@ -199,7 +199,7 @@ impl HybridHashJoin { fn transform_memory_data(&mut self, finalized: bool) -> Result<()> { self.variant = match std::mem::take(&mut self.variant) { - HybridJoinVariant::Swaping => HybridJoinVariant::Swaping, + HybridJoinVariant::Swapping => HybridJoinVariant::Swapping, HybridJoinVariant::GraceHashJoin(v) => HybridJoinVariant::GraceHashJoin(v), HybridJoinVariant::Memory(memory_join) => { let any_box: Box = memory_join; @@ -233,7 +233,7 @@ impl HybridHashJoin { fn convert_to_grace_join(&mut self) -> Result<()> { self.variant = match std::mem::take(&mut self.variant) { - HybridJoinVariant::Swaping => HybridJoinVariant::Swaping, + HybridJoinVariant::Swapping => HybridJoinVariant::Swapping, HybridJoinVariant::GraceHashJoin(grace) => HybridJoinVariant::GraceHashJoin(grace), HybridJoinVariant::Memory(_) => { HybridJoinVariant::GraceHashJoin(Box::new(self.create_grace_hash_join()?)) From 029f5d5b40266c7168684904a69c5da736bcb07b Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Wed, 29 Oct 2025 15:34:32 +0800 Subject: [PATCH 5/5] refactor(query): add hybrid hash join --- .../transforms/new_hash_join/hash_join_factory.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs index f8e17b9b16a20..48ad9a8441fcc 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/hash_join_factory.rs @@ -164,9 +164,9 @@ impl HashJoinFactory { pub fn create_hash_join(self: &Arc, id: usize) -> Result> { let settings = self.ctx.get_settings(); - if settings.get_force_join_data_spill()? { - return self.create_grace_join(id); - } + // if settings.get_force_join_data_spill()? { + // return self.create_grace_join(id); + // } let max_level = settings.get_max_grace_hash_join_level()?; let memory_settings = MemorySettings::from_join_settings(&self.ctx)?;