diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 7eb4562961e..bf822312821 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -305,10 +305,9 @@ void MutationsInterpreter::prepare(bool dry_run) stages_copy.back().output_columns = stage.output_columns; stages_copy.back().filters = stage.filters; } - auto first_stage_header = prepareInterpreterSelect(/* dry_run = */ true)->getSampleBlock(); + auto first_stage_header = prepareInterpreterSelect(stages_copy, /* dry_run = */ true)->getSampleBlock(); auto in = std::make_shared(first_stage_header); - updated_header = std::make_unique(addStreamsForLaterStages(in)->getHeader()); - std::swap(stages, stages_copy); + updated_header = std::make_unique(addStreamsForLaterStages(stages_copy, in)->getHeader()); } /// Special step to recalculate affected indices. stages.emplace_back(context); @@ -317,40 +316,40 @@ void MutationsInterpreter::prepare(bool dry_run) column, std::make_shared(column)); } - interpreter_select = prepareInterpreterSelect(dry_run); + interpreter_select = prepareInterpreterSelect(stages, dry_run); is_prepared = true; } -std::unique_ptr MutationsInterpreter::prepareInterpreterSelect(bool dry_run) +std::unique_ptr MutationsInterpreter::prepareInterpreterSelect(std::vector & prepared_stages, bool dry_run) { NamesAndTypesList all_columns = storage->getColumns().getAllPhysical(); /// Next, for each stage calculate columns changed by this and previous stages. - for (size_t i = 0; i < stages.size(); ++i) + for (size_t i = 0; i < prepared_stages.size(); ++i) { - if (!stages[i].filters.empty()) + if (!prepared_stages[i].filters.empty()) { for (const auto & column : all_columns) - stages[i].output_columns.insert(column.name); + prepared_stages[i].output_columns.insert(column.name); continue; } if (i > 0) - stages[i].output_columns = stages[i - 1].output_columns; + prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; - if (stages[i].output_columns.size() < all_columns.size()) + if (prepared_stages[i].output_columns.size() < all_columns.size()) { - for (const auto & kv : stages[i].column_to_updated) - stages[i].output_columns.insert(kv.first); + for (const auto & kv : prepared_stages[i].column_to_updated) + prepared_stages[i].output_columns.insert(kv.first); } } /// Now, calculate `expressions_chain` for each stage except the first. /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. - for (size_t i = stages.size() - 1; i > 0; --i) + for (size_t i = prepared_stages.size() - 1; i > 0; --i) { - auto & stage = stages[i]; + auto & stage = prepared_stages[i]; ASTPtr all_asts = std::make_shared(); @@ -400,7 +399,7 @@ std::unique_ptr MutationsInterpreter::prepareInterpreter /// Propagate information about columns needed as input. for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes()) - stages[i - 1].output_columns.insert(column.name); + prepared_stages[i - 1].output_columns.insert(column.name); } /// Execute first stage as a SELECT statement. @@ -408,21 +407,21 @@ std::unique_ptr MutationsInterpreter::prepareInterpreter auto select = std::make_shared(); select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared()); - for (const auto & column_name : stages[0].output_columns) + for (const auto & column_name : prepared_stages[0].output_columns) select->select()->children.push_back(std::make_shared(column_name)); - if (!stages[0].filters.empty()) + if (!prepared_stages[0].filters.empty()) { ASTPtr where_expression; - if (stages[0].filters.size() == 1) - where_expression = stages[0].filters[0]; + if (prepared_stages[0].filters.size() == 1) + where_expression = prepared_stages[0].filters[0]; else { auto coalesced_predicates = std::make_shared(); coalesced_predicates->name = "and"; coalesced_predicates->arguments = std::make_shared(); coalesced_predicates->children.push_back(coalesced_predicates->arguments); - coalesced_predicates->arguments->children = stages[0].filters; + coalesced_predicates->arguments->children = prepared_stages[0].filters; where_expression = std::move(coalesced_predicates); } select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); @@ -431,11 +430,11 @@ std::unique_ptr MutationsInterpreter::prepareInterpreter return std::make_unique(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits()); } -BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(BlockInputStreamPtr in) const +BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::vector & prepared_stages, BlockInputStreamPtr in) const { - for (size_t i_stage = 1; i_stage < stages.size(); ++i_stage) + for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage) { - const Stage & stage = stages[i_stage]; + const Stage & stage = prepared_stages[i_stage]; for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) { @@ -467,14 +466,14 @@ void MutationsInterpreter::validate() prepare(/* dry_run = */ true); Block first_stage_header = interpreter_select->getSampleBlock(); BlockInputStreamPtr in = std::make_shared(first_stage_header); - addStreamsForLaterStages(in)->getHeader(); + addStreamsForLaterStages(stages, in)->getHeader(); } BlockInputStreamPtr MutationsInterpreter::execute() { prepare(/* dry_run = */ false); BlockInputStreamPtr in = interpreter_select->execute().in; - auto result_stream = addStreamsForLaterStages(in); + auto result_stream = addStreamsForLaterStages(stages, in); if (!updated_header) updated_header = std::make_unique(result_stream->getHeader()); return result_stream; diff --git a/dbms/src/Interpreters/MutationsInterpreter.h b/dbms/src/Interpreters/MutationsInterpreter.h index 9880b07e686..268e5f4b081 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.h +++ b/dbms/src/Interpreters/MutationsInterpreter.h @@ -41,8 +41,8 @@ private: struct Stage; - std::unique_ptr prepareInterpreterSelect(bool dry_run); - BlockInputStreamPtr addStreamsForLaterStages(BlockInputStreamPtr in) const; + std::unique_ptr prepareInterpreterSelect(std::vector & prepared_stages, bool dry_run); + BlockInputStreamPtr addStreamsForLaterStages(const std::vector & prepared_stages, BlockInputStreamPtr in) const; private: StoragePtr storage;