From 05cfa49c063bb72178b6c907421eea4e34f9ba68 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Mon, 9 Dec 2024 22:42:58 +0100 Subject: [PATCH 1/7] stash --- src/Interpreters/HashJoin/HashJoinMethods.h | 3 +- .../HashJoin/HashJoinMethodsImpl.h | 44 +++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index 10fb50a6b83..af07da53019 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -157,12 +157,13 @@ private: AddedColumns & added_columns); /// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression. - template + template static size_t joinRightColumnsWithAddtitionalFilter( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]], + const Selector & selector, bool need_filter [[maybe_unused]], bool flag_per_row [[maybe_unused]]); diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h index 1e52278f020..2b5e92b6841 100644 --- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h +++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h @@ -357,9 +357,15 @@ size_t HashJoinMethods::joinRightColumnsSwitchMu { if (added_columns.additional_filter_expression) { - bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1; + const bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1; return joinRightColumnsWithAddtitionalFilter( - std::forward>(key_getter_vector), mapv, added_columns, used_flags, need_filter, mark_per_row_used); + std::forward>(key_getter_vector), + mapv, + added_columns, + used_flags, + added_columns.src_block.getSelector(), + need_filter, + mark_per_row_used); } } @@ -664,17 +670,18 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter } template -template +template size_t HashJoinMethods::joinRightColumnsWithAddtitionalFilter( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]], + const Selector & selector, bool need_filter [[maybe_unused]], bool flag_per_row [[maybe_unused]]) { constexpr JoinFeatures join_features; - size_t left_block_rows = added_columns.rows_to_add; + const size_t left_block_rows = added_columns.src_block.rows(); if (need_filter) added_columns.filter = IColumn::Filter(left_block_rows, 0); @@ -688,7 +695,7 @@ size_t HashJoinMethods::joinRightColumnsWithAddt using FindResult = typename KeyGetter::FindResult; size_t max_joined_block_rows = added_columns.max_joined_block_rows; - size_t left_row_iter = 0; + size_t it = 0; PreSelectedRows selected_rows; selected_rows.reserve(left_block_rows); std::vector find_results; @@ -705,8 +712,10 @@ size_t HashJoinMethods::joinRightColumnsWithAddt row_replicate_offset.push_back(0); current_added_rows = 0; selected_rows.clear(); - for (; left_row_iter < left_block_rows; ++left_row_iter) + for (; it < left_block_rows; ++it) { + size_t ind = selector[it]; + if constexpr (join_features.need_replication) { if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows)) @@ -719,13 +728,12 @@ size_t HashJoinMethods::joinRightColumnsWithAddt for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx) { const auto & join_keys = added_columns.join_on_keys[join_clause_idx]; - if (join_keys.null_map && (*join_keys.null_map)[left_row_iter]) + if (join_keys.null_map && (*join_keys.null_map)[ind]) continue; - bool row_acceptable = !join_keys.isRowFiltered(left_row_iter); - auto find_result = row_acceptable - ? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool) - : FindResult(); + bool row_acceptable = !join_keys.isRowFiltered(ind); + auto find_result + = row_acceptable ? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), ind, *pool) : FindResult(); if (find_result.isFound()) { @@ -878,11 +886,11 @@ size_t HashJoinMethods::joinRightColumnsWithAddt } }; - while (left_row_iter < left_block_rows && !exceeded_max_block_rows) + while (it < left_block_rows && !exceeded_max_block_rows) { - auto left_start_row = left_row_iter; + auto left_start_row = it; collect_keys_matched_rows_refs(); - if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1) + if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != it - left_start_row + 1) { throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -891,7 +899,7 @@ size_t HashJoinMethods::joinRightColumnsWithAddt selected_rows.size(), current_added_rows, row_replicate_offset.size(), - left_row_iter, + it, left_start_row); } auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns); @@ -907,11 +915,11 @@ size_t HashJoinMethods::joinRightColumnsWithAddt if constexpr (join_features.need_replication) { - added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter); - added_columns.filter.resize_assume_reserved(left_row_iter); + added_columns.offsets_to_replicate->resize_assume_reserved(it); + added_columns.filter.resize_assume_reserved(it); } added_columns.applyLazyDefaults(); - return left_row_iter; + return it; } template From 11269cf3545e5f6b9fb67d25f09aa5f0069322b9 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Mon, 9 Dec 2024 22:51:13 +0100 Subject: [PATCH 2/7] add test --- ...llel_join_with_additional_filter.reference | 8 ++++++ ...2_parallel_join_with_additional_filter.sql | 26 +++++++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 tests/queries/0_stateless/03282_parallel_join_with_additional_filter.reference create mode 100644 tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql diff --git a/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.reference b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.reference new file mode 100644 index 00000000000..2f5cded0ed5 --- /dev/null +++ b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.reference @@ -0,0 +1,8 @@ +---- HASH +1 10 alpha 1 5 ALPHA +2 15 beta 2 10 beta +3 20 gamma 0 0 +---- PARALLEL HASH +1 10 alpha 1 5 ALPHA +2 15 beta 2 10 beta +3 20 gamma 0 0 diff --git a/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql new file mode 100644 index 00000000000..b04b466eb2a --- /dev/null +++ b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql @@ -0,0 +1,26 @@ +CREATE TABLE t1 ( + key UInt32, + a UInt32, + attr String +) ENGINE = MergeTree ORDER BY key; + +CREATE TABLE t2 ( + key UInt32, + a UInt32, + attr String +) ENGINE = MergeTree ORDER BY key; + +INSERT INTO t1 (key, a, attr) VALUES (1, 10, 'alpha'), (2, 15, 'beta'), (3, 20, 'gamma'); +INSERT INTO t2 (key, a, attr) VALUES (1, 5, 'ALPHA'), (2, 10, 'beta'), (4, 25, 'delta'); + +SET allow_experimental_join_condition = 1; +SET enable_analyzer = 1; +SET max_threads = 16; + +SELECT '---- HASH'; +SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL SETTINGS join_algorithm = 'hash'; + +SELECT '---- PARALLEL HASH'; +SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL SETTINGS join_algorithm = 'parallel_hash'; -- { serverError NOT_IMPLEMENTED} + +SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL; From 201f021db1e35d29ad3eed10c067e645deced66b Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Mon, 9 Dec 2024 23:02:26 +0100 Subject: [PATCH 3/7] allow with parallel_hash --- src/Planner/PlannerJoins.cpp | 1 + .../03282_parallel_join_with_additional_filter.sql | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Planner/PlannerJoins.cpp b/src/Planner/PlannerJoins.cpp index 74ff72f73fe..28be45f699d 100644 --- a/src/Planner/PlannerJoins.cpp +++ b/src/Planner/PlannerJoins.cpp @@ -876,6 +876,7 @@ std::shared_ptr chooseJoinAlgorithm( { if (table_join->getMixedJoinExpression() && !table_join->isEnabledAlgorithm(JoinAlgorithm::HASH) + && !table_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH) && !table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH)) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, diff --git a/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql index b04b466eb2a..4dadc777adf 100644 --- a/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql +++ b/tests/queries/0_stateless/03282_parallel_join_with_additional_filter.sql @@ -21,6 +21,4 @@ SELECT '---- HASH'; SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL SETTINGS join_algorithm = 'hash'; SELECT '---- PARALLEL HASH'; -SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL SETTINGS join_algorithm = 'parallel_hash'; -- { serverError NOT_IMPLEMENTED} - -SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL; +SELECT t1.*, t2.* FROM t1 LEFT JOIN t2 ON t1.key = t2.key AND (t1.key < t2.a OR t1.a % 2 = 0) ORDER BY ALL SETTINGS join_algorithm = 'parallel_hash'; From 6d5e97d9d9bf13800a2d1498bd8e4f294ab2c35d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Tue, 10 Dec 2024 15:30:05 +0100 Subject: [PATCH 4/7] fix --- src/Interpreters/HashJoin/HashJoinMethods.h | 4 ++- .../HashJoin/HashJoinMethodsImpl.h | 26 ++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index af07da53019..5be3b86c1fb 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -11,6 +11,7 @@ #include #include +#include "Parsers/ExpressionListParsers.h" namespace DB { @@ -149,9 +150,10 @@ private: template static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]); - template + template static ColumnPtr buildAdditionalFilter( size_t left_start_row, + const Selector & selector, const std::vector & selected_rows, const std::vector & row_replicate_offset, AddedColumns & added_columns); diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h index 2b5e92b6841..f415ed6f44f 100644 --- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h +++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h @@ -358,6 +358,12 @@ size_t HashJoinMethods::joinRightColumnsSwitchMu if (added_columns.additional_filter_expression) { const bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1; + LOG_DEBUG( + &Poco::Logger::get("debug"), + "__PRETTY_FUNCTION__={}, __LINE__={}, mark_per_row_used={}", + __PRETTY_FUNCTION__, + __LINE__, + mark_per_row_used); return joinRightColumnsWithAddtitionalFilter( std::forward>(key_getter_vector), mapv, @@ -554,9 +560,10 @@ void HashJoinMethods::setUsed(IColumn::Filter & } template -template +template ColumnPtr HashJoinMethods::buildAdditionalFilter( size_t left_start_row, + const Selector & selector, const std::vector & selected_rows, const std::vector & row_replicate_offset, AddedColumns & added_columns) @@ -569,12 +576,14 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter result_column = ColumnUInt8::create(); break; } + LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); const Block & sample_right_block = *((*selected_rows.begin())->block); if (!sample_right_block || !added_columns.additional_filter_expression) { auto filter = ColumnUInt8::create(); filter->insertMany(1, selected_rows.size()); result_column = std::move(filter); + LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); break; } @@ -584,6 +593,7 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter Block block; added_columns.additional_filter_expression->execute(block); result_column = block.getByPosition(0).column->cloneResized(selected_rows.size()); + LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); break; } NameSet required_column_names; @@ -600,6 +610,14 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter for (const auto & selected_row : selected_rows) { const auto & src_col = selected_row->block->getByPosition(right_col_pos); + if (executed_block.columns() == 0) + LOG_DEBUG( + &Poco::Logger::get("debug"), + "__PRETTY_FUNCTION__={}, __LINE__={}, src_col.name={}, selected_row->row_num={}", + __PRETTY_FUNCTION__, + __LINE__, + src_col.name, + selected_row->row_num); new_col->insertFrom(*src_col.column, selected_row->row_num); } executed_block.insert({std::move(new_col), col.type, col.name}); @@ -619,7 +637,9 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter const size_t & left_offset = row_replicate_offset[i]; size_t rows = left_offset - prev_left_offset; if (rows) - new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows); + { + new_col->insertManyFrom(*src_col->column, selector[left_start_row + i - 1], rows); + } prev_left_offset = left_offset; } executed_block.insert({std::move(new_col), src_col->type, col_name}); @@ -902,7 +922,7 @@ size_t HashJoinMethods::joinRightColumnsWithAddt it, left_start_row); } - auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns); + auto filter_col = buildAdditionalFilter(left_start_row, selector, selected_rows, row_replicate_offset, added_columns); copy_final_matched_rows(left_start_row, filter_col); if constexpr (join_features.need_replication) From 55c9a92e3eefbea69ede4f60c1926f595f15af69 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Tue, 10 Dec 2024 15:34:49 +0100 Subject: [PATCH 5/7] fix test --- ...6_join_on_inequal_expression_fast.reference | 18 ++++++++++++++++++ ...3006_join_on_inequal_expression_fast.sql.j2 | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.reference b/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.reference index a70e70ef7e9..1c1162dfcd2 100644 --- a/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.reference +++ b/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.reference @@ -735,6 +735,24 @@ key1 b 2 3 2 key1 C 3 4 5 key1 c 3 2 1 key1 D 4 1 6 SELECT * FROM (SELECT 1 AS a, 1 AS b, 1 AS c) AS t1 INNER ANY JOIN (SELECT 1 AS a, 1 AS b, 1 AS c) AS t2 ON t1.a = t2.a AND (t1.b > 0 OR t2.b > 0); 1 1 1 1 1 1 +SET join_algorithm='parallel_hash'; +SELECT t1.*, t2.* FROM t1 INNER ANY JOIN t2 ON (t1.a < t2.a OR lower(t1.attr) == lower(t2.attr)) AND t1.key = t2.key ORDER BY (t1.key, t1.attr, t2.key, t2.attr); +key1 a 1 1 2 key1 A 1 2 1 +key1 b 2 3 2 key1 B 2 1 2 +key1 c 3 2 1 key1 C 3 4 5 +key1 d 4 7 2 key1 D 4 1 6 +key4 f 2 3 4 key4 F 1 1 1 +SELECT t1.*, t2.* from t1 INNER ANY JOIN t2 ON t1.key = t2.key and (t1.b + t2.b == t1.c + t2.c) ORDER BY (t1.key, t1.attr, t2.key, t2.attr); +key1 a 1 1 2 key1 A 1 2 1 +key1 b 2 3 2 key1 B 2 1 2 +key1 c 3 2 1 key1 B 2 1 2 +key1 d 4 7 2 key1 D 4 1 6 +SELECT t1.*, t2.* from t1 INNER ANY JOIN t2 ON t1.key = t2.key and (t1.a < t2.a) ORDER BY (t1.key, t1.attr, t2.key, t2.attr); +key1 a 1 1 2 key1 B 2 1 2 +key1 b 2 3 2 key1 C 3 4 5 +key1 c 3 2 1 key1 D 4 1 6 +SELECT * FROM (SELECT 1 AS a, 1 AS b, 1 AS c) AS t1 INNER ANY JOIN (SELECT 1 AS a, 1 AS b, 1 AS c) AS t2 ON t1.a = t2.a AND (t1.b > 0 OR t2.b > 0); +1 1 1 1 1 1 SET join_algorithm='hash'; SELECT t1.* FROM t1 INNER ANY JOIN t2 ON t1.key = t2.key AND t1.a < t2.a OR t1.a = t2.a ORDER BY ALL; key1 a 1 1 2 diff --git a/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.sql.j2 b/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.sql.j2 index 029cf10d3c3..12ecc994ace 100644 --- a/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.sql.j2 +++ b/tests/queries/0_stateless/03006_join_on_inequal_expression_fast.sql.j2 @@ -50,7 +50,7 @@ SELECT t1.*, t2.* FROM t1 {{ join_type }} JOIN t2 ON t1.key = t2.key AND t1.a < {% endfor -%} {% endfor -%} -{% for algorithm in ['hash', 'grace_hash'] -%} +{% for algorithm in ['hash', 'grace_hash', 'parallel_hash'] -%} SET join_algorithm='{{ algorithm }}'; {% for join_type in ['INNER'] -%} {% for join_strictness in ['ANY'] -%} @@ -74,7 +74,7 @@ SELECT t1.* FROM t1 {{ join_type }} {{ join_strictness }} JOIN t2 ON t1.key = t2 -- { echoOff } -- test error messages -{% for algorithm in ['partial_merge', 'full_sorting_merge', 'parallel_hash', 'auto', 'direct'] -%} +{% for algorithm in ['partial_merge', 'full_sorting_merge', 'auto', 'direct'] -%} SET join_algorithm='{{ algorithm }}'; {% for join_type in ['LEFT', 'RIGHT', 'FULL'] -%} SELECT t1.*, t2.* FROM t1 {{ join_type }} JOIN t2 ON (t1.a < t2.a OR lower(t1.attr) == lower(t2.attr)) AND t1.key = t2.key ORDER BY (t1.key, t1.attr, t2.key, t2.attr); -- { serverError NOT_IMPLEMENTED } From 06901e1c6a68c0ad803062b78568838a59af2d4e Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Tue, 10 Dec 2024 16:22:04 +0100 Subject: [PATCH 6/7] better --- src/Interpreters/HashJoin/HashJoinMethods.h | 4 ---- .../HashJoin/HashJoinMethodsImpl.h | 19 ------------------- 2 files changed, 23 deletions(-) diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index 5be3b86c1fb..49aa0e87526 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -9,10 +9,6 @@ #include #include -#include -#include -#include "Parsers/ExpressionListParsers.h" - namespace DB { /// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN. diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h index f415ed6f44f..74e22abd016 100644 --- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h +++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h @@ -358,12 +358,6 @@ size_t HashJoinMethods::joinRightColumnsSwitchMu if (added_columns.additional_filter_expression) { const bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1; - LOG_DEBUG( - &Poco::Logger::get("debug"), - "__PRETTY_FUNCTION__={}, __LINE__={}, mark_per_row_used={}", - __PRETTY_FUNCTION__, - __LINE__, - mark_per_row_used); return joinRightColumnsWithAddtitionalFilter( std::forward>(key_getter_vector), mapv, @@ -576,14 +570,12 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter result_column = ColumnUInt8::create(); break; } - LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); const Block & sample_right_block = *((*selected_rows.begin())->block); if (!sample_right_block || !added_columns.additional_filter_expression) { auto filter = ColumnUInt8::create(); filter->insertMany(1, selected_rows.size()); result_column = std::move(filter); - LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); break; } @@ -593,7 +585,6 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter Block block; added_columns.additional_filter_expression->execute(block); result_column = block.getByPosition(0).column->cloneResized(selected_rows.size()); - LOG_DEBUG(&Poco::Logger::get("debug"), "__PRETTY_FUNCTION__={}, __LINE__={}", __PRETTY_FUNCTION__, __LINE__); break; } NameSet required_column_names; @@ -610,14 +601,6 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter for (const auto & selected_row : selected_rows) { const auto & src_col = selected_row->block->getByPosition(right_col_pos); - if (executed_block.columns() == 0) - LOG_DEBUG( - &Poco::Logger::get("debug"), - "__PRETTY_FUNCTION__={}, __LINE__={}, src_col.name={}, selected_row->row_num={}", - __PRETTY_FUNCTION__, - __LINE__, - src_col.name, - selected_row->row_num); new_col->insertFrom(*src_col.column, selected_row->row_num); } executed_block.insert({std::move(new_col), col.type, col.name}); @@ -637,9 +620,7 @@ ColumnPtr HashJoinMethods::buildAdditionalFilter const size_t & left_offset = row_replicate_offset[i]; size_t rows = left_offset - prev_left_offset; if (rows) - { new_col->insertManyFrom(*src_col->column, selector[left_start_row + i - 1], rows); - } prev_left_offset = left_offset; } executed_block.insert({std::move(new_col), src_col->type, col_name}); From 206d5153f7992eb18399673e44585e7b61967e0a Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Tue, 10 Dec 2024 20:32:01 +0100 Subject: [PATCH 7/7] add heavier test --- ...el_hash_returns_same_res_as_hash.reference | 0 ..._parallel_hash_returns_same_res_as_hash.sh | 73 +++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.reference create mode 100755 tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.sh diff --git a/tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.reference b/tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.sh b/tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.sh new file mode 100755 index 00000000000..3f4a94ef756 --- /dev/null +++ b/tests/queries/1_stateful/00184_parallel_hash_returns_same_res_as_hash.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + + +ROWS=123456 +SEED=$(${CLICKHOUSE_CLIENT} -q "SELECT reinterpretAsUInt32(today())") + +${CLICKHOUSE_CLIENT} --max_threads 16 --query=" +CREATE TABLE t1 ENGINE = MergeTree ORDER BY tuple() AS +SELECT + sipHash64(CounterID, $SEED) AS CounterID, + EventDate, + sipHash64(WatchID, $SEED) AS WatchID, + sipHash64(UserID, $SEED) AS UserID, + URL +FROM test.hits +ORDER BY + CounterID ASC, + EventDate ASC +LIMIT $ROWS; + +CREATE TABLE t2 ENGINE = MergeTree ORDER BY tuple() AS +SELECT + sipHash64(CounterID, $SEED) AS CounterID, + EventDate, + sipHash64(WatchID, $SEED) AS WatchID, + sipHash64(UserID, $SEED) AS UserID, + URL +FROM test.hits +ORDER BY + CounterID DESC, + EventDate DESC +LIMIT $ROWS; + +set max_memory_usage = 0; + +CREATE TABLE res_hash +ENGINE = MergeTree() +ORDER BY (CounterID, EventDate, WatchID, UserID, URL, t2.CounterID, t2.EventDate, t2.WatchID, t2.UserID, t2.URL) +AS SELECT + t1.*, + t2.* +FROM t1 +LEFT JOIN t2 ON (t1.UserID = t2.UserID) AND ((t1.EventDate < t2.EventDate) OR (length(t1.URL) > length(t2.URL))) +ORDER BY ALL +LIMIT $ROWS +SETTINGS join_algorithm = 'hash'; + +CREATE TABLE res_parallel_hash +ENGINE = MergeTree() +ORDER BY (CounterID, EventDate, WatchID, UserID, URL, t2.CounterID, t2.EventDate, t2.WatchID, t2.UserID, t2.URL) +AS SELECT + t1.*, + t2.* +FROM t1 +LEFT JOIN t2 ON (t1.UserID = t2.UserID) AND ((t1.EventDate < t2.EventDate) OR (length(t1.URL) > length(t2.URL))) +ORDER BY ALL +LIMIT $ROWS +SETTINGS join_algorithm = 'parallel_hash'; + +SELECT * +FROM ( + SELECT * FROM res_hash ORDER BY ALL + EXCEPT + SELECT * FROM res_parallel_hash ORDER BY ALL +) +LIMIT 1; +"