Try to choose sorting transform based on sort description with fallback

This commit is contained in:
Igor Nikonov 2022-07-11 06:40:26 +00:00
parent d7888de869
commit 47bed7e318
6 changed files with 133 additions and 90 deletions

View File

@ -3,7 +3,7 @@
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/logger_useful.h>
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
#include <DataTypes/Native.h> #include <DataTypes/Native.h>
#include <Interpreters/JIT/compileFunction.h> #include <Interpreters/JIT/compileFunction.h>
@ -13,6 +13,13 @@
namespace DB namespace DB
{ {
/// Returns the logger named "SortDescription".
/// The lookup through Poco::Logger::get happens only on the first call; the result is cached in a function-local static.
static Poco::Logger * getLogger()
{
    static Poco::Logger * const instance = &Poco::Logger::get("SortDescription");
    return instance;
}
void dumpSortDescription(const SortDescription & description, WriteBuffer & out) void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
{ {
bool first = true; bool first = true;
@ -50,7 +57,10 @@ bool SortDescription::hasPrefix(const SortDescription & prefix) const
for (size_t i = 0; i < prefix.size(); ++i) for (size_t i = 0; i < prefix.size(); ++i)
{ {
if ((*this)[i] != prefix[i]) if ((*this)[i] != prefix[i])
{
LOG_DEBUG(getLogger(), "index: {}\norigin: {}\nprefix: {}", i, (*this)[i].dump(), prefix[i].dump());
return false; return false;
}
} }
return true; return true;
} }
@ -89,12 +99,6 @@ static std::string getSortDescriptionDump(const SortDescription & description, c
return buffer.str(); return buffer.str();
} }
/// Returns a pointer to the logger named "SortDescription".
/// The reference is obtained from Poco::Logger::get once and kept in a function-local static.
static Poco::Logger * getLogger()
{
static Poco::Logger & logger = Poco::Logger::get("SortDescription");
return &logger;
}
void compileSortDescriptionIfNeeded(SortDescription & description, const DataTypes & sort_description_types, bool increase_compile_attempts) void compileSortDescriptionIfNeeded(SortDescription & description, const DataTypes & sort_description_types, bool increase_compile_attempts)
{ {
static std::unordered_map<UInt128, UInt64, UInt128Hash> counter; static std::unordered_map<UInt128, UInt64, UInt128Hash> counter;

View File

@ -10,7 +10,7 @@ namespace DB
MergingSortedTransform::MergingSortedTransform( MergingSortedTransform::MergingSortedTransform(
const Block & header, const Block & header,
size_t num_inputs, size_t num_inputs,
SortDescription description_, const SortDescription & description_,
size_t max_block_size, size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy, SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_, UInt64 limit_,
@ -19,7 +19,11 @@ MergingSortedTransform::MergingSortedTransform(
bool use_average_block_sizes, bool use_average_block_sizes,
bool have_all_inputs_) bool have_all_inputs_)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, have_all_inputs_, limit_, num_inputs,
header,
header,
have_all_inputs_,
limit_,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -14,7 +14,7 @@ public:
MergingSortedTransform( MergingSortedTransform(
const Block & header, const Block & header,
size_t num_inputs, size_t num_inputs,
SortDescription description, const SortDescription & description,
size_t max_block_size, size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy, SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0, UInt64 limit_ = 0,

View File

@ -147,6 +147,7 @@ ReadFromMergeTree::ReadFromMergeTree(
if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; }) if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; })
== header.end()) == header.end())
break; break;
// fixme: there is no information about NULLS direction
sort_description.emplace_back(column_name, sort_direction); sort_description.emplace_back(column_name, sort_direction);
} }
output_stream->sort_description = std::move(sort_description); output_stream->sort_description = std::move(sort_description);

View File

