Merge branch 'feature/bloom_filter' of https://github.com/zhang2014/ClickHouse into zhang2014-feature/bloom_filter

This commit is contained in:
Alexey Milovidov 2019-06-29 20:44:11 +03:00
commit 88fc72f0b9
27 changed files with 1320 additions and 197 deletions

View File

@ -1,5 +1,4 @@
#include <Interpreters/BloomFilter.h>
#include <city.h>
@ -9,14 +8,13 @@ namespace DB
static constexpr UInt64 SEED_GEN_A = 845897321;
static constexpr UInt64 SEED_GEN_B = 217728422;
StringBloomFilter::StringBloomFilter(size_t size_, size_t hashes_, size_t seed_)
BloomFilter::BloomFilter(size_t size_, size_t hashes_, size_t seed_)
: size(size_), hashes(hashes_), seed(seed_), words((size + sizeof(UnderType) - 1) / sizeof(UnderType)), filter(words, 0) {}
StringBloomFilter::StringBloomFilter(const StringBloomFilter & bloom_filter)
BloomFilter::BloomFilter(const BloomFilter & bloom_filter)
: size(bloom_filter.size), hashes(bloom_filter.hashes), seed(bloom_filter.seed), words(bloom_filter.words), filter(bloom_filter.filter) {}
bool StringBloomFilter::find(const char * data, size_t len)
bool BloomFilter::find(const char * data, size_t len)
{
size_t hash1 = CityHash_v1_0_2::CityHash64WithSeed(data, len, seed);
size_t hash2 = CityHash_v1_0_2::CityHash64WithSeed(data, len, SEED_GEN_A * seed + SEED_GEN_B);
@ -30,7 +28,7 @@ bool StringBloomFilter::find(const char * data, size_t len)
return true;
}
void StringBloomFilter::add(const char * data, size_t len)
void BloomFilter::add(const char * data, size_t len)
{
size_t hash1 = CityHash_v1_0_2::CityHash64WithSeed(data, len, seed);
size_t hash2 = CityHash_v1_0_2::CityHash64WithSeed(data, len, SEED_GEN_A * seed + SEED_GEN_B);
@ -42,12 +40,12 @@ void StringBloomFilter::add(const char * data, size_t len)
}
}
void StringBloomFilter::clear()
void BloomFilter::clear()
{
filter.assign(words, 0);
}
bool StringBloomFilter::contains(const StringBloomFilter & bf)
bool BloomFilter::contains(const BloomFilter & bf)
{
for (size_t i = 0; i < words; ++i)
{
@ -57,7 +55,7 @@ bool StringBloomFilter::contains(const StringBloomFilter & bf)
return true;
}
UInt64 StringBloomFilter::isEmpty() const
UInt64 BloomFilter::isEmpty() const
{
for (size_t i = 0; i < words; ++i)
if (filter[i] != 0)
@ -65,7 +63,7 @@ UInt64 StringBloomFilter::isEmpty() const
return true;
}
bool operator== (const StringBloomFilter & a, const StringBloomFilter & b)
bool operator== (const BloomFilter & a, const BloomFilter & b)
{
for (size_t i = 0; i < a.words; ++i)
if (a.filter[i] != b.filter[i])
@ -73,4 +71,16 @@ bool operator== (const StringBloomFilter & a, const StringBloomFilter & b)
return true;
}
/// Marks a precomputed hash as present: `hash` is mixed with `hash_seed`
/// and the resulting bit (modulo the filter size in bits) is set.
void BloomFilter::addHashWithSeed(const UInt64 & hash, const UInt64 & hash_seed)
{
    constexpr size_t bits_per_word = 8 * sizeof(UnderType);

    const size_t bit_index = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(hash, hash_seed)) % (8 * size);
    const size_t word_index = bit_index / bits_per_word;
    const size_t bit_in_word = bit_index % bits_per_word;

    filter[word_index] |= (1ULL << bit_in_word);
}
/// Tests the bit that addHashWithSeed would set for the same (`hash`, `hash_seed`) pair.
/// Returns true if that bit is set.
bool BloomFilter::findHashWithSeed(const UInt64 & hash, const UInt64 & hash_seed)
{
    constexpr size_t bits_per_word = 8 * sizeof(UnderType);

    const size_t bit_index = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(hash, hash_seed)) % (8 * size);
    const auto mask = 1ULL << (bit_index % bits_per_word);

    return (filter[bit_index / bits_per_word] & mask) != 0;
}
}

View File

@ -1,15 +1,17 @@
#pragma once
#include <Core/Types.h>
#include <vector>
#include <Core/Types.h>
#include <Common/PODArray.h>
#include <Common/Allocator.h>
#include <Columns/ColumnVector.h>
namespace DB
{
/// Bloom filter for strings.
class StringBloomFilter
class BloomFilter
{
public:
using UnderType = UInt64;
using Container = std::vector<UnderType>;
@ -17,16 +19,19 @@ public:
/// size -- size of filter in bytes.
/// hashes -- number of used hash functions.
/// seed -- random seed for hash functions generation.
StringBloomFilter(size_t size_, size_t hashes_, size_t seed_);
StringBloomFilter(const StringBloomFilter & bloom_filter);
BloomFilter(size_t size_, size_t hashes_, size_t seed_);
BloomFilter(const BloomFilter & bloom_filter);
bool find(const char * data, size_t len);
void add(const char * data, size_t len);
void clear();
void addHashWithSeed(const UInt64 & hash, const UInt64 & hash_seed);
bool findHashWithSeed(const UInt64 & hash, const UInt64 & hash_seed);
/// Checks if this contains everything from another bloom filter.
/// Bloom filters must have equal size and seed.
bool contains(const StringBloomFilter & bf);
bool contains(const BloomFilter & bf);
const Container & getFilter() const { return filter; }
Container & getFilter() { return filter; }
@ -34,7 +39,7 @@ public:
/// For debug.
UInt64 isEmpty() const;
friend bool operator== (const StringBloomFilter & a, const StringBloomFilter & b);
friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:
size_t size;
@ -44,7 +49,8 @@ private:
Container filter;
};
using BloomFilterPtr = std::shared_ptr<BloomFilter>;
bool operator== (const StringBloomFilter & a, const StringBloomFilter & b);
bool operator== (const BloomFilter & a, const BloomFilter & b);
}

View File

@ -0,0 +1,207 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/bit_cast.h>
#include <Common/HashTable/Hash.h>
namespace DB
{
/// Error codes referenced by the inline functions of BloomFilterHash.
/// LOGICAL_ERROR is declared here as well, because this header uses it and must not
/// depend on some other header having declared it first.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
}
struct BloomFilterHash
{
/// Fixed table of 15 seeds. Callers index it by hash-function number and pass the entry as
/// `hash_seed` to BloomFilter::findHashWithSeed (see maybeTrueOnBloomFilter).
/// NOTE(review): the size matches MAX_HASH_FUNCTION_COUNT in calculationBestPractices; presumably
/// the two must stay in sync — confirm before changing either.
static constexpr UInt64 bf_hash_seed[15] = {
13635471485423070496ULL, 10336109063487487899ULL, 17779957404565211594ULL, 8988612159822229247ULL, 4954614162757618085ULL,
12980113590177089081ULL, 9263883436177860930ULL, 3656772712723269762ULL, 10362091744962961274ULL, 7582936617938287249ULL,
15033938188484401405ULL, 18286745649494826751ULL, 6852245486148412312ULL, 8886056245089344681ULL, 10151472371158292780ULL
};
/// Hashes a single constant `field` of the index column type `data_type`.
/// Returns a one-row constant UInt64 column holding the hash.
/// Integers, dates and enums go through intHash64, floats are hashed as Float64 bits,
/// strings through CityHash64. Throws LOGICAL_ERROR for any other type.
static ColumnPtr hashWithField(const IDataType * data_type, const Field & field)
{
    const auto make_const_hash_column = [](UInt64 hash_value) -> ColumnPtr
    {
        return ColumnConst::create(ColumnUInt64::create(1, hash_value), 1);
    };

    WhichDataType which(data_type);

    if (which.isUInt() || which.isDateOrDateTime())
        return make_const_hash_column(intHash64(field.safeGet<UInt64>()));

    if (which.isInt() || which.isEnum())
        return make_const_hash_column(intHash64(ext::bit_cast<UInt64>(field.safeGet<Int64>())));

    if (which.isFloat32() || which.isFloat64())
        return make_const_hash_column(intHash64(ext::bit_cast<UInt64>(field.safeGet<Float64>())));

    if (which.isString() || which.isFixedString())
    {
        const auto & value = field.safeGet<String>();
        return make_const_hash_column(CityHash_v1_0_2::CityHash64(value.data(), value.size()));
    }

    throw Exception("Unexpected type " + data_type->getName() + " of bloom filter index.", ErrorCodes::LOGICAL_ERROR);
}
/// Hashes `limit` consecutive rows of `column`, starting at row `pos`.
/// Returns a UInt64 column with one hash per row.
static ColumnPtr hashWithColumn(const DataTypePtr & data_type, const ColumnPtr & column, size_t pos, size_t limit)
{
    auto hashes = ColumnUInt64::create(limit);
    getAnyTypeHash<true>(data_type.get(), column.get(), hashes->getData(), pos);
    return hashes;
}
template <bool is_first>
static void getAnyTypeHash(const IDataType * data_type, const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
WhichDataType which(data_type);
if (which.isUInt8()) getNumberTypeHash<UInt8, is_first>(column, vec, pos);
else if (which.isUInt16()) getNumberTypeHash<UInt16, is_first>(column, vec, pos);
else if (which.isUInt32()) getNumberTypeHash<UInt32, is_first>(column, vec, pos);
else if (which.isUInt64()) getNumberTypeHash<UInt64, is_first>(column, vec, pos);
else if (which.isInt8()) getNumberTypeHash<Int8, is_first>(column, vec, pos);
else if (which.isInt16()) getNumberTypeHash<Int16, is_first>(column, vec, pos);
else if (which.isInt32()) getNumberTypeHash<Int32, is_first>(column, vec, pos);
else if (which.isInt64()) getNumberTypeHash<Int64, is_first>(column, vec, pos);
else if (which.isEnum8()) getNumberTypeHash<Int8, is_first>(column, vec, pos);
else if (which.isEnum16()) getNumberTypeHash<Int16, is_first>(column, vec, pos);
else if (which.isDate()) getNumberTypeHash<UInt16, is_first>(column, vec, pos);
else if (which.isDateTime()) getNumberTypeHash<UInt32, is_first>(column, vec, pos);
else if (which.isFloat32()) getNumberTypeHash<Float32, is_first>(column, vec, pos);
else if (which.isFloat64()) getNumberTypeHash<Float64, is_first>(column, vec, pos);
else if (which.isString()) getStringTypeHash<is_first>(column, vec, pos);
else if (which.isFixedString()) getStringTypeHash<is_first>(column, vec, pos);
else throw Exception("Unexpected type " + data_type->getName() + " of bloom filter index.", ErrorCodes::LOGICAL_ERROR);
}
template <typename Type, bool is_first>
static void getNumberTypeHash(const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
const auto * index_column = typeid_cast<const ColumnVector<Type> *>(column);
if (unlikely(!index_column))
throw Exception("Illegal column type was passed to the bloom filter index.", ErrorCodes::ILLEGAL_COLUMN);
const typename ColumnVector<Type>::Container & vec_from = index_column->getData();
/// Because we're missing the precision of float in the Field.h
/// to be consistent, we need to convert Float32 to Float64 processing, also see: BloomFilterHash::hashWithField
if constexpr (std::is_same_v<ColumnVector<Type>, ColumnFloat32>)
{
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 hash = intHash64(ext::bit_cast<UInt64>(Float64(vec_from[index + pos])));
if constexpr (is_first)
vec[index] = hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], hash));
}
}
else
{
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 hash = intHash64(ext::bit_cast<UInt64>(vec_from[index + pos]));
if constexpr (is_first)
vec[index] = hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], hash));
}
}
}
template <bool is_first>
static void getStringTypeHash(const IColumn * column, ColumnUInt64::Container & vec, size_t pos)
{
if (const auto * index_column = typeid_cast<const ColumnString *>(column))
{
const ColumnString::Chars & data = index_column->getChars();
const ColumnString::Offsets & offsets = index_column->getOffsets();
ColumnString::Offset current_offset = pos;
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 city_hash = CityHash_v1_0_2::CityHash64(
reinterpret_cast<const char *>(&data[current_offset]), offsets[index + pos] - current_offset - 1);
if constexpr (is_first)
vec[index] = city_hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], city_hash));
current_offset = offsets[index + pos];
}
}
else if (const auto * fixed_string_index_column = typeid_cast<const ColumnFixedString *>(column))
{
size_t fixed_len = fixed_string_index_column->getN();
const auto & data = fixed_string_index_column->getChars();
for (size_t index = 0, size = vec.size(); index < size; ++index)
{
UInt64 city_hash = CityHash_v1_0_2::CityHash64(reinterpret_cast<const char *>(&data[(index + pos) * fixed_len]), fixed_len);
if constexpr (is_first)
vec[index] = city_hash;
else
vec[index] = CityHash_v1_0_2::Hash128to64(CityHash_v1_0_2::uint128(vec[index], city_hash));
}
}
else
throw Exception("Illegal column type was passed to the bloom filter index.", ErrorCodes::ILLEGAL_COLUMN);
}
/// Chooses bloom filter parameters for a target false positive probability.
///
/// Returns {bits per row, number of hash functions}: the smallest bits-per-row (1..20) whose best
/// achievable probability does not exceed `max_conflict_probability`, together with the smallest number
/// of hash functions that still meets the target at that size.
/// If no table entry meets the target (including NaN), the largest configuration is returned.
///
/// probability_lookup_table[bits][k] is the false positive probability for `bits` bits per row and
/// `k` hash functions; column 0 is a dummy.
static std::pair<size_t, size_t> calculationBestPractices(double max_conflict_probability)
{
    static const size_t MAX_BITS_PER_ROW = 20;
    static const size_t MAX_HASH_FUNCTION_COUNT = 15;

    /// For the smallest index per level in probability_lookup_table
    static const size_t min_probability_index_each_bits[] = {0, 0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10, 11, 12, 12, 13, 14};

    static const long double probability_lookup_table[MAX_BITS_PER_ROW + 1][MAX_HASH_FUNCTION_COUNT] =
    {
        {1.0}, /// dummy, 0 bits per row
        {1.0, 1.0},
        {1.0, 0.393, 0.400},
        {1.0, 0.283, 0.237, 0.253},
        {1.0, 0.221, 0.155, 0.147, 0.160},
        {1.0, 0.181, 0.109, 0.092, 0.092, 0.101}, // 5
        {1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638},
        {1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364},
        {1.0, 0.118, 0.0489, 0.0306, 0.024, 0.0217, 0.0216, 0.0229},
        {1.0, 0.105, 0.0397, 0.0228, 0.0166, 0.0141, 0.0133, 0.0135, 0.0145},
        {1.0, 0.0952, 0.0329, 0.0174, 0.0118, 0.00943, 0.00844, 0.00819, 0.00846}, // 10
        {1.0, 0.0869, 0.0276, 0.0136, 0.00864, 0.0065, 0.00552, 0.00513, 0.00509},
        {1.0, 0.08, 0.0236, 0.0108, 0.00646, 0.00459, 0.00371, 0.00329, 0.00314},
        {1.0, 0.074, 0.0203, 0.00875, 0.00492, 0.00332, 0.00255, 0.00217, 0.00199, 0.00194},
        {1.0, 0.0689, 0.0177, 0.00718, 0.00381, 0.00244, 0.00179, 0.00146, 0.00129, 0.00121, 0.0012},
        {1.0, 0.0645, 0.0156, 0.00596, 0.003, 0.00183, 0.00128, 0.001, 0.000852, 0.000775, 0.000744}, // 15
        {1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459},
        {1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302, 0.000287, 0.000284},
        {1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183, 0.000176},
        {1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132, 0.000118, 0.000111, 0.000109},
        {1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05, 7.77e-05, 7.12e-05, 6.79e-05, 6.71e-05} // 20
    };

    /// The bound is inclusive: the table has a row for MAX_BITS_PER_ROW and it must be usable.
    for (size_t bits_per_row = 1; bits_per_row <= MAX_BITS_PER_ROW; ++bits_per_row)
    {
        const size_t best_hash_functions = min_probability_index_each_bits[bits_per_row];

        /// Written as a negated "<=" so that a NaN target never matches a row.
        if (!(probability_lookup_table[bits_per_row][best_hash_functions] <= max_conflict_probability))
            continue;

        /// Walk down from the optimum while fewer hash functions still meet the target.
        /// Stopping at 1 covers the case when a single hash function is already enough.
        size_t hash_functions = best_hash_functions;
        while (hash_functions > 1 && probability_lookup_table[bits_per_row][hash_functions - 1] <= max_conflict_probability)
            --hash_functions;

        return std::pair<size_t, size_t>(bits_per_row, hash_functions == 0 ? 1 : hash_functions);
    }

    /// The target is not reachable with this table: use the most precise configuration available.
    return std::pair<size_t, size_t>(MAX_BITS_PER_ROW, min_probability_index_each_bits[MAX_BITS_PER_ROW]);
}
};
}

