Fix build.

This commit is contained in:
Nikolai Kochetov 2021-09-17 20:52:26 +03:00
parent 618d4d863e
commit a8443bef4d
5 changed files with 20 additions and 23 deletions

View File

@ -7,7 +7,9 @@
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/QueryPipeline.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
@ -363,10 +365,10 @@ try
insert_context->setSettings(key.settings);
InterpreterInsertQuery interpreter(key.query, insert_context, key.settings.insert_allow_materialized_columns);
auto sinks = interpreter.getSinks();
assert(sinks.size() == 1);
auto pipeline = interpreter.execute().pipeline;
assert(pipeline.pushing());
auto header = sinks.at(0)->getInputs().front().getHeader();
auto header = pipeline.getHeader();
auto format = getInputFormatFromASTInsertQuery(key.query, false, header, insert_context, nullptr);
size_t total_rows = 0;
@ -408,15 +410,10 @@ try
size_t total_bytes = chunk.bytes();
auto source = std::make_shared<SourceFromSingleChunk>(header, std::move(chunk));
Pipe pipe(source);
pipeline.complete(Pipe(std::move(source)));
QueryPipeline out_pipeline;
out_pipeline.init(std::move(pipe));
out_pipeline.resize(1);
out_pipeline.setSinks([&](const Block &, Pipe::StreamType) { return sinks.at(0); });
auto out_executor = out_pipeline.execute();
out_executor->execute(out_pipeline.getNumThreads());
CompletedPipelineExecutor completed_executor(pipeline);
completed_executor.execute();
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));

View File

@ -424,7 +424,7 @@ BlockIO InterpreterInsertQuery::execute()
if (query.hasInlinedData())
{
/// can execute without additional data
auto pipe = getSourceFromASTInsertQuery(query_ptr, false, query_sample_block, getContext(), nullptr);
auto pipe = getSourceFromASTInsertQuery(query_ptr, true, query_sample_block, getContext(), nullptr);
res.pipeline.complete(std::move(pipe));
}
}

View File

@ -43,9 +43,10 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr context_) const override;
private:
StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
private:
Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
ASTPtr query_ptr;

View File

@ -52,13 +52,13 @@
#include <Common/SensitiveDataMasker.h>
#include "IO/CompressionMethod.h"
#include "Processors/printPipeline.h"
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sinks/ExceptionHandlingSink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/WaitForAsyncInsertSource.h>
#include <random>
@ -597,7 +597,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto timeout = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto query_id = context->getCurrentQueryId();
auto source = std::make_shared<WaitForAsyncInsertSource>(query_id, timeout, *queue);
io.pipeline.init(Pipe(source));
io.pipeline = QueryPipeline(Pipe(std::move(source)));
}
return std::make_tuple(ast, std::move(io));
@ -1020,7 +1020,7 @@ void executeQuery(
{
if (pipeline.pushing())
{
auto pipe = getSourceFromFromASTInsertQuery(ast, true, pipeline.getHeader(), context, nullptr);
auto pipe = getSourceFromASTInsertQuery(ast, true, pipeline.getHeader(), context, nullptr);
pipeline.complete(std::move(pipe));
}
else if (pipeline.pulling())

View File

@ -398,9 +398,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
column_part_source->setProgressCallback(
MergeProgressCallback(*global_ctx->merge_entry, global_ctx->watch_prev_elapsed, *global_ctx->column_progress));
QueryPipeline column_part_pipeline;
column_part_pipeline.init(Pipe(std::move(column_part_source)));
column_part_pipeline.setMaxThreads(1);
QueryPipeline column_part_pipeline(Pipe(std::move(column_part_source)));
column_part_pipeline.setNumThreads(1);
ctx->column_part_streams[part_num] =
std::make_shared<PipelineExecutingBlockInputStream>(std::move(column_part_pipeline));
@ -795,10 +794,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
break;
}
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
pipeline.addTransform(std::move(merged_transform));
pipeline.setMaxThreads(1);
auto res_pipe = Pipe::unitePipes(std::move(pipes));
res_pipe.addTransform(std::move(merged_transform));
QueryPipeline pipeline(std::move(res_pipe));
pipeline.setNumThreads(1);
global_ctx->merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));