@ -8,6 +8,7 @@
#include <Processors/Transforms/PartialSortingTransform.h> #include <Processors/Transforms/PartialSortingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
#include "Core/SortDescription.h"
namespace DB namespace DB
{ {
@ -102,10 +103,64 @@ void SortingStep::updateLimit(size_t limit_)
} }
} }
/// Adds the "finish sorting" stage to the pipeline:
///  1. if there are several streams, merges them into one by `prefix_description`;
///  2. adds PartialSortingTransform (by `result_description`, with `limit`) to the Main stream;
///  3. adds FinishSortingTransform, which receives both `prefix_description` and `result_description`.
void SortingStep::finishSorting(QueryPipelineBuilder & pipeline)
{
    /// Merge by the prefix only and with limit 0: `limit` is passed to the transforms added below instead.
    /// mergingSorted() adds nothing when the pipeline has a single stream.
    mergingSorted(pipeline, prefix_description, 0);

    pipeline.addSimpleTransform(
        [&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
        {
            /// Only the Main stream gets a transform; other stream types are left untouched.
            if (stream_type != QueryPipelineBuilder::StreamType::Main)
                return nullptr;

            return std::make_shared<PartialSortingTransform>(header, result_description, limit);
        });

    bool increase_sort_description_compile_attempts = true;

    /// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
    pipeline.addSimpleTransform(
        [&, increase_sort_description_compile_attempts](const Block & header) mutable -> ProcessorPtr
        {
            /** For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan
              * to properly support min_count_to_compile_sort_description.
              * The flag is captured by value in a mutable lambda: the first invocation sees `true`, later ones see `false`.
              */
            const bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts;
            increase_sort_description_compile_attempts = false;

            return std::make_shared<FinishSortingTransform>(
                header, prefix_description, result_description, max_block_size, limit, increase_sort_description_compile_attempts_current);
        });
}
/// Adds a MergingSortedTransform that merges all pipeline streams into one according to `sort_desc`.
/// `limit_` is forwarded to the transform as is. Does nothing if the pipeline has at most one stream.
void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & sort_desc, const UInt64 limit_)
{
    const size_t num_streams = pipeline.getNumStreams();

    /// Nothing to merge.
    if (num_streams <= 1)
        return;

    pipeline.addTransform(std::make_shared<MergingSortedTransform>(
        pipeline.getHeader(), num_streams, sort_desc, max_block_size, SortingQueueStrategy::Batch, limit_));
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{ {
const auto input_sort_mode = input_streams.front().sort_mode; const auto input_sort_mode = input_streams.front().sort_mode;
const SortDescription& input_sort_desc = input_streams.front().sort_description; const SortDescription & input_sort_desc = input_streams.front().sort_description;
if (input_sort_mode == DataStream::SortMode::Stream && input_sort_desc.hasPrefix(result_description)) if (input_sort_mode == DataStream::SortMode::Stream && input_sort_desc.hasPrefix(result_description))
return; return;
@ -113,27 +168,36 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
/// merge sorted /// merge sorted
if (input_sort_mode == DataStream::SortMode::Chunk && input_sort_desc.hasPrefix(result_description)) if (input_sort_mode == DataStream::SortMode::Chunk && input_sort_desc.hasPrefix(result_description))
{ {
if (pipeline.getNumStreams() > 1) mergingSorted(pipeline, result_description, limit);
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getNumStreams(), result_description, max_block_size, SortingQueueStrategy::Batch, limit);
pipeline.addTransform(std::move(transform));
}
return; return;
} }
/// finish sorting /// finish sorting
if (input_sort_mode == DataStream::SortMode::Chunk && result_description.hasPrefix(input_sort_desc)) if (input_sort_mode == DataStream::SortMode::Chunk && result_description.hasPrefix(input_sort_desc))
{ {
if (pipeline.getNumStreams() > 1) finishSorting(pipeline);
return;
}
if (type == Type::FinishSorting)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
mergingSorted(pipeline, prefix_description, (need_finish_sorting ? 0 : limit));
if (need_finish_sorting)
{ {
auto transform = std::make_shared<MergingSortedTransform>( finishSorting(pipeline);
pipeline.getHeader(), pipeline.getNumStreams(), prefix_description, max_block_size, SortingQueueStrategy::Batch, 0);
pipeline.addTransform(std::move(transform));
} }
return;
}
if (type == Type::MergingSorted)
{ /// If there are several streams, then we merge them into one
mergingSorted(pipeline, result_description, limit);
return;
}
/// Full sorting
{
pipeline.addSimpleTransform( pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr [&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{ {
@ -143,87 +207,54 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
return std::make_shared<PartialSortingTransform>(header, result_description, limit); return std::make_shared<PartialSortingTransform>(header, result_description, limit);
}); });
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = size_limits;
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<LimitsCheckingTransform>(header, limits);
});
bool increase_sort_description_compile_attempts = true; bool increase_sort_description_compile_attempts = true;
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform( pipeline.addSimpleTransform(
[&, increase_sort_description_compile_attempts](const Block & header) mutable -> ProcessorPtr [&, increase_sort_description_compile_attempts](
const Block & header, QueryPipelineBuilder::StreamType stream_type) mutable -> ProcessorPtr
{ {
/** For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan if (stream_type == QueryPipelineBuilder::StreamType::Totals)
* To property support min_count_to_compile_sort_description. return nullptr;
*/
/** For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan.
* To properly support min_count_to_compile_sort_description.
*/
bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts; bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts;
if (increase_sort_description_compile_attempts) if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false; increase_sort_description_compile_attempts = false;
return std::make_shared<FinishSortingTransform>( return std::make_shared<MergeSortingTransform>(
header, header,
prefix_description,
result_description, result_description,
max_block_size, max_block_size,
limit, limit,
increase_sort_description_compile_attempts_current); increase_sort_description_compile_attempts_current,
});
return;
}
/// Full sorting
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT; //-V1048
limits.size_limits = size_limits;
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<LimitsCheckingTransform>(header, limits);
});
bool increase_sort_description_compile_attempts = true;
pipeline.addSimpleTransform([&, increase_sort_description_compile_attempts](const Block & header, QueryPipelineBuilder::StreamType stream_type) mutable -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
/** For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan.
* To properly support min_count_to_compile_sort_description.
*/
bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts;
if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false;
return std::make_shared<MergeSortingTransform>(
header, result_description, max_block_size, limit, increase_sort_description_compile_attempts_current,
max_bytes_before_remerge / pipeline.getNumStreams(), max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio, remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort, max_bytes_before_external_sort,
tmp_volume, tmp_volume,
min_free_disk_space); min_free_disk_space);
}); });
/// If there are several streams, then we merge them into one /// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1) if (pipeline.getNumStreams() > 1)
{ {
auto transform = std::make_shared<MergingSortedTransform>( auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getHeader(), pipeline.getNumStreams(), result_description, max_block_size, SortingQueueStrategy::Batch, limit);
pipeline.getNumStreams(),
result_description,
max_block_size,
SortingQueueStrategy::Batch,
limit);
pipeline.addTransform(std::move(transform)); pipeline.addTransform(std::move(transform));
} }

View File

@ -54,17 +54,20 @@ public:
private: private:
void updateOutputStream() override; void updateOutputStream() override;
// enum class Type void finishSorting(QueryPipelineBuilder & pipeline);
// { void mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & sort_desc, UInt64 limit_);
// Full,
// FinishSorting,
// MergingSorted,
// };
// Type type; enum class Type
{
Full,
FinishSorting,
MergingSorted,
};
SortDescription prefix_description; Type type;
SortDescription result_description;
const SortDescription prefix_description;
const SortDescription result_description;
size_t max_block_size; size_t max_block_size;
UInt64 limit; UInt64 limit;
SizeLimits size_limits; SizeLimits size_limits;