Merge pull request #25721 from CurtizJ/read-in-order-perf-2

Reduce memory usage in queries with `ORDER BY primary_key`
This commit is contained in:
Alexander Kuzmenkov 2021-08-05 16:13:48 +03:00 committed by GitHub
commit d852207b0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 275 additions and 303 deletions

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -128,7 +128,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));

View File

@ -1928,11 +1928,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
}
/// If there is no filtration, we can push the limit down to the reading stage as an optimization.
UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 0 : getLimitForSorting(query, context);
if (query_info.projection)
query_info.projection->input_order_info
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context);
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit);
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
}
StreamLocalLimits limits;
@ -2290,8 +2292,14 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
{
const Settings & settings = context->getSettingsRef();
const auto & query = getSelectQuery();
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(), input_sorting_info->order_key_prefix_descr, output_order_descr, settings.max_block_size, limit);
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit,
query.hasFiltration());
query_plan.addStep(std::move(finish_sorting_step));
}

View File

@ -69,6 +69,8 @@ public:
const ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); }
const ASTPtr settings() const { return getExpression(Expression::SETTINGS); }
/// Tells whether the query filters rows in any way:
/// true if at least one of where(), prewhere() or having() is set.
bool hasFiltration() const
{
    if (where())
        return true;
    if (prewhere())
        return true;
    return static_cast<bool>(having());
}
/// Set/Reset/Remove expression.
void setExpression(Expression expr, ASTPtr && ast);

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
SortDescription description,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -14,9 +14,11 @@ IMergingTransformBase::IMergingTransformBase(
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_)
bool have_all_inputs_,
bool has_limit_below_one_block_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_)
, has_limit_below_one_block(has_limit_below_one_block_)
{
}
@ -64,10 +66,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue;
if (input_states[i].is_initialized)
{
// input.setNotNeeded();
continue;
}
input.setNeeded();
@ -77,12 +76,17 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue;
}
auto chunk = input.pull();
/// setNotNeeded after reading the first chunk, because in the optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any more chunks.
auto chunk = input.pull(has_limit_below_one_block);
if (!chunk.hasRows())
{
if (!input.isFinished())
{
input.setNeeded();
all_inputs_has_data = false;
}
continue;
}

View File

@ -16,7 +16,8 @@ public:
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_);
bool have_all_inputs_,
bool has_limit_below_one_block_);
OutputPort & getOutputPort() { return outputs.front(); }
@ -66,6 +67,7 @@ private:
std::vector<InputState> input_states;
std::atomic<bool> have_all_inputs;
bool is_initialized = false;
bool has_limit_below_one_block = false;
IProcessor::Status prepareInitializeInputs();
};
@ -81,8 +83,9 @@ public:
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
bool has_limit_below_one_block_,
Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, has_limit_below_one_block_)
, algorithm(std::forward<Args>(args) ...)
{
}

View File

@ -13,12 +13,13 @@ MergingSortedTransform::MergingSortedTransform(
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
bool has_limit_below_one_block_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(
num_inputs, header, header, have_all_inputs_,
num_inputs, header, header, have_all_inputs_, has_limit_below_one_block_,
header,
num_inputs,
std::move(description_),

View File

@ -17,6 +17,7 @@ public:
SortDescription description,
size_t max_block_size,
UInt64 limit_ = 0,
bool has_limit_below_one_block_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,

View File

@ -18,7 +18,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, true,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -31,12 +31,14 @@ FinishSortingStep::FinishSortingStep(
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
UInt64 limit_,
bool has_filtration_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
, has_filtration(has_filtration_)
{
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
@ -58,11 +60,14 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
bool has_limit_below_one_block = !has_filtration && limit_for_merging && limit_for_merging < max_block_size;
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size, limit_for_merging);
max_block_size,
limit_for_merging,
has_limit_below_one_block);
pipeline.addTransform(std::move(transform));
}

View File

@ -13,8 +13,9 @@ public:
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size,
UInt64 limit);
size_t max_block_size_,
UInt64 limit_,
bool has_filtration_);
String getName() const override { return "FinishSorting"; }
@ -31,6 +32,7 @@ private:
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
bool has_filtration;
};
}

View File

