refactoring

This commit is contained in:
Nikita Vasilev 2019-08-05 21:22:44 +03:00
parent 7b8c0f1750
commit 4e8ab12816
2 changed files with 26 additions and 27 deletions

View File

@ -305,10 +305,9 @@ void MutationsInterpreter::prepare(bool dry_run)
stages_copy.back().output_columns = stage.output_columns;
stages_copy.back().filters = stage.filters;
}
auto first_stage_header = prepareInterpreterSelect(/* dry_run = */ true)->getSampleBlock();
auto first_stage_header = prepareInterpreterSelect(stages_copy, /* dry_run = */ true)->getSampleBlock();
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(in)->getHeader());
std::swap(stages, stages_copy);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
}
/// Special step to recalculate affected indices.
stages.emplace_back(context);
@ -317,40 +316,40 @@ void MutationsInterpreter::prepare(bool dry_run)
column, std::make_shared<ASTIdentifier>(column));
}
interpreter_select = prepareInterpreterSelect(dry_run);
interpreter_select = prepareInterpreterSelect(stages, dry_run);
is_prepared = true;
}
std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreterSelect(bool dry_run)
std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreterSelect(std::vector<Stage> & prepared_stages, bool dry_run)
{
NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < stages.size(); ++i)
for (size_t i = 0; i < prepared_stages.size(); ++i)
{
if (!stages[i].filters.empty())
if (!prepared_stages[i].filters.empty())
{
for (const auto & column : all_columns)
stages[i].output_columns.insert(column.name);
prepared_stages[i].output_columns.insert(column.name);
continue;
}
if (i > 0)
stages[i].output_columns = stages[i - 1].output_columns;
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
if (stages[i].output_columns.size() < all_columns.size())
if (prepared_stages[i].output_columns.size() < all_columns.size())
{
for (const auto & kv : stages[i].column_to_updated)
stages[i].output_columns.insert(kv.first);
for (const auto & kv : prepared_stages[i].column_to_updated)
prepared_stages[i].output_columns.insert(kv.first);
}
}
/// Now, calculate `expressions_chain` for each stage except the first.
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
for (size_t i = stages.size() - 1; i > 0; --i)
for (size_t i = prepared_stages.size() - 1; i > 0; --i)
{
auto & stage = stages[i];
auto & stage = prepared_stages[i];
ASTPtr all_asts = std::make_shared<ASTExpressionList>();
@ -400,7 +399,7 @@ std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreter
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
stages[i - 1].output_columns.insert(column.name);
prepared_stages[i - 1].output_columns.insert(column.name);
}
/// Execute first stage as a SELECT statement.
@ -408,21 +407,21 @@ std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreter
auto select = std::make_shared<ASTSelectQuery>();
select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
for (const auto & column_name : stages[0].output_columns)
for (const auto & column_name : prepared_stages[0].output_columns)
select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
if (!stages[0].filters.empty())
if (!prepared_stages[0].filters.empty())
{
ASTPtr where_expression;
if (stages[0].filters.size() == 1)
where_expression = stages[0].filters[0];
if (prepared_stages[0].filters.size() == 1)
where_expression = prepared_stages[0].filters[0];
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "and";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
coalesced_predicates->arguments->children = stages[0].filters;
coalesced_predicates->arguments->children = prepared_stages[0].filters;
where_expression = std::move(coalesced_predicates);
}
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
@ -431,11 +430,11 @@ std::unique_ptr<InterpreterSelectQuery> MutationsInterpreter::prepareInterpreter
return std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
}
BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(BlockInputStreamPtr in) const
BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const
{
for (size_t i_stage = 1; i_stage < stages.size(); ++i_stage)
for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage)
{
const Stage & stage = stages[i_stage];
const Stage & stage = prepared_stages[i_stage];
for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i)
{
@ -467,14 +466,14 @@ void MutationsInterpreter::validate()
prepare(/* dry_run = */ true);
Block first_stage_header = interpreter_select->getSampleBlock();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(in)->getHeader();
addStreamsForLaterStages(stages, in)->getHeader();
}
BlockInputStreamPtr MutationsInterpreter::execute()
{
prepare(/* dry_run = */ false);
BlockInputStreamPtr in = interpreter_select->execute().in;
auto result_stream = addStreamsForLaterStages(in);
auto result_stream = addStreamsForLaterStages(stages, in);
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());
return result_stream;

View File

@ -41,8 +41,8 @@ private:
struct Stage;
std::unique_ptr<InterpreterSelectQuery> prepareInterpreterSelect(bool dry_run);
BlockInputStreamPtr addStreamsForLaterStages(BlockInputStreamPtr in) const;
std::unique_ptr<InterpreterSelectQuery> prepareInterpreterSelect(std::vector<Stage> & prepared_stages, bool dry_run);
BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
private:
StoragePtr storage;