reduce memory usage in queries with 'ORDER BY primary_key'

This commit is contained in:
Anton Popov 2021-06-25 19:22:39 +03:00
parent 4a250dc14e
commit fa8ad1a7c6
25 changed files with 226 additions and 203 deletions

View File

@ -84,7 +84,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));
@ -129,7 +129,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs, const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size) SortDescription description_, size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
SortDescription description, SortDescription description,
size_t max_block_size) size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
params, params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size, SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_) Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -14,9 +14,11 @@ IMergingTransformBase::IMergingTransformBase(
size_t num_inputs, size_t num_inputs,
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_) bool have_all_inputs_,
bool expected_one_block_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header}) : IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_) , have_all_inputs(have_all_inputs_)
, expected_one_block(expected_one_block_)
{ {
} }
@ -64,10 +66,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue; continue;
if (input_states[i].is_initialized) if (input_states[i].is_initialized)
{
// input.setNotNeeded();
continue; continue;
}
input.setNeeded(); input.setNeeded();
@ -77,12 +76,17 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue; continue;
} }
auto chunk = input.pull(); /// setNotNeeded after reading first chunk, because in optimismtic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(expected_one_block);
if (!chunk.hasRows()) if (!chunk.hasRows())
{ {
if (!input.isFinished()) if (!input.isFinished())
{
input.setNeeded();
all_inputs_has_data = false; all_inputs_has_data = false;
}
continue; continue;
} }

View File

@ -16,7 +16,8 @@ public:
size_t num_inputs, size_t num_inputs,
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_); bool have_all_inputs_,
bool expected_one_block_);
OutputPort & getOutputPort() { return outputs.front(); } OutputPort & getOutputPort() { return outputs.front(); }
@ -66,6 +67,7 @@ private:
std::vector<InputState> input_states; std::vector<InputState> input_states;
std::atomic<bool> have_all_inputs; std::atomic<bool> have_all_inputs;
bool is_initialized = false; bool is_initialized = false;
bool expected_one_block = false;
IProcessor::Status prepareInitializeInputs(); IProcessor::Status prepareInitializeInputs();
}; };
@ -81,8 +83,9 @@ public:
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_, bool have_all_inputs_,
bool expected_one_block_,
Args && ... args) Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_) : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, expected_one_block_)
, algorithm(std::forward<Args>(args) ...) , algorithm(std::forward<Args>(args) ...)
{ {
} }

View File

@ -13,12 +13,13 @@ MergingSortedTransform::MergingSortedTransform(
SortDescription description_, SortDescription description_,
size_t max_block_size, size_t max_block_size,
UInt64 limit_, UInt64 limit_,
bool expected_one_block_,
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
bool quiet_, bool quiet_,
bool use_average_block_sizes, bool use_average_block_sizes,
bool have_all_inputs_) bool have_all_inputs_)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, have_all_inputs_, num_inputs, header, header, have_all_inputs_, expected_one_block_,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -17,6 +17,7 @@ public:
SortDescription description, SortDescription description,
size_t max_block_size, size_t max_block_size,
UInt64 limit_ = 0, UInt64 limit_ = 0,
bool expected_one_block_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false, bool quiet_ = false,
bool use_average_block_sizes = false, bool use_average_block_sizes = false,

View File

@ -18,7 +18,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns, const Names & partition_key_columns,
size_t max_block_size) size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, true, num_inputs, header, header, true, false,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -58,11 +58,14 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ
if (pipeline.getNumStreams() > 1) if (pipeline.getNumStreams() > 1)
{ {
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit); UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
bool expected_one_block = limit_for_merging && limit_for_merging < max_block_size;
auto transform = std::make_shared<MergingSortedTransform>( auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getHeader(),
pipeline.getNumStreams(), pipeline.getNumStreams(),
prefix_description, prefix_description,
max_block_size, limit_for_merging); max_block_size,
limit_for_merging,
expected_one_block);
pipeline.addTransform(std::move(transform)); pipeline.addTransform(std::move(transform));
} }

View File

