Improve columns squashing for String/Array/Map/Variant/Dynamic types

This commit is contained in:
avogar 2024-07-24 13:06:29 +00:00
parent bab5565b65
commit f37f228af9
19 changed files with 285 additions and 10 deletions

View File

@ -452,6 +452,22 @@ void ColumnArray::reserve(size_t n)
getData().reserve(n); /// The average size of arrays is not taken into account here. Or it is considered to be no more than 1.
}
void ColumnArray::prepareForSquashing(const Columns & source_columns)
{
size_t new_size = size();
Columns source_data_columns;
source_data_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
{
const auto & source_array_column = assert_cast<const ColumnArray &>(*source_column);
new_size += source_array_column.size();
source_data_columns.push_back(source_array_column.getDataPtr());
}
getOffsets().reserve_exact(new_size);
data->prepareForSquashing(source_data_columns);
}
void ColumnArray::shrinkToFit()
{
getOffsets().shrink_to_fit();

View File

@ -118,6 +118,7 @@ public:
void updatePermutationWithCollation(const Collator & collator, PermutationSortDirection direction, PermutationSortStability stability,
size_t limit, int nan_direction_hint, Permutation & res, EqualRanges& equal_ranges) const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void shrinkToFit() override;
void ensureOwnership() override;
size_t byteSize() const override;

View File

@ -643,6 +643,116 @@ ColumnPtr ColumnDynamic::compress() const
});
}
void ColumnDynamic::prepareForSquashing(const Columns & source_columns)
{
if (source_columns.empty())
return;
/// Internal variants of source dynamic columns may differ.
/// We want to preallocate memory for all variants we will have after squashing.
/// It may happen that the total number of variants in source columns will
/// exceed the limit, in this case we will choose the most frequent variants.
/// First, preallocate memory for variant discriminators and offsets.
size_t new_size = size();
for (const auto & source_column : source_columns)
new_size += source_column->size();
auto & variant_col = getVariantColumn();
variant_col.getLocalDiscriminators().reserve_exact(new_size);
variant_col.getOffsets().reserve_exact(new_size);
/// Second, collect all variants and their total sizes.
std::unordered_map<String, size_t> total_variant_sizes;
DataTypes all_variants;
auto add_variants = [&](const ColumnDynamic & source_dynamic)
{
const auto & source_variant_column = source_dynamic.getVariantColumn();
const auto & source_variant_info = source_dynamic.getVariantInfo();
const auto & source_variants = assert_cast<const DataTypeVariant &>(*source_variant_info.variant_type).getVariants();
for (size_t i = 0; i != source_variants.size(); ++i)
{
const auto & variant_name = source_variant_info.variant_names[i];
auto it = total_variant_sizes.find(variant_name);
/// Add this variant to the list of all variants if we didn't see it yet.
if (it == total_variant_sizes.end())
{
all_variants.push_back(source_variants[i]);
it = total_variant_sizes.emplace(variant_name, 0).first;
}
it->second += source_variant_column.getVariantByGlobalDiscriminator(i).size();
}
};
for (const auto & source_column : source_columns)
add_variants(assert_cast<const ColumnDynamic &>(*source_column));
/// Add variants from this dynamic column.
add_variants(*this);
DataTypePtr result_variant_type;
/// Check if the number of all variants exceeds the limit.
if (all_variants.size() > max_dynamic_types || (all_variants.size() == max_dynamic_types && !total_variant_sizes.contains("String")))
{
/// We want to keep the most frequent variants in the resulting dynamic column.
DataTypes result_variants;
result_variants.reserve(max_dynamic_types);
/// Add variants from current variant column as we will not rewrite it.
for (const auto & variant : assert_cast<const DataTypeVariant &>(*variant_info.variant_type).getVariants())
result_variants.push_back(variant);
/// Add String variant in advance (if we didn't add it yet) as we must have it across variants when we reach the limit.
if (!variant_info.variant_name_to_discriminator.contains("String"))
result_variants.push_back(std::make_shared<DataTypeString>());
/// Create list of remaining variants with their sizes and sort it.
std::vector<std::pair<size_t, DataTypePtr>> variants_with_sizes;
variants_with_sizes.reserve(all_variants.size() - variant_info.variant_names.size());
for (const auto & variant : all_variants)
{
/// Add variant to the list only of we didn't add it yet.
auto variant_name = variant->getName();
if (variant_name != "String" && !variant_info.variant_name_to_discriminator.contains(variant_name))
variants_with_sizes.emplace_back(total_variant_sizes[variant->getName()], variant);
}
std::sort(variants_with_sizes.begin(), variants_with_sizes.end(), std::greater());
/// Add the most frequent variants until we reach max_dynamic_types.
size_t num_new_variants = max_dynamic_types - result_variants.size();
for (size_t i = 0; i != num_new_variants; ++i)
result_variants.push_back(variants_with_sizes[i].second);
result_variant_type = std::make_shared<DataTypeVariant>(result_variants);
}
else
{
result_variant_type = std::make_shared<DataTypeVariant>(all_variants);
}
if (!result_variant_type->equals(*variant_info.variant_type))
updateVariantInfoAndExpandVariantColumn(result_variant_type);
/// Now current dynamic column has all resulting variants and we can call
/// prepareForSquashing on them to preallocate the memory.
for (size_t i = 0; i != variant_info.variant_names.size(); ++i)
{
Columns source_variant_columns;
source_variant_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
{
const auto & source_dynamic_column = assert_cast<const ColumnDynamic &>(*source_column);
const auto & source_variant_info = source_dynamic_column.getVariantInfo();
/// Try to find this variant in the current source column.
auto it = source_variant_info.variant_name_to_discriminator.find(variant_info.variant_names[i]);
if (it != source_variant_info.variant_name_to_discriminator.end())
source_variant_columns.push_back(source_dynamic_column.getVariantColumn().getVariantPtrByGlobalDiscriminator(it->second));
}
variant_col.getVariantByGlobalDiscriminator(i).prepareForSquashing(source_variant_columns);
}
}
void ColumnDynamic::takeDynamicStructureFromSourceColumns(const Columns & source_columns)
{
if (!empty())

View File

@ -254,6 +254,8 @@ public:
variant_column->reserve(n);
}
void prepareForSquashing(const Columns & source_columns) override;
void ensureOwnership() override
{
variant_column->ensureOwnership();

View File

@ -249,6 +249,15 @@ void ColumnMap::reserve(size_t n)
nested->reserve(n);
}
void ColumnMap::prepareForSquashing(const Columns & source_columns)
{
Columns nested_source_columns;
nested_source_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
nested_source_columns.push_back(assert_cast<const ColumnMap &>(*source_column).getNestedColumnPtr());
nested->prepareForSquashing(nested_source_columns);
}
void ColumnMap::shrinkToFit()
{
nested->shrinkToFit();

View File

@ -94,6 +94,7 @@ public:
void updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void shrinkToFit() override;
void ensureOwnership() override;
size_t byteSize() const override;

View File

@ -706,6 +706,22 @@ void ColumnNullable::reserve(size_t n)
getNullMapData().reserve(n);
}
void ColumnNullable::prepareForSquashing(const Columns & source_columns)
{
size_t new_size = size();
Columns nested_source_columns;
nested_source_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
{
const auto & source_nullable_column = assert_cast<const ColumnNullable &>(*source_column);
new_size += source_nullable_column.size();
nested_source_columns.push_back(source_nullable_column.getNestedColumnPtr());
}
nested_column->prepareForSquashing(nested_source_columns);
getNullMapData().reserve(new_size);
}
void ColumnNullable::shrinkToFit()
{
getNestedColumn().shrinkToFit();

View File

@ -125,6 +125,7 @@ public:
size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_ranges) const override;
size_t estimateCardinalityInPermutedRange(const Permutation & permutation, const EqualRange & equal_range) const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void shrinkToFit() override;
void ensureOwnership() override;
size_t byteSize() const override;

