diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 8892418d0dc..9013b83486a 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -7,7 +7,6 @@ #include #include -#include #include #include #include @@ -52,26 +51,25 @@ bool PipelineExecutor::addEdges(UInt64 node) const IProcessor * cur = graph[node].processor; - auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges) + auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges, + bool is_backward, UInt64 input_port_number, UInt64 output_port_number, + std::vector * update_list) { auto it = processors_map.find(to_proc); if (it == processors_map.end()) throwUnknownProcessor(to_proc, cur, true); UInt64 proc_num = it->second; - Edge * edge_ptr = nullptr; for (auto & edge : edges) - if (edge.to == proc_num) - edge_ptr = &edge; - - if (!edge_ptr) { - edge_ptr = &edges.emplace_back(); - edge_ptr->to = proc_num; + if (edge.to == proc_num) + throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR); } - from_port.setVersion(&edge_ptr->version); + auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list); + + from_port.setUpdateInfo(&edge.update_info); }; bool was_edge_added = false; @@ -83,10 +81,11 @@ bool PipelineExecutor::addEdges(UInt64 node) { was_edge_added = true; - for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it) + for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input) { const IProcessor * proc = &it->getOutputPort().getProcessor(); - add_edge(*it, proc, graph[node].backEdges); + auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort()); + add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports); } } @@ -97,10 +96,11 @@ bool PipelineExecutor::addEdges(UInt64 node) { was_edge_added = true; - for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it) + for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output) { const IProcessor * proc = &it->getInputPort().getProcessor(); - add_edge(*it, proc, graph[node].directEdges); + auto input_port_number = proc->getInputPortNumber(&it->getInputPort()); + add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports); } } @@ -131,6 +131,7 @@ void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) if (graph[proc].directEdges.empty()) { stack.push(proc); + /// do not lock mutex, as this function is executedin single thread graph[proc].status = ExecStatus::Preparing; } } @@ -195,9 +196,20 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) UInt64 num_processors = processors.size(); for (UInt64 node = 0; node < num_processors; ++node) { + size_t num_direct_edges = graph[node].directEdges.size(); + size_t num_back_edges = graph[node].backEdges.size(); + if (addEdges(node)) { - if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New) + std::lock_guard guard(graph[node].status_mutex); + + for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges) + graph[node].updated_input_ports.emplace_back(num_back_edges); + + for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges) + graph[node].updated_output_ports.emplace_back(num_direct_edges); + + if (graph[node].status == ExecStatus::Idle) { graph[node].status = ExecStatus::Preparing; stack.push(node); @@ -212,34 +224,26 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac auto & node = graph[edge.to]; - ExecStatus status = node.status.load(); + std::lock_guard guard(node.status_mutex); - /// Don't add processor if nothing was read from port. - if (status != ExecStatus::New && edge.version == edge.prev_version) - return false; + ExecStatus status = node.status; if (status == ExecStatus::Finished) return false; - /// Signal that node need to be prepared. - node.need_to_be_prepared = true; - edge.prev_version = edge.version; + if (edge.backward) + node.updated_output_ports.push_back(edge.output_port_number); + else + node.updated_input_ports.push_back(edge.input_port_number); - /// Try to get ownership for node. - - /// Assume that current status is New or Idle. Otherwise, can't prepare node. - if (status != ExecStatus::New) - status = ExecStatus::Idle; - - /// Statuses but New and Idle are not interesting because they own node. - /// Prepare will be called in owning thread before changing status. - while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing)) - if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared) - return false; - - stack.push(edge.to); - return true; + if (status == ExecStatus::Idle) + { + node.status = ExecStatus::Preparing; + stack.push(edge.to); + return true; + } + return false; } bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async) @@ -247,105 +251,117 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa /// In this method we have ownership on node. auto & node = graph[pid]; + bool need_traverse = false; + bool need_expand_pipeline = false; + + std::vector updated_back_edges; + std::vector updated_direct_edges; + { /// Stopwatch watch; - /// Disable flag before prepare call. Otherwise, we can skip prepare request. - /// Prepare can be called more times than needed, but it's ok. - node.need_to_be_prepared = false; + std::lock_guard guard(node.status_mutex); - auto status = node.processor->prepare(); + auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports); + node.updated_input_ports.clear(); + node.updated_output_ports.clear(); /// node.execution_state->preparation_time_ns += watch.elapsed(); node.last_processor_status = status; - } - auto add_neighbours_to_prepare_queue = [&] () - { - for (auto & edge : node.backEdges) - tryAddProcessorToStackIfUpdated(edge, parents); - - for (auto & edge : node.directEdges) - tryAddProcessorToStackIfUpdated(edge, children); - }; - - auto try_release_ownership = [&] () - { - /// This function can be called after expand pipeline, where node from outer scope is not longer valid. - auto & node_ = graph[pid]; - ExecStatus expected = ExecStatus::Idle; - node_.status = ExecStatus::Idle; - - if (node_.need_to_be_prepared) + switch (node.last_processor_status) { - while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing)) - if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared) - return; - - children.push(pid); - } - }; - - switch (node.last_processor_status) - { - case IProcessor::Status::NeedData: - case IProcessor::Status::PortFull: - { - add_neighbours_to_prepare_queue(); - try_release_ownership(); - - break; - } - case IProcessor::Status::Finished: - { - add_neighbours_to_prepare_queue(); - node.status = ExecStatus::Finished; - break; - } - case IProcessor::Status::Ready: - { - node.status = ExecStatus::Executing; - return true; - } - case IProcessor::Status::Async: - { - throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR); + case IProcessor::Status::NeedData: + case IProcessor::Status::PortFull: + { + need_traverse = true; + node.status = ExecStatus::Idle; + break; + } + case IProcessor::Status::Finished: + { + need_traverse = true; + node.status = ExecStatus::Finished; + break; + } + case IProcessor::Status::Ready: + { + node.status = ExecStatus::Executing; + return true; + } + case IProcessor::Status::Async: + { + throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR); // node.status = ExecStatus::Executing; // addAsyncJob(pid); // break; - } - case IProcessor::Status::Wait: - { - if (!async) - throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR); - break; - } - case IProcessor::Status::ExpandPipeline: - { - executor_contexts[thread_number]->task_list.emplace_back( - node.execution_state.get(), - &parents - ); - - ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back(); - ExpandPipelineTask * expected = nullptr; - - while (!expand_pipeline_task.compare_exchange_strong(expected, desired)) + } + case IProcessor::Status::Wait: { - doExpandPipeline(expected, true); - expected = nullptr; + if (!async) + throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR); + break; + } + case IProcessor::Status::ExpandPipeline: + { + need_expand_pipeline = true; + break; + } + } + + if (need_traverse) + { + for (auto & edge_id : node.post_updated_input_ports) + { + auto edge = static_cast(edge_id); + updated_back_edges.emplace_back(edge); + edge->update_info.trigger(); } - doExpandPipeline(desired, true); + for (auto & edge_id : node.post_updated_output_ports) + { + auto edge = static_cast(edge_id); + updated_direct_edges.emplace_back(edge); + edge->update_info.trigger(); + } - /// node is not longer valid after pipeline was expanded - graph[pid].need_to_be_prepared = true; - try_release_ownership(); - break; + node.post_updated_input_ports.clear(); + node.post_updated_output_ports.clear(); } } + if (need_traverse) + { + for (auto & edge : updated_back_edges) + tryAddProcessorToStackIfUpdated(*edge, parents); + + for (auto & edge : updated_direct_edges) + tryAddProcessorToStackIfUpdated(*edge, children); + } + + if (need_expand_pipeline) + { + executor_contexts[thread_number]->task_list.emplace_back( + node.execution_state.get(), + &parents + ); + + ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back(); + ExpandPipelineTask * expected = nullptr; + + while (!expand_pipeline_task.compare_exchange_strong(expected, desired)) + { + doExpandPipeline(expected, true); + expected = nullptr; + } + + doExpandPipeline(desired, true); + + /// Add itself back to be prepared again. + children.push(pid); + } + return false; } @@ -427,7 +443,7 @@ void PipelineExecutor::execute(size_t num_threads) bool all_processors_finished = true; for (auto & node : graph) - if (node.status != ExecStatus::Finished) + if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex all_processors_finished = false; if (!all_processors_finished) diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h index b5e3c7a0e1e..aded3de3008 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -43,12 +43,23 @@ private: struct Edge { + Edge(UInt64 to_, bool backward_, + UInt64 input_port_number_, UInt64 output_port_number_, std::vector * update_list) + : to(to_), backward(backward_) + , input_port_number(input_port_number_), output_port_number(output_port_number_) + { + update_info.update_list = update_list; + update_info.id = this; + } + UInt64 to = std::numeric_limits::max(); + bool backward; + UInt64 input_port_number; + UInt64 output_port_number; /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details. /// To compare version with prev_version we can decide if neighbour processor need to be prepared. - UInt64 version = 0; - UInt64 prev_version = 0; + Port::UpdateInfo update_info; }; /// Use std::list because new ports can be added to processor during execution. @@ -58,7 +69,6 @@ private: /// Can be owning or not. Owning means that executor who set this status can change node's data and nobody else can. enum class ExecStatus { - New, /// prepare wasn't called yet. Initial state. Non-owning. Idle, /// prepare returned NeedData or PortFull. Non-owning. Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning. Executing, /// prepare returned Ready and task is executing. Owning. @@ -87,17 +97,22 @@ private: Edges directEdges; Edges backEdges; - std::atomic status; - /// This flag can be set by any executor. - /// When enabled, any executor can try to atomically set Preparing state to status. - std::atomic_bool need_to_be_prepared; + ExecStatus status; + std::mutex status_mutex; + + std::vector post_updated_input_ports; + std::vector post_updated_output_ports; + /// Last state for profiling. IProcessor::Status last_processor_status = IProcessor::Status::NeedData; std::unique_ptr execution_state; + IProcessor::PortNumbers updated_input_ports; + IProcessor::PortNumbers updated_output_ports; + Node(IProcessor * processor_, UInt64 processor_id) - : processor(processor_), status(ExecStatus::New), need_to_be_prepared(false) + : processor(processor_), status(ExecStatus::Idle) { execution_state = std::make_unique(); execution_state->processor = processor; @@ -105,8 +120,8 @@ private: } Node(Node && other) noexcept - : processor(other.processor), status(other.status.load()) - , need_to_be_prepared(other.need_to_be_prepared.load()), execution_state(std::move(other.execution_state)) + : processor(other.processor), status(other.status) + , execution_state(std::move(other.execution_state)) { } }; diff --git a/dbms/src/Processors/IProcessor.h b/dbms/src/Processors/IProcessor.h index ed59f4e591d..852bde2d467 100644 --- a/dbms/src/Processors/IProcessor.h +++ b/dbms/src/Processors/IProcessor.h @@ -171,7 +171,15 @@ public: * - method 'prepare' cannot be executed in parallel even for different objects, * if they are connected (including indirectly) to each other by their ports; */ - virtual Status prepare() = 0; + virtual Status prepare() + { + throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); + } + + using PortNumbers = std::vector; + + /// Optimization for prepare in case we know ports were updated. + virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); } /** You may call this method if 'prepare' returned Ready. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. @@ -183,11 +191,6 @@ public: throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); } - virtual void work(size_t /*thread_num*/) - { - work(); - } - /** You may call this method if 'prepare' returned Async. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. * @@ -226,6 +229,34 @@ public: auto & getInputs() { return inputs; } auto & getOutputs() { return outputs; } + UInt64 getInputPortNumber(const InputPort * input_port) const + { + UInt64 number = 0; + for (auto & port : inputs) + { + if (&port == input_port) + return number; + + ++number; + } + + throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR); + } + + UInt64 getOutputPortNumber(const OutputPort * output_port) const + { + UInt64 number = 0; + for (auto & port : outputs) + { + if (&port == output_port) + return number; + + ++number; + } + + throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR); + } + const auto & getInputs() const { return inputs; } const auto & getOutputs() const { return outputs; } diff --git a/dbms/src/Processors/Port.h b/dbms/src/Processors/Port.h index 37d1ea9bd46..ff5d1d8dee0 100644 --- a/dbms/src/Processors/Port.h +++ b/dbms/src/Processors/Port.h @@ -28,6 +28,25 @@ class Port friend void connect(OutputPort &, InputPort &); friend class IProcessor; +public: + struct UpdateInfo + { + std::vector * update_list = nullptr; + void * id = nullptr; + UInt64 version = 0; + UInt64 prev_version = 0; + + void inline ALWAYS_INLINE update() + { + if (version == prev_version && update_list) + update_list->push_back(id); + + ++version; + } + + void inline ALWAYS_INLINE trigger() { prev_version = version; } + }; + protected: /// Shared state of two connected ports. class State @@ -182,12 +201,17 @@ protected: IProcessor * processor = nullptr; + /// If update_info was set, will call update() for it in case port's state have changed. + UpdateInfo * update_info = nullptr; + public: using Data = State::Data; Port(Block header_) : header(std::move(header_)) {} Port(Block header_, IProcessor * processor_) : header(std::move(header_)), processor(processor_) {} + void setUpdateInfo(UpdateInfo * info) { update_info = info; } + const Block & getHeader() const { return header; } bool ALWAYS_INLINE isConnected() const { return state != nullptr; } @@ -216,6 +240,13 @@ public: throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR); return *processor; } + +protected: + void inline ALWAYS_INLINE updateVersion() + { + if (likely(update_info)) + update_info->update(); + } }; /// Invariants: @@ -230,20 +261,14 @@ class InputPort : public Port private: OutputPort * output_port = nullptr; - /// If version was set, it will be increased on each pull. - UInt64 * version = nullptr; - mutable bool is_finished = false; public: using Port::Port; - void setVersion(UInt64 * value) { version = value; } - Data ALWAYS_INLINE pullData() { - if (version) - ++(*version); + updateVersion(); assumeConnected(); @@ -296,8 +321,8 @@ public: { assumeConnected(); - if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0 && version) - ++(*version); + if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0) + updateVersion(); } void ALWAYS_INLINE setNotNeeded() @@ -310,8 +335,8 @@ public: { assumeConnected(); - if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0 && version) - ++(*version); + if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0) + updateVersion(); is_finished = true; } @@ -353,14 +378,9 @@ class OutputPort : public Port private: InputPort * input_port = nullptr; - /// If version was set, it will be increased on each push. - UInt64 * version = nullptr; - public: using Port::Port; - void setVersion(UInt64 * value) { version = value; } - void ALWAYS_INLINE push(Chunk chunk) { pushData({.chunk = std::move(chunk), .exception = {}}); @@ -385,8 +405,7 @@ public: throw Exception(msg, ErrorCodes::LOGICAL_ERROR); } - if (version) - ++(*version); + updateVersion(); assumeConnected(); @@ -401,8 +420,8 @@ public: auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED); - if (version && (flags & State::IS_FINISHED) == 0) - ++(*version); + if ((flags & State::IS_FINISHED) == 0) + updateVersion(); } bool ALWAYS_INLINE isNeeded() const diff --git a/dbms/src/Processors/ResizeProcessor.cpp b/dbms/src/Processors/ResizeProcessor.cpp index b3cb3a1735d..59d1f0db75e 100644 --- a/dbms/src/Processors/ResizeProcessor.cpp +++ b/dbms/src/Processors/ResizeProcessor.cpp @@ -153,5 +153,109 @@ ResizeProcessor::Status ResizeProcessor::prepare() return get_status_if_no_inputs(); } +IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs) +{ + if (!initialized) + { + initialized = true; + + for (auto & input : inputs) + { + input.setNeeded(); + input_ports.push_back({.port = &input, .status = InputStatus::NotActive}); + } + + for (auto & output : outputs) + output_ports.push_back({.port = &output, .status = OutputStatus::NotActive}); + } + + for (auto & output_number : updated_outputs) + { + auto & output = output_ports[output_number]; + if (output.port->isFinished()) + { + if (output.status != OutputStatus::Finished) + { + ++num_finished_outputs; + output.status = OutputStatus::Finished; + } + + continue; + } + + if (output.port->canPush()) + { + if (output.status != OutputStatus::NeedData) + { + output.status = OutputStatus::NeedData; + waiting_outputs.push(output_number); + } + } + } + + if (num_finished_outputs == outputs.size()) + { + for (auto & input : inputs) + input.close(); + + return Status::Finished; + } + + for (auto & input_number : updated_inputs) + { + auto & input = input_ports[input_number]; + if (input.port->isFinished()) + { + if (input.status != InputStatus::Finished) + { + input.status = InputStatus::Finished; + ++num_finished_inputs; + } + continue; + } + + if (input.port->hasData()) + { + if (input.status != InputStatus::HasData) + { + input.status = InputStatus::HasData; + inputs_with_data.push(input_number); + } + } + } + + while (!waiting_outputs.empty() && !inputs_with_data.empty()) + { + auto & waiting_output = output_ports[waiting_outputs.front()]; + waiting_outputs.pop(); + + auto & input_with_data = input_ports[inputs_with_data.front()]; + inputs_with_data.pop(); + + waiting_output.port->pushData(input_with_data.port->pullData()); + input_with_data.status = InputStatus::NotActive; + waiting_output.status = OutputStatus::NotActive; + + if (input_with_data.port->isFinished()) + { + input_with_data.status = InputStatus::Finished; + ++num_finished_inputs; + } + } + + if (num_finished_inputs == inputs.size()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + if (!waiting_outputs.empty()) + return Status::NeedData; + + return Status::PortFull; +} + } diff --git a/dbms/src/Processors/ResizeProcessor.h b/dbms/src/Processors/ResizeProcessor.h index 67574c384a1..3a9c906ecbd 100644 --- a/dbms/src/Processors/ResizeProcessor.h +++ b/dbms/src/Processors/ResizeProcessor.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -31,10 +32,46 @@ public: String getName() const override { return "Resize"; } Status prepare() override; + Status prepare(const PortNumbers &, const PortNumbers &) override; private: InputPorts::iterator current_input; OutputPorts::iterator current_output; + + size_t num_finished_inputs = 0; + size_t num_finished_outputs = 0; + std::queue waiting_outputs; + std::queue inputs_with_data; + bool initialized = false; + + enum class OutputStatus + { + NotActive, + NeedData, + Finished, + }; + + enum class InputStatus + { + NotActive, + HasData, + Finished, + }; + + struct InputPortWithStatus + { + InputPort * port; + InputStatus status; + }; + + struct OutputPortWithStatus + { + OutputPort * port; + OutputStatus status; + }; + + std::vector input_ports; + std::vector output_ports; }; } diff --git a/dbms/src/Processors/Transforms/FilterTransform.cpp b/dbms/src/Processors/Transforms/FilterTransform.cpp index 058df590f0c..9cad9f85f92 100644 --- a/dbms/src/Processors/Transforms/FilterTransform.cpp +++ b/dbms/src/Processors/Transforms/FilterTransform.cpp @@ -65,14 +65,23 @@ FilterTransform::FilterTransform( IProcessor::Status FilterTransform::prepare() { if (constant_filter_description.always_false - || expression->checkColumnIsAlwaysFalse(filter_column_name)) + /// Optimization for `WHERE column in (empty set)`. + /// The result will not change after set was created, so we can skip this check. + /// It is implemented in prepare() stop pipeline before reading from input port. + || (!are_prepared_sets_initialized && expression->checkColumnIsAlwaysFalse(filter_column_name))) { input.close(); output.finish(); return Status::Finished; } - return ISimpleTransform::prepare(); + auto status = ISimpleTransform::prepare(); + + /// Until prepared sets are initialized, output port will be unneeded, and prepare will return PortFull. + if (status != IProcessor::Status::PortFull) + are_prepared_sets_initialized = true; + + return status; } diff --git a/dbms/src/Processors/Transforms/FilterTransform.h b/dbms/src/Processors/Transforms/FilterTransform.h index 127eb5a8039..1652473aa3c 100644 --- a/dbms/src/Processors/Transforms/FilterTransform.h +++ b/dbms/src/Processors/Transforms/FilterTransform.h @@ -36,6 +36,8 @@ private: /// Header after expression, but before removing filter column. Block transformed_header; + bool are_prepared_sets_initialized = false; + void removeFilterIfNeed(Chunk & chunk); }; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index 0991835bfaf..b32dd076c5f 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -59,8 +59,11 @@ protected: auto num_rows = chunk.getNumRows(); columns = chunk.mutateColumns(); if (limit_rows && num_rows > limit_rows) + { + num_rows = limit_rows; for (auto & column : columns) - column = (*column->cut(0, limit_rows)->convertToFullColumnIfConst()).mutate(); + column = (*column->cut(0, num_rows)->convertToFullColumnIfConst()).mutate(); + } total_merged_rows += num_rows; merged_rows = num_rows;