Merge pull request #60341 from KevinyhZou/improve_join_insert_from

Improve left/inner join performance by rerange right table by keys
This commit is contained in:
Julia Kartseva 2024-09-11 19:04:41 +00:00 committed by GitHub
commit b5289c1f08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 159 additions and 8 deletions

View File

@ -923,6 +923,9 @@ class IColumn;
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \ M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \ M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Int32, join_to_sort_minimum_perkey_rows, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys", 0) \
M(Int32, join_to_sort_maximum_table_rows, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join.", 0) \
M(Bool, allow_experimental_join_right_table_sorting, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join.", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \ M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\ M(Bool, use_hive_partitioning, false, "Allows to use hive partitioning for File, URL, S3, AzureBlobStorage and HDFS engines.", 0)\
\ \

View File

@ -99,7 +99,10 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"use_json_alias_for_old_object_type", true, false, "Use JSON type alias to create new JSON type"}, {"use_json_alias_for_old_object_type", true, false, "Use JSON type alias to create new JSON type"},
{"type_json_skip_duplicated_paths", false, false, "Allow to skip duplicated paths during JSON parsing"}, {"type_json_skip_duplicated_paths", false, false, "Allow to skip duplicated paths during JSON parsing"},
{"allow_experimental_vector_similarity_index", false, false, "Added new setting to allow experimental vector similarity indexes"}, {"allow_experimental_vector_similarity_index", false, false, "Added new setting to allow experimental vector similarity indexes"},
{"input_format_try_infer_datetimes_only_datetime64", true, false, "Allow to infer DateTime instead of DateTime64 in data formats"} {"input_format_try_infer_datetimes_only_datetime64", true, false, "Allow to infer DateTime instead of DateTime64 in data formats"},
{"join_to_sort_minimum_perkey_rows", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys"},
{"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join."},
{"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join."},
} }
}, },
{"24.7", {"24.7",

View File

@ -34,6 +34,23 @@ void AddedColumns<true>::buildOutput()
{ {
if (join_data_avg_perkey_rows < output_by_row_list_threshold) if (join_data_avg_perkey_rows < output_by_row_list_threshold)
buildOutputFromBlocks<true>(); buildOutputFromBlocks<true>();
else if (join_data_sorted)
{
for (size_t i = 0; i < this->size(); ++i)
{
auto & col = columns[i];
for (auto row_ref_i : lazy_output.row_refs)
{
if (row_ref_i)
{
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
col->insertRangeFrom(*row_ref_list->block->getByPosition(right_indexes[i]).column, row_ref_list->row_num, row_ref_list->rows);
}
else
type_name[i].type->insertDefaultInto(*col);
}
}
}
else else
{ {
for (size_t i = 0; i < this->size(); ++i) for (size_t i = 0; i < this->size(); ++i)

View File

@ -66,6 +66,9 @@ public:
, join_on_keys(join_on_keys_) , join_on_keys(join_on_keys_)
, additional_filter_expression(additional_filter_expression_) , additional_filter_expression(additional_filter_expression_)
, rows_to_add(left_block.rows()) , rows_to_add(left_block.rows())
, join_data_avg_perkey_rows(join.getJoinedData()->avgPerKeyRows())
, output_by_row_list_threshold(join.getTableJoin().outputByRowListPerkeyRowsThreshold())
, join_data_sorted(join.getJoinedData()->sorted)
, is_join_get(is_join_get_) , is_join_get(is_join_get_)
{ {
size_t num_columns_to_add = block_with_columns_to_add.columns(); size_t num_columns_to_add = block_with_columns_to_add.columns();
@ -113,8 +116,6 @@ public:
if (columns[j]->isNullable() && !saved_column->isNullable()) if (columns[j]->isNullable() && !saved_column->isNullable())
nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get()); nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get());
} }
join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows();
output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold();
} }
size_t size() const { return columns.size(); } size_t size() const { return columns.size(); }
@ -149,6 +150,7 @@ public:
bool output_by_row_list = false; bool output_by_row_list = false;
size_t join_data_avg_perkey_rows = 0; size_t join_data_avg_perkey_rows = 0;
size_t output_by_row_list_threshold = 0; size_t output_by_row_list_threshold = 0;
bool join_data_sorted = false;
IColumn::Filter filter; IColumn::Filter filter;
void reserve(bool need_replicate) void reserve(bool need_replicate)

View File

@ -649,7 +649,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
} }
data->keys_to_join = total_rows; data->keys_to_join = total_rows;
shrinkStoredBlocksToFit(total_bytes); shrinkStoredBlocksToFit(total_bytes);
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED); return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
} }
@ -1361,4 +1360,96 @@ bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table
return false; return false;
} }
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
{
constexpr JoinFeatures<KIND, STRICTNESS, Map> join_features;
if constexpr (!join_features.is_all_join || (!join_features.left && !join_features.inner))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only left or inner join table can be reranged.");
else
{
auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref)
{
auto it = rows_ref.begin();
if (it.ok())
{
if (blocks.empty() || blocks.back().rows() >= DEFAULT_BLOCK_SIZE)
blocks.emplace_back(it->block->cloneEmpty());
}
else
{
return;
}
auto & block = blocks.back();
size_t start_row = block.rows();
for (; it.ok(); ++it)
{
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i).column->assumeMutableRef();
col.insertFrom(*it->block->getByPosition(i).column, it->row_num);
}
}
if (block.rows() > start_row)
{
RowRefList new_rows_ref(&block, start_row, block.rows() - start_row);
rows_ref = std::move(new_rows_ref);
}
};
auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map)
{
switch (data->type)
{
#define M(TYPE) \
case Type::TYPE: \
{\
rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
break; \
}
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
break;
}
};
BlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
data->blocks.swap(sorted_blocks);
}
}
void HashJoin::tryRerangeRightTableData()
{
if (!table_join->allowJoinSorting() || table_join->getMixedJoinExpression() || !isInnerOrLeft(kind) || strictness != JoinStrictness::All)
return;
/// We should not rerange the right table on such conditions:
/// 1. the right table is already reranged by key or it is empty.
/// 2. the join clauses size is greater than 1, like `...join on a.key1=b.key1 or a.key2=b.key2`, we can not rerange the right table on different set of keys.
/// 3. the number of right table rows exceed the threshold, which may result in a significant cost for reranging and lead to performance degradation.
/// 4. the keys of right table is very sparse, which may result in insignificant performance improvement after reranging by key.
if (!data || data->sorted || data->blocks.empty() || data->maps.size() > 1 || data->rows_to_join > table_join->sortRightMaximumTableRows() || data->avgPerKeyRows() < table_join->sortRightMinimumPerkeyRows())
return;
if (data->keys_to_join == 0)
data->keys_to_join = getTotalRowCount();
/// If the there is no columns to add, means no columns to output, then the rerange would not improve performance by using column's `insertRangeFrom`
/// to replace column's `insertFrom` to make the output.
if (sample_block_with_columns_to_add.columns() == 0)
{
LOG_DEBUG(log, "The joined right table total rows :{}, total keys :{}", data->rows_to_join, data->keys_to_join);
return;
}
[[maybe_unused]] bool result = joinDispatch(
kind,
strictness,
data->maps.front(),
/*prefer_use_maps_all*/ false,
[&](auto kind_, auto strictness_, auto & map_) { tryRerangeRightTableDataImpl<kind_, decltype(map_), strictness_>(map_); });
chassert(result);
data->sorted = true;
}
} }

