Review fixes.

This commit is contained in:
Nikolai Kochetov 2020-08-19 22:33:49 +03:00
parent 8e631a98ea
commit 5cd4312529
4 changed files with 104 additions and 118 deletions

View File

@ -1153,22 +1153,17 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
new_actions.emplace_back(action);
}
else
else if (action.type == ExpressionAction::REMOVE_COLUMN)
{
/// Exception for REMOVE_COLUMN.
/// We cannot move it to split_actions if any argument from `this` needed that column.
if (action.type == ExpressionAction::REMOVE_COLUMN)
{
if (array_join_dependent_columns_arguments.count(action.source_name))
new_actions.emplace_back(action);
else
split_actions->add(action);
continue;
}
split_actions->add(action);
if (array_join_dependent_columns_arguments.count(action.source_name))
new_actions.emplace_back(action);
else
split_actions->add(action);
}
else
split_actions->add(action);
}
/// Return empty actions if nothing was separated. Keep `this` unchanged.
@ -1365,8 +1360,8 @@ void ExpressionActionsChain::addStep()
if (steps.empty())
throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
ColumnsWithTypeAndName columns = steps.back().getResultColumns();
steps.push_back(Step(std::make_shared<ExpressionActions>(columns, context)));
ColumnsWithTypeAndName columns = steps.back()->getResultColumns();
steps.push_back(std::make_unique<ExpressionActionsStep>(std::make_shared<ExpressionActions>(columns, context)));
}
void ExpressionActionsChain::finalize()
@ -1374,16 +1369,16 @@ void ExpressionActionsChain::finalize()
/// Finalize all steps. Right to left to define unnecessary input columns.
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
Names required_output = steps[i].required_output;
Names required_output = steps[i]->required_output;
std::unordered_map<String, size_t> required_output_indexes;
for (size_t j = 0; j < required_output.size(); ++j)
required_output_indexes[required_output[j]] = j;
auto & can_remove_required_output = steps[i].can_remove_required_output;
auto & can_remove_required_output = steps[i]->can_remove_required_output;
if (i + 1 < static_cast<int>(steps.size()))
{
const NameSet & additional_input = steps[i + 1].additional_input;
for (const auto & it : steps[i + 1].getRequiredColumns())
const NameSet & additional_input = steps[i + 1]->additional_input;
for (const auto & it : steps[i + 1]->getRequiredColumns())
{
if (additional_input.count(it.name) == 0)
{
@ -1395,19 +1390,19 @@ void ExpressionActionsChain::finalize()
}
}
}
steps[i].finalize(required_output);
steps[i]->finalize(required_output);
}
/// Adding the ejection of unnecessary columns to the beginning of each step.
for (size_t i = 1; i < steps.size(); ++i)
{
size_t columns_from_previous = steps[i - 1].getResultColumns().size();
size_t columns_from_previous = steps[i - 1]->getResultColumns().size();
/// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step.
/// Except when we drop all the columns and lose the number of rows in the block.
if (!steps[i].getResultColumns().empty()
&& columns_from_previous > steps[i].getRequiredColumns().size())
steps[i].prependProjectInput();
if (!steps[i]->getResultColumns().empty()
&& columns_from_previous > steps[i]->getRequiredColumns().size())
steps[i]->prependProjectInput();
}
}
@ -1419,16 +1414,17 @@ std::string ExpressionActionsChain::dumpChain() const
{
ss << "step " << i << "\n";
ss << "required output:\n";
for (const std::string & name : steps[i].required_output)
for (const std::string & name : steps[i]->required_output)
ss << name << "\n";
ss << "\n" << steps[i].dump() << "\n";
ss << "\n" << steps[i]->dump() << "\n";
}
return ss.str();
}
ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
: array_join(std::move(array_join_))
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_outputs_)
: Step(std::move(required_outputs_))
, array_join(std::move(array_join_))
, result_columns(std::move(required_columns_))
{
for (auto & column : result_columns)
@ -1445,7 +1441,7 @@ ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_jo
}
}
void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_output_)
void ExpressionActionsChain::ArrayJoinStep::finalize(const Names & required_output_)
{
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
@ -1466,47 +1462,14 @@ void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_outp
std::swap(result_columns, new_result_columns);
}
ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns)
: link(ArrayJoinLink(std::move(array_join), std::move(required_columns)))
/// Mutable access to the expression actions stored in this step.
/// Only meaningful when the dynamic type of the step is ExpressionActionsStep.
/// For any other kind of step a LOGICAL_ERROR exception is thrown instead of dereferencing a failed cast.
ExpressionActionsPtr & ExpressionActionsChain::Step::actions()
{
    auto * expression_step = typeid_cast<ExpressionActionsStep *>(this);

    /// NOTE(review): assumes the pointer form of typeid_cast yields nullptr on a type mismatch - confirm against its definition.
    if (!expression_step)
        throw Exception("Cannot get actions from a step which is not ExpressionActionsStep", ErrorCodes::LOGICAL_ERROR);

    return expression_step->actions;
}
template <typename Res, typename Ptr, typename Callback>
static Res dispatch(Ptr * ptr, Callback && callback)
const ExpressionActionsPtr & ExpressionActionsChain::Step::actions() const
{
if (std::holds_alternative<ExpressionActionsChain::ExpressionActionsLink>(ptr->link))
return callback(std::get<ExpressionActionsChain::ExpressionActionsLink>(ptr->link));
if (std::holds_alternative<ExpressionActionsChain::ArrayJoinLink>(ptr->link))
return callback(std::get<ExpressionActionsChain::ArrayJoinLink>(ptr->link));
throw Exception("Unknown variant in ExpressionActionsChain step", ErrorCodes::LOGICAL_ERROR);
}
const NamesAndTypesList & ExpressionActionsChain::Step::getRequiredColumns() const
{
using Res = const NamesAndTypesList &;
return dispatch<Res>(this, [](auto & x) -> Res { return x.getRequiredColumns(); });
}
const ColumnsWithTypeAndName & ExpressionActionsChain::Step::getResultColumns() const
{
using Res = const ColumnsWithTypeAndName &;
return dispatch<Res>(this, [](auto & x) -> Res{ return x.getResultColumns(); });
}
void ExpressionActionsChain::Step::finalize(const Names & required_output_)
{
dispatch<void>(this, [&required_output_](auto & x) { x.finalize(required_output_); });
}
void ExpressionActionsChain::Step::prependProjectInput() const
{
dispatch<void>(this, [](auto & x) { x.prependProjectInput(); });
}
std::string ExpressionActionsChain::Step::dump() const
{
return dispatch<std::string>(this, [](auto & x) { return x.dump(); });
return typeid_cast<const ExpressionActionsStep *>(this)->actions;
}
}

