Merge pull request #45364 from nickitat/aggr_partitions_independently

Add option to aggregate partitions independently
This commit is contained in:
Alexey Milovidov 2023-02-19 17:44:18 +03:00 committed by GitHub
commit 17992b178a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1667 additions and 409 deletions

View File

@ -7,7 +7,7 @@ sidebar_label: Custom Partitioning Key
# Custom Partitioning Key
:::warning
In most cases you do not need a partition key, and in most other cases you do not need a partition key more granular than by months. Partitioning does not speed up queries (in contrast to the ORDER BY expression).
In most cases you do not need a partition key, and in most other cases you do not need a partition key more granular than by months.
You should never use too granular of partitioning. Don't partition your data by client identifiers or names. Instead, make a client identifier or name the first column in the ORDER BY expression.
:::
@ -133,3 +133,48 @@ The `detached` directory contains parts that were detached from the table using
Note that on the operating server, you cannot manually change the set of parts or their data on the file system, since the server will not know about it. For non-replicated tables, you can do this when the server is stopped, but it isnt recommended. For replicated tables, the set of parts cannot be changed in any case.
ClickHouse allows you to perform operations with the partitions: delete them, copy from one table to another, or create a backup. See the list of all operations in the section [Manipulations With Partitions and Parts](../../../sql-reference/statements/alter/partition.md#alter_manipulations-with-partitions).
## Group By optimisation using partition key
For some combinations of table's partition key and query's group by key it might be possible to execute aggregation for each partition independently.
Then we'll not have to merge partially aggregated data from all execution threads at the end,
because we provided with the guarantee that each group by key value cannot appear in working sets of two different threads.
The typical example is:
``` sql
CREATE TABLE session_log
(
UserID UInt64,
SessionID UUID
)
ENGINE = MergeTree
PARTITION BY sipHash64(UserID) % 16
ORDER BY tuple();
SELECT
UserID,
COUNT()
FROM session_log
GROUP BY UserID;
```
:::warning
Performance of such a query heavily depends on the table layout. Because of that the optimisation is not enabled by default.
:::
The key factors for a good performance:
- number of partitions involved in the query should be sufficiently large (more than `max_threads / 2`), otherwise query will underutilize the machine
- partitions shouldn't be too small, so batch processing won't degenerate into row-by-row processing
- partitions should be comparable in size, so all threads will do roughly the same amount of work
:::info
It's recommended to apply some hash function to columns in `partition by` clause in order to distribute data evenly between partitions.
:::
Relevant settings are:
- `allow_aggregate_partitions_independently` - controls if the use of optimisation is enabled
- `force_aggregate_partitions_independently` - forces its use when it's applicable from the correctness standpoint, but getting disabled by internal logic that estimates its expediency
- `max_number_of_partitions_for_independent_aggregation` - hard limit on the maximal number of partitions table could have

View File

@ -561,6 +561,9 @@ class IColumn;
\
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
M(Bool, allow_aggregate_partitions_independently, false, "Enable independent aggregation of partitions on separate threads when partition key suits group by key. Beneficial when number of partitions close to number of cores and partitions have roughly the same size", 0) \
M(Bool, force_aggregate_partitions_independently, false, "Force the use of optimization when it is applicable, but heuristics decided not to use it", 0) \
M(UInt64, max_number_of_partitions_for_independent_aggregation, 128, "Maximal number of partitions in table to apply optimization", 0) \
/** Experimental feature for moving data between shards. */ \
\
M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \

View File

@ -1055,6 +1055,14 @@ void ActionsDAG::assertDeterministic() const
"Expression must be deterministic but it contains non-deterministic part `{}`", node.result_name);
}
bool ActionsDAG::hasNonDeterministic() const
{
for (const auto & node : nodes)
if (!node.is_deterministic)
return true;
return false;
}
void ActionsDAG::addMaterializingOutputActions()
{
for (auto & output_node : outputs)

View File

@ -225,6 +225,7 @@ public:
bool hasStatefulFunctions() const;
bool trivial() const; /// If actions has no functions or array join.
void assertDeterministic() const; /// Throw if not isDeterministic.
bool hasNonDeterministic() const;
#if USE_EMBEDDED_COMPILER
void compileExpressions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {});

View File

@ -1695,6 +1695,21 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
return block;
}
Block Aggregator::convertOneBucketToBlock(AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket) const
{
const auto method = variants.type;
Block block;
if (false) {} // NOLINT
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
block = convertOneBucketToBlock(variants, *variants.NAME, arena, final, bucket); \
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
return block;
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(

View File

@ -1088,6 +1088,7 @@ private:
friend struct AggregatedDataVariants;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
friend class ConvertingAggregatedToChunksWithMergingSource;
friend class AggregatingInOrderTransform;
/// Data structure of source blocks.
@ -1307,6 +1308,8 @@ private:
bool final,
Int32 bucket) const;
Block convertOneBucketToBlock(AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket) const;
Block mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,

View File

@ -234,7 +234,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
auto many_data = std::make_shared<ManyAggregatedData>(streams);
for (size_t j = 0; j < streams; ++j)
{
auto aggregation_for_set = std::make_shared<AggregatingTransform>(input_header, transform_params_for_set, many_data, j, merge_threads, temporary_data_merge_threads);
auto aggregation_for_set = std::make_shared<AggregatingTransform>(
input_header,
transform_params_for_set,
many_data,
j,
merge_threads,
temporary_data_merge_threads,
should_produce_results_in_order_of_bucket_number,
skip_merging);
// For each input stream we have `grouping_sets_size` copies, so port index
// for transform #j should skip ports of first (j-1) streams.
connect(*ports[i + grouping_sets_size * j], aggregation_for_set->getInputs().front());
@ -371,6 +379,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
many_data, counter++);
});
if (skip_merging)
{
pipeline.addSimpleTransform([&](const Block & header)
{ return std::make_shared<FinalizeAggregatedTransform>(header, transform_params); });
pipeline.resize(params.max_threads);
aggregating_in_order = collector.detachProcessors(0);
return;
}
aggregating_in_order = collector.detachProcessors(0);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
@ -425,16 +442,26 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
if (!storage_has_evenly_distributed_read)
/// But not if we execute aggregation over partitioned data in which case data streams shouldn't be mixed.
if (!storage_has_evenly_distributed_read && !skip_merging)
pipeline.resize(pipeline.getNumStreams(), true, true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
pipeline.addSimpleTransform(
[&](const Block & header)
{
return std::make_shared<AggregatingTransform>(
header,
transform_params,
many_data,
counter++,
merge_threads,
temporary_data_merge_threads,
should_produce_results_in_order_of_bucket_number,
skip_merging);
});
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
@ -453,11 +480,12 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
void AggregatingStep::describeActions(FormatSettings & settings) const
{
params.explain(settings.out, settings.offset);
String prefix(settings.offset, settings.indent_char);
if (!sort_description_for_merging.empty())
{
String prefix(settings.offset, settings.indent_char);
settings.out << prefix << "Order: " << dumpSortDescription(sort_description_for_merging) << '\n';
}
settings.out << prefix << "Skip merging: " << skip_merging << '\n';
}
void AggregatingStep::describeActions(JSONBuilder::JSONMap & map) const
@ -465,6 +493,7 @@ void AggregatingStep::describeActions(JSONBuilder::JSONMap & map) const
params.explain(map);
if (!sort_description_for_merging.empty())
map.add("Order", dumpSortDescription(sort_description_for_merging));
map.add("Skip merging", skip_merging);
}
void AggregatingStep::describePipeline(FormatSettings & settings) const

View File

@ -61,6 +61,7 @@ public:
bool isGroupingSets() const { return !grouping_sets_params.empty(); }
void applyOrder(SortDescription sort_description_for_merging_, SortDescription group_by_sort_description_);
bool memoryBoundMergingWillBeUsed() const;
void skipMerging() { skip_merging = true; }
private:
void updateOutputStream() override;
@ -72,6 +73,7 @@ private:
size_t aggregation_in_order_max_block_bytes;
size_t merge_threads;
size_t temporary_data_merge_threads;
bool skip_merging = false; // if we aggregate partitioned data merging is not needed
bool storage_has_evenly_distributed_read;
bool group_by_use_nulls;
@ -84,7 +86,7 @@ private:
SortDescription group_by_sort_description;
/// These settings are used to determine if we should resize pipeline to 1 at the end.
bool should_produce_results_in_order_of_bucket_number;
const bool should_produce_results_in_order_of_bucket_number;
bool memory_bound_merging_of_aggregation_results_enabled;
bool explicit_sorting_required_for_aggregation_in_order;

