Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl HashJoin {
build_input.clone(),
probe_input.clone(),
joined_output.clone(),
factory.create_hash_join(self.join_type.clone(), 0)?,
factory.create_hash_join(0)?,
stage_sync_barrier.clone(),
self.projections.clone(),
rf_desc.clone(),
Expand Down Expand Up @@ -489,6 +489,7 @@ impl HashJoin {
.collect::<Vec<_>>();

Ok(HashJoinFactory::create(
self.join_type.clone(),
ctx.ctx.clone(),
ctx.func_ctx.clone(),
DataBlock::choose_hash_method_with_types(&hash_key_types)?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,14 @@ impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {

self.stage = RestoreStage::RestoreBuild;
*self.state.restore_partition.as_mut() = None;
self.memory_hash_join.reset_memory();

// The first arrival in each partition clears the global state.
let mut reset_global_state = false;
std::mem::swap(
&mut reset_global_state,
self.state.reset_global_state.as_mut(),
);
self.memory_hash_join.reset_memory(reset_global_state);
Ok(Some(Box::new(EmptyJoinStream)))
},
}
Expand Down Expand Up @@ -223,6 +230,8 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
let locked = self.state.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);

*self.state.reset_global_state.as_mut() = true;

if self.state.restore_partition.is_none() {
let Some((id, data)) = self.state.build_row_groups.as_mut().pop_first() else {
let Some((id, data)) = self.state.probe_row_groups.as_mut().pop_first() else {
Expand Down Expand Up @@ -367,6 +376,10 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {

Ok(())
}

pub fn into_inner(self) -> T {
self.memory_hash_join
}
}

pub enum RestoreStage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

use std::sync::PoisonError;

use databend_common_expression::DataBlock;

use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::BasicHashJoinState;
use crate::pipelines::processors::transforms::HashJoinHashTable;
use crate::pipelines::processors::transforms::InnerHashJoin;
use crate::pipelines::processors::transforms::Join;

pub trait GraceMemoryJoin: Join {
fn reset_memory(&mut self);
fn reset_memory(&mut self, reset_global: bool);
fn take_memory_chunks(&self) -> Vec<DataBlock>;
}

fn reset_basic_state(state: &BasicHashJoinState) {
Expand Down Expand Up @@ -56,15 +59,32 @@ fn reset_basic_state(state: &BasicHashJoinState) {
}

impl GraceMemoryJoin for InnerHashJoin {
fn reset_memory(&mut self) {
fn reset_memory(&mut self, reset_global: bool) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);

if reset_global {
reset_basic_state(&self.basic_state);
}
}

fn take_memory_chunks(&self) -> Vec<DataBlock> {
let locked = self.basic_state.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
std::mem::take(self.basic_state.chunks.as_mut())
}
}

impl GraceMemoryJoin for OuterLeftHashJoin {
fn reset_memory(&mut self) {
fn reset_memory(&mut self, reset_global: bool) {
self.performance_context.clear();
reset_basic_state(&self.basic_state);
if reset_global {
reset_basic_state(&self.basic_state);
}
}

fn take_memory_chunks(&self) -> Vec<DataBlock> {
let locked = self.basic_state.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
std::mem::take(self.basic_state.chunks.as_mut())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct GraceHashJoinState {
pub mutex: Mutex<()>,
pub ctx: Arc<QueryContext>,
pub finished: CStyleCell<bool>,
pub reset_global_state: CStyleCell<bool>,
pub restore_partition: CStyleCell<Option<usize>>,
pub restore_build_queue: CStyleCell<VecDeque<SpillMetadata>>,
pub restore_probe_queue: CStyleCell<VecDeque<SpillMetadata>>,
Expand Down Expand Up @@ -60,6 +61,7 @@ impl GraceHashJoinState {
restore_build_queue: CStyleCell::new(VecDeque::new()),
restore_probe_queue: CStyleCell::new(VecDeque::new()),
restore_partition: CStyleCell::new(None),
reset_global_state: CStyleCell::new(true),
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ mod grace_memory;
mod grace_state;

pub use grace_join::GraceHashJoin;
pub use grace_memory::*;
pub use grace_state::*;
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FunctionContext;
use databend_common_expression::HashMethodKind;
use databend_common_pipeline_transforms::MemorySettings;
use databend_common_sql::plans::JoinType;

use crate::pipelines::memory_settings::MemorySettingsExt;
use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
use crate::pipelines::processors::transforms::new_hash_join::common::CStyleCell;
use crate::pipelines::processors::transforms::new_hash_join::grace::GraceHashJoinState;
use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoin;
use crate::pipelines::processors::transforms::new_hash_join::hybrid::HybridHashJoinState;
use crate::pipelines::processors::transforms::BasicHashJoinState;
use crate::pipelines::processors::transforms::GraceHashJoin;
use crate::pipelines::processors::transforms::InnerHashJoin;
Expand All @@ -38,16 +42,19 @@ use crate::sessions::QueryContext;

pub struct HashJoinFactory {
mutex: Mutex<()>,
join_type: JoinType,
ctx: Arc<QueryContext>,
desc: Arc<HashJoinDesc>,
hash_method: HashMethodKind,
function_ctx: FunctionContext,
grace_state: CStyleCell<HashMap<usize, Weak<GraceHashJoinState>>>,
basic_state: CStyleCell<HashMap<usize, Weak<BasicHashJoinState>>>,
hybrid_state: CStyleCell<HashMap<usize, Weak<HybridHashJoinState>>>,
}

impl HashJoinFactory {
pub fn create(
join_type: JoinType,
ctx: Arc<QueryContext>,
function_ctx: FunctionContext,
method: HashMethodKind,
Expand All @@ -56,14 +63,44 @@ impl HashJoinFactory {
Arc::new(HashJoinFactory {
ctx,
desc,
join_type,
function_ctx,
hash_method: method,
mutex: Mutex::new(()),
grace_state: CStyleCell::new(HashMap::new()),
basic_state: CStyleCell::new(HashMap::new()),
hybrid_state: CStyleCell::new(HashMap::new()),
})
}

pub fn create_hybrid_state(self: &Arc<Self>, id: usize) -> Result<Arc<HybridHashJoinState>> {
let locked = self.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);

match self.hybrid_state.as_mut().entry(id) {
Entry::Occupied(v) => match v.get().upgrade() {
Some(v) => Ok(v),
None => Err(ErrorCode::Internal(format!(
"Error state: The level {} hybrid hash state has been destroyed.",
id
))),
},
Entry::Vacant(v) => {
let hybrid_state = HybridHashJoinState::create(
self.ctx.clone(),
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
id,
self.clone(),
);

v.insert(Arc::downgrade(&hybrid_state));
Ok(hybrid_state)
}
}
}

pub fn create_grace_state(self: &Arc<Self>, id: usize) -> Result<Arc<GraceHashJoinState>> {
let locked = self.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
Expand Down Expand Up @@ -118,34 +155,61 @@ impl HashJoinFactory {
self.grace_state.as_mut().remove(&id);
}

pub fn create_hash_join(self: &Arc<Self>, typ: JoinType, id: usize) -> Result<Box<dyn Join>> {
pub fn remove_hybrid_state(&self, id: usize) {
let locked = self.mutex.lock();
let _locked = locked.unwrap_or_else(PoisonError::into_inner);
self.hybrid_state.as_mut().remove(&id);
}

pub fn create_hash_join(self: &Arc<Self>, id: usize) -> Result<Box<dyn Join>> {
let settings = self.ctx.get_settings();

if settings.get_force_join_data_spill()? {
return self.create_grace_join(typ, id);
}
// if settings.get_force_join_data_spill()? {
// return self.create_grace_join(id);
// }

let max_level = settings.get_max_grace_hash_join_level()?;
let memory_settings = MemorySettings::from_join_settings(&self.ctx)?;

match self.join_type {
JoinType::Inner => {
let inner_hash_join = InnerHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

match typ {
JoinType::Inner => Ok(Box::new(InnerHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
JoinType::Left => Ok(Box::new(OuterLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?)),
Ok(Box::new(HybridHashJoin::create(
inner_hash_join,
memory_settings.clone(),
self.create_hybrid_state(id)?,
max_level,
)))
}
JoinType::Left => {
let outer_left_hash_join = OuterLeftHashJoin::create(
&self.ctx,
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_basic_state(id)?,
)?;

Ok(Box::new(HybridHashJoin::create(
outer_left_hash_join,
memory_settings.clone(),
self.create_hybrid_state(id)?,
max_level,
)))
}
_ => unreachable!(),
}
}

pub fn create_grace_join(self: &Arc<Self>, typ: JoinType, id: usize) -> Result<Box<dyn Join>> {
match typ {
pub fn create_grace_join(self: &Arc<Self>, id: usize) -> Result<Box<dyn Join>> {
match self.join_type {
JoinType::Inner => {
let inner_hash_join = InnerHashJoin::create(
&self.ctx,
Expand All @@ -160,7 +224,7 @@ impl HashJoinFactory {
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
self.create_grace_state(id)?,
inner_hash_join,
0,
)?))
Expand All @@ -179,7 +243,7 @@ impl HashJoinFactory {
self.function_ctx.clone(),
self.hash_method.clone(),
self.desc.clone(),
self.create_grace_state(id + 1)?,
self.create_grace_state(id)?,
left_hash_join,
0,
)?))
Expand Down
Loading