From 5d763dead06fafb0ffe42f7e6b76dfa1232dd67e Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Thu, 19 Mar 2020 14:45:52 +0300
Subject: [PATCH] Change the way rows_before_limit_at_least is calculated for
 the processors pipeline.

---
 dbms/programs/server/TCPHandler.cpp           |  2 -
 .../Interpreters/InterpreterSelectQuery.cpp   | 16 +++--
 dbms/src/Interpreters/executeQuery.cpp        |  2 -
 dbms/src/Processors/Formats/IOutputFormat.cpp |  3 +
 dbms/src/Processors/Formats/IOutputFormat.h   |  6 ++
 dbms/src/Processors/LimitTransform.cpp        |  4 +-
 dbms/src/Processors/LimitTransform.h          |  5 +-
 dbms/src/Processors/QueryPipeline.cpp         | 66 ++++++++-----------
 dbms/src/Processors/QueryPipeline.h           |  5 +-
 dbms/src/Processors/RowsBeforeLimitCounter.h  | 29 ++++++++
 .../Sources/SourceFromInputStream.cpp         | 14 ++++
 .../Sources/SourceFromInputStream.h           |  5 ++
 .../Transforms/PartialSortingTransform.cpp    |  8 +--
 .../Transforms/PartialSortingTransform.h      | 15 ++---
 14 files changed, 112 insertions(+), 68 deletions(-)
 create mode 100644 dbms/src/Processors/RowsBeforeLimitCounter.h

diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp
index cb2df69d5d1..40ed50e45a2 100644
--- a/dbms/programs/server/TCPHandler.cpp
+++ b/dbms/programs/server/TCPHandler.cpp
@@ -647,8 +647,6 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
          */
         if (!isQueryCancelled())
         {
-            pipeline.finalize();
-
             sendTotals(lazy_format->getTotals());
             sendExtremes(lazy_format->getExtremes());
             sendProfileInfo(lazy_format->getProfileInfo());
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 3adc2eadbfc..085ebe52557 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -2046,10 +2046,12 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
 
     if (need_finish_sorting)
     {
-        pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
+        pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
         {
-            bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
-            return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
+            if (stream_type != QueryPipeline::StreamType::Main)
+                return nullptr;
+
+            return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
         });
 
         pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
@@ -2063,10 +2065,12 @@
         return;
     }
 
-    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
+    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
     {
-        bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
-        return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
+        if (stream_type != QueryPipeline::StreamType::Main)
+            return nullptr;
+
+        return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
     });
 
     /// Merge the sorted blocks.
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index cfd588078ff..fefca6b580f 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -748,8 +748,6 @@ void executeQuery(
                 auto executor = pipeline.execute();
                 executor->execute(context.getSettingsRef().max_threads);
             }
-
-            pipeline.finalize();
         }
     }
     catch (...)
diff --git a/dbms/src/Processors/Formats/IOutputFormat.cpp b/dbms/src/Processors/Formats/IOutputFormat.cpp
index 971ad95d946..ff4ac393471 100644
--- a/dbms/src/Processors/Formats/IOutputFormat.cpp
+++ b/dbms/src/Processors/Formats/IOutputFormat.cpp
@@ -48,6 +48,9 @@ void IOutputFormat::work()
 {
     if (finished && !finalized)
     {
+        if (rows_before_limit_counter && rows_before_limit_counter->hasAppliedLimit())
+            setRowsBeforeLimit(rows_before_limit_counter->get());
+
         finalize();
         finalized = true;
         return;
diff --git a/dbms/src/Processors/Formats/IOutputFormat.h b/dbms/src/Processors/Formats/IOutputFormat.h
index 5200b897643..1137dd78446 100644
--- a/dbms/src/Processors/Formats/IOutputFormat.h
+++ b/dbms/src/Processors/Formats/IOutputFormat.h
@@ -2,6 +2,7 @@
 
 #include <string>
 #include <Processors/IProcessor.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 #include <IO/Progress.h>
 
@@ -33,6 +34,8 @@ protected:
     bool finished = false;
     bool finalized = false;
 
+    RowsBeforeLimitCounterPtr rows_before_limit_counter;
+
     virtual void consume(Chunk) = 0;
     virtual void consumeTotals(Chunk) {}
     virtual void consumeExtremes(Chunk) {}
@@ -50,6 +53,9 @@ public:
     /// Value for rows_before_limit_at_least field.
     virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
 
+    /// Counter to calculate rows_before_limit_at_least in processors pipeline.
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
+
     /// Notify about progress. Method could be called from different threads.
     /// Passed value is a delta that must be summarized.
     virtual void onProgress(const Progress & /*progress*/) {}
diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp
index 219a663198f..fe8990f7b0f 100644
--- a/dbms/src/Processors/LimitTransform.cpp
+++ b/dbms/src/Processors/LimitTransform.cpp
@@ -185,7 +185,9 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
     data.current_chunk = input.pull(true);
 
     auto rows = data.current_chunk.getNumRows();
-    rows_before_limit_at_least += rows;
+
+    if (rows_before_limit_at_least)
+        rows_before_limit_at_least->add(rows);
 
     /// Skip block (for 'always_read_till_end' case).
     if (is_limit_reached || output_finished)
diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h
index 1e7ec3bf322..a6989483c00 100644
--- a/dbms/src/Processors/LimitTransform.h
+++ b/dbms/src/Processors/LimitTransform.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <Processors/IProcessor.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 #include <Core/SortDescription.h>
 
 namespace DB
@@ -29,7 +30,7 @@ private:
     std::vector<size_t> sort_column_positions;
 
     size_t rows_read = 0; /// including the last read block
-    size_t rows_before_limit_at_least = 0;
+    RowsBeforeLimitCounterPtr rows_before_limit_at_least;
 
     /// State of port's pair.
     /// Chunks from different port pairs are not mixed for better cache locality.
@@ -65,7 +66,7 @@ public:
     InputPort & getInputPort() { return inputs.front(); }
     OutputPort & getOutputPort() { return outputs.front(); }
 
-    size_t getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
 };
 
 }
diff --git a/dbms/src/Processors/QueryPipeline.cpp b/dbms/src/Processors/QueryPipeline.cpp
index 8398c2359ac..6b2ee284c2a 100644
--- a/dbms/src/Processors/QueryPipeline.cpp
+++ b/dbms/src/Processors/QueryPipeline.cpp
@@ -19,6 +19,7 @@
 #include <Processors/LimitTransform.h>
 #include <Processors/Sources/SourceFromInputStream.h>
 #include <Processors/Transforms/PartialSortingTransform.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 
 namespace DB
 {
@@ -438,6 +439,8 @@ void QueryPipeline::setOutput(ProcessorPtr output)
     connect(*streams.front(), main);
     connect(*totals_having_port, totals);
     connect(*extremes_port, extremes);
+
+    initRowsBeforeLimit();
 }
 
 void QueryPipeline::unitePipelines(
@@ -552,25 +555,12 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
     }
 }
 
-void QueryPipeline::finalize()
+void QueryPipeline::initRowsBeforeLimit()
 {
-    checkInitialized();
+    RowsBeforeLimitCounterPtr rows_before_limit_at_least;
 
-    if (!output_format)
-        throw Exception("Cannot finalize pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
-
-    calcRowsBeforeLimit();
-}
-
-void QueryPipeline::calcRowsBeforeLimit()
-{
-    /// TODO get from Remote
-
-    UInt64 rows_before_limit_at_least = 0;
-    UInt64 rows_before_limit = 0;
-
-    bool has_limit = false;
-    bool has_partial_sorting = false;
+    std::vector<LimitTransform *> limits;
+    std::vector<SourceFromInputStream *> sources;
 
     std::unordered_set<IProcessor *> visited;
@@ -593,30 +583,22 @@ void QueryPipeline::calcRowsBeforeLimit()
 
         if (!visited_limit)
         {
-            if (auto * limit = typeid_cast<const LimitTransform *>(processor))
+            if (auto * limit = typeid_cast<LimitTransform *>(processor))
             {
-                has_limit = visited_limit = true;
-                rows_before_limit_at_least += limit->getRowsBeforeLimitAtLeast();
+                visited_limit = true;
+                limits.emplace_back(limit);
             }
 
             if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
-            {
-                if (auto & stream = source->getStream())
-                {
-                    auto & info = stream->getProfileInfo();
-                    if (info.hasAppliedLimit())
-                    {
-                        has_limit = visited_limit = true;
-                        rows_before_limit_at_least += info.getRowsBeforeLimit();
-                    }
-                }
-            }
+                sources.emplace_back(source);
         }
 
-        if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
+        if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
         {
-            has_partial_sorting = true;
-            rows_before_limit += sorting->getNumReadRows();
+            if (!rows_before_limit_at_least)
+                rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+
+            sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
 
             /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
             continue;
@@ -640,9 +622,19 @@ void QueryPipeline::calcRowsBeforeLimit()
         }
     }
 
-    /// Get num read rows from PartialSortingTransform if have it.
-    if (has_limit)
-        output_format->setRowsBeforeLimit(has_partial_sorting ? rows_before_limit : rows_before_limit_at_least);
+    if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty()))
+    {
+        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+
+        for (auto & limit : limits)
+            limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+
+        for (auto & source : sources)
+            source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+    }
+
+    if (rows_before_limit_at_least)
+        output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
 }
 
 Pipe QueryPipeline::getPipe() &&
diff --git a/dbms/src/Processors/QueryPipeline.h b/dbms/src/Processors/QueryPipeline.h
index be90e07f281..e01087b717a 100644
--- a/dbms/src/Processors/QueryPipeline.h
+++ b/dbms/src/Processors/QueryPipeline.h
@@ -140,9 +140,6 @@ public:
     void setProgressCallback(const ProgressCallback & callback);
     void setProcessListElement(QueryStatus * elem);
 
-    /// Call after execution.
-    void finalize();
-
     /// Recommend number of threads for pipeline execution.
     size_t getNumThreads() const
     {
@@ -200,7 +197,7 @@ private:
     template <typename TProcessorGetter>
     void addSimpleTransformImpl(const TProcessorGetter & getter);
 
-    void calcRowsBeforeLimit();
+    void initRowsBeforeLimit();
 };
 
 }
diff --git a/dbms/src/Processors/RowsBeforeLimitCounter.h b/dbms/src/Processors/RowsBeforeLimitCounter.h
new file mode 100644
index 00000000000..1408010cec7
--- /dev/null
+++ b/dbms/src/Processors/RowsBeforeLimitCounter.h
@@ -0,0 +1,29 @@
+#pragma once
+#include <atomic>
+#include <memory>
+
+namespace DB
+{
+
+class RowsBeforeLimitCounter
+{
+public:
+    void add(uint64_t rows)
+    {
+        setAppliedLimit();
+        rows_before_limit.fetch_add(rows, std::memory_order_release);
+    }
+
+    uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }
+
+    void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }
+    bool hasAppliedLimit() const { return has_applied_limit.load(std::memory_order_acquire); }
+
+private:
+    std::atomic<uint64_t> rows_before_limit = 0;
+    std::atomic_bool has_applied_limit = false;
+};
+
+using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;
+
+}
diff --git a/dbms/src/Processors/Sources/SourceFromInputStream.cpp b/dbms/src/Processors/Sources/SourceFromInputStream.cpp
index 3cc050aaf95..6f2a7eeb28a 100644
--- a/dbms/src/Processors/Sources/SourceFromInputStream.cpp
+++ b/dbms/src/Processors/Sources/SourceFromInputStream.cpp
@@ -95,6 +95,13 @@ void SourceFromInputStream::work()
     if (!typeid_cast<RemoteBlockInputStream *>(stream.get()))
         stream->cancel(false);
 
+    if (rows_before_limit)
+    {
+        auto & info = stream->getProfileInfo();
+        if (info.hasAppliedLimit())
+            rows_before_limit->add(info.getRowsBeforeLimit());
+    }
+
     stream->readSuffix();
 
     if (auto totals_block = stream->getTotals())
@@ -120,6 +127,13 @@ Chunk SourceFromInputStream::generate()
     auto block = stream->read();
     if (!block && !isCancelled())
     {
+        if (rows_before_limit)
+        {
+            auto & info = stream->getProfileInfo();
+            if (info.hasAppliedLimit())
+                rows_before_limit->add(info.getRowsBeforeLimit());
+        }
+
         stream->readSuffix();
 
         if (auto totals_block = stream->getTotals())
diff --git a/dbms/src/Processors/Sources/SourceFromInputStream.h b/dbms/src/Processors/Sources/SourceFromInputStream.h
index 00b03220cec..83e7f9929c9 100644
--- a/dbms/src/Processors/Sources/SourceFromInputStream.h
+++ b/dbms/src/Processors/Sources/SourceFromInputStream.h
@@ -1,5 +1,6 @@
 #pragma once
 #include <Processors/ISource.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 
 namespace DB
 {
@@ -23,6 +24,8 @@ public:
 
     void addTotalsPort();
 
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
+
     /// Implementation for methods from ISourceWithProgress.
     void setLimits(const LocalLimits & limits_) final { stream->setLimits(limits_); }
     void setQuota(const QuotaContextPtr & quota_) final { stream->setQuota(quota_); }
 
@@ -38,6 +41,8 @@ private:
     bool force_add_aggregating_info = false;
     BlockInputStreamPtr stream;
 
+    RowsBeforeLimitCounterPtr rows_before_limit;
+
     Chunk totals;
     bool has_totals_port = false;
     bool has_totals = false;
diff --git a/dbms/src/Processors/Transforms/PartialSortingTransform.cpp b/dbms/src/Processors/Transforms/PartialSortingTransform.cpp
index 7e217ea0a07..018614f0165 100644
--- a/dbms/src/Processors/Transforms/PartialSortingTransform.cpp
+++ b/dbms/src/Processors/Transforms/PartialSortingTransform.cpp
@@ -5,16 +5,16 @@ namespace DB
 {
 
 PartialSortingTransform::PartialSortingTransform(
-    const Block & header_, SortDescription & description_, UInt64 limit_, bool do_count_rows_)
+    const Block & header_, SortDescription & description_, UInt64 limit_)
     : ISimpleTransform(header_, header_, false)
-    , description(description_), limit(limit_), do_count_rows(do_count_rows_)
+    , description(description_), limit(limit_)
 {
 }
 
 void PartialSortingTransform::transform(Chunk & chunk)
 {
-    if (do_count_rows)
-        read_rows += chunk.getNumRows();
+    if (read_rows)
+        read_rows->add(chunk.getNumRows());
 
     auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
     chunk.clear();
diff --git a/dbms/src/Processors/Transforms/PartialSortingTransform.h b/dbms/src/Processors/Transforms/PartialSortingTransform.h
index 905b294c0be..158b0af202e 100644
--- a/dbms/src/Processors/Transforms/PartialSortingTransform.h
+++ b/dbms/src/Processors/Transforms/PartialSortingTransform.h
@@ -1,5 +1,6 @@
 #pragma once
 #include <Processors/ISimpleTransform.h>
+#include <Processors/RowsBeforeLimitCounter.h>
 #include <Core/SortDescription.h>
 
 namespace DB
@@ -12,17 +13,15 @@ class PartialSortingTransform : public ISimpleTransform
 {
 public:
     /// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
-    /// When count_rows is false, read_rows is not changed. It is needed to skip the total row when counting rows_before_limit_at_least.
     PartialSortingTransform(
         const Block & header_,
         SortDescription & description_,
-        UInt64 limit_ = 0,
-        bool do_count_rows_ = true);
+        UInt64 limit_ = 0);
 
     String getName() const override { return "PartialSortingTransform"; }
 
-    /// Total num rows passed to transform.
-    UInt64 getNumReadRows() const { return read_rows; }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
 
 protected:
     void transform(Chunk & chunk) override;
@@ -30,11 +29,7 @@ protected:
 private:
     SortDescription description;
     UInt64 limit;
-    UInt64 read_rows = 0;
-
-    /// Do we need calculate read_rows value?
-    /// Used to skip total row when count rows_before_limit_at_least.
-    bool do_count_rows;
+    RowsBeforeLimitCounterPtr read_rows;
};

}
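Note for reviewers, appended after the patch: a minimal standalone sketch of the counter contract this commit introduces. The RowsBeforeLimitCounter class is copied from the new header above; everything in main() is a hypothetical stand-in for the wiring done by QueryPipeline::initRowsBeforeLimit() and IOutputFormat::work(), where several processors share one counter through a shared_ptr and the output format reads the accumulated total once the pipeline has finished. The row numbers are made up for illustration.

// sketch.cpp: illustrates the shared-counter contract from this patch.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>

// Copied from dbms/src/Processors/RowsBeforeLimitCounter.h (as introduced above).
class RowsBeforeLimitCounter
{
public:
    void add(uint64_t rows)
    {
        setAppliedLimit();
        rows_before_limit.fetch_add(rows, std::memory_order_release);
    }

    uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }

    void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }
    bool hasAppliedLimit() const { return has_applied_limit.load(std::memory_order_acquire); }

private:
    std::atomic<uint64_t> rows_before_limit{0};
    std::atomic_bool has_applied_limit{false};
};

using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;

int main()
{
    // One counter is created by the pipeline and handed to every interested
    // processor, replacing the old per-processor accumulation that finalize()
    // had to collect after execution.
    auto counter = std::make_shared<RowsBeforeLimitCounter>();

    // Hypothetical stand-ins for two parallel LimitTransform streams, each
    // accounting the rows it pulled before the limit cut the stream off.
    counter->add(65536);
    counter->add(12288);

    // Mirrors the check added to IOutputFormat::work(): report the value only
    // if some processor actually applied a limit.
    if (counter->hasAppliedLimit())
        std::cout << "rows_before_limit_at_least: " << counter->get() << '\n';
}

The shared_ptr-plus-atomics design is what lets LimitTransform, SourceFromInputStream, and PartialSortingTransform, which may run on different threads, account into one total without the pipeline having to walk the processor graph again after execution, which is why finalize() could be removed from TCPHandler and executeQuery.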