This commit is contained in:
Vladimir Chebotarev 2021-11-02 18:40:37 +03:00
parent 1a67740cd3
commit ec22f6d539
13 changed files with 240 additions and 6 deletions

View File

@ -406,6 +406,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.", 0) \
M(Bool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.", 0) \
M(Bool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_read_in_window_order, true, "Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_aggregation_in_order, false, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. Lower block size allows to parallelize more final merge stage of aggregation.", 0) \
M(UInt64, read_in_order_two_level_merge_threshold, 100, "Minimal number of parts to read to run preliminary merge step during multithread reading in order of primary key.", 0) \

View File

@ -615,6 +615,15 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_makeset_for_
actions = visitor_data.getActions();
}
const NamesAndTypesList & ExpressionAnalyzer::getColumnsAfterWindow() const
{
return columns_after_window;
}
const NamesAndTypesList & ExpressionAnalyzer::getColumnsAfterJoin() const
{
return columns_after_join;
}
void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, ActionsDAGPtr & actions, bool only_consts)
{

View File

@ -181,8 +181,12 @@ protected:
ArrayJoinActionPtr addMultipleArrayJoinAction(ActionsDAGPtr & actions, bool is_left) const;
public:
void getRootActions(const ASTPtr & ast, bool no_makeset_for_subqueries, ActionsDAGPtr & actions, bool only_consts = false);
const NamesAndTypesList & getColumnsAfterWindow() const;
const NamesAndTypesList & getColumnsAfterJoin() const;
protected:
/** Similar to getRootActions but do not make sets when analyzing IN functions. It's used in
* analyzeAggregation which happens earlier than analyzing PREWHERE and WHERE. If we did, the
* prepared sets would not be applicable for MergeTree index optimization.

View File

@ -886,7 +886,7 @@ static FillColumnDescription getWithFillDescription(const ASTOrderByElement & or
return descr;
}
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context)
SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery & query, ContextPtr context_)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
@ -900,7 +900,7 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, ContextP
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context_);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
@ -1033,12 +1033,12 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
}
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context)
UInt64 InterpreterSelectQuery::getLimitForSorting(const ASTSelectQuery & query, ContextPtr context_)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY, neither ARRAY JOIN.
if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList().first && query.limitLength())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context_);
if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
return 0;

View File

@ -128,6 +128,9 @@ public:
/// It will set shard_num and shard_count to the client_info
void setProperClientInfo(size_t replica_num, size_t replica_count);
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -48,15 +48,19 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// May split ExpressionStep and lift up only a part of it.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// FIXME
size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 6> optimizations = {{
static const std::array<Optimization, 7> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrdering, "reuseStorageOrdering", &QueryPlanOptimizationSettings::optimize_plan}
}};
return optimizations;

View File

@ -0,0 +1,161 @@
#include <Parsers/ASTWindowDefinition.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
#include <base/logger_useful.h>
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB::QueryPlanOptimizations
{
size_t tryReuseStorageOrdering(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
{
auto log = &Poco::Logger::get("tryReuseStorageOrdering()");
auto * window_node = parent_node;
auto * window = typeid_cast<WindowStep *>(window_node->step.get());
if (!window)
return 0;
if (window_node->children.size() != 1)
return 0;
auto * sorting_node = window_node->children.front();
auto * sorting = typeid_cast<SortingStep *>(sorting_node->step.get());
if (!sorting)
return 0;
if (sorting_node->children.size() != 1)
return 0;
auto * possible_read_from_merge_tree_node = sorting_node->children.front();
auto * expression = typeid_cast<ExpressionStep *>(possible_read_from_merge_tree_node->step.get());
if (expression)
{
if (possible_read_from_merge_tree_node->children.size() != 1)
return 0;
possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
}
auto * quota_and_limits = typeid_cast<SettingQuotaAndLimitsStep *>(possible_read_from_merge_tree_node->step.get());
if (quota_and_limits)
{
if (possible_read_from_merge_tree_node->children.size() != 1)
return 0;
possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
}
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(possible_read_from_merge_tree_node->step.get());
if (!read_from_merge_tree)
{
return 0;
}
/// Window <- Sorting <- [Expression] <- [SettingQuotaAndLimits] <- ReadFromMergeTree
auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
{
return 0;
}
const auto & query_info = read_from_merge_tree->getQueryInfo();
const auto * select_query = query_info.query->as<ASTSelectQuery>();
auto analyzer = std::make_unique<ExpressionAnalyzer>(query_info.query, query_info.syntax_analyzer_result, context);
if (select_query->window()) // FIXME
{
LOG_DEBUG(log, "Query has window");
ManyExpressionActions order_by_elements_actions;
auto process_children = [&](ASTPtr node)
{
for (const auto & column_ast : node->children)
{
LOG_DEBUG(log, "After join: {}", analyzer->getColumnsAfterJoin().toString());
LOG_DEBUG(log, "After window: {}", analyzer->getColumnsAfterWindow().toString());
auto actions_dag = std::make_shared<ActionsDAG>(analyzer->getColumnsAfterJoin());
analyzer->getRootActions(column_ast, false, actions_dag);
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
};
for (const auto & ptr : select_query->window()->children)
{
const auto & elem = ptr->as<const ASTWindowListElement &>();
auto ast = elem.definition.get();
const auto & definition = ast->as<const ASTWindowDefinition &>();
if (definition.partition_by)
{
process_children(definition.partition_by);
}
if (definition.order_by)
{
process_children(definition.order_by);
}
}
auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
*select_query,
order_by_elements_actions,
//InterpreterSelectQuery::getSortDescription(*select_query, context),
window->getSortDescription(),
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
LOG_DEBUG(log, "Order optimizer is set");
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
auto order_info = order_optimizer->getInputOrder(
query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(),
context,
limit);
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
/// FIXME Window+Sorting may repeat few times.
sorting->convertToFinishSorting(order_info->order_key_prefix_descr);
LOG_DEBUG(log, "Input order info is set");
}
else
{
LOG_DEBUG(log, "Query has no window");
}
return 0;
}
}

View File

@ -982,6 +982,30 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
void ReadFromMergeTree::setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> order_optimizer)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = order_optimizer;
}
else
{
query_info.order_optimizer = order_optimizer;
}
}
void ReadFromMergeTree::setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info)
{
if (query_info.projection)
{
query_info.projection->input_order_info = order_info;
}
else
{
query_info.input_order_info = order_info;
}
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts);
@ -1048,6 +1072,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
if (select.final())
{
LOG_DEBUG(log, "Select is final");
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
@ -1065,8 +1091,10 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
column_names_to_read,
result_projection);
}
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info)
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order || settings.optimize_read_in_window_order) && input_order_info)
{
LOG_DEBUG(log, "Input order info is present");
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),
column_names_to_read,
@ -1075,6 +1103,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
}
else
{
LOG_DEBUG(log, "Something went wrong");
pipe = spreadMarkRangesAmongStreams(
std::move(result.parts_with_ranges),
column_names_to_read);

View File

@ -129,6 +129,13 @@ public:
bool sample_factor_column_queried,
Poco::Logger * log);
ContextPtr getContext() const { return context; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
void setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> read_in_order_optimizer);
void setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info);
private:
const MergeTreeReaderSettings reader_settings;

View File

@ -112,6 +112,12 @@ void SortingStep::updateLimit(size_t limit_)
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (type == Type::FinishSorting)

View File

@ -54,6 +54,8 @@ public:
SortDescription getSortDescription() const { return result_description; }
void convertToFinishSorting(SortDescription prefix_description);
private:
enum class Type

View File

@ -138,4 +138,9 @@ void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Functions", std::move(functions_array));
}
const SortDescription & WindowStep::getSortDescription() const
{
return window_description.full_sort_description;
}
}

View File

@ -25,6 +25,8 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const SortDescription & getSortDescription() const;
private:
WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;