View File

@ -71,17 +71,24 @@ size_t tryRemoveRedundantDistinct(QueryPlan::Node * parent_node, QueryPlan::Node
/// - Something - - Expression - Something -
size_t tryLiftUpUnion(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::Nodes &);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 9> optimizations = {{
static const std::array<Optimization, 10> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions,
"reuseStorageOrderingForWindowFunctions",
&QueryPlanOptimizationSettings::optimize_plan},
{tryLiftUpUnion, "liftUpUnion", &QueryPlanOptimizationSettings::optimize_plan},
{tryAggregatePartitionsIndependently,
"aggregatePartitionsIndependently",
&QueryPlanOptimizationSettings::aggregate_partitions_independently},
{tryRemoveRedundantDistinct, "removeRedundantDistinct", &QueryPlanOptimizationSettings::remove_redundant_distinct},
}};

View File

@ -15,6 +15,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.read_in_order = from.optimize_read_in_order && from.query_plan_read_in_order;
settings.aggregation_in_order = from.optimize_aggregation_in_order && from.query_plan_aggregation_in_order;
settings.remove_redundant_sorting = from.query_plan_remove_redundant_sorting;
settings.aggregate_partitions_independently = from.allow_aggregate_partitions_independently;
settings.remove_redundant_distinct = from.query_plan_remove_redundant_distinct;
return settings;
}

View File

@ -33,6 +33,8 @@ struct QueryPlanOptimizationSettings
/// If removing redundant sorting is enabled, for example, ORDER BY clauses in subqueries
bool remove_redundant_sorting = true;
bool aggregate_partitions_independently = false;
/// If removing redundant distinct steps is enabled
bool remove_redundant_distinct = true;

View File

