support bloom filter for any type

This commit is contained in:
zhang2014 2019-05-10 11:42:28 +08:00
parent ac70bd104d
commit f52b16e1e1
15 changed files with 1050 additions and 95 deletions

View File

@ -1,6 +1,6 @@
#include <Interpreters/BloomFilter.h>
#include <city.h>
#include "BloomFilter.h"
namespace DB
@ -9,14 +9,13 @@ namespace DB
static constexpr UInt64 SEED_GEN_A = 845897321;
static constexpr UInt64 SEED_GEN_B = 217728422;
StringBloomFilter::StringBloomFilter(size_t size_, size_t hashes_, size_t seed_)
BloomFilter::BloomFilter(size_t size_, size_t hashes_, size_t seed_)
: size(size_), hashes(hashes_), seed(seed_), words((size + sizeof(UnderType) - 1) / sizeof(UnderType)), filter(words, 0) {}
StringBloomFilter::StringBloomFilter(const StringBloomFilter & bloom_filter)
BloomFilter::BloomFilter(const BloomFilter & bloom_filter)
: size(bloom_filter.size), hashes(bloom_filter.hashes), seed(bloom_filter.seed), words(bloom_filter.words), filter(bloom_filter.filter) {}
bool StringBloomFilter::find(const char * data, size_t len)
bool BloomFilter::find(const char * data, size_t len)
{
size_t hash1 = CityHash_v1_0_2::CityHash64WithSeed(data, len, seed);
size_t hash2 = CityHash_v1_0_2::CityHash64WithSeed(data, len, SEED_GEN_A * seed + SEED_GEN_B);
@ -30,7 +29,7 @@ bool StringBloomFilter::find(const char * data, size_t len)
return true;
}
void StringBloomFilter::add(const char * data, size_t len)
void BloomFilter::add(const char * data, size_t len)
{
size_t hash1 = CityHash_v1_0_2::CityHash64WithSeed(data, len, seed);
size_t hash2 = CityHash_v1_0_2::CityHash64WithSeed(data, len, SEED_GEN_A * seed + SEED_GEN_B);
@ -42,12 +41,12 @@ void StringBloomFilter::add(const char * data, size_t len)
}
}
void StringBloomFilter::clear()
void BloomFilter::clear()
{
filter.assign(words, 0);
}
bool StringBloomFilter::contains(const StringBloomFilter & bf)
bool BloomFilter::contains(const BloomFilter & bf)
{
for (size_t i = 0; i < words; ++i)
{
@ -57,7 +56,7 @@ bool StringBloomFilter::contains(const StringBloomFilter & bf)
return true;
}
UInt64 StringBloomFilter::isEmpty() const
UInt64 BloomFilter::isEmpty() const
{
for (size_t i = 0; i < words; ++i)
if (filter[i] != 0)
@ -65,7 +64,7 @@ UInt64 StringBloomFilter::isEmpty() const
return true;
}
bool operator== (const StringBloomFilter & a, const StringBloomFilter & b)
bool operator== (const BloomFilter & a, const BloomFilter & b)
{
for (size_t i = 0; i < a.words; ++i)
if (a.filter[i] != b.filter[i])
@ -73,4 +72,72 @@ bool operator== (const StringBloomFilter & a, const StringBloomFilter & b)
return true;
}
/// Sets the one filter bit chosen by mixing `hash` with `seed`.
/// The bit index is reduced modulo the filter size in bits (`size` is in bytes).
/// Note: the `seed` parameter hides the class member of the same name inside this function.
void BloomFilter::addHashWithSeed(const UInt64 & hash, const UInt64 & seed)
{
    const size_t bits_per_word = 8 * sizeof(UnderType);
    const size_t total_bits = 8 * size;

    const size_t bit_index = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(hash, seed)) % total_bits;

    filter[bit_index / bits_per_word] |= (1ULL << (bit_index % bits_per_word));
}
/// Tests the one filter bit chosen by mixing `hash` with `seed`.
/// Returns true when that bit is set, false otherwise.
/// Note: the `seed` parameter hides the class member of the same name inside this function.
bool BloomFilter::containsWithSeed(const UInt64 & hash, const UInt64 & seed)
{
    const size_t bits_per_word = 8 * sizeof(UnderType);
    const size_t total_bits = 8 * size;

    const size_t bit_index = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(hash, seed)) % total_bits;
    const auto word = filter[bit_index / bits_per_word];

    return bool(word & (1ULL << (bit_index % bits_per_word)));
}
/// Picks bloom filter parameters for a requested false-positive probability.
///
/// Walks a precomputed table of false-positive rates indexed by
/// [bits per row][number of hash functions - 1] and returns
/// (bits per row, number of hash functions) for the first row of the table whose
/// best achievable rate is <= max_conflict_probability.
///
/// When no row up to MAX_BITS_PER_ROW - 1 qualifies, the parameters of the last
/// scanned row are returned.
///
/// NOTE(review): when a row qualifies but none of its entries with index > 0 is strictly
/// greater than max_conflict_probability (this happens for probabilities of roughly 0.4
/// and above), the inner scan returns nothing and the search falls through to the most
/// expensive configuration. That behaviour is kept unchanged here; confirm whether
/// returning (bits_per_row, 1) was intended.
static std::pair<size_t, size_t> calculationBestPracticesImpl(double max_conflict_probability)
{
    static const size_t MAX_BITS_PER_ROW = 20;
    static const size_t MAX_HASH_FUNCTION_COUNT = 15;

    /// For the smallest index per level in probability_lookup_table
    static const size_t min_probability_index_each_bits[] = {0, 0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10, 11, 12, 12, 13, 14};

    static const long double probability_lookup_table[MAX_BITS_PER_ROW + 1][MAX_HASH_FUNCTION_COUNT] =
    {
        {1.0}, /// dummy, 0 bits per row
        {1.0, 1.0},
        {1.0, 0.393, 0.400},
        {1.0, 0.283, 0.237, 0.253},
        {1.0, 0.221, 0.155, 0.147, 0.160},
        {1.0, 0.181, 0.109, 0.092, 0.092, 0.101}, // 5
        {1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638},
        {1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364},
        {1.0, 0.118, 0.0489, 0.0306, 0.024, 0.0217, 0.0216, 0.0229},
        {1.0, 0.105, 0.0397, 0.0228, 0.0166, 0.0141, 0.0133, 0.0135, 0.0145},
        {1.0, 0.0952, 0.0329, 0.0174, 0.0118, 0.00943, 0.00844, 0.00819, 0.00846}, // 10
        {1.0, 0.0869, 0.0276, 0.0136, 0.00864, 0.0065, 0.00552, 0.00513, 0.00509},
        {1.0, 0.08, 0.0236, 0.0108, 0.00646, 0.00459, 0.00371, 0.00329, 0.00314},
        {1.0, 0.074, 0.0203, 0.00875, 0.00492, 0.00332, 0.00255, 0.00217, 0.00199, 0.00194},
        {1.0, 0.0689, 0.0177, 0.00718, 0.00381, 0.00244, 0.00179, 0.00146, 0.00129, 0.00121, 0.0012},
        {1.0, 0.0645, 0.0156, 0.00596, 0.003, 0.00183, 0.00128, 0.001, 0.000852, 0.000775, 0.000744}, // 15
        {1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459},
        {1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302, 0.000287, 0.000284},
        {1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183, 0.000176},
        {1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132, 0.000118, 0.000111, 0.000109},
        {1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05, 7.77e-05, 7.12e-05, 6.79e-05, 6.71e-05} // 20
    };

    for (size_t bits_per_row = 1; bits_per_row < MAX_BITS_PER_ROW; ++bits_per_row)
    {
        /// Skip rows whose best entry still exceeds the requested probability.
        if (probability_lookup_table[bits_per_row][min_probability_index_each_bits[bits_per_row]] <= max_conflict_probability)
        {
            /// Scan from the best entry towards fewer hash functions; the entry just after the
            /// first one that is too imprecise is the cheapest one that still satisfies the request.
            size_t max_size_of_hash_functions = min_probability_index_each_bits[bits_per_row];
            for (size_t size_of_hash_functions = max_size_of_hash_functions; size_of_hash_functions > 0; --size_of_hash_functions)
                if (probability_lookup_table[bits_per_row][size_of_hash_functions] > max_conflict_probability)
                    return std::pair<size_t, size_t>(bits_per_row, size_of_hash_functions + 1);
        }
    }

    return std::pair<size_t, size_t>(MAX_BITS_PER_ROW - 1, min_probability_index_each_bits[MAX_BITS_PER_ROW - 1]);
}
/// Externally visible entry point: forwards to the file-local calculationBestPracticesImpl.
/// Returns a (bits per row, number of hash functions) pair for the requested
/// maximum false-positive probability.
std::pair<size_t, size_t> calculationBestPractices(double max_conflict_probability)
{
return calculationBestPracticesImpl(max_conflict_probability);
}
}