@ -13,7 +13,7 @@
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
@ -179,26 +179,32 @@ template<typename TSource>
ProcessorPtr ReadFromMergeTree::createSource(
const RangesInDataPart & part,
const Names & required_columns,
bool use_uncompressed_cache)
bool use_uncompressed_cache,
bool has_limit_below_one_block)
{
return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query);
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block);
}
Pipe ReadFromMergeTree::readInOrder(
RangesInDataParts parts_with_range,
Names required_columns,
ReadType read_type,
bool use_uncompressed_cache)
bool use_uncompressed_cache,
UInt64 limit)
{
Pipes pipes;
/// For reading in order it makes sense to read only
/// one range per task to reduce number of read rows.
bool has_limit_below_one_block = read_type != ReadType::Default && limit && limit < max_block_size;
for (const auto & part : parts_with_range)
{
auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache)
: createSource<MergeTreeSelectProcessor>(part, required_columns, use_uncompressed_cache);
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
: createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);
pipes.emplace_back(std::move(source));
}
@ -224,7 +230,7 @@ Pipe ReadFromMergeTree::read(
return readFromPool(parts_with_range, required_columns, max_streams,
min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
@ -403,7 +409,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
{
RangesInDataPart part = parts_with_ranges.back();
parts_with_ranges.pop_back();
size_t & marks_in_part = info.sum_marks_in_parts.back();
/// We will not take too few rows from a part.
@ -418,8 +423,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
MarkRanges ranges_to_get_from_part;
/// We take the full part if it fits into the marks we still need, or
/// if we know the limit and the part contains more than 'limit' rows.
bool take_full_part = marks_in_part <= need_marks
|| (input_order_info->limit && input_order_info->limit < part.getRowsCount());
/// We take the whole part if it is small enough.
if (marks_in_part <= need_marks)
if (take_full_part)
{
ranges_to_get_from_part = part.ranges;
@ -449,6 +459,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
parts_with_ranges.emplace_back(part);
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
}
@ -457,8 +468,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
? ReadFromMergeTree::ReadType::InOrder
: ReadFromMergeTree::ReadType::InReverseOrder;
pipes.emplace_back(read(std::move(new_parts), column_names, read_type,
requested_num_streams, info.min_marks_for_concurrent_read, info.use_uncompressed_cache));
pipes.emplace_back(readInOrder(std::move(new_parts), column_names, read_type,
info.use_uncompressed_cache, input_order_info->limit));
}
if (need_preliminary_merge)
@ -486,7 +497,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
max_block_size);
max_block_size,
0, true);
pipe.addTransform(std::move(transform));
}

View File

@ -116,10 +116,10 @@ private:
Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache);
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache, UInt64 limit);
template<typename TSource>
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache);
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool has_limit_below_one_block);
Pipe spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,

View File

@ -200,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
description,
max_merged_block_size,
limit,
false,
nullptr,
quiet,
use_average_block_sizes,

View File

@ -465,6 +465,19 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
return block;
}
/// Creates a block size predictor for `data_part` over the union of the task's
/// main columns and its PREWHERE columns (each column name is passed only once).
std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSizePredictor(
    const MergeTreeData::DataPartPtr & data_part,
    const MergeTreeReadTaskColumns & task_columns,
    const Block & sample_block)
{
    const auto & main_names = task_columns.columns.getNames();
    const auto & prewhere_names = task_columns.pre_columns.getNames();

    /// A set is used to drop names that appear in both lists.
    NameSet unique_names(main_names.begin(), main_names.end());
    unique_names.insert(prewhere_names.begin(), prewhere_names.end());

    Names names_for_predictor(unique_names.begin(), unique_names.end());
    return std::make_unique<MergeTreeBlockSizePredictor>(data_part, std::move(names_for_predictor), sample_block);
}
MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default;

View File

@ -37,6 +37,11 @@ public:
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
const MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const Block & sample_block);
protected:
Chunk generate() final;

View File