@ -13,7 +13,7 @@
#include <Processors/Merges/ReplacingSortedTransform.h> #include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h> #include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h> #include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h> #include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h> #include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h> #include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
@ -175,12 +175,13 @@ template<typename TSource>
ProcessorPtr ReadFromMergeTree::createSource( ProcessorPtr ReadFromMergeTree::createSource(
const RangesInDataPart & part, const RangesInDataPart & part,
const Names & required_columns, const Names & required_columns,
bool use_uncompressed_cache) bool use_uncompressed_cache,
bool one_range_per_task)
{ {
return std::make_shared<TSource>( return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes, data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query); prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query, one_range_per_task);
} }
Pipe ReadFromMergeTree::readInOrder( Pipe ReadFromMergeTree::readInOrder(
@ -190,11 +191,15 @@ Pipe ReadFromMergeTree::readInOrder(
bool use_uncompressed_cache) bool use_uncompressed_cache)
{ {
Pipes pipes; Pipes pipes;
/// For reading in order it makes sense to read only
/// one range per task to reduce number of read rows.
bool one_range_per_task = read_type != ReadType::Default;
for (const auto & part : parts_with_range) for (const auto & part : parts_with_range)
{ {
auto source = read_type == ReadType::InReverseOrder auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache) ? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, one_range_per_task)
: createSource<MergeTreeSelectProcessor>(part, required_columns, use_uncompressed_cache); : createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, one_range_per_task);
pipes.emplace_back(std::move(source)); pipes.emplace_back(std::move(source));
} }
@ -445,6 +450,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
} }
parts_with_ranges.emplace_back(part); parts_with_ranges.emplace_back(part);
} }
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction); ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part)); new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
} }
@ -482,7 +488,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(), pipe.getHeader(),
pipe.numOutputPorts(), pipe.numOutputPorts(),
sort_description, sort_description,
max_block_size); max_block_size,
0, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));
} }

View File

@ -111,7 +111,7 @@ private:
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache); Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache);
template<typename TSource> template<typename TSource>
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache); ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool one_range_per_task);
Pipe spreadMarkRangesAmongStreams( Pipe spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges, RangesInDataParts && parts_with_ranges,

View File

@ -200,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
description, description,
max_merged_block_size, max_merged_block_size,
limit, limit,
false,
nullptr, nullptr,
quiet, quiet,
use_average_block_sizes, use_average_block_sizes,

View File

@ -891,7 +891,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{ {
case MergeTreeData::MergingParams::Ordinary: case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_unique<MergingSortedTransform>( merged_transform = std::make_unique<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size); header, pipes.size(), sort_description, merge_block_size, 0, false, rows_sources_write_buf.get(), true, blocks_are_granules_size);
break; break;
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing:

View File

@ -0,0 +1,80 @@
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeInOrderSelectProcessor::MergeTreeInOrderSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
bool one_range_per_task_,
bool quiet)
: MergeTreeSelectProcessor{
storage_, metadata_snapshot_, owned_data_part_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
required_columns_, std::move(mark_ranges_), use_uncompressed_cache_, prewhere_info_,
check_columns_, reader_settings_, virt_column_names_, one_range_per_task_}
{
if (!quiet)
LOG_DEBUG(log, "Reading {} ranges in order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
}
bool MergeTreeInOrderSelectProcessor::getNewTask()
try
{
/// Produce no more than one task
if (all_mark_ranges.empty())
{
finish();
return false;
}
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());
MarkRanges mark_ranges_for_task;
if (one_range_per_task)
{
mark_ranges_for_task = { std::move(all_mark_ranges.front()) };
all_mark_ranges.pop_front();
}
else
{
mark_ranges_for_task = std::move(all_mark_ranges);
all_mark_ranges.clear();
}
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
return true;
}
catch (...)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
throw;
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
namespace DB
{
/// Used to read data from single part with select query in order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeInOrderSelectProcessor : public MergeTreeSelectProcessor
{
public:
MergeTreeInOrderSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},
bool one_range_per_task_ = false,
bool quiet = false);
String getName() const override { return "MergeTreeInOrder"; }
private:
bool getNewTask() override;
Poco::Logger * log = &Poco::Logger::get("MergeTreeInOrderSelectProcessor");
};
}

