Merge pull request #37165 from ClickHouse/merge_tree_reader

Merge tree reader support for multiple read/filter steps: row level filter, prewhere, ...
This commit is contained in:
Alexander Gololobov 2022-06-24 21:32:31 +02:00 committed by GitHub
commit 189d0fffb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 366 additions and 225 deletions

View File

@ -1176,7 +1176,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.getCurrentDataStream(),
expressions.prewhere_info->row_level_filter,
expressions.prewhere_info->row_level_column_name,
false);
true);
row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
query_plan.addStep(std::move(row_level_filter_step));

View File

@ -297,6 +297,11 @@ std::string PrewhereInfo::dump() const
WriteBufferFromOwnString ss;
ss << "PrewhereDagInfo\n";
if (row_level_filter)
{
ss << "row_level_filter " << row_level_filter->dumpDAG() << "\n";
}
if (prewhere_actions)
{
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";

View File

@ -89,8 +89,6 @@ protected:
using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const String & column_name) const;
friend class MergeTreeRangeReader::DelayedStream;
private:
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions;

View File

@ -74,14 +74,27 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
prewhere_actions = std::make_unique<PrewhereExprInfo>();
if (prewhere_info->row_level_filter)
prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
{
PrewhereExprStep row_level_filter_step
{
.actions = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings),
.column_name = prewhere_info->row_level_column_name,
.remove_column = true,
.need_filter = true
};
prewhere_actions->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
prewhere_actions->steps.emplace_back(std::move(row_level_filter_step));
}
prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name;
prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name;
prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column;
prewhere_actions->need_filter = prewhere_info->need_filter;
PrewhereExprStep prewhere_step
{
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings),
.column_name = prewhere_info->prewhere_column_name,
.remove_column = prewhere_info->remove_prewhere_column,
.need_filter = prewhere_info->need_filter
};
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
}
}
@ -204,30 +217,78 @@ Chunk MergeTreeBaseSelectProcessor::generate()
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
MergeTreeRangeReader* prev_reader = nullptr;
bool last_reader = false;
if (prewhere_info)
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true, non_const_virtual_column_names);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false, non_const_virtual_column_names);
pre_reader_ptr = &current_task.pre_range_reader;
}
if (prewhere_actions->steps.size() != pre_reader_for_step.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true, non_const_virtual_column_names);
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
{
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[i].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
}
}
if (!last_reader)
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names);
}
else
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true, non_const_virtual_column_names);
/// If all columns are read by pre_range_readers than move last pre_range_reader into range_reader
current_task.range_reader = std::move(current_task.pre_range_readers.back());
current_task.pre_range_readers.pop_back();
}
}
static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 current_preferred_block_size_bytes,
UInt64 current_max_block_size_rows, UInt64 current_preferred_max_column_in_block_size_bytes, double min_filtration_ratio)
{
const MergeTreeRangeReader & current_reader = current_task.range_reader;
if (!current_task.size_predictor)
return static_cast<size_t>(current_max_block_size_rows);
/// Calculates number of rows will be read using preferred_block_size_bytes.
/// Can't be less than avg_index_granularity.
size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
if (!rows_to_read)
return rows_to_read;
auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
rows_to_read = std::max(total_row_in_current_granule, rows_to_read);
if (current_preferred_max_column_in_block_size_bytes)
{
/// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
auto rows_to_read_for_max_size_column
= current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
auto rows_to_read_for_max_size_column_with_filtration
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
}
auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
if (unread_rows_in_current_granule >= rows_to_read)
return rows_to_read;
const MergeTreeIndexGranularity & index_granularity = current_task.data_part->index_granularity;
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
}
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
{
@ -237,45 +298,10 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
const UInt64 current_max_block_size_rows = max_block_size_rows;
const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
const MergeTreeIndexGranularity & index_granularity = task->data_part->index_granularity;
const double min_filtration_ratio = 0.00001;
auto estimate_num_rows = [current_preferred_block_size_bytes, current_max_block_size_rows,
&index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader)
{
if (!current_task.size_predictor)
return static_cast<size_t>(current_max_block_size_rows);
/// Calculates number of rows will be read using preferred_block_size_bytes.
/// Can't be less than avg_index_granularity.
size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
if (!rows_to_read)
return rows_to_read;
auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
rows_to_read = std::max(total_row_in_current_granule, rows_to_read);
if (current_preferred_max_column_in_block_size_bytes)
{
/// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
auto rows_to_read_for_max_size_column
= current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
auto rows_to_read_for_max_size_column_with_filtration
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
}
auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
if (unread_rows_in_current_granule >= rows_to_read)
return rows_to_read;
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
};
UInt64 recommended_rows = estimate_num_rows(*task, task->range_reader);
UInt64 recommended_rows = estimateNumRows(*task, current_preferred_block_size_bytes,
current_max_block_size_rows, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio);
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows));
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
@ -602,9 +628,12 @@ std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSi
const Block & sample_block)
{
const auto & required_column_names = task_columns.columns.getNames();
const auto & required_pre_column_names = task_columns.pre_columns.getNames();
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
for (const auto & pre_columns_per_step : task_columns.pre_columns)
{
const auto & required_pre_column_names = pre_columns_per_step.getNames();
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
}
return std::make_unique<MergeTreeBlockSizePredictor>(
data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block);

View File

@ -115,7 +115,7 @@ protected:
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
MergeTreeReaderPtr pre_reader;
std::vector<MergeTreeReaderPtr> pre_reader_for_step;
MergeTreeReadTaskPtr task;

View File

@ -5,6 +5,9 @@
#include <Common/checkStackSize.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <unordered_set>
@ -131,12 +134,12 @@ NameSet injectRequiredColumns(
MergeTreeReadTask::MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, bool remove_prewhere_column_, bool should_reorder_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
ordered_names{ordered_names_}, column_name_set{column_name_set_}, task_columns{task_columns_},
remove_prewhere_column{remove_prewhere_column_}, size_predictor{std::move(size_predictor_)}
{
}
@ -276,34 +279,40 @@ MergeTreeReadTaskColumns getReadTaskColumns(
Names pre_column_names;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, column_names).empty();
injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, column_names);
MergeTreeReadTaskColumns result;
auto options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects();
if (with_subcolumns)
options.withSubcolumns();
if (prewhere_info)
{
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet pre_name_set;
/// Add column reading steps:
/// 1. Columns for row level filter
if (prewhere_info->row_level_filter)
{
NameSet names(pre_column_names.begin(), pre_column_names.end());
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
{
if (!names.contains(name))
pre_column_names.push_back(name);
}
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
}
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
/// 2. Columns for prewhere
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
const auto injected_pre_columns = injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, pre_column_names);
storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
for (const auto & name : all_pre_column_names)
{
if (pre_name_set.contains(name))
continue;
pre_column_names.push_back(name);
pre_name_set.insert(name);
}
Names post_column_names;
for (const auto & name : column_names)
@ -313,17 +322,23 @@ MergeTreeReadTaskColumns getReadTaskColumns(
column_names = post_column_names;
}
MergeTreeReadTaskColumns result;
NamesAndTypesList all_columns;
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names));
auto options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects();
if (with_subcolumns)
options.withSubcolumns();
result.pre_columns = storage_snapshot->getColumnsByNames(options, pre_column_names);
/// 3. Rest of the requested columns
result.columns = storage_snapshot->getColumnsByNames(options, column_names);
result.should_reorder = should_reorder;
return result;
}
std::string MergeTreeReadTaskColumns::dump() const
{
WriteBufferFromOwnString s;
for (size_t i = 0; i < pre_columns.size(); ++i)
{
s << "STEP " << i << ": " << pre_columns[i].toString() << "\n";
}
s << "COLUMNS: " << columns.toString() << "\n";
return s.str();
}
}

