Insert select using PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2020-05-27 21:20:26 +03:00
parent 5a0f356cd6
commit e93882c977
10 changed files with 222 additions and 201 deletions

View File

@ -263,8 +263,8 @@ void TCPHandler::runImpl()
else if (state.need_receive_data_for_input)
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
/// state.io.in is NullAndDoCopyBlockInputStream so read it once.
state.io.in->read();
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
state.io.onFinish();
}
else if (state.io.pipeline.initialized())

View File

@ -21,19 +21,10 @@ class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
: input(std::move(input_))
, output(std::move(output_))
{
input_streams.push_back(input_);
output_streams.push_back(output_);
for (auto & input_stream : input_streams)
children.push_back(input_stream);
}
NullAndDoCopyBlockInputStream(const BlockInputStreams & input_, BlockOutputStreams & output_)
: input_streams(input_), output_streams(output_)
{
for (auto & input_stream : input_)
children.push_back(input_stream);
children.push_back(input);
}
/// Suppress readPrefix and readSuffix, because they are called by copyData.
@ -53,16 +44,13 @@ protected:
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
if (input_streams.size() == 1 && output_streams.size() == 1)
copyData(*input_streams.at(0), *output_streams.at(0));
else
copyData(input_streams, output_streams);
copyData(*input, *output);
return Block();
}
private:
BlockInputStreams input_streams;
BlockOutputStreams output_streams;
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
};
}

View File

@ -1,9 +1,6 @@
#include <thread>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
@ -55,79 +52,6 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
inline void doNothing(const Block &) {}
namespace
{
struct ParallelInsertsHandler
{
using CencellationHook = std::function<void()>;
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_, size_t num_threads)
: outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_))
{
exceptions.resize(num_threads);
for (auto & output : output_streams)
outputs.push(output.get());
}
void onBlock(Block & block, size_t /*thread_num*/)
{
IBlockOutputStream * out = nullptr;
outputs.pop(out);
out->write(block);
outputs.push(out);
}
void onFinishThread(size_t /*thread_num*/) {}
void onFinish() {}
void onException(std::exception_ptr & exception, size_t thread_num)
{
exceptions[thread_num] = exception;
cancellation_hook();
}
void rethrowFirstException()
{
for (auto & exception : exceptions)
if (exception)
std::rethrow_exception(exception);
}
ConcurrentBoundedQueue<IBlockOutputStream *> outputs;
std::vector<std::exception_ptr> exceptions;
CencellationHook cancellation_hook;
};
}
static void copyDataImpl(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
for (auto & output : outputs)
output->writePrefix();
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr;
ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); }, inputs.size());
ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, inputs.size(), handler);
processor_ptr = &processor;
processor.process();
processor.wait();
handler.rethrowFirstException();
/// readPrefix is called in ParallelInputsProcessor.
for (auto & input : inputs)
input->readSuffix();
for (auto & output : outputs)
output->writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
@ -138,11 +62,6 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
copyDataImpl(inputs, outputs);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled, doNothing);

View File

@ -16,8 +16,6 @@ class Block;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

View File

