diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 493752fc3fe..4ea06763320 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -923,6 +923,9 @@ class IColumn; M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ + M(Int32, join_to_sort_minimum_perkey_rows, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys", 0) \ + M(Int32, join_to_sort_maximum_table_rows, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join.", 0) \ + M(Bool, allow_experimental_join_right_table_sorting, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join.", 0) \ M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \ M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 19f2d5ccdf0..664154a4887 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -99,7 +99,10 @@ static std::initializer_list::buildOutput() { if (join_data_avg_perkey_rows < output_by_row_list_threshold) buildOutputFromBlocks(); + else if (join_data_sorted) + { + for (size_t i = 0; i < this->size(); ++i) + { + auto & col = columns[i]; + for (auto row_ref_i : lazy_output.row_refs) + { + if (row_ref_i) + { + const RowRefList * row_ref_list = reinterpret_cast(row_ref_i); + col->insertRangeFrom(*row_ref_list->block->getByPosition(right_indexes[i]).column, row_ref_list->row_num, row_ref_list->rows); + } + else + type_name[i].type->insertDefaultInto(*col); + } + } + } else { for (size_t i = 0; i < this->size(); ++i) diff --git a/src/Interpreters/HashJoin/AddedColumns.h b/src/Interpreters/HashJoin/AddedColumns.h index f1b95a63be6..4603d493329 100644 --- a/src/Interpreters/HashJoin/AddedColumns.h +++ b/src/Interpreters/HashJoin/AddedColumns.h @@ -66,6 +66,9 @@ public: , join_on_keys(join_on_keys_) , additional_filter_expression(additional_filter_expression_) , rows_to_add(left_block.rows()) + , join_data_avg_perkey_rows(join.getJoinedData()->avgPerKeyRows()) + , output_by_row_list_threshold(join.getTableJoin().outputByRowListPerkeyRowsThreshold()) + , join_data_sorted(join.getJoinedData()->sorted) , is_join_get(is_join_get_) { size_t num_columns_to_add = block_with_columns_to_add.columns(); @@ -113,8 +116,6 @@ public: if (columns[j]->isNullable() && !saved_column->isNullable()) nullable_column_ptrs[j] = typeid_cast(columns[j].get()); } - join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows(); - output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold(); } size_t size() const { return columns.size(); } @@ -149,6 +150,7 @@ public: bool output_by_row_list = false; size_t join_data_avg_perkey_rows = 0; size_t output_by_row_list_threshold = 0; + bool join_data_sorted = false; IColumn::Filter filter; void reserve(bool need_replicate) diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 9c07a71e614..1b8b45b94ea 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -649,7 +649,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits) } data->keys_to_join = total_rows; shrinkStoredBlocksToFit(total_bytes); - return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); } @@ -1361,4 +1360,96 @@ bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr table return false; } +template +void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]]) +{ + constexpr JoinFeatures join_features; + if constexpr (!join_features.is_all_join || (!join_features.left && !join_features.inner)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Only left or inner join table can be reranged."); + else + { + auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref) + { + auto it = rows_ref.begin(); + if (it.ok()) + { + if (blocks.empty() || blocks.back().rows() >= DEFAULT_BLOCK_SIZE) + blocks.emplace_back(it->block->cloneEmpty()); + } + else + { + return; + } + auto & block = blocks.back(); + size_t start_row = block.rows(); + for (; it.ok(); ++it) + { + for (size_t i = 0; i < block.columns(); ++i) + { + auto & col = block.getByPosition(i).column->assumeMutableRef(); + col.insertFrom(*it->block->getByPosition(i).column, it->row_num); + } + } + if (block.rows() > start_row) + { + RowRefList new_rows_ref(&block, start_row, block.rows() - start_row); + rows_ref = std::move(new_rows_ref); + } + }; + + auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map) + { + switch (data->type) + { + #define M(TYPE) \ + case Type::TYPE: \ + {\ + rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \ + break; \ + } + APPLY_FOR_JOIN_VARIANTS(M) + #undef M + default: + break; + } + }; + BlocksList sorted_blocks; + visit_rows_map(sorted_blocks, map); + data->blocks.swap(sorted_blocks); + } +} + +void HashJoin::tryRerangeRightTableData() +{ + if (!table_join->allowJoinSorting() || table_join->getMixedJoinExpression() || !isInnerOrLeft(kind) || strictness != JoinStrictness::All) + return; + + /// We should not rerange the right table on such conditions: + /// 1. the right table is already reranged by key or it is empty. + /// 2. the join clauses size is greater than 1, like `...join on a.key1=b.key1 or a.key2=b.key2`, we can not rerange the right table on different set of keys. + /// 3. the number of right table rows exceed the threshold, which may result in a significant cost for reranging and lead to performance degradation. + /// 4. the keys of right table is very sparse, which may result in insignificant performance improvement after reranging by key. + if (!data || data->sorted || data->blocks.empty() || data->maps.size() > 1 || data->rows_to_join > table_join->sortRightMaximumTableRows() || data->avgPerKeyRows() < table_join->sortRightMinimumPerkeyRows()) + return; + + if (data->keys_to_join == 0) + data->keys_to_join = getTotalRowCount(); + + /// If the there is no columns to add, means no columns to output, then the rerange would not improve performance by using column's `insertRangeFrom` + /// to replace column's `insertFrom` to make the output. + if (sample_block_with_columns_to_add.columns() == 0) + { + LOG_DEBUG(log, "The joined right table total rows :{}, total keys :{}", data->rows_to_join, data->keys_to_join); + return; + } + [[maybe_unused]] bool result = joinDispatch( + kind, + strictness, + data->maps.front(), + /*prefer_use_maps_all*/ false, + [&](auto kind_, auto strictness_, auto & map_) { tryRerangeRightTableDataImpl(map_); }); + chassert(result); + data->sorted = true; +} + } diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h index d645b8e9273..230343691ea 100644 --- a/src/Interpreters/HashJoin/HashJoin.h +++ b/src/Interpreters/HashJoin/HashJoin.h @@ -345,11 +345,12 @@ public: size_t blocks_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0; - /// Number of rows of right table to join size_t rows_to_join = 0; /// Number of keys of right table to join size_t keys_to_join = 0; + /// Whether the right table reranged by key + bool sorted = false; size_t avgPerKeyRows() const { @@ -465,6 +466,10 @@ private: void validateAdditionalFilterExpression(std::shared_ptr additional_filter_expression); bool needUsedFlagsForPerRightTableRow(std::shared_ptr table_join_) const; + + void tryRerangeRightTableData() override; + template + void tryRerangeRightTableDataImpl(Map & map); }; } diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index 97ad57d26ea..c5b54a62f36 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -83,7 +83,6 @@ public: const Block & block_with_columns_to_add, const MapsTemplateVector & maps_, bool is_join_get = false); - private: template static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes); @@ -199,4 +198,3 @@ extern template class HashJoinMethods; extern template class HashJoinMethods; } - diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index 7374348da50..8f648de2538 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -115,6 +115,7 @@ public: /// Peek next stream of delayed joined blocks. virtual IBlocksStreamPtr getDelayedBlocks() { return nullptr; } virtual bool hasDelayedBlocks() const { return false; } + virtual void tryRerangeRightTableData() {} virtual IBlocksStreamPtr getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0; diff --git a/src/Interpreters/RowRefs.h b/src/Interpreters/RowRefs.h index 7c98c47dd11..f8ac68191d6 100644 --- a/src/Interpreters/RowRefs.h +++ b/src/Interpreters/RowRefs.h @@ -123,6 +123,7 @@ struct RowRefList : RowRef RowRefList() {} /// NOLINT RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {} + RowRefList(const Block * block_, size_t row_start_, size_t rows_) : RowRef(block_, row_start_), rows(static_cast(rows_)) {} ForwardIterator begin() const { return ForwardIterator(this); } diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index 138085f0710..59a0374051f 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -116,6 +116,9 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, Temporary , max_files_to_merge(settings.join_on_disk_max_files_to_merge) , temporary_files_codec(settings.temporary_files_codec) , output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold) + , sort_right_minimum_perkey_rows(settings.join_to_sort_minimum_perkey_rows) + , sort_right_maximum_table_rows(settings.join_to_sort_maximum_table_rows) + , allow_join_sorting(settings.allow_experimental_join_right_table_sorting) , max_memory_usage(settings.max_memory_usage) , tmp_volume(tmp_volume_) , tmp_data(tmp_data_) diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 4d626084d81..e1bae55a4ed 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -149,6 +149,9 @@ private: const size_t max_files_to_merge = 0; const String temporary_files_codec = "LZ4"; const size_t output_by_rowlist_perkey_rows_threshold = 0; + const size_t sort_right_minimum_perkey_rows = 0; + const size_t sort_right_maximum_table_rows = 0; + const bool allow_join_sorting = false; /// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified. size_t max_memory_usage = 0; @@ -297,6 +300,9 @@ public: } size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; } + size_t sortRightMinimumPerkeyRows() const { return sort_right_minimum_perkey_rows; } + size_t sortRightMaximumTableRows() const { return sort_right_maximum_table_rows; } + bool allowJoinSorting() const { return allow_join_sorting; } size_t defaultMaxBytes() const { return default_max_bytes; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index ca204bcb482..f2fb6327129 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -299,13 +299,17 @@ IProcessor::Status FillingRightJoinSideTransform::prepare() void FillingRightJoinSideTransform::work() { - auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); + auto & input = inputs.front(); + auto block = input.getHeader().cloneWithColumns(chunk.detachColumns()); if (for_totals) join->setTotals(block); else stop_reading = !join->addBlockToJoin(block); + if (input.isFinished()) + join->tryRerangeRightTableData(); + set_totals = for_totals; } diff --git a/tests/queries/0_stateless/03228_join_to_rerange_right_table.reference b/tests/queries/0_stateless/03228_join_to_rerange_right_table.reference new file mode 100644 index 00000000000..b62923296e5 --- /dev/null +++ b/tests/queries/0_stateless/03228_join_to_rerange_right_table.reference @@ -0,0 +1,2 @@ +9 +9 diff --git a/tests/queries/0_stateless/03228_join_to_rerange_right_table.sql b/tests/queries/0_stateless/03228_join_to_rerange_right_table.sql new file mode 100644 index 00000000000..f3ee0f0b933 --- /dev/null +++ b/tests/queries/0_stateless/03228_join_to_rerange_right_table.sql @@ -0,0 +1,14 @@ +drop table if exists test_left; +drop table if exists test_right; + +CREATE TABLE test_left (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a; +CREATE TABLE test_right (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a; + +INSERT INTO test_left SELECT number % 10000, number % 10000, number % 10000 FROM numbers(100000); +INSERT INTO test_right SELECT number % 10 , number % 10, number % 10 FROM numbers(10000); + +SELECT MAX(test_right.a) FROM test_left INNER JOIN test_right on test_left.b = test_right.b SETTINGS allow_experimental_join_right_table_sorting=true; +SELECT MAX(test_right.a) FROM test_left LEFT JOIN test_right on test_left.b = test_right.b SETTINGS allow_experimental_join_right_table_sorting=true; + +drop table test_left; +drop table test_right; diff --git a/utils/check-style/experimental_settings_ignore.txt b/utils/check-style/experimental_settings_ignore.txt index 94c46cf562e..3eda9821799 100644 --- a/utils/check-style/experimental_settings_ignore.txt +++ b/utils/check-style/experimental_settings_ignore.txt @@ -31,6 +31,7 @@ allow_experimental_statistics allow_experimental_time_series_table allow_experimental_undrop_table_query allow_experimental_usearch_index +allow_experimental_join_right_table_sorting allow_get_client_http_header allow_introspection_functions allow_materialized_view_with_bad_select