Use Resize instead of Concat in InterpreterInsertQuery.

This commit is contained in:
Nikolai Kochetov 2020-06-05 12:30:16 +03:00
parent 1887e343ec
commit 5fea34d0e1
3 changed files with 1 addition and 19 deletions

View File

@ -209,10 +209,7 @@ BlockIO InterpreterInsertQuery::execute()
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());
if (out_streams_size == 1)
res.pipeline.addPipe({std::make_shared<ConcatProcessor>(res.pipeline.getHeader(), res.pipeline.getNumStreams())});
else
res.pipeline.resize(out_streams_size);
res.pipeline.resize(out_streams_size);
}
else if (query.watch)
{

View File

@ -9,12 +9,6 @@ ConcatProcessor::ConcatProcessor(const Block & header, size_t num_inputs)
{
}
void ConcatProcessor::prepareInitializeInputs()
{
for (auto & input : inputs)
input.setNeeded();
}
ConcatProcessor::Status ConcatProcessor::prepare()
{
auto & output = outputs.front();
@ -53,12 +47,6 @@ ConcatProcessor::Status ConcatProcessor::prepare()
auto & input = *current_input;
if (!is_initialized)
{
prepareInitializeInputs();
is_initialized = true;
}
input.setNeeded();
if (!input.hasData())

View File

@ -26,9 +26,6 @@ public:
private:
InputPorts::iterator current_input;
bool is_initialized = false;
void prepareInitializeInputs();
};
}