mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Change the way how rows_before_limit_at_least is calculated for processors pipeline.
This commit is contained in:
parent
638d1e6466
commit
5d763dead0
@ -647,8 +647,6 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
|
||||
*/
|
||||
if (!isQueryCancelled())
|
||||
{
|
||||
pipeline.finalize();
|
||||
|
||||
sendTotals(lazy_format->getTotals());
|
||||
sendExtremes(lazy_format->getExtremes());
|
||||
sendProfileInfo(lazy_format->getProfileInfo());
|
||||
|
@ -2046,10 +2046,12 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
|
||||
|
||||
if (need_finish_sorting)
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
|
||||
{
|
||||
bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
|
||||
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
|
||||
if (stream_type != QueryPipeline::StreamType::Main)
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
|
||||
});
|
||||
|
||||
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
|
||||
@ -2063,10 +2065,12 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
|
||||
return;
|
||||
}
|
||||
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
|
||||
{
|
||||
bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
|
||||
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
|
||||
if (stream_type != QueryPipeline::StreamType::Main)
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
|
||||
});
|
||||
|
||||
/// Merge the sorted blocks.
|
||||
|
@ -748,8 +748,6 @@ void executeQuery(
|
||||
auto executor = pipeline.execute();
|
||||
executor->execute(context.getSettingsRef().max_threads);
|
||||
}
|
||||
|
||||
pipeline.finalize();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
|
@ -48,6 +48,9 @@ void IOutputFormat::work()
|
||||
{
|
||||
if (finished && !finalized)
|
||||
{
|
||||
if (rows_before_limit_counter && rows_before_limit_counter->hasAppliedLimit())
|
||||
setRowsBeforeLimit(rows_before_limit_counter->get());
|
||||
|
||||
finalize();
|
||||
finalized = true;
|
||||
return;
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <string>
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
#include <IO/Progress.h>
|
||||
|
||||
|
||||
@ -33,6 +34,8 @@ protected:
|
||||
bool finished = false;
|
||||
bool finalized = false;
|
||||
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_counter;
|
||||
|
||||
virtual void consume(Chunk) = 0;
|
||||
virtual void consumeTotals(Chunk) {}
|
||||
virtual void consumeExtremes(Chunk) {}
|
||||
@ -50,6 +53,9 @@ public:
|
||||
/// Value for rows_before_limit_at_least field.
|
||||
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
|
||||
|
||||
/// Counter to calculate rows_before_limit_at_least in processors pipeline.
|
||||
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
|
||||
|
||||
/// Notify about progress. Method could be called from different threads.
|
||||
/// Passed value are delta, that must be summarized.
|
||||
virtual void onProgress(const Progress & /*progress*/) {}
|
||||
|
@ -185,7 +185,9 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
|
||||
data.current_chunk = input.pull(true);
|
||||
|
||||
auto rows = data.current_chunk.getNumRows();
|
||||
rows_before_limit_at_least += rows;
|
||||
|
||||
if (rows_before_limit_at_least)
|
||||
rows_before_limit_at_least->add(rows);
|
||||
|
||||
/// Skip block (for 'always_read_till_end' case).
|
||||
if (is_limit_reached || output_finished)
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
#include <Core/SortDescription.h>
|
||||
|
||||
namespace DB
|
||||
@ -29,7 +30,7 @@ private:
|
||||
std::vector<size_t> sort_column_positions;
|
||||
|
||||
size_t rows_read = 0; /// including the last read block
|
||||
size_t rows_before_limit_at_least = 0;
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||
|
||||
/// State of port's pair.
|
||||
/// Chunks from different port pairs are not mixed for berret cache locality.
|
||||
@ -65,7 +66,7 @@ public:
|
||||
InputPort & getInputPort() { return inputs.front(); }
|
||||
OutputPort & getOutputPort() { return outputs.front(); }
|
||||
|
||||
size_t getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; }
|
||||
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Processors/DelayedPortsProcessor.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -438,6 +439,8 @@ void QueryPipeline::setOutput(ProcessorPtr output)
|
||||
connect(*streams.front(), main);
|
||||
connect(*totals_having_port, totals);
|
||||
connect(*extremes_port, extremes);
|
||||
|
||||
initRowsBeforeLimit();
|
||||
}
|
||||
|
||||
void QueryPipeline::unitePipelines(
|
||||
@ -552,25 +555,12 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
|
||||
}
|
||||
}
|
||||
|
||||
void QueryPipeline::finalize()
|
||||
void QueryPipeline::initRowsBeforeLimit()
|
||||
{
|
||||
checkInitialized();
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||
|
||||
if (!output_format)
|
||||
throw Exception("Cannot finalize pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
calcRowsBeforeLimit();
|
||||
}
|
||||
|
||||
void QueryPipeline::calcRowsBeforeLimit()
|
||||
{
|
||||
/// TODO get from Remote
|
||||
|
||||
UInt64 rows_before_limit_at_least = 0;
|
||||
UInt64 rows_before_limit = 0;
|
||||
|
||||
bool has_limit = false;
|
||||
bool has_partial_sorting = false;
|
||||
std::vector<LimitTransform *> limits;
|
||||
std::vector<SourceFromInputStream *> sources;
|
||||
|
||||
std::unordered_set<IProcessor *> visited;
|
||||
|
||||
@ -593,30 +583,22 @@ void QueryPipeline::calcRowsBeforeLimit()
|
||||
|
||||
if (!visited_limit)
|
||||
{
|
||||
if (auto * limit = typeid_cast<const LimitTransform *>(processor))
|
||||
if (auto * limit = typeid_cast<LimitTransform *>(processor))
|
||||
{
|
||||
has_limit = visited_limit = true;
|
||||
rows_before_limit_at_least += limit->getRowsBeforeLimitAtLeast();
|
||||
visited_limit = true;
|
||||
limits.emplace_back(limit);
|
||||
}
|
||||
|
||||
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
|
||||
{
|
||||
if (auto & stream = source->getStream())
|
||||
{
|
||||
auto & info = stream->getProfileInfo();
|
||||
if (info.hasAppliedLimit())
|
||||
{
|
||||
has_limit = visited_limit = true;
|
||||
rows_before_limit_at_least += info.getRowsBeforeLimit();
|
||||
}
|
||||
}
|
||||
}
|
||||
sources.emplace_back(source);
|
||||
}
|
||||
|
||||
if (auto * sorting = typeid_cast<const PartialSortingTransform *>(processor))
|
||||
if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
|
||||
{
|
||||
has_partial_sorting = true;
|
||||
rows_before_limit += sorting->getNumReadRows();
|
||||
if (!rows_before_limit_at_least)
|
||||
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
||||
|
||||
sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||
|
||||
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
|
||||
/// continue;
|
||||
@ -640,9 +622,19 @@ void QueryPipeline::calcRowsBeforeLimit()
|
||||
}
|
||||
}
|
||||
|
||||
/// Get num read rows from PartialSortingTransform if have it.
|
||||
if (has_limit)
|
||||
output_format->setRowsBeforeLimit(has_partial_sorting ? rows_before_limit : rows_before_limit_at_least);
|
||||
if (!rows_before_limit_at_least && (!limits.empty() && !sources.empty()))
|
||||
{
|
||||
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
||||
|
||||
for (auto & limit : limits)
|
||||
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||
|
||||
for (auto & source : sources)
|
||||
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||
}
|
||||
|
||||
if (rows_before_limit_at_least)
|
||||
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||
}
|
||||
|
||||
Pipe QueryPipeline::getPipe() &&
|
||||
|
@ -140,9 +140,6 @@ public:
|
||||
void setProgressCallback(const ProgressCallback & callback);
|
||||
void setProcessListElement(QueryStatus * elem);
|
||||
|
||||
/// Call after execution.
|
||||
void finalize();
|
||||
|
||||
/// Recommend number of threads for pipeline execution.
|
||||
size_t getNumThreads() const
|
||||
{
|
||||
@ -200,7 +197,7 @@ private:
|
||||
template <typename TProcessorGetter>
|
||||
void addSimpleTransformImpl(const TProcessorGetter & getter);
|
||||
|
||||
void calcRowsBeforeLimit();
|
||||
void initRowsBeforeLimit();
|
||||
};
|
||||
|
||||
}
|
||||
|
28
dbms/src/Processors/RowsBeforeLimitCounter.h
Normal file
28
dbms/src/Processors/RowsBeforeLimitCounter.h
Normal file
@ -0,0 +1,28 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class RowsBeforeLimitCounter
|
||||
{
|
||||
public:
|
||||
void add(uint64_t rows)
|
||||
{
|
||||
setAppliedLimit();
|
||||
rows_before_limit.fetch_add(rows, std::memory_order_release);
|
||||
}
|
||||
|
||||
uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }
|
||||
|
||||
bool setAppliedLimit() { has_applied_limit.store(true, std::memory_order::release); }
|
||||
bool hasAppliedLimit() const { return has_applied_limit.load(std::memory_order_acquire); }
|
||||
|
||||
private:
|
||||
std::atomic<uint64_t> rows_before_limit = 0;
|
||||
std::atomic_bool has_applied_limit = false;
|
||||
};
|
||||
|
||||
using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;
|
||||
|
||||
}
|
@ -95,6 +95,13 @@ void SourceFromInputStream::work()
|
||||
if (!typeid_cast<const RemoteBlockInputStream *>(stream.get()))
|
||||
stream->cancel(false);
|
||||
|
||||
if (rows_before_limit)
|
||||
{
|
||||
auto & info = stream->getProfileInfo();
|
||||
if (info.hasAppliedLimit())
|
||||
rows_before_limit->add(info.getRowsBeforeLimit());
|
||||
}
|
||||
|
||||
stream->readSuffix();
|
||||
|
||||
if (auto totals_block = stream->getTotals())
|
||||
@ -120,6 +127,13 @@ Chunk SourceFromInputStream::generate()
|
||||
auto block = stream->read();
|
||||
if (!block && !isCancelled())
|
||||
{
|
||||
if (rows_before_limit)
|
||||
{
|
||||
auto & info = stream->getProfileInfo();
|
||||
if (info.hasAppliedLimit())
|
||||
rows_before_limit->add(info.getRowsBeforeLimit());
|
||||
}
|
||||
|
||||
stream->readSuffix();
|
||||
|
||||
if (auto totals_block = stream->getTotals())
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -23,6 +24,8 @@ public:
|
||||
|
||||
void addTotalsPort();
|
||||
|
||||
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
|
||||
|
||||
/// Implementation for methods from ISourceWithProgress.
|
||||
void setLimits(const LocalLimits & limits_) final { stream->setLimits(limits_); }
|
||||
void setQuota(const QuotaContextPtr & quota_) final { stream->setQuota(quota_); }
|
||||
@ -38,6 +41,8 @@ private:
|
||||
bool force_add_aggregating_info = false;
|
||||
BlockInputStreamPtr stream;
|
||||
|
||||
RowsBeforeLimitCounterPtr rows_before_limit;
|
||||
|
||||
Chunk totals;
|
||||
bool has_totals_port = false;
|
||||
bool has_totals = false;
|
||||
|
@ -5,16 +5,16 @@ namespace DB
|
||||
{
|
||||
|
||||
PartialSortingTransform::PartialSortingTransform(
|
||||
const Block & header_, SortDescription & description_, UInt64 limit_, bool do_count_rows_)
|
||||
const Block & header_, SortDescription & description_, UInt64 limit_)
|
||||
: ISimpleTransform(header_, header_, false)
|
||||
, description(description_), limit(limit_), do_count_rows(do_count_rows_)
|
||||
, description(description_), limit(limit_)
|
||||
{
|
||||
}
|
||||
|
||||
void PartialSortingTransform::transform(Chunk & chunk)
|
||||
{
|
||||
if (do_count_rows)
|
||||
read_rows += chunk.getNumRows();
|
||||
if (read_rows)
|
||||
read_rows->add(chunk.getNumRows());
|
||||
|
||||
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
chunk.clear();
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <Processors/ISimpleTransform.h>
|
||||
#include <Processors/RowsBeforeLimitCounter.h>
|
||||
#include <Core/SortDescription.h>
|
||||
|
||||
namespace DB
|
||||
@ -12,17 +13,15 @@ class PartialSortingTransform : public ISimpleTransform
|
||||
{
|
||||
public:
|
||||
/// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
|
||||
/// When count_rows is false, getNumReadRows() will always return 0.
|
||||
/// When count_rows is false, read_rows is not changed. It is needed
|
||||
PartialSortingTransform(
|
||||
const Block & header_,
|
||||
SortDescription & description_,
|
||||
UInt64 limit_ = 0,
|
||||
bool do_count_rows_ = true);
|
||||
UInt64 limit_ = 0);
|
||||
|
||||
String getName() const override { return "PartialSortingTransform"; }
|
||||
|
||||
/// Total num rows passed to transform.
|
||||
UInt64 getNumReadRows() const { return read_rows; }
|
||||
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
|
||||
|
||||
protected:
|
||||
void transform(Chunk & chunk) override;
|
||||
@ -30,11 +29,7 @@ protected:
|
||||
private:
|
||||
SortDescription description;
|
||||
UInt64 limit;
|
||||
UInt64 read_rows = 0;
|
||||
|
||||
/// Do we need calculate read_rows value?
|
||||
/// Used to skip total row when count rows_before_limit_at_least.
|
||||
bool do_count_rows;
|
||||
RowsBeforeLimitCounterPtr read_rows;
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user