mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge f559037c4d
into 44b4bd38b9
This commit is contained in:
commit
7fdf3efe63
@ -1,73 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <parquet/metadata.h>
|
||||
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
|
||||
|
||||
namespace parquet
|
||||
{
|
||||
class BloomFilter;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ParquetBloomFilterCondition
|
||||
{
|
||||
public:
|
||||
|
||||
struct ConditionElement
|
||||
{
|
||||
enum Function
|
||||
{
|
||||
/// Atoms of a Boolean expression.
|
||||
FUNCTION_IN,
|
||||
FUNCTION_NOT_IN,
|
||||
/// Can take any value.
|
||||
FUNCTION_UNKNOWN,
|
||||
/// Operators of the logical expression.
|
||||
FUNCTION_NOT,
|
||||
FUNCTION_AND,
|
||||
FUNCTION_OR,
|
||||
/// Constants
|
||||
ALWAYS_FALSE,
|
||||
ALWAYS_TRUE,
|
||||
};
|
||||
|
||||
using ColumnPtr = IColumn::Ptr;
|
||||
using HashesForColumns = std::vector<std::vector<uint64_t>>;
|
||||
using KeyColumns = std::vector<std::size_t>;
|
||||
|
||||
Function function;
|
||||
// each entry represents a list of hashes per column
|
||||
// suppose there are three columns with 2 rows each
|
||||
// hashes_per_column.size() == 3 and hashes_per_column[0].size() == 2
|
||||
HashesForColumns hashes_per_column;
|
||||
KeyColumns key_columns;
|
||||
};
|
||||
|
||||
using RPNElement = KeyCondition::RPNElement;
|
||||
using ColumnIndexToBF = std::unordered_map<std::size_t, std::unique_ptr<parquet::BloomFilter>>;
|
||||
|
||||
explicit ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_);
|
||||
|
||||
bool mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const;
|
||||
std::unordered_set<std::size_t> getFilteringColumnKeys() const;
|
||||
|
||||
private:
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> condition;
|
||||
Block header;
|
||||
};
|
||||
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
|
||||
const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -1,12 +1,10 @@
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
|
||||
#include <iostream>
|
||||
#include <Processors/Formats/Impl/Parquet/keyConditionRPNToParquetRPN.h>
|
||||
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <parquet/bloom_filter.h>
|
||||
#include <parquet/metadata.h>
|
||||
#include <parquet/xxhasher.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -16,8 +14,43 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent(
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
std::size_t clickhouse_column_index)
|
||||
{
|
||||
if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes;
|
||||
|
||||
// complex types like structs, tuples and maps will have more than one index.
|
||||
// we don't support those for now
|
||||
if (parquet_indexes.size() > 1)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (parquet_indexes.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Something bad happened, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`");
|
||||
}
|
||||
|
||||
auto parquet_column_index = parquet_indexes[0];
|
||||
|
||||
const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index);
|
||||
|
||||
bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value();
|
||||
if (!column_has_bloom_filter)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return parquet_column_descriptor;
|
||||
}
|
||||
|
||||
|
||||
bool isParquetStringTypeSupportedForBloomFilters(
|
||||
const std::shared_ptr<const parquet::LogicalType> & logical_type,
|
||||
@ -48,9 +81,9 @@ bool isParquetIntegerTypeSupportedForBloomFilters(const std::shared_ptr<const pa
|
||||
}
|
||||
|
||||
if (parquet::ConvertedType::type::NONE != converted_type && !(converted_type == parquet::ConvertedType::INT_8 || converted_type == parquet::ConvertedType::INT_16
|
||||
|| converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64
|
||||
|| converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16
|
||||
|| converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64))
|
||||
|| converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64
|
||||
|| converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16
|
||||
|| converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -197,194 +230,25 @@ std::optional<std::vector<uint64_t>> hash(const IColumn * data_column, const par
|
||||
return hashes;
|
||||
}
|
||||
|
||||
bool maybeTrueOnBloomFilter(const std::vector<uint64_t> & hashes, const std::unique_ptr<parquet::BloomFilter> & bloom_filter)
|
||||
KeyCondition::RPN keyConditionRPNToParquetRPN(const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata)
|
||||
{
|
||||
for (const auto hash : hashes)
|
||||
{
|
||||
if (bloom_filter->FindHash(hash))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent(
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
std::size_t clickhouse_column_index)
|
||||
{
|
||||
if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes;
|
||||
|
||||
// complex types like structs, tuples and maps will have more than one index.
|
||||
// we don't support those for now
|
||||
if (parquet_indexes.size() > 1)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (parquet_indexes.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Something bad happened, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`");
|
||||
}
|
||||
|
||||
auto parquet_column_index = parquet_indexes[0];
|
||||
|
||||
const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index);
|
||||
|
||||
bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value();
|
||||
if (!column_has_bloom_filter)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return parquet_column_descriptor;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_)
|
||||
: condition(condition_), header(header_)
|
||||
{
|
||||
}
|
||||
|
||||
bool ParquetBloomFilterCondition::mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const
|
||||
{
|
||||
using Function = ConditionElement::Function;
|
||||
std::vector<BoolMask> rpn_stack;
|
||||
|
||||
for (const auto & element : condition)
|
||||
{
|
||||
if (element.function == Function::FUNCTION_IN
|
||||
|| element.function == Function::FUNCTION_NOT_IN)
|
||||
{
|
||||
bool maybe_true = true;
|
||||
for (auto column_index = 0u; column_index < element.hashes_per_column.size(); column_index++)
|
||||
{
|
||||
// in case bloom filter is not present for this row group
|
||||
// https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237
|
||||
if (!column_index_to_column_bf.contains(element.key_columns[column_index]))
|
||||
{
|
||||
rpn_stack.emplace_back(true, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool column_maybe_contains = maybeTrueOnBloomFilter(
|
||||
element.hashes_per_column[column_index],
|
||||
column_index_to_column_bf.at(element.key_columns[column_index]));
|
||||
|
||||
if (!column_maybe_contains)
|
||||
{
|
||||
maybe_true = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
rpn_stack.emplace_back(maybe_true, true);
|
||||
if (element.function == Function::FUNCTION_NOT_IN)
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_NOT)
|
||||
{
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_OR)
|
||||
{
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
rpn_stack.back() = arg1 | arg2;
|
||||
}
|
||||
else if (element.function == Function::FUNCTION_AND)
|
||||
{
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
rpn_stack.back() = arg1 & arg2;
|
||||
}
|
||||
else if (element.function == Function::ALWAYS_TRUE)
|
||||
{
|
||||
rpn_stack.emplace_back(true, false);
|
||||
}
|
||||
else if (element.function == Function::ALWAYS_FALSE)
|
||||
{
|
||||
rpn_stack.emplace_back(false, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
rpn_stack.emplace_back(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (rpn_stack.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::mayBeTrueOnRowGroup");
|
||||
|
||||
return rpn_stack[0].can_be_true;
|
||||
}
|
||||
|
||||
std::unordered_set<std::size_t> ParquetBloomFilterCondition::getFilteringColumnKeys() const
|
||||
{
|
||||
std::unordered_set<std::size_t> column_keys;
|
||||
|
||||
for (const auto & element : condition)
|
||||
{
|
||||
for (const auto index : element.key_columns)
|
||||
{
|
||||
column_keys.insert(index);
|
||||
}
|
||||
}
|
||||
|
||||
return column_keys;
|
||||
}
|
||||
|
||||
/*
|
||||
* `KeyCondition::rpn` is overly complex for bloom filters, some operations are not even supported. Not only that, but to avoid hashing each time
|
||||
* we loop over a rpn element, we need to store hashes instead of where predicate values. To address this, we loop over `KeyCondition::rpn`
|
||||
* and build a simplified RPN that holds hashes instead of values.
|
||||
*
|
||||
* `KeyCondition::RPNElement::FUNCTION_IN_RANGE` becomes:
|
||||
* `FUNCTION_IN`
|
||||
* `FUNCTION_UNKNOWN` when range limits are different
|
||||
* `KeyCondition::RPNElement::FUNCTION_IN_SET` becomes
|
||||
* `FUNCTION_IN`
|
||||
*
|
||||
* Complex types and structs are not supported.
|
||||
* There are two sources of data types being analyzed, and they need to be compatible: DB::Field type and parquet type.
|
||||
* This is determined by the `isColumnSupported` method.
|
||||
*
|
||||
* Some interesting examples:
|
||||
* 1. file(..., 'str_column UInt64') where str_column = 50; Field.type == UInt64. Parquet type string. Not supported.
|
||||
* 2. file(...) where str_column = 50; Field.type == String (conversion already taken care by `KeyCondition`). Parquet type string.
|
||||
* 3. file(...) where uint32_column = toIPv4(5). Field.type == IPv4. Incompatible column types, resolved by `KeyCondition` itself.
|
||||
* 4. file(...) where toIPv4(uint32_column) = toIPv4(5). Field.type == IPv4. We know it is safe to hash it using an int32 API.
|
||||
* */
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
|
||||
const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata)
|
||||
{
|
||||
std::vector<ParquetBloomFilterCondition::ConditionElement> condition_elements;
|
||||
std::vector<KeyCondition::RPNElement> condition_elements;
|
||||
|
||||
using RPNElement = KeyCondition::RPNElement;
|
||||
using Function = ParquetBloomFilterCondition::ConditionElement::Function;
|
||||
|
||||
for (const auto & rpn_element : rpn)
|
||||
{
|
||||
condition_elements.emplace_back(rpn_element);
|
||||
// this would be a problem for `where negate(x) = -58`.
|
||||
// It would perform a bf search on `-58`, and possibly miss row groups containing this data.
|
||||
if (!rpn_element.monotonic_functions_chain.empty())
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ConditionElement::HashesForColumns hashes;
|
||||
KeyCondition::BloomFilterData::HashesForColumns hashes;
|
||||
|
||||
if (rpn_element.function == RPNElement::FUNCTION_IN_RANGE
|
||||
|| rpn_element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
|
||||
@ -392,7 +256,6 @@ std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParq
|
||||
// Only FUNCTION_EQUALS is supported and for that extremes need to be the same
|
||||
if (rpn_element.range.left != rpn_element.range.right)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -401,7 +264,6 @@ std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParq
|
||||
|
||||
if (!parquet_column_descriptor)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -409,7 +271,6 @@ std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParq
|
||||
|
||||
if (!hashed_value)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -418,14 +279,10 @@ std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParq
|
||||
|
||||
hashes.emplace_back(std::move(hashes_for_column));
|
||||
|
||||
auto function = rpn_element.function == RPNElement::FUNCTION_IN_RANGE
|
||||
? ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_IN
|
||||
: ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_NOT_IN;
|
||||
|
||||
std::vector<std::size_t> key_columns;
|
||||
key_columns.emplace_back(rpn_element.key_column);
|
||||
|
||||
condition_elements.emplace_back(function, std::move(hashes), std::move(key_columns));
|
||||
condition_elements.back().bloom_filter_data = KeyCondition::BloomFilterData {std::move(hashes), std::move(key_columns)};
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_IN_SET
|
||||
|| rpn_element.function == RPNElement::FUNCTION_NOT_IN_SET)
|
||||
@ -485,35 +342,16 @@ std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParq
|
||||
|
||||
if (found_empty_column)
|
||||
{
|
||||
condition_elements.emplace_back(Function::ALWAYS_FALSE);
|
||||
// todo arthur
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hashes.empty())
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto function = RPNElement::FUNCTION_IN_SET == rpn_element.function ? Function::FUNCTION_IN : Function::FUNCTION_NOT_IN;
|
||||
|
||||
condition_elements.emplace_back(function, hashes, key_columns);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_NOT)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_NOT);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_OR)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_OR);
|
||||
}
|
||||
else if (rpn_element.function == RPNElement::FUNCTION_AND)
|
||||
{
|
||||
condition_elements.emplace_back(Function::FUNCTION_AND);
|
||||
}
|
||||
else
|
||||
{
|
||||
condition_elements.emplace_back(Function::ALWAYS_TRUE);
|
||||
condition_elements.back().bloom_filter_data = {std::move(hashes), std::move(key_columns)};
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#if USE_PARQUET
|
||||
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
|
||||
|
||||
namespace parquet
|
||||
{
|
||||
class RowGroupMetadata;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
KeyCondition::RPN keyConditionRPNToParquetRPN(const std::vector<KeyCondition::RPNElement> & rpn,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -27,7 +27,7 @@
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Common/FieldVisitorsAccurateComparison.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
|
||||
#include <Processors/Formats/Impl/Parquet/keyConditionRPNToParquetRPN.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -275,7 +275,21 @@ static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type
|
||||
return field;
|
||||
}
|
||||
|
||||
static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF(
|
||||
struct ParquetBloomFilter : public KeyCondition::BloomFilter
|
||||
{
|
||||
explicit ParquetBloomFilter(std::unique_ptr<parquet::BloomFilter> && parquet_bf_)
|
||||
: parquet_bf(std::move(parquet_bf_)) {}
|
||||
|
||||
bool findHash(uint64_t hash) override
|
||||
{
|
||||
return parquet_bf->FindHash(hash);
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<parquet::BloomFilter> parquet_bf;
|
||||
};
|
||||
|
||||
static KeyCondition::ColumnIndexToBloomFilter buildColumnIndexToBF(
|
||||
parquet::BloomFilterReader & bf_reader,
|
||||
int row_group,
|
||||
const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
|
||||
@ -289,7 +303,7 @@ static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF(
|
||||
return {};
|
||||
}
|
||||
|
||||
ParquetBloomFilterCondition::ColumnIndexToBF index_to_column_bf;
|
||||
KeyCondition::ColumnIndexToBloomFilter index_to_column_bf;
|
||||
|
||||
for (const auto & [clickhouse_index, parquet_indexes] : clickhouse_column_index_to_parquet_index)
|
||||
{
|
||||
@ -306,14 +320,14 @@ static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF(
|
||||
|
||||
auto parquet_index = parquet_indexes[0];
|
||||
|
||||
auto bf = rg_bf->GetColumnBloomFilter(parquet_index);
|
||||
auto parquet_bf = rg_bf->GetColumnBloomFilter(parquet_index);
|
||||
|
||||
if (!bf)
|
||||
if (!parquet_bf)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
index_to_column_bf[clickhouse_index] = std::move(bf);
|
||||
index_to_column_bf[clickhouse_index] = std::make_unique<ParquetBloomFilter>(std::move(parquet_bf));
|
||||
}
|
||||
|
||||
return index_to_column_bf;
|
||||
@ -484,6 +498,24 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
|
||||
return hyperrectangle;
|
||||
}
|
||||
|
||||
std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
|
||||
{
|
||||
std::unordered_set<std::size_t> column_keys;
|
||||
|
||||
for (const auto & element : rpn)
|
||||
{
|
||||
if (auto bf_data = element.bloom_filter_data)
|
||||
{
|
||||
for (const auto index : bf_data->key_columns)
|
||||
{
|
||||
column_keys.insert(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return column_keys;
|
||||
}
|
||||
|
||||
ParquetBlockInputFormat::ParquetBlockInputFormat(
|
||||
ReadBuffer & buf,
|
||||
const Block & header_,
|
||||
@ -577,45 +609,68 @@ void ParquetBlockInputFormat::initializeIfNeeded()
|
||||
return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
|
||||
};
|
||||
|
||||
std::unique_ptr<ParquetBloomFilterCondition> parquet_bloom_filter_condition;
|
||||
|
||||
std::unordered_set<std::size_t> filtering_columns;
|
||||
|
||||
if (format_settings.parquet.bloom_filter_push_down && key_condition)
|
||||
{
|
||||
bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr);
|
||||
|
||||
const auto parquet_conditions = keyConditionRPNToParquetBloomFilterCondition(
|
||||
const auto parquet_conditions = keyConditionRPNToParquetRPN(
|
||||
key_condition->getRPN(),
|
||||
index_mapping,
|
||||
metadata->RowGroup(0));
|
||||
parquet_bloom_filter_condition = std::make_unique<ParquetBloomFilterCondition>(parquet_conditions, getPort().getHeader());
|
||||
|
||||
filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys();
|
||||
filtering_columns = getBloomFilterFilteringColumnKeys(parquet_conditions);
|
||||
}
|
||||
|
||||
auto skip_row_group_based_on_filters = [&](int row_group)
|
||||
{
|
||||
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
KeyCondition::RPN possibly_modified_rpn = key_condition->getRPN();
|
||||
KeyCondition::ColumnIndexToBloomFilter column_index_to_bloom_filter;
|
||||
|
||||
const auto & header = getPort().getHeader();
|
||||
|
||||
std::vector<Range> hyperrectangle(header.columns(), Range::createWholeUniverse());
|
||||
|
||||
if (format_settings.parquet.filter_push_down)
|
||||
{
|
||||
hyperrectangle = getHyperrectangleForRowGroup(*metadata, row_group, header, format_settings);getHyperrectangleForRowGroup(*metadata, row_group, getPort().getHeader(), format_settings);
|
||||
}
|
||||
|
||||
if (format_settings.parquet.bloom_filter_push_down)
|
||||
{
|
||||
possibly_modified_rpn = keyConditionRPNToParquetRPN(key_condition->getRPN(),
|
||||
index_mapping,
|
||||
metadata->RowGroup(row_group));
|
||||
|
||||
column_index_to_bloom_filter = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns);
|
||||
}
|
||||
|
||||
bool maybe_exists = KeyCondition::checkRPNAgainstHyperrectangle(
|
||||
possibly_modified_rpn,
|
||||
hyperrectangle,
|
||||
key_condition->key_space_filling_curves,
|
||||
getPort().getHeader().getDataTypes(),
|
||||
key_condition->isSinglePoint(),
|
||||
column_index_to_bloom_filter).can_be_true;
|
||||
|
||||
return !maybe_exists;
|
||||
};
|
||||
|
||||
for (int row_group = 0; row_group < num_row_groups; ++row_group)
|
||||
{
|
||||
if (skip_row_groups.contains(row_group))
|
||||
continue;
|
||||
|
||||
if (parquet_bloom_filter_condition)
|
||||
if (key_condition && skip_row_group_based_on_filters(row_group))
|
||||
{
|
||||
const auto column_index_to_bf = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns);
|
||||
|
||||
if (!parquet_bloom_filter_condition->mayBeTrueOnRowGroup(column_index_to_bf))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (format_settings.parquet.filter_push_down && key_condition
|
||||
&& !key_condition
|
||||
->checkInHyperrectangle(
|
||||
getHyperrectangleForRowGroup(*metadata, row_group, getPort().getHeader(), format_settings),
|
||||
getPort().getHeader().getDataTypes())
|
||||
.can_be_true)
|
||||
continue;
|
||||
}
|
||||
|
||||
// When single-threaded parsing, can prefetch row groups, so need to put all row groups in the same row_group_batch
|
||||
if (row_group_batches.empty() || (!prefetch_group && row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek))
|
||||
|
@ -3004,6 +3004,57 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const
|
||||
BoolMask KeyCondition::checkInHyperrectangle(
|
||||
const Hyperrectangle & hyperrectangle,
|
||||
const DataTypes & data_types) const
|
||||
{
|
||||
return checkRPNAgainstHyperrectangle(rpn, hyperrectangle, key_space_filling_curves, data_types, single_point);
|
||||
}
|
||||
|
||||
bool mayExistOnBloomFilter(const std::vector<uint64_t> & hashes, const std::unique_ptr<KeyCondition::BloomFilter> & bloom_filter)
|
||||
{
|
||||
for (const auto hash : hashes)
|
||||
{
|
||||
if (bloom_filter->findHash(hash))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool mayExistOnBloomFilter(const KeyCondition::BloomFilterData & condition_bloom_filter_data,
|
||||
const KeyCondition::ColumnIndexToBloomFilter & column_index_to_column_bf)
|
||||
{
|
||||
bool maybe_true = true;
|
||||
for (auto column_index = 0u; column_index < condition_bloom_filter_data.hashes_per_column.size(); column_index++)
|
||||
{
|
||||
// in case bloom filter is not present for this row group
|
||||
// https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237
|
||||
if (!column_index_to_column_bf.contains(condition_bloom_filter_data.key_columns[column_index]))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
bool column_maybe_contains = mayExistOnBloomFilter(
|
||||
condition_bloom_filter_data.hashes_per_column[column_index],
|
||||
column_index_to_column_bf.at(condition_bloom_filter_data.key_columns[column_index]));
|
||||
|
||||
if (!column_maybe_contains)
|
||||
{
|
||||
maybe_true = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return maybe_true;
|
||||
}
|
||||
|
||||
BoolMask KeyCondition::checkRPNAgainstHyperrectangle(
|
||||
const RPN & rpn,
|
||||
const Hyperrectangle & hyperrectangle,
|
||||
const KeyCondition::SpaceFillingCurveDescriptions & key_space_filling_curves,
|
||||
const DataTypes & data_types,
|
||||
bool single_point,
|
||||
const ColumnIndexToBloomFilter & column_index_to_column_bf)
|
||||
{
|
||||
std::vector<BoolMask> rpn_stack;
|
||||
|
||||
@ -3019,6 +3070,7 @@ BoolMask KeyCondition::checkInHyperrectangle(
|
||||
{
|
||||
if (element.argument_num_of_space_filling_curve.has_value())
|
||||
{
|
||||
// todo arthur, not sure what to do here yet
|
||||
/// If a condition on argument of a space filling curve wasn't collapsed into FUNCTION_ARGS_IN_HYPERRECTANGLE,
|
||||
/// we cannot process it.
|
||||
rpn_stack.emplace_back(true, true);
|
||||
@ -3063,6 +3115,12 @@ BoolMask KeyCondition::checkInHyperrectangle(
|
||||
bool contains = element.range.containsRange(*key_range);
|
||||
|
||||
rpn_stack.emplace_back(intersects, !contains);
|
||||
|
||||
if (rpn_stack.back().can_be_true && element.bloom_filter_data)
|
||||
{
|
||||
rpn_stack.back().can_be_true = mayExistOnBloomFilter(*element.bloom_filter_data, column_index_to_column_bf);
|
||||
}
|
||||
|
||||
if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
@ -3236,6 +3294,12 @@ BoolMask KeyCondition::checkInHyperrectangle(
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Set for IN is not created yet");
|
||||
|
||||
rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types, single_point));
|
||||
|
||||
if (rpn_stack.back().can_be_true && element.bloom_filter_data)
|
||||
{
|
||||
rpn_stack.back().can_be_true = mayExistOnBloomFilter(*element.bloom_filter_data, column_index_to_column_bf);
|
||||
}
|
||||
|
||||
if (element.function == RPNElement::FUNCTION_NOT_IN_SET)
|
||||
rpn_stack.back() = !rpn_stack.back();
|
||||
}
|
||||
|
@ -148,6 +148,13 @@ public:
|
||||
/// TODO handle the cases when generate RPN.
|
||||
bool extractPlainRanges(Ranges & ranges) const;
|
||||
|
||||
struct BloomFilterData
|
||||
{
|
||||
using HashesForColumns = std::vector<std::vector<uint64_t>>;
|
||||
HashesForColumns hashes_per_column;
|
||||
std::vector<std::size_t> key_columns;
|
||||
};
|
||||
|
||||
/// The expression is stored as Reverse Polish Notation.
|
||||
struct RPNElement
|
||||
{
|
||||
@ -224,6 +231,8 @@ public:
|
||||
Polygon polygon;
|
||||
|
||||
MonotonicFunctionsChain monotonic_functions_chain;
|
||||
|
||||
std::optional<BloomFilterData> bloom_filter_data;
|
||||
};
|
||||
|
||||
using RPN = std::vector<RPNElement>;
|
||||
@ -237,6 +246,44 @@ public:
|
||||
|
||||
bool isRelaxed() const { return relaxed; }
|
||||
|
||||
/// Space-filling curves in the key
|
||||
enum class SpaceFillingCurveType
|
||||
{
|
||||
Unknown = 0,
|
||||
Morton,
|
||||
Hilbert
|
||||
};
|
||||
static const std::unordered_map<String, SpaceFillingCurveType> space_filling_curve_name_to_type;
|
||||
|
||||
struct SpaceFillingCurveDescription
|
||||
{
|
||||
size_t key_column_pos;
|
||||
String function_name;
|
||||
std::vector<String> arguments;
|
||||
SpaceFillingCurveType type;
|
||||
};
|
||||
using SpaceFillingCurveDescriptions = std::vector<SpaceFillingCurveDescription>;
|
||||
SpaceFillingCurveDescriptions key_space_filling_curves;
|
||||
|
||||
struct BloomFilter
|
||||
{
|
||||
virtual ~BloomFilter() = default;
|
||||
|
||||
virtual bool findHash(uint64_t hash) = 0;
|
||||
};
|
||||
|
||||
using ColumnIndexToBloomFilter = std::unordered_map<std::size_t, std::unique_ptr<BloomFilter>>;
|
||||
|
||||
static BoolMask checkRPNAgainstHyperrectangle(
|
||||
const RPN & rpn,
|
||||
const Hyperrectangle & hyperrectangle,
|
||||
const KeyCondition::SpaceFillingCurveDescriptions & key_space_filling_curves,
|
||||
const DataTypes & data_types,
|
||||
bool single_point,
|
||||
const ColumnIndexToBloomFilter & column_index_to_column_bf = {});
|
||||
|
||||
bool isSinglePoint() const { return single_point; }
|
||||
|
||||
private:
|
||||
BoolMask checkInRange(
|
||||
size_t used_key_size,
|
||||
@ -358,24 +405,6 @@ private:
|
||||
/// All intermediate columns are used to calculate key_expr.
|
||||
const NameSet key_subexpr_names;
|
||||
|
||||
/// Space-filling curves in the key
|
||||
enum class SpaceFillingCurveType
|
||||
{
|
||||
Unknown = 0,
|
||||
Morton,
|
||||
Hilbert
|
||||
};
|
||||
static const std::unordered_map<String, SpaceFillingCurveType> space_filling_curve_name_to_type;
|
||||
|
||||
struct SpaceFillingCurveDescription
|
||||
{
|
||||
size_t key_column_pos;
|
||||
String function_name;
|
||||
std::vector<String> arguments;
|
||||
SpaceFillingCurveType type;
|
||||
};
|
||||
using SpaceFillingCurveDescriptions = std::vector<SpaceFillingCurveDescription>;
|
||||
SpaceFillingCurveDescriptions key_space_filling_curves;
|
||||
void getAllSpaceFillingCurves();
|
||||
|
||||
/// Array joined column names
|
||||
|
@ -0,0 +1,8 @@
|
||||
{
|
||||
"data": [],
|
||||
"rows": 0,
|
||||
"statistics": {
|
||||
"rows_read": 0,
|
||||
"bytes_read": 0
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-ubsan, no-fasttest
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
|
||||
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
|
||||
|
||||
WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"
|
||||
|
||||
mkdir -p "${WORKING_DIR}"
|
||||
|
||||
DATA_FILE="${CUR_DIR}/data_parquet/integers_1_5_no_3_bf_minmax.parquet"
|
||||
|
||||
DATA_FILE_USER_PATH="${WORKING_DIR}/integers_1to5_no_3_bf_minmax.parquet"
|
||||
|
||||
cp ${DATA_FILE} ${DATA_FILE_USER_PATH}
|
||||
|
||||
# Prior to this PR, bloom filter and minmax were evaluated separately.
|
||||
# This was sub-optimal for conditions like `x = 3 or x > 5` where data is [1, 2, 4, 5].
|
||||
# Bloom filter is not able to handle greater than operations. Therefore, it can't evaluate x > 5. Even though it can tell
|
||||
# `3` is not in the set by evaluating `x = 3`, it can't discard the row group because of the `or` condition.
|
||||
# On the other hand, min max can handle both. It'll evaluate x = 3 to true (because it is within the range) and the latter to false
|
||||
# Therefore, bloom filter would determine `false or true` and minmax would determine `true or false`. Resulting in true.
|
||||
|
||||
# Since both structures are now evaluated together, the row group should be skipped
|
||||
${CLICKHOUSE_CLIENT} --query="select * from file('${DATA_FILE_USER_PATH}', Parquet) WHERE int8 = 3 or int8 > 5 FORMAT Json SETTINGS input_format_parquet_filter_push_down=true, input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)'
|
Binary file not shown.
Loading…
Reference in New Issue
Block a user