LiuNeng 2024-11-21 00:08:01 +01:00 committed by GitHub
commit 2a5944e311
31 changed files with 89684 additions and 15 deletions

.gitmodules vendored
View File

@@ -375,3 +375,6 @@
[submodule "contrib/postgres"]
path = contrib/postgres
url = https://github.com/ClickHouse/postgres.git
[submodule "contrib/xsimd"]
path = contrib/xsimd
url = https://github.com/xtensor-stack/xsimd.git

View File

@@ -205,6 +205,7 @@ else ()
endif ()
add_contrib (xxHash-cmake xxHash)
add_contrib (xsimd-cmake xsimd)
add_contrib (expected-cmake expected)

contrib/xsimd vendored Submodule

@@ -0,0 +1 @@
Subproject commit caeb777bdd38f0c3a00277b4b64c8ec259b0027a

View File

@@ -0,0 +1,9 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/xsimd")
set(XSIMD_INCLUDE_DIR "${LIBRARY_DIR}/include")
add_library(xsimd INTERFACE)
target_include_directories(xsimd INTERFACE
${XSIMD_INCLUDE_DIR})
target_compile_features(xsimd INTERFACE cxx_std_11)
add_library(ch_contrib::xsimd ALIAS xsimd)
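For orientation, a minimal sketch of the xsimd batch API that the new Parquet readers build on (a hypothetical standalone snippet; it only assumes the vendored xsimd headers are on the include path):

#include <xsimd/xsimd.hpp>
#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
    using batch = xsimd::batch<int64_t>;            // one SIMD register of int64_t lanes
    constexpr std::size_t lanes = batch::size;      // lane count depends on the target ISA
    alignas(xsimd::default_arch::alignment()) int64_t data[lanes];
    for (std::size_t i = 0; i < lanes; ++i)
        data[i] = static_cast<int64_t>(i);
    batch value = batch::load_aligned(data);        // vector load
    batch lower = batch::broadcast(1);              // splat a scalar across all lanes
    auto mask = value >= lower;                     // per-lane compare -> xsimd::batch_bool
    std::printf("lanes=%zu any=%d all=%d\n", lanes, xsimd::any(mask), xsimd::all(mask));
    return 0;
}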

View File

@@ -287,6 +287,7 @@ target_link_libraries (dbms PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (dbms PRIVATE ch_contrib::jemalloc)
endif()
# NOTE: the xsimd-based Parquet readers are compiled for AVX2; guard the flag
# so builds targeting CPUs without AVX2 do not emit illegal instructions.
if (ENABLE_AVX2)
    target_compile_options(dbms PRIVATE -mavx2)
endif()
set (all_modules dbms)
macro (dbms_target_include_directories)
@@ -454,7 +455,9 @@ endif()
dbms_target_link_libraries (
PUBLIC
boost::circular_buffer
boost::heap)
boost::heap
ch_contrib::xsimd
)
target_link_libraries(clickhouse_common_io PUBLIC
ch_contrib::miniselect
@@ -684,5 +687,6 @@ if (ENABLE_TESTS)
if (TARGET ch_contrib::parquet)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::parquet)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::xsimd)
endif()
endif ()

View File

@@ -905,7 +905,11 @@ The server successfully detected this situation and will download merged part fr
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
\
M(ParquetFetchWaitTimeMicroseconds, "Time spent waiting for Parquet data to be fetched", ValueType::Microseconds) \
M(ParquetFilteredRows, "Rows filtered out by pushed-down filters, including skipped rows", ValueType::Number) \
M(ParquetSkippedRows, "Rows skipped by pushed-down filters", ValueType::Number) \
M(ParquetOutputRows, "Rows output by the Parquet reader", ValueType::Number) \
M(ParquetSkipPageNum, "Parquet pages skipped by filter pushdown", ValueType::Number) \
\
#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

View File

@@ -273,7 +273,10 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
/// We don't need any conversion: Int64 is the underlying type of Date32.
return src;
}
if (which_type.isDateTime() && isInt64OrUInt64FieldType(src.getType()))
{
return static_cast<UInt32>(src.safeGet<UInt64>());
}
if (which_type.isDateTime64() && src.getType() == Field::Types::Decimal64)
{
const auto & from_type = src.safeGet<Decimal64>();

View File

@@ -0,0 +1,720 @@
#include "ColumnFilter.h"
#include <base/Decimal.h>
#include <Columns/ColumnSet.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/Set.h>
#include <Processors/Formats/Impl/Parquet/xsimd_wrapper.h>
#include <format>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PARQUET_EXCEPTION;
extern const int LOGICAL_ERROR;
}
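/// Maps a storage type to the xsimd types used throughout this file: the batch
/// type for values, the batch_bool type for row masks, and an integer batch of
/// matching lane width for dictionary indices (floats gather via Int32/Int64).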
template<class T>
struct PhysicTypeTraits
{
using simd_internal_type = T;
using simd_type = xsimd::batch<simd_internal_type>;
using simd_bool_type = xsimd::batch_bool<simd_internal_type>;
using simd_idx_type = xsimd::batch<simd_internal_type>;
using idx_type = simd_internal_type;
};
template struct PhysicTypeTraits<Int32>;
template struct PhysicTypeTraits<Int64>;
template<> struct PhysicTypeTraits<Float32>
{
using simd_internal_type = Float32;
using simd_type = xsimd::batch<simd_internal_type>;
using simd_bool_type = xsimd::batch_bool<simd_internal_type>;
using simd_idx_type = xsimd::batch<Int32>;
using idx_type = Int32;
};
template<> struct PhysicTypeTraits<Float64>
{
using simd_internal_type = Float64;
using simd_type = xsimd::batch<simd_internal_type>;
using simd_bool_type = xsimd::batch_bool<simd_internal_type>;
using simd_idx_type = xsimd::batch<Int64>;
using idx_type = Int64;
};
template<> struct PhysicTypeTraits<DateTime64>
{
using simd_internal_type = Int64;
using simd_type = xsimd::batch<simd_internal_type>;
using simd_bool_type = xsimd::batch_bool<simd_internal_type>;
using simd_idx_type = xsimd::batch<simd_internal_type>;
using idx_type = simd_internal_type;
};
template <typename T, typename S>
void FilterHelper::filterPlainFixedData(const S* src, PaddedPODArray<T> & dst, const RowSet & row_set, size_t rows_to_read)
{
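    /// Compact `src` into `dst` according to `row_set`, one SIMD batch at a time:
    /// an all-false mask skips the whole batch, an all-true mask is copied wholesale
    /// (plain memcpy when no type conversion is needed), and a mixed mask falls
    /// back to a scalar loop. Rows that do not fill a whole batch are handled by
    /// the scalar tail loop at the end.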
using batch_type = PhysicTypeTraits<S>::simd_type;
using bool_type = PhysicTypeTraits<S>::simd_bool_type;
auto increment = batch_type::size;
auto num_batched = rows_to_read / increment;
for (size_t i = 0; i < num_batched; ++i)
{
auto rows = i * increment;
bool_type mask = bool_type::load_aligned(row_set.activeAddress() + rows);
auto old_size = dst.size();
if (xsimd::none(mask))
continue;
else if (xsimd::all(mask))
{
dst.resize(old_size + increment);
if constexpr (std::is_same_v<T, S>)
{
auto * start = dst.data() + old_size;
memcpySmallAllowReadWriteOverflow15(start, src + rows, increment * sizeof(S));
}
else
{
for (size_t j = 0; j < increment; ++j)
{
dst[old_size + j] = static_cast<T>(src[rows + j]);
}
}
}
else
{
for (size_t j = 0; j < increment; ++j)
{
size_t idx = rows + j;
if (row_set.get(idx))
dst.push_back(static_cast<T>(src[idx]));
}
}
}
for (size_t i = num_batched * increment; i < rows_to_read; ++i)
{
if (row_set.get(i))
dst.push_back(static_cast<T>(src[i]));
}
}
template void FilterHelper::filterPlainFixedData<Int16, Int32>(Int32 const*, DB::PaddedPODArray<Int16>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<Int16, Int16>(Int16 const*, DB::PaddedPODArray<Int16>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<UInt16, Int32>(Int32 const*, DB::PaddedPODArray<UInt16>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<UInt16, UInt16>(UInt16 const*, DB::PaddedPODArray<UInt16>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<Int32, Int32>(Int32 const*, DB::PaddedPODArray<Int32>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<UInt32, UInt32>(UInt32 const*, DB::PaddedPODArray<UInt32>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<UInt32, Int32>(Int32 const*, DB::PaddedPODArray<UInt32>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<UInt32, Int64>(Int64 const*, DB::PaddedPODArray<UInt32>&, DB::RowSet const&, size_t);
template void FilterHelper::filterPlainFixedData<Int64, Int64>(const Int64* src, PaddedPODArray<Int64> & dst, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterPlainFixedData<Float32, Float32>(const Float32* src, PaddedPODArray<Float32> & dst, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterPlainFixedData<Float64, Float64>(const Float64* src, PaddedPODArray<Float64> & dst, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterPlainFixedData<DateTime64, Int64>(const Int64* src, PaddedPODArray<DateTime64> & dst, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterPlainFixedData<DateTime64, DateTime64>(const DateTime64* src, PaddedPODArray<DateTime64> & dst, const RowSet & row_set, size_t rows_to_read);
template <typename T>
void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<T> & dict, PaddedPODArray<T> & dst, const PaddedPODArray<Int32> & idx, size_t rows_to_read)
{
dst.resize(rows_to_read);
for (size_t i = 0; i < rows_to_read; ++i)
{
dst[i] = dict[idx[i]];
}
}
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<Int32> & dict, PaddedPODArray<Int32> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<Int64> & dict, PaddedPODArray<Int64> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<Float32> & dict, PaddedPODArray<Float32> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<Float64> & dict, PaddedPODArray<Float64> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<DateTime64> & dict, PaddedPODArray<DateTime64> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template void FilterHelper::gatherDictFixedValue(
const PaddedPODArray<Int16> & dict, PaddedPODArray<Int16> & data, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template <typename T>
void FilterHelper::filterDictFixedData(const PaddedPODArray<T> & dict, PaddedPODArray<T> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read)
{
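    /// Same batching scheme as filterPlainFixedData, but values come out of a
    /// dictionary: for an all-true mask a SIMD gather fetches dict[idx[...]] for
    /// the whole batch; mixed masks and the tail use scalar lookups.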
using batch_type = PhysicTypeTraits<T>::simd_type;
using bool_type = PhysicTypeTraits<T>::simd_bool_type;
using idx_batch_type = PhysicTypeTraits<T>::simd_idx_type;
using simd_internal_type = PhysicTypeTraits<T>::simd_internal_type;
auto increment = batch_type::size;
auto num_batched = rows_to_read / increment;
for (size_t i = 0; i < num_batched; ++i)
{
auto rows = i * increment;
bool_type mask = bool_type::load_aligned(row_set.activeAddress() + rows);
if (xsimd::none(mask))
continue;
else if (xsimd::all(mask))
{
auto old_size = dst.size();
dst.resize(old_size + increment);
auto * start = dst.data() + old_size; /// must be taken after resize: resize can reallocate
idx_batch_type idx_batch = idx_batch_type::load_unaligned(idx.data() + rows);
auto batch = batch_type::gather(reinterpret_cast<const simd_internal_type *>(dict.data()), idx_batch);
batch.store_unaligned(reinterpret_cast<simd_internal_type *>(start));
}
else
{
for (size_t j = 0; j < increment; ++j)
if (row_set.get(rows + j))
dst.push_back(dict[idx[rows + j]]);
}
}
for (size_t i = num_batched * increment; i < rows_to_read; ++i)
{
if (row_set.get(i))
dst.push_back(dict[idx[i]]);
}
}
template void FilterHelper::filterDictFixedData(const PaddedPODArray<Int32> & dict, PaddedPODArray<Int32> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterDictFixedData(const PaddedPODArray<Int64> & dict, PaddedPODArray<Int64> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterDictFixedData(const PaddedPODArray<Float32> & dict, PaddedPODArray<Float32> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterDictFixedData(const PaddedPODArray<Float64> & dict, PaddedPODArray<Float64> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterDictFixedData(const PaddedPODArray<DateTime64> & dict, PaddedPODArray<DateTime64> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template void FilterHelper::filterDictFixedData(const PaddedPODArray<Int16> & dict, PaddedPODArray<Int16> & dst, const PaddedPODArray<Int32> & idx, const RowSet & row_set, size_t rows_to_read);
template <class T, bool negated>
void BigIntRangeFilter::testIntValues(RowSet & row_set, size_t len, const T * data) const
{
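    /// Vectorized range test: broadcast the bounds once, compare a whole batch of
    /// values, and store the resulting mask directly into the RowSet. A
    /// single-value range degenerates to an equality test, and `negated` flips
    /// the mask (used by NegatedBigIntRangeFilter for notEquals).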
using batch_type = xsimd::batch<T>;
using bool_type = xsimd::batch_bool<T>;
auto increment = batch_type::size;
auto num_batched = len / increment;
batch_type min_batch = batch_type::broadcast(lower);
batch_type max_batch;
if (!is_single_value)
{
if constexpr (std::is_same_v<T, Int64>)
max_batch = batch_type::broadcast(upper);
else if constexpr (std::is_same_v<T, Int32>)
max_batch = batch_type::broadcast(upper32);
else if constexpr (std::is_same_v<T, Int16>)
max_batch = batch_type::broadcast(upper16);
else
UNREACHABLE();
}
bool aligned = row_set.getOffset() % increment == 0;
for (size_t i = 0; i < num_batched; ++i)
{
batch_type value;
const auto rows = i * increment;
if (aligned)
value = batch_type::load_aligned(data + rows);
else
value = batch_type::load_unaligned(data + rows);
bool_type mask;
if (is_single_value)
{
if constexpr (std::is_same_v<T, Int32>)
{
if (unlikely(lower32 != lower))
mask = bool_type(false);
else
mask = value == min_batch;
}
else if constexpr (std::is_same_v<T, Int16>)
{
if (unlikely(lower16 != lower))
mask = bool_type(false);
else
mask = value == min_batch;
}
else
{
mask = value == min_batch;
}
}
else
mask = (value >= min_batch) && (value <= max_batch);
if constexpr (negated)
mask = ~mask;
if (aligned)
mask.store_aligned(row_set.activeAddress() + rows);
else
mask.store_unaligned(row_set.activeAddress() + rows);
}
for (size_t i = num_batched * increment; i < len; ++i)
{
    bool value = data[i] >= lower && data[i] <= upper;
    if constexpr (negated)
        value = !value;
    row_set.set(i, value);
}
}
template void BigIntRangeFilter::testIntValues(RowSet & row_set, size_t len, const Int64 * data) const;
template void BigIntRangeFilter::testIntValues(RowSet & row_set, size_t len, const Int32 * data) const;
template void BigIntRangeFilter::testIntValues(RowSet & row_set, size_t len, const Int16 * data) const;
void BigIntRangeFilter::testInt64Values(DB::RowSet & row_set, size_t len, const Int64 * data) const
{
testIntValues(row_set, len, data);
}
void BigIntRangeFilter::testInt32Values(RowSet & row_set, size_t len, const Int32 * data) const
{
testIntValues(row_set, len, data);
}
void BigIntRangeFilter::testInt16Values(RowSet & row_set, size_t len, const Int16 * data) const
{
testIntValues(row_set, len, data);
}
bool isFunctionNode(const ActionsDAG::Node & node)
{
return node.function_base != nullptr;
}
bool isInputNode(const ActionsDAG::Node & node)
{
return node.type == ActionsDAG::ActionType::INPUT;
}
bool isConstantNode(const ActionsDAG::Node & node)
{
return node.type == ActionsDAG::ActionType::COLUMN;
}
bool isCompareColumnWithConst(const ActionsDAG::Node & node)
{
if (!isFunctionNode(node))
return false;
// TODO: support or function
if (node.function_base->getName() == "or")
return false;
size_t input_count = 0;
size_t constant_count = 0;
for (const auto & child : node.children)
{
if (isInputNode(*child))
++input_count;
if (isConstantNode(*child))
++constant_count;
}
return input_count == 1 && constant_count >= 1;
}
const ActionsDAG::Node * getInputNode(const ActionsDAG::Node & node)
{
for (const auto & child : node.children)
{
if (isInputNode(*child))
return child;
}
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "No input node found");
}
ActionsDAG::NodeRawConstPtrs getConstantNode(const ActionsDAG::Node & node)
{
ActionsDAG::NodeRawConstPtrs result;
for (const auto & child : node.children)
{
if (isConstantNode(*child))
result.push_back(child);
}
return result;
}
OptionalFilter BigIntRangeFilter::create(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
auto name = input_node->result_name;
if (!isInt64(input_node->result_type) && !isInt32(input_node->result_type) && !isInt16(input_node->result_type))
return std::nullopt;
auto constant_nodes = getConstantNode(node);
auto func_name = node.function_base->getName();
ColumnFilterPtr filter = nullptr;
if (func_name == "equals")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<BigIntRangeFilter>(value, value, false);
}
else if (func_name == "less")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<BigIntRangeFilter>(std::numeric_limits<Int64>::min(), value - 1, false);
}
else if (func_name == "greater")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<BigIntRangeFilter>(value + 1, std::numeric_limits<Int64>::max(), false);
}
else if (func_name == "lessOrEquals")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<BigIntRangeFilter>(std::numeric_limits<Int64>::min(), value, true);
}
else if (func_name == "greaterOrEquals")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<BigIntRangeFilter>(value, std::numeric_limits<Int64>::max(), true);
}
if (filter)
{
return std::make_optional(std::make_pair(name, filter));
}
return std::nullopt;
}
ColumnFilterPtr nullOrFalse(bool null_allowed)
{
    if (null_allowed)
        return std::make_shared<IsNullFilter>();
    return std::make_shared<AlwaysFalseFilter>();
}
ColumnFilterPtr BigIntRangeFilter::merge(const ColumnFilter * other) const
{
switch (other->kind())
{
case AlwaysTrue:
case AlwaysFalse:
case IsNull:
return other->merge(this);
case IsNotNull:
return std::make_shared<BigIntRangeFilter>(lower, upper, false);
case BigIntRange: {
bool both_null_allowed = null_allowed && other->testNull();
const auto * other_range = dynamic_cast<const BigIntRangeFilter *>(other);
if (other_range->lower > upper || other_range->upper < lower)
{
return nullOrFalse(both_null_allowed);
}
return std::make_shared<BigIntRangeFilter>(
std::max(lower, other_range->lower), std::min(upper, other_range->upper), both_null_allowed);
}
default:
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Can't merge filter of kind {}", magic_enum::enum_name(other->kind()));
}
}
ColumnFilterPtr ByteValuesFilter::merge(const ColumnFilter * other) const
{
switch (other->kind())
{
case AlwaysTrue:
case AlwaysFalse:
case IsNull:
return other->merge(this);
case IsNotNull:
return clone(false);
case ByteValues: {
bool both_null_allowed = null_allowed && other->testNull();
const auto * other_values = dynamic_cast<const ByteValuesFilter *>(other);
// TODO: track string lower/upper bounds to detect an always-false merge quickly
const ByteValuesFilter * small_filter = this;
const ByteValuesFilter * large_filter = other_values;
if (small_filter->values.size() > large_filter->values.size())
{
std::swap(small_filter, large_filter);
}
std::vector<String> new_values;
new_values.reserve(small_filter->values.size());
for (const auto & value : large_filter->values)
{
if (small_filter->values.contains(value))
new_values.push_back(value);
}
if (new_values.empty())
{
return nullOrFalse(both_null_allowed);
}
return std::make_shared<ByteValuesFilter>(new_values, both_null_allowed);
}
default:
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Can't merge filter of kind {}", magic_enum::enum_name(other->kind()));
}
}
OptionalFilter ByteValuesFilter::create(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
auto name = input_node->result_name;
if (!isString(input_node->result_type))
return std::nullopt;
auto constant_nodes = getConstantNode(node);
auto func_name = node.function_base->getName();
ColumnFilterPtr filter = nullptr;
if (func_name == "equals")
{
auto value = constant_nodes.front()->column->getDataAt(0);
String str;
str.resize(value.size);
memcpy(str.data(), value.data, value.size);
std::vector<String> values = {str};
filter = std::make_shared<ByteValuesFilter>(values, false);
}
else if (func_name == "in")
{
const auto * arg = checkAndGetColumn<const ColumnConst>(constant_nodes.front()->column.get());
if (!arg)
    throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Expected a constant column on the right-hand side of IN, but got {}", constant_nodes.front()->column->getName());
const auto * column_set = checkAndGetColumn<const ColumnSet>(&arg->getDataColumn());
if (!column_set)
    throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Only ColumnSet is supported in IN clause, but got {}", arg->getDataColumn().getName());
auto set = column_set->getData()->get();
auto elements = set->getSetElements().front();
std::vector<String> values;
for (size_t i = 0; i < elements->size(); ++i)
{
auto value = elements->getDataAt(i);
String str;
str.resize(value.size);
memcpy(str.data(), value.data, value.size);
values.emplace_back(str);
}
filter = std::make_shared<ByteValuesFilter>(values, false);
}
if (filter)
{
return std::make_optional(std::make_pair(name, filter));
}
return std::nullopt;
}
template <is_floating_point T>
ColumnFilterPtr FloatRangeFilter<T>::merge(const ColumnFilter * other) const
{
switch (other->kind())
{
case AlwaysTrue:
case AlwaysFalse:
case IsNull:
return other->merge(this);
case IsNotNull:
return std::make_shared<FloatRangeFilter<T>>(
min, lower_unbounded, lower_exclusive, max, upper_unbounded, upper_exclusive, false);
case FloatRange:
case DoubleRange:
{
bool both_null_allowed = null_allowed && other->testNull();
const auto * other_range = static_cast<const FloatRangeFilter<T> *>(other);
auto lower = std::max(min, other_range->min);
auto upper = std::min(max, other_range->max);
auto both_lower_unbounded = lower_unbounded && other_range->lower_unbounded;
auto both_upper_unbounded = upper_unbounded && other_range->upper_unbounded;
auto lower_exclusive_ = !both_lower_unbounded && (!testFloat64(lower) || !other->testFloat64(lower));
auto upper_exclusive_ = !both_upper_unbounded && (!testFloat64(upper) || !other->testFloat64(upper));
if (lower > upper || (lower == upper && lower_exclusive_))
    return nullOrFalse(both_null_allowed);
return std::make_unique<FloatRangeFilter<T>>(
lower,
both_lower_unbounded,
lower_exclusive_,
upper,
both_upper_unbounded,
upper_exclusive_,
both_null_allowed);
}
default:
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Can't merge filter of kind {}", magic_enum::enum_name(other->kind()));
}
}
template class FloatRangeFilter<Float32>;
template class FloatRangeFilter<Float64>;
ColumnFilterPtr NegatedByteValuesFilter::merge(const ColumnFilter * other) const
{
switch (other->kind())
{
case ColumnFilterKind::AlwaysTrue:
case ColumnFilterKind::AlwaysFalse:
case ColumnFilterKind::IsNull:
return other->merge(this);
case ColumnFilterKind::IsNotNull:
return this->clone(false);
case ColumnFilterKind::ByteValues:
return other->merge(this);
default:
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Can't merge filter of kind {}", magic_enum::enum_name(other->kind()));
}
}
ColumnFilterPtr NegatedBigIntRangeFilter::clone(std::optional<bool> null_allowed_) const
{
return std::make_shared<NegatedBigIntRangeFilter>(non_negated->lower, non_negated->upper, null_allowed_.value_or(null_allowed));
}
OptionalFilter NegatedBigIntRangeFilter::create(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
auto name = input_node->result_name;
if (!isInt64(input_node->result_type) && !isInt32(input_node->result_type) && !isInt16(input_node->result_type))
return std::nullopt;
auto constant_nodes = getConstantNode(node);
auto func_name = node.function_base->getName();
ColumnFilterPtr filter = nullptr;
if (func_name == "notEquals")
{
Int64 value = constant_nodes.front()->column->getInt(0);
filter = std::make_shared<NegatedBigIntRangeFilter>(value, value, false);
}
if (filter)
{
return std::make_optional(std::make_pair(name, filter));
}
return std::nullopt;
}
ColumnFilterPtr NegatedBigIntRangeFilter::merge(const ColumnFilter * ) const
{
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported merge operation");
}
OptionalFilter NegatedByteValuesFilter::create(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
auto name = input_node->result_name;
if (!isString(input_node->result_type))
return std::nullopt;
auto constant_nodes = getConstantNode(node);
auto func_name = node.function_base->getName();
ColumnFilterPtr filter = nullptr;
if (func_name == "notEquals")
{
auto value = constant_nodes.front()->column->getDataAt(0);
String str;
str.resize(value.size);
memcpy(str.data(), value.data, value.size);
std::vector<String> values = {str};
filter = std::make_shared<NegatedByteValuesFilter>(values, false);
}
if (filter)
{
return std::make_optional(std::make_pair(name, filter));
}
return std::nullopt;
}
ColumnFilterPtr NegatedByteValuesFilter::clone(std::optional<bool> ) const
{
return nullptr;
}
OptionalFilter createFloatRangeFilter(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
if (!isFloat(input_node->result_type))
return std::nullopt;
bool is_float32 = WhichDataType(input_node->result_type).isFloat32();
if (is_float32)
{
return Float32RangeFilter::create(node);
}
else
{
return Float64RangeFilter::create(node);
}
}
bool RowSet::none() const
{
bool res = true;
auto increment = xsimd::batch_bool<unsigned char>::size;
auto num_batched = max_rows / increment;
for (size_t i = 0; i < num_batched; ++i)
{
auto batch = xsimd::batch_bool<unsigned char>::load_aligned(mask.data() + (i * increment));
res &= xsimd::none(batch);
if (!res)
return false;
}
for (size_t i = num_batched * increment; i < max_rows; ++i)
{
res &= !mask[i];
}
return res;
}
bool RowSet::all() const
{
bool res = true;
auto increment = xsimd::batch_bool<unsigned char>::size;
auto num_batched = max_rows / increment;
for (size_t i = 0; i < num_batched; ++i)
{
auto batch = xsimd::batch_bool<unsigned char>::load_aligned(mask.data() + (i * increment));
res &= xsimd::all(batch);
if (!res)
return res;
}
for (size_t i = num_batched * increment; i < max_rows; ++i)
{
res &= mask[i];
}
return res;
}
bool RowSet::any() const
{
bool res = false;
auto increment = xsimd::batch_bool<unsigned char>::size;
auto num_batched = max_rows / increment;
for (size_t i = 0; i < num_batched; ++i)
{
auto batch = xsimd::batch_bool<unsigned char>::load_aligned(mask.data() + (i * increment));
res |= xsimd::any(batch);
}
for (size_t i = num_batched * increment; i < max_rows; ++i)
{
res |= mask[i];
}
return res;
}
IColumn::Filter ExpressionFilter::execute(const ColumnsWithTypeAndName & columns)
{
auto block = Block(columns);
actions->execute(block);
FilterDescription filter_desc(*block.getByName(filter_name).column);
auto mutable_column = filter_desc.data_holder ? filter_desc.data_holder->assumeMutable() : block.getByName(filter_name).column->assumeMutable();
ColumnUInt8 * uint8_col = static_cast<ColumnUInt8 *>(mutable_column.get());
IColumn::Filter filter;
filter.swap(uint8_col->getData());
return filter;
}
NameSet ExpressionFilter::getInputs()
{
NameSet result;
auto inputs = actions->getActionsDAG().getInputs();
for (const auto & input : inputs)
{
result.insert(input->result_name);
}
return result;
}
ExpressionFilter::ExpressionFilter(ActionsDAG && dag_)
{
actions = std::make_shared<ExpressionActions>(std::move(dag_));
filter_name = actions->getActionsDAG().getOutputs().front()->result_name;
if (!isUInt8(removeNullable(actions->getActionsDAG().getOutputs().front()->result_type)))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filter result type must be UInt8");
}
}
}
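To see the mask-compaction pattern from filterPlainFixedData in isolation, here is a self-contained sketch of the same idea (a hypothetical helper, plain std::vector in place of PaddedPODArray, unaligned loads for simplicity):

#include <xsimd/xsimd.hpp>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Append to `dst` every element of `src` whose entry in `mask` is true.
void compact(const int64_t * src, const bool * mask, std::size_t n, std::vector<int64_t> & dst)
{
    using batch = xsimd::batch<int64_t>;
    using batch_bool = xsimd::batch_bool<int64_t>;
    constexpr std::size_t step = batch::size;
    std::size_t i = 0;
    for (; i + step <= n; i += step)
    {
        batch_bool m = batch_bool::load_unaligned(mask + i);
        if (xsimd::none(m))
            continue;                                  // whole batch filtered out
        if (xsimd::all(m))                             // whole batch selected: bulk copy
        {
            std::size_t old_size = dst.size();
            dst.resize(old_size + step);
            std::memcpy(dst.data() + old_size, src + i, step * sizeof(int64_t));
            continue;
        }
        for (std::size_t j = 0; j < step; ++j)         // mixed mask: scalar fallback
            if (mask[i + j])
                dst.push_back(src[i + j]);
    }
    for (; i < n; ++i)                                 // scalar tail
        if (mask[i])
            dst.push_back(src[i]);
}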

View File

@@ -0,0 +1,559 @@
#pragma once
#include <unordered_set>
#include <Columns/ColumnsCommon.h>
#include <Functions/IFunction.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <base/types.h>
#include <boost/dynamic_bitset.hpp>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PARQUET_EXCEPTION;
extern const int LOGICAL_ERROR;
}
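/// Dense per-row boolean mask for one read batch. Filters write their verdicts
/// into it (SIMD stores go through activeAddress()), and readers consult
/// none()/all()/any() to skip or bulk-copy whole batches.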
class RowSet
{
public:
explicit RowSet(size_t max_rows_) : max_rows(max_rows_)
{
mask.resize_fill(max_rows, true);
}
inline void set(size_t i, bool value) { mask[offset + i] = value; }
inline bool get(size_t i) const
{
auto position = offset + i;
if (position >= max_rows)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "RowSet index out of bound: {} >= {}", position, max_rows);
return mask[position];
}
inline size_t totalRows() const { return max_rows; }
inline void setOffset(size_t offset_) { offset = offset_; }
inline size_t getOffset() const { return offset; }
bool none() const;
bool all() const;
bool any() const;
void setAllTrue() { std::fill(mask.begin(), mask.end(), true); }
void setAllFalse() { std::fill(mask.begin(), mask.end(), false); }
size_t count() const
{
return countBytesInFilter(reinterpret_cast<const UInt8 *>(mask.data()), 0, mask.size());
}
const bool * activeAddress() const { return mask.data() + offset; }
bool * activeAddress() { return mask.data() + offset; }
private:
size_t max_rows = 0;
size_t offset = 0;
PaddedPODArray<bool> mask;
};
using OptionalRowSet = std::optional<RowSet>;
bool isConstantNode(const ActionsDAG::Node & node);
bool isCompareColumnWithConst(const ActionsDAG::Node & node);
const ActionsDAG::Node * getInputNode(const ActionsDAG::Node & node);
ActionsDAG::NodeRawConstPtrs getConstantNode(const ActionsDAG::Node & node);
class ColumnFilter;
using ColumnFilterPtr = std::shared_ptr<ColumnFilter>;
enum ColumnFilterKind
{
Unknown,
AlwaysTrue,
AlwaysFalse,
IsNull,
IsNotNull,
BigIntRange,
NegatedBigIntRange,
Int64In,
FloatRange,
DoubleRange,
ByteValues,
NegatedByteValues
};
class FilterHelper
{
public:
template <typename T, typename S>
static void filterPlainFixedData(const S * src, PaddedPODArray<T> & dst, const RowSet & row_set, size_t rows_to_read);
template <typename T>
static void
gatherDictFixedValue(const PaddedPODArray<T> & dict, PaddedPODArray<T> & dst, const PaddedPODArray<Int32> & idx, size_t rows_to_read);
template <typename T>
static void filterDictFixedData(
const PaddedPODArray<T> & dict,
PaddedPODArray<T> & dst,
const PaddedPODArray<Int32> & idx,
const RowSet & row_set,
size_t rows_to_read);
};
class ExpressionFilter
{
public:
explicit ExpressionFilter(ActionsDAG && dag_);
NameSet getInputs();
IColumn::Filter execute(const ColumnsWithTypeAndName & columns);
private:
ExpressionActionsPtr actions;
String filter_name;
};
class ColumnFilter
{
protected:
ColumnFilter(ColumnFilterKind kind, bool null_allowed_) : kind_(kind), null_allowed(null_allowed_) { }
public:
virtual ~ColumnFilter() = default;
virtual ColumnFilterKind kind() const { return kind_; }
virtual ColumnPtr testByExpression(ColumnPtr) { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testByExpression not implemented"); }
virtual bool testNull() const { return null_allowed; }
virtual bool testNotNull() const { return true; }
virtual bool testInt64(Int64) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testInt64 not implemented"); }
virtual bool testInt32(Int32) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testInt32 not implemented"); }
virtual bool testInt16(Int16) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testInt16 not implemented"); }
virtual bool testFloat32(Float32) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testFloat32 not implemented"); }
virtual bool testFloat64(Float64) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testFloat64 not implemented"); }
virtual bool testBool(bool) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testBool not implemented"); }
virtual bool testString(const String & /*value*/) const
{
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testString not implemented");
}
virtual bool testInt64Range(Int64, Int64) const { throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testInt64Range not implemented"); }
virtual bool testFloat32Range(Float32, Float32) const
{
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testFloat32Range not implemented");
}
virtual bool testFloat64Range(Float64, Float64) const
{
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "testFloat64Range not implemented");
}
virtual void testInt64Values(RowSet & row_set, size_t len, const Int64 * data) const
{
for (size_t i = 0; i < len; ++i)
{
row_set.set(i, testInt64(data[i]));
}
}
virtual void testInt32Values(RowSet & row_set, size_t len, const Int32 * data) const
{
for (size_t i = 0; i < len; ++i)
{
row_set.set(i, testInt32(data[i]));
}
}
virtual void testInt16Values(RowSet & row_set, size_t len, const Int16 * data) const
{
for (size_t i = 0; i < len; ++i)
{
row_set.set(i, testInt16(data[i]));
}
}
virtual void testFloat32Values(RowSet & row_set, size_t len, const Float32 * data) const
{
for (size_t i = 0; i < len; ++i)
{
row_set.set(i, testFloat32(data[i]));
}
}
virtual void testFloat64Values(RowSet & row_set, size_t len, const Float64 * data) const
{
for (size_t i = 0; i < len; ++i)
{
row_set.set(i, testFloat64(data[i]));
}
}
virtual ColumnFilterPtr merge(const ColumnFilter * /*filter*/) const
{
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "merge not implemented");
}
virtual ColumnFilterPtr clone(std::optional<bool> nullAllowed) const = 0;
virtual String toString() const
{
return fmt::format("Filter({}, {})", magic_enum::enum_name(kind()), null_allowed ? "null allowed" : "null not allowed");
}
protected:
ColumnFilterKind kind_;
bool null_allowed;
};
class IsNullFilter : public ColumnFilter
{
public:
IsNullFilter() : ColumnFilter(IsNull, true) { }
bool testNotNull() const override { return false; }
ColumnFilterPtr clone(std::optional<bool>) const override { return std::make_shared<IsNullFilter>(); }
};
class IsNotNullFilter : public ColumnFilter
{
public:
IsNotNullFilter() : ColumnFilter(IsNotNull, false) { }
ColumnFilterPtr clone(std::optional<bool>) const override { return std::make_shared<IsNotNullFilter>(); }
};
class AlwaysTrueFilter : public ColumnFilter
{
public:
AlwaysTrueFilter() : ColumnFilter(AlwaysTrue, true) { }
bool testNull() const override { return true; }
bool testNotNull() const override { return true; }
bool testInt64(Int64) const override { return true; }
bool testFloat32(Float32) const override { return true; }
bool testFloat64(Float64) const override { return true; }
bool testBool(bool) const override { return true; }
bool testInt64Range(Int64, Int64) const override { return true; }
bool testFloat32Range(Float32, Float32) const override { return true; }
bool testFloat64Range(Float64, Float64) const override { return true; }
void testInt64Values(RowSet & set, size_t, const Int64 *) const override { set.setAllTrue(); }
bool testString(const String & /*value*/) const override { return true; }
ColumnFilterPtr merge(const ColumnFilter * /*filter*/) const override { return std::make_shared<AlwaysTrueFilter>(); }
ColumnFilterPtr clone(std::optional<bool>) const override { return std::make_shared<AlwaysTrueFilter>(); }
};
class AlwaysFalseFilter : public ColumnFilter
{
public:
AlwaysFalseFilter() : ColumnFilter(AlwaysFalse, false) { }
bool testNull() const override { return false; }
bool testNotNull() const override { return false; }
bool testInt64(Int64) const override { return false; }
bool testFloat32(Float32) const override { return false; }
bool testFloat64(Float64) const override { return false; }
bool testBool(bool) const override { return false; }
bool testInt64Range(Int64, Int64) const override { return false; }
bool testFloat32Range(Float32, Float32) const override { return false; }
bool testFloat64Range(Float64, Float64) const override { return false; }
void testInt64Values(RowSet & set, size_t, const Int64 *) const override { set.setAllFalse(); }
bool testString(const String & /*value*/) const override { return false; }
ColumnFilterPtr merge(const ColumnFilter * /*filter*/) const override { return std::make_shared<AlwaysFalseFilter>(); }
ColumnFilterPtr clone(std::optional<bool>) const override { return std::make_shared<AlwaysFalseFilter>(); }
};
using OptionalFilter = std::optional<std::pair<String, ColumnFilterPtr>>;
class BigIntRangeFilter : public ColumnFilter
{
friend class NegatedBigIntRangeFilter;
public:
static OptionalFilter create(const ActionsDAG::Node & node);
explicit BigIntRangeFilter(const Int64 lower_, const Int64 upper_, bool null_allowed_)
: ColumnFilter(BigIntRange, null_allowed_)
, upper(upper_)
, lower(lower_)
, lower32(static_cast<Int32>(std::max<Int64>(lower, std::numeric_limits<int32_t>::min())))
, upper32(static_cast<Int32>(std::min<Int64>(upper, std::numeric_limits<int32_t>::max())))
, lower16(static_cast<Int16>(std::max<Int64>(lower, std::numeric_limits<int16_t>::min())))
, upper16(static_cast<Int16>(std::min<Int64>(upper, std::numeric_limits<int16_t>::max())))
, is_single_value(upper == lower)
{
}
~BigIntRangeFilter() override = default;
bool testInt64(Int64 int64) const override { return int64 >= lower && int64 <= upper; }
bool testInt32(Int32 int32) const override { return int32 >= lower && int32 <= upper; }
bool testInt16(Int16 int16) const override { return int16 >= lower && int16 <= upper; }
/// `lower_` and `upper_` are the min/max of a candidate chunk: it can contain matches only if the two ranges overlap.
bool testInt64Range(Int64 lower_, Int64 upper_) const override { return !(lower_ > upper || upper_ < lower); }
void testInt64Values(RowSet & row_set, size_t len, const Int64 * data) const override;
void testInt32Values(RowSet & row_set, size_t len, const Int32 * data) const override;
void testInt16Values(RowSet & row_set, size_t len, const Int16 * data) const override;
ColumnFilterPtr merge(const ColumnFilter * filter) const override;
ColumnFilterPtr clone(std::optional<bool> null_allowed_) const override
{
return std::make_shared<BigIntRangeFilter>(lower, upper, null_allowed_.value_or(null_allowed));
}
String toString() const override
{
return fmt::format("BigIntRangeFilter: [{}, {}] {}", lower, upper, null_allowed ? "with nulls" : "no nulls");
}
Int64 getUpper() const { return upper; }
Int64 getLower() const { return lower; }
private:
template <class T, bool negated = false>
void testIntValues(RowSet & row_set, size_t len, const T * data) const;
const Int64 upper;
const Int64 lower;
const int32_t lower32;
const int32_t upper32;
const int16_t lower16;
const int16_t upper16;
bool is_single_value [[maybe_unused]];
};
class NegatedBigIntRangeFilter : public ColumnFilter
{
public:
static OptionalFilter create(const ActionsDAG::Node & node);
NegatedBigIntRangeFilter(int64_t lower, int64_t upper, bool null_allowed_)
    : ColumnFilter(ColumnFilterKind::NegatedBigIntRange, null_allowed_)
    , non_negated(std::make_unique<BigIntRangeFilter>(lower, upper, !null_allowed_))
{
}
bool testInt64(Int64 int64) const override { return !non_negated->testInt64(int64); }
bool testInt32(Int32 int32) const override { return !non_negated->testInt32(int32); }
bool testInt16(Int16 int16) const override { return !non_negated->testInt16(int16); }
void testInt64Values(RowSet & row_set, size_t len, const Int64 * data) const override
{
non_negated->testIntValues<Int64, true>(row_set, len, data);
}
void testInt32Values(RowSet & row_set, size_t len, const Int32 * data) const override
{
non_negated->testIntValues<Int32, true>(row_set, len, data);
}
void testInt16Values(RowSet & row_set, size_t len, const Int16 * data) const override
{
non_negated->testIntValues<Int16, true>(row_set, len, data);
}
ColumnFilterPtr merge(const ColumnFilter * filter) const override;
ColumnFilterPtr clone(std::optional<bool> null_allowed_) const override;
String toString() const override
{
return fmt::format("NegatedBigIntRangeFilter: [{}, {}] {}", non_negated->lower, non_negated->upper, null_allowed ? "with nulls" : "no nulls");
}
private:
std::unique_ptr<BigIntRangeFilter> non_negated;
};
class ByteValuesFilter : public ColumnFilter
{
friend class NegatedByteValuesFilter;
public:
static OptionalFilter create(const ActionsDAG::Node & node);
ByteValuesFilter(const std::vector<String> & values_, bool null_allowed_)
: ColumnFilter(ByteValues, null_allowed_), values(values_.begin(), values_.end())
{
std::ranges::for_each(values_, [&](const String & value) { lengths.insert(value.size()); });
lower = *std::min_element(values_.begin(), values_.end());
upper = *std::max_element(values_.begin(), values_.end());
}
ByteValuesFilter(const ByteValuesFilter & other, bool nullAllowed)
: ColumnFilter(ColumnFilterKind::ByteValues, nullAllowed)
, lower(other.lower)
, upper(other.upper)
, values(other.values)
, lengths(other.lengths)
{
}
bool testString(const String & value) const override { return lengths.contains(value.size()) && values.contains(value); }
ColumnFilterPtr merge(const ColumnFilter * filter) const override;
ColumnFilterPtr clone(std::optional<bool> null_allowed_) const override
{
return std::make_shared<ByteValuesFilter>(*this, null_allowed_.value_or(null_allowed));
}
String toString() const override { return "ByteValuesFilter(" + lower + ", " + upper + ")"; }
private:
String lower;
String upper;
std::unordered_set<String> values;
std::unordered_set<size_t> lengths;
};
class NegatedByteValuesFilter : public ColumnFilter
{
public:
static OptionalFilter create(const ActionsDAG::Node & node);
NegatedByteValuesFilter(const std::vector<String> & values_, bool null_allowed_)
: ColumnFilter(NegatedByteValues, null_allowed_), non_negated(std::make_unique<ByteValuesFilter>(values_, !null_allowed_))
{
}
NegatedByteValuesFilter(const NegatedByteValuesFilter & other, bool nullAllowed)
: ColumnFilter(ColumnFilterKind::NegatedByteValues, nullAllowed)
, non_negated(std::make_unique<ByteValuesFilter>(*other.non_negated, other.non_negated->null_allowed))
{
}
bool testString(const String & string) const override { return !non_negated->testString(string); }
ColumnFilterPtr merge(const ColumnFilter * filter) const override;
ColumnFilterPtr clone(std::optional<bool> null_allowed_) const override;
private:
std::unique_ptr<ByteValuesFilter> non_negated;
};
class AbstractRange : public ColumnFilter
{
public:
bool lowerUnbounded() const { return lower_unbounded; }
bool lowerExclusive() const { return lower_exclusive; }
bool upperUnbounded() const { return upper_unbounded; }
bool upperExclusive() const { return upper_exclusive; }
protected:
AbstractRange(
bool lower_unbounded_,
bool lower_exclusive_,
bool upper_unbounded_,
bool upper_exclusive_,
bool null_allowed_,
ColumnFilterKind kind)
: ColumnFilter(kind, null_allowed_)
, lower_unbounded(lower_unbounded_)
, lower_exclusive(lower_exclusive_)
, upper_unbounded(upper_unbounded_)
, upper_exclusive(upper_exclusive_)
{
if (lower_unbounded && upper_unbounded)
{
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "A range filter must have a lower or upper bound");
}
}
const bool lower_unbounded;
const bool lower_exclusive;
const bool upper_unbounded;
const bool upper_exclusive;
};
template <class T>
concept is_floating_point = std::is_same_v<T, double> || std::is_same_v<T, float>;
template <is_floating_point T>
class FloatRangeFilter : public AbstractRange
{
public:
static OptionalFilter create(const ActionsDAG::Node & node)
{
if (!isCompareColumnWithConst(node))
return std::nullopt;
const auto * input_node = getInputNode(node);
auto name = input_node->result_name;
if (!isFloat(input_node->result_type))
return std::nullopt;
auto constant_nodes = getConstantNode(node);
auto func_name = node.function_base->getName();
T value;
if constexpr (std::is_same_v<T, Float32>)
value = constant_nodes.front()->column->getFloat32(0);
else
value = constant_nodes.front()->column->getFloat64(0);
ColumnFilterPtr filter = nullptr;
if (func_name == "less")
{
    /// x < value: the upper bound is exclusive.
    filter = std::make_shared<FloatRangeFilter<T>>(-std::numeric_limits<T>::infinity(), true, false, value, false, true, false);
}
else if (func_name == "greater")
{
    /// x > value: the lower bound is exclusive.
    filter = std::make_shared<FloatRangeFilter<T>>(value, false, true, std::numeric_limits<T>::infinity(), true, false, false);
}
else if (func_name == "lessOrEquals")
{
    filter = std::make_shared<FloatRangeFilter<T>>(-std::numeric_limits<T>::infinity(), true, false, value, false, false, false);
}
else if (func_name == "greaterOrEquals")
{
    filter = std::make_shared<FloatRangeFilter<T>>(value, false, false, std::numeric_limits<T>::infinity(), true, false, false);
}
if (filter)
return std::make_optional(std::make_pair(name, filter));
else
return std::nullopt;
}
FloatRangeFilter(
const T min_, bool lowerUnbounded, bool lowerExclusive, const T max_, bool upperUnbounded, bool upperExclusive, bool nullAllowed)
: AbstractRange(
lowerUnbounded,
lowerExclusive,
upperUnbounded,
upperExclusive,
nullAllowed,
(std::is_same_v<T, double>) ? ColumnFilterKind::DoubleRange : ColumnFilterKind::FloatRange)
, min(min_)
, max(max_)
{
}
FloatRangeFilter(const FloatRangeFilter & other, bool null_allowed_)
: AbstractRange(
other.lower_unbounded,
other.lower_exclusive,
other.upper_unbounded,
other.upper_exclusive,
null_allowed_,
(std::is_same_v<T, double>) ? ColumnFilterKind::DoubleRange : ColumnFilterKind::FloatRange)
, min(other.min)
, max(other.max)
{
chassert(lower_unbounded || !std::isnan(min));
chassert(upper_unbounded || !std::isnan(max));
}
bool testFloat32(Float32 value) const override { return testFloatingPoint(value); }
bool testFloat64(Float64 value) const override { return testFloatingPoint(static_cast<T>(value)); }
ColumnFilterPtr merge(const ColumnFilter * filter) const override;
ColumnFilterPtr clone(std::optional<bool> null_allowed_) const override
{
return std::make_shared<FloatRangeFilter>(*this, null_allowed_.value_or(null_allowed));
}
String toString() const override
{
return fmt::format(
"FloatRangeFilter: {}{}, {}{} {}",
(lower_exclusive || lower_unbounded) ? "(" : "[",
lower_unbounded ? "-inf" : std::to_string(min),
upper_unbounded ? "+inf" : std::to_string(max),
(upper_exclusive || upper_unbounded) ? ")" : "]",
null_allowed ? "with nulls" : "no nulls");
}
private:
bool testFloatingPoint(T value) const
{
if (std::isnan(value))
return false;
if (!lower_unbounded)
{
if (value < min)
return false;
if (lower_exclusive && min == value)
return false;
}
if (!upper_unbounded)
{
if (value > max)
return false;
if (upper_exclusive && value == max)
return false;
}
return true;
}
const T min;
const T max;
};
using Float32RangeFilter = FloatRangeFilter<Float32>;
using Float64RangeFilter = FloatRangeFilter<Float64>;
OptionalFilter createFloatRangeFilter(const ActionsDAG::Node & node);
}
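As a worked example of the mapping BigIntRangeFilter::create performs, each supported comparison against a constant becomes a closed Int64 interval; a hypothetical standalone version of just that mapping:

#include <cstdint>
#include <limits>
#include <optional>
#include <string>
#include <utility>

// Hypothetical mirror of BigIntRangeFilter::create: map a comparison function
// name and its constant to the closed interval [lower, upper] it accepts.
std::optional<std::pair<int64_t, int64_t>> comparisonToRange(const std::string & func, int64_t v)
{
    constexpr int64_t lo = std::numeric_limits<int64_t>::min();
    constexpr int64_t hi = std::numeric_limits<int64_t>::max();
    if (func == "equals")
        return {{v, v}};
    if (func == "less")
        return {{lo, v - 1}};   // note: v - 1 would wrap for v == INT64_MIN
    if (func == "greater")
        return {{v + 1, hi}};   // note: v + 1 would wrap for v == INT64_MAX
    if (func == "lessOrEquals")
        return {{lo, v}};
    if (func == "greaterOrEquals")
        return {{v, hi}};
    return std::nullopt;        // unsupported comparison: no pushdown
}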

View File

@@ -0,0 +1,53 @@
#include "ColumnFilterHelper.h"
namespace DB
{
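/// Tried in order: the first creator that recognizes a conjunct claims it;
/// conjuncts no creator claims are kept as generic expression filters below.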
ColumnFilterCreators ColumnFilterHelper::creators = {BigIntRangeFilter::create, NegatedBigIntRangeFilter::create, createFloatRangeFilter, ByteValuesFilter::create, NegatedByteValuesFilter::create};
FilterSplitResult ColumnFilterHelper::splitFilterForPushDown(const ActionsDAG & filter_expression)
{
if (filter_expression.getOutputs().empty())
return {};
const auto * filter_node = filter_expression.getOutputs().front();
auto conditions = ActionsDAG::extractConjunctionAtoms(filter_node);
ActionsDAG::NodeRawConstPtrs unsupported_conditions;
FilterSplitResult split_result;
for (const auto * condition : conditions)
{
if (std::none_of(creators.begin(), creators.end(), [&](ColumnFilterCreator & creator) {
auto result = creator(*condition);
if (result.has_value())
split_result.filters[result.value().first].emplace_back(result.value().second);
return result.has_value();
}))
unsupported_conditions.push_back(condition);
}
for (auto & condition : unsupported_conditions)
{
auto actions_dag = ActionsDAG::buildFilterActionsDAG({condition});
if (actions_dag.has_value())
{
split_result.expression_filters.emplace_back(std::make_shared<ExpressionFilter>(std::move(actions_dag.value())));
}
}
return split_result;
}
void pushFilterToParquetReader(const ActionsDAG& filter_expression, ParquetReader & reader)
{
if (filter_expression.getOutputs().empty())
    return;
auto split_result = ColumnFilterHelper::splitFilterForPushDown(filter_expression);
for (const auto & item : split_result.filters)
{
for (const auto & filter : item.second)
{
reader.addFilter(item.first, filter);
}
}
for (auto & expression_filter : split_result.expression_filters)
{
reader.addExpressionFilter(expression_filter);
}
}
}

View File

@@ -0,0 +1,32 @@
#pragma once
#include <Processors/Formats/Impl/Parquet/ColumnFilter.h>
#include <Interpreters/ActionsDAG.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
namespace DB
{
using ColumnFilterCreator = std::function<OptionalFilter(const ActionsDAG::Node &)>;
using ColumnFilterCreators = std::vector<ColumnFilterCreator>;
struct FilterSplitResult
{
std::unordered_map<String, std::vector<ColumnFilterPtr>> filters;
std::vector<std::shared_ptr<ExpressionFilter>> expression_filters;
};
class ColumnFilterHelper
{
public:
static FilterSplitResult splitFilterForPushDown(const ActionsDAG& filter_expression);
private:
static ColumnFilterCreators creators;
};
void pushFilterToParquetReader(const ActionsDAG& filter_expression, ParquetReader & reader);
}

View File

@@ -0,0 +1,167 @@
#include "PageReader.h"
#include <parquet/thrift_internal.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ParquetSkipPageNum;
}
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PARQUET_EXCEPTION;
}
bool LazyPageReader::hasNext()
{
parquet::ThriftDeserializer deserializer(properties);
if (seen_num_values >= total_num_values)
return false;
uint32_t header_size = 0;
uint32_t allowed_header_size = DEFAULT_PAGE_HEADER_SIZE;
while (true)
{
if (stream->available() == 0)
{
return false;
}
// This gets used, then set by DeserializeThriftMsg
header_size = allowed_header_size;
try
{
// Reset the current page header so stale __isset flags from the previous page are cleared.
current_page_header = parquet::format::PageHeader();
deserializer.DeserializeMessage(
reinterpret_cast<const uint8_t *>(stream->position()), &header_size, &current_page_header, nullptr);
break;
}
catch (const std::exception & e)
{
// Failed to deserialize. Double the allowed page header size and try again
allowed_header_size *= 2;
if (allowed_header_size > max_page_header_size)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Deserializing page header failed: {}", e.what());
}
}
}
stream->seek(header_size, SEEK_CUR);
return true;
}
template <typename H>
parquet::EncodedStatistics ExtractStatsFromHeader(const H & header)
{
parquet::EncodedStatistics page_statistics;
if (!header.__isset.statistics)
{
return page_statistics;
}
const parquet::format::Statistics & stats = header.statistics;
// Use the new V2 min-max statistics over the former one if it is filled
if (stats.__isset.max_value || stats.__isset.min_value)
{
// TODO: check if the column_order is TYPE_DEFINED_ORDER.
if (stats.__isset.max_value)
{
page_statistics.set_max(stats.max_value);
}
if (stats.__isset.min_value)
{
page_statistics.set_min(stats.min_value);
}
}
else if (stats.__isset.max || stats.__isset.min)
{
// TODO: check created_by to see if it is corrupted for some types.
// TODO: check if the sort_order is SIGNED.
if (stats.__isset.max)
{
page_statistics.set_max(stats.max);
}
if (stats.__isset.min)
{
page_statistics.set_min(stats.min);
}
}
if (stats.__isset.null_count)
{
page_statistics.set_null_count(stats.null_count);
}
if (stats.__isset.distinct_count)
{
page_statistics.set_distinct_count(stats.distinct_count);
}
return page_statistics;
}
std::shared_ptr<parquet::Page> LazyPageReader::nextPage()
{
size_t compressed_len = current_page_header.compressed_page_size;
size_t uncompressed_len = current_page_header.uncompressed_page_size;
if (compressed_len > stream->available())
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Page was smaller {} than expected {}", stream->available(), compressed_len);
}
const parquet::PageType::type page_type = parquet::LoadEnumSafe(&current_page_header.type);
if (page_type == parquet::PageType::DICTIONARY_PAGE)
{
std::shared_ptr<arrow::Buffer> page_buffer
= decompressIfNeeded(reinterpret_cast<const uint8_t *>(stream->position()), compressed_len, uncompressed_len);
stream->seek(compressed_len, SEEK_CUR);
const parquet::format::DictionaryPageHeader & dict_header = current_page_header.dictionary_page_header;
bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
return std::make_shared<parquet::DictionaryPage>(
page_buffer, dict_header.num_values, parquet::LoadEnumSafe(&dict_header.encoding), is_sorted);
}
else if (page_type == parquet::PageType::DATA_PAGE)
{
const parquet::format::DataPageHeader & header = current_page_header.data_page_header;
parquet::EncodedStatistics data_page_statistics = ExtractStatsFromHeader(header);
auto page_buffer = decompressIfNeeded(reinterpret_cast<const uint8_t *>(stream->position()), compressed_len, uncompressed_len);
stream->seek(compressed_len, SEEK_CUR);
return std::make_shared<parquet::DataPageV1>(
page_buffer,
header.num_values,
parquet::LoadEnumSafe(&header.encoding),
parquet::LoadEnumSafe(&header.definition_level_encoding),
parquet::LoadEnumSafe(&header.repetition_level_encoding),
uncompressed_len,
data_page_statistics);
}
else
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported page type {}", magic_enum::enum_name(page_type));
}
}
std::shared_ptr<arrow::Buffer> LazyPageReader::decompressIfNeeded(const uint8_t * data, size_t compressed_size, size_t uncompressed_size)
{
if (!decompressor)
return std::make_shared<arrow::Buffer>(data, compressed_size);
decompression_buffer.resize_fill(uncompressed_size, 0);
PARQUET_THROW_NOT_OK(
decompressor->Decompress(compressed_size, data, uncompressed_size, reinterpret_cast<uint8_t *>(decompression_buffer.data())));
return std::make_shared<arrow::Buffer>(
reinterpret_cast<const uint8_t *>(decompression_buffer.data()), static_cast<int64_t>(uncompressed_size));
}
void LazyPageReader::skipNextPage()
{
size_t compressed_len = current_page_header.compressed_page_size;
stream->seek(compressed_len, SEEK_CUR);
ProfileEvents::increment(ProfileEvents::ParquetSkipPageNum, 1);
}
const parquet::format::PageHeader & LazyPageReader::peekNextPageHeader()
{
return current_page_header;
}
}
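The header-probing loop in hasNext follows a common pattern for messages whose serialized size is unknown up front: attempt to deserialize under a small size limit and double it on failure. A self-contained sketch of that control flow, with a hypothetical callback standing in for the Thrift deserializer:

#include <cstdint>
#include <functional>
#include <stdexcept>

// `try_parse(limit)` returns true if the message fit within `limit` bytes.
// Returns the limit that finally succeeded.
uint32_t parseWithGrowingLimit(const std::function<bool(uint32_t)> & try_parse, uint32_t initial_limit, uint32_t max_limit)
{
    for (uint32_t limit = initial_limit;; limit *= 2)
    {
        if (limit > max_limit)
            throw std::runtime_error("header exceeds the maximum allowed size");
        if (try_parse(limit))
            return limit;
    }
}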

View File

@@ -0,0 +1,44 @@
#pragma once
#include <IO/ReadBufferFromMemory.h>
#include <Common/PODArray.h>
#include <Processors/Formats/Impl/Parquet/generated/parquet_types.h>
#include <parquet/properties.h>
#include <parquet/column_page.h>
namespace DB
{
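/// Reads the pages of one in-memory column chunk lazily: hasNext() deserializes
/// only the next page header, so callers can inspect its statistics and call
/// skipNextPage() without ever decompressing the page payload.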
class LazyPageReader
{
public:
LazyPageReader(
const std::shared_ptr<ReadBufferFromMemory> & stream_, const parquet::ReaderProperties & properties_, size_t total_num_values_, parquet::Compression::type codec)
: stream(stream_), properties(properties_), total_num_values(total_num_values_)
{
decompressor = parquet::GetCodec(codec);
}
bool hasNext();
const parquet::format::PageHeader& peekNextPageHeader();
std::shared_ptr<parquet::Page> nextPage();
void skipNextPage();
private:
std::shared_ptr<arrow::Buffer> decompressIfNeeded(const uint8_t * data, size_t compressed_size, size_t uncompressed_size);
std::shared_ptr<ReadBufferFromMemory> stream;
parquet::format::PageHeader current_page_header;
std::shared_ptr<parquet::Page> current_page;
const parquet::ReaderProperties properties;
std::shared_ptr<arrow::util::Codec> decompressor;
PaddedPODArray<UInt8> decompression_buffer;
static const size_t max_page_header_size = 16 * 1024 * 1024;
static const size_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
size_t seen_num_values = 0;
size_t total_num_values = 0;
};
}

View File

@@ -0,0 +1,576 @@
#include "ParquetColumnReaderFactory.h"
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <Processors/Formats/Impl/Parquet/RowGroupChunkReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <parquet/column_reader.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ParquetFetchWaitTimeMicroseconds;
}
namespace DB
{
template <parquet::Type::type physical_type, TypeIndex target_type, bool dict>
SelectiveColumnReaderPtr createColumnReader(
PageReaderCreator /*page_reader_creator*/, const ScanSpec & /*scan_spec*/, const parquet::LogicalType & /*logical_type*/)
{
throw DB::Exception(
ErrorCodes::NOT_IMPLEMENTED,
"ParquetColumnReaderFactory::createColumnReader: not implemented for physical type {} and target type {}",
magic_enum::enum_name(physical_type),
magic_enum::enum_name(target_type));
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::Int64, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType & /*logical_type*/)
{
return std::make_shared<NumberDictionaryReader<DataTypeInt64, Int64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt64>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::Int64, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType & /*logical_type*/)
{
return std::make_shared<NumberColumnDirectReader<DataTypeInt64, Int64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt64>());
}
static UInt32 getScaleFromLogicalTimestamp(parquet::LogicalType::TimeUnit::unit tm_unit)
{
switch (tm_unit)
{
case parquet::LogicalType::TimeUnit::MILLIS:
return 3;
case parquet::LogicalType::TimeUnit::MICROS:
return 6;
case parquet::LogicalType::TimeUnit::NANOS:
return 9;
default:
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, ", invalid timestamp unit: {}", tm_unit);
}
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::DateTime64, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType & logical_type)
{
DataTypePtr type_datetime64;
if (logical_type.is_timestamp())
{
const auto & tm_type = dynamic_cast<const parquet::TimestampLogicalType &>(logical_type);
type_datetime64 = std::make_shared<DataTypeDateTime64>(getScaleFromLogicalTimestamp(tm_type.time_unit()));
}
else
type_datetime64 = std::make_shared<DataTypeDateTime64>(3);
return std::make_shared<NumberColumnDirectReader<DataTypeDateTime64, Int64>>(
std::move(page_reader_creator), scan_spec, type_datetime64);
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::DateTime64, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType & logical_type)
{
DataTypePtr type_datetime64;
if (logical_type.is_timestamp())
{
const auto & tm_type = dynamic_cast<const parquet::TimestampLogicalType &>(logical_type);
type_datetime64 = std::make_shared<DataTypeDateTime64>(getScaleFromLogicalTimestamp(tm_type.time_unit()));
}
else
type_datetime64 = std::make_shared<DataTypeDateTime64>(0);
return std::make_shared<NumberDictionaryReader<DataTypeDateTime64, Int64>>(
std::move(page_reader_creator), scan_spec, type_datetime64);
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::DateTime, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeDateTime, Int64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDateTime>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT64, TypeIndex::DateTime, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeDateTime, Int64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDateTime>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Int16, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeInt16, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt16>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Int16, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeInt16, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt16>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Int32, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeInt32, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Int32, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeInt32, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeInt32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Date32, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeDate32, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDate32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Date32, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeDate32, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDate32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Date, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeDate, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDate>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::Date, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeDate, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDate>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::DateTime, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeDateTime, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDateTime>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::INT32, TypeIndex::DateTime, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeDateTime, Int32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeDateTime>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::FLOAT, TypeIndex::Float32, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeFloat32, Float32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeFloat32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::FLOAT, TypeIndex::Float32, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeFloat32, Float32>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeFloat32>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::DOUBLE, TypeIndex::Float64, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberColumnDirectReader<DataTypeFloat64, Float64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeFloat64>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::DOUBLE, TypeIndex::Float64, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<NumberDictionaryReader<DataTypeFloat64, Float64>>(
std::move(page_reader_creator), scan_spec, std::make_shared<DataTypeFloat64>());
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::BYTE_ARRAY, TypeIndex::String, false>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<StringDirectReader>(
std::move(page_reader_creator), scan_spec);
}
template <>
SelectiveColumnReaderPtr createColumnReader<parquet::Type::BYTE_ARRAY, TypeIndex::String, true>(
PageReaderCreator page_reader_creator, const ScanSpec & scan_spec, const parquet::LogicalType &)
{
return std::make_shared<StringDictionaryReader>(
std::move(page_reader_creator), scan_spec);
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::columnDescriptor(const parquet::ColumnDescriptor * columnDescr)
{
column_descriptor_ = columnDescr;
return *this;
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::dictionary(bool dictionary)
{
dictionary_ = dictionary;
return *this;
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::nullable(bool nullable)
{
nullable_ = nullable;
return *this;
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::filter(const ColumnFilterPtr & filter)
{
filter_ = filter;
return *this;
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::targetType(const DataTypePtr & target_type)
{
target_type_ = removeNullable(target_type);
return *this;
}
ParquetColumnReaderFactory::Builder & ParquetColumnReaderFactory::Builder::pageReader(PageReaderCreator page_reader_creator_)
{
page_reader_creator = page_reader_creator_;
return *this;
}
SelectiveColumnReaderPtr ParquetColumnReaderFactory::Builder::build()
{
if (!column_descriptor_ || !page_reader_creator || !target_type_)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "ParquetColumnReaderFactory::Builder: column descriptor, page reader and target type must be set");
ScanSpec scan_spec{.column_name = column_descriptor_->name(), .column_desc = column_descriptor_, .filter = filter_};
parquet::Type::type physical_type = column_descriptor_->physical_type();
TypeIndex target_type = target_type_->getTypeId();
const auto& logical_type = *column_descriptor_->logical_type();
SelectiveColumnReaderPtr leaf_reader = nullptr;
if (physical_type == parquet::Type::INT64)
{
if (isInt64(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::Int64, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::Int64, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isDateTime64(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::DateTime64, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::DateTime64, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isDateTime(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::DateTime, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT64, TypeIndex::DateTime, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
}
else if (physical_type == parquet::Type::INT32)
{
if (isInt16(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Int16, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Int16, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isInt32(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Int32, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Int32, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isDate32(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Date32, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Date32, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isDate(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Date, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::Date, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
else if (isDateTime(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::DateTime, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::INT32, TypeIndex::DateTime, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
}
else if (physical_type == parquet::Type::FLOAT)
{
if (isFloat(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::FLOAT, TypeIndex::Float32, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::FLOAT, TypeIndex::Float32, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
}
else if (physical_type == parquet::Type::DOUBLE)
{
if (isFloat(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::DOUBLE, TypeIndex::Float64, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::DOUBLE, TypeIndex::Float64, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
}
else if (physical_type == parquet::Type::BYTE_ARRAY)
{
if (isString(target_type))
{
if (dictionary_)
leaf_reader = createColumnReader<parquet::Type::BYTE_ARRAY, TypeIndex::String, true>(std::move(page_reader_creator), scan_spec, logical_type);
else
leaf_reader = createColumnReader<parquet::Type::BYTE_ARRAY, TypeIndex::String, false>(std::move(page_reader_creator), scan_spec, logical_type);
}
}
if (!leaf_reader)
{
throw DB::Exception(
ErrorCodes::NOT_IMPLEMENTED,
"ParquetColumnReaderFactory::createColumnReader: not implemented for physical type {} and target type {}",
magic_enum::enum_name(physical_type),
magic_enum::enum_name(target_type));
}
if (nullable_)
return std::make_shared<OptionalColumnReader>(scan_spec, leaf_reader);
else
return leaf_reader;
}
ParquetColumnReaderFactory::Builder ParquetColumnReaderFactory::builder()
{
return ParquetColumnReaderFactory::Builder();
}
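/// Usage sketch (illustrative, not part of the build): `column_desc`, `make_page_reader`
/// and `filter` stand in for values produced at the call site in
/// ColumnReaderBuilder::buildReader below.
///
///     SelectiveColumnReaderPtr reader = ParquetColumnReaderFactory::builder()
///         .nullable(true)                          // wraps the leaf in OptionalColumnReader
///         .dictionary(true)                        // column chunk has a dictionary page
///         .columnDescriptor(column_desc)           // const parquet::ColumnDescriptor *
///         .pageReader(std::move(make_page_reader)) // PageReaderCreator
///         .targetType(std::make_shared<DataTypeInt64>())
///         .filter(filter)                          // may be nullptr
///         .build();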
/// Returns true if repeated type is an element type for the list.
/// Used to determine legacy list types.
/// This method is copied from Spark Parquet reader and is based on the reference:
/// <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
/// #backward-compatibility-rules
bool isListElement(parquet::schema::Node & node)
{
// For legacy 2-level list types with primitive element type, e.g.:
//
// // ARRAY<INT> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated int32 element;
// }
//
return node.is_primitive() ||
// For legacy 2-level list types whose element type is a group type with 2 or more
// fields, e.g.:
//
// // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// required int32 num;
// };
// }
//
(node.is_group() && static_cast<parquet::schema::GroupNode&>(node).field_count() > 1) ||
// For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
// e.g.:
//
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group array {
// required binary str (UTF8);
// };
// }
//
node.name() == "array" ||
// For Parquet data generated by parquet-thrift, e.g.:
//
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group my_list_tuple {
// required binary str (UTF8);
// };
// }
//
node.name().ends_with("_tuple");
}
std::shared_ptr<parquet::schema::GroupNode> checkAndGetGroupNode(parquet::schema::NodePtr node)
{
if (!node)
return nullptr;
if (!node->is_group())
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "need group node");
return std::static_pointer_cast<parquet::schema::GroupNode>(node);
}
SelectiveColumnReaderPtr ColumnReaderBuilder::buildReader(parquet::schema::NodePtr node, const DataTypePtr & target_type, int def_level, int rep_level)
{
if (node->repetition() == parquet::Repetition::UNDEFINED)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Undefined repetition level");
if (!node->is_required())
def_level++;
if (node->is_repeated())
rep_level++;
if (node->is_primitive())
{
auto full_name = node->path()->ToDotString();
int column_idx = context.parquet_reader->metaData().schema()->ColumnIndex(*node);
RowGroupPrefetchPtr row_group_prefetch;
if (predicate_columns.contains(full_name))
row_group_prefetch = context.prefetch_conditions;
else
row_group_prefetch = context.prefetch;
auto column_range = getColumnRange(*context.row_group_meta->ColumnChunk(column_idx));
row_group_prefetch->prefetchRange(column_range);
PageReaderCreator creator = [&,row_group_prefetch , column_idx, column_range]
{
Stopwatch time;
row_group_prefetch->startPrefetch();
auto data = row_group_prefetch->readRange(column_range);
auto page_reader = std::make_unique<LazyPageReader>(
std::make_shared<ReadBufferFromMemory>(reinterpret_cast<char *>(data.data), data.size),
context.parquet_reader->readerProperties(),
context.row_group_meta->num_rows(),
context.row_group_meta->ColumnChunk(column_idx)->compression());
ProfileEvents::increment(ProfileEvents::ParquetFetchWaitTimeMicroseconds, time.elapsedMicroseconds());
return page_reader;
};
const auto * column_desc = context.parquet_reader->metaData().schema()->Column(column_idx);
auto leaf_reader = ParquetColumnReaderFactory::builder()
.nullable(node->is_optional())
.dictionary(context.row_group_meta->ColumnChunk(column_idx)->has_dictionary_page())
.columnDescriptor(column_desc)
.pageReader(std::move(creator))
.targetType(target_type)
.filter(inplace_filter_mapping.contains(full_name) ? inplace_filter_mapping.at(full_name) : nullptr)
.build();
return leaf_reader;
}
else if (node->converted_type() == parquet::ConvertedType::LIST)
{
auto group_node = checkAndGetGroupNode(node);
if (group_node->field_count() != 1)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "List group node must have exactly one field");
auto repeated_field = group_node->field(0);
const auto * array_type = checkAndGetDataType<DataTypeArray>(target_type.get());
if (!array_type)
    throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Target type for list node {} must be DataTypeArray", node->name());
/// For legacy 2-level lists the repeated field itself is the element;
/// for standard 3-level lists the element is the repeated group's only child.
auto element_node = isListElement(*repeated_field)
    ? repeated_field
    : std::static_pointer_cast<parquet::schema::GroupNode>(repeated_field)->field(0);
auto reader = buildReader(element_node, array_type->getNestedType(), def_level, rep_level);
return std::make_shared<ListColumnReader>(rep_level, def_level, reader);
}
else if (node->converted_type() == parquet::ConvertedType::MAP || node->converted_type() == parquet::ConvertedType::MAP_KEY_VALUE)
{
auto map_node = checkAndGetGroupNode(node);
if (map_node->field_count() != 1)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map group node must have exactly one field");
auto key_value_node = checkAndGetGroupNode(map_node->field(0));
if (key_value_node->field_count() != 2)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map key-value group node must have exactly two fields");
if (!key_value_node->field(0)->is_primitive())
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Map key field must be primitive");
const auto& map_type = checkAndGetDataType<DataTypeMap>(*target_type);
auto key_value_types = checkAndGetDataType<DataTypeTuple>(*checkAndGetDataType<DataTypeArray>(*map_type.getNestedType()).getNestedType()).getElements();
auto key_reader = buildReader(key_value_node->field(0), key_value_types.front(), def_level+1, rep_level+1);
auto value_reader = buildReader(key_value_node->field(1), key_value_types.back(), def_level+1, rep_level+1);
return std::make_shared<MapColumnReader>(rep_level, def_level, key_reader, value_reader);
}
// Structure type
else
{
auto struct_node = checkAndGetGroupNode(node);
const auto *struct_type = checkAndGetDataType<DataTypeTuple>(target_type.get());
if (!struct_type)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Target type for node {} must be DataTypeTuple", struct_node->name());
auto names = struct_type->getElementNames();
int child_num = struct_node->field_count();
std::unordered_map<String, SelectiveColumnReaderPtr> readers;
for (const auto& name : names)
{
for (int i = 0; i < child_num; ++i)
{
if (struct_node->field(i)->name() == name)
{
auto child_field = struct_node->field(i);
auto child_type = struct_type->getElements().at(i);
auto reader = buildReader(child_field, child_type, def_level, rep_level);
readers.emplace(name, reader);
}
}
if (!readers.contains(name))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "{} not found in struct node {}", name, struct_node->name());
}
}
return std::make_shared<StructColumnReader>(readers, target_type);
}
}
ColumnReaderBuilder::ColumnReaderBuilder(
const Block & requiredColumns_,
const RowGroupContext & context_,
const std::unordered_map<String, ColumnFilterPtr> & inplaceFilterMapping_,
const std::unordered_set<String> & predicateColumns_)
: required_columns(requiredColumns_)
, context(context_)
, inplace_filter_mapping(inplaceFilterMapping_)
, predicate_columns(predicateColumns_)
{
}
}
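To make the level bookkeeping in buildReader concrete, here is a worked example (derived from the code above) for a nullable map column:

// optional group m (MAP) {            // optional -> def_level: 0 -> 1; rep_level stays 0
//   repeated group key_value {        // key and value readers are built with
//     required binary key (UTF8);     //   def_level + 1 = 2, rep_level + 1 = 1
//     optional int32 value;           // value is optional -> its leaf reader sees def_level 3
//   }
// }
// MapColumnReader itself is constructed with (rep_level = 0, def_level = 1),
// i.e. the levels of the map node, while the leaf readers carry the deeper levels.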

View File

@ -0,0 +1,67 @@
#pragma once
#include <ostream>
#include <Processors/Formats/Impl/Parquet/ColumnFilter.h>
#include <DataTypes/IDataType.h>
namespace parquet
{
class ColumnDescriptor;
namespace schema
{
class Node;
using NodePtr = std::shared_ptr<Node>;
}
}
namespace DB
{
class SelectiveColumnReader;
using SelectiveColumnReaderPtr = std::shared_ptr<SelectiveColumnReader>;
class LazyPageReader;
struct RowGroupContext;
using PageReaderCreator = std::function<std::unique_ptr<LazyPageReader>()>;
class ParquetColumnReaderFactory
{
public:
class Builder
{
public:
Builder& dictionary(bool dictionary);
Builder& nullable(bool nullable);
Builder& columnDescriptor(const parquet::ColumnDescriptor * columnDescr);
Builder& filter(const ColumnFilterPtr & filter);
Builder& targetType(const DataTypePtr & target_type);
Builder& pageReader(PageReaderCreator page_reader_creator);
SelectiveColumnReaderPtr build();
private:
bool dictionary_ = false;
bool nullable_ = false;
const parquet::ColumnDescriptor * column_descriptor_ = nullptr;
DataTypePtr target_type_ = nullptr;
PageReaderCreator page_reader_creator = nullptr;
std::unique_ptr<LazyPageReader> page_reader_ = nullptr;
ColumnFilterPtr filter_ = nullptr;
};
static Builder builder();
};
class ColumnReaderBuilder
{
public:
ColumnReaderBuilder(
const Block & requiredColumns,
const RowGroupContext & context,
const std::unordered_map<String, ColumnFilterPtr> & inplaceFilterMapping,
const std::unordered_set<String> & predicateColumns);
SelectiveColumnReaderPtr buildReader(parquet::schema::NodePtr node, const DataTypePtr & target_type, int def_level = 0, int rep_level = 0);
private:
const Block& required_columns;
const RowGroupContext& context;
const std::unordered_map<String, ColumnFilterPtr>& inplace_filter_mapping;
const std::unordered_set<String>& predicate_columns;
};
}

View File

@ -0,0 +1,177 @@
#include "ParquetReader.h"
#include <IO/SharedThreadPools.h>
#include <Common/ThreadPool.h>
#include <arrow/io/util_internal.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PARQUET_EXCEPTION;
extern const int ARGUMENT_OUT_OF_BOUND;
}
#define THROW_PARQUET_EXCEPTION(s) \
do \
{ \
try \
{ \
(s); \
} \
catch (const ::parquet::ParquetException & e) \
{ \
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Parquet exception: {}", e.what()); \
} \
} while (false)
static std::unique_ptr<parquet::ParquetFileReader> createFileReader(
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
parquet::ReaderProperties reader_properties,
std::shared_ptr<parquet::FileMetaData> metadata = nullptr)
{
std::unique_ptr<parquet::ParquetFileReader> res;
THROW_PARQUET_EXCEPTION(res = parquet::ParquetFileReader::Open(std::move(arrow_file), reader_properties, metadata));
return res;
}
ParquetReader::ParquetReader(
Block header_,
SeekableReadBuffer& file_,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file_,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_,
std::shared_ptr<parquet::FileMetaData> metadata)
: file_reader(metadata ? nullptr : createFileReader(arrow_file_, reader_properties_, metadata))
, file(file_)
, arrow_properties(arrow_properties_)
, header(std::move(header_))
, max_block_size(format_settings.parquet.max_block_size)
, properties(reader_properties_)
, row_groups_indices(std::move(row_groups_indices_))
, meta_data(metadata ? metadata : file_reader->metadata())
{
if (row_groups_indices.empty())
for (int i = 0; i < meta_data->num_row_groups(); i++)
row_groups_indices.push_back(i);
const auto * root = meta_data->schema()->group_node();
for (int i = 0; i < root->field_count(); ++i)
{
const auto & node = root->field(i);
parquet_columns.emplace(node->name(), node);
}
chunk_reader = getSubRowGroupRangeReader(row_groups_indices);
}
Block ParquetReader::read()
{
Chunk chunk = chunk_reader->read(max_block_size);
if (!chunk) return header.cloneEmpty();
return header.cloneWithColumns(chunk.detachColumns());
}
void ParquetReader::addFilter(const String & column_name, const ColumnFilterPtr filter)
{
// std::cerr << "add filter to column " << column_name << ": " << filter->toString() << std::endl;
condition_columns.insert(column_name);
if (!filters.contains(column_name))
filters[column_name] = filter;
else
filters[column_name] = filters[column_name]->merge(filter.get());
// std::cerr << "filter on column " << column_name << ": " << filters[column_name]->toString() << std::endl;
}
std::unique_ptr<RowGroupChunkReader> ParquetReader::getRowGroupChunkReader(size_t row_group_idx, RowGroupPrefetchPtr conditions_prefetch, RowGroupPrefetchPtr prefetch)
{
return std::make_unique<RowGroupChunkReader>(this, row_group_idx, std::move(conditions_prefetch), std::move(prefetch), filters);
}
extern arrow::io::ReadRange getColumnRange(const parquet::ColumnChunkMetaData & column_metadata);
std::unique_ptr<SubRowGroupRangeReader> ParquetReader::getSubRowGroupRangeReader(std::vector<Int32> row_group_indices_)
{
std::vector<RowGroupPrefetchPtr> row_group_prefetches;
std::vector<RowGroupPrefetchPtr> row_group_condition_prefetches;
for (auto row_group_idx : row_group_indices_)
{
RowGroupPrefetchPtr prefetch = std::make_shared<RowGroupPrefetch>(file, file_mutex, arrow_properties);
RowGroupPrefetchPtr condition_prefetch = std::make_shared<RowGroupPrefetch>(file, file_mutex, arrow_properties);
auto row_group_meta = meta_data->RowGroup(row_group_idx);
// for (const auto & name : header.getNames())
// {
// if (!parquet_columns.contains(name))
// throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", name);
// auto idx = meta_data->schema()->ColumnIndex(*parquet_columns[name]);
// auto range = getColumnRange(*row_group_meta->ColumnChunk(idx));
// if (condition_columns.contains(name))
// condition_prefetch->prefetchRange(range);
// else
// prefetch->prefetchRange(range);
// }
row_group_prefetches.push_back(std::move(prefetch));
if (!condition_prefetch->isEmpty())
row_group_condition_prefetches.push_back(std::move(condition_prefetch));
}
return std::make_unique<SubRowGroupRangeReader>(
row_group_indices_,
std::move(row_group_condition_prefetches),
std::move(row_group_prefetches),
[&](const size_t idx, RowGroupPrefetchPtr condition_prefetch, RowGroupPrefetchPtr prefetch) { return getRowGroupChunkReader(idx, std::move(condition_prefetch), std::move(prefetch)); });
}
void ParquetReader::addExpressionFilter(std::shared_ptr<ExpressionFilter> filter)
{
if (!filter) return;
for (const auto & item : filter->getInputs())
{
condition_columns.insert(item);
}
expression_filters.emplace_back(filter);
}
SubRowGroupRangeReader::SubRowGroupRangeReader(const std::vector<Int32> & rowGroupIndices, std::vector<RowGroupPrefetchPtr> && row_group_condition_prefetches_, std::vector<RowGroupPrefetchPtr>&& row_group_prefetches_, RowGroupReaderCreator && creator)
: row_group_indices(rowGroupIndices), row_group_condition_prefetches(std::move(row_group_condition_prefetches_)), row_group_prefetches(std::move(row_group_prefetches_)), row_group_reader_creator(creator)
{
if (row_group_indices.size() != row_group_prefetches.size())
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "row group indices and prefetches size mismatch");
}
DB::Chunk SubRowGroupRangeReader::read(size_t rows)
{
Chunk chunk;
while (chunk.getNumRows() == 0)
{
if (!loadRowGroupChunkReaderIfNeeded()) break;
chunk = row_group_chunk_reader->readChunk(rows);
}
return chunk;
}
bool SubRowGroupRangeReader::loadRowGroupChunkReaderIfNeeded()
{
    if (!row_group_chunk_reader || !row_group_chunk_reader->hasMoreRows())
    {
        if (next_row_group_idx >= row_group_indices.size())
            return false;
        row_group_chunk_reader = row_group_reader_creator(
            row_group_indices[next_row_group_idx],
            row_group_condition_prefetches.empty() ? nullptr : std::move(row_group_condition_prefetches[next_row_group_idx]),
            std::move(row_group_prefetches[next_row_group_idx]));
        next_row_group_idx++;
    }
    return true;
}
}
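A hedged end-to-end sketch of driving ParquetReader, based on the constructor and read() above; `openArrowFile` is a hypothetical helper that wraps the read buffer into an arrow::io::RandomAccessFile, and error handling is elided:

void scanFile(Block header, SeekableReadBuffer & in, const FormatSettings & settings)
{
    std::shared_ptr<arrow::io::RandomAccessFile> arrow_file = openArrowFile(in); // hypothetical
    ParquetReader reader(
        header,
        in,
        parquet::default_arrow_reader_properties(),
        parquet::default_reader_properties(),
        arrow_file,
        settings);
    /// Push-down filters should be registered before the first read():
    /// reader.addFilter("x", filter);
    while (true)
    {
        Block block = reader.read();
        if (!block.rows())
            break; // read() returns an empty clone of the header at end of data
        /// ... consume `block` ...
    }
}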

View File

@ -0,0 +1,89 @@
#pragma once
#include <Core/Block.h>
#include <Formats/FormatSettings.h>
#include <Processors/Chunk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <Processors/Formats/Impl/Parquet/RowGroupChunkReader.h>
#include <Interpreters/ExpressionActions.h>
#include <arrow/io/interfaces.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>
#include <Common/threadPoolCallbackRunner.h>
namespace DB
{
class SubRowGroupRangeReader
{
public:
using RowGroupReaderCreator = std::function<std::unique_ptr<RowGroupChunkReader>(size_t, RowGroupPrefetchPtr, RowGroupPrefetchPtr)>;
SubRowGroupRangeReader(const std::vector<Int32> & rowGroupIndices, std::vector<RowGroupPrefetchPtr> && row_group_condition_prefetches_, std::vector<RowGroupPrefetchPtr> && row_group_prefetches, RowGroupReaderCreator&& creator);
DB::Chunk read(size_t rows);
private:
bool loadRowGroupChunkReaderIfNeeded();
std::vector<Int32> row_group_indices;
std::vector<RowGroupPrefetchPtr> row_group_condition_prefetches;
std::vector<RowGroupPrefetchPtr> row_group_prefetches;
std::unique_ptr<RowGroupChunkReader> row_group_chunk_reader;
size_t next_row_group_idx = 0;
RowGroupReaderCreator row_group_reader_creator;
};
class ParquetReader
{
public:
friend class RowGroupChunkReader;
ParquetReader(
Block header_,
SeekableReadBuffer& file,
parquet::ArrowReaderProperties arrow_properties_,
parquet::ReaderProperties reader_properties_,
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
const FormatSettings & format_settings,
std::vector<int> row_groups_indices_ = {},
std::shared_ptr<parquet::FileMetaData> metadata = nullptr);
Block read();
void addFilter(const String & column_name, ColumnFilterPtr filter);
void addExpressionFilter(std::shared_ptr<ExpressionFilter> filter);
std::unique_ptr<RowGroupChunkReader> getRowGroupChunkReader(size_t row_group_idx, RowGroupPrefetchPtr conditions_prefetch, RowGroupPrefetchPtr prefetch);
std::unique_ptr<SubRowGroupRangeReader> getSubRowGroupRangeReader(std::vector<Int32> row_group_indices);
const parquet::FileMetaData& metaData()
{
return *meta_data;
}
const parquet::ReaderProperties& readerProperties()
{
return properties;
}
private:
std::unique_ptr<parquet::ParquetFileReader> file_reader;
std::mutex file_mutex;
SeekableReadBuffer& file;
parquet::ArrowReaderProperties arrow_properties;
Block header;
std::unique_ptr<SubRowGroupRangeReader> chunk_reader;
UInt64 max_block_size;
parquet::ReaderProperties properties;
std::unordered_map<String, ColumnFilterPtr> filters;
std::vector<int> parquet_col_indices;
std::vector<int> row_groups_indices;
size_t next_row_group_idx = 0;
std::shared_ptr<parquet::FileMetaData> meta_data;
std::unordered_map<String, parquet::schema::NodePtr> parquet_columns;
std::vector<std::shared_ptr<ExpressionFilter>> expression_filters;
std::unordered_set<String> condition_columns;
};
}

View File

@ -0,0 +1,367 @@
#include "RowGroupChunkReader.h"
#include <Columns/FilterDescription.h>
#include <IO/SharedThreadPools.h>
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <arrow/io/util_internal.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/Stopwatch.h>
namespace ProfileEvents
{
extern const Event ParquetFilteredRows;
extern const Event ParquetFetchWaitTimeMicroseconds;
extern const Event ParquetSkippedRows;
extern const Event ParquetOutputRows;
}
namespace DB
{
Chunk RowGroupChunkReader::readChunk(size_t rows)
{
if (!remain_rows)
return {};
rows = std::min(rows, remain_rows);
MutableColumns columns;
size_t rows_read = 0;
while (!rows_read)
{
size_t rows_to_read = std::min(rows - rows_read, remain_rows);
if (!rows_to_read)
break;
for (const auto & column : parquet_reader->condition_columns)
{
auto reader = reader_columns_mapping.at(column);
if (!reader->availableRows())
{
reader->readPageIfNeeded();
}
rows_to_read = std::min(reader->availableRows(), rows_to_read);
}
if (!rows_to_read)
break;
auto select_result = selectConditions->selectRows(rows_to_read);
if (select_result.skip_all)
{
metrics.skipped_rows += rows_to_read;
ProfileEvents::increment(ProfileEvents::ParquetSkippedRows, rows_to_read);
}
bool all = select_result.valid_count == rows_to_read;
if (all)
select_result.set = std::nullopt;
auto column_names = parquet_reader->header.getNames();
if (select_result.skip_all)
{
metrics.filtered_rows += rows_to_read;
rows_read = 0;
for (const auto & name : column_names)
{
if (!select_result.intermediate_columns.contains(name))
{
reader_columns_mapping.at(name)->skip(rows_to_read);
}
}
ProfileEvents::increment(ProfileEvents::ParquetFilteredRows, rows_to_read);
}
else
{
for (const auto & name : column_names)
{
if (select_result.intermediate_columns.contains(name))
{
if (all)
columns.emplace_back(select_result.intermediate_columns.at(name)->assumeMutable());
else
columns.emplace_back(
select_result.intermediate_columns.at(name)->filter(select_result.intermediate_filter, select_result.valid_count)->assumeMutable());
}
else
{
auto & reader = reader_columns_mapping.at(name);
auto column = reader->createColumn();
column->reserve(select_result.valid_count);
reader->read(column, select_result.set, rows_to_read);
columns.emplace_back(std::move(column));
}
}
rows_read = columns[0]->size();
metrics.filtered_rows += (rows_to_read - rows_read);
ProfileEvents::increment(ProfileEvents::ParquetFilteredRows, rows_to_read - rows_read);
}
remain_rows -= rows_to_read;
}
metrics.output_rows += rows_read;
ProfileEvents::increment(ProfileEvents::ParquetOutputRows, rows_read);
if (rows_read)
return Chunk(std::move(columns), rows_read);
else
return {};
}
arrow::io::ReadRange getColumnRange(const parquet::ColumnChunkMetaData & column_metadata)
{
int64_t col_start = column_metadata.data_page_offset();
if (column_metadata.has_dictionary_page() && column_metadata.dictionary_page_offset() > 0
&& col_start > column_metadata.dictionary_page_offset())
{
col_start = column_metadata.dictionary_page_offset();
}
int64_t len = column_metadata.total_compressed_size();
return {col_start, len};
}
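/// Worked example (illustrative offsets, not from a real file): a chunk whose
/// dictionary page starts at byte 1000, whose first data page starts at byte 1100
/// and whose total_compressed_size() is 500 yields {offset = 1000, length = 500},
/// i.e. a single contiguous read covering the dictionary page and all data pages.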
RowGroupPrefetch::RowGroupPrefetch(SeekableReadBuffer & file_, std::mutex & mutex, const parquet::ArrowReaderProperties & arrow_properties_)
: file(file_), file_mutex(mutex), arrow_properties(arrow_properties_)
{
callback_runner = threadPoolCallbackRunnerUnsafe<ColumnChunkData>(getIOThreadPool().get(), "ParquetRead");
}
void RowGroupPrefetch::prefetchRange(const arrow::io::ReadRange & range)
{
if (fetched)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "RowGroupPrefetch: prefetchColumnChunk called after startPrefetch");
ranges.emplace_back(range);
}
void RowGroupPrefetch::startPrefetch()
{
if (fetched)
return;
fetched = true;
ranges = arrow::io::internal::CoalesceReadRanges(
ranges, arrow_properties.cache_options().hole_size_limit, arrow_properties.cache_options().range_size_limit);
read_range_buffers.resize(ranges.size());
for (size_t i = 0; i < ranges.size(); i++)
{
auto & range = ranges[i];
read_range_buffers[i].range = range;
auto task = [this, range, i]() -> ColumnChunkData
{
auto & buffer = read_range_buffers[i].buffer;
buffer.resize(range.length);
int64_t count = 0;
if (file.supportsReadAt())
{
auto pb = [](size_t) { return true; };
count = file.readBigAt(reinterpret_cast<char *>(buffer.data()), range.length, range.offset, pb);
}
else
{
std::lock_guard lock(file_mutex);
file.seek(range.offset, SEEK_SET);
count = file.readBig(reinterpret_cast<char *>(buffer.data()), range.length);
}
if (count != range.length)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Failed to read column data");
return {reinterpret_cast<uint8_t *>(buffer.data()), buffer.size()};
};
auto future = callback_runner(std::move(task), Priority{});
tasks.emplace_back(TaskEntry{range, std::move(future)});
}
}
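/// Coalescing example (illustrative numbers): with hole_size_limit = 1 MiB, the
/// ranges [0, 4 MiB) and [4.5 MiB, 6 MiB) merge into one read of [0, 6 MiB),
/// because the 0.5 MiB hole is cheaper to read than a second request, while
/// range_size_limit caps how large a merged read may grow. readRange() later
/// slices the requested sub-range back out of the coalesced buffer.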
ColumnChunkData RowGroupPrefetch::readRange(const arrow::io::ReadRange & range)
{
if (!fetched)
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "RowGroupPrefetch: readRange called before startPrefetch");
// Wait until the prefetch task whose coalesced range contains the requested range has finished.
const auto it = std::lower_bound(
tasks.begin(),
tasks.end(),
range,
[](const TaskEntry & entry, const arrow::io::ReadRange & range_)
{ return entry.range.offset + entry.range.length < range_.offset + range_.length; });
if (it != tasks.end() && it->range.Contains(range))
{
it->task.wait();
}
else
{
throw Exception(
ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
}
const auto buffer_it = std::lower_bound(
read_range_buffers.begin(),
read_range_buffers.end(),
range,
[](const ReadRangeBuffer & buffer, const arrow::io::ReadRange & range_)
{ return buffer.range.offset + buffer.range.length < range_.offset + range_.length; });
if (buffer_it != read_range_buffers.end() && buffer_it->range.Contains(range))
{
return {
reinterpret_cast<uint8_t *>(buffer_it->buffer.data() + (range.offset - buffer_it->range.offset)),
static_cast<size_t>(range.length)};
}
else
{
throw Exception(
ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
}
}
RowGroupChunkReader::RowGroupChunkReader(
ParquetReader * parquetReader, size_t row_group_idx, RowGroupPrefetchPtr prefetch_conditions_, RowGroupPrefetchPtr prefetch_, std::unordered_map<String, ColumnFilterPtr> filters)
: parquet_reader(parquetReader)
, row_group_meta(parquetReader->meta_data->RowGroup(static_cast<int>(row_group_idx)))
, prefetch_conditions(std::move(prefetch_conditions_))
, prefetch(std::move(prefetch_))
{
column_readers.reserve(parquet_reader->header.columns());
column_buffers.resize(parquet_reader->header.columns());
context.parquet_reader = parquetReader;
context.row_group_meta = row_group_meta;
context.prefetch = prefetch;
context.prefetch_conditions = prefetch_conditions;
context.filter_columns = filter_columns;
remain_rows = row_group_meta->num_rows();
builder = std::make_unique<ColumnReaderBuilder>(parquet_reader->header, context, parquet_reader->filters, parquet_reader->condition_columns);
for (const auto & col_with_name : parquet_reader->header)
{
if (!parquetReader->parquet_columns.contains(col_with_name.name))
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);
const auto & node = parquetReader->parquet_columns.at(col_with_name.name);
SelectiveColumnReaderPtr column_reader;
auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr;
column_reader = builder->buildReader(node, col_with_name.type, 0, 0);
column_readers.push_back(column_reader);
reader_columns_mapping[col_with_name.name] = column_reader;
if (filter)
filter_columns.push_back(col_with_name.name);
}
if (prefetch_conditions)
prefetch_conditions->startPrefetch();
prefetch->startPrefetch();
selectConditions = std::make_unique<SelectConditions>(reader_columns_mapping, filter_columns, parquet_reader->expression_filters, parquet_reader->header);
}
static IColumn::Filter mergeFilters(std::vector<IColumn::Filter> & filters)
{
    assert(!filters.empty());
    if (filters.size() == 1)
        return std::move(filters[0]);
    IColumn::Filter result;
    size_t size = filters.front().size();
    result.resize_fill(size, 1);
    /// AND all filters position-wise.
    for (const auto & current : filters)
    {
        for (size_t j = 0; j < size; j++)
        {
            if (result[j] && !current[j])
                result[j] = 0;
        }
    }
    return result;
}
static void combineRowSetAndFilter(RowSet & set, const IColumn::Filter& filter_data)
{
size_t count = 0;
for (size_t i = 0; i < set.totalRows(); ++i)
{
if (!set.get(i))
continue;
if (!filter_data[count])
set.set(i, false);
count++;
}
}
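/// Worked example: set = {1, 0, 1, 1} (rows 0, 2, 3 selected) and filter_data = {1, 0, 1}.
/// filter_data holds one entry per *selected* row, so row 0 keeps its bit,
/// row 2 is cleared (filter_data[1] == 0) and row 3 keeps its bit: set becomes {1, 0, 0, 1}.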
SelectResult SelectConditions::selectRows(size_t rows)
{
if (!has_filter)
    return SelectResult{std::nullopt, {}, {}, rows, false};
OptionalRowSet total_set = std::optional(RowSet(rows));
bool skip_all = false;
// apply fast filters
for (const auto & name : fast_filter_columns)
{
readers.at(name)->computeRowSet(total_set, rows);
if (total_set.value().none())
{
skip_all = true;
break;
}
}
size_t count = 0;
// apply expression filters
std::unordered_map<String, ColumnPtr> intermediate_columns;
std::vector<IColumn::Filter> intermediate_filters;
for (const auto & expr_filter : expression_filters)
{
if (skip_all)
break;
count = total_set.has_value() ? total_set.value().count() : rows;
// prepare condition columns
ColumnsWithTypeAndName input;
for (const auto &name : expr_filter->getInputs())
{
if (!intermediate_columns.contains(name))
{
auto reader = readers.at(name);
auto column = reader->createColumn();
if (count == rows)
{
OptionalRowSet set;
reader->read(column, set, rows);
}
else
reader->read(column, total_set, rows);
intermediate_columns.emplace(name, std::move(column));
}
input.emplace_back(intermediate_columns.at(name), header.getByName(name).type, name);
}
auto filter = expr_filter->execute(input);
size_t filter_count = countBytesInFilter(filter);
intermediate_filters.emplace_back(std::move(filter));
if (!filter_count)
{
skip_all = true;
break;
}
}
if (skip_all)
    return SelectResult{std::nullopt, std::move(intermediate_columns), {}, 0, true};
if (!intermediate_filters.empty())
{
    auto filter = mergeFilters(intermediate_filters);
    combineRowSetAndFilter(total_set.value(), filter);
    /// Count the surviving rows only after the expression filters have been
    /// folded into the row set, so callers can detect "all rows survive" correctly.
    auto total_count = total_set.value().count();
    return SelectResult{std::move(total_set), std::move(intermediate_columns), std::move(filter), total_count, false};
}
auto total_count = total_set.value().count();
return SelectResult{std::move(total_set), {}, {}, total_count, false};
}
SelectConditions::SelectConditions(
    std::unordered_map<String, SelectiveColumnReaderPtr> & readers_,
    std::vector<String> & fast_filter_columns_,
    std::vector<std::shared_ptr<ExpressionFilter>> & expression_filters_,
    const Block & header_)
    : readers(readers_), fast_filter_columns(fast_filter_columns_), expression_filters(expression_filters_), header(header_)
{
    has_filter = !fast_filter_columns.empty() || !expression_filters.empty();
}
}

View File

@ -0,0 +1,131 @@
#pragma once
#include <Processors/Formats/Impl/Parquet/SelectiveColumnReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
#include <Common/threadPoolCallbackRunner.h>
namespace DB
{
class ParquetReader;
struct FilterDescription;
struct SelectResult
{
std::optional<RowSet> set;
std::unordered_map<String, ColumnPtr> intermediate_columns;
IColumn::Filter intermediate_filter;
size_t valid_count = 0;
bool skip_all = false;
};
class SelectConditions
{
public:
SelectConditions(
std::unordered_map<String, SelectiveColumnReaderPtr> & readers_,
std::vector<String> & fast_filter_columns_,
std::vector<std::shared_ptr<ExpressionFilter>>& expression_filters,
const Block & header_);
SelectResult selectRows(size_t rows);
private:
bool has_filter = false;
const std::unordered_map<String, SelectiveColumnReaderPtr> & readers;
const std::vector<String> & fast_filter_columns;
const std::vector<std::shared_ptr<ExpressionFilter>>& expression_filters;
const Block & header;
};
struct ColumnChunkData
{
uint8_t * data;
size_t size;
};
struct ReadRangeBuffer
{
arrow::io::ReadRange range;
PaddedPODArray<UInt8> buffer;
};
using ColumnChunkDataPtr = std::shared_ptr<ColumnChunkData>;
class RowGroupPrefetch
{
public:
RowGroupPrefetch(SeekableReadBuffer & file_, std::mutex & mutex, const parquet::ArrowReaderProperties& arrow_properties_);
void prefetchRange(const arrow::io::ReadRange& range);
void startPrefetch();
ColumnChunkData readRange(const arrow::io::ReadRange& range);
bool isEmpty() const { return ranges.empty(); }
private:
struct TaskEntry
{
arrow::io::ReadRange range;
std::future<ColumnChunkData> task;
};
ThreadPoolCallbackRunnerUnsafe<ColumnChunkData> callback_runner;
SeekableReadBuffer& file;
std::mutex& file_mutex;
std::vector<arrow::io::ReadRange> ranges;
std::vector<ReadRangeBuffer> read_range_buffers;
std::vector<TaskEntry> tasks;
std::mutex chunks_mutex;
parquet::ArrowReaderProperties arrow_properties;
bool fetched = false;
};
using RowGroupPrefetchPtr = std::shared_ptr<RowGroupPrefetch>;
struct RowGroupContext
{
ParquetReader * parquet_reader;
std::shared_ptr<parquet::RowGroupMetaData> row_group_meta;
std::vector<String> filter_columns;
RowGroupPrefetchPtr prefetch_conditions;
RowGroupPrefetchPtr prefetch;
};
arrow::io::ReadRange getColumnRange(const parquet::ColumnChunkMetaData & column_metadata);
class RowGroupChunkReader
{
public:
struct ReadMetrics
{
size_t output_rows = 0;
size_t filtered_rows = 0;
size_t skipped_rows = 0;
};
RowGroupChunkReader(
ParquetReader * parquetReader,
size_t row_group_idx,
RowGroupPrefetchPtr prefetch_conditions,
RowGroupPrefetchPtr prefetch,
std::unordered_map<String, ColumnFilterPtr> filters);
~RowGroupChunkReader() = default;
Chunk readChunk(size_t rows);
bool hasMoreRows() const { return remain_rows > 0; }
void printMetrics(std::ostream & out) const
{
out << fmt::format("metrics.output_rows: {} \n metrics.filtered_rows: {} \n metrics.skipped_rows: {} \n", metrics.output_rows, metrics.filtered_rows, metrics.skipped_rows);
}
private:
ParquetReader * parquet_reader;
std::shared_ptr<parquet::RowGroupMetaData> row_group_meta;
std::vector<String> filter_columns;
RowGroupPrefetchPtr prefetch_conditions;
RowGroupPrefetchPtr prefetch;
std::unordered_map<String, SelectiveColumnReaderPtr> reader_columns_mapping;
std::vector<SelectiveColumnReaderPtr> column_readers;
std::vector<PaddedPODArray<UInt8>> column_buffers;
size_t remain_rows = 0;
ReadMetrics metrics;
std::unique_ptr<SelectConditions> selectConditions;
RowGroupContext context;
std::unique_ptr<ColumnReaderBuilder> builder;
};
}

File diff suppressed because it is too large

View File

@ -0,0 +1,525 @@
#pragma once
#include "ColumnFilter.h"
#include <iostream>
#include <vector>
#include <Columns/ColumnString.h>
#include <DataTypes/IDataType.h>
#include <Processors/Chunk.h>
#include <Processors/Formats/Impl/Parquet/PageReader.h>
#include <arrow/util/rle_encoding.h>
#include <parquet/column_page.h>
#include <parquet/column_reader.h>
#include <parquet/file_reader.h>
#include <Common/PODArray.h>
namespace parquet
{
class ColumnDescriptor;
}
namespace DB
{
class SelectiveColumnReader;
using SelectiveColumnReaderPtr = std::shared_ptr<SelectiveColumnReader>;
using PageReaderCreator = std::function<std::unique_ptr<LazyPageReader>()>;
struct ScanSpec
{
String column_name;
const parquet::ColumnDescriptor * column_desc = nullptr;
ColumnFilterPtr filter;
};
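/// Cache of filter verdicts, apparently keyed by dictionary index: `cache_set`
/// records whether a verdict has been computed for an index, `filter_cache`
/// stores the verdict itself, and the null verdict is cached separately, so each
/// distinct value is evaluated against the pushed-down filter at most once.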
class FilterCache
{
public:
explicit FilterCache(size_t size)
{
cache_set.resize(size);
filter_cache.resize(size);
}
inline bool has(size_t index) { return cache_set.test(index); }
inline bool hasNull() const { return exist_null; }
inline bool getNull() const { return value_null; }
inline void setNull(bool value)
{
    exist_null = true;
    value_null = value;
}
inline bool get(size_t index) { return filter_cache.test(index); }
inline void set(size_t index, bool value)
{
    cache_set.set(index, true);
    filter_cache.set(index, value);
}
private:
boost::dynamic_bitset<> cache_set;
boost::dynamic_bitset<> filter_cache;
bool exist_null = false;
bool value_null = false;
};
struct ParquetData
{
// raw page data
const uint8_t * buffer = nullptr;
// size of raw page data
size_t buffer_size = 0;
void checkSize(size_t size) const
{
    if (size > buffer_size) [[unlikely]]
        throw Exception(ErrorCodes::PARQUET_EXCEPTION, "ParquetData: buffer too small, requested {} bytes but only {} remain", size, buffer_size);
}
// Callers must checkSize() before consume().
void consume(size_t size)
{
buffer += size;
buffer_size -= size;
}
void checkAndConsume(size_t size)
{
checkSize(size);
consume(size);
}
};
struct ScanState
{
std::shared_ptr<parquet::Page> page;
PaddedPODArray<Int16> def_levels;
PaddedPODArray<Int16> rep_levels;
ParquetData data;
// rows should be skipped before read data
size_t lazy_skip_rows = 0;
// for dictionary encoding
PaddedPODArray<Int32> idx_buffer;
std::unique_ptr<FilterCache> filter_cache;
// current column chunk available rows
size_t remain_rows = 0;
};
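/// Decodes the 4-byte little-endian length prefix that precedes every
/// plain-encoded BYTE_ARRAY value; callers advance their cursor by 4 + length.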
Int32 loadLength(const uint8_t * data);
class PlainDecoder
{
public:
PlainDecoder(ParquetData& data_, size_t & remain_rows_) : page_data(data_), remain_rows(remain_rows_) { }
template <typename T, typename S>
void decodeFixedValue(PaddedPODArray<T> & data, const OptionalRowSet & row_set, size_t rows_to_read);
void decodeString(ColumnString::Chars & chars, ColumnString::Offsets & offsets, const OptionalRowSet & row_set, size_t rows_to_read);
template <typename T, typename S>
void decodeFixedValueSpace(PaddedPODArray<T> & data, const OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t rows_to_read);
void decodeStringSpace(
ColumnString::Chars & chars,
ColumnString::Offsets & offsets,
const OptionalRowSet & row_set,
PaddedPODArray<UInt8> & null_map,
size_t rows_to_read);
size_t calculateStringTotalSize(const ParquetData& data, const OptionalRowSet & row_set, size_t rows_to_read);
size_t calculateStringTotalSizeSpace(const ParquetData & data, const OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t rows_to_read);
private:
void addOneString(bool null, const ParquetData& data, size_t & offset, const OptionalRowSet & row_set, size_t row, size_t & total_size)
{
if (row_set.has_value())
{
const auto & sets = row_set.value();
if (null)
{
if (sets.get(row))
total_size++;
return;
}
data.checkSize(offset +4);
auto len = loadLength(data.buffer + offset);
offset += 4 + len;
data.checkSize(offset);
if (sets.get(row))
total_size += len + 1;
}
else
{
if (null)
{
total_size++;
return;
}
data.checkSize(offset +4);
auto len = loadLength(data.buffer + offset);
offset += 4 + len;
data.checkSize(offset);
total_size += len + 1;
}
}
ParquetData& page_data;
size_t & remain_rows;
};
class DictDecoder
{
public:
DictDecoder(PaddedPODArray<Int32> & idx_buffer_, size_t & remain_rows_) : idx_buffer(idx_buffer_), remain_rows(remain_rows_) { }
template <class DictValueType>
void decodeFixedValue(PaddedPODArray<DictValueType> & dict, PaddedPODArray<DictValueType> & data, const OptionalRowSet & row_set, size_t rows_to_read);
void decodeString(
std::vector<String> & dict,
ColumnString::Chars & chars,
ColumnString::Offsets & offsets,
const OptionalRowSet & row_set,
size_t rows_to_read);
template <class DictValueType>
void decodeFixedValueSpace(
PaddedPODArray<DictValueType> & dict,
PaddedPODArray<DictValueType> & data,
const OptionalRowSet & row_set,
PaddedPODArray<UInt8> & null_map,
size_t rows_to_read);
void decodeStringSpace(
std::vector<String> & dict,
ColumnString::Chars & chars,
ColumnString::Offsets & offsets,
const OptionalRowSet & row_set,
PaddedPODArray<UInt8> & null_map,
size_t rows_to_read);
private:
PaddedPODArray<Int32> & idx_buffer;
size_t & remain_rows;
};
class SelectiveColumnReader
{
friend class OptionalColumnReader;
public:
SelectiveColumnReader(PageReaderCreator page_reader_creator_, const ScanSpec & scan_spec_)
: page_reader_creator(std::move(page_reader_creator_)), scan_spec(scan_spec_)
{
}
virtual ~SelectiveColumnReader() = default;
void initPageReaderIfNeed()
{
if (!page_reader)
page_reader = page_reader_creator();
}
virtual void computeRowSet(std::optional<RowSet> & row_set, size_t rows_to_read) = 0;
virtual void computeRowSetSpace(OptionalRowSet &, PaddedPODArray<UInt8> &, size_t, size_t) { }
virtual void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) = 0;
virtual void readSpace(MutableColumnPtr &, OptionalRowSet &, PaddedPODArray<UInt8> &, size_t, size_t) { }
virtual void readPageIfNeeded();
void readAndDecodePage()
{
readPageIfNeeded();
decodePage();
}
virtual MutableColumnPtr createColumn() = 0;
const PaddedPODArray<Int16> & getDefinitionLevels()
{
readAndDecodePage();
return state.def_levels;
}
const PaddedPODArray<Int16> & getRepetitionLevels()
{
readAndDecodePage();
return state.rep_levels;
}
virtual size_t currentRemainRows() const { return state.remain_rows; }
virtual size_t availableRows() const { return state.remain_rows > state.lazy_skip_rows ? state.remain_rows - state.lazy_skip_rows : 0; }
void skipNulls(size_t rows_to_skip);
void skip(size_t rows);
// Skip values in the current page; returns the number of rows that still need to be skipped lazily (beyond this page).
virtual size_t skipValuesInCurrentPage(size_t rows_to_skip) = 0;
virtual int16_t maxDefinitionLevel() const { return scan_spec.column_desc->max_definition_level(); }
virtual int16_t maxRepetitionLevel() const { return scan_spec.column_desc->max_repetition_level(); }
protected:
void decodePage();
virtual void skipPageIfNeed();
bool readPage();
void readDataPageV1(const parquet::DataPageV1 & page);
virtual void readDictPage(const parquet::DictionaryPage &) { }
virtual void initIndexDecoderIfNeeded() { }
virtual void createDictDecoder() { }
virtual void downgradeToPlain() { }
PageReaderCreator page_reader_creator;
std::unique_ptr<LazyPageReader> page_reader;
ScanState state;
ScanSpec scan_spec;
std::unique_ptr<PlainDecoder> plain_decoder;
bool plain = true;
};
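/// Typical call sequence, as driven by RowGroupChunkReader::readChunk:
///   1. readPageIfNeeded() / readAndDecodePage()  - make a decoded page available;
///   2. computeRowSet(row_set, n)                 - AND this column's pushed-down
///      filter for the next n values into row_set;
///   3. read(column, row_set, n) or skip(n)       - materialize the selected values,
///      or drop them without decoding.
/// availableRows() reports how many values remain in the current page once
/// pending lazy skips are taken into account.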
template <typename DataType, typename SerializedType>
class NumberColumnDirectReader : public SelectiveColumnReader
{
public:
NumberColumnDirectReader(PageReaderCreator page_reader_creator_, ScanSpec scan_spec_, DataTypePtr datatype_);
~NumberColumnDirectReader() override = default;
MutableColumnPtr createColumn() override;
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSetSpace(OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void readSpace(
MutableColumnPtr & column, OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
DataTypePtr datatype;
};
template <typename DataType, typename SerializedType>
class NumberDictionaryReader : public SelectiveColumnReader
{
public:
NumberDictionaryReader(PageReaderCreator page_reader_creator_, ScanSpec scan_spec_, DataTypePtr datatype_);
~NumberDictionaryReader() override = default;
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSetSpace(OptionalRowSet & set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void readSpace(MutableColumnPtr & ptr, OptionalRowSet & set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t size) override;
MutableColumnPtr createColumn() override { return datatype->createColumn(); }
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
protected:
void readDictPage(const parquet::DictionaryPage & page) override;
void initIndexDecoderIfNeeded() override
{
    if (dict.empty())
        return;
    /// Check the buffer before dereferencing it: the first byte stores the RLE bit width.
    state.data.checkSize(1);
    uint8_t bit_width = *state.data.buffer;
    state.data.consume(1);
    idx_decoder = arrow::util::RleDecoder(state.data.buffer, static_cast<int>(state.data.buffer_size), bit_width);
}
void nextIdxBatchIfEmpty(size_t rows_to_read);
void createDictDecoder() override;
void downgradeToPlain() override;
private:
DataTypePtr datatype;
arrow::util::RleDecoder idx_decoder;
std::unique_ptr<DictDecoder> dict_decoder;
PaddedPODArray<typename DataType::FieldType> dict;
PaddedPODArray<typename DataType::FieldType> batch_buffer;
};
void computeRowSetPlainString(const uint8_t * start, OptionalRowSet & row_set, ColumnFilterPtr filter, size_t rows_to_read);
void computeRowSetPlainStringSpace(
const uint8_t * start, OptionalRowSet & row_set, ColumnFilterPtr filter, size_t rows_to_read, PaddedPODArray<UInt8> & null_map);
class StringDirectReader : public SelectiveColumnReader
{
public:
StringDirectReader(PageReaderCreator page_reader_creator_, const ScanSpec & scan_spec_)
: SelectiveColumnReader(std::move(page_reader_creator_), scan_spec_)
{
}
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSetSpace(OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t /*null_count*/, size_t rows_to_read) override;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void readSpace(
MutableColumnPtr & column, OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
MutableColumnPtr createColumn() override { return ColumnString::create(); }
};
class StringDictionaryReader : public SelectiveColumnReader
{
public:
StringDictionaryReader(PageReaderCreator page_reader_creator_, const ScanSpec & scan_spec_)
: SelectiveColumnReader(std::move(page_reader_creator_), scan_spec_)
{
}
MutableColumnPtr createColumn() override { return ColumnString::create(); }
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSetSpace(OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void readSpace(MutableColumnPtr & column, OptionalRowSet & row_set, PaddedPODArray<UInt8> & null_map, size_t null_count, size_t rows_to_read) override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
protected:
void readDictPage(const parquet::DictionaryPage & page) override;
void initIndexDecoderIfNeeded() override;
/// TODO move to DictDecoder
void nextIdxBatchIfEmpty(size_t rows_to_read);
void createDictDecoder() override { dict_decoder = std::make_unique<DictDecoder>(state.idx_buffer, state.remain_rows); }
void downgradeToPlain() override;
private:
std::vector<String> dict;
std::unique_ptr<DictDecoder> dict_decoder;
arrow::util::RleDecoder idx_decoder;
};
class OptionalColumnReader : public SelectiveColumnReader
{
public:
OptionalColumnReader(const ScanSpec & scan_spec_, const SelectiveColumnReaderPtr & child_)
: SelectiveColumnReader(nullptr, scan_spec_), child(child_)
{
def_level = child->maxDefinitionLevel();
rep_level = child->maxRepetitionLevel();
}
~OptionalColumnReader() override = default;
void readPageIfNeeded() override { child->readPageIfNeeded(); }
MutableColumnPtr createColumn() override;
size_t currentRemainRows() const override;
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
int16_t maxDefinitionLevel() const override { return child->maxDefinitionLevel(); }
int16_t maxRepetitionLevel() const override { return child->maxRepetitionLevel(); }
size_t availableRows() const override;
private:
void applyLazySkip();
protected:
void skipPageIfNeed() override;
private:
void nextBatchNullMapIfNeeded(size_t rows_to_read);
void cleanNullMap()
{
cur_null_count = 0;
cur_null_map.resize(0);
}
SelectiveColumnReaderPtr child;
PaddedPODArray<UInt8> cur_null_map;
size_t cur_null_count = 0;
int def_level = 0;
int rep_level = 0;
};
class ListColumnReader : public SelectiveColumnReader
{
public:
ListColumnReader(int16_t rep_level_, int16_t def_level_, const SelectiveColumnReaderPtr & child_)
: SelectiveColumnReader(nullptr, ScanSpec{}), child(child_), def_level(def_level_), rep_level(rep_level_)
{
}
~ListColumnReader() override = default;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
int16_t maxDefinitionLevel() const override { return def_level; }
int16_t maxRepetitionLevel() const override { return rep_level; }
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
MutableColumnPtr createColumn() override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
SelectiveColumnReaderPtr child;
int16_t def_level = 0;
int16_t rep_level = 0;
};
class MapColumnReader : public SelectiveColumnReader
{
public:
MapColumnReader(int16_t rep_level_, int16_t def_level_, const SelectiveColumnReaderPtr & key_, const SelectiveColumnReaderPtr & value_)
: SelectiveColumnReader(nullptr, ScanSpec{}), key_reader(key_), value_reader(value_), def_level(def_level_), rep_level(rep_level_)
{
}
~MapColumnReader() override = default;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
int16_t maxDefinitionLevel() const override { return def_level; }
int16_t maxRepetitionLevel() const override { return rep_level; }
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
MutableColumnPtr createColumn() override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
SelectiveColumnReaderPtr key_reader;
SelectiveColumnReaderPtr value_reader;
int16_t def_level = 0;
int16_t rep_level = 0;
};
class StructColumnReader : public SelectiveColumnReader
{
public:
StructColumnReader(const std::unordered_map<String, SelectiveColumnReaderPtr> & children_, DataTypePtr struct_type_)
: SelectiveColumnReader(nullptr, ScanSpec{}), children(children_), struct_type(struct_type_)
{
}
~StructColumnReader() override = default;
void read(MutableColumnPtr & column, OptionalRowSet & row_set, size_t rows_to_read) override;
void computeRowSet(OptionalRowSet & row_set, size_t rows_to_read) override;
MutableColumnPtr createColumn() override;
size_t skipValuesInCurrentPage(size_t rows_to_skip) override;
private:
std::unordered_map<String, SelectiveColumnReaderPtr> children;
DataTypePtr struct_type;
};
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,15 @@
#pragma once
_Pragma("clang diagnostic push")
_Pragma("clang diagnostic ignored \"-Wold-style-cast\"")
_Pragma("clang diagnostic ignored \"-Wreserved-identifier\"")
_Pragma("clang diagnostic ignored \"-Wdeprecated-redundant-constexpr-static-def\"")
_Pragma("clang diagnostic ignored \"-Wdocumentation\"")
_Pragma("clang diagnostic ignored \"-Wzero-as-null-pointer-constant\"")
_Pragma("clang diagnostic ignored \"-Wcast-align\"")
_Pragma("clang diagnostic ignored \"-Wshorten-64-to-32\"")
_Pragma("clang diagnostic ignored \"-Wfloat-conversion\"")
_Pragma("clang diagnostic ignored \"-Wunused-but-set-variable\"")
_Pragma("clang diagnostic ignored \"-Wundef\"")
#include <xsimd/xsimd.hpp>
_Pragma("clang diagnostic pop")

View File

@ -29,6 +29,8 @@
#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
#include <Interpreters/convertFieldToType.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <Processors/Formats/Impl/Parquet/ColumnFilterHelper.h>
namespace ProfileEvents
{
@ -629,6 +631,21 @@ void ParquetBlockInputFormat::initializeIfNeeded()
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
}
if (format_settings.parquet.use_native_reader)
{
auto * seekable_in = dynamic_cast<SeekableReadBuffer *>(in);
if (!seekable_in)
throw DB::Exception(ErrorCodes::PARQUET_EXCEPTION, "Native Parquet reader only supports SeekableReadBuffer");
new_native_reader = std::make_shared<ParquetReader>(
getPort().getHeader(),
*seekable_in,
parquet::ArrowReaderProperties{},
parquet::ReaderProperties(ArrowMemoryPool::instance()),
arrow_file,
format_settings);
if (filter.has_value())
pushFilterToParquetReader(filter.value(), *new_native_reader);
}
}
void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
@ -686,7 +703,6 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
// That version is >10 years old, so this is not very important.
if (metadata->writer_version().VersionLt(parquet::ApplicationVersion::PARQUET_816_FIXED_VERSION()))
arrow_properties.set_pre_buffer(false);
if (format_settings.parquet.use_native_reader)
{
#pragma clang diagnostic push
@ -696,14 +712,14 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
ErrorCodes::BAD_ARGUMENTS,
"parquet native reader only supports little endian system currently");
#pragma clang diagnostic pop
row_group_batch.native_record_reader = std::make_shared<ParquetRecordReader>(
getPort().getHeader(),
arrow_properties,
reader_properties,
arrow_file,
format_settings,
row_group_batch.row_groups_idxs);
row_group_batch.row_group_chunk_reader = new_native_reader->getSubRowGroupRangeReader(row_group_batch.row_groups_idxs);
}
else
{
@ -836,6 +852,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
lock.unlock();
auto end_of_row_group = [&] {
row_group_batch.row_group_chunk_reader.reset();
row_group_batch.native_record_reader.reset();
row_group_batch.arrow_column_to_ch_column.reset();
row_group_batch.record_batch_reader.reset();
@ -855,7 +872,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
return static_cast<size_t>(std::ceil(static_cast<double>(row_group_batch.total_bytes_compressed) / row_group_batch.total_rows * num_rows));
};
if (!row_group_batch.record_batch_reader && !row_group_batch.native_record_reader)
if (!row_group_batch.record_batch_reader && !row_group_batch.row_group_chunk_reader)
initializeRowGroupBatchReader(row_group_batch_idx);
PendingChunk res(getPort().getHeader().columns());
@ -864,7 +881,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
if (format_settings.parquet.use_native_reader)
{
auto chunk = row_group_batch.native_record_reader->readChunk();
auto chunk = row_group_batch.row_group_chunk_reader->read(row_group_batch.adaptive_chunk_size);
if (!chunk)
{
end_of_row_group();
@ -1025,6 +1042,18 @@ const BlockMissingValues * ParquetBlockInputFormat::getMissingValues() const
{
return &previous_block_missing_values;
}
void ParquetBlockInputFormat::setKeyCondition(const std::optional<ActionsDAG> & expr, ContextPtr context)
{
if (expr.has_value())
filter = expr.value().clone();
SourceWithKeyCondition::setKeyCondition(expr, context);
}
void ParquetBlockInputFormat::setKeyCondition(const std::shared_ptr<const KeyCondition> & key_condition_)
{
filter = key_condition_->getFilterDagCopy();
SourceWithKeyCondition::setKeyCondition(key_condition_);
}
ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)

View File

@ -23,6 +23,8 @@ namespace DB
class ArrowColumnToCHColumn;
class ParquetRecordReader;
class ParquetReader;
class SubRowGroupRangeReader;
// Parquet files contain a metadata block with the following information:
// * list of columns,
@ -72,6 +74,9 @@ public:
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
void setKeyCondition(const std::optional<ActionsDAG> & expr, ContextPtr context) override;
void setKeyCondition(const std::shared_ptr<const KeyCondition> & key_condition_) override;
private:
Chunk read() override;
@ -231,6 +236,7 @@ private:
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::unique_ptr<RowGroupPrefetchIterator> prefetch_iterator;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
std::unique_ptr<SubRowGroupRangeReader> row_group_chunk_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
};
@ -338,6 +344,8 @@ private:
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
std::shared_ptr<ParquetReader> new_native_reader = nullptr;
std::optional<ActionsDAG> filter = std::nullopt;
};
class ParquetSchemaReader : public ISchemaReader

View File

@ -0,0 +1,248 @@
#include <gtest/gtest.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Functions/FunctionFactory.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <memory>
using namespace DB;
template<class T, class S>
static void testFilterPlainFixedData(size_t size, double positive_rate)
{
PaddedPODArray<T> data;
PaddedPODArray<S> src;
PaddedPODArray<T> expected;
RowSet set(size);
data.reserve(size);
for (size_t i = 0; i < size; ++i)
{
auto value = std::rand() % 100000;
src.push_back(value);
set.set(i, (std::rand() % 100 + 1) < positive_rate * 100);
if (set.get(i))
{
expected.push_back(value);
}
}
FilterHelper::filterPlainFixedData(src.data(), data, set, src.size());
ASSERT_EQ(expected.size(), data.size());
for (size_t i = 0; i < expected.size(); ++i)
{
ASSERT_EQ(expected[i], data[i]);
}
}
TEST(TestColumnFilterHelper, TestFilterPlainFixedData)
{
testFilterPlainFixedData<Int16, Int32>(100000, 1.0);
testFilterPlainFixedData<Int16, Int16>(100000, 1.0);
testFilterPlainFixedData<UInt16, Int32>(100000, 1.0);
testFilterPlainFixedData<UInt16, UInt16>(100000, 1.0);
testFilterPlainFixedData<Int32, Int32>(100000, 1.0);
testFilterPlainFixedData<Int64, Int64>(100000, 1.0);
testFilterPlainFixedData<Float32, Float32>(100000, 1.0);
testFilterPlainFixedData<Float64, Float64>(100000, 1.0);
testFilterPlainFixedData<DateTime64, Int64>(100000, 1.0);
testFilterPlainFixedData<DateTime64, DateTime64>(100000, 1.0);
testFilterPlainFixedData<Int16, Int32>(100000, 0);
testFilterPlainFixedData<Int16, Int16>(100000, 0);
testFilterPlainFixedData<UInt16, Int32>(100000, 0);
testFilterPlainFixedData<UInt16, UInt16>(100000, 0);
testFilterPlainFixedData<Int32, Int32>(100000, 0);
testFilterPlainFixedData<Int64, Int64>(100000, 0);
testFilterPlainFixedData<Float32, Float32>(100000, 0);
testFilterPlainFixedData<Float64, Float64>(100000, 0);
testFilterPlainFixedData<DateTime64, Int64>(100000, 0);
testFilterPlainFixedData<DateTime64, DateTime64>(100000, 0);
testFilterPlainFixedData<Int16, Int32>(100000, 0.9);
testFilterPlainFixedData<Int16, Int16>(100000, 0.9);
testFilterPlainFixedData<UInt16, Int32>(100000, 0.9);
testFilterPlainFixedData<UInt16, UInt16>(100000, 0.9);
testFilterPlainFixedData<Int32, Int32>(100000, 0.9);
testFilterPlainFixedData<Int64, Int64>(100000, 0.9);
testFilterPlainFixedData<Float32, Float32>(100000, 0.9);
testFilterPlainFixedData<Float64, Float64>(100000, 0.9);
testFilterPlainFixedData<DateTime64, Int64>(100000, 0.9);
testFilterPlainFixedData<DateTime64, DateTime64>(100000, 0.9);
}
template<class T>
static void testGatherDictInt()
{
size_t size = 10000;
PaddedPODArray<T> data;
PaddedPODArray<T> dict;
std::unordered_map<T, Int32> map;
PaddedPODArray<T> dst;
PaddedPODArray<Int32> idx;
for (size_t i = 0; i < size; ++i)
{
auto value = std::rand() % 10000;
data.push_back(value);
if (map.find(value) == map.end())
{
map[value] = static_cast<Int32>(dict.size());
dict.push_back(value);
}
idx.push_back(map[value]);
}
dst.reserve(data.size());
FilterHelper::gatherDictFixedValue(dict, dst, idx, data.size());
ASSERT_EQ(data.size(), dst.size());
for (size_t i = 0; i < data.size(); ++i)
{
ASSERT_EQ(data[i], dst[i]);
}
}
TEST(TestColumnFilterHelper, TestGatherDictNumberData)
{
testGatherDictInt<Int16>();
testGatherDictInt<Int32>();
testGatherDictInt<Int64>();
testGatherDictInt<Float32>();
testGatherDictInt<Float64>();
testGatherDictInt<DateTime64>();
}
template <class T, class S>
static void testDecodePlainData(size_t numbers, size_t exist_nums)
{
PaddedPODArray<S> src;
for (size_t i = 0; i < numbers; ++i)
{
src.push_back(std::rand() % 10000);
}
PaddedPODArray<T> dst;
for (size_t i = 0; i < exist_nums; ++i)
{
dst.push_back(std::rand() % 10000);
}
dst.reserve(exist_nums + src.size());
size_t size = src.size();
const auto * buffer = reinterpret_cast<const uint8_t*>(src.data());
PlainDecoder decoder(buffer, size);
OptionalRowSet row_set = std::nullopt;
decoder.decodeFixedValue<T, S>(dst, row_set, size);
ASSERT_EQ(dst.size(), exist_nums + src.size());
for (size_t i = 0; i < src.size(); ++i)
{
ASSERT_EQ(src[i], dst[exist_nums+i]);
}
}
TEST(TestPlainDecoder, testDecodeNoFilter)
{
testDecodePlainData<DateTime64, Int64>(1000, 100);
testDecodePlainData<DateTime64, Int64>(1000, 0);
testDecodePlainData<Int64, Int64>(1000, 100);
testDecodePlainData<Int64, Int64>(1000, 0);
testDecodePlainData<Int32, Int32>(1000, 100);
testDecodePlainData<Int32, Int32>(1000, 0);
testDecodePlainData<Int16, Int32>(1000, 100);
testDecodePlainData<Int16, Int32>(1000, 0);
}
TEST(TestRowSet, TestRowSet)
{
RowSet row_set(10000);
row_set.setAllFalse();
row_set.set(100, true);
row_set.set(1234, true);
ASSERT_EQ(2, row_set.count());
ASSERT_FALSE(row_set.none());
ASSERT_TRUE(row_set.any());
ASSERT_FALSE(row_set.all());
}
TEST(TestColumnFilter, TestColumnIntFilter)
{
BigIntRangeFilter filter(100, 200, false);
BigIntRangeFilter filter2(200, 200, false);
ASSERT_TRUE(!filter.testInt16(99));
ASSERT_TRUE(filter.testInt16(100));
ASSERT_TRUE(filter.testInt16(150));
ASSERT_TRUE(filter.testInt16(200));
ASSERT_TRUE(filter2.testInt16(200));
ASSERT_TRUE(!filter.testInt16(210));
ASSERT_TRUE(!filter2.testInt16(210));
ASSERT_TRUE(!filter.testInt32(99));
ASSERT_TRUE(filter.testInt32(100));
ASSERT_TRUE(filter.testInt32(150));
ASSERT_TRUE(filter.testInt32(200));
ASSERT_TRUE(filter2.testInt32(200));
ASSERT_TRUE(!filter.testInt32(210));
ASSERT_TRUE(!filter2.testInt32(210));
ASSERT_TRUE(!filter.testInt64(99));
ASSERT_TRUE(filter.testInt64(100));
ASSERT_TRUE(filter.testInt64(150));
ASSERT_TRUE(filter.testInt64(200));
ASSERT_TRUE(filter2.testInt64(200));
ASSERT_TRUE(!filter.testInt64(210));
ASSERT_TRUE(!filter2.testInt64(210));
PaddedPODArray<Int16> int16_values = {99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28,
99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28};
RowSet set1(int16_values.size());
filter.testInt16Values(set1, 0, int16_values.size(), int16_values.data());
ASSERT_EQ(set1.count(), 22);
set1.setAllTrue();
filter2.testInt16Values(set1, 0, int16_values.size(), int16_values.data());
ASSERT_EQ(set1.count(), 2);
PaddedPODArray<Int32> int32_values = {99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28,
99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28};
RowSet set2(int32_values.size());
filter.testInt32Values(set2, 0, int32_values.size(), int32_values.data());
ASSERT_EQ(set2.count(), 22);
set2.setAllTrue();
filter2.testInt32Values(set2, 0, int32_values.size(), int32_values.data());
ASSERT_EQ(set2.count(), 2);
PaddedPODArray<Int64> int64_values = {99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28,
99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 150, 200, 210, 211, 231, 24, 25, 26, 27, 28};
RowSet set3(int64_values.size());
filter.testInt64Values(set3, 0, int64_values.size(), int64_values.data());
ASSERT_EQ(set3.count(), 22);
set3.setAllTrue();
filter2.testInt64Values(set3, 0, int64_values.size(), int64_values.data());
ASSERT_EQ(set3.count(), 2);
NegatedBigIntRangeFilter negated_filter(200, 200, false);
ASSERT_FALSE(negated_filter.testInt16(200));
ASSERT_FALSE(negated_filter.testInt32(200));
ASSERT_FALSE(negated_filter.testInt64(200));
RowSet row_set4 = RowSet(int16_values.size());
negated_filter.testInt16Values(row_set4, 0, int16_values.size(), int16_values.data());
ASSERT_EQ(38, row_set4.count());
RowSet row_set5 = RowSet(int32_values.size());
negated_filter.testInt32Values(row_set5, 0, int32_values.size(), int32_values.data());
ASSERT_EQ(38, row_set5.count());
RowSet row_set6 = RowSet(int64_values.size());
negated_filter.testInt64Values(row_set6, 0, int64_values.size(), int64_values.data());
ASSERT_EQ(38, row_set6.count());
}

View File

@ -852,7 +852,7 @@ KeyCondition::KeyCondition(
}
has_filter = true;
filter_expr = filter_dag->clone();
/** When non-strictly monotonic functions are employed in functional index (e.g. ORDER BY toStartOfHour(dateTime)),
* the use of NOT operator in predicate will result in the indexing algorithm leave out some data.
* This is caused by rewriting in KeyCondition::tryParseAtomFromAST of relational operators to less strict

View File

@ -134,6 +134,11 @@ public:
DataTypePtr current_type,
bool single_point = false);
ActionsDAG getFilterDagCopy() const
{
return filter_expr.clone();
}
static ActionsDAG cloneASTWithInversionPushDown(ActionsDAG::NodeRawConstPtrs nodes, const ContextPtr & context);
bool matchesExactContinuousRange() const;
@ -350,6 +355,7 @@ private:
/// This flag identify whether there are filters.
bool has_filter;
ActionsDAG filter_expr;
ColumnIndices key_columns;
std::vector<size_t> key_indices;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,141 @@
-- { echoOn }
-- supported data types: Int16, Int32, Int64, Float32, Float64, String, Date32, DateTime64, Date, DateTime
drop table if exists test_native_parquet;
create table test_native_parquet (i16 Int16, i32 Int32, i64 Int64, float Float32, double Float64, string String, date32 Date32, time64 DateTime64, date Date, time DateTime) engine=File(Parquet) settings input_format_parquet_use_native_reader=true;
insert into test_native_parquet select number, number, number+1, 0.1*number, 0.2*number, toString(number), number, toDateTime('2024-10-11 00:00:00') + number, number, toDateTime('2024-10-11 00:00:00') + number from numbers(10000);
-- test int16
select sum(i16) from test_native_parquet;
select * from test_native_parquet where i16 < 10;
select * from test_native_parquet where i16 <= 10;
select * from test_native_parquet where i16 between 10 and 20;
select * from test_native_parquet where i16 > 9990;
select * from test_native_parquet where i16 >= 9990;
-- test int32
select sum(i32) from test_native_parquet;
select * from test_native_parquet where i32 < 10;
select * from test_native_parquet where i32 <= 10;
select * from test_native_parquet where i32 between 10 and 20;
select * from test_native_parquet where i32 > 9990;
select * from test_native_parquet where i32 >= 9990;
-- test int64
select sum(i64) from test_native_parquet;
select * from test_native_parquet where i64 < 10;
select * from test_native_parquet where i64 <= 10;
select * from test_native_parquet where i64 between 10 and 20;
select * from test_native_parquet where i64 > 9990;
select * from test_native_parquet where i64 >= 9990;
-- test float
select sum(float) from test_native_parquet;
select * from test_native_parquet where float < 1;
select * from test_native_parquet where float <= 1;
select * from test_native_parquet where float between 1 and 2;
select * from test_native_parquet where float > 999;
select * from test_native_parquet where float >= 999;
-- test double
select sum(double) from test_native_parquet;
select * from test_native_parquet where double < 2;
select * from test_native_parquet where double <= 2;
select * from test_native_parquet where double between 2 and 4;
select * from test_native_parquet where double > 9990 *0.2;
select * from test_native_parquet where double >= 9990 *0.2;
-- test date32
select max(date32) from test_native_parquet;
select * from test_native_parquet where date32 < '1970-01-10';
select * from test_native_parquet where date32 <= '1970-01-10';
select * from test_native_parquet where date32 between '1970-01-10' and '1970-01-20';
select * from test_native_parquet where date32 > '1970-01-10';
select * from test_native_parquet where date32 >= '1970-01-10';
-- test datetime64
select max(time64) from test_native_parquet;
-- test String
select max(string) from test_native_parquet;
select * from test_native_parquet where string = '1';
select * from test_native_parquet where string in ('1','2','3');
-- test date
select max(date) from test_native_parquet;
select * from test_native_parquet where date < '1970-01-10';
select * from test_native_parquet where date <= '1970-01-10';
select * from test_native_parquet where date between '1970-01-10' and '1970-01-20';
select * from test_native_parquet where date > '1970-01-10';
select * from test_native_parquet where date >= '1970-01-10';
-- test datetime
select max(time) from test_native_parquet;
drop table if exists test_nullable_native_parquet;
create table test_nullable_native_parquet (i16 Nullable(Int16), i32 Nullable(Int32), i64 Nullable(Int64), float Nullable(Float32), double Nullable(Float64), string Nullable(String), date32 Nullable(Date32), time64 Nullable(DateTime64), date Nullable(Date), time Nullable(DateTime)) engine=File(Parquet) settings input_format_parquet_use_native_reader=true;
insert into test_nullable_native_parquet select if(number%5, number, NULL), if(number%5, number, NULL), if(number%5, number+1, NULL), if(number%5, 0.1*number, NULL), if(number%5, 0.2*number, NULL), toString(number), if(number%5, number, NULL), if(number%5, toDateTime('2024-10-11 00:00:00') + number, NULL), if(number%5, number, NULL), if(number%5, toDateTime('2024-10-11 00:00:00') + number, NULL) from numbers(10000);
-- test int16
select sum(i16) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where i16 < 10;
select * from test_nullable_native_parquet where i16 <= 10;
select * from test_nullable_native_parquet where i16 between 10 and 20;
select * from test_nullable_native_parquet where i16 > 9990;
select * from test_nullable_native_parquet where i16 >= 9990;
-- test int32
select sum(i32) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where i32 < 10;
select * from test_nullable_native_parquet where i32 <= 10;
select * from test_nullable_native_parquet where i32 between 10 and 20;
select * from test_nullable_native_parquet where i32 > 9990;
select * from test_nullable_native_parquet where i32 >= 9990;
-- test int64
select sum(i64) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where i64 < 10;
select * from test_nullable_native_parquet where i64 <= 10;
select * from test_nullable_native_parquet where i64 between 10 and 20;
select * from test_nullable_native_parquet where i64 > 9990;
select * from test_nullable_native_parquet where i64 >= 9990;
-- test float
select sum(float) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where float < 1;
select * from test_nullable_native_parquet where float <= 1;
select * from test_nullable_native_parquet where float between 1 and 2;
select * from test_nullable_native_parquet where float > 999;
select * from test_nullable_native_parquet where float >= 999;
-- test double
select sum(double) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where double < 2;
select * from test_nullable_native_parquet where double <= 2;
select * from test_nullable_native_parquet where double between 2 and 4;
select * from test_nullable_native_parquet where double > 9990 *0.2;
select * from test_nullable_native_parquet where double >= 9990 *0.2;
-- test date32
select max(date32) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where date32 < '1970-01-10';
select * from test_nullable_native_parquet where date32 <= '1970-01-10';
select * from test_nullable_native_parquet where date32 between '1970-01-10' and '1970-01-20';
select * from test_nullable_native_parquet where date32 > '1970-01-10';
select * from test_nullable_native_parquet where date32 >= '1970-01-10';
-- test datetime64
select max(time64) from test_nullable_native_parquet;
-- test String
select max(string) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where string = '1';
select * from test_nullable_native_parquet where string in ('1','2','3');
-- test date
select max(date) from test_nullable_native_parquet;
select * from test_nullable_native_parquet where date < '1970-01-10';
select * from test_nullable_native_parquet where date <= '1970-01-10';
select * from test_nullable_native_parquet where date between '1970-01-10' and '1970-01-20';
select * from test_nullable_native_parquet where date > '1970-01-10';
select * from test_nullable_native_parquet where date >= '1970-01-10';
-- test datetime
select max(time) from test_nullable_native_parquet;