@ -0,0 +1,216 @@
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
#include <Core/Field.h>
#include <Functions/IFunction.h>
#include <stack>
namespace DB
{
MatchedTrees::Matches matchTrees(const ActionsDAG & inner_dag, const ActionsDAG & outer_dag)
{
using Parents = std::set<const ActionsDAG::Node *>;
std::unordered_map<const ActionsDAG::Node *, Parents> inner_parents;
std::unordered_map<std::string_view, const ActionsDAG::Node *> inner_inputs_and_constants;
{
std::stack<const ActionsDAG::Node *> stack;
for (const auto * out : inner_dag.getOutputs())
{
if (inner_parents.contains(out))
continue;
stack.push(out);
inner_parents.emplace(out, Parents());
while (!stack.empty())
{
const auto * node = stack.top();
stack.pop();
if (node->type == ActionsDAG::ActionType::INPUT || node->type == ActionsDAG::ActionType::COLUMN)
inner_inputs_and_constants.emplace(node->result_name, node);
for (const auto * child : node->children)
{
auto [it, inserted] = inner_parents.emplace(child, Parents());
it->second.emplace(node);
if (inserted)
stack.push(child);
}
}
}
}
struct Frame
{
const ActionsDAG::Node * node;
ActionsDAG::NodeRawConstPtrs mapped_children;
};
MatchedTrees::Matches matches;
std::stack<Frame> stack;
for (const auto & node : outer_dag.getNodes())
{
if (matches.contains(&node))
continue;
stack.push(Frame{&node, {}});
while (!stack.empty())
{
auto & frame = stack.top();
frame.mapped_children.reserve(frame.node->children.size());
while (frame.mapped_children.size() < frame.node->children.size())
{
const auto * child = frame.node->children[frame.mapped_children.size()];
auto it = matches.find(child);
if (it == matches.end())
{
/// If match map does not contain a child, it was not visited.
stack.push(Frame{child, {}});
break;
}
/// A node from found match may be nullptr.
/// It means that node is visited, but no match was found.
frame.mapped_children.push_back(it->second.node);
}
if (frame.mapped_children.size() < frame.node->children.size())
continue;
/// Create an empty match for current node.
/// match.node will be set if match is found.
auto & match = matches[frame.node];
if (frame.node->type == ActionsDAG::ActionType::INPUT || frame.node->type == ActionsDAG::ActionType::COLUMN)
{
const ActionsDAG::Node * mapped = nullptr;
if (auto it = inner_inputs_and_constants.find(frame.node->result_name); it != inner_inputs_and_constants.end())
mapped = it->second;
match.node = mapped;
}
else if (frame.node->type == ActionsDAG::ActionType::ALIAS)
{
match = matches[frame.node->children.at(0)];
}
else if (frame.node->type == ActionsDAG::ActionType::FUNCTION)
{
//std::cerr << "... Processing " << frame.node->function_base->getName() << std::endl;
bool found_all_children = true;
for (const auto * child : frame.mapped_children)
if (!child)
found_all_children = false;
if (found_all_children && !frame.mapped_children.empty())
{
Parents container;
Parents * intersection = &inner_parents[frame.mapped_children[0]];
if (frame.mapped_children.size() > 1)
{
std::vector<Parents *> other_parents;
size_t mapped_children_size = frame.mapped_children.size();
other_parents.reserve(mapped_children_size);
for (size_t i = 1; i < mapped_children_size; ++i)
other_parents.push_back(&inner_parents[frame.mapped_children[i]]);
for (const auto * parent : *intersection)
{
bool is_common = true;
for (const auto * set : other_parents)
{
if (!set->contains(parent))
{
is_common = false;
break;
}
}
if (is_common)
container.insert(parent);
}
intersection = &container;
}
//std::cerr << ".. Candidate parents " << intersection->size() << std::endl;
if (!intersection->empty())
{
auto func_name = frame.node->function_base->getName();
for (const auto * parent : *intersection)
{
//std::cerr << ".. candidate " << parent->result_name << std::endl;
if (parent->type == ActionsDAG::ActionType::FUNCTION && func_name == parent->function_base->getName())
{
const auto & children = parent->children;
size_t num_children = children.size();
if (frame.mapped_children.size() == num_children)
{
bool all_children_matched = true;
for (size_t i = 0; all_children_matched && i < num_children; ++i)
all_children_matched = frame.mapped_children[i] == children[i];
if (all_children_matched)
{
match.node = parent;
break;
}
}
}
}
}
}
if (!match.node && frame.node->function_base->hasInformationAboutMonotonicity())
{
size_t num_const_args = 0;
const ActionsDAG::Node * monotonic_child = nullptr;
for (const auto * child : frame.node->children)
{
if (child->column)
++num_const_args;
else
monotonic_child = child;
}
if (monotonic_child && num_const_args + 1 == frame.node->children.size())
{
const auto & child_match = matches[monotonic_child];
if (child_match.node)
{
auto info = frame.node->function_base->getMonotonicityForRange(*monotonic_child->result_type, {}, {});
if (info.is_monotonic)
{
MatchedTrees::Monotonicity monotonicity;
monotonicity.direction *= info.is_positive ? 1 : -1;
monotonicity.strict = info.is_strict;
if (child_match.monotonicity)
{
monotonicity.direction *= child_match.monotonicity->direction;
if (!child_match.monotonicity->strict)
monotonicity.strict = false;
}
match.node = child_match.node;
match.monotonicity = monotonicity;
}
}
}
}
}
stack.pop();
}
}
return matches;
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <Interpreters/ActionsDAG.h>
namespace DB
{
/// This structure stores a node mapping from one DAG to another.
/// The rule is following:
/// * Input nodes are mapped by name.
/// * Function is mapped to function if all children are mapped and function names are same.
/// * Alias is mapped to it's children mapping.
/// * Monotonic function can be mapped to it's children mapping if direct mapping does not exist.
/// In this case, information about monotonicity is filled.
/// * Mapped node is nullptr if there is no mapping found.
///
/// Overall, directly mapped nodes represent equal calculations.
/// Notes:
/// * Mapped DAG can contain many nodes which represent the same calculation.
/// In this case mapping is ambiguous and only one node is mapped.
/// * Aliases for mapped DAG are not supported.
/// DAG for PK does not contain aliases and ambiguous nodes.
struct MatchedTrees
{
/// Monotonicity is calculated for monotonic functions chain.
/// Chain is not strict if there is any non-strict monotonic function.
struct Monotonicity
{
int direction = 1;
bool strict = true;
};
struct Match
{
const ActionsDAG::Node * node = nullptr;
std::optional<Monotonicity> monotonicity;
};
using Matches = std::unordered_map<const ActionsDAG::Node *, Match>;
};
MatchedTrees::Matches matchTrees(const ActionsDAG & inner_dag, const ActionsDAG & outer_dag);
}

View File

@ -1,27 +1,29 @@
#include <Parsers/ASTWindowDefinition.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Functions/IFunction.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTWindowDefinition.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Storages/StorageMerge.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
#include <Common/typeid_cast.h>
#include <stack>
@ -318,246 +320,6 @@ void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
/// This structure stores a node mapping from one DAG to another.
/// The rule is following:
/// * Input nodes are mapped by name.
/// * Function is mapped to function if all children are mapped and function names are same.
/// * Alias is mapped to it's children mapping.
/// * Monotonic function can be mapped to it's children mapping if direct mapping does not exist.
/// In this case, information about monotonicity is filled.
/// * Mapped node is nullptr if there is no mapping found.
///
/// Overall, directly mapped nodes represent equal calculations.
/// Notes:
/// * Mapped DAG can contain many nodes which represent the same calculation.
/// In this case mapping is ambiguous and only one node is mapped.
/// * Aliases for mapped DAG are not supported.
/// DAG for PK does not contain aliases and ambiguous nodes.
struct MatchedTrees
{
/// Monotonicity is calculated for monotonic functions chain.
/// Chain is not strict if there is any non-strict monotonic function.
struct Monotonicity
{
int direction = 1;
bool strict = true;
};
struct Match
{
const ActionsDAG::Node * node = nullptr;
std::optional<Monotonicity> monotonicity;
};
using Matches = std::unordered_map<const ActionsDAG::Node *, Match>;
};
MatchedTrees::Matches matchTrees(const ActionsDAG & inner_dag, const ActionsDAG & outer_dag)
{
using Parents = std::set<const ActionsDAG::Node *>;
std::unordered_map<const ActionsDAG::Node *, Parents> inner_parents;
std::unordered_map<std::string_view, const ActionsDAG::Node *> inner_inputs;
{
std::stack<const ActionsDAG::Node *> stack;
for (const auto * out : inner_dag.getOutputs())
{
if (inner_parents.contains(out))
continue;
stack.push(out);
inner_parents.emplace(out, Parents());
while (!stack.empty())
{
const auto * node = stack.top();
stack.pop();
if (node->type == ActionsDAG::ActionType::INPUT)
inner_inputs.emplace(node->result_name, node);
for (const auto * child : node->children)
{
auto [it, inserted] = inner_parents.emplace(child, Parents());
it->second.emplace(node);
if (inserted)
stack.push(child);
}
}
}
}
struct Frame
{
const ActionsDAG::Node * node;
ActionsDAG::NodeRawConstPtrs mapped_children;
};
MatchedTrees::Matches matches;
std::stack<Frame> stack;
for (const auto & node : outer_dag.getNodes())
{
if (matches.contains(&node))
continue;
stack.push(Frame{&node, {}});
while (!stack.empty())
{
auto & frame = stack.top();
frame.mapped_children.reserve(frame.node->children.size());
while (frame.mapped_children.size() < frame.node->children.size())
{
const auto * child = frame.node->children[frame.mapped_children.size()];
auto it = matches.find(child);
if (it == matches.end())
{
/// If match map does not contain a child, it was not visited.
stack.push(Frame{child, {}});
break;
}
/// A node from found match may be nullptr.
/// It means that node is visited, but no match was found.
frame.mapped_children.push_back(it->second.node);
}
if (frame.mapped_children.size() < frame.node->children.size())
continue;
/// Create an empty match for current node.
/// natch.node will be set if match is found.
auto & match = matches[frame.node];
if (frame.node->type == ActionsDAG::ActionType::INPUT)
{
const ActionsDAG::Node * mapped = nullptr;
if (auto it = inner_inputs.find(frame.node->result_name); it != inner_inputs.end())
mapped = it->second;
match.node = mapped;
}
else if (frame.node->type == ActionsDAG::ActionType::ALIAS)
{
match = matches[frame.node->children.at(0)];
}
else if (frame.node->type == ActionsDAG::ActionType::FUNCTION)
{
//std::cerr << "... Processing " << frame.node->function_base->getName() << std::endl;
bool found_all_children = true;
for (const auto * child : frame.mapped_children)
if (!child)
found_all_children = false;
if (found_all_children && !frame.mapped_children.empty())
{
Parents container;
Parents * intersection = &inner_parents[frame.mapped_children[0]];
if (frame.mapped_children.size() > 1)
{
std::vector<Parents *> other_parents;
size_t mapped_children_size = frame.mapped_children.size();
other_parents.reserve(mapped_children_size);
for (size_t i = 1; i < mapped_children_size; ++i)
other_parents.push_back(&inner_parents[frame.mapped_children[i]]);
for (const auto * parent : *intersection)
{
bool is_common = true;
for (const auto * set : other_parents)
{
if (!set->contains(parent))
{
is_common = false;
break;
}
}
if (is_common)
container.insert(parent);
}
intersection = &container;
}
//std::cerr << ".. Candidate parents " << intersection->size() << std::endl;
if (!intersection->empty())
{
auto func_name = frame.node->function_base->getName();
for (const auto * parent : *intersection)
{
//std::cerr << ".. candidate " << parent->result_name << std::endl;
if (parent->type == ActionsDAG::ActionType::FUNCTION && func_name == parent->function_base->getName())
{
const auto & children = parent->children;
size_t num_children = children.size();
if (frame.mapped_children.size() == num_children)
{
bool all_children_matched = true;
for (size_t i = 0; all_children_matched && i < num_children; ++i)
all_children_matched = frame.mapped_children[i] == children[i];
if (all_children_matched)
{
match.node = parent;
break;
}
}
}
}
}
}
if (!match.node && frame.node->function_base->hasInformationAboutMonotonicity())
{
size_t num_const_args = 0;
const ActionsDAG::Node * monotonic_child = nullptr;
for (const auto * child : frame.node->children)
{
if (child->column)
++num_const_args;
else
monotonic_child = child;
}
if (monotonic_child && num_const_args + 1 == frame.node->children.size())
{
const auto & child_match = matches[monotonic_child];
if (child_match.node)
{
auto info = frame.node->function_base->getMonotonicityForRange(*monotonic_child->result_type, {}, {});
if (info.is_monotonic)
{
MatchedTrees::Monotonicity monotonicity;
monotonicity.direction *= info.is_positive ? 1 : -1;
monotonicity.strict = info.is_strict;
if (child_match.monotonicity)
{
monotonicity.direction *= child_match.monotonicity->direction;
if (!child_match.monotonicity->strict)
monotonicity.strict = false;
}
match.node = child_match.node;
match.monotonicity = monotonicity;
}
}
}
}
}
stack.pop();
}
}
return matches;
}
InputOrderInfoPtr buildInputOrderInfo(
const FixedColumns & fixed_columns,
const ActionsDAGPtr & dag,

View File

@ -0,0 +1,214 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Functions/IFunction.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <stack>
#include <unordered_map>
using namespace DB;
namespace
{
using NodeSet = std::unordered_set<const ActionsDAG::Node *>;
using NodeMap = std::unordered_map<const ActionsDAG::Node *, bool>;
struct Frame
{
const ActionsDAG::Node * node = nullptr;
size_t next_child = 0;
};
bool isInjectiveFunction(const ActionsDAG::Node * node)
{
if (node->function_base->isInjective({}))
return true;
size_t fixed_args = 0;
for (const auto & child : node->children)
if (child->type == ActionsDAG::ActionType::COLUMN)
++fixed_args;
static const std::vector<String> injective = {"plus", "minus", "negate", "tuple"};
return (fixed_args + 1 >= node->children.size()) && (std::ranges::find(injective, node->function_base->getName()) != injective.end());
}
void removeInjectiveFunctionsFromResultsRecursively(const ActionsDAG::Node * node, NodeSet & irreducible, NodeSet & visited)
{
if (visited.contains(node))
return;
visited.insert(node);
switch (node->type)
{
case ActionsDAG::ActionType::ALIAS:
assert(node->children.size() == 1);
removeInjectiveFunctionsFromResultsRecursively(node->children.at(0), irreducible, visited);
break;
case ActionsDAG::ActionType::ARRAY_JOIN:
UNREACHABLE();
case ActionsDAG::ActionType::COLUMN:
irreducible.insert(node);
break;
case ActionsDAG::ActionType::FUNCTION:
if (!isInjectiveFunction(node))
{
irreducible.insert(node);
}
else
{
for (const auto & child : node->children)
removeInjectiveFunctionsFromResultsRecursively(child, irreducible, visited);
}
break;
case ActionsDAG::ActionType::INPUT:
irreducible.insert(node);
break;
}
}
/// Our objective is to replace injective function nodes in `actions` results with its children
/// until only the irreducible subset of nodes remains. Against these set of nodes we will match partition key expression
/// to determine if it maps all rows with the same value of group by key to the same partition.
NodeSet removeInjectiveFunctionsFromResultsRecursively(const ActionsDAGPtr & actions)
{
NodeSet irreducible;
NodeSet visited;
for (const auto & node : actions->getOutputs())
removeInjectiveFunctionsFromResultsRecursively(node, irreducible, visited);
return irreducible;
}
bool allOutputsDependsOnlyOnAllowedNodes(
const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches, const ActionsDAG::Node * node, NodeMap & visited)
{
if (visited.contains(node))
return visited[node];
bool res = false;
/// `matches` maps partition key nodes into nodes in group by actions
if (matches.contains(node))
{
const auto & match = matches.at(node);
/// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees)
if (match.node && match.node->result_name == node->result_name && !match.monotonicity)
res = irreducible_nodes.contains(match.node);
}
if (!res)
{
switch (node->type)
{
case ActionsDAG::ActionType::ALIAS:
assert(node->children.size() == 1);
res = allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, node->children.at(0), visited);
break;
case ActionsDAG::ActionType::ARRAY_JOIN:
UNREACHABLE();
case ActionsDAG::ActionType::COLUMN:
/// Constants doesn't matter, so let's always consider them matched.
res = true;
break;
case ActionsDAG::ActionType::FUNCTION:
res = true;
for (const auto & child : node->children)
res &= allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, child, visited);
break;
case ActionsDAG::ActionType::INPUT:
break;
}
}
visited[node] = res;
return res;
}
/// Here we check that partition key expression is a deterministic function of the reduced set of group by key nodes.
/// No need to explicitly check that each function is deterministic, because it is a guaranteed property of partition key expression (checked on table creation).
/// So it is left only to check that each output node depends only on the allowed set of nodes (`irreducible_nodes`).
bool allOutputsDependsOnlyOnAllowedNodes(
const ActionsDAG & partition_actions, const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches)
{
NodeMap visited;
bool res = true;
for (const auto & node : partition_actions.getOutputs())
if (node->type != ActionsDAG::ActionType::INPUT)
res &= allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, node, visited);
return res;
}
/// 0. Partition key columns should be a subset of group by key columns.
/// 1. Optimization is applicable if partition by expression is a deterministic function of col1, ..., coln and group by key is injective functions of these col1, ..., coln.
/// 2. To find col1, ..., coln we apply removeInjectiveFunctionsFromResultsRecursively to group by key actions.
/// 3. We match partition key actions with group by key actions to find col1', ..., coln' in partition key actions.
/// 4. We check that partition key is indeed a deterministic function of col1', ..., coln'.
bool isPartitionKeySuitsGroupByKey(
const ReadFromMergeTree & reading, const ActionsDAGPtr & group_by_actions, const AggregatingStep & aggregating)
{
if (aggregating.isGroupingSets())
return false;
if (group_by_actions->hasArrayJoin() || group_by_actions->hasStatefulFunctions() || group_by_actions->hasNonDeterministic())
return false;
const auto & gb_key_required_columns = group_by_actions->getRequiredColumnsNames();
const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
/// Check that PK columns is a subset of GBK columns.
for (const auto & col : partition_actions.getRequiredColumnsNames())
if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
return false;
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_actions);
const auto matches = matchTrees(*group_by_actions, partition_actions);
return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
}
}
namespace DB::QueryPlanOptimizations
{
size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::Nodes &)
{
if (!node || node->children.size() != 1)
return 0;
auto * aggregating_step = typeid_cast<AggregatingStep *>(node->step.get());
if (!aggregating_step)
return 0;
const auto * expression_node = node->children.front();
const auto * expression_step = typeid_cast<const ExpressionStep *>(expression_node->step.get());
if (!expression_step)
return 0;
auto * maybe_reading_step = expression_node->children.front()->step.get();
if (const auto * filter = typeid_cast<const FilterStep *>(maybe_reading_step))
{
const auto * filter_node = expression_node->children.front();
if (filter_node->children.size() != 1 || !filter_node->children.front()->step)
return 0;
maybe_reading_step = filter_node->children.front()->step.get();
}
auto * reading = typeid_cast<ReadFromMergeTree *>(maybe_reading_step);
if (!reading)
return 0;
if (!reading->willOutputEachPartitionThroughSeparatePort()
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step))
{
if (reading->requestOutputEachPartitionThroughSeparatePort())
aggregating_step->skipMerging();
}
return 0;
}
}