View File

@ -518,7 +518,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
RangesInDataParts parts_with_ranges;
std::vector<std::pair<MergeTreeIndexPtr, IndexConditionPtr>> useful_indices;
std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
for (const auto & index : data.skip_indices)
{
auto condition = index->createIndexCondition(query_info, context);
@ -998,7 +998,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
MergeTreeIndexPtr index,
IndexConditionPtr condition,
MergeTreeIndexConditionPtr condition,
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings) const

View File

@ -84,7 +84,7 @@ private:
MarkRanges filterMarksUsingIndex(
MergeTreeIndexPtr index,
IndexConditionPtr condition,
MergeTreeIndexConditionPtr condition,
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings) const;

View File

@ -0,0 +1,62 @@
#include <Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h>
#include <ext/bit_cast.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>
#include <Common/HashTable/Hash.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/BloomFilterHash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
}
/// Stores the bloom filter parameters and the names of the indexed columns.
/// The aggregator starts with no accumulated rows.
MergeTreeIndexAggregatorBloomFilter::MergeTreeIndexAggregatorBloomFilter(
size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_)
: bits_per_row(bits_per_row_), hash_functions(hash_functions_), index_columns_name(columns_name_)
{
}
/// True while no rows have been accumulated since construction or the last getGranuleAndReset().
bool MergeTreeIndexAggregatorBloomFilter::empty() const
{
    return total_rows == 0;
}
/// Builds a granule from the hash blocks accumulated so far, then resets the
/// aggregator to its empty state.
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorBloomFilter::getGranuleAndReset()
{
    MergeTreeIndexGranulePtr result
        = std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, total_rows, granule_index_blocks);

    granule_index_blocks.clear();
    total_rows = 0;

    return result;
}
/// Hashes up to `limit` rows of the indexed columns of `block`, starting at row `*pos`,
/// and stores the hashes as a new block for the current granule.
///
/// On return `*pos` is advanced by the number of rows actually consumed.
/// Throws LOGICAL_ERROR if `*pos` is not less than the number of rows in `block`.
void MergeTreeIndexAggregatorBloomFilter::update(const Block & block, size_t * pos, size_t limit)
{
    if (*pos >= block.rows())
        throw Exception("The provided position is not less than the number of block rows. Position: " + toString(*pos) + ", Block rows: " +
            toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);

    Block granule_index_block;
    const size_t max_read_rows = std::min(block.rows() - *pos, limit);

    for (const auto & column_name : index_columns_name)
    {
        const auto & column_and_type = block.getByName(column_name);

        /// Held by value (not by const reference) so that the move below really moves.
        auto index_column = BloomFilterHash::hashWithColumn(column_and_type.type, column_and_type.column, *pos, max_read_rows);

        granule_index_block.insert({std::move(index_column), std::make_shared<DataTypeUInt64>(), column_and_type.name});
    }

    *pos += max_read_rows;
    total_rows += max_read_rows;

    /// The local block is not used afterwards, so hand it over instead of copying it.
    granule_index_blocks.push_back(std::move(granule_index_block));
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
namespace DB
{
/// Accumulates hashes of the indexed columns block by block and turns them into a
/// bloom filter granule on request.
class MergeTreeIndexAggregatorBloomFilter : public IMergeTreeIndexAggregator
{
public:
MergeTreeIndexAggregatorBloomFilter(size_t bits_per_row_, size_t hash_functions_, const Names & columns_name_);
/// True if no rows have been accumulated.
bool empty() const override;
/// Returns a granule built from the accumulated data and clears the accumulated state.
MergeTreeIndexGranulePtr getGranuleAndReset() override;
/// Consumes up to `limit` rows of `block` starting at `*pos` and advances `*pos`.
void update(const Block & block, size_t * pos, size_t limit) override;
private:
/// Bloom filter parameters, passed through to the created granule.
size_t bits_per_row;
size_t hash_functions;
/// Names of the columns that are looked up in each incoming block.
const Names index_columns_name;
/// Rows accumulated since the last reset.
size_t total_rows = 0;
/// One block of UInt64 hash columns per update() call since the last reset.
Blocks granule_index_blocks;
};
}

View File

@ -0,0 +1,110 @@
#include <Storages/MergeTree/MergeTreeIndexBloomFilter.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Core/Types.h>
#include <ext/bit_cast.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/MergeTree/MergeTreeIndexConditionBloomFilter.h>
#include <Parsers/queryToString.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/BloomFilterHash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
/// Forwards the generic index description to IMergeTreeIndex and remembers the
/// bloom filter parameters (`bits_per_row_`, `hash_functions_`).
MergeTreeIndexBloomFilter::MergeTreeIndexBloomFilter(
const String & name_, const ExpressionActionsPtr & expr_, const Names & columns_, const DataTypes & data_types_, const Block & header_,
size_t granularity_, size_t bits_per_row_, size_t hash_functions_)
: IMergeTreeIndex(name_, expr_, columns_, data_types_, header_, granularity_), bits_per_row(bits_per_row_),
hash_functions(hash_functions_)
{
}
/// Creates a granule configured with this index's parameters and the number of indexed columns.
MergeTreeIndexGranulePtr MergeTreeIndexBloomFilter::createIndexGranule() const
{
return std::make_shared<MergeTreeIndexGranuleBloomFilter>(bits_per_row, hash_functions, columns.size());
}
bool MergeTreeIndexBloomFilter::mayBenefitFromIndexForIn(const ASTPtr & node) const
{
const String & column_name = node->getColumnName();
for (const auto & name : columns)
if (column_name == name)
return true;
if (const auto * func = typeid_cast<const ASTFunction *>(node.get()))
{
for (const auto & children : func->arguments->children)
if (mayBenefitFromIndexForIn(children))
return true;
}
return false;
}
/// Creates an aggregator that hashes this index's columns with this index's parameters.
MergeTreeIndexAggregatorPtr MergeTreeIndexBloomFilter::createIndexAggregator() const
{
return std::make_shared<MergeTreeIndexAggregatorBloomFilter>(bits_per_row, hash_functions, columns);
}
/// Creates the query condition object for this index from the query and the index header.
MergeTreeIndexConditionPtr MergeTreeIndexBloomFilter::createIndexCondition(const SelectQueryInfo & query_info, const Context & context) const
{
return std::make_shared<MergeTreeIndexConditionBloomFilter>(query_info, context, header, hash_functions);
}
static void assertIndexColumnsType(const Block & header)
{
if (!header || !header.columns())
throw Exception("Index must have columns.", ErrorCodes::INCORRECT_QUERY);
const DataTypes & columns_data_types = header.getDataTypes();
for (size_t index = 0; index < columns_data_types.size(); ++index)
{
WhichDataType which(columns_data_types[index]);
if (!which.isUInt() && !which.isInt() && !which.isString() && !which.isFixedString() && !which.isFloat() &&
!which.isDateOrDateTime() && !which.isEnum())
throw Exception("Unexpected type " + columns_data_types[index]->getName() + " of bloom filter index.",
ErrorCodes::ILLEGAL_COLUMN);
}
}
/// Creates a bloom filter index from its declaration.
///
/// The optional first argument of the index type is the maximum false positive probability
/// (default 0.025). It must be a floating point literal strictly between 0 and 1.
/// Throws INCORRECT_QUERY for an empty index name or a probability outside (0, 1);
/// a literal of another type is rejected by Field::safeGet.
std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreatorNew(
    const NamesAndTypesList & columns, std::shared_ptr<ASTIndexDeclaration> node, const Context & context)
{
    if (node->name.empty())
        throw Exception("Index must have unique name.", ErrorCodes::INCORRECT_QUERY);

    ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone());

    auto syntax = SyntaxAnalyzer(context, {}).analyze(expr_list, columns);
    auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
    auto index_sample = ExpressionAnalyzer(expr_list, syntax, context).getActions(true)->getSampleBlock();

    assertIndexColumnsType(index_sample);

    double max_conflict_probability = 0.025;
    if (node->type->arguments && !node->type->arguments->children.empty())
    {
        /// safeGet checks the stored type: an integer literal such as `bloom_filter(1)` must not be
        /// reinterpreted as the bits of a Float64.
        max_conflict_probability = typeid_cast<const ASTLiteral &>(*node->type->arguments->children[0]).value.safeGet<Float64>();

        /// Written so that NaN is rejected as well.
        if (!(max_conflict_probability > 0 && max_conflict_probability < 1))
            throw Exception("The false positive probability of bloom filter index must be between 0 and 1.", ErrorCodes::INCORRECT_QUERY);
    }

    const auto & bits_per_row_and_size_of_hash_functions = BloomFilterHash::calculationBestPractices(max_conflict_probability);

    return std::make_unique<MergeTreeIndexBloomFilter>(
        node->name, std::move(index_expr), index_sample.getNames(), index_sample.getDataTypes(), index_sample, node->granularity,
        bits_per_row_and_size_of_hash_functions.first, bits_per_row_and_size_of_hash_functions.second);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndexAggregatorBloomFilter.h>
namespace DB
{
/// Skipping index based on bloom filters. Acts as a factory for the granule, aggregator and
/// condition objects, all configured with the same `bits_per_row` / `hash_functions` parameters.
class MergeTreeIndexBloomFilter : public IMergeTreeIndex
{
public:
MergeTreeIndexBloomFilter(
const String & name_, const ExpressionActionsPtr & expr_, const Names & columns_, const DataTypes & data_types_,
const Block & header_, size_t granularity_, size_t bits_per_row_, size_t hash_functions_);
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
MergeTreeIndexConditionPtr createIndexCondition(const SelectQueryInfo & query_info, const Context & context) const override;
/// True if `node` is, or contains as a function argument, one of the indexed columns.
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;
private:
/// Bloom filter parameters chosen when the index was created.
size_t bits_per_row;
size_t hash_functions;
};
}

View File

@ -0,0 +1,352 @@
#include <Storages/MergeTree/MergeTreeIndexConditionBloomFilter.h>
#include <Interpreters/QueryNormalizer.h>
#include <Interpreters/BloomFilterHash.h>
#include <Common/HashTable/ClearableHashMap.h>
#include <Storages/MergeTree/RPNBuilder.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnConst.h>
#include <ext/bit_cast.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTIdentifier.h>
#include <Columns/ColumnTuple.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/convertFieldToType.h>
namespace DB
{
namespace
{
/// Builds the key used to look up the prepared set for the right-hand side of IN.
/// Subqueries and identifiers are keyed by the node itself; literals are keyed by the node
/// and the element types (a tuple type is unpacked into its elements once).
PreparedSetKey getPreparedSetKey(const ASTPtr & node, const DataTypePtr & data_type)
{
    const bool is_subquery_or_identifier = node->as<ASTSubquery>() || node->as<ASTIdentifier>();
    if (is_subquery_or_identifier)
        return PreparedSetKey::forSubquery(*node);

    const auto * tuple_type = typeid_cast<const DataTypeTuple *>(data_type.get());
    if (!tuple_type)
        return PreparedSetKey::forLiteral(*node, DataTypes(1, data_type));

    return PreparedSetKey::forLiteral(*node, tuple_type->getElements());
}
/// Presents the elements of a prepared set as one column with its type:
/// the single element column as is, or a tuple column when the set has several element types.
ColumnWithTypeAndName getPreparedSetInfo(const SetPtr & prepared_set)
{
    const bool is_single_column = prepared_set->getDataTypes().size() == 1;

    if (!is_single_column)
        return {ColumnTuple::create(prepared_set->getSetElements()), std::make_shared<DataTypeTuple>(prepared_set->getDataTypes()), "dummy"};

    return {prepared_set->getSetElements()[0], prepared_set->getDataTypes()[0], "dummy"};
}
bool maybeTrueOnBloomFilter(const IColumn * hash_column, const BloomFilterPtr & bloom_filter, size_t hash_functions)
{
const auto const_column = typeid_cast<const ColumnConst *>(hash_column);
const auto non_const_column = typeid_cast<const ColumnUInt64 *>(hash_column);
if (!const_column && !non_const_column)
throw Exception("LOGICAL ERROR: hash column must be Const Column or UInt64 Column.", ErrorCodes::LOGICAL_ERROR);
if (const_column)
{
for (size_t index = 0; index < hash_functions; ++index)
if (!bloom_filter->findHashWithSeed(const_column->getValue<UInt64>(), BloomFilterHash::bf_hash_seed[index]))
return false;
return true;
}
else
{
bool missing_rows = true;
const ColumnUInt64::Container & data = non_const_column->getData();
for (size_t index = 0, size = data.size(); missing_rows && index < size; ++index)
{
bool match_row = true;
for (size_t hash_index = 0; match_row && hash_index < hash_functions; ++hash_index)
match_row = bloom_filter->findHashWithSeed(data[index], BloomFilterHash::bf_hash_seed[hash_index]);
missing_rows = !match_row;
}
return !missing_rows;
}
}
}
/// Stores the query context and builds the RPN form of the query condition.
/// Atoms of the condition are translated by traverseAtomAST.
MergeTreeIndexConditionBloomFilter::MergeTreeIndexConditionBloomFilter(
const SelectQueryInfo & info, const Context & context, const Block & header, size_t hash_functions)
: header(header), context(context), query_info(info), hash_functions(hash_functions)
{
/// The second lambda parameter is not used by traverseAtomAST and is dropped here.
auto atomFromAST = [this](auto & node, auto &, auto & constants, auto & out) { return traverseAtomAST(node, constants, out); };
rpn = std::move(RPNBuilder<RPNElement>(info, context, atomFromAST).extractRPN());
}
/// Evaluates the RPN with "unknown or always true" as the truth value of each atom.
/// Returns true if the whole condition evaluates to it, i.e. the index cannot filter anything.
/// NOT leaves the top of the stack unchanged.
bool MergeTreeIndexConditionBloomFilter::alwaysUnknownOrTrue() const
{
    std::vector<bool> rpn_stack;

    for (const auto & element : rpn)
    {
        switch (element.function)
        {
            case RPNElement::FUNCTION_UNKNOWN:
            case RPNElement::ALWAYS_TRUE:
                rpn_stack.push_back(true);
                break;

            case RPNElement::FUNCTION_EQUALS:
            case RPNElement::FUNCTION_NOT_EQUALS:
            case RPNElement::FUNCTION_IN:
            case RPNElement::FUNCTION_NOT_IN:
            case RPNElement::ALWAYS_FALSE:
                rpn_stack.push_back(false);
                break;

            case RPNElement::FUNCTION_NOT:
                // do nothing
                break;

            case RPNElement::FUNCTION_AND:
            {
                const bool rhs = rpn_stack.back();
                rpn_stack.pop_back();
                const bool lhs = rpn_stack.back();
                rpn_stack.back() = rhs && lhs;
                break;
            }

            case RPNElement::FUNCTION_OR:
            {
                const bool rhs = rpn_stack.back();
                rpn_stack.pop_back();
                const bool lhs = rpn_stack.back();
                rpn_stack.back() = rhs || lhs;
                break;
            }

            default:
                throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
        }
    }

    return rpn_stack[0];
}
/// Evaluates the query condition against the bloom filters of one granule.
/// Returns false only if the condition cannot be true for any row of the granule.
///
/// Each RPN element is evaluated to a BoolMask (can_be_true, can_be_false).
/// Throws LOGICAL_ERROR for an unknown element or if the RPN does not reduce to a single value.
bool MergeTreeIndexConditionBloomFilter::mayBeTrueOnGranule(const MergeTreeIndexGranuleBloomFilter * granule) const
{
    std::vector<BoolMask> rpn_stack;
    const auto & filters = granule->getFilters();

    for (const auto & element : rpn)
    {
        if (element.function == RPNElement::FUNCTION_UNKNOWN)
        {
            rpn_stack.emplace_back(true, true);
        }
        else if (element.function == RPNElement::FUNCTION_IN
            || element.function == RPNElement::FUNCTION_NOT_IN
            || element.function == RPNElement::FUNCTION_EQUALS
            || element.function == RPNElement::FUNCTION_NOT_EQUALS)
        {
            bool match_rows = true;
            const auto & predicate = element.predicate;
            for (size_t index = 0; match_rows && index < predicate.size(); ++index)
            {
                const auto & query_index_hash = predicate[index];
                const auto & filter = filters[query_index_hash.first];
                const ColumnPtr & hash_column = query_index_hash.second;
                match_rows = maybeTrueOnBloomFilter(&*hash_column, filter, hash_functions);
            }

            /// A bloom filter hit only means that some row may match; it never proves that all rows match.
            /// So "can be false" is always true. Otherwise the negation below would produce
            /// "cannot be true" for `!=` / NOT IN and drop granules that do contain matching rows.
            rpn_stack.emplace_back(match_rows, true);
            if (element.function == RPNElement::FUNCTION_NOT_EQUALS || element.function == RPNElement::FUNCTION_NOT_IN)
                rpn_stack.back() = !rpn_stack.back();
        }
        else if (element.function == RPNElement::FUNCTION_NOT)
        {
            rpn_stack.back() = !rpn_stack.back();
        }
        else if (element.function == RPNElement::FUNCTION_OR)
        {
            auto arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            auto arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 | arg2;
        }
        else if (element.function == RPNElement::FUNCTION_AND)
        {
            auto arg1 = rpn_stack.back();
            rpn_stack.pop_back();
            auto arg2 = rpn_stack.back();
            rpn_stack.back() = arg1 & arg2;
        }
        else if (element.function == RPNElement::ALWAYS_TRUE)
        {
            rpn_stack.emplace_back(true, false);
        }
        else if (element.function == RPNElement::ALWAYS_FALSE)
        {
            rpn_stack.emplace_back(false, true);
        }
        else
            throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
    }

    if (rpn_stack.size() != 1)
        throw Exception("Unexpected stack size in KeyCondition::mayBeTrueInRange", ErrorCodes::LOGICAL_ERROR);

    return rpn_stack[0].can_be_true;
}
bool MergeTreeIndexConditionBloomFilter::traverseAtomAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out)
{
{
Field const_value;
DataTypePtr const_type;
if (KeyCondition::getConstant(node, block_with_constants, const_value, const_type))
{
if (const_value.getType() == Field::Types::UInt64 || const_value.getType() == Field::Types::Int64 ||
const_value.getType() == Field::Types::Float64)
{
/// Zero in all types is represented in memory the same way as in UInt64.
out.function = const_value.get<UInt64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
return true;
}
}
}
if (const auto * function = node->as<ASTFunction>())
{
const ASTs & arguments = function->arguments->children;
if (arguments.size() != 2)
return false;
if (functionIsInOrGlobalInOperator(function->name))
{
if (const auto & prepared_set = getPreparedSet(arguments[1]))
return traverseASTIn(function->name, arguments[0], prepared_set, out);
}
else if (function->name == "equals" || function->name == "notEquals")
{
Field const_value;
DataTypePtr const_type;
if (KeyCondition::getConstant(arguments[1], block_with_constants, const_value, const_type))
return traverseASTEquals(function->name, arguments[0], const_type, const_value, out);
else if (KeyCondition::getConstant(arguments[0], block_with_constants, const_value, const_type))
return traverseASTEquals(function->name, arguments[1], const_type, const_value, out);
}
}
return false;
}
/// Overload for a prepared set: unpacks the set into a single column with its type
/// (see getPreparedSetInfo) and delegates to the column-based overload.
bool MergeTreeIndexConditionBloomFilter::traverseASTIn(
const String & function_name, const ASTPtr & key_ast, const SetPtr & prepared_set, RPNElement & out)
{
const auto & prepared_info = getPreparedSetInfo(prepared_set);
return traverseASTIn(function_name, key_ast, prepared_info.type, prepared_info.column, out);
}
bool MergeTreeIndexConditionBloomFilter::traverseASTIn(
const String & function_name, const ASTPtr & key_ast, const DataTypePtr & type, const ColumnPtr & column, RPNElement & out)
{
if (header.has(key_ast->getColumnName()))
{
size_t row_size = column->size();
size_t position = header.getPositionByName(key_ast->getColumnName());
const DataTypePtr & index_type = header.getByPosition(position).type;
const auto & converted_column = castColumn(ColumnWithTypeAndName{column, type, ""}, index_type, context);
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithColumn(index_type, converted_column, 0, row_size)));
if (function_name == "in" || function_name == "globalIn")
out.function = RPNElement::FUNCTION_IN;
if (function_name == "notIn" || function_name == "globalNotIn")
out.function = RPNElement::FUNCTION_NOT_IN;
return true;
}
if (const auto * function = key_ast->as<ASTFunction>())
{
WhichDataType which(type);
if (which.isTuple() && function->name == "tuple")
{
const auto & tuple_column = typeid_cast<const ColumnTuple *>(column.get());
const auto & tuple_data_type = typeid_cast<const DataTypeTuple *>(type.get());
const ASTs & arguments = typeid_cast<const ASTExpressionList &>(*function->arguments).children;
if (tuple_data_type->getElements().size() != arguments.size() || tuple_column->getColumns().size() != arguments.size())
throw Exception("Illegal types of arguments of function " + function_name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
bool match_with_subtype = false;
const auto & sub_columns = tuple_column->getColumns();
const auto & sub_data_types = tuple_data_type->getElements();
for (size_t index = 0; index < arguments.size(); ++index)
match_with_subtype |= traverseASTIn(function_name, arguments[index], sub_data_types[index], sub_columns[index], out);
return match_with_subtype;
}
}
return false;
}
bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
const String & function_name, const ASTPtr & key_ast, const DataTypePtr & value_type, const Field & value_field, RPNElement & out)
{
if (header.has(key_ast->getColumnName()))
{
size_t position = header.getPositionByName(key_ast->getColumnName());
const DataTypePtr & index_type = header.getByPosition(position).type;
Field converted_field = convertFieldToType(value_field, *index_type, &*value_type);
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(&*index_type, converted_field)));
out.function = function_name == "equals" ? RPNElement::FUNCTION_EQUALS : RPNElement::FUNCTION_NOT_EQUALS;
return true;
}
if (const auto * function = key_ast->as<ASTFunction>())
{
WhichDataType which(value_type);
if (which.isTuple() && function->name == "tuple")
{
const TupleBackend & tuple = get<const Tuple &>(value_field).toUnderType();
const auto value_tuple_data_type = typeid_cast<const DataTypeTuple *>(value_type.get());
const ASTs & arguments = typeid_cast<const ASTExpressionList &>(*function->arguments).children;
if (tuple.size() != arguments.size())
throw Exception("Illegal types of arguments of function " + function_name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
bool match_with_subtype = false;
const DataTypes & subtypes = value_tuple_data_type->getElements();
for (size_t index = 0; index < tuple.size(); ++index)
match_with_subtype |= traverseASTEquals(function_name, arguments[index], subtypes[index], tuple[index], out);
return match_with_subtype;
}
}
return false;
}
/// Looks up the prepared set that belongs to `node` among the sets of the query.
///
/// - If `node` is a column of the index header, the set is searched by the key built from the
///   node and the column type.
/// - Otherwise every set is scanned and matched by the hash of the AST subtree.
///
/// Only sets that report explicit elements are returned; an empty SetPtr means "not found".
SetPtr MergeTreeIndexConditionBloomFilter::getPreparedSet(const ASTPtr & node)
{
    /// Built once: used for both the presence check and the lookup by name.
    const String column_name = node->getColumnName();

    if (header.has(column_name))
    {
        const auto & column_and_type = header.getByName(column_name);
        const auto prepared_set_it = query_info.sets.find(getPreparedSetKey(node, column_and_type.type));

        if (prepared_set_it != query_info.sets.end() && prepared_set_it->second->hasExplicitSetElements())
            return prepared_set_it->second;
    }
    else
    {
        /// The hash of the node does not depend on the set being inspected, so compute it
        /// once instead of once per loop iteration.
        const auto node_hash = node->getTreeHash();

        for (const auto & prepared_set : query_info.sets)
            if (prepared_set.first.ast_hash == node_hash && prepared_set.second->hasExplicitSetElements())
                return prepared_set.second;
    }

    return DB::SetPtr();
}
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <Columns/IColumn.h>
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
namespace DB
{
/// Condition over a bloom filter skipping index: holds the query predicate as a vector of
/// RPNElement and answers whether it may be true on a given index granule.
class MergeTreeIndexConditionBloomFilter : public IMergeTreeIndexCondition
{
public:
    /// One element of the `rpn` vector: an atom, a logical operator or a constant.
    struct RPNElement
    {
        enum Function
        {
            /// Atoms of a Boolean expression.
            FUNCTION_EQUALS,
            FUNCTION_NOT_EQUALS,
            FUNCTION_IN,
            FUNCTION_NOT_IN,
            FUNCTION_UNKNOWN, /// Can take any value.
            /// Operators of the logical expression.
            FUNCTION_NOT,
            FUNCTION_AND,
            FUNCTION_OR,
            /// Constants
            ALWAYS_FALSE,
            ALWAYS_TRUE,
        };

