Merge branch 'master' of github.com:ClickHouse/ClickHouse into vector-index

This commit is contained in:
flynn 2024-08-30 03:19:29 +00:00
commit a6fdc81bdc
18 changed files with 656 additions and 125 deletions

View File

@ -62,6 +62,7 @@ Other upcoming meetups
* [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26

2
contrib/libfiu vendored

@ -1 +1 @@
Subproject commit b85edbde4cf974b1b40d27828a56f0505f4e2ee5 Subproject commit a1290d8cd3d7b4541d6c976e0a54f572ac03f2a3

View File

@ -8,7 +8,7 @@ title: "CREATE ROW POLICY"
Creates a [row policy](../../../guides/sre/user-management/index.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
:::tip
Row policies make sense only for users with readonly access. If a user can modify a table or copy partitions between tables, it defeats the restrictions of row policies.
:::
Syntax:
@ -24,40 +24,40 @@ CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluste
## USING Clause
Allows specifying a condition to filter rows. A user will see a row if the condition evaluates to non-zero for that row.
## TO Clause
In the `TO` section you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users, including the current user. Keyword `ALL EXCEPT` allows excluding some users from the all-users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`.
:::note
If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table. Defining one or more row policies for the table makes access to the table dependent on the row policies, no matter if those row policies are defined for the current user or not. For example, the following policy:
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
forbids the users `mira` and `peter` from seeing the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
If that's not desirable, it can be fixed by adding one more row policy, like the following:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
:::
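A minimal sketch of the combined effect of `pol1` and `pol2` above; the table contents here are hypothetical:
``` sql
-- Assuming mydb.table1 contains rows with b = 1 and rows with b = 2:
SELECT count() FROM mydb.table1;  -- as mira or peter (only pol1 applies): counts only rows with b = 1
SELECT count() FROM mydb.table1;  -- as any other user (only pol2 applies): counts all rows
```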
## AS Clause
It's allowed to have more than one policy enabled on the same table for the same user at one time. So we need a way to combine the conditions from multiple policies.
By default, policies are combined using the boolean `OR` operator. For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enable the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default, policies are permissive, which means they are combined using the boolean `OR` operator.
A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator.
@ -68,25 +68,25 @@ row_is_visible = (one or more of the permissive policies' conditions are non-zer
(all of the restrictive policies' conditions are non-zero)
```
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enable the user `peter` to see rows only if both `b=1` AND `c=2`.
Database policies are combined with table policies.
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enable the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although
any other table in mydb would have only the `b=1` policy applied for the user.
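A brief usage sketch of this database-plus-table combination; `other_table` is a hypothetical second table in `mydb`:
``` sql
SELECT * FROM mydb.table1;      -- as peter: rows must satisfy b = 1 AND c = 2
SELECT * FROM mydb.other_table; -- as peter: only the database-wide policy applies, so b = 1
```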

View File

@ -280,7 +280,7 @@ SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge
Waits until the `ReplicatedMergeTree` family table is synchronized with the other replicas in the cluster, but no longer than `receive_timeout` seconds:
``` sql
SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL]
```
After this query is executed, the `[db.]replicated_merge_tree_family_table_name` table loads commands from the common replicated log into its own replication queue. Then the query waits until the replica processes all of the loaded commands. The following modifiers are supported:
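A hedged usage example of the newly documented `ON CLUSTER` form; the cluster, database, table, and replica names are placeholders rather than anything from this commit:
``` sql
-- Wait (up to receive_timeout) for db.events to catch up on every host of my_cluster,
-- pulling only from the listed source replicas.
SYSTEM SYNC REPLICA ON CLUSTER my_cluster db.events LIGHTWEIGHT FROM 'replica1', 'replica2';
```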

View File

@ -209,7 +209,7 @@ std::map<std::pair<TypeIndex, String>, NodeToSubcolumnTransformer> node_transfor
}, },
}; };
std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node) std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node, const ContextPtr & context)
{ {
auto * function_node = node->as<FunctionNode>(); auto * function_node = node->as<FunctionNode>();
if (!function_node) if (!function_node)
@ -232,6 +232,12 @@ std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimizati
const auto & storage_snapshot = table_node->getStorageSnapshot(); const auto & storage_snapshot = table_node->getStorageSnapshot();
auto column = first_argument_column_node->getColumn(); auto column = first_argument_column_node->getColumn();
/// If the view source is set we cannot optimize, because it doesn't support moving functions to subcolumns.
/// The storage is replaced with the view source, but that happens only after building the query tree and applying passes.
auto view_source = context->getViewSource();
if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted())
return {};
if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata)) if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return {}; return {};
@ -266,7 +272,7 @@ public:
return; return;
} }
auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node); auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (function_node && first_argument_node && table_node) if (function_node && first_argument_node && table_node)
{ {
enterImpl(*function_node, *first_argument_node, *table_node); enterImpl(*function_node, *first_argument_node, *table_node);
@ -416,7 +422,7 @@ public:
if (!getSettings().optimize_functions_to_subcolumns) if (!getSettings().optimize_functions_to_subcolumns)
return; return;
auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node); auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (!function_node || !first_argument_column_node || !table_node) if (!function_node || !first_argument_column_node || !table_node)
return; return;
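For context, a minimal sketch of the view-source scenario handled by the new check above, modeled on the regression test added at the end of this commit (table and view names follow that test):
``` sql
SET optimize_functions_to_subcolumns = 1;
CREATE TABLE rawtable (`Attributes` Map(String, String)) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE attributes (`AttributeKeys` Array(String), `AttributeValues` Array(String))
ENGINE = ReplacingMergeTree ORDER BY tuple();
CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes AS
SELECT mapKeys(Attributes) AS AttributeKeys, mapValues(Attributes) AS AttributeValues FROM rawtable;
-- While this INSERT is pushed through the view, the storage seen by the pass is the view source,
-- so mapKeys/mapValues must not be rewritten to subcolumn reads here.
INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'});
```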

View File

@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder;
class NativeWriter; class NativeWriter;
struct OutputBlockColumns; struct OutputBlockColumns;
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
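To make the `used_keys` / `missing_keys` split concrete, a minimal sketch over hypothetical inline data:
``` sql
SELECT a, b, count()
FROM values('a UInt8, b UInt8', (1, 1), (1, 2), (2, 1))
GROUP BY GROUPING SETS ((a), (a, b))
ORDER BY a, b;
-- For the (a) set: used_keys = [a], missing_keys = [b]; the missing key b is filled with its
-- default value (0) in the result unless group_by_use_nulls is enabled.
```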
/** How are "total" values calculated with WITH TOTALS? /** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.) * (For more details, see TotalsHavingTransform.)
* *

View File

@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false; return false;
} }
GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys)
{
GroupingSetsParamsList result;
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
return result;
}
} }
InterpreterSelectQuery::InterpreterSelectQuery( InterpreterSelectQuery::InterpreterSelectQuery(
@ -2005,13 +2026,12 @@ static void executeMergeAggregatedImpl(
bool has_grouping_sets, bool has_grouping_sets,
const Settings & settings, const Settings & settings,
const NamesAndTypesList & aggregation_keys, const NamesAndTypesList & aggregation_keys,
const NamesAndTypesLists & aggregation_keys_list,
const AggregateDescriptions & aggregates, const AggregateDescriptions & aggregates,
bool should_produce_results_in_order_of_bucket_number, bool should_produce_results_in_order_of_bucket_number,
SortDescription group_by_sort_description) SortDescription group_by_sort_description)
{ {
auto keys = aggregation_keys.getNames(); auto keys = aggregation_keys.getNames();
if (has_grouping_sets)
keys.insert(keys.begin(), "__grouping_set");
/** There are two modes of distributed aggregation. /** There are two modes of distributed aggregation.
* *
@ -2029,10 +2049,12 @@ static void executeMergeAggregatedImpl(
*/ */
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization); Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization);
auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>( auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
params, params,
grouping_sets_params,
final, final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets, settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets,
@ -2653,30 +2675,6 @@ static Aggregator::Params getAggregatorParams(
}; };
} }
static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys)
{
GroupingSetsParamsList result;
if (query_analyzer.useGroupingSetKey())
{
auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
}
return result;
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info) void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{ {
executeExpression(query_plan, expression, "Before GROUP BY"); executeExpression(query_plan, expression, "Before GROUP BY");
@ -2696,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.group_by_two_level_threshold, settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes); settings.group_by_two_level_threshold_bytes);
auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys); auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys);
SortDescription group_by_sort_description; SortDescription group_by_sort_description;
SortDescription sort_description_for_merging; SortDescription sort_description_for_merging;
@ -2764,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
has_grouping_sets, has_grouping_sets,
context->getSettingsRef(), context->getSettingsRef(),
query_analyzer->aggregationKeys(), query_analyzer->aggregationKeys(),
query_analyzer->aggregationKeysList(),
query_analyzer->aggregates(), query_analyzer->aggregates(),
should_produce_results_in_order_of_bucket_number, should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description)); std::move(group_by_sort_description));
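A representative distributed query that exercises this merge path, taken from the regression tests added later in this commit (reformatted here):
``` sql
SELECT count(), toString(number) AS k
FROM remote('127.0.0.{1,2}', numbers(10))
WHERE number > (queryID() = initialQueryID())
GROUP BY GROUPING SETS ((k), (number + 1, k))
ORDER BY k
SETTINGS group_by_two_level_threshold = 9, max_bytes_before_external_group_by = 10000000000;
```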

View File

@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
*/ */
auto keys = aggregation_analysis_result.aggregation_keys; auto keys = aggregation_analysis_result.aggregation_keys;
if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
keys.insert(keys.begin(), "__grouping_set");
Aggregator::Params params(keys, Aggregator::Params params(keys,
aggregation_analysis_result.aggregate_descriptions, aggregation_analysis_result.aggregate_descriptions,
@ -530,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
auto merging_aggregated = std::make_unique<MergingAggregatedStep>( auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
params, params,
aggregation_analysis_result.grouping_sets_parameters_list,
query_analysis_result.aggregate_final, query_analysis_result.aggregate_final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets, settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,

View File

@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
explicit_sorting_required_for_aggregation_in_order = false; explicit_sorting_required_for_aggregation_in_order = false;
} }
ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls)
{
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(in_header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(out_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[group].missing_keys;
const auto & used_keys = grouping_sets_params[group].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < out_header.columns(); ++i)
{
const auto & col = out_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
return dag;
}
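The `toNullable` branch above is what the `group_by_use_nulls` behavior relies on; a hedged sketch over hypothetical inline data of the user-visible difference:
``` sql
-- With group_by_use_nulls = 1, keys missing from a grouping set come back as NULL
-- instead of the column type's default value.
SELECT a, b, count()
FROM values('a UInt8, b UInt8', (1, 1), (1, 2), (2, 1))
GROUP BY GROUPING SETS ((a), (b))
ORDER BY a, b
SETTINGS group_by_use_nulls = 1;
```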
void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{ {
QueryPipelineProcessorsCollector collector(pipeline, this); QueryPipelineProcessorsCollector collector(pipeline, this);
@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
{ {
const auto & header = ports[set_counter]->getHeader(); const auto & header = ports[set_counter]->getHeader();
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls);
ActionsDAG dag(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
const auto & used_keys = grouping_sets_params[set_counter].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings()); auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression); auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@ -7,18 +7,6 @@
namespace DB namespace DB
{ {
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header); Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls); Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);
@ -77,6 +65,13 @@ public:
/// Argument input_stream would be the second input (from projection). /// Argument input_stream would be the second input (from projection).
std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const; std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const;
static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls);
private: private:
void updateOutputStream() override; void updateOutputStream() override;

View File

@ -10,6 +10,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool memoryBoundMergingWillBeUsed( static bool memoryBoundMergingWillBeUsed(
const DataStream & input_stream, const DataStream & input_stream,
bool memory_bound_merging_of_aggregation_results_enabled, bool memory_bound_merging_of_aggregation_results_enabled,
@ -37,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
MergingAggregatedStep::MergingAggregatedStep( MergingAggregatedStep::MergingAggregatedStep(
const DataStream & input_stream_, const DataStream & input_stream_,
Aggregator::Params params_, Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_, bool final_,
bool memory_efficient_aggregation_, bool memory_efficient_aggregation_,
size_t max_threads_, size_t max_threads_,
@ -48,9 +54,10 @@ MergingAggregatedStep::MergingAggregatedStep(
bool memory_bound_merging_of_aggregation_results_enabled_) bool memory_bound_merging_of_aggregation_results_enabled_)
: ITransformingStep( : ITransformingStep(
input_stream_, input_stream_,
params_.getHeader(input_stream_.header, final_), MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)),
getTraits(should_produce_results_in_order_of_bucket_number_)) getTraits(should_produce_results_in_order_of_bucket_number_))
, params(std::move(params_)) , params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_) , final(final_)
, memory_efficient_aggregation(memory_efficient_aggregation_) , memory_efficient_aggregation(memory_efficient_aggregation_)
, max_threads(max_threads_) , max_threads(max_threads_)
@ -89,10 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{ {
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
if (memoryBoundMergingWillBeUsed()) if (memoryBoundMergingWillBeUsed())
{ {
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory bound merging of aggregated results is not supported for grouping sets.");
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>( auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(), pipeline.getHeader(),
pipeline.getNumStreams(), pipeline.getNumStreams(),
@ -127,15 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
pipeline.resize(1); pipeline.resize(1);
/// Now merge the aggregated blocks /// Now merge the aggregated blocks
pipeline.addSimpleTransform([&](const Block & header) auto transform = std::make_shared<MergingAggregatedTransform>(pipeline.getHeader(), params, final, grouping_sets_params, max_threads);
{ return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); }); pipeline.addTransform(std::move(transform));
} }
else else
{ {
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory efficient merging of aggregated results is not supported for grouping sets.");
auto num_merge_threads = memory_efficient_merge_threads auto num_merge_threads = memory_efficient_merge_threads
? memory_efficient_merge_threads ? memory_efficient_merge_threads
: max_threads; : max_threads;
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads); pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
} }
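These guards rely on the interpreter and planner disabling `distributed_aggregation_memory_efficient` whenever grouping sets are present (the `!has_grouping_sets` condition earlier in this commit), so a query like the following sketch still takes the plain merging path even with the setting enabled:
``` sql
SELECT count(), toString(number) AS k
FROM remote('127.0.0.{1,2}', numbers(10))
GROUP BY GROUPING SETS ((k), (k, k))
ORDER BY k
SETTINGS distributed_aggregation_memory_efficient = 1;
```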
@ -154,7 +168,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
void MergingAggregatedStep::updateOutputStream() void MergingAggregatedStep::updateOutputStream()
{ {
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits()); const auto & in_header = input_streams.front().header;
output_stream = createOutputStream(input_streams.front(),
MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits());
if (is_order_overwritten) /// overwrite order again if (is_order_overwritten) /// overwrite order again
applyOrder(group_by_sort_description, overwritten_sort_scope); applyOrder(group_by_sort_description, overwritten_sort_scope);
} }

View File

@ -16,6 +16,7 @@ public:
MergingAggregatedStep( MergingAggregatedStep(
const DataStream & input_stream_, const DataStream & input_stream_,
Aggregator::Params params_, Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_, bool final_,
bool memory_efficient_aggregation_, bool memory_efficient_aggregation_,
size_t max_threads_, size_t max_threads_,
@ -43,6 +44,7 @@ private:
Aggregator::Params params; Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final; bool final;
bool memory_efficient_aggregation; bool memory_efficient_aggregation;
size_t max_threads; size_t max_threads;

View File

@ -1,7 +1,10 @@
#include <Processors/Transforms/MergingAggregatedTransform.h> #include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h> #include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h> #include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB namespace DB
{ {
@ -10,11 +13,192 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
MergingAggregatedTransform::MergingAggregatedTransform( Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header)
Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
: IAccumulatingTransform(std::move(header_), params_->getHeader())
, params(std::move(params_)), max_threads(max_threads_)
{ {
/// __grouping_set is neither a GROUP BY key nor an aggregate function.
/// It behaves like a GROUP BY key, but we cannot append it to the keys
/// because it changes the hashing method and the buckets for two-level aggregation.
/// Now, this column is processed "manually" by merging each group separately.
if (in_header.has("__grouping_set"))
out_header.insert(0, in_header.getByName("__grouping_set"));
return out_header;
}
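As a side note, the grouping set a row belongs to can be observed on the SQL level with the `GROUPING` function; a hedged sketch, assuming `grouping()` is available in this build:
``` sql
SELECT number % 2 AS k, count(), grouping(k) AS g
FROM numbers(6)
GROUP BY GROUPING SETS ((k), ())
ORDER BY k, g;
```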
/// We should keep the order of the GROUPING SET keys.
/// The initiator creates a separate Aggregator for every group, so we should do the same here.
/// Otherwise, two-level aggregation will split the data into different buckets,
/// and the result may contain duplicate rows.
static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params)
{
ActionsDAG reordering(in_header.getColumnsWithTypeAndName());
auto & outputs = reordering.getOutputs();
ActionsDAG::NodeRawConstPtrs new_outputs;
new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size());
std::unordered_map<std::string_view, size_t> index;
for (size_t pos = 0; pos < outputs.size(); ++pos)
index.emplace(outputs[pos]->result_name, pos);
for (const auto & used_name : params.used_keys)
{
auto & idx = index[used_name];
new_outputs.push_back(outputs[idx]);
}
for (const auto & used_name : params.used_keys)
index[used_name] = outputs.size();
for (const auto & missing_name : params.missing_keys)
index[missing_name] = outputs.size();
for (const auto * output : outputs)
{
if (index[output->result_name] != outputs.size())
new_outputs.push_back(output);
}
outputs.swap(new_outputs);
return reordering;
}
MergingAggregatedTransform::~MergingAggregatedTransform() = default;
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_,
Aggregator::Params params,
bool final,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_)
: IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final)))
, max_threads(max_threads_)
{
if (!grouping_sets_params.empty())
{
if (!header_.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets."
"Header {}", header_.dumpStructure());
auto in_header = header_;
in_header.erase(header_.getPositionByName("__grouping_set"));
auto out_header = params.getHeader(header_, final);
grouping_sets.reserve(grouping_sets_params.size());
for (const auto & grouping_set_params : grouping_sets_params)
{
size_t group = grouping_sets.size();
auto reordering = makeReorderingActions(in_header, grouping_set_params);
Aggregator::Params set_params(grouping_set_params.used_keys,
params.aggregates,
params.overflow_row,
params.max_threads,
params.max_block_size,
params.min_hit_rate_to_use_consecutive_keys_optimization);
auto transform_params = std::make_shared<AggregatingTransformParams>(reordering.updateHeader(in_header), std::move(set_params), final);
auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
transform_params->getHeader(),
out_header,
grouping_sets_params, group, false);
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.reordering_key_columns_actions = std::make_shared<ExpressionActions>(std::move(reordering));
groupiung_set.creating_missing_keys_actions = std::make_shared<ExpressionActions>(std::move(creating));
groupiung_set.params = std::move(transform_params);
}
}
else
{
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.params = std::make_shared<AggregatingTransformParams>(header_, std::move(params), final);
}
}
void MergingAggregatedTransform::addBlock(Block block)
{
if (grouping_sets.size() == 1)
{
auto bucket = block.info.bucket_num;
if (grouping_sets[0].reordering_key_columns_actions)
grouping_sets[0].reordering_key_columns_actions->execute(block);
grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
auto grouping_position = block.getPositionByName("__grouping_set");
auto grouping_column = block.getByPosition(grouping_position).column;
block.erase(grouping_position);
/// Split a block by __grouping_set values.
const auto * grouping_column_typed = typeid_cast<const ColumnUInt64 *>(grouping_column.get());
if (!grouping_column_typed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());
IColumn::Selector selector;
const auto & grouping_data = grouping_column_typed->getData();
size_t num_rows = grouping_data.size();
UInt64 last_group = grouping_data[0];
UInt64 max_group = last_group;
for (size_t row = 1; row < num_rows; ++row)
{
auto group = grouping_data[row];
/// Optimization for equal ranges.
if (last_group == group)
continue;
/// Optimization for single group.
if (selector.empty())
selector.reserve(num_rows);
/// Fill the last equal range.
selector.resize_fill(row, last_group);
last_group = group;
max_group = std::max(last_group, max_group);
}
if (max_group >= grouping_sets.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size());
/// Optimization for single group.
if (selector.empty())
{
auto bucket = block.info.bucket_num;
grouping_sets[last_group].reordering_key_columns_actions->execute(block);
grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
/// Fill the last equal range.
selector.resize_fill(num_rows, last_group);
const size_t num_groups = max_group + 1;
Blocks splitted_blocks(num_groups);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id] = block.cloneEmpty();
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]);
}
for (size_t group = 0; group < num_groups; ++group)
{
auto & splitted_block = splitted_blocks[group];
splitted_block.info = block.info;
grouping_sets[group].reordering_key_columns_actions->execute(splitted_block);
grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
}
} }
void MergingAggregatedTransform::consume(Chunk chunk) void MergingAggregatedTransform::consume(Chunk chunk)
@ -46,7 +230,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = agg_info->is_overflows; block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num; block.info.bucket_num = agg_info->bucket_num;
bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block)); addBlock(std::move(block));
} }
else if (chunk.getChunkInfos().get<ChunkInfoWithAllocatedBytes>()) else if (chunk.getChunkInfos().get<ChunkInfoWithAllocatedBytes>())
{ {
@ -54,7 +238,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = false; block.info.is_overflows = false;
block.info.bucket_num = -1; block.info.bucket_num = -1;
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); addBlock(std::move(block));
} }
else else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
@ -70,9 +254,23 @@ Chunk MergingAggregatedTransform::generate()
/// Exception safety. Make iterator valid in case any method below throws. /// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin(); next_block = blocks.begin();
/// TODO: this operation can be made async. Add async for IAccumulatingTransform. for (auto & grouping_set : grouping_sets)
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); {
blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); auto & params = grouping_set.params;
auto & bucket_to_blocks = grouping_set.bucket_to_blocks;
AggregatedDataVariants data_variants;
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
if (grouping_set.creating_missing_keys_actions)
for (auto & block : merged_blocks)
grouping_set.creating_missing_keys_actions->execute(block);
blocks.splice(blocks.end(), std::move(merged_blocks));
}
next_block = blocks.begin(); next_block = blocks.begin();
} }

View File

@ -6,26 +6,46 @@
namespace DB namespace DB
{ {
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** A pre-aggregate stream of blocks in which each block is already aggregated. /** A pre-aggregate stream of blocks in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged. * Aggregate functions in blocks should not be finalized so that their states can be merged.
*/ */
class MergingAggregatedTransform : public IAccumulatingTransform class MergingAggregatedTransform : public IAccumulatingTransform
{ {
public: public:
MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_); MergingAggregatedTransform(
Block header_,
Aggregator::Params params_,
bool final_,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_);
~MergingAggregatedTransform() override;
String getName() const override { return "MergingAggregatedTransform"; } String getName() const override { return "MergingAggregatedTransform"; }
static Block appendGroupingIfNeeded(const Block & in_header, Block out_header);
protected: protected:
void consume(Chunk chunk) override; void consume(Chunk chunk) override;
Chunk generate() override; Chunk generate() override;
private: private:
AggregatingTransformParamsPtr params;
LoggerPtr log = getLogger("MergingAggregatedTransform"); LoggerPtr log = getLogger("MergingAggregatedTransform");
size_t max_threads; size_t max_threads;
AggregatedDataVariants data_variants; struct GroupingSet
Aggregator::BucketToBlocks bucket_to_blocks; {
Aggregator::BucketToBlocks bucket_to_blocks;
ExpressionActionsPtr reordering_key_columns_actions;
ExpressionActionsPtr creating_missing_keys_actions;
AggregatingTransformParamsPtr params;
};
using GroupingSets = std::vector<GroupingSet>;
GroupingSets grouping_sets;
UInt64 total_input_rows = 0; UInt64 total_input_rows = 0;
UInt64 total_input_blocks = 0; UInt64 total_input_blocks = 0;
@ -35,6 +55,8 @@ private:
bool consume_started = false; bool consume_started = false;
bool generate_started = false; bool generate_started = false;
void addBlock(Block block);
}; };
} }

View File

@ -11,3 +11,215 @@
0 6 4 0 6 4
1 10 4 1 10 4
2 14 4 2 14 4
-- { echoOn }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
1 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
1 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9

View File

@ -43,3 +43,23 @@ GROUP BY
ORDER BY ORDER BY
sum_value ASC, sum_value ASC,
count_value ASC; count_value ASC;
set prefer_localhost_replica = 1;
-- { echoOn }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

View File

@ -0,0 +1 @@
['key1','key2'] ['value1','value2']

View File

@ -0,0 +1,37 @@
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;
SET optimize_functions_to_subcolumns = 1;
CREATE TABLE rawtable
(
`Attributes` Map(String, String),
)
ENGINE = MergeTree
ORDER BY tuple();
-- The TO target table must exist before the materialized view that writes to it is created.
CREATE TABLE attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
ENGINE = ReplacingMergeTree
ORDER BY tuple();
CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
AS SELECT
mapKeys(Attributes) AS AttributeKeys,
mapValues(Attributes) AS AttributeValues
FROM rawtable;
INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'});
SELECT * FROM raw_to_attributes_mv ORDER BY AttributeKeys;
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;