Speed up partition key detection if the block has only one partition

This commit is contained in:
alesapin 2024-11-23 19:48:35 +01:00
parent 1aceb608f3
commit ee6ff8bbd6

View File

@ -298,62 +298,67 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
ColumnRawPtrs partition_columns;
partition_columns.reserve(partition_key_names_and_types.size());
bool all_partition_columns_are_equal = true;
for (const auto & element : partition_key_names_and_types)
{
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts, context);
auto async_insert_info_with_partition = scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
auto get_partition = [&](size_t num)
if (!partition_columns.back()->hasEqualValues())
all_partition_columns_are_equal = false;
}
auto get_partition = [&](size_t row_num)
{
Row partition(partition_columns.size());
for (size_t i = 0; i < partition_columns.size(); ++i)
partition[i] = (*partition_columns[i])[partition_num_to_first_row[num]];
partition[i] = (*partition_columns[i])[row_num];
return partition;
};
if (partitions_count == 1)
if (!all_partition_columns_are_equal)
{
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts, context);
auto async_insert_info_with_partition = scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
for (size_t i = 0; i < partitions_count; ++i)
result.emplace_back(block.cloneEmpty(), get_partition(i));
for (size_t col = 0; col < block.columns(); ++col)
{
MutableColumns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
for (size_t i = 0; i < partitions_count; ++i)
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
for (size_t i = 0; i < async_insert_info_with_partition.size(); ++i)
{
if (async_insert_info_with_partition[i] == nullptr)
{
LOG_ERROR(
getLogger("MergeTreeDataWriter"),
"The {}th element in async_insert_info_with_partition is nullptr. There are totally {} partitions in the insert. Selector content is ({}). Offsets content is ({})",
i, partitions_count, fmt::join(selector.begin(), selector.end(), ","), fmt::join(async_insert_info->offsets.begin(), async_insert_info->offsets.end(), ","));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error for async deduplicated insert, please check error logs");
}
result[i].offsets = std::move(async_insert_info_with_partition[i]->offsets);
result[i].tokens = std::move(async_insert_info_with_partition[i]->tokens);
}
}
else
{
/// A typical case is when there is one partition (you do not need to split anything).
/// NOTE: returning a copy of the original block so that calculated partition key columns
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
if (!async_insert_info_with_partition.empty())
if (async_insert_info != nullptr)
{
result[0].offsets = std::move(async_insert_info_with_partition[0]->offsets);
result[0].tokens = std::move(async_insert_info_with_partition[0]->tokens);
result[0].offsets = std::move(async_insert_info->offsets);
result[0].tokens = std::move(async_insert_info->tokens);
}
return result;
}
for (size_t i = 0; i < partitions_count; ++i)
result.emplace_back(block.cloneEmpty(), get_partition(i));
for (size_t col = 0; col < block.columns(); ++col)
{
MutableColumns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
for (size_t i = 0; i < partitions_count; ++i)
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
for (size_t i = 0; i < async_insert_info_with_partition.size(); ++i)
{
if (async_insert_info_with_partition[i] == nullptr)
{
LOG_ERROR(
getLogger("MergeTreeDataWriter"),
"The {}th element in async_insert_info_with_partition is nullptr. There are totally {} partitions in the insert. Selector content is ({}). Offsets content is ({})",
i, partitions_count, fmt::join(selector.begin(), selector.end(), ","), fmt::join(async_insert_info->offsets.begin(), async_insert_info->offsets.end(), ","));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error for async deduplicated insert, please check error logs");
}
result[i].offsets = std::move(async_insert_info_with_partition[i]->offsets);
result[i].tokens = std::move(async_insert_info_with_partition[i]->tokens);
}
return result;