View File

@ -30,6 +30,16 @@ NameSet injectRequiredColumns(
Names & columns);
struct MergeTreeReadTaskColumns
{
/// column names to read during WHERE
NamesAndTypesList columns;
/// column names to read during each PREWHERE step
std::vector<NamesAndTypesList> pre_columns;
std::string dump() const;
};
/// A batch of work for MergeTreeThreadSelectProcessor
struct MergeTreeReadTask
{
@ -43,39 +53,27 @@ struct MergeTreeReadTask
const Names & ordered_names;
/// used to determine whether column should be filtered during PREWHERE or WHERE
const NameSet & column_name_set;
/// column names to read during WHERE
const NamesAndTypesList & columns;
/// column names to read during PREWHERE
const NamesAndTypesList & pre_columns;
/// column names to read during PREWHERE and WHERE
const MergeTreeReadTaskColumns & task_columns;
/// should PREWHERE column be returned to requesting side?
const bool remove_prewhere_column;
/// resulting block may require reordering in accordance with `ordered_names`
const bool should_reorder;
/// Used to satistfy preferred_block_size_bytes limitation
MergeTreeBlockSizePredictorPtr size_predictor;
/// Used to save current range processing status
MergeTreeRangeReader range_reader;
MergeTreeRangeReader pre_range_reader;
/// Range readers for multiple filtering steps: row level security, PREWHERE etc.
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque but noit a vector
std::deque<MergeTreeRangeReader> pre_range_readers;
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, bool remove_prewhere_column_, bool should_reorder_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
};
struct MergeTreeReadTaskColumns
{
/// column names to read during WHERE
NamesAndTypesList columns;
/// column names to read during PREWHERE
NamesAndTypesList pre_columns;
/// resulting block may require reordering in accordance with `ordered_names`
bool should_reorder = false;
};
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -34,9 +34,9 @@ try
: getSizePredictor(data_part, task_columns, sample_block);
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns,
prewhere_info && prewhere_info->remove_prewhere_column,
std::move(size_predictor));
return true;
}

