Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-14 19:45:11 +00:00)
Correct exact_rows_before_limit in all scenarios
commit 02c5d1f364
parent 069cac8d6e
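With exact_rows_before_limit enabled, the rows_before_limit_at_least field reported by JSON-style output formats should be the exact number of rows read before the LIMIT was applied, not a lower bound that depends on where the pipeline happened to stop reading. To that end, the changes below thread an always_read_till_end flag through the merging transforms and SortingStep, make RemoteSource accumulate the counter instead of overwriting it, and rework how initRowsBeforeLimit attaches the counter to the pipeline. A minimal illustration, taken from the test added at the end of this commit (the table test holds 10000 rows):

    set exact_rows_before_limit = 1, output_format_write_statistics = 0, max_block_size = 100;
    select * from test limit 1 FORMAT JSONCompact;
    -- expected: "rows_before_limit_at_least": 10000, i.e. the whole table, not just the first block read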
@@ -2874,8 +2874,10 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const st
     SortDescription sort_description = getSortDescription(query, context);
     const UInt64 limit = getLimitForSorting(query, context);
     const auto max_block_size = context->getSettingsRef().max_block_size;
+    const auto exact_rows_before_limit = context->getSettingsRef().exact_rows_before_limit;

-    auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit);
+    auto merging_sorted = std::make_unique<SortingStep>(
+        query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit, exact_rows_before_limit);
     merging_sorted->setStepDescription("Merge sorted streams " + description);
     query_plan.addStep(std::move(merging_sorted));
 }
@@ -568,7 +568,8 @@ void addMergeSortingStep(QueryPlan & query_plan,
     auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(),
         sort_description,
         max_block_size,
-        query_analysis_result.partial_sorting_limit);
+        query_analysis_result.partial_sorting_limit,
+        settings.exact_rows_before_limit);
     merging_sorted->setStepDescription("Merge sorted streams " + description);
     query_plan.addStep(std::move(merging_sorted));
 }
@@ -39,7 +39,7 @@ public:
     virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}

     /// Counter to calculate rows_before_limit_at_least in processors pipeline.
-    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_counter.swap(counter); }

     /// Notify about progress. Method could be called from different threads.
     /// Passed value are delta, that must be summarized.
@@ -21,6 +21,9 @@ class IQueryPlanStep;
 struct StorageLimits;
 using StorageLimitsList = std::list<StorageLimits>;

+class RowsBeforeLimitCounter;
+using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;
+
 class IProcessor;
 using ProcessorPtr = std::shared_ptr<IProcessor>;
 using Processors = std::vector<ProcessorPtr>;
@@ -357,6 +360,10 @@ public:
     /// You should zero internal counters in the call, in order to make in idempotent.
     virtual std::optional<ReadProgress> getReadProgress() { return std::nullopt; }

+    /// Set rows_before_limit counter for current processor.
+    /// This counter is used to calculate the number of rows right before any filtration of LimitTransform.
+    virtual void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr /* counter */) {}
+
 protected:
     virtual void onCancel() {}

@@ -66,7 +66,7 @@ public:
     InputPort & getInputPort() { return inputs.front(); }
     OutputPort & getOutputPort() { return outputs.front(); }

-    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
 };

 }
@@ -16,7 +16,7 @@ public:
         const Block & header, size_t num_inputs,
         SortDescription description_, size_t max_block_size)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -20,7 +20,7 @@ public:
         WriteBuffer * out_row_sources_buf_ = nullptr,
         bool use_average_block_sizes = false)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -20,7 +20,7 @@ public:
         size_t max_block_size,
         size_t max_block_bytes)
         : IMergingTransform(
-            num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             params,
@@ -15,7 +15,7 @@ public:
         SortDescription description_, size_t max_block_size,
         Graphite::Params params_, time_t time_of_merge_)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -14,10 +14,12 @@ IMergingTransformBase::IMergingTransformBase(
     const Block & input_header,
     const Block & output_header,
     bool have_all_inputs_,
-    UInt64 limit_hint_)
+    UInt64 limit_hint_,
+    bool always_read_till_end_)
     : IProcessor(InputPorts(num_inputs, input_header), {output_header})
     , have_all_inputs(have_all_inputs_)
     , limit_hint(limit_hint_)
+    , always_read_till_end(always_read_till_end_)
 {
 }

@@ -33,10 +35,12 @@ IMergingTransformBase::IMergingTransformBase(
     const Blocks & input_headers,
     const Block & output_header,
     bool have_all_inputs_,
-    UInt64 limit_hint_)
+    UInt64 limit_hint_,
+    bool always_read_till_end_)
     : IProcessor(createPorts(input_headers), {output_header})
     , have_all_inputs(have_all_inputs_)
     , limit_hint(limit_hint_)
+    , always_read_till_end(always_read_till_end_)
 {
 }

@@ -98,7 +102,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
         /// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
         /// we won't have to read any chunks anymore;
         auto chunk = input.pull(limit_hint != 0);
-        if (limit_hint && chunk.getNumRows() < limit_hint)
+        if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
             input.setNeeded();

         if (!chunk.hasRows())
@@ -164,6 +168,21 @@ IProcessor::Status IMergingTransformBase::prepare()
         if (is_port_full)
             return Status::PortFull;

+        if (always_read_till_end)
+        {
+            for (auto & input : inputs)
+            {
+                if (!input.isFinished())
+                {
+                    input.setNeeded();
+                    if (input.hasData())
+                        std::ignore = input.pull();
+
+                    return Status::NeedData;
+                }
+            }
+        }
+
         for (auto & input : inputs)
             input.close();

@@ -17,13 +17,15 @@ public:
         const Block & input_header,
         const Block & output_header,
         bool have_all_inputs_,
-        UInt64 limit_hint_);
+        UInt64 limit_hint_,
+        bool always_read_till_end_);

     IMergingTransformBase(
         const Blocks & input_headers,
         const Block & output_header,
         bool have_all_inputs_,
-        UInt64 limit_hint_);
+        UInt64 limit_hint_,
+        bool always_read_till_end_);

     OutputPort & getOutputPort() { return outputs.front(); }

@@ -67,6 +69,7 @@ private:
     std::atomic<bool> have_all_inputs;
     bool is_initialized = false;
     UInt64 limit_hint = 0;
+    bool always_read_till_end = false;

     IProcessor::Status prepareInitializeInputs();
 };
@@ -83,8 +86,9 @@ public:
         const Block & output_header,
         bool have_all_inputs_,
         UInt64 limit_hint_,
+        bool always_read_till_end_,
         Args && ... args)
-        : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_)
+        : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
         , algorithm(std::forward<Args>(args) ...)
     {
     }
@@ -95,9 +99,10 @@ public:
         const Block & output_header,
         bool have_all_inputs_,
         UInt64 limit_hint_,
+        bool always_read_till_end_,
         bool empty_chunk_on_finish_,
         Args && ... args)
-        : IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_)
+        : IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
         , empty_chunk_on_finish(empty_chunk_on_finish_)
         , algorithm(std::forward<Args>(args) ...)
     {
@@ -14,6 +14,7 @@ MergingSortedTransform::MergingSortedTransform(
     size_t max_block_size,
     SortingQueueStrategy sorting_queue_strategy,
     UInt64 limit_,
+    bool always_read_till_end_,
     WriteBuffer * out_row_sources_buf_,
     bool quiet_,
     bool use_average_block_sizes,
@@ -24,6 +25,7 @@ MergingSortedTransform::MergingSortedTransform(
         header,
         have_all_inputs_,
         limit_,
+        always_read_till_end_,
         header,
         num_inputs,
         description_,
@@ -18,6 +18,7 @@ public:
         size_t max_block_size,
         SortingQueueStrategy sorting_queue_strategy,
         UInt64 limit_ = 0,
+        bool always_read_till_end_ = false,
         WriteBuffer * out_row_sources_buf_ = nullptr,
         bool quiet_ = false,
         bool use_average_block_sizes = false,
@@ -20,7 +20,7 @@ public:
         bool use_average_block_sizes = false,
         bool cleanup = false)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -19,7 +19,7 @@ public:
         const Names & partition_key_columns,
         size_t max_block_size)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -19,7 +19,7 @@ public:
         WriteBuffer * out_row_sources_buf_ = nullptr,
         bool use_average_block_sizes = false)
         : IMergingTransform(
-            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+            num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
             header,
             num_inputs,
             std::move(description_),
@@ -45,7 +45,7 @@ public:
     InputPort & getInputPort() { return inputs.front(); }
     OutputPort & getOutputPort() { return outputs.front(); }

-    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
 };

 }
@@ -98,11 +98,13 @@ SortingStep::SortingStep(
     const DataStream & input_stream,
     SortDescription sort_description_,
     size_t max_block_size_,
-    UInt64 limit_)
+    UInt64 limit_,
+    bool always_read_till_end_)
     : ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
     , type(Type::MergingSorted)
     , result_description(std::move(sort_description_))
     , limit(limit_)
+    , always_read_till_end(always_read_till_end_)
     , sort_settings(max_block_size_)
 {
     sort_settings.max_block_size = max_block_size_;
@@ -175,7 +177,8 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
             result_sort_desc,
             sort_settings.max_block_size,
             SortingQueueStrategy::Batch,
-            limit_);
+            limit_,
+            always_read_till_end);

         pipeline.addTransform(std::move(transform));
     }
@@ -262,7 +265,13 @@ void SortingStep::fullSort(
     if (pipeline.getNumStreams() > 1)
     {
         auto transform = std::make_shared<MergingSortedTransform>(
-            pipeline.getHeader(), pipeline.getNumStreams(), result_sort_desc, sort_settings.max_block_size, SortingQueueStrategy::Batch, limit_);
+            pipeline.getHeader(),
+            pipeline.getNumStreams(),
+            result_sort_desc,
+            sort_settings.max_block_size,
+            SortingQueueStrategy::Batch,
+            limit_,
+            always_read_till_end);

         pipeline.addTransform(std::move(transform));
     }
@@ -53,7 +53,9 @@ public:
         const DataStream & input_stream,
         SortDescription sort_description_,
         size_t max_block_size_,
-        UInt64 limit_ = 0);
+        UInt64 limit_ = 0,
+        bool always_read_till_end_ = false
+    );

     String getName() const override { return "Sorting"; }

@@ -100,6 +102,7 @@ private:
     SortDescription prefix_description;
     const SortDescription result_description;
     UInt64 limit;
+    bool always_read_till_end;

     Settings sort_settings;

@@ -107,7 +107,7 @@ std::optional<Chunk> RemoteSource::tryGenerate()
         query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
         {
             if (rows_before_limit && info.hasAppliedLimit())
-                rows_before_limit->set(info.getRowsBeforeLimit());
+                rows_before_limit->add(info.getRowsBeforeLimit());
         });

         query_executor->sendQuery();
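Switching from set() to add() matters for distributed queries: each RemoteSource (one per shard or replica) reports its own rows-before-limit value through the profile info callback, and those contributions have to be summed rather than overwrite one another. A sketch based on the two-shard query in the test added by this commit:

    set exact_rows_before_limit = 1, prefer_localhost_replica = 0;
    select * from cluster(test_cluster_two_shards, currentDatabase(), test) where i < 30 limit 1 FORMAT JSONCompact;
    -- each shard has 30 matching rows and the contributions accumulate: "rows_before_limit_at_least": 60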
@@ -3,7 +3,7 @@
 #include <Processors/ISource.h>
 #include <Processors/RowsBeforeLimitCounter.h>
 #include <QueryPipeline/Pipe.h>
-#include "Core/UUID.h"
+#include <Core/UUID.h>
 #include <atomic>

 namespace DB
@@ -29,7 +29,7 @@ public:

     void connectToScheduler(InputPort & input_port);

-    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }

     UUID getParallelReplicasGroupUUID();

@@ -126,7 +126,7 @@ ColumnGathererTransform::ColumnGathererTransform(
     ReadBuffer & row_sources_buf_,
     size_t block_preferred_size_)
     : IMergingTransform<ColumnGathererStream>(
-        num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
+        num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
         num_inputs, row_sources_buf_, block_preferred_size_)
     , log(&Poco::Logger::get("ColumnGathererStream"))
 {
@@ -109,6 +109,7 @@ void FinishSortingTransform::generate()
         generated_prefix = true;
     }

+    // TODO: Here we should also consider LIMIT optimization.
     generated_chunk = merge_sorter->read();

     if (!generated_chunk)
@@ -844,6 +844,7 @@ MergeJoinTransform::MergeJoinTransform(
         output_header,
         /* have_all_inputs_= */ true,
         limit_hint_,
+        /* always_read_till_end_= */ false,
         /* empty_chunk_on_finish_= */ true,
         table_join, input_headers, max_block_size)
     , log(&Poco::Logger::get("MergeJoinTransform"))
@@ -187,6 +187,7 @@ void MergeSortingTransform::consume(Chunk chunk)
             max_merged_block_size,
             SortingQueueStrategy::Batch,
             limit,
+            /*always_read_till_end_=*/ false,
             nullptr,
             quiet,
             use_average_block_sizes,
@@ -20,7 +20,7 @@ public:

     String getName() const override { return "PartialSortingTransform"; }

-    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
+    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { read_rows.swap(counter); }

 protected:
     void transform(Chunk & chunk) override;
@@ -42,6 +42,8 @@ public:
     Status prepare() override;
     void work() override;

+    bool hasFilter() const { return !filter_column_name.empty(); }
+
     static Block transformHeader(Block block, const ActionsDAG * expression, const std::string & filter_column_name, bool remove_filter, bool final, const ColumnsMask & aggregates_mask);

 protected:
@@ -20,6 +20,7 @@
 #include <Processors/Transforms/PartialSortingTransform.h>
 #include <Processors/Transforms/StreamInQueryCacheTransform.h>
 #include <Processors/Transforms/ExpressionTransform.h>
+#include <Processors/Transforms/TotalsHavingTransform.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>


@@ -129,50 +130,77 @@ static void checkCompleted(Processors & processors)
 static void initRowsBeforeLimit(IOutputFormat * output_format)
 {
     RowsBeforeLimitCounterPtr rows_before_limit_at_least;
-
-    /// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
-    std::vector<LimitTransform *> limits;
-    std::vector<RemoteSource *> remote_sources;
-
+    std::vector<IProcessor *> processors;
+    std::map<IProcessor *, bool> limit_candidates;
     std::unordered_set<IProcessor *> visited;
+    bool has_limit = false;

     struct QueuedEntry
     {
         IProcessor * processor;
-        bool visited_limit;
+        IProcessor * limit_processor;
     };

     std::queue<QueuedEntry> queue;

-    queue.push({ output_format, false });
+    queue.push({ output_format, nullptr });
     visited.emplace(output_format);

     while (!queue.empty())
     {
         auto * processor = queue.front().processor;
-        auto visited_limit = queue.front().visited_limit;
+        auto * limit_processor = queue.front().limit_processor;
         queue.pop();

-        if (!visited_limit)
-        {
-            if (auto * limit = typeid_cast<LimitTransform *>(processor))
-            {
-                visited_limit = true;
-                limits.emplace_back(limit);
-            }
-
-            if (auto * source = typeid_cast<RemoteSource *>(processor))
-                remote_sources.emplace_back(source);
-        }
-        else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
-        {
-            if (!rows_before_limit_at_least)
-                rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
-
-            sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
-
-            /// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
-            continue;
-        }
+        /// Set counter based on the following cases:
+        /// 1. Remote: Set counter on Remote
+        /// 2. Limit ... PartialSorting: Set counter on PartialSorting
+        /// 3. Limit ... TotalsHaving(with filter) ... Remote: Set counter on Limit
+        /// 4. Limit ... Remote: Set counter on Remote
+        /// 5. Limit ... : Set counter on Limit
+
+        /// Case 1.
+        if (typeid_cast<RemoteSource *>(processor) && !limit_processor)
+        {
+            processors.emplace_back(processor);
+            continue;
+        }
+
+        if (typeid_cast<LimitTransform *>(processor))
+        {
+            has_limit = true;
+
+            /// Ignore child limits
+            if (limit_processor)
+                continue;
+
+            limit_processor = processor;
+            limit_candidates.emplace(limit_processor, true);
+        }
+        else if (limit_processor)
+        {
+            /// Case 2.
+            if (typeid_cast<PartialSortingTransform *>(processor))
+            {
+                processors.emplace_back(processor);
+                limit_candidates[limit_processor] = false;
+                continue;
+            }
+
+            /// Case 3.
+            if (auto * having = typeid_cast<TotalsHavingTransform *>(processor))
+            {
+                if (having->hasFilter())
+                    continue;
+            }
+
+            /// Case 4.
+            if (typeid_cast<RemoteSource *>(processor))
+            {
+                processors.emplace_back(processor);
+                limit_candidates[limit_processor] = false;
+                continue;
+            }
+        }

         /// Skip totals and extremes port for output format.
@@ -180,7 +208,7 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
     {
         auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
         if (visited.emplace(child_processor).second)
-            queue.push({ child_processor, visited_limit });
+            queue.push({ child_processor, limit_processor });

         continue;
     }
@@ -189,28 +217,30 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
         {
             auto * child_processor = &child_port.getOutputPort().getProcessor();
             if (visited.emplace(child_processor).second)
-                queue.push({ child_processor, visited_limit });
+                queue.push({ child_processor, limit_processor });
         }
     }

-    if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty()))
+    /// Case 5.
+    for (auto && [limit, valid] : limit_candidates)
     {
-        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
-
-        for (auto & limit : limits)
-            limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
-
-        for (auto & source : remote_sources)
-            source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+        if (valid)
+            processors.push_back(limit);
     }

-    /// If there is a limit, then enable rows_before_limit_at_least
-    /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
-    if (!limits.empty())
-        rows_before_limit_at_least->add(0);
+    if (!processors.empty())
+    {
+        rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
+        for (auto & processor : processors)
+            processor->setRowsBeforeLimitCounter(rows_before_limit_at_least);
+
+        /// If there is a limit, then enable rows_before_limit_at_least
+        /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
+        if (has_limit)
+            rows_before_limit_at_least->add(0);
+    }

     if (rows_before_limit_at_least)
         output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
 }
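The traversal above decides where the single RowsBeforeLimitCounter gets attached: directly on a RemoteSource when there is no limit, on PartialSorting or a RemoteSource found below a limit, and otherwise on the LimitTransform itself. Which transform ends up holding the counter is an implementation detail; the user-visible effect is that the reported value is exact. From the test added in this commit:

    set exact_rows_before_limit = 1;
    select * from test group by i having i in (10, 11, 12) limit 1 FORMAT JSONCompact;
    -- three groups survive the HAVING filter before the limit, so the report is exactly 3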
@@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
     EXPECT_EQ(pipe.numOutputPorts(), 3);

     auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
-        DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
+        DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);

     pipe.addTransform(std::move(transform));

@@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
     EXPECT_EQ(pipe.numOutputPorts(), 3);

     auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
-        DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
+        DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);

     pipe.addTransform(std::move(transform));

@@ -929,7 +929,16 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
     {
         case MergeTreeData::MergingParams::Ordinary:
             merged_transform = std::make_shared<MergingSortedTransform>(
-                header, pipes.size(), sort_description, merge_block_size, SortingQueueStrategy::Default, 0, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size);
+                header,
+                pipes.size(),
+                sort_description,
+                merge_block_size,
+                SortingQueueStrategy::Default,
+                /* limit_= */0,
+                /* always_read_till_end_= */false,
+                ctx->rows_sources_write_buf.get(),
+                true,
+                ctx->blocks_are_granules_size);
             break;

         case MergeTreeData::MergingParams::Collapsing:
@@ -49,6 +49,7 @@ StorageS3Cluster::StorageS3Cluster(
     ContextPtr context_,
     bool structure_argument_was_provided_)
     : IStorageCluster(table_id_)
+    , log(&Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + ")"))
     , s3_configuration{configuration_}
     , cluster_name(configuration_.cluster_name)
     , format_name(configuration_.format)
@@ -156,6 +157,7 @@ Pipe StorageS3Cluster::read(
                 processed_stage,
                 extension);

+            remote_query_executor->setLogger(log);
             pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
         }
     }
@@ -48,6 +48,7 @@ public:
     ClusterPtr getCluster(ContextPtr context) const override;

 private:
+    Poco::Logger * log;
     StorageS3::Configuration s3_configuration;
     String cluster_name;
     String format_name;
@@ -0,0 +1,144 @@
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 10000
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 10
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [12]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 3
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 20
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 60
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 40
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 60
}
{
    "meta":
    [
        {
            "name": "i",
            "type": "Int32"
        }
    ],

    "data":
    [
        [0]
    ],

    "rows": 1,

    "rows_before_limit_at_least": 40
}
@@ -0,0 +1,27 @@
-- Tags: no-parallel, no-random-merge-tree-settings

drop table if exists test;

create table test (i int) engine MergeTree order by tuple();

insert into test select arrayJoin(range(10000));

set exact_rows_before_limit = 1, output_format_write_statistics = 0, max_block_size = 100;

select * from test limit 1 FORMAT JSONCompact;

select * from test where i < 10 group by i limit 1 FORMAT JSONCompact;

select * from test group by i having i in (10, 11, 12) limit 1 FORMAT JSONCompact;

select * from test where i < 20 order by i limit 1 FORMAT JSONCompact;

set prefer_localhost_replica = 0;
select * from cluster(test_cluster_two_shards, currentDatabase(), test) where i < 30 limit 1 FORMAT JSONCompact;
select * from cluster(test_cluster_two_shards, currentDatabase(), test) where i < 20 order by i limit 1 FORMAT JSONCompact;

set prefer_localhost_replica = 1;
select * from cluster(test_cluster_two_shards, currentDatabase(), test) where i < 30 limit 1 FORMAT JSONCompact;
select * from cluster(test_cluster_two_shards, currentDatabase(), test) where i < 20 order by i limit 1 FORMAT JSONCompact;

drop table if exists test;