Fix build.

This commit is contained in:
Nikolai Kochetov 2020-06-18 21:40:02 +03:00
parent a99d9f7712
commit 7bfb5c9a47

View File

@ -3,18 +3,15 @@
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
@ -24,7 +21,6 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
@ -32,7 +28,6 @@
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/ConcatProcessor.h>
namespace DB
@ -162,7 +157,7 @@ BlockIO InterpreterInsertQuery::execute()
const auto & cluster = storage_src->getCluster();
const auto & shards_info = cluster->getShardsInfo();
std::vector<QueryPipeline> pipelines;
std::vector<std::unique_ptr<QueryPipeline>> pipelines;
String new_query_str = queryToString(new_query);
for (size_t shard_index : ext::range(0, shards_info.size()))
@ -171,7 +166,7 @@ BlockIO InterpreterInsertQuery::execute()
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, context);
pipelines.emplace_back(interpreter.execute().pipeline);
pipelines.emplace_back(std::make_unique<QueryPipeline>(interpreter.execute().pipeline));
}
else
{
@ -184,8 +179,8 @@ BlockIO InterpreterInsertQuery::execute()
/// INSERT SELECT query returns empty block
auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, context);
pipelines.emplace_back();
pipelines.back().init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back().setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
pipelines.back()->init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back()->setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(header);
});