View File

@ -1,18 +1,12 @@
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <base/sort.h>
#include <Common/JSONBuilder.h>
#include <Common/logger_useful.h>
#include <Common/isLocalAddress.h>
#include "Storages/MergeTree/RequestResponse.h"
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/Operators.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Poco/Logger.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
@ -22,28 +16,74 @@
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/ReadFromMergeTreeDependencyTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Storages/VirtualColumnUtils.h>
#include <base/sort.h>
#include <Poco/Logger.h>
#include <Common/JSONBuilder.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <algorithm>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <unordered_map>
using namespace DB;
namespace
{
template <typename Container, typename Getter>
size_t countPartitions(const Container & parts, Getter get_partition_id)
{
if (parts.empty())
return 0;
String cur_partition_id = get_partition_id(parts[0]);
size_t unique_partitions = 1;
for (size_t i = 1; i < parts.size(); ++i)
{
if (get_partition_id(parts[i]) != cur_partition_id)
{
++unique_partitions;
cur_partition_id = get_partition_id(parts[i]);
}
}
return unique_partitions;
}
size_t countPartitions(const RangesInDataParts & parts_with_ranges)
{
auto get_partition_id = [](const RangesInDataPart & rng) { return rng.data_part->info.partition_id; };
return countPartitions(parts_with_ranges, get_partition_id);
}
size_t countPartitions(const MergeTreeData::DataPartsVector & prepared_parts)
{
auto get_partition_id = [](const MergeTreeData::DataPartPtr data_part) { return data_part->info.partition_id; };
return countPartitions(prepared_parts, get_partition_id);
}
}
namespace ProfileEvents
{
@ -443,8 +483,7 @@ Pipe ReadFromMergeTree::read(
return readFromPoolParallelReplicas(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
if (read_type == ReadType::Default && max_streams > 1)
return readFromPool(parts_with_range, required_columns, max_streams,
min_marks_for_concurrent_read, use_uncompressed_cache);
return readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, /*limit */0, /*pool*/nullptr);
@ -526,9 +565,7 @@ struct PartRangesReadInfo
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names)
{
const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
@ -540,7 +577,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
if (0 == info.sum_marks)
return {};
size_t num_streams = requested_num_streams;
if (num_streams > 1)
{
/// Reduce the number of num_streams if the data is small.
@ -564,6 +600,7 @@ static ActionsDAGPtr createProjection(const Block & header)
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts_with_ranges,
size_t num_streams,
const Names & column_names,
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info)
@ -589,7 +626,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
{
auto & outputs = prewhere_info->prewhere_actions->getOutputs();
std::unordered_set<const ActionsDAG::Node *> outputs_set(outputs.begin(), outputs.end());
for (const auto * input : prewhere_info->prewhere_actions->getInputs())
for (const auto * input : prewhere_info->prewhere_actions->getInputs())
{
if (!outputs_set.contains(input))
{
@ -600,8 +637,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
/// Let's split ranges to avoid reading much data.
auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size]
(const auto & ranges, int direction)
auto split_ranges
= [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size](const auto & ranges, int direction)
{
MarkRanges new_ranges;
const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
@ -645,11 +682,11 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
return new_ranges;
};
const size_t min_marks_per_stream = (info.sum_marks - 1) / requested_num_streams + 1;
const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1;
bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);
std::vector<RangesInDataParts> splitted_parts_and_ranges;
splitted_parts_and_ranges.reserve(requested_num_streams);
splitted_parts_and_ranges.reserve(num_streams);
const auto read_type = input_order_info->direction == 1
? ReadFromMergeTree::ReadType::InOrder
@ -684,7 +721,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
for (size_t i = 0; i < requested_num_streams && !parts_with_ranges.empty(); ++i)
for (size_t i = 0; i < num_streams && !parts_with_ranges.empty(); ++i)
{
size_t need_marks = min_marks_per_stream;
RangesInDataParts new_parts;
@ -699,21 +736,18 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
size_t & marks_in_part = info.sum_marks_in_parts.back();
/// We will not take too few rows from a part.
if (marks_in_part >= info.min_marks_for_concurrent_read &&
need_marks < info.min_marks_for_concurrent_read)
if (marks_in_part >= info.min_marks_for_concurrent_read && need_marks < info.min_marks_for_concurrent_read)
need_marks = info.min_marks_for_concurrent_read;
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < info.min_marks_for_concurrent_read)
if (marks_in_part > need_marks && marks_in_part - need_marks < info.min_marks_for_concurrent_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
/// We take full part if it contains enough marks or
/// if we know limit and part contains less than 'limit' rows.
bool take_full_part = marks_in_part <= need_marks
|| (input_order_info->limit && input_order_info->limit < part.getRowsCount());
bool take_full_part = marks_in_part <= need_marks || (input_order_info->limit && input_order_info->limit < part.getRowsCount());
/// We take the whole part if it is small enough.
if (take_full_part)
@ -764,7 +798,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (!pipes.empty())
pipe_header = pipes.front().getHeader();
if (need_preliminary_merge)
if (need_preliminary_merge || output_each_partition_through_separate_port)
{
size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
@ -783,25 +817,33 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
auto sorting_key_expr = std::make_shared<ExpressionActions>(sorting_key_prefix_expr);
for (auto & pipe : pipes)
auto merge_streams = [&](Pipe & pipe)
{
pipe.addSimpleTransform([sorting_key_expr](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, sorting_key_expr);
});
{ return std::make_shared<ExpressionTransform>(header, sorting_key_expr); });
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
max_block_size,
SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, SortingQueueStrategy::Batch);
pipe.addTransform(std::move(transform));
}
};
if (!pipes.empty() && output_each_partition_through_separate_port)
{
/// In contrast with usual aggregation in order that allocates separate AggregatingTransform for each data part,
/// aggregation of partitioned data uses the same AggregatingTransform for all parts of the same partition.
/// Thus we need to merge all partition parts into a single sorted stream.
Pipe pipe = Pipe::unitePipes(std::move(pipes));
merge_streams(pipe);
out_projection = createProjection(pipe_header);
return pipe;
}
for (auto & pipe : pipes)
merge_streams(pipe);
}
if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
@ -864,16 +906,14 @@ static void addMergingFinal(
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts_with_ranges,
const Names & column_names,
ActionsDAGPtr & out_projection)
RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names, ActionsDAGPtr & out_projection)
{
const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
size_t num_streams = requested_num_streams;
assert(num_streams == requested_num_streams);
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
@ -1350,6 +1390,67 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
return true;
}
bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
{
if (isQueryWithFinal())
return false;
const auto & settings = context->getSettingsRef();
const auto partitions_cnt = countPartitions(prepared_parts);
if (!settings.force_aggregate_partitions_independently && (partitions_cnt == 1 || partitions_cnt < settings.max_threads / 2))
{
LOG_TRACE(
log,
"Independent aggregation by partitions won't be used because there are too few of them: {}. You can set "
"force_aggregate_partitions_independently to suppress this check",
partitions_cnt);
return false;
}
if (!settings.force_aggregate_partitions_independently
&& (partitions_cnt > settings.max_number_of_partitions_for_independent_aggregation))
{
LOG_TRACE(
log,
"Independent aggregation by partitions won't be used because there are too many of them: {}. You can increase "
"max_number_of_partitions_for_independent_aggregation (current value is {}) or set "
"force_aggregate_partitions_independently to suppress this check",
partitions_cnt,
settings.max_number_of_partitions_for_independent_aggregation);
return false;
}
if (!settings.force_aggregate_partitions_independently)
{
std::unordered_map<String, size_t> partition_rows;
for (const auto & part : prepared_parts)
partition_rows[part->info.partition_id] += part->rows_count;
size_t sum_rows = 0;
size_t max_rows = 0;
for (const auto & [_, rows] : partition_rows)
{
sum_rows += rows;
max_rows = std::max(max_rows, rows);
}
/// Merging shouldn't take more time than preaggregation in normal cases. And exec time is proportional to the amount of data.
/// We assume that exec time of independent aggr is proportional to the maximum of sizes and
/// exec time of ordinary aggr is proportional to sum of sizes divided by number of threads and multiplied by two (preaggregation + merging).
const size_t avg_rows_in_partition = sum_rows / settings.max_threads;
if (max_rows > avg_rows_in_partition * 2)
{
LOG_TRACE(
log,
"Independent aggregation by partitions won't be used because there are too big skew in the number of rows between "
"partitions. You can set force_aggregate_partitions_independently to suppress this check");
return false;
}
}
return output_each_partition_through_separate_port = true;
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts);
@ -1359,6 +1460,106 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
return std::get<ReadFromMergeTree::AnalysisResult>(result_ptr->result);
}
bool ReadFromMergeTree::isQueryWithFinal() const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
else
return select.final();
}
Pipe ReadFromMergeTree::spreadMarkRanges(
RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection)
{
bool final = isQueryWithFinal();
const auto & input_order_info = query_info.getInputOrderInfo();
Names column_names_to_read = result.column_names_to_read;
if (!final && result.sampling.use_sampling)
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
std::vector<String> add_columns = result.sampling.filter_expression->getRequiredColumns().getNames();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
}
/// Construct a proper coordinator
if (input_order_info && is_parallel_reading_from_replicas && context->getClientInfo().interface == ClientInfo::Interface::LOCAL)
{
assert(context->parallel_reading_coordinator);
auto mode = input_order_info->direction == 1 ? CoordinationMode::WithOrder : CoordinationMode::ReverseOrder;
context->parallel_reading_coordinator->setMode(mode);
}
if (final)
{
if (is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Final modifier is not supported with parallel replicas");
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.is_deleted_column.empty())
column_names_to_read.push_back(data.merging_params.is_deleted_column);
if (!data.merging_params.sign_column.empty())
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty())
column_names_to_read.push_back(data.merging_params.version_column);
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection);
}
else if (input_order_info)
{
return spreadMarkRangesAmongStreamsWithOrder(
std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection, input_order_info);
}
else
{
return spreadMarkRangesAmongStreams(std::move(parts_with_ranges), num_streams, column_names_to_read);
}
}
Pipe ReadFromMergeTree::groupStreamsByPartition(AnalysisResult & result, ActionsDAGPtr & result_projection)
{
auto && parts_with_ranges = std::move(result.parts_with_ranges);
if (parts_with_ranges.empty())
return {};
const size_t partitions_cnt = std::max<size_t>(countPartitions(parts_with_ranges), 1);
const size_t partitions_per_stream = std::max<size_t>(1, partitions_cnt / requested_num_streams);
const size_t num_streams = std::max<size_t>(1, requested_num_streams / partitions_cnt);
Pipes pipes;
for (auto begin = parts_with_ranges.begin(), end = begin; end != parts_with_ranges.end(); begin = end)
{
for (size_t i = 0; i < partitions_per_stream; ++i)
end = std::find_if(
end,
parts_with_ranges.end(),
[&end](const auto & part) { return end->data_part->info.partition_id != part.data_part->info.partition_id; });
RangesInDataParts partition_parts{std::make_move_iterator(begin), std::make_move_iterator(end)};
pipes.emplace_back(spreadMarkRanges(std::move(partition_parts), num_streams, result, result_projection));
if (!pipes.back().empty())
pipes.back().resize(1);
}
return Pipe::unitePipes(std::move(pipes));
}
void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto result = getAnalysisResult();
@ -1393,70 +1594,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
/// NOTE: It may lead to double computation of expressions.
ActionsDAGPtr result_projection;
Names column_names_to_read = std::move(result.column_names_to_read);
bool final = isFinal(query_info);
if (!final && result.sampling.use_sampling)
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
std::vector<String> add_columns = result.sampling.filter_expression->getRequiredColumns().getNames();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
column_names_to_read.end());
}
Pipe pipe;
const auto & input_order_info = query_info.getInputOrderInfo();
/// Construct a proper coordinator
if (input_order_info && is_parallel_reading_from_replicas && context->getClientInfo().interface == ClientInfo::Interface::LOCAL)
{
assert(context->parallel_reading_coordinator);
auto mode = input_order_info->direction == 1 ? CoordinationMode::WithOrder : CoordinationMode::ReverseOrder;
context->parallel_reading_coordinator->setMode(mode);
}
if (final && is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Final modifier is not supported with parallel replicas");
if (final)
{
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.is_deleted_column.empty())
column_names_to_read.push_back(data.merging_params.is_deleted_column);
if (!data.merging_params.sign_column.empty())
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty())
column_names_to_read.push_back(data.merging_params.version_column);
::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
pipe = spreadMarkRangesAmongStreamsFinal(
std::move(result.parts_with_ranges),
column_names_to_read,
result_projection);
}
else if (input_order_info)
{
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),
column_names_to_read,
result_projection,
input_order_info);
}
else
{
pipe = spreadMarkRangesAmongStreams(
std::move(result.parts_with_ranges),
column_names_to_read);
}
Pipe pipe = output_each_partition_through_separate_port
? groupStreamsByPartition(result, result_projection)
: spreadMarkRanges(std::move(result.parts_with_ranges), requested_num_streams, result, result_projection);
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(query_info.storage_limits);

