This commit is contained in:
Alexander Gololobov 2022-06-13 15:00:26 +02:00
parent b629833ef9
commit dbc6d1a159
9 changed files with 74 additions and 259 deletions

View File

@ -84,15 +84,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
}; };
prewhere_actions->steps.emplace_back(std::move(row_level_filter_step)); prewhere_actions->steps.emplace_back(std::move(row_level_filter_step));
// prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
} }
// prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name;
// prewhere_actions->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
// prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name;
// prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column;
// prewhere_actions->need_filter = prewhere_info->need_filter;
PrewhereExprStep prewhere_step PrewhereExprStep prewhere_step
{ {
@ -103,12 +95,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
}; };
prewhere_actions->steps.emplace_back(std::move(prewhere_step)); prewhere_actions->steps.emplace_back(std::move(prewhere_step));
// std::cerr
// << "PREWHERE ========================\n"
// << prewhere_actions->dump()
// << "========================\n\n";
} }
} }
@ -236,7 +222,11 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
if (prewhere_info) if (prewhere_info)
{ {
assert(prewhere_actions->steps.size() == pre_reader_for_step.size()); if (prewhere_actions->steps.size() != pre_reader_for_step.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i) for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
{ {
@ -244,7 +234,6 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
current_task.pre_range_reader.push_back( current_task.pre_range_reader.push_back(
MergeTreeRangeReader(pre_reader_for_step[i].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names)); MergeTreeRangeReader(pre_reader_for_step[i].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_reader.back(); prev_reader = &current_task.pre_range_reader.back();
} }
@ -256,41 +245,14 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
} }
else else
{ {
// HACK!! /// If all columns are read by pre_range_readers than move last pre_range_reader into range_reader
// If all columns are read by pre_range_readers than move last pre_range_reader into range_reader
current_task.range_reader = std::move(current_task.pre_range_reader.back()); current_task.range_reader = std::move(current_task.pre_range_reader.back());
current_task.pre_range_reader.pop_back(); current_task.pre_range_reader.pop_back();
} }
/*
if (prewhere_info)
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true, non_const_virtual_column_names);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false, non_const_virtual_column_names);
pre_reader_ptr = &current_task.pre_range_reader;
}
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true, non_const_virtual_column_names);
}
}
else
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true, non_const_virtual_column_names);
}
//*/
} }
static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 current_preferred_block_size_bytes, static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 current_preferred_block_size_bytes,
UInt64 current_max_block_size_rows, UInt64 current_preferred_max_column_in_block_size_bytes, double min_filtration_ratio) UInt64 current_max_block_size_rows, UInt64 current_preferred_max_column_in_block_size_bytes, double min_filtration_ratio)
//, const MergeTreeRangeReader & current_reader)
{ {
const MergeTreeRangeReader & current_reader = current_task.range_reader; const MergeTreeRangeReader & current_reader = current_task.range_reader;
@ -339,7 +301,7 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
const double min_filtration_ratio = 0.00001; const double min_filtration_ratio = 0.00001;
UInt64 recommended_rows = estimateNumRows(*task, current_preferred_block_size_bytes, UInt64 recommended_rows = estimateNumRows(*task, current_preferred_block_size_bytes,
current_max_block_size_rows, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio);//, task->range_reader); current_max_block_size_rows, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio);
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows)); UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows));
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges); auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

View File

@ -5,6 +5,9 @@
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <unordered_set> #include <unordered_set>
@ -284,34 +287,22 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (with_subcolumns) if (with_subcolumns)
options.withSubcolumns(); options.withSubcolumns();
// NameSet all_pre_columns;
if (prewhere_info) if (prewhere_info)
{ {
NameSet pre_name_set; NameSet pre_name_set;
// TODO: for each prewhere step /// Add column reading steps:
/// 1. Columns for row level filter /// 1. Columns for row level filter
if (prewhere_info->row_level_filter) if (prewhere_info->row_level_filter)
{ {
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames(); Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
////// HACK!!!
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names)); result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
//////////////
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end()); pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
// all_pre_columns.insert(pre_column_names.begin(), pre_column_names.end());
} }
/// 2. Columns for prewhere /// 2. Columns for prewhere
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
// if (pre_column_names.empty())
// pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns( const auto injected_pre_columns = injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names); storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names);
@ -323,7 +314,6 @@ MergeTreeReadTaskColumns getReadTaskColumns(
pre_name_set.insert(name); pre_name_set.insert(name);
} }
Names post_column_names; Names post_column_names;
for (const auto & name : column_names) for (const auto & name : column_names)
if (!pre_name_set.contains(name)) if (!pre_name_set.contains(name))
@ -332,16 +322,23 @@ MergeTreeReadTaskColumns getReadTaskColumns(
column_names = post_column_names; column_names = post_column_names;
} }
// NamesAndTypesList all_columns;
////// HACK!!!
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names)); result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names));
//////////////
/// 3. Rest of the requested columns /// 3. Rest of the requested columns
result.columns = storage_snapshot->getColumnsByNames(options, column_names); result.columns = storage_snapshot->getColumnsByNames(options, column_names);
return result; return result;
} }
std::string MergeTreeReadTaskColumns::dump() const
{
WriteBufferFromOwnString s;
for (size_t i = 0; i < pre_columns.size(); ++i)
{
s << "STEP " << i << ": " << pre_columns[i].toString() << "\n";
}
s << "COLUMNS: " << columns.toString() << "\n";
return s.str();
}
} }

