From 0016d99f5e98c8bf06a77259c27d5e6c7dcf85ee Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 2 Sep 2019 17:49:05 +0300 Subject: [PATCH] Add stream enumeration to Processors. --- .../Processors/Executors/PipelineExecutor.cpp | 31 +++------------ .../Processors/Executors/PipelineExecutor.h | 39 +++++++++++-------- dbms/src/Processors/IProcessor.h | 7 ++++ dbms/src/Processors/QueryPipeline.cpp | 10 +++-- ...gingAggregatedMemoryEfficientTransform.cpp | 1 + 5 files changed, 44 insertions(+), 44 deletions(-) diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index ae2cdb5fd98..1b7a22a3ca2 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -474,32 +474,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads /// Just travers graph and prepare any processor. while (!finished) { - auto pushed = task_queue.num_pushed.load(); - bool found = false; - - while (pushed > task_queue.num_popped.load()) - { - /// Fast branch. - if (task_queue.pop(state)) - { - found = true; - break; - } - } - - if (found) - break; - std::unique_lock lock(task_queue_mutex); - auto popped = task_queue.num_popped.load(); - -// if (!task_queue.empty()) -// { -// state = task_queue.front(); -// task_queue.pop(); -// break; -// } + if (!task_queue.empty()) + { + state = task_queue.pop(thread_num); + break; + } ++num_waiting_threads; @@ -513,7 +494,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads task_queue_condvar.wait(lock, [&]() { - return finished || popped < task_queue.num_pushed.load(); + return finished || !task_queue.empty(); }); --num_waiting_threads; diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h index 18156c8b7c8..e6ba90ad7d1 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -120,26 +120,33 @@ private: struct TaskQueue { - bool pop(ExecutionState *& state) - { - if (stack.pop(state)) - { - num_popped.fetch_add(1); - return true; - } - - return false; - } - void push(ExecutionState * state) { - stack.push(state); - num_pushed.fetch_add(1); + map.emplace(state->processor->getStream(), state); } - lfs::LFStack stack; - std::atomic num_pushed {0}; - std::atomic num_popped {0}; + ExecutionState * pop(size_t stream) + { + auto it = map.find(stream); + + if (it == map.end()) + it = map.find(IProcessor::NO_STREAM); + + if (it == map.end()) + it = map.begin(); + + if (it == map.end()) + return nullptr; + + auto res = it->second; + map.erase(it); + + return res; + } + + bool empty() const { return map.empty(); } + + std::unordered_multimap map; }; /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set. diff --git a/dbms/src/Processors/IProcessor.h b/dbms/src/Processors/IProcessor.h index 18d34f29c99..174862f70ec 100644 --- a/dbms/src/Processors/IProcessor.h +++ b/dbms/src/Processors/IProcessor.h @@ -228,10 +228,17 @@ public: void setDescription(const std::string & description_) { processor_description = description_; } const std::string & getDescription() const { return processor_description; } + /// Helpers for pipeline executor. + void setStream(size_t value) { stream = value; } + size_t getStream() const { return stream; } + constexpr static size_t NO_STREAM = std::numeric_limits::max(); + private: std::atomic is_cancelled{false}; std::string processor_description; + + size_t stream = NO_STREAM; }; diff --git a/dbms/src/Processors/QueryPipeline.cpp b/dbms/src/Processors/QueryPipeline.cpp index cb9be19088c..ef50062a50d 100644 --- a/dbms/src/Processors/QueryPipeline.cpp +++ b/dbms/src/Processors/QueryPipeline.cpp @@ -75,6 +75,7 @@ void QueryPipeline::init(Processors sources) totals.emplace_back(&source->getOutputs().back()); } + source->setStream(streams.size()); streams.emplace_back(&source->getOutputs().front()); processors.emplace_back(std::move(source)); } @@ -115,7 +116,7 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter) Block header; - auto add_transform = [&](OutputPort *& stream, StreamType stream_type) + auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num = IProcessor::NO_STREAM) { if (!stream) return; @@ -148,14 +149,17 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter) if (transform) { + if (stream_type == StreamType::Main) + transform->setStream(stream_num); + connect(*stream, transform->getInputs().front()); stream = &transform->getOutputs().front(); processors.emplace_back(std::move(transform)); } }; - for (auto & stream : streams) - add_transform(stream, StreamType::Main); + for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num) + add_transform(streams[stream_num], StreamType::Main, stream_num); add_transform(delayed_stream_port, StreamType::Main); add_transform(totals_having_port, StreamType::Totals); diff --git a/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp index 735dafdee14..d9b5fbc330e 100644 --- a/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp @@ -507,6 +507,7 @@ Processors createMergingAggregatedMemoryEfficientPipe( for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out) { auto transform = std::make_shared(params); + transform->setStream(i); connect(*out, transform->getInputPort()); connect(transform->getOutputPort(), *in); processors.emplace_back(std::move(transform));