View File

@ -1,15 +1,17 @@
#pragma once
#include <Core/Types.h>
#include <vector>
#include <Core/Types.h>
#include <Common/PODArray.h>
#include <Common/Allocator.h>
#include <Columns/ColumnVector.h>
namespace DB
{
/// Bloom filter for strings.
class StringBloomFilter
class BloomFilter
{
public:
using UnderType = UInt64;
using Container = std::vector<UnderType>;
@ -17,16 +19,19 @@ public:
/// size -- size of filter in bytes.
/// hashes -- number of used hash functions.
/// seed -- random seed for hash functions generation.
StringBloomFilter(size_t size_, size_t hashes_, size_t seed_);
StringBloomFilter(const StringBloomFilter & bloom_filter);
BloomFilter(size_t size_, size_t hashes_, size_t seed_);
BloomFilter(const BloomFilter & bloom_filter);
bool find(const char * data, size_t len);
void add(const char * data, size_t len);
void clear();
void addHashWithSeed(const UInt64 & hash, const UInt64 & seed);
bool containsWithSeed(const UInt64 & hash, const UInt64 & seed);
/// Checks if this contains everything from another bloom filter.
/// Bloom filters must have equal size and seed.
bool contains(const StringBloomFilter & bf);
bool contains(const BloomFilter & bf);
const Container & getFilter() const { return filter; }
Container & getFilter() { return filter; }
@ -34,7 +39,7 @@ public:
/// For debug.
UInt64 isEmpty() const;
friend bool operator== (const StringBloomFilter & a, const StringBloomFilter & b);
friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:
size_t size;
@ -44,7 +49,10 @@ private:
Container filter;
};
using BloomFilterPtr = std::shared_ptr<BloomFilter>;
bool operator== (const StringBloomFilter & a, const StringBloomFilter & b);
bool operator== (const BloomFilter & a, const BloomFilter & b);
std::pair<size_t, size_t> calculationBestPractices(double max_conflict_probability);
}

View File

@ -0,0 +1,141 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/bit_cast.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
/// Error codes thrown by BloomFilterHash. Both are declared here so that this header
/// does not depend on another header happening to declare LOGICAL_ERROR first.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
}
struct BloomFilterHash
{
/// Fixed seeds, one per hash function. Callers combine a value's 64-bit hash with
/// bf_hash_seed[i] to derive the i-th bloom filter bit, so at most 15 hash functions
/// can be addressed through this table.
static constexpr UInt64 bf_hash_seed[15] = {
13635471485423070496ULL, 10336109063487487899ULL, 17779957404565211594ULL, 8988612159822229247ULL, 4954614162757618085ULL,
12980113590177089081ULL, 9263883436177860930ULL, 3656772712723269762ULL, 10362091744962961274ULL, 7582936617938287249ULL,
15033938188484401405ULL, 18286745649494826751ULL, 6852245486148412312ULL, 8886056245089344681ULL, 10151472371158292780ULL
};
/// Hashes a single constant value and returns it as a one-row constant UInt64 column.
///
/// Unsigned integers and signed integers go through intHash64, String / FixedString go
/// through CityHash64. Any other type is rejected with LOGICAL_ERROR.
///
/// NOTE(review): the column path (getAnyTypeHash) hashes each integer width with its own
/// element type, while this path always reads the value as UInt64 / Int64 first; confirm
/// both paths yield the same hash for negative values of narrower signed types.
static ColumnPtr hashWithField(const IDataType * data_type, const Field & field)
{
    WhichDataType which(data_type);

    /// Wraps one 64-bit hash into a constant column of size 1.
    const auto as_const_column = [](UInt64 hash_value) -> ColumnPtr
    {
        return ColumnConst::create(ColumnUInt64::create(1, hash_value), 1);
    };

    if (which.isUInt())
        return as_const_column(intHash64(field.safeGet<UInt64>()));

    if (which.isInt())
        return as_const_column(intHash64(ext::bit_cast<UInt64>(field.safeGet<Int64>())));

    if (which.isString() || which.isFixedString())
    {
        const auto & text = field.safeGet<String>();
        return as_const_column(CityHash_v1_0_2::CityHash64(text.data(), text.size()));
    }

    throw Exception("Unexpected type " + data_type->getName() + " of bloom filter index.", ErrorCodes::LOGICAL_ERROR);
}
/// Hashes `limit` consecutive rows of `column`, starting at row `pos`, into a new
/// UInt64 column that holds one hash per row.
static ColumnPtr hashWithColumn(const IDataType * data_type, const IColumn * column, size_t pos, size_t limit)
{
    auto hashes = ColumnUInt64::create(limit);

    /// The number of rows to hash is taken from the size of the output container.
    getAnyTypeHash<true>(data_type, column, hashes->getData(), pos);

    return hashes;
}
/// Dispatches on the data type to the typed hashing routine.
/// Enum8 / Enum16 are hashed as Int8 / Int16, Date as UInt16 and DateTime as UInt32.
/// Throws LOGICAL_ERROR for any type not listed below.
template <bool is_first>
static void getAnyTypeHash(const IDataType * data_type, const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
    WhichDataType which(data_type);

    /// Unsigned integers.
    if (which.isUInt8())
        return getNumberTypeHash<UInt8, is_first>(column, vec, pos);
    if (which.isUInt16())
        return getNumberTypeHash<UInt16, is_first>(column, vec, pos);
    if (which.isUInt32())
        return getNumberTypeHash<UInt32, is_first>(column, vec, pos);
    if (which.isUInt64())
        return getNumberTypeHash<UInt64, is_first>(column, vec, pos);

    /// Signed integers.
    if (which.isInt8())
        return getNumberTypeHash<Int8, is_first>(column, vec, pos);
    if (which.isInt16())
        return getNumberTypeHash<Int16, is_first>(column, vec, pos);
    if (which.isInt32())
        return getNumberTypeHash<Int32, is_first>(column, vec, pos);
    if (which.isInt64())
        return getNumberTypeHash<Int64, is_first>(column, vec, pos);

    /// Types hashed through an integer element type.
    if (which.isEnum8())
        return getNumberTypeHash<Int8, is_first>(column, vec, pos);
    if (which.isEnum16())
        return getNumberTypeHash<Int16, is_first>(column, vec, pos);
    if (which.isDate())
        return getNumberTypeHash<UInt16, is_first>(column, vec, pos);
    if (which.isDateTime())
        return getNumberTypeHash<UInt32, is_first>(column, vec, pos);

    /// Floating point.
    if (which.isFloat32())
        return getNumberTypeHash<Float32, is_first>(column, vec, pos);
    if (which.isFloat64())
        return getNumberTypeHash<Float64, is_first>(column, vec, pos);

    /// Strings.
    if (which.isString())
        return getStringTypeHash<is_first>(column, vec, pos);
    if (which.isFixedString())
        return getStringTypeHash<is_first>(column, vec, pos);

    throw Exception("Unexpected type " + data_type->getName() + " of bloom filter index.", ErrorCodes::LOGICAL_ERROR);
}
template <typename Type, bool is_first>
static void getNumberTypeHash(const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
const auto * index_column = typeid_cast<const ColumnVector<Type> *>(column);
if (unlikely(!index_column))
throw Exception("Illegal column type was passed to the bloom filter index.", ErrorCodes::ILLEGAL_COLUMN);
const typename ColumnVector<Type>::Container & vec_from = index_column->getData();
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 hash = intHash64(ext::bit_cast<UInt64>(vec_from[index + pos]));
if constexpr (is_first)
vec[index] = hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], hash));
}
}
template <bool is_first>
static void getStringTypeHash(const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
if (const auto * index_column = typeid_cast<const ColumnString *>(column))
{
const ColumnString::Chars & data = index_column->getChars();
const ColumnString::Offsets & offsets = index_column->getOffsets();
ColumnString::Offset current_offset = pos;
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 city_hash = CityHash_v1_0_2::CityHash64(
reinterpret_cast<const char *>(&data[current_offset]), offsets[index + pos] - current_offset - 1);
if constexpr (is_first)
vec[index] = city_hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], city_hash));
current_offset = offsets[index + pos];
}
}
else if (const auto * fixed_string_index_column = typeid_cast<const ColumnFixedString *>(column))
{
size_t fixed_len = fixed_string_index_column->getN();
const auto & data = fixed_string_index_column->getChars();
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 city_hash = CityHash_v1_0_2::CityHash64(reinterpret_cast<const char *>(&data[(index + pos) * fixed_len]), fixed_len);
if constexpr (is_first)
vec[index] = city_hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], city_hash));
}
}
else
throw Exception("Illegal column type was passed to the bloom filter index.", ErrorCodes::ILLEGAL_COLUMN);
}
};
}