View File

@ -37,16 +37,7 @@ struct MergeTreeReadTaskColumns
/// column names to read during each PREWHERE step /// column names to read during each PREWHERE step
std::vector<NamesAndTypesList> pre_columns; std::vector<NamesAndTypesList> pre_columns;
std::string dump() const std::string dump() const;
{
std::ostringstream s;
for (size_t i = 0; i < pre_columns.size(); ++i)
{
s << "STEP " << i << ": " << pre_columns[i].toString() << "\n";
}
s << "COLUMNS: " << columns.toString() << "\n";
return s.str();
}
}; };
/// A batch of work for MergeTreeThreadSelectProcessor /// A batch of work for MergeTreeThreadSelectProcessor
@ -62,13 +53,8 @@ struct MergeTreeReadTask
const Names & ordered_names; const Names & ordered_names;
/// used to determine whether column should be filtered during PREWHERE or WHERE /// used to determine whether column should be filtered during PREWHERE or WHERE
const NameSet & column_name_set; const NameSet & column_name_set;
/// column names to read during PREWHERE and WHERE
const MergeTreeReadTaskColumns & task_columns; const MergeTreeReadTaskColumns & task_columns;
// /// column names to read during WHERE
// const NamesAndTypesList & columns;
// /// column names to read during PREWHERE
// const NamesAndTypesList & pre_columns;
/// should PREWHERE column be returned to requesting side? /// should PREWHERE column be returned to requesting side?
const bool remove_prewhere_column; const bool remove_prewhere_column;
/// Used to satistfy preferred_block_size_bytes limitation /// Used to satistfy preferred_block_size_bytes limitation

View File

