Update SelectStreamFactory.

This commit is contained in:
Nikolai Kochetov 2020-02-03 21:14:26 +03:00
parent bc757f6b24
commit ed71fb347e
3 changed files with 16 additions and 11 deletions

View File

@ -75,7 +75,13 @@ Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Con
checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
Pipe pipe = interpreter.executeWithProcessors().getPipe();
auto pipeline = interpreter.executeWithProcessors();
pipeline.addSimpleTransform([&](const Block & source_header)
{
return std::make_shared<ConvertingTransform>(
source_header, header, ConvertingTransform::MatchColumnsMode::Name, context);
});
/** Materialization is needed, since from remote servers the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
@ -87,12 +93,7 @@ Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Con
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
auto converting = std::make_shared<ConvertingTransform>(
pipe.getHeader(), header,ConvertingTransform::MatchColumnsMode::Name, context);
pipe.addSimpleTransform(std::move(converting));
return pipe;
return interpreter.executeWithProcessors().getPipe();;
}
static String formattedAST(const ASTPtr & ast)
@ -112,6 +113,8 @@ void SelectStreamFactory::createForShard(
const Context & context, const ThrottlerPtr & throttler,
Pipes & res)
{
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
@ -131,7 +134,7 @@ void SelectStreamFactory::createForShard(
if (!table_func_ptr)
stream->setMainTable(main_table);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
source->addTotalsPort();
res.emplace_back(std::move(source));
@ -275,7 +278,7 @@ void SelectStreamFactory::createForShard(
}
};
auto source = std::make_shared<SourceFromInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
auto source = std::make_shared<SourceFromInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream, force_add_agg_info);
source->addTotalsPort();
res.emplace_back(std::move(source));

View File

@ -14,8 +14,10 @@ SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool f
init();
}
SourceFromInputStream::SourceFromInputStream(String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_)
SourceFromInputStream::SourceFromInputStream(
String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_, bool force_add_aggregating_info_)
: ISourceWithProgress(std::move(header))
, force_add_aggregating_info(force_add_aggregating_info_)
, stream_builder(std::move(stream_builder_))
, source_name(std::move(name))
{

View File

@ -13,7 +13,7 @@ class SourceFromInputStream : public ISourceWithProgress
public:
explicit SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_ = false);
/// Constructor which works like LazyBlockInputStream. First 'generate' method creates stream using callback.
SourceFromInputStream(String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_);
SourceFromInputStream(String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_, bool force_add_aggregating_info_ = false);
String getName() const override { return source_name.empty() ? "SourceFromInputStream" : source_name; }
Status prepare() override;