View File

@ -3,6 +3,8 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <Common/TargetSpecific.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <base/range.h>
#include <Interpreters/castColumn.h>
#include <DataTypes/DataTypeNothing.h>
@ -64,7 +66,7 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter)
}
static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges)
size_t MergeTreeRangeReader::ReadResult::getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges)
{
size_t current_task_last_mark = 0;
for (const auto & mark_range : ranges)
@ -594,6 +596,7 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con
return count;
}
/// Filter size must match total_rows_per_granule
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
{
if (!new_filter && filter)
@ -644,7 +647,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereExprInfo * prewhere_info_,
const PrewhereExprStep * prewhere_info_,
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names_)
: merge_tree_reader(merge_tree_reader_)
@ -672,17 +675,12 @@ MergeTreeRangeReader::MergeTreeRangeReader(
if (prewhere_info)
{
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(sample_block, true);
sample_block.erase(prewhere_info->row_level_column_name);
}
const auto & step = *prewhere_info;
if (step.actions)
step.actions->execute(sample_block, true);
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(sample_block, true);
if (prewhere_info->remove_prewhere_column)
sample_block.erase(prewhere_info->prewhere_column_name);
if (step.remove_column)
sample_block.erase(step.column_name);
}
}
@ -983,11 +981,15 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
result.columns.emplace_back(std::move(column));
}
Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & num_rows)
Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
{
Columns columns;
num_rows = 0;
/// No columns need to be read at this step? (only more filtering)
if (merge_tree_reader->getColumns().empty())
return columns;
if (result.rowsPerGranule().empty())
{
/// If zero rows were read on prev step, than there is no more rows to read.
@ -1001,7 +1003,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
const auto & rows_per_granule = result.rowsPerGranule();
const auto & started_ranges = result.startedRanges();
size_t current_task_last_mark = getLastMark(started_ranges);
size_t current_task_last_mark = ReadResult::getLastMark(started_ranges);
size_t next_range_to_start = 0;
auto size = rows_per_granule.size();
@ -1039,6 +1041,8 @@ static void checkCombinedFiltersSize(size_t bytes_in_first_filter, size_t second
"does not match second filter size ({})", bytes_in_first_filter, second_filter_size);
}
/// Second filter size must be equal to number of 1s in the first filter.
/// The result size is equal to first filter size.
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
{
ConstantFilterDescription first_const_descr(*first);
@ -1099,13 +1103,17 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size();
if (result.columns.size() != (num_columns + non_const_virtual_column_names.size()))
throw Exception("Invalid number of columns passed to MergeTreeRangeReader. "
"Expected " + toString(num_columns) + ", "
"got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR);
/// Check that we have columns from previous steps and newly read required columns
if (result.columns.size() < num_columns + non_const_virtual_column_names.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
ColumnPtr filter;
ColumnPtr row_level_filter;
/// This filter has the size of total_rows_per granule. It is applied after reading contiguous chunks from
/// the start of each granule.
ColumnPtr combined_filter;
/// Filter computed at the current step. Its size is equal to num_rows which is <= total_rows_per_granule
ColumnPtr current_step_filter;
size_t prewhere_column_pos;
{
@ -1122,13 +1130,23 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
}
}
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
for (const auto & column_name : non_const_virtual_column_names)
{
if (block.has(column_name))
continue;
if (column_name == "_part_offset")
{
if (pos >= result.columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
}
else
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
++pos;
@ -1137,58 +1155,37 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(block);
auto row_level_filter_pos = block.getPositionByName(prewhere_info->row_level_column_name);
row_level_filter = block.getByPosition(row_level_filter_pos).column;
block.erase(row_level_filter_pos);
if (prewhere_info->actions)
prewhere_info->actions->execute(block);
auto columns = block.getColumns();
filterColumns(columns, row_level_filter);
if (columns.empty())
block = block.cloneEmpty();
else
block.setColumns(columns);
}
prewhere_info->prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_info->prewhere_column_name);
prewhere_column_pos = block.getPositionByName(prewhere_info->column_name);
result.columns.clear();
result.columns.reserve(block.columns());
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
filter.swap(result.columns[prewhere_column_pos]);
current_step_filter.swap(result.columns[prewhere_column_pos]);
combined_filter = current_step_filter;
}
if (result.getFilter())
{
/// TODO: implement for prewhere chain.
/// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
ErrorCodes::LOGICAL_ERROR);
ColumnPtr prev_filter = result.getFilterHolder();
combined_filter = combineFilters(prev_filter, std::move(combined_filter));
}
if (filter && row_level_filter)
{
row_level_filter = combineFilters(std::move(row_level_filter), filter);
result.setFilter(row_level_filter);
}
else
result.setFilter(filter);
result.setFilter(combined_filter);
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
if (!last_reader_in_chain)
result.optimize(merge_tree_reader->canReadIncompleteGranules(), prewhere_info->row_level_filter == nullptr);
result.optimize(merge_tree_reader->canReadIncompleteGranules(), true);
/// If we read nothing or filter gets optimized to nothing
if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse();
/// If we need to filter in PREWHERE
else if (prewhere_info->need_filter || result.need_filter || prewhere_info->row_level_filter)
else if (prewhere_info->need_filter || result.need_filter)
{
/// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain)
@ -1208,10 +1205,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// filter might be shrunk while columns not
const auto * result_filter = result.getFilterOriginal();
if (row_level_filter)
filterColumns(result.columns, filter);
else
filterColumns(result.columns, result_filter->getData());
filterColumns(result.columns, current_step_filter);
result.need_filter = true;
@ -1234,22 +1228,22 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Check if the PREWHERE column is needed
if (!result.columns.empty())
{
if (prewhere_info->remove_prewhere_column)
if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
result.columns[prewhere_column_pos] =
getSampleBlock().getByName(prewhere_info->prewhere_column_name).type->
getSampleBlock().getByName(prewhere_info->column_name).type->
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
}
}
/// Filter in WHERE instead
else
{
if (prewhere_info->remove_prewhere_column)
if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
{
auto type = getSampleBlock().getByName(prewhere_info->prewhere_column_name).type;
auto type = getSampleBlock().getByName(prewhere_info->column_name).type;
ColumnWithTypeAndName col(result.getFilterHolder()->convertToFullColumnIfConst(), std::make_shared<DataTypeUInt8>(), "");
result.columns[prewhere_column_pos] = castColumn(col, type);
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
@ -1257,4 +1251,20 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
}
}
std::string PrewhereExprInfo::dump() const
{
WriteBufferFromOwnString s;
for (size_t i = 0; i < steps.size(); ++i)
{
s << "STEP " << i << ":\n"
<< " ACTIONS: " << (steps[i].actions ? steps[i].actions->dumpActions() : "nullptr") << "\n"
<< " COLUMN: " << steps[i].column_name << "\n"
<< " REMOVE_COLUMN: " << steps[i].remove_column << "\n"
<< " NEED_FILTER: " << steps[i].need_filter << "\n";
}
return s.str();
}
}

