Fix review comments

This commit is contained in:
Igor Nikonov 2022-08-08 14:43:03 +00:00
parent 86c5280db3
commit 15bdeba074
4 changed files with 26 additions and 46 deletions

View File

@@ -44,7 +44,10 @@ void SortColumnDescription::explain(JSONBuilder::JSONMap & map) const
 bool SortDescription::hasPrefix(const SortDescription & prefix) const
 {
-    if (prefix.empty() || prefix.size() > size())
+    if (prefix.empty())
+        return true;
+
+    if (prefix.size() > size())
         return false;
 
     for (size_t i = 0; i < prefix.size(); ++i)

View File

@@ -12,12 +12,6 @@
 namespace DB
 {
 
-static Poco::Logger * getLogger()
-{
-    static Poco::Logger & logger = Poco::Logger::get("ExpressionStep");
-    return &logger;
-}
-
 static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions, const Block & header, const SortDescription & sort_description)
 {
     return ITransformingStep::Traits
@@ -41,8 +35,6 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, const ActionsDA
         getTraits(actions_dag_, input_stream_.header, input_stream_.sort_description))
     , actions_dag(actions_dag_)
 {
-    LOG_DEBUG(getLogger(), "ActionsDAG:\n{}", actions_dag->dumpDAG());
-
     /// Some columns may be removed by expression.
     updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
 }

View File

@@ -41,7 +41,7 @@ SortingStep::SortingStep(
     size_t min_free_disk_space_,
     bool optimize_sorting_for_input_stream_)
     : ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
-    , type(Type::Auto)
+    , type(Type::Full)
     , result_description(description_)
     , max_block_size(max_block_size_)
     , limit(limit_)
@@ -233,38 +233,10 @@ void SortingStep::fullSort(QueryPipelineBuilder & pipeline, const SortDescriptio
     }
 }
 
-static Poco::Logger * getLogger()
-{
-    static Poco::Logger & logger = Poco::Logger::get("SortingStep");
-    return &logger;
-}
-
 void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
-    const auto input_sort_mode = input_streams.front().sort_mode;
-    const SortDescription & input_sort_desc = input_streams.front().sort_description;
-
-    // dump sorting info
-    LOG_DEBUG(getLogger(), "Sort type : {}", type);
-    LOG_DEBUG(getLogger(), "Sort mode : {}", input_sort_mode);
-    LOG_DEBUG(getLogger(), "Input ({}): {}", input_sort_desc.size(), dumpSortDescription(input_sort_desc));
-    LOG_DEBUG(getLogger(), "Prefix({}): {}", prefix_description.size(), dumpSortDescription(prefix_description));
-    LOG_DEBUG(getLogger(), "Result({}): {}", result_description.size(), dumpSortDescription(result_description));
-
-    if (optimize_sorting_for_input_stream)
-    {
-        if (input_sort_mode == DataStream::SortMode::Stream && input_sort_desc.hasPrefix(result_description))
-            return;
-
-        /// merge sorted
-        if (input_sort_mode == DataStream::SortMode::Port && input_sort_desc.hasPrefix(result_description))
-        {
-            LOG_DEBUG(getLogger(), "MergingSorted, SortMode::Port");
-            mergingSorted(pipeline, result_description, limit);
-            return;
-        }
-    }
-
+    /// we consider that a caller has more information what sorting to apply (depends on what constructor was used)
+    /// so we'll try to infer what sorting to use only in case of Full sorting
     if (type == Type::MergingSorted)
     {
         mergingSorted(pipeline, result_description, limit);
@@ -282,18 +254,31 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
         return;
     }
 
-    if (optimize_sorting_for_input_stream && input_sort_mode == DataStream::SortMode::Chunk)
+    const auto input_sort_mode = input_streams.front().sort_mode;
+    const SortDescription & input_sort_desc = input_streams.front().sort_description;
+    if (optimize_sorting_for_input_stream)
     {
-        if (input_sort_desc.hasPrefix(result_description))
+        /// skip sorting if stream is already sorted
+        if (input_sort_mode == DataStream::SortMode::Stream && input_sort_desc.hasPrefix(result_description))
+            return;
+
+        /// merge sorted
+        if (input_sort_mode == DataStream::SortMode::Port && input_sort_desc.hasPrefix(result_description))
+        {
+            type = Type::MergingSorted;
+            mergingSorted(pipeline, result_description, limit);
+            return;
+        }
+
+        /// if chunks already sorted according to result_sort_desc, then we can skip chunk sorting
+        if (input_sort_mode == DataStream::SortMode::Chunk && input_sort_desc.hasPrefix(result_description))
         {
-            LOG_DEBUG(getLogger(), "Almost FullSort");
             const bool skip_partial_sort = true;
             fullSort(pipeline, result_description, limit, skip_partial_sort);
             return;
         }
     }
 
-    LOG_DEBUG(getLogger(), "FullSort");
     fullSort(pipeline, result_description, limit);
 }

View File

@@ -65,11 +65,11 @@ private:
         QueryPipelineBuilder & pipeline,
         const SortDescription & result_sort_desc,
         UInt64 limit_,
-        bool skip_partial_sort = false); /// if chunks already sorted according to result_sort_desc, then skip chunk sorting
+        bool skip_partial_sort = false);
 
     enum class Type
     {
-        Auto,
+        Full,
         FinishSorting,
         MergingSorted,
     };