View File

@ -1,8 +1,4 @@
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h> #include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/Context.h>
namespace DB namespace DB
{ {
@ -23,62 +19,27 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
MarkRanges mark_ranges_, MarkRanges mark_ranges_,
bool use_uncompressed_cache_, bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_, const PrewhereInfoPtr & prewhere_info_,
bool check_columns, bool check_columns_,
const MergeTreeReaderSettings & reader_settings_, const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_, const Names & virt_column_names_,
size_t part_index_in_query_, bool one_range_per_task_,
bool quiet) bool quiet)
: : MergeTreeSelectProcessor{
MergeTreeBaseSelectProcessor{ storage_, metadata_snapshot_, owned_data_part_, max_block_size_rows_,
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns_, std::move(mark_ranges_), use_uncompressed_cache_, prewhere_info_,
required_columns{std::move(required_columns_)}, check_columns_, reader_settings_, virt_column_names_, one_range_per_task_}
data_part{owned_data_part_},
all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_),
path(data_part->getFullRelativePath())
{ {
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet) if (!quiet)
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}", LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows, all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin)); data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, metadata_snapshot,
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
} }
bool MergeTreeReverseSelectProcessor::getNewTask() bool MergeTreeReverseSelectProcessor::getNewTask()
try try
{ {
if ((chunks.empty() && all_mark_ranges.empty()) || total_marks_count == 0) if (chunks.empty() && all_mark_ranges.empty())
{ {
finish(); finish();
return false; return false;
@ -141,17 +102,4 @@ Chunk MergeTreeReverseSelectProcessor::readFromPart()
return res; return res;
} }
void MergeTreeReverseSelectProcessor::finish()
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
data_part.reset();
}
MergeTreeReverseSelectProcessor::~MergeTreeReverseSelectProcessor() = default;
} }

View File

