Merge branch 'ClickHouse:master' into sandbox_

This commit is contained in:
Yarik Briukhovetskyi 2024-08-30 00:58:38 +02:00 committed by GitHub
commit 69c6419f6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 655 additions and 124 deletions

View File

@ -62,6 +62,7 @@ Other upcoming meetups
* [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26

View File

@ -8,7 +8,7 @@ title: "CREATE ROW POLICY"
Creates a [row policy](../../../guides/sre/user-management/index.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
:::tip
Row policies makes sense only for users with readonly access. If user can modify table or copy partitions between tables, it defeats the restrictions of row policies.
Row policies make sense only for users with readonly access. If a user can modify a table or copy partitions between tables, it defeats the restrictions of row policies.
:::
Syntax:
@ -24,40 +24,40 @@ CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluste
## USING Clause
Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row.
Allows specifying a condition to filter rows. A user will see a row if the condition is calculated to non-zero for the row.
## TO Clause
In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
In the `TO` section you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
Keyword `ALL` means all the ClickHouse users, including current user. Keyword `ALL EXCEPT` allows excluding some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
:::note
If there are no row policies defined for a table then any user can `SELECT` all the row from the table. Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if those row policies are defined for the current user or not. For example, the following policy
If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table. Defining one or more row policies for the table makes access to the table dependent on the row policies, no matter if those row policies are defined for the current user or not. For example, the following policy:
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
forbids the users `mira` and `peter` from seeing the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
If that's not desirable it can't be fixed by adding one more row policy, like the following:
If that's not desirable, it can be fixed by adding one more row policy, like the following:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
:::
## AS Clause
It's allowed to have more than one policy enabled on the same table for the same user at the one time. So we need a way to combine the conditions from multiple policies.
It's allowed to have more than one policy enabled on the same table for the same user at one time. So we need a way to combine the conditions from multiple policies.
By default policies are combined using the boolean `OR` operator. For example, the following policies
By default, policies are combined using the boolean `OR` operator. For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enables the user `peter` to see rows with either `b=1` or `c=2`.
enable the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default policies are permissive, which means they are combined using the boolean `OR` operator.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default, policies are permissive, which means they are combined using the boolean `OR` operator.
A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator.
@ -68,25 +68,25 @@ row_is_visible = (one or more of the permissive policies' conditions are non-zer
(all of the restrictive policies's conditions are non-zero)
```
For example, the following policies
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
enable the user `peter` to see rows only if both `b=1` AND `c=2`.
Database policies are combined with table policies.
For example, the following policies
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although
enable the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although
any other table in mydb would have only `b=1` policy applied for the user.

View File

@ -280,7 +280,7 @@ SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge
Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, но не более `receive_timeout` секунд:
``` sql
SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL]
SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL]
```
После выполнения этого запроса таблица `[db.]replicated_merge_tree_family_table_name` загружает команды из общего реплицированного лога в свою собственную очередь репликации. Затем запрос ждет, пока реплика не обработает все загруженные команды. Поддерживаются следующие модификаторы:

View File

@ -209,7 +209,7 @@ std::map<std::pair<TypeIndex, String>, NodeToSubcolumnTransformer> node_transfor
},
};
std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node)
std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node, const ContextPtr & context)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
@ -232,6 +232,12 @@ std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimizati
const auto & storage_snapshot = table_node->getStorageSnapshot();
auto column = first_argument_column_node->getColumn();
/// If view source is set we cannot optimize because it doesn't support moving functions to subcolumns.
/// The storage is replaced to the view source but it happens only after building a query tree and applying passes.
auto view_source = context->getViewSource();
if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted())
return {};
if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return {};
@ -266,7 +272,7 @@ public:
return;
}
auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node);
auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (function_node && first_argument_node && table_node)
{
enterImpl(*function_node, *first_argument_node, *table_node);
@ -416,7 +422,7 @@ public:
if (!getSettings().optimize_functions_to_subcolumns)
return;
auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node);
auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (!function_node || !first_argument_column_node || !table_node)
return;

View File

@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder;
class NativeWriter;
struct OutputBlockColumns;
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
*

View File

@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false;
}
GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys)
{
GroupingSetsParamsList result;
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
return result;
}
}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -2005,13 +2026,12 @@ static void executeMergeAggregatedImpl(
bool has_grouping_sets,
const Settings & settings,
const NamesAndTypesList & aggregation_keys,
const NamesAndTypesLists & aggregation_keys_list,
const AggregateDescriptions & aggregates,
bool should_produce_results_in_order_of_bucket_number,
SortDescription group_by_sort_description)
{
auto keys = aggregation_keys.getNames();
if (has_grouping_sets)
keys.insert(keys.begin(), "__grouping_set");
/** There are two modes of distributed aggregation.
*
@ -2029,10 +2049,12 @@ static void executeMergeAggregatedImpl(
*/
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization);
auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
grouping_sets_params,
final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets,
@ -2653,30 +2675,6 @@ static Aggregator::Params getAggregatorParams(
};
}
static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys)
{
GroupingSetsParamsList result;
if (query_analyzer.useGroupingSetKey())
{
auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
}
return result;
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
executeExpression(query_plan, expression, "Before GROUP BY");
@ -2696,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes);
auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys);
auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys);
SortDescription group_by_sort_description;
SortDescription sort_description_for_merging;
@ -2764,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
has_grouping_sets,
context->getSettingsRef(),
query_analyzer->aggregationKeys(),
query_analyzer->aggregationKeysList(),
query_analyzer->aggregates(),
should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description));

View File

@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
*/
auto keys = aggregation_analysis_result.aggregation_keys;
if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
keys.insert(keys.begin(), "__grouping_set");
Aggregator::Params params(keys,
aggregation_analysis_result.aggregate_descriptions,
@ -530,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
aggregation_analysis_result.grouping_sets_parameters_list,
query_analysis_result.aggregate_final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,

View File

@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
explicit_sorting_required_for_aggregation_in_order = false;
}
ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls)
{
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(in_header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(out_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[group].missing_keys;
const auto & used_keys = grouping_sets_params[group].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < out_header.columns(); ++i)
{
const auto & col = out_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
return dag;
}
void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
QueryPipelineProcessorsCollector collector(pipeline, this);
@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
{
const auto & header = ports[set_counter]->getHeader();
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
const auto & used_keys = grouping_sets_params[set_counter].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls);
auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@ -7,18 +7,6 @@
namespace DB
{
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);
@ -77,6 +65,13 @@ public:
/// Argument input_stream would be the second input (from projection).
std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const;
static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls);
private:
void updateOutputStream() override;

View File

@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool memoryBoundMergingWillBeUsed(
const DataStream & input_stream,
bool memory_bound_merging_of_aggregation_results_enabled,
@ -37,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
MergingAggregatedStep::MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@ -48,9 +54,10 @@ MergingAggregatedStep::MergingAggregatedStep(
bool memory_bound_merging_of_aggregation_results_enabled_)
: ITransformingStep(
input_stream_,
params_.getHeader(input_stream_.header, final_),
MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)),
getTraits(should_produce_results_in_order_of_bucket_number_))
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_)
, memory_efficient_aggregation(memory_efficient_aggregation_)
, max_threads(max_threads_)
@ -89,10 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
if (memoryBoundMergingWillBeUsed())
{
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory bound merging of aggregated results is not supported for grouping sets.");
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
@ -127,15 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
pipeline.resize(1);
/// Now merge the aggregated blocks
pipeline.addSimpleTransform([&](const Block & header)
{ return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
auto transform = std::make_shared<MergingAggregatedTransform>(pipeline.getHeader(), params, final, grouping_sets_params, max_threads);
pipeline.addTransform(std::move(transform));
}
else
{
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory efficient merging of aggregated results is not supported for grouping sets.");
auto num_merge_threads = memory_efficient_merge_threads
? memory_efficient_merge_threads
: max_threads;
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
}
@ -154,7 +168,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
void MergingAggregatedStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
const auto & in_header = input_streams.front().header;
output_stream = createOutputStream(input_streams.front(),
MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits());
if (is_order_overwritten) /// overwrite order again
applyOrder(group_by_sort_description, overwritten_sort_scope);
}

View File

@ -16,6 +16,7 @@ public:
MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@ -43,6 +44,7 @@ private:
Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final;
bool memory_efficient_aggregation;
size_t max_threads;

View File

@ -1,7 +1,10 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
@ -10,11 +13,192 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
: IAccumulatingTransform(std::move(header_), params_->getHeader())
, params(std::move(params_)), max_threads(max_threads_)
Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header)
{
/// __grouping_set is neither GROUP BY key nor an aggregate function.
/// It behaves like a GROUP BY key, but we cannot append it to keys
/// because it changes hashing method and buckets for two level aggregation.
/// Now, this column is processed "manually" by merging each group separately.
if (in_header.has("__grouping_set"))
out_header.insert(0, in_header.getByName("__grouping_set"));
return out_header;
}
/// We should keep the order for GROUPING SET keys.
/// Initiator creates a separate Aggregator for every group, so should we do here.
/// Otherwise, two-level aggregation will split the data into different buckets,
/// and the result may have duplicating rows.
static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params)
{
ActionsDAG reordering(in_header.getColumnsWithTypeAndName());
auto & outputs = reordering.getOutputs();
ActionsDAG::NodeRawConstPtrs new_outputs;
new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size());
std::unordered_map<std::string_view, size_t> index;
for (size_t pos = 0; pos < outputs.size(); ++pos)
index.emplace(outputs[pos]->result_name, pos);
for (const auto & used_name : params.used_keys)
{
auto & idx = index[used_name];
new_outputs.push_back(outputs[idx]);
}
for (const auto & used_name : params.used_keys)
index[used_name] = outputs.size();
for (const auto & missing_name : params.missing_keys)
index[missing_name] = outputs.size();
for (const auto * output : outputs)
{
if (index[output->result_name] != outputs.size())
new_outputs.push_back(output);
}
outputs.swap(new_outputs);
return reordering;
}
MergingAggregatedTransform::~MergingAggregatedTransform() = default;
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_,
Aggregator::Params params,
bool final,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_)
: IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final)))
, max_threads(max_threads_)
{
if (!grouping_sets_params.empty())
{
if (!header_.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets."
"Header {}", header_.dumpStructure());
auto in_header = header_;
in_header.erase(header_.getPositionByName("__grouping_set"));
auto out_header = params.getHeader(header_, final);
grouping_sets.reserve(grouping_sets_params.size());
for (const auto & grouping_set_params : grouping_sets_params)
{
size_t group = grouping_sets.size();
auto reordering = makeReorderingActions(in_header, grouping_set_params);
Aggregator::Params set_params(grouping_set_params.used_keys,
params.aggregates,
params.overflow_row,
params.max_threads,
params.max_block_size,
params.min_hit_rate_to_use_consecutive_keys_optimization);
auto transform_params = std::make_shared<AggregatingTransformParams>(reordering.updateHeader(in_header), std::move(set_params), final);
auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
transform_params->getHeader(),
out_header,
grouping_sets_params, group, false);
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.reordering_key_columns_actions = std::make_shared<ExpressionActions>(std::move(reordering));
groupiung_set.creating_missing_keys_actions = std::make_shared<ExpressionActions>(std::move(creating));
groupiung_set.params = std::move(transform_params);
}
}
else
{
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.params = std::make_shared<AggregatingTransformParams>(header_, std::move(params), final);
}
}
void MergingAggregatedTransform::addBlock(Block block)
{
if (grouping_sets.size() == 1)
{
auto bucket = block.info.bucket_num;
if (grouping_sets[0].reordering_key_columns_actions)
grouping_sets[0].reordering_key_columns_actions->execute(block);
grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
auto grouping_position = block.getPositionByName("__grouping_set");
auto grouping_column = block.getByPosition(grouping_position).column;
block.erase(grouping_position);
/// Split a block by __grouping_set values.
const auto * grouping_column_typed = typeid_cast<const ColumnUInt64 *>(grouping_column.get());
if (!grouping_column_typed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());
IColumn::Selector selector;
const auto & grouping_data = grouping_column_typed->getData();
size_t num_rows = grouping_data.size();
UInt64 last_group = grouping_data[0];
UInt64 max_group = last_group;
for (size_t row = 1; row < num_rows; ++row)
{
auto group = grouping_data[row];
/// Optimization for equal ranges.
if (last_group == group)
continue;
/// Optimization for single group.
if (selector.empty())
selector.reserve(num_rows);
/// Fill the last equal range.
selector.resize_fill(row, last_group);
last_group = group;
max_group = std::max(last_group, max_group);
}
if (max_group >= grouping_sets.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size());
/// Optimization for single group.
if (selector.empty())
{
auto bucket = block.info.bucket_num;
grouping_sets[last_group].reordering_key_columns_actions->execute(block);
grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
/// Fill the last equal range.
selector.resize_fill(num_rows, last_group);
const size_t num_groups = max_group + 1;
Blocks splitted_blocks(num_groups);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id] = block.cloneEmpty();
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]);
}
for (size_t group = 0; group < num_groups; ++group)
{
auto & splitted_block = splitted_blocks[group];
splitted_block.info = block.info;
grouping_sets[group].reordering_key_columns_actions->execute(splitted_block);
grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
}
}
void MergingAggregatedTransform::consume(Chunk chunk)
@ -46,7 +230,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
addBlock(std::move(block));
}
else if (chunk.getChunkInfos().get<ChunkInfoWithAllocatedBytes>())
{
@ -54,7 +238,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = false;
block.info.bucket_num = -1;
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
addBlock(std::move(block));
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
@ -70,9 +254,23 @@ Chunk MergingAggregatedTransform::generate()
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
for (auto & grouping_set : grouping_sets)
{
auto & params = grouping_set.params;
auto & bucket_to_blocks = grouping_set.bucket_to_blocks;
AggregatedDataVariants data_variants;
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
if (grouping_set.creating_missing_keys_actions)
for (auto & block : merged_blocks)
grouping_set.creating_missing_keys_actions->execute(block);
blocks.splice(blocks.end(), std::move(merged_blocks));
}
next_block = blocks.begin();
}

View File

@ -6,26 +6,46 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** A pre-aggregate stream of blocks in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
class MergingAggregatedTransform : public IAccumulatingTransform
{
public:
MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_);
MergingAggregatedTransform(
Block header_,
Aggregator::Params params_,
bool final_,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_);
~MergingAggregatedTransform() override;
String getName() const override { return "MergingAggregatedTransform"; }
static Block appendGroupingIfNeeded(const Block & in_header, Block out_header);
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
LoggerPtr log = getLogger("MergingAggregatedTransform");
size_t max_threads;
AggregatedDataVariants data_variants;
Aggregator::BucketToBlocks bucket_to_blocks;
struct GroupingSet
{
Aggregator::BucketToBlocks bucket_to_blocks;
ExpressionActionsPtr reordering_key_columns_actions;
ExpressionActionsPtr creating_missing_keys_actions;
AggregatingTransformParamsPtr params;
};
using GroupingSets = std::vector<GroupingSet>;
GroupingSets grouping_sets;
UInt64 total_input_rows = 0;
UInt64 total_input_blocks = 0;
@ -35,6 +55,8 @@ private:
bool consume_started = false;
bool generate_started = false;
void addBlock(Block block);
};
}

View File

@ -11,3 +11,215 @@
0 6 4
1 10 4
2 14 4
-- { echo On }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
1 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
1 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9

View File

@ -43,3 +43,23 @@ GROUP BY
ORDER BY
sum_value ASC,
count_value ASC;
set prefer_localhost_replica = 1;
-- { echo On }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

View File

@ -0,0 +1 @@
['key1','key2'] ['value1','value2']

View File

@ -0,0 +1,37 @@
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;
SET optimize_functions_to_subcolumns = 1;
CREATE TABLE rawtable
(
`Attributes` Map(String, String),
)
ENGINE = MergeTree
ORDER BY tuple();
CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
AS SELECT
mapKeys(Attributes) AS AttributeKeys,
mapValues(Attributes) AS AttributeValues
FROM rawtable;
CREATE TABLE attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
ENGINE = ReplacingMergeTree
ORDER BY tuple();
INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'});
SELECT * FROM raw_to_attributes_mv ORDER BY AttributeKeys;
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;