@ -70,7 +70,7 @@ struct MergeTreeReadTaskColumns
/// column names to read during PREWHERE
NamesAndTypesList pre_columns;
/// resulting block may require reordering in accordance with `ordered_names`
bool should_reorder;
bool should_reorder = false;
};
MergeTreeReadTaskColumns getReadTaskColumns(

View File

@ -894,7 +894,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_unique<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
header, pipes.size(), sort_description, merge_block_size, 0, false, rows_sources_write_buf.get(), true, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:

View File

@ -0,0 +1,54 @@
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
bool MergeTreeInOrderSelectProcessor::getNewTask()
try
{
if (all_mark_ranges.empty())
{
finish();
return false;
}
if (!reader)
initializeReaders();
MarkRanges mark_ranges_for_task;
/// If we need to read few rows, set one range per task to reduce number of read data.
if (has_limit_below_one_block)
{
mark_ranges_for_task = { std::move(all_mark_ranges.front()) };
all_mark_ranges.pop_front();
}
else
{
mark_ranges_for_task = std::move(all_mark_ranges);
all_mark_ranges.clear();
}
auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr
: getSizePredictor(data_part, task_columns, sample_block);
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
return true;
}
catch (...)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
throw;
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
namespace DB
{
/// Used to read data from a single part with a select query in order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
/// All construction logic lives in MergeTreeSelectProcessor; this class only
/// provides its own name, its own getNewTask() and its own logger.
class MergeTreeInOrderSelectProcessor final : public MergeTreeSelectProcessor
{
public:
/// Forwards every argument unchanged to the MergeTreeSelectProcessor constructor,
/// then logs which ranges of which part are going to be read.
template <typename... Args>
MergeTreeInOrderSelectProcessor(Args &&... args)
: MergeTreeSelectProcessor{std::forward<Args>(args)...}
{
/// NOTE(review): all_mark_ranges.front() assumes the processor is never created
/// with an empty list of mark ranges - confirm against callers.
LOG_DEBUG(log, "Reading {} ranges in order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
}
/// Name of this processor.
String getName() const override { return "MergeTreeInOrder"; }
private:
/// Prepares the next read task; returns false when there is nothing left to read.
bool getNewTask() override;
/// Initialized by its default member initializer before the constructor body runs,
/// so it is already usable in the LOG_DEBUG call above.
Poco::Logger * log = &Poco::Logger::get("MergeTreeInOrderSelectProcessor");
};
}

View File

@ -228,29 +228,20 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
auto task_columns = getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
if (predict_block_size_bytes)
{
const auto & required_column_names = required_columns.getNames();
const auto & required_pre_column_names = required_pre_columns.getNames();
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
auto size_predictor = !predict_block_size_bytes ? nullptr
: MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block);
per_part_size_predictor.emplace_back(std::make_unique<MergeTreeBlockSizePredictor>(
part.data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block));
}
else
per_part_size_predictor.emplace_back(nullptr);
per_part_size_predictor.emplace_back(std::move(size_predictor));
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();
const auto & required_column_names = task_columns.columns.getNames();
per_part_column_name_set.emplace_back(required_column_names.begin(), required_column_names.end());
per_part_pre_columns.push_back(std::move(required_pre_columns));
per_part_columns.push_back(std::move(required_columns));
per_part_should_reorder.push_back(should_reorder);
per_part_pre_columns.push_back(std::move(task_columns.pre_columns));
per_part_columns.push_back(std::move(task_columns.columns));
per_part_should_reorder.push_back(task_columns.should_reorder);
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });
}

View File

@ -1,8 +1,4 @@
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -12,74 +8,10 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
path(data_part->getFullRelativePath())
{
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, metadata_snapshot,
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
bool MergeTreeReverseSelectProcessor::getNewTask()
try
{
if ((chunks.empty() && all_mark_ranges.empty()) || total_marks_count == 0)
if (chunks.empty() && all_mark_ranges.empty())
{
finish();
return false;
@ -90,21 +22,15 @@ try
if (all_mark_ranges.empty())
return true;
if (!reader)
initializeReaders();
/// Read ranges from right to left.
MarkRanges mark_ranges_for_task = { all_mark_ranges.back() };
all_mark_ranges.pop_back();
std::unique_ptr<MergeTreeBlockSizePredictor> size_predictor;
if (preferred_block_size_bytes)
{
const auto & required_column_names = task_columns.columns.getNames();
const auto & required_pre_column_names = task_columns.pre_columns.getNames();
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
size_predictor = std::make_unique<MergeTreeBlockSizePredictor>(
data_part, Names(complete_column_names.begin(), complete_column_names.end()), metadata_snapshot->getSampleBlock());
}
auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr
: getSizePredictor(data_part, task_columns, sample_block);
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
@ -150,17 +76,4 @@ Chunk MergeTreeReverseSelectProcessor::readFromPart()
return res;
}
void MergeTreeReverseSelectProcessor::finish()
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
data_part.reset();
}
MergeTreeReverseSelectProcessor::~MergeTreeReverseSelectProcessor() = default;
}