View File

@ -287,35 +287,11 @@ struct ExpressionActionsChain
{
explicit ExpressionActionsChain(const Context & context_) : context(context_) {}
struct ExpressionActionsLink
{
ExpressionActionsPtr actions;
const NamesAndTypesList & getRequiredColumns() const { return actions->getRequiredColumnsWithTypes(); }
const ColumnsWithTypeAndName & getResultColumns() const { return actions->getSampleBlock().getColumnsWithTypeAndName(); }
void finalize(const Names & required_output_) const { actions->finalize(required_output_); }
void prependProjectInput() const { actions->prependProjectInput(); }
std::string dump() const { return actions->dumpActions(); }
};
struct ArrayJoinLink
{
ArrayJoinActionPtr array_join;
NamesAndTypesList required_columns;
ColumnsWithTypeAndName result_columns;
ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_);
const NamesAndTypesList & getRequiredColumns() const { return required_columns; }
const ColumnsWithTypeAndName & getResultColumns() const { return result_columns; }
void finalize(const Names & required_output_);
void prependProjectInput() const {} /// TODO: remove unused columns before ARRAY JOIN ?
static std::string dump() { return "ARRAY JOIN"; }
};
struct Step
{
std::variant<ExpressionActionsLink, ArrayJoinLink> link;
virtual ~Step() = default;
explicit Step(Names required_output_) : required_output(std::move(required_output_)) {}
/// Columns were added to the block before current step in addition to prev step output.
NameSet additional_input;
@ -326,28 +302,72 @@ struct ExpressionActionsChain
/// If not empty, has the same size with required_output; is filled in finalize().
std::vector<bool> can_remove_required_output;
public:
explicit Step(ExpressionActionsPtr actions, const Names & required_output_ = Names())
: link(ExpressionActionsLink{std::move(actions)})
, required_output(required_output_)
virtual const NamesAndTypesList & getRequiredColumns() const = 0;
virtual const ColumnsWithTypeAndName & getResultColumns() const = 0;
/// Remove unused result and update required columns
virtual void finalize(const Names & required_output_) = 0;
/// Add projections to expression
virtual void prependProjectInput() const = 0;
virtual std::string dump() const = 0;
/// Only for ExpressionActionsStep
ExpressionActionsPtr & actions();
const ExpressionActionsPtr & actions() const;
};
struct ExpressionActionsStep : public Step
{
ExpressionActionsPtr actions;
explicit ExpressionActionsStep(ExpressionActionsPtr actions_, Names required_output_ = Names())
: Step(std::move(required_output_))
, actions(std::move(actions_))
{
}
explicit Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns);
const NamesAndTypesList & getRequiredColumns() const override
{
return actions->getRequiredColumnsWithTypes();
}
const NamesAndTypesList & getRequiredColumns() const;
const ColumnsWithTypeAndName & getResultColumns() const;
/// Remove unused result and update required columns
void finalize(const Names & required_output_);
/// Add projections to expression
void prependProjectInput() const;
std::string dump() const;
const ColumnsWithTypeAndName & getResultColumns() const override
{
return actions->getSampleBlock().getColumnsWithTypeAndName();
}
ExpressionActionsPtr & actions() { return std::get<ExpressionActionsLink>(link).actions; }
const ExpressionActionsPtr & actions() const { return std::get<ExpressionActionsLink>(link).actions; }
void finalize(const Names & required_output_) override
{
actions->finalize(required_output_);
}
void prependProjectInput() const override
{
actions->prependProjectInput();
}
std::string dump() const override
{
return actions->dumpActions();
}
};
using Steps = std::vector<Step>;
/// Chain step which represents ARRAY JOIN instead of a list of expression actions.
struct ArrayJoinStep : public Step
{
    /// The ARRAY JOIN action this step stands for.
    ArrayJoinActionPtr array_join;
    /// Columns reported by getRequiredColumns().
    NamesAndTypesList required_columns;
    /// Columns reported by getResultColumns().
    ColumnsWithTypeAndName result_columns;

