diff --git a/src/Processors/ConcatProcessor.cpp b/src/Processors/ConcatProcessor.cpp index 27338c7c879..2c540abb259 100644 --- a/src/Processors/ConcatProcessor.cpp +++ b/src/Processors/ConcatProcessor.cpp @@ -4,6 +4,17 @@ namespace DB { +ConcatProcessor::ConcatProcessor(const Block & header, size_t num_inputs) + : IProcessor(InputPorts(num_inputs, header), OutputPorts{header}), current_input(inputs.begin()) +{ +} + +void ConcatProcessor::prepareInitializeInputs() +{ + for (auto & input : inputs) + input.setNeeded(); +} + ConcatProcessor::Status ConcatProcessor::prepare() { auto & output = outputs.front(); @@ -42,6 +53,12 @@ ConcatProcessor::Status ConcatProcessor::prepare() auto & input = *current_input; + if (!is_initialized) + { + prepareInitializeInputs(); + is_initialized = true; + } + input.setNeeded(); if (!input.hasData()) diff --git a/src/Processors/ConcatProcessor.h b/src/Processors/ConcatProcessor.h index 4aa5099b38a..852d08fb5c2 100644 --- a/src/Processors/ConcatProcessor.h +++ b/src/Processors/ConcatProcessor.h @@ -16,10 +16,7 @@ namespace DB class ConcatProcessor : public IProcessor { public: - ConcatProcessor(const Block & header, size_t num_inputs) - : IProcessor(InputPorts(num_inputs, header), OutputPorts{header}), current_input(inputs.begin()) - { - } + ConcatProcessor(const Block & header, size_t num_inputs); String getName() const override { return "Concat"; } @@ -29,6 +26,9 @@ public: private: InputPorts::iterator current_input; + + bool is_initialized = false; + void prepareInitializeInputs(); }; }