From 0b5fc743f2e0711556ab4628aecd13e5fcd1a9b8 Mon Sep 17 00:00:00 2001 From: nemonlou Date: Tue, 12 Mar 2024 09:55:02 +0800 Subject: [PATCH] make nulls direction configuable for FullSortingMergeJoin(fix review comments) --- src/Core/Settings.h | 1 - src/Interpreters/InterpreterSelectQuery.cpp | 7 +-- .../Transforms/MergeJoinTransform.cpp | 56 +++++++------------ .../Transforms/MergeJoinTransform.h | 11 ---- 4 files changed, 22 insertions(+), 53 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 8d48b3f5e68..a3c5638d97f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -891,7 +891,6 @@ class IColumn; M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.", 0) \ M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \ M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \ - M(Bool, nulls_biggest_in_smj, true, "Treat nulls as biggest in sort. Used in sort merge join for compare null keys.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS. diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 6f0a9fa9bfb..7c87dadfce6 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1693,10 +1693,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

( query_plan.getCurrentDataStream(), joined_plan->getCurrentDataStream(), expressions.join, settings.max_block_size, max_streams, - analysis_result.optimize_read_in_order, - null_direct_hint); + analysis_result.optimize_read_in_order); join_step->setStepDescription(fmt::format("JOIN {}", expressions.join->pipelineType())); std::vector plans; diff --git a/src/Processors/Transforms/MergeJoinTransform.cpp b/src/Processors/Transforms/MergeJoinTransform.cpp index c8e3a806a9f..6288a850d76 100644 --- a/src/Processors/Transforms/MergeJoinTransform.cpp +++ b/src/Processors/Transforms/MergeJoinTransform.cpp @@ -359,7 +359,7 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num) cursors[source_num]->setChunk(std::move(input.chunk)); } -template +template struct AllJoinImpl { constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind); @@ -369,7 +369,8 @@ struct AllJoinImpl size_t max_block_size, PaddedPODArray & left_map, PaddedPODArray & right_map, - std::unique_ptr & state) + std::unique_ptr & state, + int null_direction_hint) { right_map.clear(); right_map.reserve(max_block_size); @@ -385,7 +386,7 @@ struct AllJoinImpl lpos = left_cursor->getRow(); rpos = right_cursor->getRow(); - cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, nullDirection(nullOrder)); + cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, null_direction_hint); if (cmp == 0) { size_t lnum = nextDistinct(left_cursor.cursor); @@ -435,37 +436,19 @@ struct AllJoinImpl } }; -template class Impl, typename ... Args> -void dispatchKind(JoinKind kind, int null_direction_hint, Args && ... args) +template class Impl, typename ... Args> +void dispatchKind(JoinKind kind, Args && ... args) { - if (isSmall(null_direction_hint)) - { - if (Impl::enabled && kind == JoinKind::Inner) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Left) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Right) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Full) - return Impl::join(std::forward(args)...); + if (Impl::enabled && kind == JoinKind::Inner) + return Impl::join(std::forward(args)...); + else if (Impl::enabled && kind == JoinKind::Left) + return Impl::join(std::forward(args)...); + else if (Impl::enabled && kind == JoinKind::Right) + return Impl::join(std::forward(args)...); + else if (Impl::enabled && kind == JoinKind::Full) + return Impl::join(std::forward(args)...); else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind); - - } - else - { - if (Impl::enabled && kind == JoinKind::Inner) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Left) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Right) - return Impl::join(std::forward(args)...); - else if (Impl::enabled && kind == JoinKind::Full) - return Impl::join(std::forward(args)...); - else - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind); - - } } std::optional MergeJoinAlgorithm::handleAllJoinState() @@ -538,7 +521,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind) { PaddedPODArray idx_map[2]; - dispatchKind(kind, null_direction_hint, *cursors[0], *cursors[1], max_block_size, idx_map[0], idx_map[1], all_join_state); + dispatchKind(kind, *cursors[0], *cursors[1], max_block_size, idx_map[0], idx_map[1], all_join_state, null_direction_hint); assert(idx_map[0].size() == idx_map[1].size()); Chunk result; @@ -588,7 +571,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind) } -template +template struct AnyJoinImpl { constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind); @@ -597,7 +580,8 @@ struct AnyJoinImpl FullMergeJoinCursor & right_cursor, PaddedPODArray & left_map, PaddedPODArray & right_map, - AnyJoinState & state) + AnyJoinState & state, + int null_direction_hint) { assert(enabled); @@ -620,7 +604,7 @@ struct AnyJoinImpl lpos = left_cursor->getRow(); rpos = right_cursor->getRow(); - cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, nullDirection(order)); + cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, null_direction_hint); if (cmp == 0) { if constexpr (isLeftOrFull(kind)) @@ -744,7 +728,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind) PaddedPODArray idx_map[2]; size_t prev_pos[] = {current_left.getRow(), current_right.getRow()}; - dispatchKind(kind, null_direction_hint, *cursors[0], *cursors[1], idx_map[0], idx_map[1], any_join_state); + dispatchKind(kind, *cursors[0], *cursors[1], idx_map[0], idx_map[1], any_join_state, null_direction_hint); assert(idx_map[0].empty() || idx_map[1].empty() || idx_map[0].size() == idx_map[1].size()); size_t num_result_rows = std::max(idx_map[0].size(), idx_map[1].size()); diff --git a/src/Processors/Transforms/MergeJoinTransform.h b/src/Processors/Transforms/MergeJoinTransform.h index 43485321122..8af486ea34b 100644 --- a/src/Processors/Transforms/MergeJoinTransform.h +++ b/src/Processors/Transforms/MergeJoinTransform.h @@ -220,17 +220,6 @@ private: bool recieved_all_blocks = false; }; -/// Join method. -enum class NullOrder -{ - SMALLEST, /// null is treated as smallest - BIGGEST /// null is treated as biggest -}; - -inline constexpr bool isSmall(int null_direction) { return null_direction == 1; } - -inline constexpr int nullDirection(NullOrder order) {return order == NullOrder::SMALLEST ? 1 : -1;} - /* * This class is used to join chunks from two sorted streams. * It is used in MergeJoinTransform.