    /// Defined out of line; `required_output_` is forwarded to the Step base.
    ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_);

    const NamesAndTypesList & getRequiredColumns() const override
    {
        return required_columns;
    }

    const ColumnsWithTypeAndName & getResultColumns() const override
    {
        return result_columns;
    }

    /// Defined out of line.
    void finalize(const Names & required_output_) override;

    /// Intentionally a no-op for this step.
    /// TODO: remove unused columns before ARRAY JOIN ?
    void prependProjectInput() const override
    {
    }

    std::string dump() const override
    {
        return "ARRAY JOIN";
    }
};
using StepPtr = std::unique_ptr<Step>;
using Steps = std::vector<StepPtr>;
const Context & context;
Steps steps;

View File

@ -473,7 +473,9 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi
before_array_join = chain.getLastActions();
auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left);
chain.steps.push_back(ExpressionActionsChain::Step(array_join, step.getResultColumns()));
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ArrayJoinStep>(
array_join, step.getResultColumns(),
Names())); /// Required output is empty because all array joined columns are kept by step.
chain.addStep();
@ -685,8 +687,9 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
}
}
chain.steps.emplace_back(std::make_shared<ExpressionActions>(std::move(columns), context));
chain.steps.back().additional_input = std::move(unused_source_columns);
chain.steps.emplace_back(std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(
std::make_shared<ExpressionActions>(std::move(columns), context)));
chain.steps.back()->additional_input = std::move(unused_source_columns);
}
return true;
@ -697,7 +700,7 @@ void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsCha
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
// FIXME: assert(filter_info);
step = ExpressionActionsChain::Step(std::move(actions));
step.actions() = std::move(actions);
step.required_output.push_back(std::move(column_name));
step.can_remove_required_output = {true};
@ -1065,7 +1068,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions(), query.prewhere()->getColumnName());
chain.steps.front()->actions(), query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{
@ -1108,7 +1111,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
Block before_where_sample;
if (chain.steps.size() > 1)
before_where_sample = Block(chain.steps[chain.steps.size() - 2].getResultColumns());
before_where_sample = Block(chain.steps[chain.steps.size() - 2]->getResultColumns());
else
before_where_sample = source_header;
if (sanitizeBlock(before_where_sample))
@ -1189,7 +1192,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co
{
if (hasPrewhere())
{
const ExpressionActionsChain::Step & step = chain.steps.at(0);
const ExpressionActionsChain::Step & step = *chain.steps.at(0);
prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
Names columns_to_remove;
@ -1214,10 +1217,10 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co
else if (hasFilter())
{
/// Can't have prewhere and filter set simultaneously
filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
filter_info->do_remove_column = chain.steps.at(0)->can_remove_required_output.at(0);
}
if (hasWhere())
remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
remove_where_filter = chain.steps.at(where_step_num)->can_remove_required_output.at(0);
}
void ExpressionAnalysisResult::removeExtraColumns() const

View File

@ -623,7 +623,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
actions_chain.finalize();
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front().actions()->getRequiredColumnsWithTypes())
for (const auto & column : actions_chain.steps.front()->actions()->getRequiredColumnsWithTypes())
prepared_stages[i - 1].output_columns.insert(column.name);
}
@ -667,12 +667,12 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
if (i < stage.filter_column_names.size())
{
/// Execute DELETEs.
in = std::make_shared<FilterBlockInputStream>(in, step.actions(), stage.filter_column_names[i]);
in = std::make_shared<FilterBlockInputStream>(in, step->actions(), stage.filter_column_names[i]);
}
else
{
/// Execute UPDATE or final projection.
in = std::make_shared<ExpressionBlockInputStream>(in, step.actions());
in = std::make_shared<ExpressionBlockInputStream>(in, step->actions());
}
}