@ -1,9 +1,10 @@
#include <sstream>
#include <Storages/MergeTree/IMergeTreeReader.h> #include <Storages/MergeTree/IMergeTreeReader.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h> #include <Columns/ColumnsCommon.h>
#include <Common/TargetSpecific.h> #include <Common/TargetSpecific.h>
#include "IO/WriteBufferFromString.h"
#include "IO/Operators.h"
#include <base/range.h> #include <base/range.h>
#include <Interpreters/castColumn.h> #include <Interpreters/castColumn.h>
#include <DataTypes/DataTypeNothing.h> #include <DataTypes/DataTypeNothing.h>
@ -44,7 +45,7 @@ static void filterColumns(Columns & columns, const IColumn::Filter & filter)
} }
} }
} }
/*
static void filterColumns(Columns & columns, const ColumnPtr & filter) static void filterColumns(Columns & columns, const ColumnPtr & filter)
{ {
ConstantFilterDescription const_descr(*filter); ConstantFilterDescription const_descr(*filter);
@ -63,7 +64,7 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter)
FilterDescription descr(*filter); FilterDescription descr(*filter);
filterColumns(columns, *descr.data); filterColumns(columns, *descr.data);
} }
*/
size_t MergeTreeRangeReader::ReadResult::getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges) size_t MergeTreeRangeReader::ReadResult::getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges)
{ {
@ -359,7 +360,6 @@ void MergeTreeRangeReader::ReadResult::setFilterConstFalse()
num_rows = 0; num_rows = 0;
} }
///
void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granules, bool allow_filter_columns) void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granules, bool allow_filter_columns)
{ {
if (total_rows_per_granule == 0 || filter == nullptr) if (total_rows_per_granule == 0 || filter == nullptr)
@ -417,7 +417,6 @@ void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granule
need_filter = true; need_filter = true;
} }
/// For each read granule
size_t MergeTreeRangeReader::ReadResult::countZeroTails(const IColumn::Filter & filter_vec, NumRows & zero_tails, bool can_read_incomplete_granules) const size_t MergeTreeRangeReader::ReadResult::countZeroTails(const IColumn::Filter & filter_vec, NumRows & zero_tails, bool can_read_incomplete_granules) const
{ {
zero_tails.resize(0); zero_tails.resize(0);
@ -676,29 +675,13 @@ MergeTreeRangeReader::MergeTreeRangeReader(
if (prewhere_info) if (prewhere_info)
{ {
// for (const auto & step : prewhere_info->steps)
const auto & step = *prewhere_info; const auto & step = *prewhere_info;
{
if (step.actions) if (step.actions)
step.actions->execute(sample_block, true); step.actions->execute(sample_block, true);
if (step.remove_column) if (step.remove_column)
sample_block.erase(step.column_name); sample_block.erase(step.column_name);
} }
/* if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(sample_block, true);
sample_block.erase(prewhere_info->row_level_column_name);
}
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(sample_block, true);
if (prewhere_info->remove_prewhere_column)
sample_block.erase(prewhere_info->prewhere_column_name);
*/
}
} }
bool MergeTreeRangeReader::isReadingFinished() const bool MergeTreeRangeReader::isReadingFinished() const
@ -998,16 +981,14 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
result.columns.emplace_back(std::move(column)); result.columns.emplace_back(std::move(column));
} }
Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & num_rows) Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
{ {
Columns columns; Columns columns;
num_rows = 0; num_rows = 0;
/////////////// /// No columns need to be read at this step? (only more filtering)
// HACK!!: no columns need to be read at this step? (only more filtering)
if (merge_tree_reader->getColumns().empty()) if (merge_tree_reader->getColumns().empty())
return columns; return columns;
///////////////
if (result.rowsPerGranule().empty()) if (result.rowsPerGranule().empty())
{ {
@ -1044,8 +1025,11 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
stream.skip(result.numRowsToSkipInLastGranule()); stream.skip(result.numRowsToSkipInLastGranule());
num_rows += stream.finalize(columns); num_rows += stream.finalize(columns);
/// verify that stream and prev_reader->stream are at exactly same offset
// TODO: here we can verify that stream and prev_reader->stream are at exactly same offset if (stream.currentPartOffset() != prev_reader->stream.currentPartOffset())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Current step stream read position {} doesn't match previous step read read position {}",
stream.currentPartOffset(), prev_reader->stream.currentPartOffset());
/// added_rows may be zero if all columns were read in prewhere and it's ok. /// added_rows may be zero if all columns were read in prewhere and it's ok.
if (num_rows && num_rows != result.totalRowsPerGranule()) if (num_rows && num_rows != result.totalRowsPerGranule())
@ -1125,15 +1109,17 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
const auto & header = merge_tree_reader->getColumns(); const auto & header = merge_tree_reader->getColumns();
size_t num_columns = header.size(); size_t num_columns = header.size();
// TODO: properly check that we have columns from previous steps and newly read required columns /// Check that we have columns from previous steps and newly read required columns
if (result.columns.size() < num_columns + non_const_virtual_column_names.size()) if (result.columns.size() < num_columns + non_const_virtual_column_names.size())
throw Exception("Invalid number of columns passed to MergeTreeRangeReader. " throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected " + toString(num_columns) + ", " "Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
"got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR); num_columns, result.columns.size());
ColumnPtr current_filter; /// This filter has the size of total_rows_per granule. It is applied after reading contiguous chunks from
ColumnPtr filter; /// the start of each granule.
// ColumnPtr row_level_filter; ColumnPtr combined_filter;
/// Filter computed at the current step. Its size is equal to num_rows which is <= total_rows_per_granule
ColumnPtr current_step_filter;
size_t prewhere_column_pos; size_t prewhere_column_pos;
{ {
@ -1150,12 +1136,8 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
} }
} }
// for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type) for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type)
// block.insert({result.columns[pos], name_and_type->type, name_and_type->name}); block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
for (const auto & name_and_type : header) {
block.insert({result.columns[pos], name_and_type.type, name_and_type.name});
++pos;
}
for (const auto & column_name : non_const_virtual_column_names) for (const auto & column_name : non_const_virtual_column_names)
{ {
@ -1163,7 +1145,14 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
continue; continue;
if (column_name == "_part_offset") if (column_name == "_part_offset")
{
if (pos >= result.columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
num_columns, result.columns.size());
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name}); block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
}
else else
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR); throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
++pos; ++pos;
@ -1172,21 +1161,6 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later. /// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block; result.block_before_prewhere = block;
/* if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(block);
auto row_level_filter_pos = block.getPositionByName(prewhere_info->row_level_column_name);
row_level_filter = block.getByPosition(row_level_filter_pos).column;
block.erase(row_level_filter_pos);
auto columns = block.getColumns();
filterColumns(columns, row_level_filter);
if (columns.empty())
block = block.cloneEmpty();
else
block.setColumns(columns);
}
*/
if (prewhere_info->actions) if (prewhere_info->actions)
prewhere_info->actions->execute(block); prewhere_info->actions->execute(block);
@ -1197,40 +1171,27 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto & col : block) for (auto & col : block)
result.columns.emplace_back(std::move(col.column)); result.columns.emplace_back(std::move(col.column));
current_filter.swap(result.columns[prewhere_column_pos]); current_step_filter.swap(result.columns[prewhere_column_pos]);
filter = current_filter; combined_filter = current_step_filter;
} }
if (result.getFilter()) if (result.getFilter())
{ {
ColumnPtr prev_filter = result.getFilterHolder(); ColumnPtr prev_filter = result.getFilterHolder();
filter = combineFilters(prev_filter, std::move(filter)); combined_filter = combineFilters(prev_filter, std::move(combined_filter));
// /// TODO: implement for prewhere chain.
// /// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
// throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
// ErrorCodes::LOGICAL_ERROR);
} }
// if (filter && row_level_filter) result.setFilter(combined_filter);
// {
// row_level_filter = combineFilters(std::move(row_level_filter), filter);
// result.setFilter(row_level_filter);
// }
// else
result.setFilter(filter);
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here /// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
if (!last_reader_in_chain) if (!last_reader_in_chain)
result.optimize(merge_tree_reader->canReadIncompleteGranules(), true); // TODO: prewhere_info->row_level_filter == nullptr); result.optimize(merge_tree_reader->canReadIncompleteGranules(), true);
/// If we read nothing or filter gets optimized to nothing /// If we read nothing or filter gets optimized to nothing
if (result.totalRowsPerGranule() == 0) if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse(); result.setFilterConstFalse();
/// If we need to filter in PREWHERE /// If we need to filter in PREWHERE
else if (prewhere_info->need_filter || result.need_filter)// || prewhere_info->row_level_filter) else if (prewhere_info->need_filter || result.need_filter)
{ {
/// If there is a filter and without optimized /// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain) if (result.getFilter() && last_reader_in_chain)
@ -1250,22 +1211,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// filter might be shrunk while columns not /// filter might be shrunk while columns not
const auto * result_filter = result.getFilterOriginal(); const auto * result_filter = result.getFilterOriginal();
// if (row_level_filter) filterColumns(result.columns, current_step_filter);
// filterColumns(result.columns, filter);
// else
// filterColumns(result.columns, result_filter->getData());
{
FilterDescription current_filter_descr(*current_filter);
// TODO: need to filter by current filter column that has num_rows size, not the original size
// TODO: properly handle const true and const false cases
if (current_filter_descr.countBytesInFilter() == 0)
result.columns.clear();
else if (current_filter_descr.data)
filterColumns(result.columns, *current_filter_descr.data);
}
result.need_filter = true; result.need_filter = true;
@ -1313,7 +1259,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
std::string PrewhereExprInfo::dump() const std::string PrewhereExprInfo::dump() const
{ {
std::ostringstream s; WriteBufferFromOwnString s;
for (size_t i = 0; i < steps.size(); ++i) for (size_t i = 0; i < steps.size(); ++i)
{ {

View File

@ -18,7 +18,6 @@ using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
class ExpressionActions; class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>; using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
//*
struct PrewhereExprStep struct PrewhereExprStep
{ {
ExpressionActionsPtr actions; ExpressionActionsPtr actions;
@ -30,42 +29,11 @@ struct PrewhereExprStep
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG /// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
struct PrewhereExprInfo struct PrewhereExprInfo
{ {
// /// Actions for row level security filter. Applied separately before prewhere_actions.
// /// This actions are separate because prewhere condition should not be executed over filtered rows.
// ExpressionActionsPtr row_level_filter;
// /// Actions which are executed on block in order to get filter column for prewhere step.
// ExpressionActionsPtr prewhere_actions;
// String row_level_column_name;
// String prewhere_column_name;
// bool remove_prewhere_column = false;
// bool need_filter = false;
std::vector<PrewhereExprStep> steps; std::vector<PrewhereExprStep> steps;
///// PrewhereExprStep deleted_row_filter;
///// PrewhereExprStep row_level_filter;
///// PrewhereExprStep prewhere;
std::string dump() const; std::string dump() const;
}; };
/*/
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
struct PrewhereExprInfo
{
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// This actions are separate because prewhere condition should not be executed over filtered rows.
ExpressionActionsPtr row_level_filter;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
String row_level_column_name;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
};
//*/
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks. /// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
@ -278,7 +246,7 @@ public:
private: private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges); ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
Columns continueReadingChain(ReadResult & result, size_t & num_rows); Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
void executePrewhereActionsAndFilterColumns(ReadResult & result); void executePrewhereActionsAndFilterColumns(ReadResult & result);
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset); void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);

View File

@ -70,7 +70,6 @@ size_t MergeTreeReaderWide::readRows(
size_t num_columns = res_columns.size(); size_t num_columns = res_columns.size();
checkNumberOfColumns(num_columns); checkNumberOfColumns(num_columns);
/// TODO: is this ok to request no columns?
if (num_columns == 0) if (num_columns == 0)
return max_rows_to_read; return max_rows_to_read;

View File

@ -50,9 +50,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
void MergeTreeSelectProcessor::initializeReaders() void MergeTreeSelectProcessor::initializeReaders()
{ {
pre_reader_for_step.clear(); // is it being reused???
task_columns = getReadTaskColumns( task_columns = getReadTaskColumns(
storage, storage_snapshot, data_part, storage, storage_snapshot, data_part,
required_columns, prewhere_info, /*with_subcolumns=*/ true); required_columns, prewhere_info, /*with_subcolumns=*/ true);
@ -69,6 +66,8 @@ pre_reader_for_step.clear(); // is it being reused???
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(), reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}); all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
pre_reader_for_step.clear();
if (prewhere_info) if (prewhere_info)
{ {
for (const auto & pre_columns_for_step : task_columns.pre_columns) for (const auto & pre_columns_for_step : task_columns.pre_columns)

View File

@ -105,9 +105,6 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); }; auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };
const auto & metadata_snapshot = storage_snapshot->metadata; const auto & metadata_snapshot = storage_snapshot->metadata;
//std::cerr << "==============TASK:==============\n" << task->task_columns.dump() << "\n";
//std::cerr << "pre_reader_for_step.size() " << pre_reader_for_step.size() << "\n\n";
if (!reader) if (!reader)
{ {
if (use_uncompressed_cache) if (use_uncompressed_cache)
@ -118,8 +115,7 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback); IMergeTreeReader::ValueSizeMap{}, profile_callback);
pre_reader_for_step.clear(); // is it being reused??? pre_reader_for_step.clear();
if (prewhere_info) if (prewhere_info)
{ {
for (const auto & pre_columns_per_step : task->task_columns.pre_columns) for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
@ -140,8 +136,7 @@ pre_reader_for_step.clear(); // is it being reused???
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback); reader->getAvgValueSizeHints(), profile_callback);
pre_reader_for_step.clear(); // is it being reused??? pre_reader_for_step.clear();
if (prewhere_info) if (prewhere_info)
{ {
for (const auto & pre_columns_per_step : task->task_columns.pre_columns) for (const auto & pre_columns_per_step : task->task_columns.pre_columns)

View File

@ -47,42 +47,6 @@ using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelect
struct SubqueryForSet; struct SubqueryForSet;
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>; using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
/*
struct PrewhereStep
{
ActionsDAGPtr prewhere_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
explicit PrewhereStep(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
std::string dump() const;
};
struct PrewhereInfo
{
// /// Actions for row level security filter. Applied separately before prewhere_actions.
// /// This actions are separate because prewhere condition should not be executed over filtered rows.
// ActionsDAGPtr row_level_filter;
// /// Actions which are executed on block in order to get filter column for prewhere step.
// ActionsDAGPtr prewhere_actions;
// String row_level_column_name;
// String prewhere_column_name;
// bool remove_prewhere_column = false;
// bool need_filter = false;
std::vector<PrewhereStep> steps;
PrewhereInfo() = default;
// explicit PrewhereInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
// : prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
std::string dump() const;
};
/*/
struct PrewhereInfo struct PrewhereInfo
{ {
/// Actions for row level security filter. Applied separately before prewhere_actions. /// Actions for row level security filter. Applied separately before prewhere_actions.
@ -101,7 +65,6 @@ struct PrewhereInfo
std::string dump() const; std::string dump() const;
}; };
//*/
/// Helper struct to store all the information about the filter expression. /// Helper struct to store all the information about the filter expression.
struct FilterInfo struct FilterInfo