Update SelectStreamFactory.

This commit is contained in:
Nikolai Kochetov 2020-02-14 12:36:00 +03:00
parent bb6d6a37b3
commit 4cf65451de

View File

@ -114,6 +114,7 @@ void SelectStreamFactory::createForShard(
Pipes & res) Pipes & res)
{ {
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals_port = processed_stage == QueryProcessingStage::Complete;
auto modified_query_ast = query_ast->clone(); auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column) if (has_virtual_shard_num_column)
@ -135,7 +136,9 @@ void SelectStreamFactory::createForShard(
stream->setMainTable(main_table); stream->setMainTable(main_table);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info); auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
source->addTotalsPort();
if (add_totals_port)
source->addTotalsPort();
res.emplace_back(std::move(source)); res.emplace_back(std::move(source));
}; };
@ -279,7 +282,9 @@ void SelectStreamFactory::createForShard(
}; };
auto source = std::make_shared<SourceFromInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream, force_add_agg_info); auto source = std::make_shared<SourceFromInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream, force_add_agg_info);
source->addTotalsPort();
if (add_totals_port)
source->addTotalsPort();
res.emplace_back(std::move(source)); res.emplace_back(std::move(source));
} }