        RPNElement(Function function_ = FUNCTION_UNKNOWN) : function(function_) {}

        Function function = FUNCTION_UNKNOWN;

        /// Pairs of (position of the key column in the index header, column of hashes of the
        /// constant values the key is compared with).
        std::vector<std::pair<size_t, ColumnPtr>> predicate;
    };

    MergeTreeIndexConditionBloomFilter(const SelectQueryInfo & info, const Context & context, const Block & header, size_t hash_functions);

    bool alwaysUnknownOrTrue() const override;

    /// Type-checks the granule and delegates to the private overload.
    /// Throws LOGICAL_ERROR when the granule is not a bloom filter granule.
    bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr granule) const override
    {
        const auto * bloom_filter_granule = typeid_cast<const MergeTreeIndexGranuleBloomFilter *>(granule.get());

        if (!bloom_filter_granule)
            throw Exception("LOGICAL ERROR: require bloom filter index granule.", ErrorCodes::LOGICAL_ERROR);

        return mayBeTrueOnGranule(bloom_filter_granule);
    }

private:
    /// NOTE: the three members below are references; the referenced objects must outlive the condition.
    const Block & header;
    const Context & context;
    const SelectQueryInfo & query_info;
    const size_t hash_functions;
    std::vector<RPNElement> rpn;