View File

@ -0,0 +1,62 @@
#include <Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h>
#include <ext/bit_cast.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>
#include <Common/HashTable/Hash.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/BloomFilterHash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
}
/// Stores the bloom filter parameters and the names of the indexed columns.
/// No rows are accumulated yet after construction.
MergeTreeIndexAggregatorBloomFilter::MergeTreeIndexAggregatorBloomFilter(
    size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_)
    : bits_per_row(bits_per_row_)
    , hash_functions(hash_functions_)
    , index_columns_name(columns_name_)
{
}
/// True while no rows have been accumulated since construction or the last reset.
bool MergeTreeIndexAggregatorBloomFilter::empty() const
{
    return total_rows == 0;
}
/// Builds a bloom filter granule from the hashed blocks accumulated so far,
/// then clears the accumulated state so the aggregator can be reused.
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorBloomFilter::getGranuleAndReset()
{
    const auto built_granule
        = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, total_rows, granule_index_blocks);

    /// Reset only after the granule has been constructed from the current state.
    granule_index_blocks.clear();
    total_rows = 0;

    return built_granule;
}
/// Hashes up to `limit` rows of every indexed column of `block`, starting at row `*pos`,
/// and appends the hashed columns as one block to the accumulated state.
/// On return `*pos` is advanced by the number of rows consumed.
/// Throws LOGICAL_ERROR when `*pos` is not less than the number of rows in `block`.
void MergeTreeIndexAggregatorBloomFilter::update(const Block & block, size_t * pos, size_t limit)
{
    if (*pos >= block.rows())
        throw Exception("The provided position is not less than the number of block rows. Position: " + toString(*pos) + ", Block rows: " +
            toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);

    Block granule_index_block;
    size_t max_read_rows = std::min(block.rows() - *pos, limit);

    for (size_t index = 0; index < index_columns_name.size(); ++index)
    {
        const auto & column_and_type = block.getByName(index_columns_name[index]);

        /// Held by value (not by const reference) so that std::move below really moves
        /// instead of silently copying.
        auto index_column = BloomFilterHash::hashWithColumn(&*column_and_type.type, &*column_and_type.column, *pos, max_read_rows);

        granule_index_block.insert({std::move(index_column), std::make_shared<DataTypeUInt64>(), column_and_type.name});
    }

    *pos += max_read_rows;
    total_rows += max_read_rows;

    /// The local block is not used afterwards, so hand it over instead of copying it.
    granule_index_blocks.push_back(std::move(granule_index_block));
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
namespace DB
{
/// Accumulates hashed index columns block by block until a granule is requested.
class MergeTreeIndexAggregatorBloomFilter : public IMergeTreeIndexAggregator
{
public:
MergeTreeIndexAggregatorBloomFilter(size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_);
/// True while no rows have been accumulated.
bool empty() const override;
/// Builds a granule from the accumulated blocks and clears the accumulated state.
MergeTreeIndexGranulePtr getGranuleAndReset() override;
/// Hashes up to `limit` rows of `block` starting at `*pos` and advances `*pos`.
void update(const Block & block, size_t * pos, size_t limit) override;
private:
/// Bloom filter parameters, passed on to every granule that is built.
size_t bits_per_row;
size_t hash_functions;
/// Names of the columns looked up in each incoming block.
const Names index_columns_name;
/// Number of rows accumulated since the last reset.
size_t total_rows = 0;
/// One block of UInt64 hash columns per update() call.
Blocks granule_index_blocks;
};
}

View File

@ -0,0 +1,87 @@
#include <Storages/MergeTree/MergeTreeIndexBloomFilter.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Core/Types.h>
#include <ext/bit_cast.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/MergeTree/MergeTreeIndexConditionBloomFilter.h>
#include <Parsers/queryToString.h>
#include <Columns/ColumnConst.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
/// Forwards the generic index description to IMergeTreeIndex and keeps the
/// bloom filter specific parameters.
MergeTreeIndexBloomFilter::MergeTreeIndexBloomFilter(
    const String & name,
    const ExpressionActionsPtr & expr,
    const Names & columns,
    const DataTypes & data_types,
    const Block & header,
    size_t granularity,
    size_t bits_per_row_,
    size_t hash_functions_)
    : IMergeTreeIndex(name, expr, columns, data_types, header, granularity)
    , bits_per_row(bits_per_row_)
    , hash_functions(hash_functions_)
{
}
/// Creates a granule sized for the number of indexed columns.
MergeTreeIndexGranulePtr MergeTreeIndexBloomFilter::createIndexGranule() const
{
    const size_t index_columns_count = columns.size();
    return std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, index_columns_count);
}
bool MergeTreeIndexBloomFilter::mayBenefitFromIndexForIn(const ASTPtr & node) const
{
const String column_name = node->getColumnName();
for (const auto & name : columns)
if (column_name == name)
return true;
if (const auto * func = typeid_cast<const ASTFunction *>(node.get()))
if (func->arguments->children.size() == 1)
return mayBenefitFromIndexForIn(func->arguments->children.front());
return false;
}
/// Creates an aggregator that hashes the indexed columns with this index's parameters.
MergeTreeIndexAggregatorPtr MergeTreeIndexBloomFilter::createIndexAggregator() const
{
    MergeTreeIndexAggregatorPtr aggregator
        = std::make_shared<MergeTreeIndexAggregatorBloomFilter>(bits_per_row, hash_functions, columns);
    return aggregator;
}
/// Creates the condition object that evaluates a query against bloom filter granules.
IndexConditionPtr MergeTreeIndexBloomFilter::createIndexCondition(const SelectQueryInfo & query_info, const Context & context) const
{
    IndexConditionPtr condition
        = std::make_shared<MergeTreeIndexConditionBloomFilter>(query_info, context, header, hash_functions);
    return condition;
}
std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreatorNew(const NamesAndTypesList & columns, std::shared_ptr<ASTIndexDeclaration> node, const Context & context)
{
if (node->name.empty())
throw Exception("Index must have unique name.", ErrorCodes::INCORRECT_QUERY);
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone());
auto syntax = SyntaxAnalyzer(context, {}).analyze(expr_list, columns);
auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
auto index_sample = ExpressionAnalyzer(expr_list, syntax, context).getActions(true)->getSampleBlock();
if (!index_sample || !index_sample.columns())
throw Exception("Index must have columns.", ErrorCodes::INCORRECT_QUERY);
double max_conflict_probability = 0.025;
if (node->type->arguments && !node->type->arguments->children.empty())
max_conflict_probability = typeid_cast<const ASTLiteral &>(*node->type->arguments->children[0]).value.get<Float64>();
const auto & bits_per_row_and_size_of_hash_functions = calculationBestPractices(max_conflict_probability);
return std::make_unique<MergeTreeIndexBloomFilter>(
node->name, std::move(index_expr), index_sample.getNames(), index_sample.getDataTypes(), index_sample, node->granularity,
bits_per_row_and_size_of_hash_functions.first, bits_per_row_and_size_of_hash_functions.second);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h>
namespace DB
{
/// Bloom filter index description: creates the granules, aggregators and conditions
/// that share the same bits-per-row and hash-function-count parameters.
class MergeTreeIndexBloomFilter : public IMergeTreeIndex
{
public:
MergeTreeIndexBloomFilter(
const String & name, const ExpressionActionsPtr & expr, const Names & columns, const DataTypes & data_types,
const Block & header, size_t granularity, size_t bits_per_row_, size_t hash_functions_);
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(const SelectQueryInfo & query_info, const Context & context) const override;
/// True when `node` is an indexed expression or single-argument functions wrapped around one.
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;
private:
/// Parameters derived from the requested false-positive probability when the index is created.
size_t bits_per_row;
size_t hash_functions;
};
}

