This commit is contained in:
Nikolai Kochetov 2021-02-20 14:00:16 +03:00
parent 8361904b4d
commit 673e24d7ef
7 changed files with 196 additions and 119 deletions

View File

@ -208,6 +208,8 @@ public:
const Context & context,
bool can_replace = false);
void addNodeToIndex(const Node * node) { index.insert(const_cast<Node *>(node)); }
/// Call addAlias several times.
void addAliases(const NamesWithAliases & aliases);
/// Add alias actions and remove unused columns from index. Also specify result columns order in index.

View File

@ -1405,7 +1405,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (storage && filter_info_)
{
filter_info = filter_info_;
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
filter_info->do_remove_column = true;
//query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
}
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
@ -1574,11 +1575,11 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, si
{
size_t next_step_i = 0;
if (hasFilter())
{
const ExpressionActionsChain::Step & step = *chain.steps.at(next_step_i++);
filter_info->do_remove_column = step.can_remove_required_output.at(0);
}
// if (hasFilter())
// {
// const ExpressionActionsChain::Step & step = *chain.steps.at(next_step_i++);
// filter_info->do_remove_column = step.can_remove_required_output.at(0);
// }
if (hasPrewhere())
{
@ -1605,8 +1606,8 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, si
void ExpressionAnalysisResult::removeExtraColumns() const
{
if (hasFilter())
filter_info->actions->projectInput();
// if (hasFilter())
// filter_info->actions->projectInput();
if (hasWhere())
before_where->projectInput();
if (hasHaving())

View File

@ -141,7 +141,15 @@ String InterpreterSelectQuery::generateFilterActions(ActionsDAGPtr & actions, co
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context, metadata_snapshot);
actions = analyzer.simpleSelectActions();
return expr_list->children.at(0)->getColumnName();
auto column_name = expr_list->children.at(0)->getColumnName();
actions->removeUnusedActions({column_name});
actions->projectInput(false);
ActionsDAG::Index index;
for (const auto * node : actions->getInputs())
actions->addNodeToIndex(node);
return column_name;
}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -444,16 +452,22 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals(), storage->getStorageID());
/// Fix source_header for filter actions.
if (row_policy_filter)
{
filter_info = std::make_shared<FilterDAGInfo>();
filter_info->column_name = generateFilterActions(filter_info->actions, required_columns);
source_header = metadata_snapshot->getSampleBlockForColumns(
filter_info->actions->getRequiredColumns().getNames(), storage->getVirtuals(), storage->getStorageID());
auto required_columns_from_filter = filter_info->actions->getRequiredColumns();
for (const auto & column : required_columns_from_filter)
{
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column.name))
required_columns.push_back(column.name);
}
}
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals(), storage->getStorageID());
}
/// Calculate structure of the result.
@ -834,6 +848,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
expressions.prewhere_info = std::make_shared<PrewhereDAGInfo>(
std::move(expressions.filter_info->actions),
std::move(expressions.filter_info->column_name));
expressions.prewhere_info->prewhere_actions->projectInput(false);
expressions.prewhere_info->remove_prewhere_column = expressions.filter_info->do_remove_column;
expressions.prewhere_info->need_filter = true;
expressions.filter_info = nullptr;
@ -845,19 +860,19 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions);
expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name);
expressions.prewhere_info->row_level_filter_actions->projectInput(false);
if (expressions.filter_info->do_remove_column)
{
/// Instead of removing column, add it to prewhere_actions input (but not in index).
/// It will be removed at prewhere_actions execution.
const auto & index = expressions.prewhere_info->row_level_filter_actions->getIndex();
auto it = index.find(expressions.prewhere_info->row_level_column_name);
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column {} in row level security filter {}",
expressions.prewhere_info->row_level_column_name, expressions.prewhere_info->row_level_filter_actions->dumpDAG());
const auto & node = *it;
// if (expressions.filter_info->do_remove_column)
// {
// /// Instead of removing column, add it to prewhere_actions input (but not in index).
// /// It will be removed at prewhere_actions execution.
// const auto & index = expressions.prewhere_info->row_level_filter_actions->getIndex();
// auto it = index.find(expressions.prewhere_info->row_level_column_name);
// if (it == index.end())
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column {} in row level security filter {}",
// expressions.prewhere_info->row_level_column_name, expressions.prewhere_info->row_level_filter_actions->dumpDAG());
// const auto & node = *it;
expressions.prewhere_info->prewhere_actions->addInput(node->result_name, node->result_type, true, false);
}
// expressions.prewhere_info->prewhere_actions->addInput(node->result_name, node->result_type, true, false);
// }
expressions.filter_info = nullptr;
}
@ -1285,7 +1300,7 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
header,
prewhere_info.row_level_filter,
prewhere_info.row_level_column_name,
false);
true);
});
}

View File

@ -346,6 +346,8 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
throw Exception("Invalid type for filter in PREWHERE: " + row_level_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}
block.erase(prewhere_info->row_level_column_name);
}
if (prewhere_info->prewhere_actions)