View File

@ -163,6 +163,10 @@ public:
static bool isFinal(const SelectQueryInfo & query_info);
/// Returns true if the optimisation is applicable (and applies it then).
bool requestOutputEachPartitionThroughSeparatePort();
bool willOutputEachPartitionThroughSeparatePort() const { return output_each_partition_through_separate_port; }
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
@ -177,6 +181,8 @@ private:
bool sample_factor_column_queried,
Poco::Logger * log);
bool isQueryWithFinal() const;
int getSortDirection() const
{
const InputOrderInfoPtr & order_info = query_info.getInputOrderInfo();
@ -212,6 +218,9 @@ private:
const size_t preferred_max_column_in_block_size_bytes;
const bool sample_factor_column_queried;
/// Used for aggregation optimisation (see DB::QueryPlanOptimizations::tryAggregateEachPartitionIndependently).
bool output_each_partition_through_separate_port = false;
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read;
Poco::Logger * log;
@ -227,20 +236,22 @@ private:
template<typename TSource>
ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool has_limit_below_one_block, MergeTreeInOrderReadPoolParallelReplicasPtr pool);
Pipe spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names);
Pipe spreadMarkRanges(
RangesInDataParts && parts_with_ranges, size_t num_streams, AnalysisResult & result, ActionsDAGPtr & result_projection);
Pipe groupStreamsByPartition(AnalysisResult & result, ActionsDAGPtr & result_projection);
Pipe spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names);
Pipe spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts_with_ranges,
size_t num_streams,
const Names & column_names,
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info);
Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,
ActionsDAGPtr & out_projection);
RangesInDataParts && parts, size_t num_streams, const Names & column_names, ActionsDAGPtr & out_projection);
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;