View File

@ -1,77 +1,33 @@
#pragma once
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
namespace DB
{
/// Used to read data from single part with select query
/// in reverse order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeReverseSelectProcessor : public MergeTreeBaseSelectProcessor
class MergeTreeReverseSelectProcessor final : public MergeTreeSelectProcessor
{
public:
MergeTreeReverseSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
ExpressionActionsSettings actions_settings,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);
~MergeTreeReverseSelectProcessor() override;
template <typename... Args>
MergeTreeReverseSelectProcessor(Args &&... args)
: MergeTreeSelectProcessor{std::forward<Args>(args)...}
{
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
}
String getName() const override { return "MergeTreeReverse"; }
/// Closes readers and unlock part locks
void finish();
protected:
private:
bool getNewTask() override;
Chunk readFromPart() override;
private:
Block header;
/// Used by Task
Names required_columns;
/// Names from header. Used in order to order columns in read blocks.
Names ordered_names;
NameSet column_name_set;
MergeTreeReadTaskColumns task_columns;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;
/// Total number of marks we should read
size_t total_marks_count = 0;
/// Value of _part_index virtual column (used only in SelectExecutor)
size_t part_index_in_query = 0;
String path;
Chunks chunks;
Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor");
};

View File

@ -7,11 +7,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -28,96 +23,48 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseSelectProcessor{
bool has_limit_below_one_block_)
: MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
data_part{owned_data_part_},
sample_block(metadata_snapshot_->getSampleBlock()),
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
check_columns(check_columns_)
has_limit_below_one_block(has_limit_below_one_block_),
check_columns(check_columns_),
total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
{
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
}
bool MergeTreeSelectProcessor::getNewTask()
try
void MergeTreeSelectProcessor::initializeReaders()
{
/// Produce no more than one task
if (!is_first_task || total_marks_count == 0)
{
finish();
return false;
}
is_first_task = false;
task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part,
required_columns, prewhere_info, check_columns);
std::unique_ptr<MergeTreeBlockSizePredictor> size_predictor;
if (preferred_block_size_bytes)
{
const auto & required_column_names = task_columns.columns.getNames();
const auto & required_pre_column_names = task_columns.pre_columns.getNames();
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
size_predictor = std::make_unique<MergeTreeBlockSizePredictor>(
data_part, Names(complete_column_names.begin(), complete_column_names.end()), metadata_snapshot->getSampleBlock());
}
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
task = std::make_unique<MergeTreeReadTask>(
data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
if (!reader)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
return true;
}
catch (...)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
throw;
}
void MergeTreeSelectProcessor::finish()
{
@ -130,8 +77,6 @@ void MergeTreeSelectProcessor::finish()
data_part.reset();
}
MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
@ -28,24 +28,21 @@ public:
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
ExpressionActionsSettings actions_settings,
bool check_columns,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);
size_t part_index_in_query_ = 0,
bool has_limit_below_one_block_ = false);
~MergeTreeSelectProcessor() override;
String getName() const override { return "MergeTree"; }
/// Closes readers and unlocks part locks
void finish();
protected:
bool getNewTask() override;
private:
/// Defer initialization from the constructor, because it may be heavy,
/// and it's better to do it lazily in `getNewTask`, which is executed in parallel.
void initializeReaders();
/// Used by Task
Names required_columns;
@ -58,17 +55,19 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Cache getSampleBlock call, which might be heavy.
Block sample_block;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;
/// Total number of marks we should read
size_t total_marks_count = 0;
/// Value of _part_index virtual column (used only in SelectExecutor)
size_t part_index_in_query = 0;
/// If true, every task will be created only with one range.
/// It reduces the amount of data read for queries with a small LIMIT.
bool has_limit_below_one_block = false;
bool check_columns;
bool is_first_task = true;
Poco::Logger * log = &Poco::Logger::get("MergeTreeSelectProcessor");
size_t total_rows = 0;
};
}