View File

@ -345,11 +345,12 @@ public:
size_t blocks_allocated_size = 0; size_t blocks_allocated_size = 0;
size_t blocks_nullmaps_allocated_size = 0; size_t blocks_nullmaps_allocated_size = 0;
/// Number of rows of right table to join /// Number of rows of right table to join
size_t rows_to_join = 0; size_t rows_to_join = 0;
/// Number of keys of right table to join /// Number of keys of right table to join
size_t keys_to_join = 0; size_t keys_to_join = 0;
/// Whether the right table reranged by key
bool sorted = false;
size_t avgPerKeyRows() const size_t avgPerKeyRows() const
{ {
@ -465,6 +466,10 @@ private:
void validateAdditionalFilterExpression(std::shared_ptr<ExpressionActions> additional_filter_expression); void validateAdditionalFilterExpression(std::shared_ptr<ExpressionActions> additional_filter_expression);
bool needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table_join_) const; bool needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table_join_) const;
void tryRerangeRightTableData() override;
template <JoinKind KIND, typename Map, JoinStrictness STRICTNESS>
void tryRerangeRightTableDataImpl(Map & map);
}; };
} }

View File

@ -83,7 +83,6 @@ public:
const Block & block_with_columns_to_add, const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_, const MapsTemplateVector & maps_,
bool is_join_get = false); bool is_join_get = false);
private: private:
template <typename KeyGetter, bool is_asof_join> template <typename KeyGetter, bool is_asof_join>
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes); static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes);
@ -199,4 +198,3 @@ extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Semi, Hash
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>; extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>; extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>;
} }

View File

