fix behaviour of max_rows_to_read for trivial limit queries

Han Fei 2022-10-22 18:27:24 +02:00
parent d85bfe6ea3
commit 2fc91fd338
8 changed files with 80 additions and 17 deletions
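In short: when a query is nothing more than a small LIMIT, the number of rows counted against max_rows_to_read is now capped at limit + offset instead of covering everything the scan could touch. Two queries from the test added at the end of this commit illustrate the new behaviour:

SET max_rows_to_read = 20;
SELECT number FROM numbers(30) LIMIT 1; -- passes now: only limit + offset = 1 row is counted
SELECT number FROM numbers(30) LIMIT 21; -- limit + offset = 21 exceeds the cap, still error 158 (TOO_MANY_ROWS)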

src/Interpreters/InterpreterSelectQuery.cpp

@@ -2143,6 +2143,8 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
+ auto local_limits = getStorageLimits(*context, options);
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, JOIN, LIMIT BY, WITH TIES
* but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset (not to read more from the table than requested),
@@ -2161,17 +2163,22 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& !query_analyzer->hasAggregation()
&& !query_analyzer->hasWindow()
&& query.limitLength()
- && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset
- && limit_length + limit_offset < max_block_size)
+ && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
{
- max_block_size = std::max<UInt64>(1, limit_length + limit_offset);
- max_threads_execute_query = max_streams = 1;
+ if (limit_length + limit_offset < max_block_size)
+ {
+ max_block_size = std::max<UInt64>(1, limit_length + limit_offset);
+ max_threads_execute_query = max_streams = 1;
+ }
+ if (limit_length + limit_offset < local_limits.local_limits.size_limits.max_rows)
+ {
+ query_info.limit = limit_length + limit_offset;
+ }
}
if (!max_block_size)
throw Exception("Setting 'max_block_size' cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
- auto local_limits = getStorageLimits(*context, options);
storage_limits.emplace_back(local_limits);
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
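To make the new guard conditions concrete (queries from the test added below, with max_rows_to_read = 20): query_info.limit is set only when limit + offset stays below max_rows and the query otherwise qualifies for the trivial-limit optimization, so both of these still fail:

SELECT a FROM t_max_rows_to_read LIMIT 11 OFFSET 11; -- limit + offset = 22 >= 20, query_info.limit stays 0; error 158
SELECT a FROM t_max_rows_to_read WHERE a > 50 LIMIT 1; -- WHERE disqualifies the optimization entirely; error 158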

src/Processors/QueryPlan/ReadFromMergeTree.cpp

@@ -173,6 +173,9 @@ Pipe ReadFromMergeTree::readFromPool(
total_rows += part.getRowsCount();
}
if (query_info.limit > 0 && query_info.limit < total_rows)
total_rows = query_info.limit;
const auto & settings = context->getSettingsRef();
const auto & client_info = context->getClientInfo();
MergeTreeReadPool::BackoffSettings backoff_settings(settings);
@@ -246,10 +249,24 @@ ProcessorPtr ReadFromMergeTree::createSource(
};
}
- return std::make_shared<TSource>(
+ auto total_rows = part.getRowsCount();
+ if (query_info.limit > 0 && query_info.limit < total_rows)
+ total_rows = query_info.limit;
+ auto source = std::make_shared<TSource>(
data, storage_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
actions_settings, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block, std::move(extension));
+ /// Actually it means that parallel reading from replicas enabled
+ /// and we have to collaborate with initiator.
+ /// In this case we won't set approximate rows, because it will be accounted multiple times.
+ /// Also do not count amount of read rows if we read in order of sorting key,
+ /// because we don't know actual amount of read rows in case when limit is set.
+ if (!extension.has_value() && !reader_settings.read_in_order)
+ source->addTotalRowsApprox(total_rows);
+ return source;
}
Pipe ReadFromMergeTree::readInOrder(
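The comment moved in above spells out when the approximate total is skipped; one such case, sketched as a query (hypothetical, not part of the new test, assuming optimize_read_in_order is enabled as it is by default):

SELECT a FROM t_max_rows_to_read ORDER BY a LIMIT 1; -- reads in sorting-key order, so the actual number of rows read is unknown and no approximate total is set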

src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

@@ -1061,6 +1061,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
auto current_rows_estimate = ranges.getRowsCount();
size_t prev_total_rows_estimate = total_rows.fetch_add(current_rows_estimate);
size_t total_rows_estimate = current_rows_estimate + prev_total_rows_estimate;
if (query_info.limit > 0 && total_rows_estimate > query_info.limit)
{
total_rows_estimate = query_info.limit;
}
limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS);
leaf_limits.check(
total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS);
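A worked example of this clamp, using the MergeTree table from the new test (100 rows, max_rows_to_read = 20): without a WHERE the selected ranges cover the whole part, so current_rows_estimate is roughly 100, but query_info.limit = 1 caps total_rows_estimate before limits.check sees it.

SELECT a FROM t_max_rows_to_read LIMIT 1; -- checked estimate is 1, not ~100; no TOO_MANY_ROWS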

src/Storages/MergeTree/MergeTreeSelectProcessor.cpp

@@ -38,14 +38,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
has_limit_below_one_block(has_limit_below_one_block_),
total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
{
- /// Actually it means that parallel reading from replicas enabled
- /// and we have to collaborate with initiator.
- /// In this case we won't set approximate rows, because it will be accounted multiple times.
- /// Also do not count amount of read rows if we read in order of sorting key,
- /// because we don't know actual amount of read rows in case when limit is set.
- if (!extension_.has_value() && !reader_settings.read_in_order)
- addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
}

src/Storages/SelectQueryInfo.h

@@ -220,6 +220,9 @@ struct SelectQueryInfo
Block minmax_count_projection_block;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
// If limit is not 0, that means it's a trivial limit query.
UInt64 limit = 0;
InputOrderInfoPtr getInputOrderInfo() const
{
return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr);

src/Storages/System/StorageSystemNumbers.cpp

@@ -2,6 +2,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <Storages/SelectQueryInfo.h>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>
@@ -125,7 +126,7 @@ StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool mult
Pipe StorageSystemNumbers::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
- SelectQueryInfo &,
+ SelectQueryInfo & query_info,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@@ -154,7 +155,12 @@ Pipe StorageSystemNumbers::read(
auto source = std::make_shared<NumbersMultiThreadedSource>(state, max_block_size, max_counter);
if (i == 0)
- source->addTotalRowsApprox(*limit);
+ {
+ auto rows_appr = *limit;
+ if (query_info.limit > 0 && query_info.limit < rows_appr)
+ rows_appr = query_info.limit;
+ source->addTotalRowsApprox(rows_appr);
+ }
pipe.addSource(std::move(source));
}
@@ -167,7 +173,12 @@
auto source = std::make_shared<NumbersSource>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
if (limit && i == 0)
- source->addTotalRowsApprox(*limit);
+ {
+ auto rows_appr = *limit;
+ if (query_info.limit > 0 && query_info.limit < rows_appr)
+ rows_appr = query_info.limit;
+ source->addTotalRowsApprox(rows_appr);
+ }
pipe.addSource(std::move(source));
}
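For the numbers() sources the change only touches the approximate total registered via addTotalRowsApprox (the expected-rows figure used for progress reporting), which is now capped at query_info.limit when a trivial limit applies. With the settings from the test (max_block_size = 10, max_rows_to_read = 20):

SELECT number FROM numbers(30) LIMIT 1; -- the source now reports about 1 expected row instead of 30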

tests/queries/0_stateless (new reference file)

@@ -0,0 +1,7 @@
0
0
1
2
3
4
0

tests/queries/0_stateless (new test file)

@@ -0,0 +1,22 @@
DROP TABLE IF EXISTS t_max_rows_to_read;
CREATE TABLE t_max_rows_to_read (a UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS index_granularity = 4;
INSERT INTO t_max_rows_to_read SELECT number FROM numbers(100);
SET max_block_size = 10;
SET max_rows_to_read = 20;
SET read_overflow_mode = 'throw';
SELECT number FROM numbers(30); -- { serverError 158 }
SELECT number FROM numbers(30) LIMIT 21; -- { serverError 158 }
SELECT number FROM numbers(30) LIMIT 1;
SELECT number FROM numbers(5);
SELECT a FROM t_max_rows_to_read LIMIT 1;
SELECT a FROM t_max_rows_to_read LIMIT 11 OFFSET 11; -- { serverError 158 }
SELECT a FROM t_max_rows_to_read WHERE a > 50 LIMIT 1; -- { serverError 158 }
DROP TABLE t_max_rows_to_read;