View File

@ -271,11 +271,22 @@ MergeTreeReadTaskColumns getReadTaskColumns(
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else if (prewhere_info->row_level_filter)
pre_column_names = prewhere_info->row_level_filter->getRequiredColumns();
else if (prewhere_info->prewhere_actions)
else
{
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (prewhere_info->row_level_filter)
{
NameSet names(pre_column_names.begin(), pre_column_names.end());
for (auto & name : prewhere_info->row_level_filter->getRequiredColumns())
{
if (names.count(name) == 0)
pre_column_names.push_back(name);
}
}
}
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
@ -293,6 +304,13 @@ MergeTreeReadTaskColumns getReadTaskColumns(
column_names = post_column_names;
}
// std::cerr << "---------- Pre column names\n";
// for (const auto & col : pre_column_names)
// std::cerr << col << std::endl;
// std::cerr << "----------- Post column names\n";
// for (const auto & col : column_names)
// std::cerr << col << std::endl;
MergeTreeReadTaskColumns result;
if (check_columns)

View File

@ -33,6 +33,25 @@ static void filterColumns(Columns & columns, const IColumn::Filter & filter)
}
}
static void filterColumns(Columns & columns, const ColumnPtr & filter)
{
ConstantFilterDescription const_descr(*filter);
if (const_descr.always_true)
return;
if (const_descr.always_false)
{
for (auto & col : columns)
if (col)
col = col->cloneEmpty();
return;
}
FilterDescription descr(*filter);
filterColumns(columns, *descr.data);
}
MergeTreeRangeReader::DelayedStream::DelayedStream(
size_t from_mark, IMergeTreeReader * merge_tree_reader_)
@ -315,7 +334,7 @@ void MergeTreeRangeReader::ReadResult::setFilterConstFalse()
num_rows = 0;
}
void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granules)
void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granules, bool allow_filter_columns)
{
if (total_rows_per_granule == 0 || filter == nullptr)
return;
@ -347,7 +366,7 @@ void MergeTreeRangeReader::ReadResult::optimize(bool can_read_incomplete_granule
filter_holder_original = std::move(filter_holder);
/// Check if const 1 after shrink
if (countBytesInResultFilter(filter->getData()) + total_zero_rows_in_tails == total_rows_per_granule)
if (allow_filter_columns && countBytesInResultFilter(filter->getData()) + total_zero_rows_in_tails == total_rows_per_granule)
{
total_rows_per_granule = total_rows_per_granule - total_zero_rows_in_tails;
num_rows = total_rows_per_granule;
@ -451,79 +470,32 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con
return count;
}
void MergeTreeRangeReader::ReadResult::addFilter(const ColumnPtr & new_filter)
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
{
if (!new_filter && filter)
throw Exception("Can't replace existing filter with empty.", ErrorCodes::LOGICAL_ERROR);
if (filter)
{
if (!new_filter)
throw Exception("Can't add an empty filter to the existing one.", ErrorCodes::LOGICAL_ERROR);
size_t new_size = new_filter->size();
const auto new_size = new_filter->size();
if (new_size != total_rows_per_granule)
throw Exception("Can't add the new filter because it's size is " + toString(new_size) + " but "
throw Exception("Can't set filter because it's size is " + toString(new_size) + " but "
+ toString(total_rows_per_granule) + " rows was read.", ErrorCodes::LOGICAL_ERROR);
}
ConstantFilterDescription const_description(*new_filter);
if (const_description.always_true)
{
if (!filter)
setFilterConstTrue();
}
setFilterConstTrue();
else if (const_description.always_false)
{
clear();
}
else
{
FilterDescription description(*new_filter);
auto new_holder = (description.data_holder ? description.data_holder : new_filter);
const auto * new_holder_cast = typeid_cast<const ColumnUInt8 *>(new_holder.get());
if (!new_holder_cast)
throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
if (filter)
{
MutableColumnPtr new_mutable_holder = IColumn::mutate(std::move(new_holder));
auto * new_mutable_holder_cast = typeid_cast<ColumnUInt8 *>(new_mutable_holder.get());
if (!new_mutable_holder_cast)
throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
const auto & data = filter->getData();
const auto * it = data.begin();
auto & new_data = new_mutable_holder_cast->getData();
auto * n_it = new_data.begin();
while (it != data.end() && n_it != new_data.end())
{
*n_it = (*n_it && *it);
++it;
++n_it;
}
ConstantFilterDescription new_const_description(*new_mutable_holder);
if (new_const_description.always_true)
{
setFilterConstTrue();
}
else if (new_const_description.always_false)
{
clear();
}
else
{
filter_holder = std::move(new_mutable_holder);
filter = new_mutable_holder_cast;
}
}
else
{
filter_holder = std::move(new_holder);
filter = new_holder_cast;
}
FilterDescription filter_description(*new_filter);
filter_holder = filter_description.data_holder ? filter_description.data_holder : new_filter;
filter = typeid_cast<const ColumnUInt8 *>(filter_holder.get());
if (!filter)
throw Exception("setFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
}
}
@ -565,7 +537,10 @@ MergeTreeRangeReader::MergeTreeRangeReader(
prewhere_info->alias_actions->execute(sample_block, true);
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(sample_block, true);
sample_block.erase(prewhere_info->row_level_column_name);
}
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(sample_block, true);
@ -859,20 +834,76 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
return columns;
}
static void checkCombindeFiltersSize(size_t bytes_in_first_filter, size_t second_filter_size)
{
if (bytes_in_first_filter != second_filter_size)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot combine filters because number of bytes in a first filter ({}) "
"does not match second filter size ({})", bytes_in_first_filter, second_filter_size);
}
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
{
ConstantFilterDescription firsrt_const_descr(*first);
if (firsrt_const_descr.always_true)
{
checkCombindeFiltersSize(first->size(), second->size());
return second;
}
if (firsrt_const_descr.always_false)
{
checkCombindeFiltersSize(0, second->size());
return first;
}
auto mut_first = IColumn::mutate(std::move(first));
FilterDescription firsrt_descr(*mut_first);
size_t bytes_in_first_filter = countBytesInFilter(*firsrt_descr.data);
checkCombindeFiltersSize(bytes_in_first_filter, second->size());
ConstantFilterDescription second_const_descr(*second);
if (second_const_descr.always_true)
return mut_first;
if (second_const_descr.always_false)
return second->cloneResized(mut_first->size());
FilterDescription second_descr(*second);
auto & first_data = const_cast<IColumn::Filter &>(*firsrt_descr.data);
const auto * second_data = second_descr.data->data();
for (auto & val : first_data)
{
if (val)
{
val = *second_data;
++second_data;
}
}
return mut_first;
}
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{
if (!prewhere_info)
return;
const auto & header = merge_tree_reader->getColumns();
const auto num_columns = header.size();
size_t num_columns = header.size();
if (result.columns.size() != num_columns)
throw Exception("Invalid number of columns passed to MergeTreeRangeReader. "
"Expected " + toString(num_columns) + ", "
"got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR);
size_t prewhere_column_pos = 0;
ColumnPtr filter;
ColumnPtr row_level_filter;
size_t prewhere_column_pos;
{
/// Restore block from columns list.
@ -900,44 +931,47 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(block);
auto row_level_filter_pos = block.getPositionByName(prewhere_info->row_level_column_name);
row_level_filter = block.getByPosition(row_level_filter_pos).column;
block.erase(row_level_filter_pos);
const auto filter_column_pos = block.getPositionByName(prewhere_info->row_level_column_name);
result.addFilter(block.getByPosition(filter_column_pos).column);
result.columns.clear();
result.columns.reserve(block.columns());
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
const auto * result_filter = result.getFilter();
filterColumns(result.columns, result_filter->getData());
auto it = block.begin();
for (auto & col : result.columns)
it++->column = std::move(col);
result.columns.clear();
result.clearFilter();
auto columns = block.getColumns();
filterColumns(columns, row_level_filter);
block.setColumns(columns);
}
prewhere_info->prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_info->prewhere_column_name);
result.addFilter(block.getByPosition(prewhere_column_pos).column);
block.getByPosition(prewhere_column_pos).column.reset();
result.columns.clear();
result.columns.reserve(block.columns());
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
filter.swap(result.columns[prewhere_column_pos]);
}
if (result.getFilter())
{
/// TODO: implement for prewhere chain.
/// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
ErrorCodes::LOGICAL_ERROR);
}
if (filter && row_level_filter)
{
row_level_filter = combineFilters(std::move(row_level_filter), filter);
result.setFilter(row_level_filter);
}
else
result.setFilter(filter);
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
if (!last_reader_in_chain)
result.optimize(merge_tree_reader->canReadIncompleteGranules());
result.optimize(merge_tree_reader->canReadIncompleteGranules(), prewhere_info->row_level_filter == nullptr);
/// If we read nothing or filter gets optimized to nothing
if (result.totalRowsPerGranule() == 0)
@ -962,7 +996,12 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
{
/// filter might be shrunk while columns not
const auto * result_filter = result.getFilterOriginal();
filterColumns(result.columns, result_filter->getData());
if (row_level_filter)
filterColumns(result.columns, filter);
else
filterColumns(result.columns, result_filter->getData());
result.need_filter = true;
bool has_column = false;

View File

@ -153,10 +153,10 @@ public:
void addRows(size_t rows) { num_read_rows += rows; }
void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }
/// Apply a filter on top of the existing one (AND'ed) or set it if there isn't any.
void addFilter(const ColumnPtr & new_filter);
/// Set filter or replace old one. Filter must have more zeroes than previous.
void setFilter(const ColumnPtr & new_filter);
/// For each granule calculate the number of filtered rows at the end. Remove them and update filter.
void optimize(bool can_read_incomplete_granules);
void optimize(bool can_read_incomplete_granules, bool allow_filter_columns);
/// Remove all rows from granules.
void clear();