    /// Finds the prepared set that corresponds to `node`; returns an empty pointer if there is none.
    SetPtr getPreparedSet(const ASTPtr & node);

    bool mayBeTrueOnGranule(const MergeTreeIndexGranuleBloomFilter * granule) const;

    bool traverseAtomAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out);

    /// IN / NOT IN against a prepared set.
    bool traverseASTIn(const String & function_name, const ASTPtr & key_ast, const SetPtr & prepared_set, RPNElement & out);

    /// IN / NOT IN against a column of values of the given type (recurses into tuples).
    bool traverseASTIn(
        const String & function_name, const ASTPtr & key_ast, const DataTypePtr & type, const ColumnPtr & column, RPNElement & out);

    /// equals / notEquals against a constant of the given type (recurses into tuples).
    bool traverseASTEquals(
        const String & function_name, const ASTPtr & key_ast, const DataTypePtr & value_type, const Field & value_field, RPNElement & out);
};
}

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeBloomFilterIndex.h>
#include <Storages/MergeTree/MergeTreeIndexFullText.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UTF8Helpers.h>
@ -31,7 +31,7 @@ namespace ErrorCodes
/// Adds all tokens from string to bloom filter.
static void stringToBloomFilter(
const char * data, size_t size, const std::unique_ptr<ITokenExtractor> & token_extractor, StringBloomFilter & bloom_filter)
const char * data, size_t size, const std::unique_ptr<ITokenExtractor> & token_extractor, BloomFilter & bloom_filter)
{
size_t cur = 0;
size_t token_start = 0;
@ -42,7 +42,7 @@ static void stringToBloomFilter(
/// Adds all tokens from like pattern string to bloom filter. (Because like pattern can contain `\%` and `\_`.)
static void likeStringToBloomFilter(
const String & data, const std::unique_ptr<ITokenExtractor> & token_extractor, StringBloomFilter & bloom_filter)
const String & data, const std::unique_ptr<ITokenExtractor> & token_extractor, BloomFilter & bloom_filter)
{
size_t cur = 0;
String token;
@ -51,24 +51,23 @@ static void likeStringToBloomFilter(
}
MergeTreeBloomFilterIndexGranule::MergeTreeBloomFilterIndexGranule(const MergeTreeBloomFilterIndex & index)
MergeTreeIndexGranuleFullText::MergeTreeIndexGranuleFullText(const MergeTreeIndexFullText & index)
: IMergeTreeIndexGranule()
, index(index)
, bloom_filters(
index.columns.size(), StringBloomFilter(index.bloom_filter_size, index.bloom_filter_hashes, index.seed))
index.columns.size(), BloomFilter(index.bloom_filter_size, index.bloom_filter_hashes, index.seed))
, has_elems(false) {}
void MergeTreeBloomFilterIndexGranule::serializeBinary(WriteBuffer & ostr) const
void MergeTreeIndexGranuleFullText::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
"Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR);
throw Exception("Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR);
for (const auto & bloom_filter : bloom_filters)
ostr.write(reinterpret_cast<const char *>(bloom_filter.getFilter().data()), index.bloom_filter_size);
}
void MergeTreeBloomFilterIndexGranule::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr)
{
for (auto & bloom_filter : bloom_filters)
{
@ -78,17 +77,17 @@ void MergeTreeBloomFilterIndexGranule::deserializeBinary(ReadBuffer & istr)
}
MergeTreeBloomFilterIndexAggregator::MergeTreeBloomFilterIndexAggregator(const MergeTreeBloomFilterIndex & index)
: index(index), granule(std::make_shared<MergeTreeBloomFilterIndexGranule>(index)) {}
MergeTreeIndexAggregatorFullText::MergeTreeIndexAggregatorFullText(const MergeTreeIndexFullText & index)
: index(index), granule(std::make_shared<MergeTreeIndexGranuleFullText>(index)) {}
MergeTreeIndexGranulePtr MergeTreeBloomFilterIndexAggregator::getGranuleAndReset()
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorFullText::getGranuleAndReset()
{
auto new_granule = std::make_shared<MergeTreeBloomFilterIndexGranule>(index);
auto new_granule = std::make_shared<MergeTreeIndexGranuleFullText>(index);
new_granule.swap(granule);
return new_granule;
}
void MergeTreeBloomFilterIndexAggregator::update(const Block & block, size_t * pos, size_t limit)
void MergeTreeIndexAggregatorFullText::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
@ -111,14 +110,14 @@ void MergeTreeBloomFilterIndexAggregator::update(const Block & block, size_t * p
}
const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
const MergeTreeConditionFullText::AtomMap MergeTreeConditionFullText::atom_map
{
{
"notEquals",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_NOT_EQUALS;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -128,10 +127,10 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"equals",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_EQUALS;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -141,10 +140,10 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"like",
[] (RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)
[] (RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)
{
out.function = RPNElement::FUNCTION_LIKE;
out.bloom_filter = std::make_unique<StringBloomFilter>(
out.bloom_filter = std::make_unique<BloomFilter>(
idx.bloom_filter_size, idx.bloom_filter_hashes, idx.seed);
const auto & str = value.get<String>();
@ -154,7 +153,7 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"notIn",
[] (RPNElement & out, const Field &, const MergeTreeBloomFilterIndex &)
[] (RPNElement & out, const Field &, const MergeTreeIndexFullText &)
{
out.function = RPNElement::FUNCTION_NOT_IN;
return true;
@ -162,7 +161,7 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
{
"in",
[] (RPNElement & out, const Field &, const MergeTreeBloomFilterIndex &)
[] (RPNElement & out, const Field &, const MergeTreeIndexFullText &)
{
out.function = RPNElement::FUNCTION_IN;
return true;
@ -170,24 +169,21 @@ const BloomFilterCondition::AtomMap BloomFilterCondition::atom_map
},
};
BloomFilterCondition::BloomFilterCondition(
MergeTreeConditionFullText::MergeTreeConditionFullText(
const SelectQueryInfo & query_info,
const Context & context,
const MergeTreeBloomFilterIndex & index_) : index(index_), prepared_sets(query_info.sets)
const MergeTreeIndexFullText & index_) : index(index_), prepared_sets(query_info.sets)
{
rpn = std::move(
RPNBuilder<RPNElement>(
query_info, context,
[this] (const ASTPtr & node,
const Context & /* context */,
Block & block_with_constants,
RPNElement & out) -> bool
[this] (const ASTPtr & node, const Context & /* context */, Block & block_with_constants, RPNElement & out) -> bool
{
return this->atomFromAST(node, block_with_constants, out);
}).extractRPN());
}
bool BloomFilterCondition::alwaysUnknownOrTrue() const
bool MergeTreeConditionFullText::alwaysUnknownOrTrue() const
{
/// Check like in KeyCondition.
std::vector<bool> rpn_stack;
@ -234,10 +230,10 @@ bool BloomFilterCondition::alwaysUnknownOrTrue() const
return rpn_stack[0];
}
bool BloomFilterCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
bool MergeTreeConditionFullText::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
std::shared_ptr<MergeTreeBloomFilterIndexGranule> granule
= std::dynamic_pointer_cast<MergeTreeBloomFilterIndexGranule>(idx_granule);
std::shared_ptr<MergeTreeIndexGranuleFullText> granule
= std::dynamic_pointer_cast<MergeTreeIndexGranuleFullText>(idx_granule);
if (!granule)
throw Exception(
"BloomFilter index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
@ -323,7 +319,7 @@ bool BloomFilterCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granu
return rpn_stack[0].can_be_true;
}
bool BloomFilterCondition::getKey(const ASTPtr & node, size_t & key_column_num)
bool MergeTreeConditionFullText::getKey(const ASTPtr & node, size_t & key_column_num)
{
auto it = std::find(index.columns.begin(), index.columns.end(), node->getColumnName());
if (it == index.columns.end())
@ -333,7 +329,7 @@ bool BloomFilterCondition::getKey(const ASTPtr & node, size_t & key_column_num)
return true;
}
bool BloomFilterCondition::atomFromAST(
bool MergeTreeConditionFullText::atomFromAST(
const ASTPtr & node, Block & block_with_constants, RPNElement & out)
{
Field const_value;
@ -399,7 +395,7 @@ bool BloomFilterCondition::atomFromAST(
return false;
}
bool BloomFilterCondition::tryPrepareSetBloomFilter(
bool MergeTreeConditionFullText::tryPrepareSetBloomFilter(
const ASTs & args,
RPNElement & out)
{
@ -454,7 +450,7 @@ bool BloomFilterCondition::tryPrepareSetBloomFilter(
if (data_type->getTypeId() != TypeIndex::String && data_type->getTypeId() != TypeIndex::FixedString)
return false;
std::vector<std::vector<StringBloomFilter>> bloom_filters;
std::vector<std::vector<BloomFilter>> bloom_filters;
std::vector<size_t> key_position;
Columns columns = prepared_set->getSetElements();
@ -480,23 +476,23 @@ bool BloomFilterCondition::tryPrepareSetBloomFilter(
}
MergeTreeIndexGranulePtr MergeTreeBloomFilterIndex::createIndexGranule() const
MergeTreeIndexGranulePtr MergeTreeIndexFullText::createIndexGranule() const
{
return std::make_shared<MergeTreeBloomFilterIndexGranule>(*this);
return std::make_shared<MergeTreeIndexGranuleFullText>(*this);
}
MergeTreeIndexAggregatorPtr MergeTreeBloomFilterIndex::createIndexAggregator() const
MergeTreeIndexAggregatorPtr MergeTreeIndexFullText::createIndexAggregator() const
{
return std::make_shared<MergeTreeBloomFilterIndexAggregator>(*this);
return std::make_shared<MergeTreeIndexAggregatorFullText>(*this);
}
IndexConditionPtr MergeTreeBloomFilterIndex::createIndexCondition(
MergeTreeIndexConditionPtr MergeTreeIndexFullText::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<BloomFilterCondition>(query, context, *this);
return std::make_shared<MergeTreeConditionFullText>(query, context, *this);
};
bool MergeTreeBloomFilterIndex::mayBenefitFromIndexForIn(const ASTPtr & node) const
bool MergeTreeIndexFullText::mayBenefitFromIndexForIn(const ASTPtr & node) const
{
return std::find(std::cbegin(columns), std::cend(columns), node->getColumnName()) != std::cend(columns);
}
@ -679,7 +675,7 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
auto tokenizer = std::make_unique<NgramTokenExtractor>(n);
return std::make_unique<MergeTreeBloomFilterIndex>(
return std::make_unique<MergeTreeIndexFullText>(
node->name, std::move(index_expr), columns, data_types, sample, node->granularity,
bloom_filter_size, bloom_filter_hashes, seed, std::move(tokenizer));
}
@ -697,7 +693,7 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
auto tokenizer = std::make_unique<SplitTokenExtractor>();
return std::make_unique<MergeTreeBloomFilterIndex>(
return std::make_unique<MergeTreeIndexFullText>(
node->name, std::move(index_expr), columns, data_types, sample, node->granularity,
bloom_filter_size, bloom_filter_hashes, seed, std::move(tokenizer));
}

View File

@ -10,54 +10,54 @@
namespace DB
{
class MergeTreeBloomFilterIndex;
class MergeTreeIndexFullText;
struct MergeTreeBloomFilterIndexGranule : public IMergeTreeIndexGranule
struct MergeTreeIndexGranuleFullText : public IMergeTreeIndexGranule
{
explicit MergeTreeBloomFilterIndexGranule(
const MergeTreeBloomFilterIndex & index);
explicit MergeTreeIndexGranuleFullText(
const MergeTreeIndexFullText & index);
~MergeTreeBloomFilterIndexGranule() override = default;
~MergeTreeIndexGranuleFullText() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
bool empty() const override { return !has_elems; }
const MergeTreeBloomFilterIndex & index;
std::vector<StringBloomFilter> bloom_filters;
const MergeTreeIndexFullText & index;
std::vector<BloomFilter> bloom_filters;
bool has_elems;
};
using MergeTreeBloomFilterIndexGranulePtr = std::shared_ptr<MergeTreeBloomFilterIndexGranule>;
using MergeTreeIndexGranuleFullTextPtr = std::shared_ptr<MergeTreeIndexGranuleFullText>;
struct MergeTreeBloomFilterIndexAggregator : IMergeTreeIndexAggregator
struct MergeTreeIndexAggregatorFullText : IMergeTreeIndexAggregator
{
explicit MergeTreeBloomFilterIndexAggregator(const MergeTreeBloomFilterIndex & index);
explicit MergeTreeIndexAggregatorFullText(const MergeTreeIndexFullText & index);
~MergeTreeBloomFilterIndexAggregator() override = default;
~MergeTreeIndexAggregatorFullText() override = default;
bool empty() const override { return !granule || granule->empty(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override;
const MergeTreeBloomFilterIndex & index;
MergeTreeBloomFilterIndexGranulePtr granule;
const MergeTreeIndexFullText & index;
MergeTreeIndexGranuleFullTextPtr granule;
};
class BloomFilterCondition : public IIndexCondition
class MergeTreeConditionFullText : public IMergeTreeIndexCondition
{
public:
BloomFilterCondition(
MergeTreeConditionFullText(
const SelectQueryInfo & query_info,
const Context & context,
const MergeTreeBloomFilterIndex & index_);
const MergeTreeIndexFullText & index_);
~BloomFilterCondition() override = default;
~MergeTreeConditionFullText() override = default;
bool alwaysUnknownOrTrue() const override;
@ -93,19 +93,19 @@ private:
};
RPNElement(
Function function_ = FUNCTION_UNKNOWN, size_t key_column_ = 0, std::unique_ptr<StringBloomFilter> && const_bloom_filter_ = nullptr)
Function function_ = FUNCTION_UNKNOWN, size_t key_column_ = 0, std::unique_ptr<BloomFilter> && const_bloom_filter_ = nullptr)
: function(function_), key_column(key_column_), bloom_filter(std::move(const_bloom_filter_)) {}
Function function = FUNCTION_UNKNOWN;
/// For FUNCTION_EQUALS, FUNCTION_NOT_EQUALS, FUNCTION_LIKE, FUNCTION_NOT_LIKE.
size_t key_column;
std::unique_ptr<StringBloomFilter> bloom_filter;
std::unique_ptr<BloomFilter> bloom_filter;
/// For FUNCTION_IN and FUNCTION_NOT_IN
std::vector<std::vector<StringBloomFilter>> set_bloom_filters;
std::vector<std::vector<BloomFilter>> set_bloom_filters;
std::vector<size_t> set_key_position;
};
using AtomMap = std::unordered_map<std::string, bool(*)(RPNElement & out, const Field & value, const MergeTreeBloomFilterIndex & idx)>;
using AtomMap = std::unordered_map<std::string, bool(*)(RPNElement & out, const Field & value, const MergeTreeIndexFullText & idx)>;
using RPN = std::vector<RPNElement>;
bool atomFromAST(const ASTPtr & node, Block & block_with_constants, RPNElement & out);
@ -115,7 +115,7 @@ private:
static const AtomMap atom_map;
const MergeTreeBloomFilterIndex & index;
const MergeTreeIndexFullText & index;
RPN rpn;
/// Sets from syntax analyzer.
PreparedSets prepared_sets;
@ -164,10 +164,10 @@ struct SplitTokenExtractor : public ITokenExtractor
};
class MergeTreeBloomFilterIndex : public IMergeTreeIndex
class MergeTreeIndexFullText : public IMergeTreeIndex
{
public:
MergeTreeBloomFilterIndex(
MergeTreeIndexFullText(
String name_,
ExpressionActionsPtr expr_,
const Names & columns_,
@ -184,12 +184,12 @@ public:
, seed(seed_)
, token_extractor_func(std::move(token_extractor_func_)) {}
~MergeTreeBloomFilterIndex() override = default;
~MergeTreeIndexFullText() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(
MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;

View File

@ -0,0 +1,115 @@
#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/HashTable/Hash.h>
#include <ext/bit_cast.h>
#include <Interpreters/BloomFilterHash.h>
namespace DB
{
/// Creates an empty granule (zero rows) with one unset filter slot per index column.
/// The slots are expected to be filled later, e.g. by deserializeBinary().
MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t index_columns)
    : total_rows(0), bits_per_row(bits_per_row), hash_functions(hash_functions), bloom_filters(index_columns)
{
}
/// Builds a granule from blocks of precomputed hashes.
///
/// One bloom filter is created per column of the first block, sized to hold
/// `bits_per_row * total_rows` bits (rounded up to whole bytes); then the hashes of every
/// block are added to the filter of the corresponding column.
///
/// Throws LOGICAL_ERROR if there are no blocks, `total_rows` is zero, or any block is empty.
/// Block structures are verified by assertGranuleBlocksStructure().
MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(
    size_t bits_per_row, size_t hash_functions, size_t total_rows, const Blocks & granule_index_blocks)
    : total_rows(total_rows), bits_per_row(bits_per_row), hash_functions(hash_functions)
{
    if (granule_index_blocks.empty() || !total_rows)
        throw Exception("LOGICAL ERROR: granule_index_blocks empty or total_rows is zero.", ErrorCodes::LOGICAL_ERROR);

    assertGranuleBlocksStructure(granule_index_blocks);

    /// Number of bits per byte; the size is the same for every filter of the granule.
    static constexpr size_t atom_size = 8;
    const size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;

    for (size_t index = 0; index < granule_index_blocks.size(); ++index)
    {
        /// Bind by reference: the block is only read, there is no need to copy it.
        const Block & granule_index_block = granule_index_blocks[index];

        if (unlikely(!granule_index_block || !granule_index_block.rows()))
            throw Exception("LOGICAL ERROR: granule_index_block is empty.", ErrorCodes::LOGICAL_ERROR);

        const size_t columns = granule_index_block.columns();

        /// Filters are allocated lazily on the first block, once its column count is known.
        if (index == 0)
        {
            bloom_filters.reserve(columns);
            for (size_t column = 0; column < columns; ++column)
                bloom_filters.emplace_back(std::make_shared<BloomFilter>(bytes_size, hash_functions, 0));
        }

        for (size_t column = 0; column < columns; ++column)
            fillingBloomFilter(bloom_filters[column], granule_index_block, column);
    }
}
/// A granule is empty while it covers no rows.
bool MergeTreeIndexGranuleBloomFilter::empty() const
{
    return total_rows == 0;
}
/// Reads the granule from `istr`: the row count as a var-uint followed by the raw bytes of
/// one filter per index column (the layout written by serializeBinary()).
///
/// Throws LOGICAL_ERROR if the granule already holds data. A stream that ends before all
/// filter bytes are available is reported by readStrict() instead of being accepted silently.
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr)
{
    if (!empty())
        throw Exception("Cannot read data to a non-empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);

    readVarUInt(total_rows, istr);

    /// Number of bits per byte; every filter of the granule has the same byte size.
    static constexpr size_t atom_size = 8;
    const size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;

    for (auto & bloom_filter : bloom_filters)
    {
        bloom_filter = std::make_shared<BloomFilter>(bytes_size, hash_functions, 0);

        /// readStrict (not read): a short read would leave the filter partially filled.
        istr.readStrict(reinterpret_cast<char *>(bloom_filter->getFilter().data()), bytes_size);
    }
}
/// Writes the granule to `ostr`: the row count as a var-uint, then the raw bytes of every
/// filter in column order. Throws LOGICAL_ERROR for an empty granule.
void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
{
    if (empty())
        throw Exception("Attempt to write empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);

    writeVarUInt(total_rows, ostr);

    /// Bits requested for the whole granule, rounded up to whole bytes.
    constexpr size_t bits_in_byte = 8;
    const size_t filter_bytes = (bits_per_row * total_rows + bits_in_byte - 1) / bits_in_byte;

    for (size_t i = 0; i < bloom_filters.size(); ++i)
    {
        const char * raw = reinterpret_cast<const char *>(bloom_filters[i]->getFilter().data());
        ostr.write(raw, filter_bytes);
    }
}
void MergeTreeIndexGranuleBloomFilter::assertGranuleBlocksStructure(const Blocks & granule_index_blocks) const
{
Block prev_block;
for (size_t index = 0; index < granule_index_blocks.size(); ++index)
{
Block granule_index_block = granule_index_blocks[index];
if (index != 0)
assertBlocksHaveEqualStructure(prev_block, granule_index_block, "Granule blocks of bloom filter has difference structure.");
prev_block = granule_index_block;
}
}
/// Adds every hash stored in column `index_hash_column` of the block to the filter `bf`,
/// once per configured hash function (each with its own seed from BloomFilterHash::bf_hash_seed).
/// A column that is not a ColumnUInt64 is left untouched.
void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column)
{
    const auto & hash_column_with_type = granule_index_block.getByPosition(index_hash_column);
    const auto * hashes = typeid_cast<const ColumnUInt64 *>(hash_column_with_type.column.get());

    if (!hashes)
        return;

    for (const UInt64 & base_hash : hashes->getData())
        for (size_t seed_index = 0; seed_index < hash_functions; ++seed_index)
            bf->addHashWithSeed(base_hash, BloomFilterHash::bf_hash_seed[seed_index]);
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Interpreters/BloomFilter.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace DB
{
/// Granule of a bloom filter skipping index: one bloom filter per index column plus the number
/// of rows the granule covers.
class MergeTreeIndexGranuleBloomFilter : public IMergeTreeIndexGranule
{
public:
    /// Empty granule with `index_columns` unset filter slots (to be filled by deserializeBinary).
    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t index_columns);

    /// Granule built from blocks of precomputed hashes covering `total_rows` rows.
    MergeTreeIndexGranuleBloomFilter(size_t bits_per_row, size_t hash_functions, size_t total_rows, const Blocks & granule_index_blocks);

    bool empty() const override;

    void serializeBinary(WriteBuffer & ostr) const override;

    void deserializeBinary(ReadBuffer & istr) override;

    /// Returned by reference to avoid copying the vector (and touching every shared_ptr
    /// reference count) on each call. The reference is valid while the granule is alive.
    const std::vector<BloomFilterPtr> & getFilters() const { return bloom_filters; }

private:
    size_t total_rows;
    size_t bits_per_row;
    size_t hash_functions;
    std::vector<BloomFilterPtr> bloom_filters;

    /// Checks that all blocks have the same structure.
    void assertGranuleBlocksStructure(const Blocks & granule_index_blocks) const;

    /// Adds the hashes from one column of the block to the filter `bf`.
    void fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column);
};
}

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -16,14 +16,14 @@ namespace ErrorCodes
}
MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index)
MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax(const MergeTreeIndexMinMax & index)
: IMergeTreeIndexGranule(), index(index), parallelogram() {}
MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(
const MergeTreeMinMaxIndex & index, std::vector<Range> && parallelogram)
MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax(
const MergeTreeIndexMinMax & index, std::vector<Range> && parallelogram)
: IMergeTreeIndexGranule(), index(index), parallelogram(std::move(parallelogram)) {}
void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const
void MergeTreeIndexGranuleMinMax::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
@ -50,7 +50,7 @@ void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const
}
}
void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr)
{
parallelogram.clear();
Field min_val;
@ -83,15 +83,15 @@ void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
}
MergeTreeMinMaxAggregator::MergeTreeMinMaxAggregator(const MergeTreeMinMaxIndex & index)
MergeTreeIndexAggregatorMinMax::MergeTreeIndexAggregatorMinMax(const MergeTreeIndexMinMax & index)
: index(index) {}
MergeTreeIndexGranulePtr MergeTreeMinMaxAggregator::getGranuleAndReset()
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorMinMax::getGranuleAndReset()
{
return std::make_shared<MergeTreeMinMaxGranule>(index, std::move(parallelogram));
return std::make_shared<MergeTreeIndexGranuleMinMax>(index, std::move(parallelogram));
}
void MergeTreeMinMaxAggregator::update(const Block & block, size_t * pos, size_t limit)
void MergeTreeIndexAggregatorMinMax::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
@ -122,21 +122,21 @@ void MergeTreeMinMaxAggregator::update(const Block & block, size_t * pos, size_t
}
MinMaxCondition::MinMaxCondition(
MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax(
const SelectQueryInfo &query,
const Context &context,
const MergeTreeMinMaxIndex &index)
: IIndexCondition(), index(index), condition(query, context, index.columns, index.expr) {}
const MergeTreeIndexMinMax &index)
: IMergeTreeIndexCondition(), index(index), condition(query, context, index.columns, index.expr) {}
bool MinMaxCondition::alwaysUnknownOrTrue() const
bool MergeTreeIndexConditionMinMax::alwaysUnknownOrTrue() const
{
return condition.alwaysUnknownOrTrue();
}
bool MinMaxCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
bool MergeTreeIndexConditionMinMax::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
std::shared_ptr<MergeTreeMinMaxGranule> granule
= std::dynamic_pointer_cast<MergeTreeMinMaxGranule>(idx_granule);
std::shared_ptr<MergeTreeIndexGranuleMinMax> granule
= std::dynamic_pointer_cast<MergeTreeIndexGranuleMinMax>(idx_granule);
if (!granule)
throw Exception(
"Minmax index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
@ -147,25 +147,25 @@ bool MinMaxCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) c
}
MergeTreeIndexGranulePtr MergeTreeMinMaxIndex::createIndexGranule() const
MergeTreeIndexGranulePtr MergeTreeIndexMinMax::createIndexGranule() const
{
return std::make_shared<MergeTreeMinMaxGranule>(*this);
return std::make_shared<MergeTreeIndexGranuleMinMax>(*this);
}
MergeTreeIndexAggregatorPtr MergeTreeMinMaxIndex::createIndexAggregator() const
MergeTreeIndexAggregatorPtr MergeTreeIndexMinMax::createIndexAggregator() const
{
return std::make_shared<MergeTreeMinMaxAggregator>(*this);
return std::make_shared<MergeTreeIndexAggregatorMinMax>(*this);
}
IndexConditionPtr MergeTreeMinMaxIndex::createIndexCondition(
MergeTreeIndexConditionPtr MergeTreeIndexMinMax::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<MinMaxCondition>(query, context, *this);
return std::make_shared<MergeTreeIndexConditionMinMax>(query, context, *this);
};
bool MergeTreeMinMaxIndex::mayBenefitFromIndexForIn(const ASTPtr & node) const
bool MergeTreeIndexMinMax::mayBenefitFromIndexForIn(const ASTPtr & node) const
{
const String column_name = node->getColumnName();
@ -210,7 +210,7 @@ std::unique_ptr<IMergeTreeIndex> minmaxIndexCreator(
data_types.emplace_back(column.type);
}
return std::make_unique<MergeTreeMinMaxIndex>(
return std::make_unique<MergeTreeIndexMinMax>(
node->name, std::move(minmax_expr), columns, data_types, sample, node->granularity);
}

View File

@ -10,62 +10,62 @@
namespace DB
{
class MergeTreeMinMaxIndex;
class MergeTreeIndexMinMax;
struct MergeTreeMinMaxGranule : public IMergeTreeIndexGranule
struct MergeTreeIndexGranuleMinMax : public IMergeTreeIndexGranule
{
explicit MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index);
MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index, std::vector<Range> && parallelogram);
~MergeTreeMinMaxGranule() override = default;
explicit MergeTreeIndexGranuleMinMax(const MergeTreeIndexMinMax & index);
MergeTreeIndexGranuleMinMax(const MergeTreeIndexMinMax & index, std::vector<Range> && parallelogram);
~MergeTreeIndexGranuleMinMax() override = default;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
bool empty() const override { return parallelogram.empty(); }
const MergeTreeMinMaxIndex & index;
const MergeTreeIndexMinMax & index;
std::vector<Range> parallelogram;
};
struct MergeTreeMinMaxAggregator : IMergeTreeIndexAggregator
struct MergeTreeIndexAggregatorMinMax : IMergeTreeIndexAggregator
{
explicit MergeTreeMinMaxAggregator(const MergeTreeMinMaxIndex & index);
~MergeTreeMinMaxAggregator() override = default;
explicit MergeTreeIndexAggregatorMinMax(const MergeTreeIndexMinMax & index);
~MergeTreeIndexAggregatorMinMax() override = default;
bool empty() const override { return parallelogram.empty(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override;
const MergeTreeMinMaxIndex & index;
const MergeTreeIndexMinMax & index;
std::vector<Range> parallelogram;
};
/// Query condition for the minmax index, derived from the index-condition interface.
/// It is built from the query info, the context and the owning index, and stores a KeyCondition.
/// NOTE(review): this region shows a rename; each old declaration line is directly followed by its new spelling.
class MinMaxCondition : public IIndexCondition
class MergeTreeIndexConditionMinMax : public IMergeTreeIndexCondition
{
public:
MinMaxCondition(
MergeTreeIndexConditionMinMax(
const SelectQueryInfo & query,
const Context & context,
const MergeTreeMinMaxIndex & index);
const MergeTreeIndexMinMax & index);
/// Overrides of the two pure virtual checks of the condition interface; definitions are not in this header.
bool alwaysUnknownOrTrue() const override;
bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const override;
~MinMaxCondition() override = default;
~MergeTreeIndexConditionMinMax() override = default;
private:
/// Owning index, kept by reference.
const MergeTreeMinMaxIndex & index;
const MergeTreeIndexMinMax & index;
/// Presumably the object both checks delegate to -- TODO confirm in the implementation file.
KeyCondition condition;
};
class MergeTreeMinMaxIndex : public IMergeTreeIndex
class MergeTreeIndexMinMax : public IMergeTreeIndex
{
public:
MergeTreeMinMaxIndex(
MergeTreeIndexMinMax(
String name_,
ExpressionActionsPtr expr_,
const Names & columns_,
@ -74,12 +74,12 @@ public:
size_t granularity_)
: IMergeTreeIndex(name_, expr_, columns_, data_types_, header_, granularity_) {}
~MergeTreeMinMaxIndex() override = default;
~MergeTreeIndexMinMax() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(
MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeSetSkippingIndex.h>
#include <Storages/MergeTree/MergeTreeIndexSet.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -21,18 +21,18 @@ namespace ErrorCodes
const Field UNKNOWN_FIELD(3u);
/// Builds a granule bound to `index` whose block is `index.header.cloneEmpty()`.
/// NOTE(review): the first signature line is the pre-rename spelling, the second its replacement.
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index)
MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(const MergeTreeIndexSet & index)
: IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneEmpty()) {}
/// Builds a granule bound to `index` whose block is the index header combined with the given columns.
/// `mutable_columns` is moved into `cloneWithColumns`, so the caller's columns are consumed.
/// NOTE(review): the first two signature lines are the pre-rename spelling, the next two their replacement.
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(
const MergeTreeSetSkippingIndex & index, MutableColumns && mutable_columns)
MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
const MergeTreeIndexSet & index, MutableColumns && mutable_columns)
: IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneWithColumns(std::move(mutable_columns))) {}
void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
@ -64,7 +64,7 @@ void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
}
}
void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr)
{
block.clear();
@ -94,7 +94,7 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
}
MergeTreeSetIndexAggregator::MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index)
MergeTreeIndexAggregatorSet::MergeTreeIndexAggregatorSet(const MergeTreeIndexSet & index)
: index(index), columns(index.header.cloneEmptyColumns())
{
ColumnRawPtrs column_ptrs;
@ -111,7 +111,7 @@ MergeTreeSetIndexAggregator::MergeTreeSetIndexAggregator(const MergeTreeSetSkipp
columns = index.header.cloneEmptyColumns();
}
void MergeTreeSetIndexAggregator::update(const Block & block, size_t * pos, size_t limit)
void MergeTreeIndexAggregatorSet::update(const Block & block, size_t * pos, size_t limit)
{
if (*pos >= block.rows())
throw Exception(
@ -164,7 +164,7 @@ void MergeTreeSetIndexAggregator::update(const Block & block, size_t * pos, size
}
template <typename Method>
bool MergeTreeSetIndexAggregator::buildFilter(
bool MergeTreeIndexAggregatorSet::buildFilter(
Method & method,
const ColumnRawPtrs & column_ptrs,
IColumn::Filter & filter,
@ -190,9 +190,9 @@ bool MergeTreeSetIndexAggregator::buildFilter(
return has_new_data;
}
MergeTreeIndexGranulePtr MergeTreeSetIndexAggregator::getGranuleAndReset()
MergeTreeIndexGranulePtr MergeTreeIndexAggregatorSet::getGranuleAndReset()
{
auto granule = std::make_shared<MergeTreeSetIndexGranule>(index, std::move(columns));
auto granule = std::make_shared<MergeTreeIndexGranuleSet>(index, std::move(columns));
switch (data.type)
{
@ -212,11 +212,11 @@ MergeTreeIndexGranulePtr MergeTreeSetIndexAggregator::getGranuleAndReset()
}
SetIndexCondition::SetIndexCondition(
MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
const SelectQueryInfo & query,
const Context & context,
const MergeTreeSetSkippingIndex &index)
: IIndexCondition(), index(index)
const MergeTreeIndexSet &index)
: IMergeTreeIndexCondition(), index(index)
{
for (size_t i = 0, size = index.columns.size(); i < size; ++i)
{
@ -253,14 +253,14 @@ SetIndexCondition::SetIndexCondition(
actions = ExpressionAnalyzer(expression_ast, syntax_analyzer_result, context).getActions(true);
}
/// Returns the stored `useless` flag (its computation is outside this view).
/// NOTE(review): the first signature line is the pre-rename spelling, the second its replacement.
bool SetIndexCondition::alwaysUnknownOrTrue() const
bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
{
return useless;
}
bool SetIndexCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
auto granule = std::dynamic_pointer_cast<MergeTreeSetIndexGranule>(idx_granule);
auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
if (!granule)
throw Exception(
"Set index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
@ -294,7 +294,7 @@ bool SetIndexCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule)
return false;
}
void SetIndexCondition::traverseAST(ASTPtr & node) const
void MergeTreeIndexConditionSet::traverseAST(ASTPtr & node) const
{
if (operatorFromAST(node))
{
@ -309,7 +309,7 @@ void SetIndexCondition::traverseAST(ASTPtr & node) const
node = std::make_shared<ASTLiteral>(UNKNOWN_FIELD);
}
bool SetIndexCondition::atomFromAST(ASTPtr & node) const
bool MergeTreeIndexConditionSet::atomFromAST(ASTPtr & node) const
{
/// Function, literal or column
@ -340,7 +340,7 @@ bool SetIndexCondition::atomFromAST(ASTPtr & node) const
return false;
}
bool SetIndexCondition::operatorFromAST(ASTPtr & node) const
bool MergeTreeIndexConditionSet::operatorFromAST(ASTPtr & node) const
{
/// Functions AND, OR, NOT. Replace with bit*.
auto * func = node->as<ASTFunction>();
@ -416,7 +416,7 @@ static bool checkAtomName(const String & name)
return atoms.find(name) != atoms.end();
}
bool SetIndexCondition::checkASTUseless(const ASTPtr &node, bool atomic) const
bool MergeTreeIndexConditionSet::checkASTUseless(const ASTPtr &node, bool atomic) const
{
if (const auto * func = node->as<ASTFunction>())
{
@ -446,23 +446,23 @@ bool SetIndexCondition::checkASTUseless(const ASTPtr &node, bool atomic) const
}
/// Creates a shared set-index granule bound to this index (the granule's single-argument constructor).
/// NOTE(review): signature and return lines appear twice, the pre-rename spelling first and its replacement second.
MergeTreeIndexGranulePtr MergeTreeSetSkippingIndex::createIndexGranule() const
MergeTreeIndexGranulePtr MergeTreeIndexSet::createIndexGranule() const
{
return std::make_shared<MergeTreeSetIndexGranule>(*this);
return std::make_shared<MergeTreeIndexGranuleSet>(*this);
}
/// Creates a shared set-index aggregator bound to this index.
/// NOTE(review): signature and return lines appear twice, the pre-rename spelling first and its replacement second.
MergeTreeIndexAggregatorPtr MergeTreeSetSkippingIndex::createIndexAggregator() const
MergeTreeIndexAggregatorPtr MergeTreeIndexSet::createIndexAggregator() const
{
return std::make_shared<MergeTreeSetIndexAggregator>(*this);
return std::make_shared<MergeTreeIndexAggregatorSet>(*this);
}
/// Creates a shared set-index condition from the query info, the context and this index.
/// NOTE(review): signature and return lines appear twice, the pre-rename spelling first and its replacement second.
/// NOTE(review): the `;` after the closing brace is a redundant empty declaration and can be dropped.
IndexConditionPtr MergeTreeSetSkippingIndex::createIndexCondition(
MergeTreeIndexConditionPtr MergeTreeIndexSet::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<SetIndexCondition>(query, context, *this);
return std::make_shared<MergeTreeIndexConditionSet>(query, context, *this);
};
/// Always answers false; the AST argument is unnamed and ignored.
/// NOTE(review): the first signature line is the pre-rename spelling, the second its replacement.
bool MergeTreeSetSkippingIndex::mayBenefitFromIndexForIn(const ASTPtr &) const
bool MergeTreeIndexSet::mayBenefitFromIndexForIn(const ASTPtr &) const
{
return false;
}
@ -506,7 +506,7 @@ std::unique_ptr<IMergeTreeIndex> setIndexCreator(
header.insert(ColumnWithTypeAndName(column.type->createColumn(), column.type, column.name));
}
return std::make_unique<MergeTreeSetSkippingIndex>(
return std::make_unique<MergeTreeIndexSet>(
node->name, std::move(unique_expr), columns, data_types, header, node->granularity, max_rows);
}

View File

@ -12,12 +12,12 @@
namespace DB
{
class MergeTreeSetSkippingIndex;
class MergeTreeIndexSet;
struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule
struct MergeTreeIndexGranuleSet : public IMergeTreeIndexGranule
{
explicit MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index);
MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index, MutableColumns && columns);
explicit MergeTreeIndexGranuleSet(const MergeTreeIndexSet & index);
MergeTreeIndexGranuleSet(const MergeTreeIndexSet & index, MutableColumns && columns);
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
@ -25,17 +25,17 @@ struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }
~MergeTreeSetIndexGranule() override = default;
~MergeTreeIndexGranuleSet() override = default;
const MergeTreeSetSkippingIndex & index;
const MergeTreeIndexSet & index;
Block block;
};
struct MergeTreeSetIndexAggregator : IMergeTreeIndexAggregator
struct MergeTreeIndexAggregatorSet : IMergeTreeIndexAggregator
{
explicit MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index);
~MergeTreeSetIndexAggregator() override = default;
explicit MergeTreeIndexAggregatorSet(const MergeTreeIndexSet & index);
~MergeTreeIndexAggregatorSet() override = default;
size_t size() const { return data.getTotalRowCount(); }
bool empty() const override { return !size(); }
@ -55,26 +55,26 @@ private:
size_t limit,
ClearableSetVariants & variants) const;
const MergeTreeSetSkippingIndex & index;
const MergeTreeIndexSet & index;
ClearableSetVariants data;
Sizes key_sizes;
MutableColumns columns;
};
class SetIndexCondition : public IIndexCondition
class MergeTreeIndexConditionSet : public IMergeTreeIndexCondition
{
public:
SetIndexCondition(
MergeTreeIndexConditionSet(
const SelectQueryInfo & query,
const Context & context,
const MergeTreeSetSkippingIndex & index);
const MergeTreeIndexSet & index);
bool alwaysUnknownOrTrue() const override;
bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const override;
~SetIndexCondition() override = default;
~MergeTreeIndexConditionSet() override = default;
private:
void traverseAST(ASTPtr & node) const;
bool atomFromAST(ASTPtr & node) const;
@ -82,7 +82,7 @@ private:
bool checkASTUseless(const ASTPtr &node, bool atomic = false) const;
const MergeTreeSetSkippingIndex & index;
const MergeTreeIndexSet & index;
bool useless;
std::set<String> key_columns;
@ -91,10 +91,10 @@ private:
};
class MergeTreeSetSkippingIndex : public IMergeTreeIndex
class MergeTreeIndexSet : public IMergeTreeIndex
{
public:
MergeTreeSetSkippingIndex(
MergeTreeIndexSet(
String name_,
ExpressionActionsPtr expr_,
const Names & columns_,
@ -104,12 +104,12 @@ public:
size_t max_rows_)
: IMergeTreeIndex(std::move(name_), std::move(expr_), columns_, data_types_, header_, granularity_), max_rows(max_rows_) {}
~MergeTreeSetSkippingIndex() override = default;
~MergeTreeIndexSet() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
MergeTreeIndexAggregatorPtr createIndexAggregator() const override;
IndexConditionPtr createIndexCondition(
MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;
bool mayBenefitFromIndexForIn(const ASTPtr & node) const override;

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creator)
void MergeTreeIndexFactory::registerIndex(const std::string & name, Creator creator)
{
if (!indexes.emplace(name, std::move(creator)).second)
throw Exception("MergeTreeIndexFactory: the Index creator name '" + name + "' is not unique",
@ -70,6 +70,11 @@ std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreator(
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context);
std::unique_ptr<IMergeTreeIndex> bloomFilterIndexCreatorNew(
const NamesAndTypesList & columns,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context);
MergeTreeIndexFactory::MergeTreeIndexFactory()
{
@ -77,6 +82,7 @@ MergeTreeIndexFactory::MergeTreeIndexFactory()
registerIndex("set", setIndexCreator);
registerIndex("ngrambf_v1", bloomFilterIndexCreator);
registerIndex("tokenbf_v1", bloomFilterIndexCreator);
registerIndex("bloom_filter", bloomFilterIndexCreatorNew);
}
}

View File

@ -59,17 +59,17 @@ using MergeTreeIndexAggregators = std::vector<MergeTreeIndexAggregatorPtr>;
/// Condition on the index.
/// Abstract interface: both checks below are pure virtual and the destructor is virtual.
/// NOTE(review): this region shows a rename; each old declaration line is directly followed by its new spelling.
class IIndexCondition
class IMergeTreeIndexCondition
{
public:
virtual ~IIndexCondition() = default;
virtual ~IMergeTreeIndexCondition() = default;
/// Checks if this index is useful for query.
virtual bool alwaysUnknownOrTrue() const = 0;
/// Presumably tells whether the condition can hold for the data summarized by `granule`,
/// so a false result lets the reader skip it -- TODO confirm against the callers.
virtual bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr granule) const = 0;
};
using IndexConditionPtr = std::shared_ptr<IIndexCondition>;
using MergeTreeIndexConditionPtr = std::shared_ptr<IMergeTreeIndexCondition>;
/// Structure for storing basic index info like columns, expression, arguments, ...
@ -101,7 +101,7 @@ public:
virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0;
virtual MergeTreeIndexAggregatorPtr createIndexAggregator() const = 0;
virtual IndexConditionPtr createIndexCondition(
virtual MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0;
String name;

View File

@ -24,10 +24,7 @@ public:
using AtomFromASTFunc = std::function<
bool(const ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out)>;
RPNBuilder(
const SelectQueryInfo & query_info,
const Context & context_,
const AtomFromASTFunc & atomFromAST_)
RPNBuilder(const SelectQueryInfo & query_info, const Context & context_, const AtomFromASTFunc & atomFromAST_)
: context(context_), atomFromAST(atomFromAST_)
{
/** Evaluation of expressions that depend only on constants.

View File

@ -2,8 +2,8 @@
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
#include <Storages/MergeTree/MergeTreeSetSkippingIndex.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <Storages/MergeTree/MergeTreeIndexSet.h>
#include <Common/typeid_cast.h>
#include <Common/OptimizedRegularExpression.h>

View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Re-creates test.bloom_filter_idx once per divisor, declaring the index as
# `TYPE bloom_filter(1/divisor)` with the rate formatted to nine decimals.
# The script prints nothing itself; with `set -e` any failing client call aborts it.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces is not split into several words.
. "$CURDIR/../shell_config.sh"

set -e

# NOTE(review): $CLICKHOUSE_CLIENT is deliberately left unquoted -- it is defined by
# shell_config.sh and presumably may expand to a command plus options; confirm before quoting.
for sequence in 1 10 100 1000 10000 100000 1000000 10000000 100000000 1000000000; do
    # 1/sequence as a fixed-point literal, e.g. 10 -> 0.100000000.
    rate=$(echo "1 $sequence" | awk '{printf("%0.9f\n",$1/$2)}')

    $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.bloom_filter_idx"
    $CLICKHOUSE_CLIENT --allow_experimental_data_skipping_indices=1 --query="CREATE TABLE test.bloom_filter_idx ( u64 UInt64, i32 Int32, f64 Float64, d Decimal(10, 2), s String, e Enum8('a' = 1, 'b' = 2, 'c' = 3), dt Date, INDEX bloom_filter_a i32 TYPE bloom_filter($rate) GRANULARITY 1 ) ENGINE = MergeTree() ORDER BY u64 SETTINGS index_granularity = 8192"
done

# Do not leave the table from the last iteration behind for other tests.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test.bloom_filter_idx"

View File

@ -0,0 +1,30 @@
1
0
1
1
2
0
2
2
2
0
2
2
2
0
2
2
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,50 @@
-- Enable the setting that gates data skipping indices for this session.
SET allow_experimental_data_skipping_indices = 1;
-- Section 1: a bloom_filter index declared on the single column i32.
DROP TABLE IF EXISTS test.single_column_bloom_filter;
-- index_granularity = 6 and index GRANULARITY 1; i64 is NOT part of the index.
CREATE TABLE test.single_column_bloom_filter (u64 UInt64, i32 Int32, i64 UInt64, INDEX idx (i32) TYPE bloom_filter GRANULARITY 1) ENGINE = MergeTree() ORDER BY u64 SETTINGS index_granularity = 6;
-- 100 rows with u64 = i32 = i64 = number.
INSERT INTO test.single_column_bloom_filter SELECT number AS u64, number AS i32, number AS i64 FROM system.numbers LIMIT 100;
-- Every query below caps max_rows_to_read at 6, the same value as index_granularity.
-- Equality: plain column, tuple of the indexed column, and (nested) tuples that mix in the non-indexed i64.
SELECT COUNT() FROM test.single_column_bloom_filter WHERE i32 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i32) = (1, 2) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i64) = (1, 1) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i64, (i64, i32)) = (1, (1, 1)) SETTINGS max_rows_to_read = 6;
-- IN with literal sets, same four left-hand-side shapes.
SELECT COUNT() FROM test.single_column_bloom_filter WHERE i32 IN (1, 2) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i32) IN ((1, 2), (2, 3)) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i64) IN ((1, 1), (2, 2)) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i64, (i64, i32)) IN ((1, (1, 1)), (2, (2, 2))) SETTINGS max_rows_to_read = 6;
-- IN with a subquery that produces the set through arrayJoin, with explicitly typed elements.
SELECT COUNT() FROM test.single_column_bloom_filter WHERE i32 IN (SELECT arrayJoin([toInt32(1), toInt32(2)])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i32) IN (SELECT arrayJoin([(toInt32(1), toInt32(2)), (toInt32(2), toInt32(3))])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i64) IN (SELECT arrayJoin([(toInt32(1), toUInt64(1)), (toInt32(2), toUInt64(2))])) SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i64, (i64, i32)) IN (SELECT arrayJoin([(toUInt64(1), (toUInt64(1), toInt32(1))), (toUInt64(2), (toUInt64(2), toInt32(2)))])) SETTINGS max_rows_to_read = 6;
-- IN with the set bound to an alias in a WITH clause.
WITH (1, 2) AS liter_prepared_set SELECT COUNT() FROM test.single_column_bloom_filter WHERE i32 IN liter_prepared_set SETTINGS max_rows_to_read = 6;
WITH ((1, 2), (2, 3)) AS liter_prepared_set SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i32) IN liter_prepared_set SETTINGS max_rows_to_read = 6;
WITH ((1, 1), (2, 2)) AS liter_prepared_set SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i32, i64) IN liter_prepared_set SETTINGS max_rows_to_read = 6;
WITH ((1, (1, 1)), (2, (2, 2))) AS liter_prepared_set SELECT COUNT() FROM test.single_column_bloom_filter WHERE (i64, (i64, i32)) IN liter_prepared_set SETTINGS max_rows_to_read = 6;
-- Clean up the table of this section.
DROP TABLE IF EXISTS test.single_column_bloom_filter;
-- Section 2: one bloom_filter index spanning columns of many types
-- (signed/unsigned integers, floats, Date, DateTime, String, FixedString).
DROP TABLE IF EXISTS test.bloom_filter_types_test;
-- index_granularity = 6 and index GRANULARITY 1; order_key is the only column outside the index.
CREATE TABLE test.bloom_filter_types_test (order_key UInt64, i8 Int8, i16 Int16, i32 Int32, i64 Int64, u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64, f32 Float32, f64 Float64, date Date, date_time DateTime('Europe/Moscow'), str String, fixed_string FixedString(5), INDEX idx (i8, i16, i32, i64, u8, u16, u32, u64, f32, f64, date, date_time, str, fixed_string) TYPE bloom_filter GRANULARITY 1) ENGINE = MergeTree() ORDER BY order_key SETTINGS index_granularity = 6;
-- 100 rows; every column is derived from `number` through the matching conversion function.
INSERT INTO test.bloom_filter_types_test SELECT number AS order_key, toInt8(number) AS i8, toInt16(number) AS i16, toInt32(number) AS i32, toInt64(number) AS i64, toUInt8(number) AS u8, toUInt16(number) AS u16, toUInt32(number) AS u32, toUInt64(number) AS u64, toFloat32(number) AS f32, toFloat64(number) AS f64, toDate(number, 'Europe/Moscow') AS date, toDateTime(number, 'Europe/Moscow') AS date_time, toString(number) AS str, toFixedString(toString(number), 5) AS fixed_string FROM system.numbers LIMIT 100;
-- One equality lookup per indexed column, each capped at max_rows_to_read = 6.
SELECT COUNT() FROM test.bloom_filter_types_test WHERE i8 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE i16 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE i32 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE i64 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE u8 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE u16 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE u32 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE u64 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE f32 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE f64 = 1 SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE date = '1970-01-02' SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE date_time = toDateTime('1970-01-01 03:00:01', 'Europe/Moscow') SETTINGS max_rows_to_read = 6;
SELECT COUNT() FROM test.bloom_filter_types_test WHERE str = '1' SETTINGS max_rows_to_read = 6;
-- NOTE(review): this is the only lookup that allows 12 rows instead of 6; the reason is not stated here -- confirm.
SELECT COUNT() FROM test.bloom_filter_types_test WHERE fixed_string = toFixedString('1', 5) SETTINGS max_rows_to_read = 12;
-- Clean up the table of this section.
DROP TABLE IF EXISTS test.bloom_filter_types_test;