View File

@ -18,18 +18,20 @@ using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct PrewhereExprStep
{
ExpressionActionsPtr actions;
String column_name;
bool remove_column = false;
bool need_filter = false;
};
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
struct PrewhereExprInfo
{
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// This actions are separate because prewhere condition should not be executed over filtered rows.
ExpressionActionsPtr row_level_filter;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
String row_level_column_name;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
std::vector<PrewhereExprStep> steps;
std::string dump() const;
};
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
@ -41,7 +43,7 @@ public:
MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereExprInfo * prewhere_info_,
const PrewhereExprStep * prewhere_info_,
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names);
@ -57,6 +59,7 @@ public:
bool isCurrentRangeFinished() const;
bool isInitialized() const { return is_initialized; }
private:
/// Accumulates sequential read() requests to perform a large read instead of multiple small reads
class DelayedStream
{
@ -144,10 +147,23 @@ public:
size_t ceilRowsToCompleteGranules(size_t rows_num) const;
};
public:
/// Statistics after next reading step.
class ReadResult
{
public:
Columns columns;
size_t num_rows = 0;
/// The number of rows were added to block as a result of reading chain.
size_t numReadRows() const { return num_read_rows; }
/// The number of bytes read from disk.
size_t numBytesRead() const { return num_bytes_read; }
private:
/// Only MergeTreeRangeReader is supposed to access ReadResult internals.
friend class MergeTreeRangeReader;
using NumRows = std::vector<size_t>;
struct RangeInfo
@ -161,13 +177,11 @@ public:
const RangesInfo & startedRanges() const { return started_ranges; }
const NumRows & rowsPerGranule() const { return rows_per_granule; }
static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges);
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
size_t totalRowsPerGranule() const { return total_rows_per_granule; }
/// The number of rows were added to block as a result of reading chain.
size_t numReadRows() const { return num_read_rows; }
size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
/// The number of bytes read from disk.
size_t numBytesRead() const { return num_bytes_read; }
/// Filter you need to apply to newly-read columns in order to add them to block.
const ColumnUInt8 * getFilterOriginal() const { return filter_original ? filter_original : filter; }
const ColumnUInt8 * getFilter() const { return filter; }
@ -195,13 +209,12 @@ public:
size_t countBytesInResultFilter(const IColumn::Filter & filter);
Columns columns;
size_t num_rows = 0;
/// If this flag is false than filtering form PREWHERE can be delayed and done in WHERE
/// to reduce memory copies and applying heavy filters multiple times
bool need_filter = false;
Block block_before_prewhere;
private:
RangesInfo started_ranges;
/// The number of rows read from each granule.
/// Granule here is not number of rows between two marks
@ -234,16 +247,15 @@ public:
const Block & getSampleBlock() const { return sample_block; }
private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(ReadResult & result, size_t & num_rows);
Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
const PrewhereExprInfo * prewhere_info;
const PrewhereExprStep * prewhere_info;
Stream stream;