View File

@ -6,6 +6,8 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Core/ProtocolDefines.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
namespace ProfileEvents
{
extern const Event ExternalAggregationMerge;
@ -81,7 +83,7 @@ namespace
/// Worker which merges buckets for two-level aggregation.
/// Atomically increments bucket counter and returns merged result.
class ConvertingAggregatedToChunksSource : public ISource
class ConvertingAggregatedToChunksWithMergingSource : public ISource
{
public:
static constexpr UInt32 NUM_BUCKETS = 256;
@ -101,19 +103,17 @@ public:
using SharedDataPtr = std::shared_ptr<SharedData>;
ConvertingAggregatedToChunksSource(
AggregatingTransformParamsPtr params_,
ManyAggregatedDataVariantsPtr data_,
SharedDataPtr shared_data_,
Arena * arena_)
ConvertingAggregatedToChunksWithMergingSource(
AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, SharedDataPtr shared_data_, Arena * arena_)
: ISource(params_->getHeader(), false)
, params(std::move(params_))
, data(std::move(data_))
, shared_data(std::move(shared_data_))
, arena(arena_)
{}
{
}
String getName() const override { return "ConvertingAggregatedToChunksSource"; }
String getName() const override { return "ConvertingAggregatedToChunksWithMergingSource"; }
protected:
Chunk generate() override
@ -138,21 +138,145 @@ private:
Arena * arena;
};
/// Asks Aggregator to convert accumulated aggregation state into blocks (without merging) and pushes them to later steps.
class ConvertingAggregatedToChunksSource : public ISource
{
public:
ConvertingAggregatedToChunksSource(AggregatingTransformParamsPtr params_, AggregatedDataVariantsPtr variant_)
: ISource(params_->getHeader(), false), params(params_), variant(variant_)
{
}
String getName() const override { return "ConvertingAggregatedToChunksSource"; }
protected:
Chunk generate() override
{
if (variant->isTwoLevel())
{
if (current_bucket_num < NUM_BUCKETS)
{
Arena * arena = variant->aggregates_pool;
Block block = params->aggregator.convertOneBucketToBlock(*variant, arena, params->final, current_bucket_num++);
return convertToChunk(block);
}
}
else if (!single_level_converted)
{
Block block = params->aggregator.prepareBlockAndFillSingleLevel<true /* return_single_block */>(*variant, params->final);
single_level_converted = true;
return convertToChunk(block);
}
return {};
}
private:
static constexpr UInt32 NUM_BUCKETS = 256;
AggregatingTransformParamsPtr params;
AggregatedDataVariantsPtr variant;
UInt32 current_bucket_num = 0;
bool single_level_converted = false;
};
/// Reads chunks from GroupingAggregatedTransform (stored in ChunksToMerge structure) and outputs them.
class FlattenChunksToMergeTransform : public IProcessor
{
public:
explicit FlattenChunksToMergeTransform(const Block & input_header, const Block & output_header)
: IProcessor({input_header}, {output_header})
{
}
String getName() const override { return "FlattenChunksToMergeTransform"; }
private:
void work() override
{
}
void process(Chunk && chunk)
{
if (!chunk.hasChunkInfo())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected chunk with chunk info in {}", getName());
const auto & info = chunk.getChunkInfo();
const auto * chunks_to_merge = typeid_cast<const ChunksToMerge *>(info.get());
if (!chunks_to_merge)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected chunk with ChunksToMerge info in {}", getName());
if (chunks_to_merge->chunks)
for (auto & cur_chunk : *chunks_to_merge->chunks)
chunks.emplace_back(std::move(cur_chunk));
}
Status prepare() override
{
auto & input = inputs.front();
auto & output = outputs.front();
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (!chunks.empty())
{
output.push(std::move(chunks.front()));
chunks.pop_front();
if (!chunks.empty())
return Status::Ready;
}
if (input.isFinished() && chunks.empty())
{
output.finish();
return Status::Finished;
}
if (input.isFinished())
return Status::Ready;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
Chunk chunk = input.pull(true /* set_not_needed */);
process(std::move(chunk));
return Status::Ready;
}
std::list<Chunk> chunks;
};
/// Generates chunks with aggregated data.
/// In single level case, aggregates data itself.
/// In two-level case, creates `ConvertingAggregatedToChunksSource` workers:
/// In two-level case, creates `ConvertingAggregatedToChunksWithMergingSource` workers:
///
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksWithMergingSource ->
/// ConvertingAggregatedToChunksWithMergingSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksWithMergingSource ->
///
/// Result chunks guaranteed to be sorted by bucket number.
class ConvertingAggregatedToChunksTransform : public IProcessor
{
public:
ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_)
: IProcessor({}, {params_->getHeader()})
, params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {}
: IProcessor({}, {params_->getHeader()}), params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_)
{
}
String getName() const override { return "ConvertingAggregatedToChunksTransform"; }
@ -298,7 +422,7 @@ private:
AggregatingTransformParamsPtr params;
ManyAggregatedDataVariantsPtr data;
ConvertingAggregatedToChunksSource::SharedDataPtr shared_data;
ConvertingAggregatedToChunksWithMergingSource::SharedDataPtr shared_data;
size_t num_threads;
@ -368,13 +492,13 @@ private:
void createSources()
{
AggregatedDataVariantsPtr & first = data->at(0);
shared_data = std::make_shared<ConvertingAggregatedToChunksSource::SharedData>();
shared_data = std::make_shared<ConvertingAggregatedToChunksWithMergingSource::SharedData>();
for (size_t thread = 0; thread < num_threads; ++thread)
{
/// Select Arena to avoid race conditions
Arena * arena = first->aggregates_pools.at(thread).get();
auto source = std::make_shared<ConvertingAggregatedToChunksSource>(params, data, shared_data, arena);
auto source = std::make_shared<ConvertingAggregatedToChunksWithMergingSource>(params, data, shared_data, arena);
processors.emplace_back(std::move(source));
}
@ -382,8 +506,15 @@ private:
};
AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
: AggregatingTransform(std::move(header), std::move(params_)
, std::make_unique<ManyAggregatedData>(1), 0, 1, 1)
: AggregatingTransform(
std::move(header),
std::move(params_),
std::make_unique<ManyAggregatedData>(1),
0,
1,
1,
true /* should_produce_results_in_order_of_bucket_number */,
false /* skip_merging */)
{
}
@ -393,7 +524,9 @@ AggregatingTransform::AggregatingTransform(
ManyAggregatedDataPtr many_data_,
size_t current_variant,
size_t max_threads_,
size_t temporary_data_merge_threads_)
size_t temporary_data_merge_threads_,
bool should_produce_results_in_order_of_bucket_number_,
bool skip_merging_)
: IProcessor({std::move(header)}, {params_->getHeader()})
, params(std::move(params_))
, key_columns(params->params.keys_size)
@ -402,6 +535,8 @@ AggregatingTransform::AggregatingTransform(
, variants(*many_data->variants[current_variant])
, max_threads(std::min(many_data->variants.size(), max_threads_))
, temporary_data_merge_threads(temporary_data_merge_threads_)
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
, skip_merging(skip_merging_)
{
}
@ -575,9 +710,51 @@ void AggregatingTransform::initGenerate()
if (!params->aggregator.hasTemporaryData())
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
if (!skip_merging)
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
processors.emplace_back(
std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
}
else
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
Pipes pipes;
for (auto & variant : prepared_data)
/// Converts hash tables to blocks with data (finalized or not).
pipes.emplace_back(std::make_shared<ConvertingAggregatedToChunksSource>(params, variant));
Pipe pipe = Pipe::unitePipes(std::move(pipes));
if (!pipe.empty())
{
if (should_produce_results_in_order_of_bucket_number)
{
/// Groups chunks with the same bucket_id and outputs them (as a vector of chunks) in order of bucket_id.
pipe.addTransform(std::make_shared<GroupingAggregatedTransform>(pipe.getHeader(), pipe.numOutputPorts(), params));
/// Outputs one chunk from group at a time in order of bucket_id.
pipe.addTransform(std::make_shared<FlattenChunksToMergeTransform>(pipe.getHeader(), params->getHeader()));
}
else
{
/// If this is a final stage, we no longer have to keep chunks from different buckets into different chunks.
/// So now we can insert transform that will keep chunks size under control. It makes few times difference in exec time in some cases.
if (params->final)
{
pipe.addSimpleTransform(
[this](const Block & header)
{
/// Just a reasonable constant, matches default value for the setting `preferred_block_size_bytes`
static constexpr size_t oneMB = 1024 * 1024;
return std::make_shared<SimpleSquashingChunksTransform>(header, params->params.max_block_size, oneMB);
});
}
/// AggregatingTransform::expandPipeline expects single output port.
/// It's not a big problem because we do resize() to max_threads after AggregatingTransform.
pipe.resize(1);
}
}
processors = Pipe::detachProcessors(std::move(pipe));
}
}
else
{

View File

@ -149,7 +149,9 @@ public:
ManyAggregatedDataPtr many_data,
size_t current_variant,
size_t max_threads,
size_t temporary_data_merge_threads);
size_t temporary_data_merge_threads,
bool should_produce_results_in_order_of_bucket_number_ = true,
bool skip_merging_ = false);
~AggregatingTransform() override;
String getName() const override { return "AggregatingTransform"; }
@ -181,6 +183,8 @@ private:
AggregatedDataVariants & variants;
size_t max_threads = 1;
size_t temporary_data_merge_threads = 1;
bool should_produce_results_in_order_of_bucket_number = true;
bool skip_merging = false; /// If we aggregate partitioned data merging is not needed.
/// TODO: calculate time only for aggregation.
Stopwatch watch;