@ -28,6 +28,11 @@
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/ConcatProcessor.h>
namespace DB
@ -117,8 +122,6 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
context.checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
BlockInputStreams in_streams;
BlockOutputStreams out_streams;
bool is_distributed_insert_select = false;
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
@ -159,6 +162,8 @@ BlockIO InterpreterInsertQuery::execute()
const auto & cluster = storage_src->getCluster();
const auto & shards_info = cluster->getShardsInfo();
std::vector<QueryPipeline> pipelines;
String new_query_str = queryToString(new_query);
for (size_t shard_index : ext::range(0, shards_info.size()))
{
@ -166,8 +171,7 @@ BlockIO InterpreterInsertQuery::execute()
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, context);
auto block_io = interpreter.execute();
in_streams.push_back(block_io.in);
pipelines.emplace_back(interpreter.execute().pipeline);
}
else
{
@ -179,13 +183,20 @@ BlockIO InterpreterInsertQuery::execute()
/// INSERT SELECT query returns empty block
auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, context);
in_streams.push_back(in_stream);
pipelines.emplace_back();
pipelines.back().init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back().setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(header);
});
}
out_streams.push_back(std::make_shared<NullBlockOutputStream>(Block()));
}
res.pipeline.unitePipelines(std::move(pipelines), {});
}
}
BlockOutputStreams out_streams;
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
@ -193,27 +204,21 @@ BlockIO InterpreterInsertQuery::execute()
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.pipeline = interpreter_select.executeWithProcessors();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
{
in_streams = interpreter_select.executeWithMultipleStreams(res.pipeline);
out_streams_size = std::min(size_t(settings.max_insert_threads), in_streams.size());
}
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());
if (out_streams_size == 1)
res.pipeline.addPipe({std::make_shared<ConcatProcessor>(res.pipeline.getHeader(), res.pipeline.getNumStreams())});
else
{
res = interpreter_select.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
}
res.pipeline.resize(out_streams_size);
}
else if (query.watch)
{
InterpreterWatchQuery interpreter_watch{ query.watch, context };
res = interpreter_watch.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
res.pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(std::move(res.in))));
}
for (size_t i = 0; i < out_streams_size; i++)
@ -256,27 +261,35 @@ BlockIO InterpreterInsertQuery::execute()
}
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (query.select || query.watch)
if (is_distributed_insert_select)
{
for (auto & in_stream : in_streams)
{
in_stream = std::make_shared<ConvertingBlockInputStream>(
in_stream, out_streams.at(0)->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
}
/// Pipeline was already built.
}
else if (query.select || query.watch)
{
const auto & header = out_streams.at(0)->getHeader();
Block in_header = in_streams.at(0)->getHeader();
if (in_streams.size() > 1)
res.pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
for (size_t i = 1; i < in_streams.size(); ++i)
assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, query.select ? "INSERT SELECT" : "INSERT WATCH");
}
return std::make_shared<ConvertingTransform>(in_header, header,
ConvertingTransform::MatchColumnsMode::Position);
});
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
{
if (type != QueryPipeline::StreamType::Main)
return nullptr;
auto stream = std::move(out_streams.back());
out_streams.pop_back();
return std::make_shared<SinkToOutputStream>(std::move(stream));
});
if (!allow_materialized)
{
for (const auto & column : table->getColumns())
if (column.default_desc.kind == ColumnDefaultKind::Materialized && in_header.has(column.name))
if (column.default_desc.kind == ColumnDefaultKind::Materialized && header.has(column.name))
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
@ -288,6 +301,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else
res.out = std::move(out_streams.at(0));
res.pipeline.addStorageHolder(table);
return res;

View File

@ -342,6 +342,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
else
res = interpreter->execute();
if (res.pipeline.initialized())
use_processors = true;
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
@ -369,7 +372,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Limits apply only to the final result.
pipeline.setProgressCallback(context.getProgressCallback());
pipeline.setProcessListElement(context.getProcessListElement());
if (stage == QueryProcessingStage::Complete)
if (stage == QueryProcessingStage::Complete && !pipeline.isCompleted())
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
@ -740,29 +743,36 @@ void executeQuery(
if (ast_query_with_output && ast_query_with_output->settings_ast)
InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
pipeline.addSimpleTransform([](const Block & header)
if (!pipeline.isCompleted())
{
return std::make_shared<MaterializingTransform>(header);
});
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
out->setAutoFlush();
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
out->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context.getProgressCallback();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context.getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_result_details)
set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
pipeline.setOutputFormat(std::move(out));
}
else
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_result_details)
set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
pipeline.setOutput(std::move(out));
pipeline.setProgressCallback(context.getProgressCallback());
}
{
auto executor = pipeline.execute();

View File

@ -14,9 +14,10 @@ struct PullingAsyncPipelineExecutor::Data
{
PipelineExecutorPtr executor;
std::exception_ptr exception;
std::atomic_bool is_executed = false;
std::atomic_bool is_finished = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
Poco::Event finish_event;
~Data()
{
@ -36,8 +37,11 @@ struct PullingAsyncPipelineExecutor::Data
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
if (!pipeline.isCompleted())
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutputFormat(lazy_format);
}
}
PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
@ -54,7 +58,8 @@ PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
const Block & PullingAsyncPipelineExecutor::getHeader() const
{
return lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader();
return lazy_format ? lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader()
: pipeline.getHeader(); /// Empty.
}
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
@ -78,6 +83,9 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
data.exception = std::current_exception();
data.has_exception = true;
}
data.is_finished = true;
data.finish_event.set();
}
@ -99,20 +107,33 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
lazy_format->finish();
if (lazy_format)
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception));
}
if (lazy_format->isFinished())
bool is_execution_finished = lazy_format ? lazy_format->isFinished()
: data->is_finished.load();
if (is_execution_finished)
{
data->is_executed = true;
/// If lazy format is finished, we don't cancel pipeline but wait for main thread to be finished.
data->is_finished = true;
/// Wait thread ant rethrow exception if any.
cancel();
return false;
}
chunk = lazy_format->getChunk(milliseconds);
if (lazy_format)
{
chunk = lazy_format->getChunk(milliseconds);
return true;
}
chunk.clear();
data->finish_event.tryWait(milliseconds);
return true;
}
@ -147,11 +168,11 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
void PullingAsyncPipelineExecutor::cancel()
{
/// Cancel execution if it wasn't finished.
if (data && !data->is_executed && data->executor)
if (data && !data->is_finished && data->executor)
data->executor->cancel();
/// Finish lazy format. Otherwise thread.join() may hung.
if (!lazy_format->isFinished())
if (lazy_format && !lazy_format->isFinished())
lazy_format->finish();
/// Join thread here to wait for possible exception.
@ -165,12 +186,14 @@ void PullingAsyncPipelineExecutor::cancel()
Chunk PullingAsyncPipelineExecutor::getTotals()
{
return lazy_format->getTotals();
return lazy_format ? lazy_format->getTotals()
: Chunk();
}
Chunk PullingAsyncPipelineExecutor::getExtremes()
{
return lazy_format->getExtremes();
return lazy_format ? lazy_format->getExtremes()
: Chunk();
}
Block PullingAsyncPipelineExecutor::getTotalsBlock()
@ -197,7 +220,9 @@ Block PullingAsyncPipelineExecutor::getExtremesBlock()
BlockStreamProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
{
return lazy_format->getProfileInfo();
static BlockStreamProfileInfo profile_info;
return lazy_format ? lazy_format->getProfileInfo()
: profile_info;
}
}