View File

@ -0,0 +1,310 @@
#include <Storages/MergeTree/MergeTreeIndexConditionBloomFilter.h>
#include <Interpreters/QueryNormalizer.h>
#include <Interpreters/BloomFilterHash.h>
#include <Common/HashTable/ClearableHashMap.h>
#include <Storages/MergeTree/RPNBuilder.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnConst.h>
#include <ext/bit_cast.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTIdentifier.h>
#include <Columns/ColumnTuple.h>
namespace DB
{
namespace
{
/// Builds the key under which the prepared set for the right-hand side of IN is looked up.
/// Subqueries and identifiers are keyed by the node alone; literals are keyed together
/// with the element types of the left-hand side.
PreparedSetKey getPreparedSetKey(const ASTPtr & node, const DataTypePtr & data_type)
{
    const bool is_subquery_or_identifier = node->as<ASTSubquery>() || node->as<ASTIdentifier>();
    if (is_subquery_or_identifier)
        return PreparedSetKey::forSubquery(*node);

    /// A tuple on the left-hand side is unpacked one level into its element types.
    const auto * tuple_type = typeid_cast<const DataTypeTuple *>(&*data_type);
    if (tuple_type)
        return PreparedSetKey::forLiteral(*node, tuple_type->getElements());

    return PreparedSetKey::forLiteral(*node, DataTypes(1, data_type));
}
bool maybeTrueOnBloomFilter(const IColumn * hash_column, const BloomFilterPtr & bloom_filter, size_t hash_functions)
{
const auto const_column = typeid_cast<const ColumnConst *>(hash_column);
const auto non_const_column = typeid_cast<const ColumnUInt64 *>(hash_column);
if (!const_column && !non_const_column)
throw Exception("LOGICAL ERROR: hash column must be Const Column or UInt64 Column.", ErrorCodes::LOGICAL_ERROR);
if (const_column)
{
for (size_t index = 0; index < hash_functions; ++index)
if (!bloom_filter->containsWithSeed(const_column->getValue<UInt64>(), BloomFilterHash::bf_hash_seed[index]))
return false;
return true;
}
else
{
bool missing_rows = true;
const ColumnUInt64::Container & data = non_const_column->getData();
for (size_t index = 0, size = data.size(); missing_rows && index < size; ++index)
{
bool match_row = true;
for (size_t hash_index = 0; match_row && hash_index < hash_functions; ++hash_index)
match_row = bloom_filter->containsWithSeed(data[index], BloomFilterHash::bf_hash_seed[hash_index]);
missing_rows = !match_row;
}
return !missing_rows;
}
}
}
/// Keeps references to the index header and the query info, then converts the query
/// into reverse Polish notation, using traverseAtomAST to recognise supported atoms.
MergeTreeIndexConditionBloomFilter::MergeTreeIndexConditionBloomFilter(
    const SelectQueryInfo & info, const Context & context, const Block & header, size_t hash_functions)
    : header(header)
    , query_info(info)
    , hash_functions(hash_functions)
{
    /// The second callback argument is not needed here and is ignored.
    auto atom_parser = [this](auto & node, auto &, auto & constants, auto & out)
    {
        return traverseAtomAST(node, constants, out);
    };

    rpn = std::move(RPNBuilder<RPNElement>(info, context, atom_parser).extractRPN());
}
/// Evaluates the RPN with "unknown or always true" atoms as true and every atom the
/// index can actually check as false. A true result means the index cannot rule
/// anything out for this query.
/// Throws LOGICAL_ERROR on an unexpected RPN element or when evaluation leaves no result.
bool MergeTreeIndexConditionBloomFilter::alwaysUnknownOrTrue() const
{
    std::vector<bool> rpn_stack;

    for (const auto & element : rpn)
    {
        if (element.function == RPNElement::FUNCTION_UNKNOWN
            || element.function == RPNElement::ALWAYS_TRUE)
        {
            rpn_stack.push_back(true);
        }
        else if (element.function == RPNElement::FUNCTION_EQUALS
            || element.function == RPNElement::FUNCTION_NOT_EQUALS
            || element.function == RPNElement::FUNCTION_IN
            || element.function == RPNElement::FUNCTION_NOT_IN
            || element.function == RPNElement::ALWAYS_FALSE)
        {
            rpn_stack.push_back(false);
        }
        else if (element.function == RPNElement::FUNCTION_NOT)
        {
            // do nothing
        }
        else if (element.function == RPNElement::FUNCTION_AND)
        {
            /// Copy into plain bools: std::vector<bool>::back() yields a proxy that
            /// pop_back() invalidates, so it must not be read afterwards.
            const bool arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            const bool arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 && arg2;
        }
        else if (element.function == RPNElement::FUNCTION_OR)
        {
            const bool arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            const bool arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 || arg2;
        }
        else
            throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
    }

    /// Indexing an empty vector would be undefined behaviour.
    if (rpn_stack.empty())
        throw Exception("Unexpected stack size in MergeTreeIndexConditionBloomFilter::alwaysUnknownOrTrue", ErrorCodes::LOGICAL_ERROR);

    return rpn_stack[0];
}
/// Evaluates the RPN against the bloom filters of one granule.
/// Returns false only when the condition cannot be true for any row of the granule.
/// Throws LOGICAL_ERROR on an unexpected RPN element or a malformed expression.
bool MergeTreeIndexConditionBloomFilter::mayBeTrueOnGranule(const MergeTreeIndexGranuleBloomFilter * granule) const
{
    std::vector<BoolMask> rpn_stack;
    const auto & filters = granule->getFilters();

    for (const auto & element : rpn)
    {
        if (element.function == RPNElement::FUNCTION_UNKNOWN)
        {
            rpn_stack.emplace_back(true, true);
        }
        else if (element.function == RPNElement::FUNCTION_IN
            || element.function == RPNElement::FUNCTION_NOT_IN
            || element.function == RPNElement::FUNCTION_EQUALS
            || element.function == RPNElement::FUNCTION_NOT_EQUALS)
        {
            /// Every (column position, hashed values) pair of the predicate must pass its filter.
            bool match_rows = true;
            const auto & predicate = element.predicate;
            for (size_t index = 0; match_rows && index < predicate.size(); ++index)
            {
                const auto & query_index_hash = predicate[index];
                const auto & filter = filters[query_index_hash.first];
                const ColumnPtr & hash_column = query_index_hash.second;
                match_rows = maybeTrueOnBloomFilter(&*hash_column, filter, hash_functions);
            }

            /// A bloom filter hit only says the value MAY be present in some row; it never
            /// proves that every row matches. So "can be false" is always true here.
            /// Recording it as !match_rows made the negated forms (!=, NOT IN, NOT) conclude
            /// "cannot be true" and skip granules that merely may contain the value.
            rpn_stack.emplace_back(match_rows, true);

            if (element.function == RPNElement::FUNCTION_NOT_EQUALS || element.function == RPNElement::FUNCTION_NOT_IN)
                rpn_stack.back() = !rpn_stack.back();
        }
        else if (element.function == RPNElement::FUNCTION_NOT)
        {
            rpn_stack.back() = !rpn_stack.back();
        }
        else if (element.function == RPNElement::FUNCTION_OR)
        {
            auto arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            auto arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 | arg2;
        }
        else if (element.function == RPNElement::FUNCTION_AND)
        {
            auto arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            auto arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 & arg2;
        }
        else if (element.function == RPNElement::ALWAYS_TRUE)
        {
            rpn_stack.emplace_back(true, false);
        }
        else if (element.function == RPNElement::ALWAYS_FALSE)
        {
            rpn_stack.emplace_back(false, true);
        }
        else
            throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
    }

    if (rpn_stack.size() != 1)
        throw Exception("Unexpected stack size in KeyCondition::mayBeTrueInRange", ErrorCodes::LOGICAL_ERROR);

    return rpn_stack[0].can_be_true;
}
bool MergeTreeIndexConditionBloomFilter::traverseAtomAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out)
{
{
Field const_value;
DataTypePtr const_type;
if (KeyCondition::getConstant(node, block_with_constants, const_value, const_type))
{
if (const_value.getType() == Field::Types::UInt64 || const_value.getType() == Field::Types::Int64 ||
const_value.getType() == Field::Types::Float64)
{
/// Zero in all types is represented in memory the same way as in UInt64.
out.function = const_value.get<UInt64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
return true;
}
}
}
if (const auto * function = node->as<ASTFunction>())
{
const ASTs & arguments = function->arguments->children;
if (arguments.size() != 2)
return false;
if (functionIsInOrGlobalInOperator(function->name))
return processInOrNotInOperator(function->name, arguments[0], arguments[1], out);
if (function->name == "equals" || function->name == "notEquals")
{
Field const_value;
DataTypePtr const_type;
if (KeyCondition::getConstant(arguments[1], block_with_constants, const_value, const_type))
return processEqualsOrNotEquals(function->name, arguments[0], const_type, const_value, out);
else if (KeyCondition::getConstant(arguments[0], block_with_constants, const_value, const_type))
return processEqualsOrNotEquals(function->name, arguments[1], const_type, const_value, out);
}
}
return false;
}
bool MergeTreeIndexConditionBloomFilter::processInOrNotInOperator(
const String & function_name, const ASTPtr & key_ast, const ASTPtr & expr_list, RPNElement & out)
{
if (header.has(key_ast->getColumnName()))
{
const auto & column_and_type = header.getByName(key_ast->getColumnName());
const auto & prepared_set_it = query_info.sets.find(getPreparedSetKey(expr_list, column_and_type.type));
if (prepared_set_it != query_info.sets.end() && prepared_set_it->second->hasExplicitSetElements())
{
const IDataType * type = &*column_and_type.type;
const auto & prepared_set = prepared_set_it->second;
if (!typeid_cast<const DataTypeTuple *>(type))
{
const Columns & columns = prepared_set->getSetElements();
if (columns.size() != 1)
throw Exception("LOGICAL ERROR: prepared_set columns size must be 1.", ErrorCodes::LOGICAL_ERROR);
ColumnPtr column = columns[0];
size_t position = header.getPositionByName(key_ast->getColumnName());
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithColumn(type, &*column, 0, column->size())));
}
else
{
size_t position = header.getPositionByName(key_ast->getColumnName());
const auto & tuple_column = ColumnTuple::create(prepared_set->getSetElements());
const auto & bf_hash_column = BloomFilterHash::hashWithColumn(type, &*tuple_column, 0, prepared_set->getTotalRowCount());
out.predicate.emplace_back(std::make_pair(position, bf_hash_column));
}
if (function_name == "in" || function_name == "globalIn")
out.function = RPNElement::FUNCTION_IN;
if (function_name == "notIn" || function_name == "globalNotIn")
out.function = RPNElement::FUNCTION_NOT_IN;
return true;
}
}
return false;
}
bool MergeTreeIndexConditionBloomFilter::processEqualsOrNotEquals(
const String & function_name, const ASTPtr & key_ast, const DataTypePtr & value_type, const Field & value_field, RPNElement & out)
{
if (header.has(key_ast->getColumnName()))
{
size_t position = header.getPositionByName(key_ast->getColumnName());
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(&*value_type, value_field)));
out.function = function_name == "equals" ? RPNElement::FUNCTION_EQUALS : RPNElement::FUNCTION_NOT_EQUALS;
return true;
}
if (const auto * function = key_ast->as<ASTFunction>())
{
WhichDataType which(value_type);
/// TODO: support SQL: where array(index_column_x, column_y) = [1, 2]
if (which.isTuple() && function->name == "tuple")
{
const TupleBackend & tuple = get<const Tuple &>(value_field).toUnderType();
const auto value_tuple_data_type = typeid_cast<const DataTypeTuple *>(value_type.get());
const ASTs & arguments = typeid_cast<const ASTExpressionList &>(*function->arguments).children;
if (tuple.size() != arguments.size())
throw Exception("Illegal types of arguments of function " + function_name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
bool match_with_subtype = false;
const DataTypes & subtypes = value_tuple_data_type->getElements();
for (size_t index = 0; index < tuple.size(); ++index)
match_with_subtype |= processEqualsOrNotEquals(function_name, arguments[index], subtypes[index], tuple[index], out);
return match_with_subtype;
}
}
return false;
}
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <Columns/IColumn.h>
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
namespace DB
{
/// Evaluates a query condition against bloom filter index granules.
/// The condition is kept as a vector of RPNElement in reverse Polish notation.
class MergeTreeIndexConditionBloomFilter : public IIndexCondition
{
public:
/// One element of the reverse Polish notation: an atom, a logical operator or a constant.
struct RPNElement
{
enum Function
{
/// Atoms of a Boolean expression.
FUNCTION_EQUALS,
FUNCTION_NOT_EQUALS,
FUNCTION_IN,
FUNCTION_NOT_IN,
FUNCTION_UNKNOWN, /// Can take any value.
/// Operators of the logical expression.
FUNCTION_NOT,
FUNCTION_AND,
FUNCTION_OR,
/// Constants
ALWAYS_FALSE,
ALWAYS_TRUE,
};
RPNElement(Function function_ = FUNCTION_UNKNOWN) : function(function_) {}
Function function = FUNCTION_UNKNOWN;
/// For atoms: pairs of (position of the column in the index header, column of hashed query values).
std::vector<std::pair<size_t, ColumnPtr>> predicate;
};
MergeTreeIndexConditionBloomFilter(const SelectQueryInfo & info, const Context & context, const Block & header, size_t hash_functions);
bool alwaysUnknownOrTrue() const override;
/// Accepts only bloom filter granules; any other granule type is a LOGICAL_ERROR.
bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr granule) const override
{
if (const auto & bf_granule = typeid_cast<const MergeTreeIndexGranuleBloomFilter *>(granule.get()))
{
return mayBeTrueOnGranule(bf_granule);
}
throw Exception("LOGICAL ERROR: require bloom filter index granule.", ErrorCodes::LOGICAL_ERROR);
}
private:
/// Both are stored as references: the objects passed to the constructor must outlive this condition.
const Block & header;
const SelectQueryInfo & query_info;
/// Number of seeded probes performed per hashed value.
const size_t hash_functions;
std::vector<RPNElement> rpn;
bool mayBeTrueOnGranule(const MergeTreeIndexGranuleBloomFilter * granule) const;
/// Tries to convert one AST node into an RPN atom; returns false when unsupported.
bool traverseAtomAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out);
bool processInOrNotInOperator(const String &function_name, const ASTPtr &key_ast, const ASTPtr &expr_list, RPNElement &out);
bool processEqualsOrNotEquals(const String & function_name, const ASTPtr & key_ast, const DataTypePtr & value_type, const Field & value_field, RPNElement & out);
};
}

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeBloomFilterIndex.h>
#include <Storages/MergeTree/MergeTreeIndexFullText.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UTF8Helpers.h>
@ -31,7 +31,7 @@ namespace ErrorCodes
/// Adds all tokens from string to bloom filter.
static void stringToBloomFilter(
const char * data, size_t size, const std::unique_ptr<ITokenExtractor> & token_extractor, StringBloomFilter & bloom_filter)
const char * data, size_t size, const std::unique_ptr<ITokenExtractor> & token_extractor, BloomFilter & bloom_filter)
{
size_t cur = 0;
size_t token_start = 0;
@ -42,7 +42,7 @@ static void stringToBloomFilter(
/// Adds all tokens from like pattern string to bloom filter. (Because like pattern can contain `\%` and `\_`.)
static void likeStringToBloomFilter(
const String & data, const std::unique_ptr<ITokenExtractor> & token_extractor, StringBloomFilter & bloom_filter)
const String & data, const std::unique_ptr<ITokenExtractor> & token_extractor, BloomFilter & bloom_filter)
{
size_t cur = 0;
String token;
@ -51,24 +51,23 @@ static void likeStringToBloomFilter(
}
MergeTreeBloomFilterIndexGranule::MergeTreeBloomFilterIndexGranule(const MergeTreeBloomFilterIndex & index)
MergeTreeIndexGranuleFullText::MergeTreeIndexGranuleFullText(const MergeTreeIndexFullText & index)
: IMergeTreeIndexGranule()
, index(index)
, bloom_filters(
index.columns.size(), StringBloomFilter(index.bloom_filter_size, index.bloom_filter_hashes, index.seed))
index.columns.size(), BloomFilter(index.bloom_filter_size, index.bloom_filter_hashes, index.seed))
, has_elems(false) {}
void MergeTreeBloomFilterIndexGranule::serializeBinary(WriteBuffer & ostr) const
void MergeTreeIndexGranuleFullText::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
"Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR);
throw Exception("Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR);
for (const auto & bloom_filter : bloom_filters)
ostr.write(reinterpret_cast<const char *>(bloom_filter.getFilter().data()), index.bloom_filter_size);
}
void MergeTreeBloomFilterIndexGranule::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr)
{
for (auto & bloom_filter : bloom_filters)
{
@ -78,17 +77,17 @@ void MergeTreeBloomFilterIndexGranule::deserializeBinary(ReadBuffer & istr)
}
MergeTreeBloomFilterIndexAggregator::MergeTreeBloomFilterIndexAggregator(const MergeTreeBloomFilterIndex & index)
: index(index), granule(std::make_shared<MergeTreeBloomFilterIndexGranule>(index)) {}
MergeTreeIndexAggregatorFullText::MergeTreeIndexAggregatorFullText(const MergeTreeIndexFullText & index)
: index(index), granule(std::make_shared<MergeTreeIndexGranuleFullText>(index)) {}
MergeTreeIndexGranulePtr MergeTreeBloomFilterIndexAggregator::getGranuleAndReset()
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorFullText::getGranuleAndReset()
{
auto new_granule = std::make_shared<MergeTreeBloomFilterIndexGranule>(index);
auto new_granule = std::make_shared<MergeTreeIndexGranuleFullText>(index);
new_granule.swap(granule);
return new_granule;
}
void MergeTreeBloomFilterIndexAggregator::update(const Block & block, size_t * pos, size_t limit)
void MergeTreeIndexAggregatorFullText::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
@ -111,14 +110,14 @@ void MergeTreeBloomFilterIndexAggregator::update(const Block & block, size_t * p
}
const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
const MergeTreeConditionFullText::AtomMap MergeTreeConditionFullText::atom_map
{
{
"notEquals",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_NOT_EQUALS;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -128,10 +127,10 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"equals",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_EQUALS;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -141,10 +140,10 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"like",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_LIKE;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -154,7 +153,7 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"notIn",
[] (RPNElement & out, const Field &, const MergeTreeBloomFilterIndex &)
[] (RPNElement & out, const Field &, const MergeTreeIndexFullText &)
{
out.function = RPNElement::FUNCTION_NOT_IN;
return true;
@ -162,7 +161,7 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"in",
[] (RPNElement & out, const Field &, const MergeTreeBloomFilterIndex &)
[] (RPNElement & out, const Field &, const MergeTreeIndexFullText &)
{
out.function = RPNElement::FUNCTION_IN;
return true;
@ -170,24 +169,21 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
};
BloomFilterCondition::BloomFilterCondition(
MergeTreeConditionFullText::MergeTreeConditionFullText(
const SelectQueryInfo & query_info,
const Context & context,
const MergeTreeBloomFilterIndex & index_) : index(index_), prepared_sets(query_info.sets)
const MergeTreeIndexFullText & index_) : index(index_), prepared_sets(query_info.sets)
{
rpn = std::move(
RPNBuilder<RPNElement>(
query_info, context,
[this] (const ASTPtr & node,
const Context & /* context */,
Block & block_with_constants,
RPNElement & out) -> bool
[this] (const ASTPtr & node, const Context & /* context */, Block & block_with_constants, RPNElement & out) -> bool
{
return this->atomFromAST(node, block_with_constants, out);
}).extractRPN());
}
bool BloomFilterCondition::alwaysUnknownOrTrue() const
bool MergeTreeConditionFullText::alwaysUnknownOrTrue() const
{
/// Check like in KeyCondition.
std::vector<bool> rpn_stack;
@ -234,10 +230,10 @@ bool BloomFilterCondition::alwaysUnknownOrTrue() const
return rpn_stack[0];
}
bool BloomFilterCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
bool MergeTreeConditionFullText::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
std::shared_ptr<MergeTreeBloomFilterIndexGranule> granule
= std::dynamic_pointer_cast<MergeTreeBloomFilterIndexGranule>(idx_granule);
std::shared_ptr<MergeTreeIndexGranuleFullText> granule
= std::dynamic_pointer_cast<MergeTreeIndexGranuleFullText>(idx_granule);
if (!granule)
throw Exception(
"BloomFilter index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
@ -323,7 +319,7 @@ bool BloomFilterCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granu
return rpn_stack[0].can_be_true;
}
bool BloomFilterCondition::getKey(const ASTPtr & node, size_t & key_column_num)
bool MergeTreeConditionFullText::getKey(const ASTPtr & node, size_t & key_column_num)
{
auto it = std::find(index.columns.begin(), index.columns.end(), node->getColumnName());
if (it == index.columns.end())
@ -333,7 +329,7 @@ bool BloomFilterCondition::getKey(const ASTPtr & node, size_t & key_column_num)
return true;
}
bool BloomFilterCondition::atomFromAST(
bool MergeTreeConditionFullText::atomFromAST(
const ASTPtr & node, Block & block_with_constants, RPNElement & out)
{
Field const_value;
@ -399,7 +395,7 @@ bool BloomFilterCondition::atomFromAST(
return false;
}
bool BloomFilterCondition::tryPrepareSetBloomFilter(
bool MergeTreeConditionFullText::tryPrepareSetBloomFilter(
const ASTs & args,
RPNElement & out)
{
@ -454,7 +450,7 @@ bool BloomFilterCondition::tryPrepareSetBloomFilter(
if (data_type->getTypeId() != TypeIndex::String && data_type->getTypeId() != TypeIndex::FixedString)
return false;
std::vector<std::vector<StringBloomFilter>> bloom_filters;
std::vector<std::vector<BloomFilter>> bloom_filters;
std::vector<size_t> key_position;
Columns columns = prepared_set->getSetElements();
@ -480,23 +476,23 @@ bool BloomFilterCondition::tryPrepareSetBloomFilter(
}
MergeTreeIndexGranulePtr MergeTreeBloomFilterIndex::createIndexGranule() const
MergeTreeIndexGranulePtr MergeTreeIndexFullText::createIndexGranule() const
{
return std::make_shared<MergeTreeBloomFilterIndexGranule>(*this);
return std::make_shared<MergeTreeIndexGranuleFullText>(*this);
}
MergeTreeIndexAggregatorPtr MergeTreeBloomFilterIndex::createIndexAggregator() const
MergeTreeIndexAggregatorPtr MergeTreeIndexFullText::createIndexAggregator() const
{
return std::make_shared<MergeTreeBloomFilterIndexAggregator>(*this);
return std::make_shared<MergeTreeIndexAggregatorFullText>(*this);
}
IndexConditionPtr MergeTreeBloomFilterIndex::createIndexCondition(
IndexConditionPtr MergeTreeIndexFullText::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<BloomFilterCondition>(query, context, *this);
return std::make_shared<MergeTreeConditionFullText>(query, context, *this);
};
bool MergeTreeBloomFilterIndex::mayBenefitFromIndexForIn(const ASTPtr & node) const
bool MergeTreeIndexFullText::mayBenefitFromIndexForIn(const ASTPtr & node) const
{
return std::find(std::cbegin(columns), std::cend(columns), node->getColumnName()) != std::cend(columns);
}
@ -679,7 +675,7 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
auto tokenizer = std::make_unique<NgramTokenExtractor>(n);
return std::make_unique<MergeTreeBloomFilterIndex>(
return std::make_unique<MergeTreeIndexFullText>(
node->name, std::move(index_expr), columns, data_types, sample, node->granularity,
bloom_filter_size, bloom_filter_hashes, seed, std::move(tokenizer));
}
@ -697,7 +693,7 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
auto tokenizer = std::make_unique<SplitTokenExtractor>();
return std::make_unique<MergeTreeBloomFilterIndex>(
return std::make_unique<MergeTreeIndexFullText>(
node->name, std::move(index_expr), columns, data_types, sample, node->granularity,
bloom_filter_size, bloom_filter_hashes, seed, std::move(tokenizer));
}

View File

@ -10,54 +10,54 @@
namespace DB
{
class MergeTreeBloomFilterIndex;
class MergeTreeIndexFullText;
struct MergeTreeBloomFilterIndexGranule : public IMergeTreeIndexGranule
struct MergeTreeIndexGranuleFullText : public IMergeTreeIndexGranule
{
explicit MergeTreeBloomFilterIndexGranule(
const MergeTreeBloomFilterIndex & index);
explicit MergeTreeIndexGranuleFullText(
const MergeTreeIndexFullText & index);
~MergeTreeBloomFilterIndexGranule() override = default;
~MergeTreeIndexGranuleFullText() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
bool empty() const override { return !has_elems; }
const MergeTreeBloomFilterIndex & index;
std::vector<StringBloomFilter> bloom_filters;
const MergeTreeIndexFullText & index;
std::vector<BloomFilter> bloom_filters;
bool has_elems;
};
using MergeTreeBloomFilterIndexGranulePtr = std::shared_ptr<MergeTreeBloomFilterIndexGranule>;
using MergeTreeIndexGranuleFullTextPtr = std::shared_ptr<MergeTreeIndexGranuleFullText>;
struct MergeTreeBloomFilterIndexAggregator : IMergeTreeIndexAggregator
struct MergeTreeIndexAggregatorFullText : IMergeTreeIndexAggregator
{
explicit MergeTreeBloomFilterIndexAggregator(const MergeTreeBloomFilterIndex & index);
explicit MergeTreeIndexAggregatorFullText(const MergeTreeIndexFullText & index);
~MergeTreeBloomFilterIndexAggregator() override = default;
~MergeTreeIndexAggregatorFullText() override = default;
bool empty() const override { return !granule || granule->empty(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override;
const MergeTreeBloomFilterIndex & index;
MergeTreeBloomFilterIndexGranulePtr granule;
const MergeTreeIndexFullText & index;
MergeTreeIndexGranuleFullTextPtr granule;
};
class BloomFilterCondition : public IIndexCondition
class MergeTreeConditionFullText : public IIndexCondition
{
public:
BloomFilterCondition(
MergeTreeConditionFullText(
const SelectQueryInfo & query_info,
const Context & context,
const MergeTreeBloomFilterIndex & index_);
const MergeTreeIndexFullText & index_);
~BloomFilterCondition() override = default;
~MergeTreeConditionFullText() override = default;
bool alwaysUnknownOrTrue() const override;
@ -93,19 +93,19 @@ private:
};
RPNElement(
Function function_ = FUNCTION_UNKNOWN, size_t key_column_ = 0, std::unique_ptr<StringBloomFilter> && const_bloom_filter_ = nullptr)
Function function_ = FUNCTION_UNKNOWN, size_t key_column_ = 0, std::unique_ptr<BloomFilter> && const_bloom_filter_ = nullptr)
: function(function_), key_column(key_column_), bloom_filter(std::move(const_bloom_filter_)) {}
Function function = FUNCTION_UNKNOWN;
/// For FUNCTION_EQUALS, FUNCTION_NOT_EQUALS, FUNCTION_LIKE, FUNCTION_NOT_LIKE.
size_t key_column;
std::unique_ptr<StringBloomFilter> bloom_filter;
std::unique_ptr<BloomFilter> bloom_filter;
/// For FUNCTION_IN and FUNCTION_NOT_IN
std::vector<std::vector<StringBloomFilter>> set_bloom_filters;
std::vector<std::vector<BloomFilter>> set_bloom_filters;
std::vector<size_t> set_key_position;
};
using AtomMap = std::unordered_map<std::string, bool(*)(RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)>;
using AtomMap = std::unordered_map<std::string, bool(*)(RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)>;
using RPN = std::vector<RPNElement>;
bool atomFromAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out);
@ -115,7 +115,7 @@ private:
static const AtomMap atom_map;
const MergeTreeBloomFilterIndex & index;
const MergeTreeIndexFullText & index;
RPN rpn;
/// Sets from syntax analyzer.
PreparedSets prepared_sets;
@ -164,10 +164,10 @@ struct SplitTokenExtractor : public ITokenExtractor
};
class MergeTreeBloomFilterIndex : public IMergeTreeIndex
class MergeTreeIndexFullText : public IMergeTreeIndex
{
public:
MergeTreeBloomFilterIndex(
MergeTreeIndexFullText(
String name_,
ExpressionActionsPtr expr_,
const Names & columns_,
@ -184,7 +184,7 @@ public:
, seed(seed_)
, token_extractor_func(std::move(token_extractor_func_)) {}
~MergeTreeBloomFilterIndex() override = default;
~MergeTreeIndexFullText() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;

View File

@ -0,0 +1,116 @@
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/HashTable/Hash.h>
#include <ext/bit_cast.h>
#include <Interpreters/BloomFilterHash.h>
namespace DB
{
/// Builds an empty granule (total_rows == 0) that holds `index_columns` null filter slots;
/// deserializeBinary() replaces each slot with a freshly allocated filter.
MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t index_columns)
    : total_rows(0), bits_per_row(bits_per_row), hash_functions(hash_functions), bloom_filters(index_columns)
{
}
/// Builds a granule from blocks of precomputed hashes.
///
/// One bloom filter is created per column of the first block; every filter is sized to
/// `bits_per_row * total_rows` bits, rounded up to whole bytes. Each column of every block is
/// then fed into the filter with the same position.
///
/// Throws LOGICAL_ERROR if there are no blocks, `total_rows` is zero, or any block is empty.
/// The structure of all blocks is checked to be equal before anything is filled.
MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(
    size_t bits_per_row, size_t hash_functions, size_t total_rows, const Blocks & granule_index_blocks)
    : total_rows(total_rows), bits_per_row(bits_per_row), hash_functions(hash_functions)
{
    if (granule_index_blocks.empty() || !total_rows)
        throw Exception("LOGICAL ERROR: granule_index_blocks empty or total_rows is zero.", ErrorCodes::LOGICAL_ERROR);

    assertGranuleBlocksStructure(granule_index_blocks);

    /// The filter size is the same for every column, so compute it once.
    static constexpr size_t atom_size = 8;
    const size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;

    for (size_t index = 0; index < granule_index_blocks.size(); ++index)
    {
        /// Bind by reference: copying a Block per iteration is unnecessary work.
        const Block & granule_index_block = granule_index_blocks[index];

        if (unlikely(!granule_index_block || !granule_index_block.rows()))
            throw Exception("LOGICAL ERROR: granule_index_block is empty.", ErrorCodes::LOGICAL_ERROR);

        const size_t columns = granule_index_block.columns();

        /// The first block defines how many filters the granule has.
        if (index == 0)
        {
            bloom_filters.reserve(columns);
            for (size_t column = 0; column < columns; ++column)
                bloom_filters.emplace_back(std::make_shared<BloomFilter>(bytes_size, hash_functions, 0));
        }

        for (size_t column = 0; column < columns; ++column)
            fillingBloomFilter(bloom_filters[column], granule_index_block, column, hash_functions);
    }
}
/// A granule counts as empty while it covers no rows.
bool MergeTreeIndexGranuleBloomFilter::empty() const
{
    return total_rows == 0;
}
/// Reads the granule written by serializeBinary(): a var-uint row count followed by the raw
/// bytes of each filter. Only an empty granule may be read into; otherwise LOGICAL_ERROR is thrown.
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr)
{
    if (!empty())
        throw Exception("Cannot read data to a non-empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);

    readVarUInt(total_rows, istr);

    /// Every filter has the same size, derived from the row count that was just read.
    static constexpr size_t bits_in_byte = 8;
    const size_t filter_bytes = (bits_per_row * total_rows + bits_in_byte - 1) / bits_in_byte;

    for (auto & bloom_filter : bloom_filters)
    {
        bloom_filter = std::make_shared<BloomFilter>(filter_bytes, hash_functions, 0);
        /// NOTE(review): the result of read() is not checked - confirm whether a truncated stream needs handling.
        istr.read(reinterpret_cast<char *>(bloom_filter->getFilter().data()), filter_bytes);
    }
}
/// Writes the row count as a var-uint, then the first `bits_per_row * total_rows` bits
/// (rounded up to bytes) of every filter. Writing an empty granule throws LOGICAL_ERROR.
void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
{
    if (empty())
        throw Exception("Attempt to write empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);

    writeVarUInt(total_rows, ostr);

    static constexpr size_t bits_in_byte = 8;
    const size_t filter_bytes = (bits_per_row * total_rows + bits_in_byte - 1) / bits_in_byte;

    for (const auto & filter_ptr : bloom_filters)
    {
        const char * raw_bytes = reinterpret_cast<const char *>(filter_ptr->getFilter().data());
        ostr.write(raw_bytes, filter_bytes);
    }
}
void MergeTreeIndexGranuleBloomFilter::assertGranuleBlocksStructure(const Blocks & granule_index_blocks) const
{
Block prev_block;
for (size_t index = 0; index < granule_index_blocks.size(); ++index)
{
Block granule_index_block = granule_index_blocks[index];
if (index != 0)
assertBlocksHaveEqualStructure(prev_block, granule_index_block, "Granule blocks of bloom filter has difference structure.");
prev_block = granule_index_block;
}
}
void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(
std::shared_ptr<BloomFilter> & bf, const Block & granule_index_block, size_t index_hash_column, size_t hash_functions)
{
const auto & column = granule_index_block.getByPosition(index_hash_column);
if (const auto hash_column = typeid_cast<const ColumnUInt64 *>(column.column.get()))
{
const auto & hash_column_vec = hash_column->getData();
for (size_t index = 0, size = hash_column_vec.size(); index < size; ++index)
{
const UInt64 & bf_base_hash = hash_column_vec[index];
for (size_t i = 0; i < hash_functions; ++i)
bf->addHashWithSeed(bf_base_hash, BloomFilterHash::bf_hash_seed[i]);
}
}
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace DB
{
/// Granule of the bloom filter skipping index: one bloom filter per index column,
/// all sized from `bits_per_row` and the number of rows the granule covers.
class MergeTreeIndexGranuleBloomFilter : public IMergeTreeIndexGranule
{
public:
    /// Empty granule with `index_columns` filter slots, to be filled by deserializeBinary().
    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t index_columns);

    /// Granule built from blocks of precomputed hashes covering `total_rows` rows.
    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t total_rows, const Blocks & granule_index_blocks);

    bool empty() const override;

    void serializeBinary(WriteBuffer & ostr) const override;

    void deserializeBinary(ReadBuffer & istr) override;

    /// Returns the filters by reference, so a call does not copy the vector and its shared pointers.
    /// The reference stays valid only while this granule is alive.
    const std::vector<BloomFilterPtr> & getFilters() const { return bloom_filters; }

private:
    size_t total_rows;
    size_t bits_per_row;
    size_t hash_functions;
    std::vector<BloomFilterPtr> bloom_filters;

    void assertGranuleBlocksStructure(const Blocks & granule_index_blocks) const;

    void fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column, size_t hash_functions);
};
}

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creator)
void MergeTreeIndexFactory::registerIndex(const std::string & name, Creator creator)
{
if (!indexes.emplace(name, std::move(creator)).second)
throw Exception("MergeTreeIndexFactory: the Index creator name '" + name + "' is not unique",
@ -70,6 +70,11 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context);
std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreatorNew(
const NamesAndTypesList & columns,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context);
MergeTreeIndexFactory::MergeTreeIndexFactory()
{
@ -77,6 +82,7 @@ MergeTreeIndexFactory::MergeTreeIndexFactory()
registerIndex("set", setIndexCreator);
registerIndex("ngrambf_v1", bloomFilterIndexCreator);
registerIndex("tokenbf_v1", bloomFilterIndexCreator);
registerIndex("bloom_filter", bloomFilterIndexCreatorNew);
}
}

View File

@ -24,10 +24,7 @@ public:
using AtomFromASTFunc = std::function<
bool(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out)>;
RPNBuilder(
const SelectQueryInfo & query_info,
const Context & context_,
const AtomFromASTFunc & atomFromAST_)
RPNBuilder(const SelectQueryInfo & query_info, const Context & context_, const AtomFromASTFunc & atomFromAST_)
: context(context_), atomFromAST(atomFromAST_)
{
/** Evaluation of expressions that depend only on constants.