View File

@ -135,13 +135,15 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t min_marks_to_read, size_t
}
}
auto curr_task_size_predictor = !per_part_size_predictor[part_idx] ? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part_size_predictor[part_idx]); /// make a copy
const auto & per_part = per_part_params[part_idx];
auto curr_task_size_predictor = !per_part.size_predictor ? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part.size_predictor); /// make a copy
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
per_part.column_name_set, per_part.task_columns,
prewhere_info && prewhere_info->remove_prewhere_column, std::move(curr_task_size_predictor));
}
Block MergeTreeReadPool::getHeader() const
@ -216,15 +218,14 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
auto size_predictor = !predict_block_size_bytes ? nullptr
: MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block);
per_part_size_predictor.emplace_back(std::move(size_predictor));
auto & per_part = per_part_params.emplace_back();
per_part.size_predictor = std::move(size_predictor);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = task_columns.columns.getNames();
per_part_column_name_set.emplace_back(required_column_names.begin(), required_column_names.end());
per_part_pre_columns.push_back(std::move(task_columns.pre_columns));
per_part_columns.push_back(std::move(task_columns.columns));
per_part_should_reorder.push_back(task_columns.should_reorder);
per_part.column_name_set = {required_column_names.begin(), required_column_names.end()};
per_part.task_columns = std::move(task_columns);
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });
}