View File

@ -37,7 +37,7 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
array_join_result_to_source = syntax_result->array_join_result_to_source;
}
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, UInt64 limit) const
{
Names sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
if (!metadata_snapshot->hasSortingKey())
@ -155,7 +155,8 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr &
if (order_key_prefix_descr.empty())
return {};
return std::make_shared<InputOrderInfo>(std::move(order_key_prefix_descr), read_direction);
return std::make_shared<InputOrderInfo>(std::move(order_key_prefix_descr), read_direction, limit);
}
}

View File

@ -22,7 +22,7 @@ public:
const SortDescription & required_sort_description,
const TreeRewriterResultPtr & syntax_result);
InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const;
InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, UInt64 limit = 0) const;
private:
/// Actions for every element of order expression to analyze functions for monotonicity

View File

@ -83,9 +83,10 @@ struct InputOrderInfo
{
SortDescription order_key_prefix_descr;
int direction;
UInt64 limit;
InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_)
: order_key_prefix_descr(order_key_prefix_descr_), direction(direction_) {}
InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_, UInt64 limit_)
: order_key_prefix_descr(order_key_prefix_descr_), direction(direction_), limit(limit_) {}
bool operator ==(const InputOrderInfo & other) const
{

View File

@ -58,6 +58,7 @@ SRCS(
MergeTree/MergeTreeDataSelectExecutor.cpp
MergeTree/MergeTreeDataWriter.cpp
MergeTree/MergeTreeDeduplicationLog.cpp
MergeTree/MergeTreeInOrderSelectProcessor.cpp
MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp
MergeTree/MergeTreeIndexBloomFilter.cpp
MergeTree/MergeTreeIndexConditionBloomFilter.cpp

View File

@ -9,9 +9,9 @@ ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform × 4
MergeTree 0 → 1
MergeTreeInOrder 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTree × 2 0 → 1
MergeTreeInOrder × 2 0 → 1
ExpressionTransform
MergeTree 0 → 1
MergeTreeInOrder 0 → 1

View File

@ -5,7 +5,7 @@ ExpressionTransform
ExpressionTransform
ReplacingSorted 2 → 1
ExpressionTransform × 2
MergeTree × 2 0 → 1
MergeTreeInOrder × 2 0 → 1
0 0
1 1
2 2
@ -22,4 +22,4 @@ ExpressionTransform × 2
Copy × 2 1 → 2
AddingSelector × 2
ExpressionTransform × 2
MergeTree × 2 0 → 1
MergeTreeInOrder × 2 0 → 1

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,21 @@
-- Checks that `ORDER BY <primary key> ... LIMIT` (ascending and descending)
-- fits into a 400M memory limit and reads fewer than 110000 rows of the table.
DROP TABLE IF EXISTS order_by_desc;
CREATE TABLE order_by_desc (u UInt32, s String)
ENGINE MergeTree ORDER BY u PARTITION BY u % 100
SETTINGS index_granularity = 1024;
-- 1024 * 300 rows, each with a 1 KiB string; `u % 100` spreads them over 100 partitions.
INSERT INTO order_by_desc SELECT number, repeat('a', 1024) FROM numbers(1024 * 300);
OPTIMIZE TABLE order_by_desc FINAL;
SELECT s FROM order_by_desc ORDER BY u DESC LIMIT 10 FORMAT Null
SETTINGS max_memory_usage = '400M';
SELECT s FROM order_by_desc ORDER BY u LIMIT 10 FORMAT Null
SETTINGS max_memory_usage = '400M';
-- Flush the query log so that the two queries above can be inspected.
SYSTEM FLUSH LOGS;
-- One output row per measured query. The log is filtered by date instead of a
-- short `now() - INTERVAL` window, so a slow run cannot push the queries out of range;
-- isolation relies on the `current_database` filter
-- (assumes the test runs in its own database -- TODO confirm for the test runner in use).
SELECT read_rows < 110000 FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase()
AND event_date >= yesterday()
AND lower(query) LIKE lower('SELECT s FROM order_by_desc ORDER BY u%');
DROP TABLE order_by_desc;