Fix rows_before_limit_at_least.

Nikolai Kochetov 2019-04-09 13:17:25 +03:00
parent e0a214d081
commit 54066fc312
8 changed files with 84 additions and 21 deletions
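Context for the change: rows_before_limit_at_least is the lower bound the server reports for the number of rows that existed before LIMIT was applied. As the diffs below show, simple transforms are applied uniformly to the main data streams and to the auxiliary totals/extremes ports, so the counters in LimitTransform and PartialSortingTransform were also picking up the synthetic totals/extremes rows. The fix tags every stream with a StreamType and counts rows only on StreamType::Main. A minimal standalone sketch of that counting rule (simplified stand-in types, not the actual ClickHouse sources):

#include <cstdint>
#include <iostream>

// Simplified stand-ins for the types introduced in the diffs below.
enum class StreamType { Main, Totals, Extremes };

struct RowsBeforeLimitCounter
{
    uint64_t rows_before_limit_at_least = 0;

    // Count a chunk of `rows` rows arriving on a stream of kind `type`.
    // Only main streams contribute; totals/extremes rows are synthetic.
    void onChunk(StreamType type, uint64_t rows)
    {
        if (type == StreamType::Main)
            rows_before_limit_at_least += rows;
    }
};

int main()
{
    RowsBeforeLimitCounter counter;
    counter.onChunk(StreamType::Main, 100);   // ordinary data
    counter.onChunk(StreamType::Totals, 1);   // WITH TOTALS row, skipped
    counter.onChunk(StreamType::Extremes, 2); // extremes rows, skipped
    std::cout << counter.rows_before_limit_at_least << '\n'; // prints 100
}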

InterpreterSelectQuery.cpp

@@ -1799,9 +1799,10 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline)
         // limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
 
-        pipeline.addSimpleTransform([&](const Block & header)
+        pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
         {
-            return std::make_shared<PartialSortingTransform>(header, order_descr, limit);
+            bool do_count_rows = stream_type == QueryPipeline::StreamType::Main;
+            return std::make_shared<PartialSortingTransform>(header, order_descr, limit, do_count_rows);
         });
 
         /// If there are several streams, we merge them into one
@@ -2102,9 +2103,11 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
         UInt64 limit_offset;
         std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
 
-        pipeline.addSimpleTransform([&](const Block & header)
+        pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
         {
-            return std::make_shared<LimitTransform>(header, limit_length, limit_offset, always_read_till_end);
+            bool do_count_rows_before_limit = stream_type == QueryPipeline::StreamType::Main;
+            return std::make_shared<LimitTransform>(
+                header, limit_length, limit_offset, always_read_till_end, do_count_rows_before_limit);
         });
     }
 }

executeQuery.cpp

@@ -616,6 +616,7 @@ void executeQuery(
             pipeline.setOutput(std::move(out));
 
             auto executor = pipeline.execute(context.getSettingsRef().max_threads);
             executor->execute();
+            pipeline.finalize();
         }
     }
     catch (...)

LimitTransform.cpp

@@ -4,10 +4,14 @@
 namespace DB
 {
 
-LimitTransform::LimitTransform(Block header, size_t limit, size_t offset, bool always_read_till_end)
-    : IProcessor({std::move(header)}, {std::move(header)}),
-    input(inputs.front()), output(outputs.front()),
-    limit(limit), offset(offset), always_read_till_end(always_read_till_end)
+LimitTransform::LimitTransform(
+    Block header, size_t limit, size_t offset,
+    bool always_read_till_end, bool do_count_rows_before_limit)
+    : IProcessor({std::move(header)}, {std::move(header)})
+    , input(inputs.front()), output(outputs.front())
+    , limit(limit), offset(offset)
+    , always_read_till_end(always_read_till_end)
+    , do_count_rows_before_limit(do_count_rows_before_limit)
 {
 }
 
@@ -64,7 +68,8 @@ LimitTransform::Status LimitTransform::prepare()
     has_block = true;
 
     auto rows = current_chunk.getNumRows();
-    rows_before_limit_at_least += rows;
+    if (do_count_rows_before_limit)
+        rows_before_limit_at_least += rows;
 
     /// Skip block (for 'always_read_till_end' case).
     if (pushing_is_finished)

LimitTransform.h

@@ -23,8 +23,15 @@ private:
     UInt64 rows_before_limit_at_least = 0;
 
+    /// Do we need to calculate the rows_before_limit_at_least value?
+    /// Used to skip the totals row when counting rows_before_limit_at_least.
+    bool do_count_rows_before_limit;
+
 public:
-    LimitTransform(Block header, size_t limit, size_t offset, bool always_read_till_end = false);
+    LimitTransform(
+        Block header, size_t limit, size_t offset,
+        bool always_read_till_end = false,
+        bool do_count_rows_before_limit = true);
 
     String getName() const override { return "Limit"; }

QueryPipeline.cpp

@@ -62,18 +62,31 @@ void QueryPipeline::init(Processors sources)
     }
 }
 
-void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter)
+static ProcessorPtr callProcessorGetter(
+    const Block & header, const QueryPipeline::ProcessorGetter & getter, QueryPipeline::StreamType)
+{
+    return getter(header);
+}
+
+static ProcessorPtr callProcessorGetter(
+    const Block & header, const QueryPipeline::ProcessorGetterWithStreamKind & getter, QueryPipeline::StreamType kind)
+{
+    return getter(header, kind);
+}
+
+template <typename TProcessorGetter>
+void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
 {
     checkInitialized();
 
     Block header;
 
-    auto add_transform = [&](OutputPort *& stream)
+    auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
     {
         if (!stream)
             return;
 
-        auto transform = getter(current_header);
+        auto transform = callProcessorGetter(current_header, getter, stream_type);
 
         if (transform->getInputs().size() != 1)
             throw Exception("Processor for query pipeline transform should have single input, "
@@ -98,15 +111,25 @@ void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter)
     };
 
     for (auto & stream : streams)
-        add_transform(stream);
+        add_transform(stream, StreamType::Main);
 
-    add_transform(totals_having_port);
-    add_transform(extremes_port);
-    add_transform(delayed_stream_port);
+    add_transform(delayed_stream_port, StreamType::Main);
+    add_transform(totals_having_port, StreamType::Totals);
+    add_transform(extremes_port, StreamType::Extremes);
 
     current_header = std::move(header);
 }
 
+void QueryPipeline::addSimpleTransform(const ProcessorGetter & getter)
+{
+    addSimpleTransformImpl(getter);
+}
+
+void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
+{
+    addSimpleTransformImpl(getter);
+}
+
 void QueryPipeline::addPipe(Processors pipe)
 {
     checkInitialized();
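A note on the dispatch above: the two callProcessorGetter overloads let the single addSimpleTransformImpl template serve both getter signatures, with overload resolution on the std::function type deciding whether the stream kind is forwarded, so existing call sites keep compiling unchanged. A self-contained sketch of the same pattern (hypothetical simplified names, not the real ClickHouse classes):

#include <functional>
#include <iostream>
#include <memory>

struct Block {};
struct Processor {};
using ProcessorPtr = std::shared_ptr<Processor>;

enum class StreamType { Main, Totals, Extremes };

using Getter = std::function<ProcessorPtr(const Block &)>;
using GetterWithKind = std::function<ProcessorPtr(const Block &, StreamType)>;

// Chosen when the getter ignores the stream kind (the pre-existing signature).
static ProcessorPtr callGetter(const Block & header, const Getter & getter, StreamType)
{
    return getter(header);
}

// Chosen when the getter wants to know which stream it is transforming.
static ProcessorPtr callGetter(const Block & header, const GetterWithKind & getter, StreamType kind)
{
    return getter(header, kind);
}

// One template body serves both public overloads, as in the diff above.
template <typename TGetter>
static void addSimpleTransformImpl(const TGetter & getter)
{
    Block header;
    // Every port is tagged with its kind; only kind-aware getters see it.
    for (auto kind : {StreamType::Main, StreamType::Totals, StreamType::Extremes})
        callGetter(header, getter, kind);
}

int main()
{
    // Old-style getter compiles unchanged; the kind argument is dropped.
    addSimpleTransformImpl(Getter([](const Block &) { return std::make_shared<Processor>(); }));

    // Kind-aware getter can treat totals/extremes streams differently.
    addSimpleTransformImpl(GetterWithKind([](const Block &, StreamType kind)
    {
        std::cout << "stream kind " << static_cast<int>(kind) << '\n';
        return std::make_shared<Processor>();
    }));
}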

QueryPipeline.h

@@ -26,9 +26,18 @@ public:
     void init(Processors sources);
     bool initialized() { return !processors.empty(); }
 
+    enum class StreamType
+    {
+        Main = 0,
+        Totals,
+        Extremes,
+    };
+
     using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
+    using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
 
     void addSimpleTransform(const ProcessorGetter & getter);
+    void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
     void addPipe(Processors pipe);
     void addTotalsHavingTransform(ProcessorPtr transform);
     void addExtremesTransform(ProcessorPtr transform);
@@ -89,6 +98,9 @@ private:
     void checkSource(const ProcessorPtr & source);
     void concatDelayedStream();
 
+    template <typename TProcessorGetter>
+    void addSimpleTransformImpl(const TProcessorGetter & getter);
+
     void calcRowsBeforeLimit();
 };

PartialSortingTransform.cpp

@@ -4,15 +4,17 @@
 namespace DB
 {
 
-PartialSortingTransform::PartialSortingTransform(const Block & header, SortDescription & description, UInt64 limit)
+PartialSortingTransform::PartialSortingTransform(
+    const Block & header, SortDescription & description, UInt64 limit, bool do_count_rows)
     : ISimpleTransform(header, header, false)
-    , description(description), limit(limit)
+    , description(description), limit(limit), do_count_rows(do_count_rows)
 {
 }
 
 void PartialSortingTransform::transform(Chunk & chunk)
 {
-    read_rows += chunk.getNumRows();
+    if (do_count_rows)
+        read_rows += chunk.getNumRows();
 
     auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
     chunk.clear();

PartialSortingTransform.h

@@ -12,10 +12,16 @@ class PartialSortingTransform : public ISimpleTransform
 {
 public:
     /// limit - if not 0, then each block can be sorted only partially: just the first `limit` rows by order.
-    PartialSortingTransform(const Block & header, SortDescription & description, UInt64 limit = 0);
+    /// When do_count_rows is false, getNumReadRows() will always return 0.
+    PartialSortingTransform(
+        const Block & header,
+        SortDescription & description,
+        UInt64 limit = 0,
+        bool do_count_rows = true);
 
     String getName() const override { return "PartialSortingTransform"; }
 
+    /// Total number of rows passed to the transform.
     UInt64 getNumReadRows() const { return read_rows; }
 
 protected:
@@ -25,6 +31,10 @@ private:
     SortDescription description;
     UInt64 limit;
     UInt64 read_rows = 0;
+
+    /// Do we need to calculate the read_rows value?
+    /// Used to skip the totals row when counting rows_before_limit_at_least.
+    bool do_count_rows;
 };
 
 }