mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
support full filter push down
This commit is contained in:
parent
aa1df9f5c7
commit
a2814258bc
@ -765,4 +765,34 @@ bool RowSet::any() const
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Evaluates the filter expression over `columns` and returns the resulting mask.
///
/// @param columns  Input columns; they must provide every name returned by getInputs().
/// @return The data of the UInt8 result column named `filter_name`, one byte per row.
IColumn::Filter ExpressionFilter::execute(const ColumnsWithTypeAndName & columns)
{
    Block block(columns);
    actions->execute(block);

    /// Take the result out of the block so that, whenever possible, we hold the only reference to it.
    ColumnPtr result_column = std::move(block.getByName(filter_name).column);

    /// A constant or sparse result is not physically a ColumnUInt8, so it has to be
    /// materialized before the cast below is valid.
    result_column = result_column->convertToFullColumnIfConst()->convertToFullColumnIfSparse();

    /// IColumn::mutate() clones the column if it is still shared (for example when the filter
    /// is one of the input columns itself), so stealing the buffer below cannot empty a column
    /// that the caller still uses.
    auto mutable_column = IColumn::mutate(std::move(result_column));
    auto * uint8_col = static_cast<ColumnUInt8 *>(mutable_column.get());

    IColumn::Filter filter;
    filter.swap(uint8_col->getData());
    return filter;
}
|
||||
/// Returns the result names of all input nodes of the filter's actions DAG,
/// i.e. the columns that have to be passed to execute().
NameSet ExpressionFilter::getInputs()
{
    const auto & dag = actions->getActionsDAG();

    NameSet input_names;
    for (const auto * node : dag.getInputs())
        input_names.insert(node->result_name);
    return input_names;
}
|
||||
/// Builds an executable filter from `dag_`.
///
/// The first output of the DAG is used as the filter column.
///
/// @throws Exception(LOGICAL_ERROR) if the DAG has no outputs or the first output is not UInt8.
ExpressionFilter::ExpressionFilter(ActionsDAG && dag_)
{
    actions = std::make_shared<ExpressionActions>(std::move(dag_));

    /// front() on an empty container is undefined behaviour, so reject a DAG without outputs explicitly.
    const auto & outputs = actions->getActionsDAG().getOutputs();
    if (outputs.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Filter expression must have an output column");

    const auto * filter_node = outputs.front();
    if (!isUInt8(filter_node->result_type))
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Filter result type must be UInt8");
    }
    filter_name = filter_node->result_name;
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Columns/ColumnsCommon.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <base/types.h>
|
||||
#include <boost/dynamic_bitset.hpp>
|
||||
#include <Common/Exception.h>
|
||||
@ -97,6 +98,19 @@ public:
|
||||
size_t rows_to_read);
|
||||
};
|
||||
|
||||
class ExpressionFilter
|
||||
{
|
||||
public:
|
||||
explicit ExpressionFilter(ActionsDAG && dag_);
|
||||
NameSet getInputs();
|
||||
|
||||
IColumn::Filter execute(const ColumnsWithTypeAndName & columns);
|
||||
|
||||
private:
|
||||
ExpressionActionsPtr actions;
|
||||
String filter_name;
|
||||
};
|
||||
|
||||
class ColumnFilter
|
||||
{
|
||||
protected:
|
||||
|
@ -4,6 +4,36 @@ namespace DB
|
||||
{
|
||||
|
||||
ColumnFilterCreators ColumnFilterHelper::creators = {BigIntRangeFilter::create, NegatedBigIntRangeFilter::create, createFloatRangeFilter, ByteValuesFilter::create, NegatedByteValuesFilter::create};
|
||||
/// Splits the conjunction found in the first output of `filter_expression` into its atoms.
/// Each atom is handed to the registered creators in order; the first creator that accepts it
/// contributes a column filter. Atoms accepted by no creator are turned into expression filters,
/// one per atom, when a filter DAG can be built for them.
FilterSplitResult ColumnFilterHelper::splitFilterForPushDown(const ActionsDAG & filter_expression)
{
    FilterSplitResult split_result;
    if (filter_expression.getOutputs().empty())
        return split_result;

    const auto * filter_node = filter_expression.getOutputs().front();
    const auto conditions = ActionsDAG::extractConjunctionAtoms(filter_node);

    for (const auto * condition : conditions)
    {
        bool pushed_down = false;
        for (auto & creator : creators)
        {
            auto created = creator(*condition);
            if (!created.has_value())
                continue;

            split_result.filters[created.value().first].emplace_back(created.value().second);
            pushed_down = true;
            /// Only the first matching creator is used.
            break;
        }
        if (pushed_down)
            continue;

        auto actions_dag = ActionsDAG::buildFilterActionsDAG({condition});
        if (actions_dag.has_value())
            split_result.expression_filters.emplace_back(std::make_shared<ExpressionFilter>(std::move(actions_dag.value())));
    }

    return split_result;
}
|
||||
|
||||
void pushFilterToParquetReader(const ActionsDAG& filter_expression, ParquetReader & reader)
|
||||
{
|
||||
if (filter_expression.getOutputs().empty()) return ;
|
||||
@ -15,7 +45,9 @@ void pushFilterToParquetReader(const ActionsDAG& filter_expression, ParquetReade
|
||||
reader.addFilter(item.first, filter);
|
||||
}
|
||||
}
|
||||
if (split_result.remain_filter.has_value())
|
||||
reader.setRemainFilter(split_result.remain_filter);
|
||||
for (auto & expression_filter : split_result.expression_filters)
|
||||
{
|
||||
reader.addExpressionFilter(expression_filter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,39 +11,14 @@ using ColumnFilterCreators = std::vector<ColumnFilterCreator>;
|
||||
struct FilterSplitResult
|
||||
{
|
||||
std::unordered_map<String, std::vector<ColumnFilterPtr>> filters;
|
||||
std::optional<ActionsDAG> remain_filter = std::nullopt;
|
||||
std::vector<std::shared_ptr<ExpressionFilter>> expression_filters;
|
||||
};
|
||||
|
||||
class ColumnFilterHelper
|
||||
{
|
||||
public:
|
||||
|
||||
static FilterSplitResult splitFilterForPushDown(const ActionsDAG& filter_expression)
|
||||
{
|
||||
if (filter_expression.getOutputs().empty())
|
||||
return {};
|
||||
const auto * filter_node = filter_expression.getOutputs().front();
|
||||
auto conditions = ActionsDAG::extractConjunctionAtoms(filter_node);
|
||||
std::vector<ColumnFilterPtr> filters;
|
||||
ActionsDAG::NodeRawConstPtrs unsupported_conditions;
|
||||
FilterSplitResult split_result;
|
||||
for (const auto * condition : conditions)
|
||||
{
|
||||
if (std::none_of(creators.begin(), creators.end(), [&](ColumnFilterCreator & creator) {
|
||||
auto result = creator(*condition);
|
||||
if (result.has_value())
|
||||
split_result.filters[result.value().first].emplace_back(result.value().second);
|
||||
return result.has_value();
|
||||
}))
|
||||
unsupported_conditions.push_back(condition);
|
||||
}
|
||||
if (!unsupported_conditions.empty())
|
||||
{
|
||||
auto remain_filter = ActionsDAG::buildFilterActionsDAG(unsupported_conditions);
|
||||
split_result.remain_filter = std::move(remain_filter);
|
||||
}
|
||||
return split_result;
|
||||
}
|
||||
static FilterSplitResult splitFilterForPushDown(const ActionsDAG& filter_expression);
|
||||
|
||||
private:
|
||||
static ColumnFilterCreators creators;
|
||||
|
@ -118,6 +118,10 @@ std::unique_ptr<SubRowGroupRangeReader> ParquetReader::getSubRowGroupRangeReader
|
||||
}
|
||||
return std::make_unique<SubRowGroupRangeReader>(row_group_indices_, std::move(row_group_prefetches), [&](const size_t idx, RowGroupPrefetchPtr prefetch) { return getRowGroupChunkReader(idx, std::move(prefetch)); });
|
||||
}
|
||||
void ParquetReader::addExpressionFilter(std::shared_ptr<ExpressionFilter> filter)
|
||||
{
|
||||
expression_filters.emplace_back(filter);
|
||||
}
|
||||
|
||||
|
||||
SubRowGroupRangeReader::SubRowGroupRangeReader(const std::vector<Int32> & rowGroupIndices, std::vector<RowGroupPrefetchPtr>&& row_group_prefetches_, RowGroupReaderCreator && creator)
|
||||
|
@ -50,6 +50,7 @@ public:
|
||||
|
||||
Block read();
|
||||
void addFilter(const String & column_name, ColumnFilterPtr filter);
|
||||
void addExpressionFilter(std::shared_ptr<ExpressionFilter> filter);
|
||||
void setRemainFilter(std::optional<ActionsDAG> & expr);
|
||||
std::unique_ptr<RowGroupChunkReader> getRowGroupChunkReader(size_t row_group_idx, RowGroupPrefetchPtr prefetch);
|
||||
std::unique_ptr<SubRowGroupRangeReader> getSubRowGroupRangeReader(std::vector<Int32> row_group_indices);
|
||||
@ -72,6 +73,7 @@ private:
|
||||
size_t next_row_group_idx = 0;
|
||||
std::shared_ptr<parquet::FileMetaData> meta_data;
|
||||
std::unordered_map<String, parquet::schema::NodePtr> parquet_columns;
|
||||
std::vector<std::shared_ptr<ExpressionFilter>> expression_filters;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
#include "RowGroupChunkReader.h"
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
|
||||
#include <Columns/FilterDescription.h>
|
||||
#include <IO/SharedThreadPools.h>
|
||||
#include <Common/threadPoolCallbackRunner.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetColumnReaderFactory.h>
|
||||
#include <Processors/Formats/Impl/Parquet/ParquetReader.h>
|
||||
#include <arrow/io/util_internal.h>
|
||||
#include <Common/threadPoolCallbackRunner.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -15,10 +15,6 @@ Chunk RowGroupChunkReader::readChunk(size_t rows)
|
||||
return {};
|
||||
rows = std::min(rows, remain_rows);
|
||||
MutableColumns columns;
|
||||
for (auto & reader : column_readers)
|
||||
{
|
||||
columns.push_back(reader->createColumn());
|
||||
}
|
||||
size_t rows_read = 0;
|
||||
while (!rows_read)
|
||||
{
|
||||
@ -36,66 +32,61 @@ Chunk RowGroupChunkReader::readChunk(size_t rows)
|
||||
if (!rows_to_read)
|
||||
break;
|
||||
|
||||
OptionalRowSet row_set = std::nullopt;
|
||||
if (!filter_columns.empty())
|
||||
row_set = std::optional(RowSet(rows_to_read));
|
||||
if (row_set.has_value())
|
||||
for (auto & column : filter_columns)
|
||||
{
|
||||
reader_columns_mapping[column]->computeRowSet(row_set, rows_to_read);
|
||||
if (row_set.value().none())
|
||||
break;
|
||||
}
|
||||
bool skip_all = false;
|
||||
if (row_set.has_value())
|
||||
skip_all = row_set.value().none();
|
||||
if (skip_all)
|
||||
auto select_result = selectConditions->selectRows(rows_to_read);
|
||||
|
||||
if (select_result.skip_all)
|
||||
{
|
||||
metrics.skipped_rows += rows_to_read;
|
||||
}
|
||||
|
||||
bool all = true;
|
||||
if (row_set.has_value())
|
||||
all = row_set.value().all();
|
||||
bool all = select_result.valid_count == rows_to_read;
|
||||
if (all)
|
||||
row_set = std::nullopt;
|
||||
if (!skip_all)
|
||||
for (auto & column : columns)
|
||||
{
|
||||
if (all)
|
||||
column->reserve(rows);
|
||||
else
|
||||
column->reserve(row_set.value().count());
|
||||
}
|
||||
for (size_t i = 0; i < column_readers.size(); i++)
|
||||
select_result.set = std::nullopt;
|
||||
auto column_names = parquet_reader->header.getNames();
|
||||
if (select_result.skip_all)
|
||||
{
|
||||
if (skip_all)
|
||||
column_readers[i]->skip(rows_to_read);
|
||||
else
|
||||
column_readers[i]->read(columns[i], row_set, rows_to_read);
|
||||
metrics.filtered_rows += rows_to_read;
|
||||
rows_read = 0;
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
if (!select_result.intermediate_columns.contains(name))
|
||||
{
|
||||
reader_columns_mapping.at(name)->skip(rows_to_read);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
if (select_result.intermediate_columns.contains(name))
|
||||
{
|
||||
if (all)
|
||||
columns.emplace_back(select_result.intermediate_columns.at(name)->assumeMutable());
|
||||
else
|
||||
columns.emplace_back(
|
||||
select_result.intermediate_columns.at(name)->filter(select_result.intermediate_filter, select_result.valid_count)->assumeMutable());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & reader = reader_columns_mapping.at(name);
|
||||
auto column = reader->createColumn();
|
||||
column->reserve(select_result.valid_count);
|
||||
reader->read(column, select_result.set, rows_to_read);
|
||||
columns.emplace_back(std::move(column));
|
||||
}
|
||||
}
|
||||
metrics.filtered_rows += (rows_to_read - (columns[0]->size() - rows_read));
|
||||
rows_read = columns[0]->size();
|
||||
}
|
||||
remain_rows -= rows_to_read;
|
||||
metrics.filtered_rows += (rows_to_read - (columns[0]->size() - rows_read));
|
||||
rows_read = columns[0]->size();
|
||||
}
|
||||
|
||||
// if (parquet_reader->remain_filter.has_value())
|
||||
// {
|
||||
// std::cerr<<"has filter\n"<<std::endl;
|
||||
// auto input = parquet_reader->header.cloneWithColumns(std::move(columns));
|
||||
// auto output = input.getColumns();
|
||||
// parquet_reader->remain_filter.value().execute(input);
|
||||
// const auto& filter = checkAndGetColumn<ColumnUInt8>(*input.getByPosition(0).column).getData();
|
||||
// size_t resize_hint = 0;
|
||||
// for (size_t i = 0; i < columns.size(); i++)
|
||||
// {
|
||||
// output[i] = output[i]->assumeMutable()->filter(filter, resize_hint);
|
||||
// resize_hint = output[i]->size();
|
||||
// }
|
||||
// return Chunk(std::move(output), resize_hint);
|
||||
// }
|
||||
metrics.output_rows += rows_read;
|
||||
return Chunk(std::move(columns), rows_read);
|
||||
if (rows_read)
|
||||
return Chunk(std::move(columns), rows_read);
|
||||
else
|
||||
return {};
|
||||
}
|
||||
|
||||
arrow::io::ReadRange getColumnRange(const parquet::ColumnChunkMetaData & column_metadata)
|
||||
@ -111,11 +102,12 @@ arrow::io::ReadRange getColumnRange(const parquet::ColumnChunkMetaData & column_
|
||||
}
|
||||
|
||||
|
||||
RowGroupPrefetch::RowGroupPrefetch(SeekableReadBuffer & file_, std::mutex & mutex, const parquet::ArrowReaderProperties& arrow_properties_) : file(file_), file_mutex(mutex), arrow_properties(arrow_properties_)
|
||||
/// Binds the prefetcher to the file, its mutex and the Arrow reader properties, and creates
/// the callback runner on the IO thread pool under the name "ParquetRead".
RowGroupPrefetch::RowGroupPrefetch(
    SeekableReadBuffer & file_, std::mutex & mutex, const parquet::ArrowReaderProperties & arrow_properties_)
    : file(file_)
    , file_mutex(mutex)
    , arrow_properties(arrow_properties_)
{
    callback_runner = threadPoolCallbackRunnerUnsafe<ColumnChunkData>(getIOThreadPool().get(), "ParquetRead");
}
|
||||
void RowGroupPrefetch::prefetchRange(const arrow::io::ReadRange& range)
|
||||
void RowGroupPrefetch::prefetchRange(const arrow::io::ReadRange & range)
|
||||
{
|
||||
if (fetched)
|
||||
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "RowGroupPrefetch: prefetchColumnChunk called after startPrefetch");
|
||||
@ -123,16 +115,19 @@ void RowGroupPrefetch::prefetchRange(const arrow::io::ReadRange& range)
|
||||
}
|
||||
void RowGroupPrefetch::startPrefetch()
|
||||
{
|
||||
if (fetched) return;
|
||||
if (fetched)
|
||||
return;
|
||||
fetched = true;
|
||||
ranges = arrow::io::internal::CoalesceReadRanges(ranges, arrow_properties.cache_options().hole_size_limit, arrow_properties.cache_options().range_size_limit);
|
||||
ranges = arrow::io::internal::CoalesceReadRanges(
|
||||
ranges, arrow_properties.cache_options().hole_size_limit, arrow_properties.cache_options().range_size_limit);
|
||||
read_range_buffers.resize(ranges.size());
|
||||
for (size_t i=0; i < ranges.size(); i++)
|
||||
for (size_t i = 0; i < ranges.size(); i++)
|
||||
{
|
||||
auto& range = ranges[i];
|
||||
auto & range = ranges[i];
|
||||
read_range_buffers[i].range = range;
|
||||
auto task = [this, range, i]() -> ColumnChunkData {
|
||||
auto& buffer = read_range_buffers[i].buffer;
|
||||
auto task = [this, range, i]() -> ColumnChunkData
|
||||
{
|
||||
auto & buffer = read_range_buffers[i].buffer;
|
||||
|
||||
buffer.resize(range.length);
|
||||
int64_t count = 0;
|
||||
@ -164,32 +159,46 @@ ColumnChunkData RowGroupPrefetch::readRange(const arrow::io::ReadRange & range)
|
||||
|
||||
// wait fetch finished
|
||||
const auto it = std::lower_bound(
|
||||
tasks.begin(), tasks.end(), range,
|
||||
[](const TaskEntry& entry, const arrow::io::ReadRange& range_) {
|
||||
return entry.range.offset + entry.range.length < range_.offset + range_.length;
|
||||
});
|
||||
if (it != tasks.end() && it->range.Contains(range)) {
|
||||
tasks.begin(),
|
||||
tasks.end(),
|
||||
range,
|
||||
[](const TaskEntry & entry, const arrow::io::ReadRange & range_)
|
||||
{ return entry.range.offset + entry.range.length < range_.offset + range_.length; });
|
||||
if (it != tasks.end() && it->range.Contains(range))
|
||||
{
|
||||
it->task.wait();
|
||||
} else {
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
|
||||
}
|
||||
|
||||
const auto buffer_it = std::lower_bound(
|
||||
read_range_buffers.begin(), read_range_buffers.end(), range,
|
||||
[](const ReadRangeBuffer& buffer, const arrow::io::ReadRange& range_) {
|
||||
return buffer.range.offset + buffer.range.length < range_.offset + range_.length;
|
||||
});
|
||||
if (buffer_it != read_range_buffers.end() && buffer_it->range.Contains(range)) {
|
||||
return {reinterpret_cast<uint8_t *>(buffer_it->buffer.data() + (range.offset - buffer_it->range.offset)), static_cast<size_t>(range.length)};
|
||||
} else {
|
||||
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
|
||||
read_range_buffers.begin(),
|
||||
read_range_buffers.end(),
|
||||
range,
|
||||
[](const ReadRangeBuffer & buffer, const arrow::io::ReadRange & range_)
|
||||
{ return buffer.range.offset + buffer.range.length < range_.offset + range_.length; });
|
||||
if (buffer_it != read_range_buffers.end() && buffer_it->range.Contains(range))
|
||||
{
|
||||
return {
|
||||
reinterpret_cast<uint8_t *>(buffer_it->buffer.data() + (range.offset - buffer_it->range.offset)),
|
||||
static_cast<size_t>(range.length)};
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Range was not requested for caching: offset={}, length={}", range.offset, range.length);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
RowGroupChunkReader::RowGroupChunkReader(
|
||||
ParquetReader * parquetReader, size_t row_group_idx, RowGroupPrefetchPtr prefetch_, std::unordered_map<String, ColumnFilterPtr> filters)
|
||||
: parquet_reader(parquetReader), row_group_meta(parquetReader->meta_data->RowGroup(static_cast<int>(row_group_idx))), prefetch(std::move(prefetch_))
|
||||
: parquet_reader(parquetReader)
|
||||
, row_group_meta(parquetReader->meta_data->RowGroup(static_cast<int>(row_group_idx)))
|
||||
, prefetch(std::move(prefetch_))
|
||||
{
|
||||
column_readers.reserve(parquet_reader->header.columns());
|
||||
column_buffers.resize(parquet_reader->header.columns());
|
||||
@ -204,30 +213,10 @@ RowGroupChunkReader::RowGroupChunkReader(
|
||||
|
||||
auto idx = parquet_reader->meta_data->schema()->ColumnIndex(*node);
|
||||
auto filter = filters.contains(col_with_name.name) ? filters.at(col_with_name.name) : nullptr;
|
||||
// auto range = getColumnRange(*row_group_meta->ColumnChunk(idx));
|
||||
// size_t compress_size = range.second;
|
||||
// size_t offset = range.first;
|
||||
// column_buffers[reader_idx].resize(compress_size, 0);
|
||||
remain_rows = row_group_meta->ColumnChunk(idx)->num_values();
|
||||
// if (!parquet_reader->file.supportsReadAt())
|
||||
// {
|
||||
// std::lock_guard lock(parquet_reader->file_mutex);
|
||||
// parquet_reader->file.seek(offset, SEEK_SET);
|
||||
// size_t count = parquet_reader->file.readBig(reinterpret_cast<char *>(column_buffers[reader_idx].data()), compress_size);
|
||||
// if (count != compress_size)
|
||||
// throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Failed to read column data");
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// auto pb = [](size_t ) {return true;};
|
||||
// size_t count = parquet_reader->file.readBigAt(reinterpret_cast<char *>(column_buffers[reader_idx].data()), compress_size, offset, pb);
|
||||
// if (count != compress_size)
|
||||
// throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Failed to read column data");
|
||||
// }
|
||||
auto data = prefetch->readRange(getColumnRange(*row_group_meta->ColumnChunk(idx)));
|
||||
auto page_reader = std::make_unique<LazyPageReader>(
|
||||
std::make_shared<ReadBufferFromMemory>(
|
||||
reinterpret_cast<char *>(data.data), data.size),
|
||||
std::make_shared<ReadBufferFromMemory>(reinterpret_cast<char *>(data.data), data.size),
|
||||
parquet_reader->properties,
|
||||
remain_rows,
|
||||
row_group_meta->ColumnChunk(idx)->compression());
|
||||
@ -240,39 +229,39 @@ RowGroupChunkReader::RowGroupChunkReader(
|
||||
.targetType(col_with_name.type)
|
||||
.filter(filter)
|
||||
.build();
|
||||
// auto column_reader = SelectiveColumnReaderFactory::createLeafColumnReader(
|
||||
// *row_group_meta->ColumnChunk(idx), parquet_reader->meta_data->schema()->Column(idx), std::move(page_reader), filter);
|
||||
column_readers.push_back(column_reader);
|
||||
reader_columns_mapping[col_with_name.name] = column_reader;
|
||||
chassert(idx >= 0);
|
||||
if (filter)
|
||||
filter_columns.push_back(col_with_name.name);
|
||||
}
|
||||
selectConditions = std::make_unique<SelectConditions>(reader_columns_mapping, filter_columns, parquet_reader->expression_filters, parquet_reader->header);
|
||||
}
|
||||
|
||||
static std::shared_ptr<FilterDescription> mergeFilterDescriptions(std::unordered_map<size_t, std::shared_ptr<FilterDescription>>& /*intermediate_filter_descriptions*/)
|
||||
/// Merges several row masks into one: a row is kept only if every mask keeps it (logical AND).
///
/// @param filters  Non-empty list of masks of equal length. With a single mask its buffer is
///                 moved out, which leaves `filters[0]` in a moved-from state.
/// @return The merged mask, with the same length as the inputs.
/// @throws Exception(LOGICAL_ERROR) if the masks do not all have the same length.
static IColumn::Filter mergeFilters(std::vector<IColumn::Filter> & filters)
{
    assert(!filters.empty());
    if (filters.size() == 1)
        return std::move(filters[0]);

    const size_t size = filters.front().size();

    IColumn::Filter result;
    result.resize_fill(size, 1);

    for (const auto & current : filters)
    {
        /// Indexing a shorter mask with rows of the first one would read out of bounds.
        if (current.size() != size)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "bug, filters to merge have different sizes");

        /// Index by row, not by the position of the mask in `filters`.
        for (size_t row = 0; row < size; ++row)
        {
            if (!current[row])
                result[row] = 0;
        }
    }
    return result;
}
|
||||
|
||||
static void combineRowSetAndFilter(RowSet& set, std::shared_ptr<FilterDescription> filter_description)
|
||||
static void combineRowSetAndFilter(RowSet & set, const IColumn::Filter& filter_data)
|
||||
{
|
||||
const auto &filter_data = *filter_description->data;
|
||||
if (filter_data.size() != set.totalRows())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "bug, filter data size is not equal to row set size");
|
||||
int count = 0;
|
||||
for (size_t i = 0; i < set.totalRows(); ++i)
|
||||
{
|
||||
@ -286,19 +275,18 @@ static void combineRowSetAndFilter(RowSet& set, std::shared_ptr<FilterDescriptio
|
||||
|
||||
SelectResult SelectConditions::selectRows(size_t rows)
|
||||
{
|
||||
|
||||
OptionalRowSet total_set;
|
||||
if (has_filter)
|
||||
total_set = std::optional(RowSet(rows));
|
||||
else
|
||||
return SelectResult{std::nullopt, {}, nullptr};
|
||||
return SelectResult{std::nullopt, {}, {}, rows, false};
|
||||
|
||||
bool skip_all = false;
|
||||
|
||||
// apply fast filters
|
||||
for (auto & idx : fast_filter_column_idxs)
|
||||
for (const auto & name : fast_filter_columns)
|
||||
{
|
||||
readers[idx]->computeRowSet(total_set, rows);
|
||||
readers.at(name)->computeRowSet(total_set, rows);
|
||||
if (total_set.value().none())
|
||||
{
|
||||
skip_all = true;
|
||||
@ -307,26 +295,35 @@ SelectResult SelectConditions::selectRows(size_t rows)
|
||||
}
|
||||
|
||||
size_t count = 0;
|
||||
if (!skip_all)
|
||||
count = total_set.has_value() ? total_set.value().count() : rows;
|
||||
|
||||
// apply actions filter
|
||||
std::unordered_map<size_t, ColumnPtr> intermediate_columns;
|
||||
std::unordered_map<size_t, ColumnPtr> intermediate_filter_columns;
|
||||
std::unordered_map<size_t, std::shared_ptr<FilterDescription>> intermediate_filter_descriptions;
|
||||
for (auto & [idx, filter] : actions_filters)
|
||||
std::unordered_map<String, ColumnPtr> intermediate_columns;
|
||||
std::vector<IColumn::Filter> intermediate_filters;
|
||||
for (const auto & expr_filter : expression_filters)
|
||||
{
|
||||
if (!count) break;
|
||||
auto reader = readers[idx];
|
||||
auto column = reader->createColumn();
|
||||
column->reserve(count);
|
||||
reader->read(column, total_set, rows);
|
||||
intermediate_columns[idx] = std::move(column);
|
||||
auto filter_column = filter->testByExpression(intermediate_columns[idx]);
|
||||
filter_column = filter_column->convertToFullColumnIfSparse();
|
||||
intermediate_filter_columns[idx] = filter_column;
|
||||
intermediate_filter_descriptions[idx] = std::make_shared<FilterDescription>(*filter_column);
|
||||
size_t filter_count = intermediate_filter_descriptions[idx]->countBytesInFilter();
|
||||
if (skip_all)
|
||||
break;
|
||||
count = total_set.has_value() ? total_set.value().count() : rows;
|
||||
// prepare condition columns
|
||||
ColumnsWithTypeAndName input;
|
||||
for (const auto &name : expr_filter->getInputs())
|
||||
{
|
||||
if (!intermediate_columns.contains(name))
|
||||
{
|
||||
auto reader = readers.at(name);
|
||||
auto column = reader->createColumn();
|
||||
if (count == rows)
|
||||
reader->read(column, std::nullopt, rows);
|
||||
else
|
||||
reader->read(column, total_set, rows);
|
||||
intermediate_columns.emplace(name, std::move(column));
|
||||
}
|
||||
input.emplace_back(intermediate_columns.at(name), header.getByName(name).type, name);
|
||||
}
|
||||
|
||||
auto filter = expr_filter->execute(input);
|
||||
size_t filter_count = countBytesInFilter(filter);
|
||||
intermediate_filters.emplace_back(std::move(filter));
|
||||
if (!filter_count)
|
||||
{
|
||||
skip_all = true;
|
||||
@ -335,21 +332,25 @@ SelectResult SelectConditions::selectRows(size_t rows)
|
||||
}
|
||||
|
||||
if (skip_all)
|
||||
return SelectResult{std::nullopt, std::move(intermediate_columns), nullptr, true};
|
||||
return SelectResult{std::nullopt, std::move(intermediate_columns), {}, 0, true};
|
||||
else
|
||||
{
|
||||
auto filter_description = mergeFilterDescriptions(intermediate_filter_descriptions);
|
||||
if (filter_description)
|
||||
combineRowSetAndFilter(total_set.value(), filter_description);
|
||||
return SelectResult{std::move(total_set), std::move(intermediate_columns), filter_description};
|
||||
if (!intermediate_filters.empty())
|
||||
{
|
||||
auto filter = mergeFilters(intermediate_filters);
|
||||
combineRowSetAndFilter(total_set.value(), filter);
|
||||
return SelectResult{std::move(total_set), std::move(intermediate_columns), std::move(filter), total_set.value().count(), false};
|
||||
}
|
||||
return SelectResult{std::move(total_set), {}, {}, total_set.value().count(), false};
|
||||
}
|
||||
}
|
||||
/// Keeps references to the readers, the names of the columns that have fast filters, the
/// expression filters and the header. `has_filter` is set when at least one fast filter
/// column or one expression filter is present.
SelectConditions::SelectConditions(
    std::unordered_map<String, SelectiveColumnReaderPtr> & readers_,
    std::vector<String> & fast_filter_columns_,
    std::vector<std::shared_ptr<ExpressionFilter>> & expression_filters_,
    const Block & header_)
    : has_filter(!fast_filter_columns_.empty() || !expression_filters_.empty())
    , readers(readers_)
    , fast_filter_columns(fast_filter_columns_)
    , expression_filters(expression_filters_)
    , header(header_)
{
}
|
||||
|
||||
}
|
||||
|
@ -10,8 +10,9 @@ struct FilterDescription;
|
||||
struct SelectResult
|
||||
{
|
||||
std::optional<RowSet> set;
|
||||
std::unordered_map<size_t, ColumnPtr> intermediate_columns;
|
||||
std::shared_ptr<FilterDescription> intermediate_filter_description;
|
||||
std::unordered_map<String, ColumnPtr> intermediate_columns;
|
||||
IColumn::Filter intermediate_filter;
|
||||
size_t valid_count = 0;
|
||||
bool skip_all = false;
|
||||
};
|
||||
|
||||
@ -19,15 +20,17 @@ class SelectConditions
|
||||
{
|
||||
public:
|
||||
SelectConditions(
|
||||
std::unordered_map<size_t, SelectiveColumnReaderPtr> & readers,
|
||||
std::vector<size_t> & fastFilterColumnIdxs,
|
||||
std::unordered_map<size_t, ColumnFilterPtr> & actionsFilters);
|
||||
std::unordered_map<String, SelectiveColumnReaderPtr> & readers_,
|
||||
std::vector<String> & fast_filter_columns_,
|
||||
std::vector<std::shared_ptr<ExpressionFilter>>& expression_filters,
|
||||
const Block & header_);
|
||||
SelectResult selectRows(size_t rows);
|
||||
private:
|
||||
bool has_filter = false;
|
||||
std::unordered_map<size_t, SelectiveColumnReaderPtr> & readers;
|
||||
std::vector<size_t> & fast_filter_column_idxs;
|
||||
std::unordered_map<size_t, ColumnFilterPtr>& actions_filters;
|
||||
const std::unordered_map<String, SelectiveColumnReaderPtr> & readers;
|
||||
const std::vector<String> & fast_filter_columns;
|
||||
const std::vector<std::shared_ptr<ExpressionFilter>>& expression_filters;
|
||||
const Block & header;
|
||||
};
|
||||
|
||||
|
||||
@ -106,5 +109,6 @@ private:
|
||||
std::vector<PaddedPODArray<UInt8>> column_buffers;
|
||||
size_t remain_rows = 0;
|
||||
ReadMetrics metrics;
|
||||
std::unique_ptr<SelectConditions> selectConditions;
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user