View File

@ -86,7 +86,8 @@ private:
bool read_from_all_inputs = false;
std::vector<bool> read_from_input;
bool expect_several_chunks_for_single_bucket_per_source = false;
/// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition.
bool expect_several_chunks_for_single_bucket_per_source = true;
/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to it's chunk info.
void addChunk(Chunk chunk, size_t input);

View File

@ -50,4 +50,34 @@ void SquashingChunksTransform::work()
}
}
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, true), squashing(min_block_size_rows, min_block_size_bytes)
{
}
void SimpleSquashingChunksTransform::transform(Chunk & chunk)
{
if (!finished)
{
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
chunk.setColumns(block.getColumns(), block.rows());
}
else
{
auto block = squashing.add({});
chunk.setColumns(block.getColumns(), block.rows());
}
}
IProcessor::Status SimpleSquashingChunksTransform::prepare()
{
if (!finished && input.isFinished())
{
finished = true;
return Status::Ready;
}
return ISimpleTransform::prepare();
}
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <Processors/Sinks/SinkToStorage.h>
#include <Interpreters/SquashingTransform.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
@ -28,4 +29,23 @@ private:
Chunk finish_chunk;
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class SimpleSquashingChunksTransform : public ISimpleTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "SimpleSquashingTransform"; }
protected:
void transform(Chunk &) override;
IProcessor::Status prepare() override;
private:
SquashingTransform squashing;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;
};
}

View File