View File

@ -99,11 +99,16 @@ private:
const Names column_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
struct PerPartParams
{
MergeTreeReadTaskColumns task_columns;
NameSet column_name_set;
MergeTreeBlockSizePredictorPtr size_predictor;
};
std::vector<PerPartParams> per_part_params;
PrewhereInfoPtr prewhere_info;
struct Part

View File

@ -67,9 +67,12 @@ size_t MergeTreeReaderWide::readRows(
size_t read_rows = 0;
try
{
size_t num_columns = columns.size();
size_t num_columns = res_columns.size();
checkNumberOfColumns(num_columns);
if (num_columns == 0)
return max_rows_to_read;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;

View File

@ -31,8 +31,8 @@ try
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
task_columns, prewhere_info && prewhere_info->remove_prewhere_column,
std::move(size_predictor));
return true;
}

View File

@ -66,10 +66,16 @@ void MergeTreeSelectProcessor::initializeReaders()
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
pre_reader_for_step.clear();
if (prewhere_info)
{
for (const auto & pre_columns_for_step : task_columns.pre_columns)
{
pre_reader_for_step.push_back(data_part->getReader(pre_columns_for_step, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}));
}
}
}
@ -80,7 +86,7 @@ void MergeTreeSelectProcessor::finish()
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
pre_reader_for_step.clear();
data_part.reset();
}

View File

@ -111,14 +111,20 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
pre_reader_for_step.clear();
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
{
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
{
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback));
}
}
}
else
{
@ -126,14 +132,20 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
if (part_name != last_readed_part_name)
{
/// retain avg_value_size_hints
reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
pre_reader_for_step.clear();
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
{
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
{
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback));
}
}
}
}
@ -144,7 +156,7 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
void MergeTreeThreadSelectProcessor::finish()
{
reader.reset();
pre_reader.reset();
pre_reader_for_step.clear();
}

View File

@ -0,0 +1,11 @@
1000000
0
0
0
400000
195431
195431
5923
200000
200000
6061

View File

@ -0,0 +1,36 @@
DROP ROW POLICY IF EXISTS test_filter_policy ON test_table;
DROP ROW POLICY IF EXISTS test_filter_policy_2 ON test_table;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (`n` UInt64, `s` String)
ENGINE = MergeTree
PRIMARY KEY n ORDER BY n;
INSERT INTO test_table SELECT number, concat('some string ', CAST(number, 'String')) FROM numbers(1000000);
-- Create row policy that doesn't use any column
CREATE ROW POLICY test_filter_policy ON test_table USING False TO ALL;
-- Run query under default user so that always false row_level_filter is added that doesn't require any columns
SELECT count(1) FROM test_table;
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000;
SELECT count(1) FROM test_table WHERE (n % 8192) < 4000;
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000 WHERE (n % 33) == 0;
-- Add policy for default user that will read a column
CREATE ROW POLICY test_filter_policy_2 ON test_table USING (n % 5) >= 3 TO default;
-- Run query under default user that needs the same column as PREWHERE and WHERE
SELECT count(1) FROM test_table;
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000;
SELECT count(1) FROM test_table WHERE (n % 8192) < 4000;
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000 WHERE (n % 33) == 0;
-- Run queries that have division by zero if row level filter isn't applied before prewhere
SELECT count(1) FROM test_table PREWHERE 7 / (n % 5) > 2;
SELECT count(1) FROM test_table WHERE 7 / (n % 5) > 2;
SELECT count(1) FROM test_table PREWHERE 7 / (n % 5) > 2 WHERE (n % 33) == 0;
DROP TABLE test_table;
DROP ROW POLICY test_filter_policy ON test_table;
DROP ROW POLICY test_filter_policy_2 ON test_table;