diff --git a/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp b/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp
index ef98accfbc6..c69c54f1c0d 100644
--- a/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp
+++ b/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp
@@ -19,7 +19,7 @@ namespace ErrorCodes
 
 MergeTreeIndexAggregatorBloomFilter::MergeTreeIndexAggregatorBloomFilter(
     size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_)
-    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), index_columns_name(columns_name_)
+    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), index_columns_name(columns_name_), column_hashes(columns_name_.size())
 {
     assert(bits_per_row != 0);
     assert(hash_functions != 0);
@@ -32,9 +32,9 @@ bool MergeTreeIndexAggregatorBloomFilter::empty() const
 
 MergeTreeIndexGranulePtr MergeTreeIndexAggregatorBloomFilter::getGranuleAndReset()
 {
-    const auto granule = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, total_rows, granule_index_blocks);
+    const auto granule = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, column_hashes);
     total_rows = 0;
-    granule_index_blocks.clear();
+    column_hashes.clear();
     return granule;
 }
 
@@ -47,17 +47,19 @@ void MergeTreeIndexAggregatorBloomFilter::update(const Block & block, size_t * p
 
         Block granule_index_block;
         size_t max_read_rows = std::min(block.rows() - *pos, limit);
 
-        for (const auto & index_column_name : index_columns_name)
+        for (size_t column = 0; column < index_columns_name.size(); ++column)
         {
-            const auto & column_and_type = block.getByName(index_column_name);
+            const auto & column_and_type = block.getByName(index_columns_name[column]);
             auto index_column = BloomFilterHash::hashWithColumn(column_and_type.type, column_and_type.column, *pos, max_read_rows);
 
-            granule_index_block.insert({index_column, std::make_shared<DataTypeUInt64>(), column_and_type.name});
+            const auto & index_col = checkAndGetColumn<ColumnUInt64>(index_column.get());
+            const auto & index_data = index_col->getData();
+            for (const auto & hash : index_data)
+                column_hashes[column].insert(hash);
         }
 
         *pos += max_read_rows;
         total_rows += max_read_rows;
-        granule_index_blocks.push_back(granule_index_block);
     }
 }
 
diff --git a/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h b/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h
index 9877db8ee30..d20653b7689 100644
--- a/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h
+++ b/src/Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h
@@ -2,6 +2,7 @@
 
 #include <Interpreters/BloomFilter.h>
 #include <Storages/MergeTree/MergeTreeIndices.h>
+#include <Common/HashTable/HashSet.h>
 
 namespace DB
 {
@@ -22,8 +23,8 @@ private:
     size_t hash_functions;
     const Names index_columns_name;
 
+    std::vector<HashSet<UInt64>> column_hashes;
     size_t total_rows = 0;
-    Blocks granule_index_blocks;
 };
 
 }
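The aggregator change above replaces the intermediate per-granule Blocks with one HashSet<UInt64> per indexed column, so duplicate values collapse while rows are still being accumulated. A minimal self-contained sketch of that deduplication step, with std::unordered_set standing in for ClickHouse's HashSet and all names invented for illustration:

    #include <cstdint>
    #include <unordered_set>
    #include <vector>

    // One accumulator per indexed column. Inserting an already-seen
    // hash is a no-op, so each set keeps only the distinct row hashes.
    using ColumnHashes = std::vector<std::unordered_set<std::uint64_t>>;

    void accumulateHashes(ColumnHashes & column_hashes, size_t column,
                          const std::vector<std::uint64_t> & row_hashes)
    {
        for (std::uint64_t hash : row_hashes)
            column_hashes[column].insert(hash);
    }

The filter built from these sets is sized by distinct-value count rather than by row count, which is what shrinks the index for low-cardinality columns.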
diff --git a/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.cpp b/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.cpp
index 267708b5312..7db3aa3a6b1 100644
--- a/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.cpp
+++ b/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.cpp
@@ -8,7 +8,6 @@
 #include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
 
-
 namespace DB
 {
 namespace ErrorCodes
@@ -16,21 +15,6 @@
     extern const int LOGICAL_ERROR;
 }
 
-static void assertGranuleBlocksStructure(const Blocks & granule_index_blocks)
-{
-    Block prev_block;
-    for (size_t index = 0; index < granule_index_blocks.size(); ++index)
-    {
-        const Block & granule_index_block = granule_index_blocks[index];
-
-        if (index != 0)
-            assertBlocksHaveEqualStructure(prev_block, granule_index_block, "Granule blocks of bloom filter has difference structure.");
-
-        prev_block = granule_index_block;
-    }
-}
-
-
 MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_)
     : bits_per_row(bits_per_row_), hash_functions(hash_functions_)
 {
@@ -39,42 +23,28 @@ MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_p
 }
 
 MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(
-    size_t bits_per_row_, size_t hash_functions_, size_t total_rows_, const Blocks & granule_index_blocks_)
-    : total_rows(total_rows_), bits_per_row(bits_per_row_), hash_functions(hash_functions_)
+    size_t bits_per_row_, size_t hash_functions_, const std::vector<HashSet<UInt64>> & column_hashes_)
+    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), bloom_filters(column_hashes_.size())
 {
-    if (granule_index_blocks_.empty() || !total_rows)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR: granule_index_blocks empty or total_rows is zero.");
+    if (column_hashes_.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Column hashes are empty.");
 
-    assertGranuleBlocksStructure(granule_index_blocks_);
+    size_t bloom_filter_max_size = 0;
+    for (const auto & column_hash : column_hashes_)
+        bloom_filter_max_size = std::max(bloom_filter_max_size, column_hash.size());
 
-    for (size_t index = 0; index < granule_index_blocks_.size(); ++index)
+    static size_t atom_size = 8;
+
+    // If multiple columns are given, we will initialize all the bloom filters
+    // with the size of the highest-cardinality one. This is done for compatibility
+    // with the existing binary serialization format.
+    total_rows = bloom_filter_max_size;
+    size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;
+
+    for (size_t column = 0, columns = column_hashes_.size(); column < columns; ++column)
     {
-        Block granule_index_block = granule_index_blocks_[index];
-
-        if (unlikely(!granule_index_block || !granule_index_block.rows()))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR: granule_index_block is empty.");
-
-        if (index == 0)
-        {
-            static size_t atom_size = 8;
-
-            for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
-            {
-                size_t total_items = total_rows;
-
-                if (const auto * array_col = typeid_cast<const ColumnArray *>(granule_index_block.getByPosition(column).column.get()))
-                {
-                    const IColumn * nested_col = array_col->getDataPtr().get();
-                    total_items = nested_col->size();
-                }
-
-                size_t bytes_size = (bits_per_row * total_items + atom_size - 1) / atom_size;
-                bloom_filters.emplace_back(std::make_shared<BloomFilter>(bytes_size, hash_functions, 0));
-            }
-        }
-
-        for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
-            fillingBloomFilter(bloom_filters[column], granule_index_block, column);
+        bloom_filters[column] = std::make_shared<BloomFilter>(bytes_size, hash_functions, 0);
+        fillingBloomFilter(bloom_filters[column], column_hashes_[column]);
     }
 }
 
@@ -123,18 +93,11 @@ void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
     }
 }
 
-void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column) const
+void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const HashSet<UInt64> & hashes) const
 {
-    const auto & column = granule_index_block.getByPosition(index_hash_column);
-
-    if (const auto * hash_column = typeid_cast<const ColumnUInt64 *>(column.column.get()))
-    {
-        const auto & hash_column_vec = hash_column->getData();
-
-        for (const auto & bf_base_hash : hash_column_vec)
-            for (size_t i = 0; i < hash_functions; ++i)
-                bf->addHashWithSeed(bf_base_hash, BloomFilterHash::bf_hash_seed[i]);
-    }
+    for (const auto & bf_base_hash : hashes)
+        for (size_t i = 0; i < hash_functions; ++i)
+            bf->addHashWithSeed(bf_base_hash.getKey(), BloomFilterHash::bf_hash_seed[i]);
 }
 
 }
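The constructor above sizes all of a granule's filters from the largest per-column distinct-hash count and rounds the bit count up to whole bytes. A compilable sketch of that arithmetic, where bits_per_row = 10 is an illustrative value chosen here, not taken from the patch:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <vector>

    // bytes = ceil(bits_per_row * max_distinct / atom_size) with atom_size = 8,
    // mirroring the rounding-up division in the constructor above.
    size_t bloomFilterBytes(size_t bits_per_row, const std::vector<size_t> & distinct_per_column)
    {
        static constexpr size_t atom_size = 8;
        size_t max_distinct = 0;
        for (size_t n : distinct_per_column)
            max_distinct = std::max(max_distinct, n);
        return (bits_per_row * max_distinct + atom_size - 1) / atom_size;
    }

    int main()
    {
        // 100 distinct hashes at 10 bits each -> 125 bytes,
        // versus 81920 bytes when sized by a 65536-row granule.
        assert(bloomFilterBytes(10, {100, 7}) == 125);
        assert(bloomFilterBytes(10, {65536}) == 81920);
    }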
diff --git a/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h b/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h
index 82bd91138a7..35335f5d0d2 100644
--- a/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h
+++ b/src/Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h
@@ -2,6 +2,7 @@
 
 #include <Interpreters/BloomFilter.h>
 #include <Storages/MergeTree/MergeTreeIndices.h>
+#include <Common/HashTable/HashSet.h>
 
 namespace DB
 {
@@ -11,7 +12,7 @@ class MergeTreeIndexGranuleBloomFilter final : public IMergeTreeIndexGranule
 public:
     MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_);
 
-    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t total_rows_, const Blocks & granule_index_blocks_);
+    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, const std::vector<HashSet<UInt64>> & column_hashes);
 
     bool empty() const override;
 
@@ -21,12 +22,12 @@ public:
     const std::vector<BloomFilterPtr> & getFilters() const { return bloom_filters; }
 
 private:
-    size_t total_rows;
+    size_t total_rows = 0;
    size_t bits_per_row;
    size_t hash_functions;
    std::vector<BloomFilterPtr> bloom_filters;
 
-    void fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column) const;
+    void fillingBloomFilter(BloomFilterPtr & bf, const HashSet<UInt64> & hashes) const;
 };
 
diff --git a/tests/queries/0_stateless/02231_bloom_filter_sizing.reference b/tests/queries/0_stateless/02231_bloom_filter_sizing.reference
new file mode 100644
index 00000000000..bdba311c092
--- /dev/null
+++ b/tests/queries/0_stateless/02231_bloom_filter_sizing.reference
@@ -0,0 +1,6 @@
+Bloom filter on sort key
+10000
+0
+Bloom filter on non-sort key
+10000
+0
diff --git a/tests/queries/0_stateless/02231_bloom_filter_sizing.sql b/tests/queries/0_stateless/02231_bloom_filter_sizing.sql
new file mode 100644
index 00000000000..233e3111067
--- /dev/null
+++ b/tests/queries/0_stateless/02231_bloom_filter_sizing.sql
@@ -0,0 +1,53 @@
+SELECT 'Bloom filter on sort key';
+DROP TABLE IF EXISTS bloom_filter_sizing_pk;
+CREATE TABLE bloom_filter_sizing_pk(
+    key UInt64,
+    value UInt64,
+
+    -- Very high granularity to have one filter per part.
+    INDEX key_bf key TYPE bloom_filter(0.01) GRANULARITY 2147483648
+) ENGINE=MergeTree ORDER BY key;
+
+INSERT INTO bloom_filter_sizing_pk
+SELECT
+number % 100 as key, -- 100 unique keys
+number as value -- whatever
+FROM numbers(1000 * 1000);
+
+--
+-- Merge everything into a single part
+--
+OPTIMIZE TABLE bloom_filter_sizing_pk FINAL;
+
+SELECT COUNT() from bloom_filter_sizing_pk WHERE key = 1;
+
+-- Check the bloom filter size. According to https://hur.st/bloomfilter/?n=100&p=0.01, for 100 keys it should be less than 200 B.
+SELECT COUNT() from system.parts where database = currentDatabase() AND table = 'bloom_filter_sizing_pk' and secondary_indices_uncompressed_bytes > 200 and active;
+
+SELECT 'Bloom filter on non-sort key';
+DROP TABLE IF EXISTS bloom_filter_sizing_sec;
+CREATE TABLE bloom_filter_sizing_sec(
+    key1 UInt64,
+    key2 UInt64,
+    value UInt64,
+
+    -- Very high granularity to have one filter per part.
+    INDEX key_bf key2 TYPE bloom_filter(0.01) GRANULARITY 2147483648
+) ENGINE=MergeTree ORDER BY key1;
+
+INSERT INTO bloom_filter_sizing_sec
+SELECT
+number % 100 as key1, -- 100 unique keys
+rand() % 100 as key2, -- 100 unique keys
+number as value -- whatever
+FROM numbers(1000 * 1000);
+
+--
+-- Merge everything into a single part
+--
+OPTIMIZE TABLE bloom_filter_sizing_sec FINAL;
+
+SELECT COUNT() from bloom_filter_sizing_sec WHERE key1 = 1;
+
+-- Check the bloom filter size. According to https://hur.st/bloomfilter/?n=100&p=0.01, for 100 keys it should be less than 200 B.
+SELECT COUNT() from system.parts where database = currentDatabase() AND table = 'bloom_filter_sizing_sec' and secondary_indices_uncompressed_bytes > 200 and active;
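Sanity check on the 200-byte threshold used by both system.parts queries: for n = 100 distinct keys at false-positive rate p = 0.01, the standard sizing formula gives m = -n * ln(p) / (ln 2)^2 ≈ 959 bits ≈ 120 bytes, comfortably under the limit, whereas sizing the filter by the one million inserted rows would make it roughly four orders of magnitude larger.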