@ -0,0 +1,49 @@
<test>
<settings>
<!-- will enable in a subsequent PR -->
<!-- <allow_aggregate_partitions_independently>1</allow_aggregate_partitions_independently> -->
<!-- <force_aggregate_partitions_independently>1</force_aggregate_partitions_independently> -->
<!-- <max_number_of_partitions_for_independent_aggregation>256</max_number_of_partitions_for_independent_aggregation> -->
<max_memory_usage>0</max_memory_usage>
<max_partitions_per_insert_block>256</max_partitions_per_insert_block>
</settings>
<substitutions>
<substitution>
<name>size</name>
<values>
<!-- <value>10000</value> -->
<value>100000</value>
<value>1000000</value>
<value>10000000</value>
<!-- <value>100000000</value> -->
</values>
</substitution>
<substitution>
<name>partitions</name>
<values>
<!-- <value>2</value> -->
<!-- <value>4</value> -->
<!-- <value>8</value> -->
<value>16</value>
<value>32</value>
<value>64</value>
<!-- <value>128</value> -->
<!-- <value>256</value> -->
</values>
</substitution>
</substitutions>
<create_query>create table t_{size}_{partitions}(a UInt64) engine=MergeTree order by a partition by sipHash64(a) % {partitions}</create_query>
<fill_query>insert into t_{size}_{partitions} select * from numbers_mt({size})</fill_query>
<fill_query>optimize table t_{size}_{partitions} final</fill_query>
<query>select a from t_{size}_{partitions} group by a format Null</query>
<query>select a from t_{size}_{partitions} group by a format Null settings optimize_aggregation_in_order = 1</query>
<drop_query>drop table t_{size}_{partitions}</drop_query>
</test>

View File

@ -0,0 +1,215 @@
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 4 → 16
AggregatingTransform × 4
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 8 → 16
AggregatingTransform × 8
(Expression)
ExpressionTransform × 8
(ReadFromMergeTree)
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 16 → 16
AggregatingTransform × 16
(Expression)
ExpressionTransform × 16
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
1000000
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 4 → 16
FinalizeAggregatedTransform × 4
AggregatingInOrderTransform × 4
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
ExpressionTransform × 4
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 8 → 16
FinalizeAggregatedTransform × 8
AggregatingInOrderTransform × 8
(Expression)
ExpressionTransform × 8
(ReadFromMergeTree)
ExpressionTransform × 8
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
FinalizeAggregatedTransform × 16
AggregatingInOrderTransform × 16
(Expression)
ExpressionTransform × 16
(ReadFromMergeTree)
ExpressionTransform × 16
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000
Skip merging: 1
Skip merging: 1
Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 0
Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 0
Skip merging: 0

View File

@ -0,0 +1,260 @@
-- Tags: long
set max_threads = 16;
set allow_aggregate_partitions_independently = 1;
set force_aggregate_partitions_independently = 1;
set allow_experimental_projection_optimization = 0;
create table t1(a UInt32) engine=MergeTree order by tuple() partition by a % 4 settings index_granularity = 8192, index_granularity_bytes = 10485760;
system stop merges t1;
insert into t1 select number from numbers_mt(1e6);
insert into t1 select number from numbers_mt(1e6);
explain pipeline select a from t1 group by a;
select count() from (select throwIf(count() != 2) from t1 group by a);
drop table t1;
create table t2(a UInt32) engine=MergeTree order by tuple() partition by a % 8;
system stop merges t2;
insert into t2 select number from numbers_mt(1e6);
insert into t2 select number from numbers_mt(1e6);
explain pipeline select a from t2 group by a;
select count() from (select throwIf(count() != 2) from t2 group by a);
drop table t2;
create table t3(a UInt32) engine=MergeTree order by tuple() partition by a % 16;
system stop merges t3;
insert into t3 select number from numbers_mt(1e6);
insert into t3 select number from numbers_mt(1e6);
explain pipeline select a from t3 group by a;
select count() from (select throwIf(count() != 2) from t3 group by a);
select throwIf(count() != 4) from remote('127.0.0.{1,2}', currentDatabase(), t3) group by a format Null;
-- if we happened to switch to external aggregation at some point, merging will happen as usual
select count() from (select throwIf(count() != 2) from t3 group by a) settings max_bytes_before_external_group_by = '1Ki';
drop table t3;
-- aggregation in order --
set optimize_aggregation_in_order = 1;
create table t4(a UInt32) engine=MergeTree order by a partition by a % 4;
system stop merges t4;
insert into t4 select number from numbers_mt(1e6);
insert into t4 select number from numbers_mt(1e6);
explain pipeline select a from t4 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t4 group by a);
drop table t4;
create table t5(a UInt32) engine=MergeTree order by a partition by a % 8;
system stop merges t5;
insert into t5 select number from numbers_mt(1e6);
insert into t5 select number from numbers_mt(1e6);
explain pipeline select a from t5 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t5 group by a);
drop table t5;
create table t6(a UInt32) engine=MergeTree order by a partition by a % 16;
system stop merges t6;
insert into t6 select number from numbers_mt(1e6);
insert into t6 select number from numbers_mt(1e6);
explain pipeline select a from t6 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t6 group by a);
drop table t6;
set optimize_aggregation_in_order = 0;
create table t7(a UInt32) engine=MergeTree order by a partition by intDiv(a, 2);
insert into t7 select number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 2) as a1 from t7 group by a1
) where explain like '%Skip merging: %';
drop table t7;
create table t8(a UInt32) engine=MergeTree order by a partition by intDiv(a, 2) * 2 + 1;
insert into t8 select number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 2) + 1 as a1 from t8 group by a1
) where explain like '%Skip merging: %';
drop table t8;
create table t9(a UInt32) engine=MergeTree order by a partition by intDiv(a, 2);
insert into t9 select number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 3) as a1 from t9 group by a1
) where explain like '%Skip merging: %';
drop table t9;
create table t10(a UInt32, b UInt32) engine=MergeTree order by a partition by (intDiv(a, 2), intDiv(b, 3));
insert into t10 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 2) + 1 as a1, intDiv(b, 3) as b1 from t10 group by a1, b1, pi()
) where explain like '%Skip merging: %';
drop table t10;
-- multiplication by 2 is not injective, so optimization is not applicable
create table t11(a UInt32, b UInt32) engine=MergeTree order by a partition by (intDiv(a, 2), intDiv(b, 3));
insert into t11 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 2) + 1 as a1, intDiv(b, 3) * 2 as b1 from t11 group by a1, b1, pi()
) where explain like '%Skip merging: %';
drop table t11;
create table t12(a UInt32, b UInt32) engine=MergeTree order by a partition by a % 16;
insert into t12 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a, b from t12 group by a, b, pi()
) where explain like '%Skip merging: %';
drop table t12;
create table t13(a UInt32, b UInt32) engine=MergeTree order by a partition by (intDiv(a, 2), intDiv(b, 3));
insert into t13 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select s from t13 group by intDiv(a, 2) + intDiv(b, 3) as s, pi()
) where explain like '%Skip merging: %';
drop table t13;
create table t14(a UInt32, b UInt32) engine=MergeTree order by a partition by intDiv(a, 2) + intDiv(b, 3);
insert into t14 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select intDiv(a, 2) as a1, intDiv(b, 3) as b1 from t14 group by a1, b1, pi()
) where explain like '%Skip merging: %';
drop table t14;
-- to few partitions --
create table t15(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90;
insert into t15 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t15 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0;
drop table t15;
-- to many partitions --
create table t16(a UInt32, b UInt32) engine=MergeTree order by a partition by a % 16;
insert into t16 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t16 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0, max_number_of_partitions_for_independent_aggregation = 4;
drop table t16;
-- to big skew --
create table t17(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90;
insert into t17 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t17 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0, max_threads = 4;
drop table t17;
create table t18(a UInt32, b UInt32) engine=MergeTree order by a partition by a;
insert into t18 select number, number from numbers_mt(50);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a1 from t18 group by intDiv(a, 2) as a1
) where explain like '%Skip merging: %';
drop table t18;
create table t19(a UInt32, b UInt32) engine=MergeTree order by a partition by a;
insert into t19 select number, number from numbers_mt(50);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a1 from t19 group by blockNumber() as a1
) where explain like '%Skip merging: %';
drop table t19;
create table t20(a UInt32, b UInt32) engine=MergeTree order by a partition by a;
insert into t20 select number, number from numbers_mt(50);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a1 from t20 group by rand(a) as a1
) where explain like '%Skip merging: %';
drop table t20;
create table t21(a UInt64, b UInt64) engine=MergeTree order by a partition by a % 16;
insert into t21 select number, number from numbers_mt(1e6);
select a from t21 group by a limit 10 format Null;
drop table t21;
create table t22(a UInt32, b UInt32) engine=SummingMergeTree order by a partition by a % 16;
insert into t22 select number, number from numbers_mt(1e6);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t22 final group by a
) where explain like '%Skip merging: %';
drop table t22;