View File

@ -9,7 +9,7 @@ namespace DB
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);
pipeline.setOutput(pulling_format);
pipeline.setOutputFormat(pulling_format);
}
PullingPipelineExecutor::~PullingPipelineExecutor()

View File

@ -34,6 +34,14 @@ void QueryPipeline::checkInitialized()
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::checkInitializedAndNotCompleted()
{
checkInitialized();
if (streams.empty())
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals)
{
if (!source->getInputs().empty())
@ -194,11 +202,11 @@ static ProcessorPtr callProcessorGetter(
template <typename TProcessorGetter>
void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
{
checkInitialized();
checkInitializedAndNotCompleted();
Block header;
auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM)
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
{
if (!stream)
return;
@ -231,17 +239,14 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
if (transform)
{
// if (stream_type == StreamType::Main)
// transform->setStream(stream_num);
connect(*stream, transform->getInputs().front());
stream = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
};
for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num)
add_transform(streams[stream_num], StreamType::Main, stream_num);
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
@ -259,9 +264,50 @@ void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & get
addSimpleTransformImpl(getter);
}
void QueryPipeline::setSinks(const ProcessorGetterWithStreamKind & getter)
{
checkInitializedAndNotCompleted();
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
{
if (!stream)
return;
auto transform = getter(stream->getHeader(), stream_type);
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Sink for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (!transform->getOutputs().empty())
throw Exception("Sink for query pipeline transform should have no outputs, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
if (!transform)
transform = std::make_shared<NullSink>(stream->getHeader());
connect(*stream, transform->getInputs().front());
processors.emplace_back(std::move(transform));
};
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
streams.clear();
current_header.clear();
}
void QueryPipeline::addPipe(Processors pipe)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (pipe.empty())
throw Exception("Can't add empty processors list to QueryPipeline.", ErrorCodes::LOGICAL_ERROR);
@ -298,7 +344,7 @@ void QueryPipeline::addPipe(Processors pipe)
void QueryPipeline::addDelayedStream(ProcessorPtr source)
{
checkInitialized();
checkInitializedAndNotCompleted();
checkSource(source, false);
assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
@ -313,7 +359,7 @@ void QueryPipeline::addDelayedStream(ProcessorPtr source)
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!force && num_streams == getNumStreams())
return;
@ -347,7 +393,7 @@ void QueryPipeline::enableQuotaForCurrentStreams()
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
@ -370,7 +416,7 @@ void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
void QueryPipeline::addDefaultTotals()
{
checkInitialized();
checkInitializedAndNotCompleted();
if (totals_having_port)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -392,7 +438,7 @@ void QueryPipeline::addDefaultTotals()
void QueryPipeline::addTotals(ProcessorPtr source)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (totals_having_port)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -423,7 +469,7 @@ void QueryPipeline::dropTotalsAndExtremes()
void QueryPipeline::addExtremesTransform()
{
checkInitialized();
checkInitializedAndNotCompleted();
if (extremes_port)
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -450,7 +496,7 @@ void QueryPipeline::addExtremesTransform()
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!typeid_cast<const CreatingSetsTransform *>(transform.get()))
throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform.",
@ -467,14 +513,14 @@ void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
processors.emplace_back(std::move(concat));
}
void QueryPipeline::setOutput(ProcessorPtr output)
void QueryPipeline::setOutputFormat(ProcessorPtr output)
{
checkInitialized();
checkInitializedAndNotCompleted();
auto * format = dynamic_cast<IOutputFormat * >(output.get());
if (!format)
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput.", ErrorCodes::LOGICAL_ERROR);
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.", ErrorCodes::LOGICAL_ERROR);
if (output_format)
throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
@ -507,19 +553,25 @@ void QueryPipeline::setOutput(ProcessorPtr output)
connect(*totals_having_port, totals);
connect(*extremes_port, extremes);
streams.clear();
current_header.clear();
extremes_port = nullptr;
totals_having_port = nullptr;
initRowsBeforeLimit();
}
void QueryPipeline::unitePipelines(
std::vector<QueryPipeline> && pipelines, const Block & common_header)
{
checkInitialized();
addSimpleTransform([&](const Block & header)
if (initialized())
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
std::vector<OutputPort *> extremes;
std::vector<OutputPort *> totals;
@ -534,11 +586,14 @@ void QueryPipeline::unitePipelines(
{
pipeline.checkInitialized();
pipeline.addSimpleTransform([&](const Block & header)
if (!pipeline.isCompleted())
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
if (pipeline.extremes_port)
{
@ -726,10 +781,8 @@ Pipe QueryPipeline::getPipe() &&
PipelineExecutorPtr QueryPipeline::execute()
{
checkInitialized();
if (!output_format)
throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
if (!isCompleted())
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(processors, process_list_element);
}

View File

@ -28,6 +28,7 @@ private:
{
public:
auto size() const { return data.size(); }
bool empty() const { return size() == 0; }
auto begin() { return data.begin(); }
auto end() { return data.end(); }
auto & front() { return data.front(); }
@ -81,6 +82,7 @@ public:
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
bool initialized() { return !processors.empty(); }
bool isCompleted() { return initialized() && streams.empty(); }
/// Type of logical data stream for simple transform.
/// Sometimes it's important to know which part of pipeline we are working for.
@ -95,13 +97,23 @@ public:
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
/// Add transform with simple input and simple output for each port.
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
/// Add several processors. They must have same header for inputs and same for outputs.
/// Total number of inputs must be the same as the number of streams. Output ports will become new streams.
void addPipe(Processors pipe);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);
/// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.
void addExtremesTransform();
/// Adds transform which creates sets. It will be executed before reading any data from input ports.
void addCreatingSetsTransform(ProcessorPtr transform);
void setOutput(ProcessorPtr output);
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation.
void setSinks(const ProcessorGetterWithStreamKind & getter);
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();
@ -118,6 +130,7 @@ public:
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
/// Changes the number of input ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
void enableQuotaForCurrentStreams();
@ -193,6 +206,7 @@ private:
QueryStatus * process_list_element = nullptr;
void checkInitialized();
void checkInitializedAndNotCompleted();
static void checkSource(const ProcessorPtr & source, bool can_have_totals);
template <typename TProcessorGetter>