Merge pull request #35102 from tonickkozlov/bloom-filter-index/deduplicate

Prune bloom filter indices based on data cardinality
pufit 2023-08-31 10:04:19 -04:00, committed by GitHub
commit 1bc6b40a12
6 changed files with 96 additions and 70 deletions
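In short: the aggregator used to buffer a block of per-row hashes for every indexed column (granule_index_blocks) and size each bloom filter by the granule's total row count; it now deduplicates the per-column UInt64 hashes into a HashSet, and the granule sizes each filter by the number of distinct hashes instead. For low-cardinality columns that shrinks the index dramatically. Below is a minimal standalone sketch of the idea, not ClickHouse code: std::unordered_set stands in for ClickHouse's HashSet, and bits_per_row = 10 is an assumed value that roughly matches a 0.01 false-positive target.

#include <cstdint>
#include <cstdio>
#include <unordered_set>
#include <vector>

int main()
{
    // 1,000,000 rows but only 100 distinct values, as in the test added below.
    std::vector<uint64_t> row_hashes;
    for (uint64_t i = 0; i < 1000 * 1000; ++i)
        row_hashes.push_back(i % 100); // stand-in for a real per-row column hash

    // Deduplicate: this mirrors what the aggregator's HashSet<UInt64> now does per column.
    std::unordered_set<uint64_t> distinct(row_hashes.begin(), row_hashes.end());

    const size_t bits_per_row = 10; // assumed; roughly matches a 0.01 false-positive target
    const size_t atom_size = 8;

    // Old sizing: driven by total rows -> 1,250,000 bytes.
    size_t old_bytes = (bits_per_row * row_hashes.size() + atom_size - 1) / atom_size;
    // New sizing: driven by distinct hashes -> 125 bytes.
    size_t new_bytes = (bits_per_row * distinct.size() + atom_size - 1) / atom_size;

    std::printf("old: %zu bytes, new: %zu bytes\n", old_bytes, new_bytes);
    return 0;
}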

File: MergeTreeIndexAggregatorBloomFilter.cpp

@@ -19,7 +19,7 @@ namespace ErrorCodes
 MergeTreeIndexAggregatorBloomFilter::MergeTreeIndexAggregatorBloomFilter(
     size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_)
-    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), index_columns_name(columns_name_)
+    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), index_columns_name(columns_name_), column_hashes(columns_name_.size())
 {
     assert(bits_per_row != 0);
     assert(hash_functions != 0);
@@ -32,9 +32,9 @@ bool MergeTreeIndexAggregatorBloomFilter::empty() const
 MergeTreeIndexGranulePtr MergeTreeIndexAggregatorBloomFilter::getGranuleAndReset()
 {
-    const auto granule = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, total_rows, granule_index_blocks);
-    total_rows = 0;
-    granule_index_blocks.clear();
+    const auto granule = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, column_hashes);
+    column_hashes.clear();
     return granule;
 }
@@ -47,17 +47,19 @@ void MergeTreeIndexAggregatorBloomFilter::update(const Block & block, size_t * pos, size_t limit)
-    Block granule_index_block;
     size_t max_read_rows = std::min(block.rows() - *pos, limit);

-    for (const auto & index_column_name : index_columns_name)
+    for (size_t column = 0; column < index_columns_name.size(); ++column)
     {
-        const auto & column_and_type = block.getByName(index_column_name);
+        const auto & column_and_type = block.getByName(index_columns_name[column]);
         auto index_column = BloomFilterHash::hashWithColumn(column_and_type.type, column_and_type.column, *pos, max_read_rows);
-        granule_index_block.insert({index_column, std::make_shared<DataTypeUInt64>(), column_and_type.name});
+        const auto & index_col = checkAndGetColumn<ColumnUInt64>(index_column.get());
+        const auto & index_data = index_col->getData();
+        for (const auto & hash : index_data)
+            column_hashes[column].insert(hash);
     }

     *pos += max_read_rows;
-    total_rows += max_read_rows;
-    granule_index_blocks.push_back(granule_index_block);
 }
 }
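Note on the new update() flow: BloomFilterHash::hashWithColumn still produces one UInt64 hash per row; the change is that those hashes now land in a per-column set, so duplicates collapse as rows stream in across repeated update() calls, and getGranuleAndReset() hands the sets over and starts fresh. A toy model of that accumulate-then-cut lifecycle (ToyAggregator is a hypothetical name, std::unordered_set stands in for HashSet):

#include <cstdint>
#include <cstdio>
#include <unordered_set>
#include <vector>

struct ToyAggregator
{
    // One set per indexed column, mirroring std::vector<HashSet<UInt64>> column_hashes.
    std::vector<std::unordered_set<uint64_t>> column_hashes;

    explicit ToyAggregator(size_t columns) : column_hashes(columns) {}

    // update() can run many times before an index granule boundary is reached.
    void update(const std::vector<std::vector<uint64_t>> & hashed_block)
    {
        for (size_t column = 0; column < hashed_block.size(); ++column)
            for (uint64_t hash : hashed_block[column])
                column_hashes[column].insert(hash); // duplicates collapse here
    }

    // getGranuleAndReset() hands the sets to the granule and starts over.
    std::vector<std::unordered_set<uint64_t>> getAndReset()
    {
        auto result = std::move(column_hashes);
        column_hashes = std::vector<std::unordered_set<uint64_t>>(result.size());
        return result;
    }
};

int main()
{
    ToyAggregator agg(1);
    agg.update({{1, 2, 2, 3}});
    agg.update({{3, 3, 4}});
    auto sets = agg.getAndReset();
    std::printf("%zu distinct hashes\n", sets[0].size()); // prints: 4 distinct hashes
}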

File: MergeTreeIndexAggregatorBloomFilter.h

@@ -2,6 +2,7 @@
 #include <Storages/MergeTree/MergeTreeIndices.h>
 #include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
+#include <Common/HashTable/HashSet.h>

 namespace DB
 {
@@ -22,8 +23,8 @@ private:
     size_t hash_functions;
     const Names index_columns_name;
+    std::vector<HashSet<UInt64>> column_hashes;

-    size_t total_rows = 0;
-    Blocks granule_index_blocks;
 };

 }

File: MergeTreeIndexGranuleBloomFilter.cpp

@@ -8,7 +8,6 @@
 #include <Interpreters/BloomFilterHash.h>
-#include <IO/WriteHelpers.h>

 namespace DB
 {
@@ -16,21 +15,6 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }

-static void assertGranuleBlocksStructure(const Blocks & granule_index_blocks)
-{
-    Block prev_block;
-    for (size_t index = 0; index < granule_index_blocks.size(); ++index)
-    {
-        const Block & granule_index_block = granule_index_blocks[index];
-
-        if (index != 0)
-            assertBlocksHaveEqualStructure(prev_block, granule_index_block, "Granule blocks of bloom filter has difference structure.");
-
-        prev_block = granule_index_block;
-    }
-}
-
 MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_)
     : bits_per_row(bits_per_row_), hash_functions(hash_functions_)
 {
@@ -39,42 +23,28 @@ MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_)
 MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(
-    size_t bits_per_row_, size_t hash_functions_, size_t total_rows_, const Blocks & granule_index_blocks_)
-    : total_rows(total_rows_), bits_per_row(bits_per_row_), hash_functions(hash_functions_)
+    size_t bits_per_row_, size_t hash_functions_, const std::vector<HashSet<UInt64>> & column_hashes_)
+    : bits_per_row(bits_per_row_), hash_functions(hash_functions_), bloom_filters(column_hashes_.size())
 {
-    if (granule_index_blocks_.empty() || !total_rows)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR: granule_index_blocks empty or total_rows is zero.");
-
-    assertGranuleBlocksStructure(granule_index_blocks_);
-
-    for (size_t index = 0; index < granule_index_blocks_.size(); ++index)
-    {
-        Block granule_index_block = granule_index_blocks_[index];
-        if (unlikely(!granule_index_block || !granule_index_block.rows()))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR: granule_index_block is empty.");
-
-        if (index == 0)
-        {
-            static size_t atom_size = 8;
-
-            for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
-            {
-                size_t total_items = total_rows;
-
-                if (const auto * array_col = typeid_cast<const ColumnArray *>(granule_index_block.getByPosition(column).column.get()))
-                {
-                    const IColumn * nested_col = array_col->getDataPtr().get();
-                    total_items = nested_col->size();
-                }
-
-                size_t bytes_size = (bits_per_row * total_items + atom_size - 1) / atom_size;
-                bloom_filters.emplace_back(std::make_shared<BloomFilter>(bytes_size, hash_functions, 0));
-            }
-        }
-
-        for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
-            fillingBloomFilter(bloom_filters[column], granule_index_block, column);
-    }
+    if (column_hashes_.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Column hashes are empty.");
+
+    size_t bloom_filter_max_size = 0;
+    for (const auto & column_hash : column_hashes_)
+        bloom_filter_max_size = std::max(bloom_filter_max_size, column_hash.size());
+
+    static size_t atom_size = 8;
+
+    // If multiple columns are given, we will initialize all the bloom filters
+    // with the size of the highest-cardinality one. This is done for compatibility with
+    // the existing binary serialization format.
+    total_rows = bloom_filter_max_size;
+    size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;
+
+    for (size_t column = 0, columns = column_hashes_.size(); column < columns; ++column)
+    {
+        bloom_filters[column] = std::make_shared<BloomFilter>(bytes_size, hash_functions, 0);
+        fillingBloomFilter(bloom_filters[column], column_hashes_[column]);
+    }
 }
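The comment in the hunk above is the key compatibility constraint: the serialized format records a single per-granule size (total_rows), so with several indexed columns every filter is allocated for the highest-cardinality column, and cheaper columns pay for the most expensive one. A small sketch of that sizing rule with assumed cardinalities (bits_per_row = 10 is illustrative; ClickHouse derives it from the index's false-positive parameter):

#include <algorithm>
#include <cstdio>
#include <vector>

int main()
{
    // Two indexed columns: one low-cardinality, one high-cardinality (assumed counts).
    std::vector<size_t> distinct_per_column = {100, 250000};

    size_t bloom_filter_max_size = 0;
    for (size_t n : distinct_per_column)
        bloom_filter_max_size = std::max(bloom_filter_max_size, n);

    const size_t bits_per_row = 10; // assumed; derived from the false-positive rate in practice
    const size_t atom_size = 8;
    size_t bytes_size = (bits_per_row * bloom_filter_max_size + atom_size - 1) / atom_size;

    // Every filter in the granule is allocated with this one size (312500 bytes here),
    // so the 100-value column pays for the 250000-value one.
    std::printf("each filter: %zu bytes\n", bytes_size);
}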
@@ -123,18 +93,11 @@ void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
     }
 }

-void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column) const
+void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const HashSet<UInt64> & hashes) const
 {
-    const auto & column = granule_index_block.getByPosition(index_hash_column);
-    if (const auto * hash_column = typeid_cast<const ColumnUInt64 *>(column.column.get()))
-    {
-        const auto & hash_column_vec = hash_column->getData();
-
-        for (const auto & bf_base_hash : hash_column_vec)
-            for (size_t i = 0; i < hash_functions; ++i)
-                bf->addHashWithSeed(bf_base_hash, BloomFilterHash::bf_hash_seed[i]);
-    }
+    for (const auto & bf_base_hash : hashes)
+        for (size_t i = 0; i < hash_functions; ++i)
+            bf->addHashWithSeed(bf_base_hash.getKey(), BloomFilterHash::bf_hash_seed[i]);
 }

 }
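fillingBloomFilter is unchanged in spirit: each base hash is inserted hash_functions times, once per fixed seed from BloomFilterHash::bf_hash_seed; the .getKey() call appears because iterating a HashSet yields cells rather than raw UInt64 values. A generic sketch of the seeded-insert pattern (ToyBloomFilter is a hypothetical name and the mixing function is a stand-in, not ClickHouse's actual implementation):

#include <cstdint>
#include <vector>

struct ToyBloomFilter
{
    std::vector<uint8_t> bits;

    explicit ToyBloomFilter(size_t bytes) : bits(bytes, 0) {}

    // One 64-bit base hash per value, combined with a seed to set one bit.
    void addHashWithSeed(uint64_t hash, uint64_t seed)
    {
        uint64_t mixed = (hash ^ seed) * 0x9E3779B97F4A7C15ULL; // stand-in mixer, not the real one
        size_t bit = mixed % (bits.size() * 8);
        bits[bit / 8] |= static_cast<uint8_t>(1u << (bit % 8));
    }
};

int main()
{
    const uint64_t seeds[3] = {0x0102030405060708ULL, 0x1112131415161718ULL, 0x2122232425262728ULL}; // arbitrary

    ToyBloomFilter bf(125);
    uint64_t base_hash = 42; // what the per-row column hash would produce for one value
    for (uint64_t seed : seeds)
        bf.addHashWithSeed(base_hash, seed); // k = 3 hash functions
}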

File: MergeTreeIndexGranuleBloomFilter.h

@@ -2,6 +2,7 @@
 #include <Interpreters/BloomFilter.h>
 #include <Storages/MergeTree/MergeTreeIndices.h>
+#include <Common/HashTable/HashSet.h>

 namespace DB
 {
@@ -11,7 +12,7 @@ class MergeTreeIndexGranuleBloomFilter final : public IMergeTreeIndexGranule
 public:
     MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_);
-    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t total_rows_, const Blocks & granule_index_blocks_);
+    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, const std::vector<HashSet<UInt64>> & column_hashes);

     bool empty() const override;
@@ -21,12 +22,12 @@ public:
     const std::vector<BloomFilterPtr> & getFilters() const { return bloom_filters; }

 private:
-    size_t total_rows;
+    size_t total_rows = 0;
     size_t bits_per_row;
     size_t hash_functions;
     std::vector<BloomFilterPtr> bloom_filters;

-    void fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column) const;
+    void fillingBloomFilter(BloomFilterPtr & bf, const HashSet<UInt64> & hashes) const;
 };

File: bloom_filter_sizing test, reference output (new file)

@@ -0,0 +1,6 @@
+Bloom filter on sort key
+10000
+0
+Bloom filter on non-sort key
+10000
+0
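The expected values follow directly from the SQL below: 1,000,000 inserted rows spread over 100 distinct keys leave exactly 10,000 rows with key = 1 (the 10000 lines), and the 0 lines mean no active part carries a secondary index larger than the 200-byte bound checked against system.parts.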

File: bloom_filter_sizing test, SQL queries (new file)

@@ -0,0 +1,53 @@
+SELECT 'Bloom filter on sort key';
+DROP TABLE IF EXISTS bloom_filter_sizing_pk;
+CREATE TABLE bloom_filter_sizing_pk(
+    key UInt64,
+    value UInt64,
+
+    -- Very high granularity to have one filter per part.
+    INDEX key_bf key TYPE bloom_filter(0.01) GRANULARITY 2147483648
+) ENGINE=MergeTree ORDER BY key;
+
+INSERT INTO bloom_filter_sizing_pk
+SELECT
+    number % 100 AS key, -- 100 unique keys
+    number AS value -- whatever
+FROM numbers(1000 * 1000);
+
+--
+-- Merge everything into a single part
+--
+OPTIMIZE TABLE bloom_filter_sizing_pk FINAL;
+
+SELECT COUNT() FROM bloom_filter_sizing_pk WHERE key = 1;
+
+-- Check the bloom filter size. According to https://hur.st/bloomfilter/?n=100&p=0.01, for 100 keys it should be less than 200B.
+SELECT COUNT() FROM system.parts WHERE database = currentDatabase() AND table = 'bloom_filter_sizing_pk' AND secondary_indices_uncompressed_bytes > 200 AND active;
+
+SELECT 'Bloom filter on non-sort key';
+DROP TABLE IF EXISTS bloom_filter_sizing_sec;
+CREATE TABLE bloom_filter_sizing_sec(
+    key1 UInt64,
+    key2 UInt64,
+    value UInt64,
+
+    -- Very high granularity to have one filter per part.
+    INDEX key_bf key2 TYPE bloom_filter(0.01) GRANULARITY 2147483648
+) ENGINE=MergeTree ORDER BY key1;
+
+INSERT INTO bloom_filter_sizing_sec
+SELECT
+    number % 100 AS key1, -- 100 unique keys
+    rand() % 100 AS key2, -- 100 unique keys
+    number AS value -- whatever
+FROM numbers(1000 * 1000);
+
+--
+-- Merge everything into a single part
+--
+OPTIMIZE TABLE bloom_filter_sizing_sec FINAL;
+
+SELECT COUNT() FROM bloom_filter_sizing_sec WHERE key1 = 1;
+
+-- Check the bloom filter size. According to https://hur.st/bloomfilter/?n=100&p=0.01, for 100 keys it should be less than 200B.
+SELECT COUNT() FROM system.parts WHERE database = currentDatabase() AND table = 'bloom_filter_sizing_sec' AND secondary_indices_uncompressed_bytes > 200 AND active;
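Taken together: with the old row-count sizing, the single merged part of 1,000,000 rows would get a filter on the order of 1.25 MB at the assumed 10 bits per row, so the secondary_indices_uncompressed_bytes > 200 check would match it; with cardinality-based sizing the 100 distinct values need roughly 120 to 125 bytes (https://hur.st/bloomfilter/?n=100&p=0.01 gives about 959 bits), and both checks return 0 as in the reference file. The per-index size can also be inspected through system.data_skipping_indices (data_uncompressed_bytes) on server versions that expose that table.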