@ -1,19 +1,15 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h> #include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
namespace DB namespace DB
{ {
/// Used to read data from single part with select query /// Used to read data from single part with select query
/// in reverse order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc. /// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects. /// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeReverseSelectProcessor : public MergeTreeBaseSelectProcessor class MergeTreeReverseSelectProcessor : public MergeTreeSelectProcessor
{ {
public: public:
MergeTreeReverseSelectProcessor( MergeTreeReverseSelectProcessor(
@ -30,46 +26,16 @@ public:
bool check_columns, bool check_columns,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {}, const Names & virt_column_names = {},
size_t part_index_in_query = 0, bool one_range_per_task_ = false,
bool quiet = false); bool quiet = false);
~MergeTreeReverseSelectProcessor() override;
String getName() const override { return "MergeTreeReverse"; } String getName() const override { return "MergeTreeReverse"; }
/// Closes readers and unlock part locks private:
void finish();
protected:
bool getNewTask() override; bool getNewTask() override;
Chunk readFromPart() override; Chunk readFromPart() override;
private:
Block header;
/// Used by Task
Names required_columns;
/// Names from header. Used in order to order columns in read blocks.
Names ordered_names;
NameSet column_name_set;
MergeTreeReadTaskColumns task_columns;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;
/// Total number of marks we should read
size_t total_marks_count = 0;
/// Value of _part_index virtual column (used only in SelectExecutor)
size_t part_index_in_query = 0;
String path;
Chunks chunks; Chunks chunks;
Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor"); Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor");
}; };

View File

@ -26,10 +26,8 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
bool check_columns_, bool check_columns_,
const MergeTreeReaderSettings & reader_settings_, const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_, const Names & virt_column_names_,
size_t part_index_in_query_, bool one_range_per_task_)
bool quiet) : MergeTreeBaseSelectProcessor{
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
@ -37,78 +35,33 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
required_columns{std::move(required_columns_)}, required_columns{std::move(required_columns_)},
data_part{owned_data_part_}, data_part{owned_data_part_},
all_mark_ranges(std::move(mark_ranges_)), all_mark_ranges(std::move(mark_ranges_)),
part_index_in_query(part_index_in_query_), one_range_per_task(one_range_per_task_),
check_columns(check_columns_) check_columns(check_columns_),
total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
{ {
/// Let's estimate total number of rows for progress bar. /// will be used to distinguish between PREWHERE and WHERE columns when applying filter
for (const auto & range : all_mark_ranges) const auto & column_names = task_columns.columns.getNames();
total_marks_count += range.end - range.begin; column_name_set = NameSet{column_names.begin(), column_names.end()};
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);
ordered_names = header_without_virtual_columns.getNames();
}
bool MergeTreeSelectProcessor::getNewTask()
try
{
/// Produce no more than one task
if (!is_first_task || total_marks_count == 0)
{
finish();
return false;
}
is_first_task = false;
task_columns = getReadTaskColumns( task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part, storage, metadata_snapshot, data_part,
required_columns, prewhere_info, check_columns); required_columns, prewhere_info, check_columns);
auto size_predictor = (preferred_block_size_bytes == 0) if (use_uncompressed_cache)
? nullptr owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter owned_mark_cache = storage.getContext()->getMarkCache();
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
task = std::make_unique<MergeTreeReadTask>( reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
if (!reader) if (prewhere_info)
{ pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info) addTotalRowsApprox(total_rows);
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, ordered_names = header_without_virtual_columns.getNames();
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
return true;
} }
catch (...)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
throw;
}
void MergeTreeSelectProcessor::finish() void MergeTreeSelectProcessor::finish()
{ {
@ -121,8 +74,6 @@ void MergeTreeSelectProcessor::finish()
data_part.reset(); data_part.reset();
} }
MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default; MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;
} }

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h> #include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h> #include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h> #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
@ -30,21 +30,18 @@ public:
bool check_columns, bool check_columns,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {}, const Names & virt_column_names = {},
size_t part_index_in_query = 0, bool one_range_per_task_ = false);
bool quiet = false);
~MergeTreeSelectProcessor() override; ~MergeTreeSelectProcessor() override;
String getName() const override { return "MergeTree"; } String getName() const override = 0;
/// Closes readers and unlock part locks /// Closes readers and unlock part locks
void finish(); void finish();
protected: protected:
bool getNewTask() override; bool getNewTask() override = 0;
private:
/// Used by Task /// Used by Task
Names required_columns; Names required_columns;
@ -59,15 +56,14 @@ private:
/// Mark ranges we should read (in ascending order) /// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges; MarkRanges all_mark_ranges;
/// Total number of marks we should read
size_t total_marks_count = 0;
/// Value of _part_index virtual column (used only in SelectExecutor) /// Value of _part_index virtual column (used only in SelectExecutor)
size_t part_index_in_query = 0; size_t part_index_in_query = 0;
/// If true, every task will be created only with one range.
/// It reduces amount of read data for queries with small LIMIT.
bool one_range_per_task = false;
bool check_columns; bool check_columns;
bool is_first_task = true; size_t total_rows;
Poco::Logger * log = &Poco::Logger::get("MergeTreeSelectProcessor");
}; };
} }

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS order_by_desc;
CREATE TABLE order_by_desc (u UInt32, s String)
ENGINE MergeTree ORDER BY u PARTITION BY u % 100
SETTINGS index_granularity = 1024;
INSERT INTO order_by_desc SELECT number, repeat('a', 1024) FROM numbers(1024 * 300);
OPTIMIZE TABLE order_by_desc FINAL;
SELECT s FROM order_by_desc ORDER BY u DESC LIMIT 10 FORMAT Null
SETTINGS max_memory_usage = '400M';
SELECT s FROM order_by_desc ORDER BY u LIMIT 10 FORMAT Null
SETTINGS max_memory_usage = '400M';
SYSTEM FLUSH LOGS;
SELECT read_rows < 110000 FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase()
AND event_time > now() - INTERVAL 10 SECOND
AND lower(query) LIKE lower('SELECT s FROM order_by_desc ORDER BY u%');