View File

@ -557,6 +557,21 @@ void ColumnString::reserve(size_t n)
offsets.reserve_exact(n);
}
void ColumnString::prepareForSquashing(const Columns & source_columns)
{
size_t new_size = size();
size_t new_chars_size = chars.size();
for (const auto & source_column : source_columns)
{
const auto & source_string_column = assert_cast<const ColumnString &>(*source_column);
new_size += source_string_column.size();
new_chars_size += source_string_column.chars.size();
}
offsets.reserve_exact(new_size);
chars.reserve_exact(new_chars_size);
}
void ColumnString::shrinkToFit()
{
chars.shrink_to_fit();

View File

@ -283,6 +283,7 @@ public:
ColumnPtr compress() const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void shrinkToFit() override;
void getExtremes(Field & min, Field & max) const override;

View File

@ -595,6 +595,19 @@ void ColumnTuple::reserve(size_t n)
getColumn(i).reserve(n);
}
void ColumnTuple::prepareForSquashing(const Columns & source_columns)
{
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i)
{
Columns nested_columns;
nested_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
nested_columns.push_back(assert_cast<const ColumnTuple &>(*source_column).getColumnPtr(i));
getColumn(i).prepareForSquashing(nested_columns);
}
}
void ColumnTuple::shrinkToFit()
{
const size_t tuple_size = columns.size();

View File

@ -110,6 +110,7 @@ public:
void updatePermutationWithCollation(const Collator & collator, IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges& equal_ranges) const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void shrinkToFit() override;
void ensureOwnership() override;
size_t byteSize() const override;

View File

@ -1247,8 +1247,25 @@ void ColumnVariant::updatePermutation(IColumn::PermutationSortDirection directio
void ColumnVariant::reserve(size_t n)
{
local_discriminators->reserve(n);
offsets->reserve(n);
getLocalDiscriminators().reserve_exact(n);
getOffsets().reserve_exact(n);
}
void ColumnVariant::prepareForSquashing(const Columns & source_columns)
{
size_t new_size = size();
for (const auto & source_column : source_columns)
new_size += source_column->size();
reserve(new_size);
for (size_t i = 0; i != variants.size(); ++i)
{
Columns source_variant_columns;
source_variant_columns.reserve(source_columns.size());
for (const auto & source_column : source_columns)
source_variant_columns.push_back(assert_cast<const ColumnVariant &>(*source_column).getVariantPtrByGlobalDiscriminator(i));
getVariantByGlobalDiscriminator(i).prepareForSquashing(source_variant_columns);
}
}
void ColumnVariant::ensureOwnership()

View File

@ -237,6 +237,7 @@ public:
size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_ranges) const override;
void reserve(size_t n) override;
void prepareForSquashing(const Columns & source_columns) override;
void ensureOwnership() override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;

View File

@ -475,6 +475,15 @@ public:
/// It affects performance only (not correctness).
virtual void reserve(size_t /*n*/) {}
/// Reserve memory before squashing all specified source columns into this column.
virtual void prepareForSquashing(const std::vector<Ptr> & source_columns)
{
size_t new_size = size();
for (const auto & source_column : source_columns)
new_size += source_column->size();
reserve(new_size);
}
/// Requests the removal of unused capacity.
/// It is a non-binding request to reduce the capacity of the underlying container to its size.
virtual void shrinkToFit() {}

View File

@ -5,7 +5,6 @@
#include <Common/CurrentThread.h>
#include <base/defines.h>
namespace DB
{
@ -114,20 +113,32 @@ Chunk Squashing::squash(std::vector<Chunk> && input_chunks, Chunk::ChunkInfoColl
{
auto & first_chunk = input_chunks[0];
Columns columns = first_chunk.detachColumns();
mutable_columns.reserve(columns.size());
for (auto & column : columns)
{
mutable_columns.push_back(IColumn::mutate(std::move(column)));
mutable_columns.back()->reserve(rows);
}
}
size_t num_columns = mutable_columns.size();
/// Collect the list of source columns for each column.
std::vector<Columns> source_columns_list(num_columns, Columns{});
for (size_t i = 0; i != num_columns; ++i)
source_columns_list[i].reserve(input_chunks.size() - 1);
for (size_t i = 1; i < input_chunks.size(); ++i) // We've already processed the first chunk above
{
Columns columns = input_chunks[i].detachColumns();
for (size_t j = 0, size = mutable_columns.size(); j < size; ++j)
auto columns = input_chunks[i].detachColumns();
for (size_t j = 0; j != num_columns; ++j)
source_columns_list[j].emplace_back(std::move(columns[j]));
}
for (size_t i = 0; i != num_columns; ++i)
{
/// We know all the data we will insert in advance and can make all necessary pre-allocations.
mutable_columns[i]->prepareForSquashing(source_columns_list[i]);
for (auto & source_column : source_columns_list[i])
{
const auto source_column = columns[j];
mutable_columns[j]->insertRangeFrom(*source_column, 0, source_column->size());
auto column = std::move(source_column);
mutable_columns[i]->insertRangeFrom(*column, 0, column->size());
}
}

View File

@ -0,0 +1,23 @@
<test>
<settings>
<max_block_size>1000</max_block_size>
</settings>
<create_query>
CREATE TABLE squash_performance
(
s1 String,
s2 Nullable(String),
a1 Array(Array(String)),
a2 Array(Array(UInt32)),
m1 Map(String, Array(String)),
m2 Map(String, Array(UInt64)),
t Tuple(String, Array(String), Map(String, String))
)
ENGINE = Null;
</create_query>
<query>INSERT INTO squash_performance SELECT * FROM generateRandom(42) LIMIT 500000</query>
<drop_query>DROP TABLE IF EXISTS squash_performance</drop_query>
</test>

View File

@ -0,0 +1,8 @@
Array(UInt8)
None
UInt64
None
String
UInt64
String
UInt64

View File

@ -0,0 +1,20 @@
set allow_experimental_dynamic_type = 1;
set max_block_size = 1000;
drop table if exists test;
create table test (d Dynamic) engine=MergeTree order by tuple();
insert into test select multiIf(number < 1000, NULL::Dynamic(max_types=2), number < 3000, range(number % 5)::Dynamic(max_types=2), number::Dynamic(max_types=2)) from numbers(1000000);
select distinct dynamicType(d) as type from test order by type;
drop table test;
create table test (d Dynamic(max_types=2)) engine=MergeTree order by tuple();
insert into test select multiIf(number < 1000, NULL::Dynamic(max_types=2), number < 3000, range(number % 5)::Dynamic(max_types=2), number::Dynamic(max_types=2)) from numbers(1000000);
select distinct dynamicType(d) as type from test order by type;
truncate table test;
insert into test select multiIf(number < 1000, 'Str'::Dynamic(max_types=2), number < 3000, range(number % 5)::Dynamic(max_types=2), number::Dynamic(max_types=2)) from numbers(1000000);
select distinct dynamicType(d) as type from test order by type;
drop table test;