@ -115,6 +115,7 @@ public:
/// Peek next stream of delayed joined blocks. /// Peek next stream of delayed joined blocks.
virtual IBlocksStreamPtr getDelayedBlocks() { return nullptr; } virtual IBlocksStreamPtr getDelayedBlocks() { return nullptr; }
virtual bool hasDelayedBlocks() const { return false; } virtual bool hasDelayedBlocks() const { return false; }
virtual void tryRerangeRightTableData() {}
virtual IBlocksStreamPtr virtual IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0; getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0;

View File

@ -123,6 +123,7 @@ struct RowRefList : RowRef
RowRefList() {} /// NOLINT RowRefList() {} /// NOLINT
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {} RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {}
RowRefList(const Block * block_, size_t row_start_, size_t rows_) : RowRef(block_, row_start_), rows(static_cast<SizeT>(rows_)) {}
ForwardIterator begin() const { return ForwardIterator(this); } ForwardIterator begin() const { return ForwardIterator(this); }

View File

@ -116,6 +116,9 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, Temporary
, max_files_to_merge(settings.join_on_disk_max_files_to_merge) , max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec) , temporary_files_codec(settings.temporary_files_codec)
, output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold) , output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold)
, sort_right_minimum_perkey_rows(settings.join_to_sort_minimum_perkey_rows)
, sort_right_maximum_table_rows(settings.join_to_sort_maximum_table_rows)
, allow_join_sorting(settings.allow_experimental_join_right_table_sorting)
, max_memory_usage(settings.max_memory_usage) , max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_) , tmp_volume(tmp_volume_)
, tmp_data(tmp_data_) , tmp_data(tmp_data_)

View File

@ -149,6 +149,9 @@ private:
const size_t max_files_to_merge = 0; const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4"; const String temporary_files_codec = "LZ4";
const size_t output_by_rowlist_perkey_rows_threshold = 0; const size_t output_by_rowlist_perkey_rows_threshold = 0;
const size_t sort_right_minimum_perkey_rows = 0;
const size_t sort_right_maximum_table_rows = 0;
const bool allow_join_sorting = false;
/// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified. /// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified.
size_t max_memory_usage = 0; size_t max_memory_usage = 0;
@ -297,6 +300,9 @@ public:
} }
size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; } size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; }
size_t sortRightMinimumPerkeyRows() const { return sort_right_minimum_perkey_rows; }
size_t sortRightMaximumTableRows() const { return sort_right_maximum_table_rows; }
bool allowJoinSorting() const { return allow_join_sorting; }
size_t defaultMaxBytes() const { return default_max_bytes; } size_t defaultMaxBytes() const { return default_max_bytes; }
size_t maxJoinedBlockRows() const { return max_joined_block_rows; } size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; } size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }

View File

@ -299,13 +299,17 @@ IProcessor::Status FillingRightJoinSideTransform::prepare()
void FillingRightJoinSideTransform::work() void FillingRightJoinSideTransform::work()
{ {
auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); auto & input = inputs.front();
auto block = input.getHeader().cloneWithColumns(chunk.detachColumns());
if (for_totals) if (for_totals)
join->setTotals(block); join->setTotals(block);
else else
stop_reading = !join->addBlockToJoin(block); stop_reading = !join->addBlockToJoin(block);
if (input.isFinished())
join->tryRerangeRightTableData();
set_totals = for_totals; set_totals = for_totals;
} }

View File

@ -0,0 +1,2 @@
9
9

View File

@ -0,0 +1,14 @@
drop table if exists test_left;
drop table if exists test_right;
CREATE TABLE test_left (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a;
CREATE TABLE test_right (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a;
INSERT INTO test_left SELECT number % 10000, number % 10000, number % 10000 FROM numbers(100000);
INSERT INTO test_right SELECT number % 10 , number % 10, number % 10 FROM numbers(10000);
SELECT MAX(test_right.a) FROM test_left INNER JOIN test_right on test_left.b = test_right.b SETTINGS allow_experimental_join_right_table_sorting=true;
SELECT MAX(test_right.a) FROM test_left LEFT JOIN test_right on test_left.b = test_right.b SETTINGS allow_experimental_join_right_table_sorting=true;
drop table test_left;
drop table test_right;

View File

@ -31,6 +31,7 @@ allow_experimental_statistics
allow_experimental_time_series_table allow_experimental_time_series_table
allow_experimental_undrop_table_query allow_experimental_undrop_table_query
allow_experimental_usearch_index allow_experimental_usearch_index
allow_experimental_join_right_table_sorting
allow_get_client_http_header allow_get_client_http_header
allow_introspection_functions allow_introspection_functions
allow_materialized_view_with_bad_select allow_materialized_view_with_bad_select