Merge remote-tracking branch 'allmazz/feat/59376' into create-table-with-clone-as

This commit is contained in:
Tuan Pham Anh 2024-09-03 07:54:12 +00:00
commit 4b3da04f6c
25 changed files with 784 additions and 165 deletions

View File

@ -351,7 +351,7 @@ ALTER TABLE mt DELETE IN PARTITION ID '2' WHERE p = 2;
You can specify the partition expression in `ALTER ... PARTITION` queries in different ways:
- As a value from the `partition` column of the `system.parts` table. For example, `ALTER TABLE visits DETACH PARTITION 201901`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH/ATTACH FROM. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- As a tuple of expressions or constants that matches (in types) the table partitioning keys tuple. In the case of a single element partitioning key, the expression should be wrapped in the `tuple (...)` function. For example, `ALTER TABLE visits DETACH PARTITION tuple(toYYYYMM(toDate('2019-01-25')))`.
- Using the partition ID. Partition ID is a string identifier of the partition (human-readable, if possible) that is used as the names of partitions in the file system and in ZooKeeper. The partition ID must be specified in the `PARTITION ID` clause, in a single quotes. For example, `ALTER TABLE visits DETACH PARTITION ID '201901'`.
- In the [ALTER ATTACH PART](#attach-partitionpart) and [DROP DETACHED PART](#drop-detached-partitionpart) query, to specify the name of a part, use string literal with a value from the `name` column of the [system.detached_parts](/docs/en/operations/system-tables/detached_parts.md/#system_tables-detached_parts) table. For example, `ALTER TABLE visits ATTACH PART '201901_1_1_0'`.

View File

@ -8,7 +8,7 @@ title: "CREATE ROW POLICY"
Creates a [row policy](../../../guides/sre/user-management/index.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
:::tip
Row policies makes sense only for users with readonly access. If user can modify table or copy partitions between tables, it defeats the restrictions of row policies.
Row policies make sense only for users with readonly access. If a user can modify a table or copy partitions between tables, it defeats the restrictions of row policies.
:::
Syntax:
@ -24,40 +24,40 @@ CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluste
## USING Clause
Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row.
Allows specifying a condition to filter rows. A user will see a row if the condition is calculated to non-zero for the row.
## TO Clause
In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
In the `TO` section you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
Keyword `ALL` means all the ClickHouse users, including current user. Keyword `ALL EXCEPT` allows excluding some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
:::note
If there are no row policies defined for a table then any user can `SELECT` all the row from the table. Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if those row policies are defined for the current user or not. For example, the following policy
If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table. Defining one or more row policies for the table makes access to the table dependent on the row policies, no matter if those row policies are defined for the current user or not. For example, the following policy:
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
forbids the users `mira` and `peter` from seeing the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
If that's not desirable it can't be fixed by adding one more row policy, like the following:
If that's not desirable, it can be fixed by adding one more row policy, like the following:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
:::
## AS Clause
It's allowed to have more than one policy enabled on the same table for the same user at the one time. So we need a way to combine the conditions from multiple policies.
It's allowed to have more than one policy enabled on the same table for the same user at one time. So we need a way to combine the conditions from multiple policies.
By default policies are combined using the boolean `OR` operator. For example, the following policies
By default, policies are combined using the boolean `OR` operator. For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enables the user `peter` to see rows with either `b=1` or `c=2`.
enable the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default policies are permissive, which means they are combined using the boolean `OR` operator.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default, policies are permissive, which means they are combined using the boolean `OR` operator.
A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator.
@ -68,25 +68,25 @@ row_is_visible = (one or more of the permissive policies' conditions are non-zer
(all of the restrictive policies's conditions are non-zero)
```
For example, the following policies
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
enable the user `peter` to see rows only if both `b=1` AND `c=2`.
Database policies are combined with table policies.
For example, the following policies
For example, the following policies:
``` sql
CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although
enable the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although
any other table in mydb would have only `b=1` policy applied for the user.

View File

@ -280,7 +280,7 @@ SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge
Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, но не более `receive_timeout` секунд:
``` sql
SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL]
SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL]
```
После выполнения этого запроса таблица `[db.]replicated_merge_tree_family_table_name` загружает команды из общего реплицированного лога в свою собственную очередь репликации. Затем запрос ждет, пока реплика не обработает все загруженные команды. Поддерживаются следующие модификаторы:

View File

@ -209,7 +209,7 @@ std::map<std::pair<TypeIndex, String>, NodeToSubcolumnTransformer> node_transfor
},
};
std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node)
std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimization(const QueryTreeNodePtr & node, const ContextPtr & context)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
@ -232,6 +232,12 @@ std::tuple<FunctionNode *, ColumnNode *, TableNode *> getTypedNodesForOptimizati
const auto & storage_snapshot = table_node->getStorageSnapshot();
auto column = first_argument_column_node->getColumn();
/// If view source is set we cannot optimize because it doesn't support moving functions to subcolumns.
/// The storage is replaced to the view source but it happens only after building a query tree and applying passes.
auto view_source = context->getViewSource();
if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted())
return {};
if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return {};
@ -266,7 +272,7 @@ public:
return;
}
auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node);
auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (function_node && first_argument_node && table_node)
{
enterImpl(*function_node, *first_argument_node, *table_node);
@ -416,7 +422,7 @@ public:
if (!getSettings().optimize_functions_to_subcolumns)
return;
auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node);
auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node, getContext());
if (!function_node || !first_argument_column_node || !table_node)
return;

View File

@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder;
class NativeWriter;
struct OutputBlockColumns;
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
*

View File

@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false;
}
GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys)
{
GroupingSetsParamsList result;
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
return result;
}
}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -2005,13 +2026,12 @@ static void executeMergeAggregatedImpl(
bool has_grouping_sets,
const Settings & settings,
const NamesAndTypesList & aggregation_keys,
const NamesAndTypesLists & aggregation_keys_list,
const AggregateDescriptions & aggregates,
bool should_produce_results_in_order_of_bucket_number,
SortDescription group_by_sort_description)
{
auto keys = aggregation_keys.getNames();
if (has_grouping_sets)
keys.insert(keys.begin(), "__grouping_set");
/** There are two modes of distributed aggregation.
*
@ -2029,10 +2049,12 @@ static void executeMergeAggregatedImpl(
*/
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization);
auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
grouping_sets_params,
final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets,
@ -2653,30 +2675,6 @@ static Aggregator::Params getAggregatorParams(
};
}
static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys)
{
GroupingSetsParamsList result;
if (query_analyzer.useGroupingSetKey())
{
auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
}
return result;
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
executeExpression(query_plan, expression, "Before GROUP BY");
@ -2696,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes);
auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys);
auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys);
SortDescription group_by_sort_description;
SortDescription sort_description_for_merging;
@ -2764,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
has_grouping_sets,
context->getSettingsRef(),
query_analyzer->aggregationKeys(),
query_analyzer->aggregationKeysList(),
query_analyzer->aggregates(),
should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description));

View File

@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
*/
auto keys = aggregation_analysis_result.aggregation_keys;
if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
keys.insert(keys.begin(), "__grouping_set");
Aggregator::Params params(keys,
aggregation_analysis_result.aggregate_descriptions,
@ -530,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
aggregation_analysis_result.grouping_sets_parameters_list,
query_analysis_result.aggregate_final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,

View File

@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
explicit_sorting_required_for_aggregation_in_order = false;
}
ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls)
{
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(in_header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(out_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[group].missing_keys;
const auto & used_keys = grouping_sets_params[group].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < out_header.columns(); ++i)
{
const auto & col = out_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
return dag;
}
void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
QueryPipelineProcessorsCollector collector(pipeline, this);
@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
{
const auto & header = ports[set_counter]->getHeader();
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
const auto & used_keys = grouping_sets_params[set_counter].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls);
auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@ -7,18 +7,6 @@
namespace DB
{
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);
@ -77,6 +65,13 @@ public:
/// Argument input_stream would be the second input (from projection).
std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const;
static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls);
private:
void updateOutputStream() override;

View File

@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool memoryBoundMergingWillBeUsed(
const DataStream & input_stream,
bool memory_bound_merging_of_aggregation_results_enabled,
@ -37,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
MergingAggregatedStep::MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@ -48,9 +54,10 @@ MergingAggregatedStep::MergingAggregatedStep(
bool memory_bound_merging_of_aggregation_results_enabled_)
: ITransformingStep(
input_stream_,
params_.getHeader(input_stream_.header, final_),
MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)),
getTraits(should_produce_results_in_order_of_bucket_number_))
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_)
, memory_efficient_aggregation(memory_efficient_aggregation_)
, max_threads(max_threads_)
@ -89,10 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
if (memoryBoundMergingWillBeUsed())
{
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory bound merging of aggregated results is not supported for grouping sets.");
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
@ -127,15 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
pipeline.resize(1);
/// Now merge the aggregated blocks
pipeline.addSimpleTransform([&](const Block & header)
{ return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
auto transform = std::make_shared<MergingAggregatedTransform>(pipeline.getHeader(), params, final, grouping_sets_params, max_threads);
pipeline.addTransform(std::move(transform));
}
else
{
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory efficient merging of aggregated results is not supported for grouping sets.");
auto num_merge_threads = memory_efficient_merge_threads
? memory_efficient_merge_threads
: max_threads;
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
}
@ -154,7 +168,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
void MergingAggregatedStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
const auto & in_header = input_streams.front().header;
output_stream = createOutputStream(input_streams.front(),
MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits());
if (is_order_overwritten) /// overwrite order again
applyOrder(group_by_sort_description, overwritten_sort_scope);
}

View File

@ -16,6 +16,7 @@ public:
MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@ -43,6 +44,7 @@ private:
Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final;
bool memory_efficient_aggregation;
size_t max_threads;

View File

@ -1,7 +1,10 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
@ -10,11 +13,192 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
: IAccumulatingTransform(std::move(header_), params_->getHeader())
, params(std::move(params_)), max_threads(max_threads_)
Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header)
{
/// __grouping_set is neither GROUP BY key nor an aggregate function.
/// It behaves like a GROUP BY key, but we cannot append it to keys
/// because it changes hashing method and buckets for two level aggregation.
/// Now, this column is processed "manually" by merging each group separately.
if (in_header.has("__grouping_set"))
out_header.insert(0, in_header.getByName("__grouping_set"));
return out_header;
}
/// We should keep the order for GROUPING SET keys.
/// Initiator creates a separate Aggregator for every group, so should we do here.
/// Otherwise, two-level aggregation will split the data into different buckets,
/// and the result may have duplicating rows.
static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params)
{
ActionsDAG reordering(in_header.getColumnsWithTypeAndName());
auto & outputs = reordering.getOutputs();
ActionsDAG::NodeRawConstPtrs new_outputs;
new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size());
std::unordered_map<std::string_view, size_t> index;
for (size_t pos = 0; pos < outputs.size(); ++pos)
index.emplace(outputs[pos]->result_name, pos);
for (const auto & used_name : params.used_keys)
{
auto & idx = index[used_name];
new_outputs.push_back(outputs[idx]);
}
for (const auto & used_name : params.used_keys)
index[used_name] = outputs.size();
for (const auto & missing_name : params.missing_keys)
index[missing_name] = outputs.size();
for (const auto * output : outputs)
{
if (index[output->result_name] != outputs.size())
new_outputs.push_back(output);
}
outputs.swap(new_outputs);
return reordering;
}
MergingAggregatedTransform::~MergingAggregatedTransform() = default;
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_,
Aggregator::Params params,
bool final,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_)
: IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final)))
, max_threads(max_threads_)
{
if (!grouping_sets_params.empty())
{
if (!header_.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets."
"Header {}", header_.dumpStructure());
auto in_header = header_;
in_header.erase(header_.getPositionByName("__grouping_set"));
auto out_header = params.getHeader(header_, final);
grouping_sets.reserve(grouping_sets_params.size());
for (const auto & grouping_set_params : grouping_sets_params)
{
size_t group = grouping_sets.size();
auto reordering = makeReorderingActions(in_header, grouping_set_params);
Aggregator::Params set_params(grouping_set_params.used_keys,
params.aggregates,
params.overflow_row,
params.max_threads,
params.max_block_size,
params.min_hit_rate_to_use_consecutive_keys_optimization);
auto transform_params = std::make_shared<AggregatingTransformParams>(reordering.updateHeader(in_header), std::move(set_params), final);
auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
transform_params->getHeader(),
out_header,
grouping_sets_params, group, false);
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.reordering_key_columns_actions = std::make_shared<ExpressionActions>(std::move(reordering));
groupiung_set.creating_missing_keys_actions = std::make_shared<ExpressionActions>(std::move(creating));
groupiung_set.params = std::move(transform_params);
}
}
else
{
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.params = std::make_shared<AggregatingTransformParams>(header_, std::move(params), final);
}
}
void MergingAggregatedTransform::addBlock(Block block)
{
if (grouping_sets.size() == 1)
{
auto bucket = block.info.bucket_num;
if (grouping_sets[0].reordering_key_columns_actions)
grouping_sets[0].reordering_key_columns_actions->execute(block);
grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
auto grouping_position = block.getPositionByName("__grouping_set");
auto grouping_column = block.getByPosition(grouping_position).column;
block.erase(grouping_position);
/// Split a block by __grouping_set values.
const auto * grouping_column_typed = typeid_cast<const ColumnUInt64 *>(grouping_column.get());
if (!grouping_column_typed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());
IColumn::Selector selector;
const auto & grouping_data = grouping_column_typed->getData();
size_t num_rows = grouping_data.size();
UInt64 last_group = grouping_data[0];
UInt64 max_group = last_group;
for (size_t row = 1; row < num_rows; ++row)
{
auto group = grouping_data[row];
/// Optimization for equal ranges.
if (last_group == group)
continue;
/// Optimization for single group.
if (selector.empty())
selector.reserve(num_rows);
/// Fill the last equal range.
selector.resize_fill(row, last_group);
last_group = group;
max_group = std::max(last_group, max_group);
}
if (max_group >= grouping_sets.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size());
/// Optimization for single group.
if (selector.empty())
{
auto bucket = block.info.bucket_num;
grouping_sets[last_group].reordering_key_columns_actions->execute(block);
grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
/// Fill the last equal range.
selector.resize_fill(num_rows, last_group);
const size_t num_groups = max_group + 1;
Blocks splitted_blocks(num_groups);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id] = block.cloneEmpty();
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]);
}
for (size_t group = 0; group < num_groups; ++group)
{
auto & splitted_block = splitted_blocks[group];
splitted_block.info = block.info;
grouping_sets[group].reordering_key_columns_actions->execute(splitted_block);
grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
}
}
void MergingAggregatedTransform::consume(Chunk chunk)
@ -46,7 +230,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
addBlock(std::move(block));
}
else if (chunk.getChunkInfos().get<ChunkInfoWithAllocatedBytes>())
{
@ -54,7 +238,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
block.info.is_overflows = false;
block.info.bucket_num = -1;
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
addBlock(std::move(block));
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
@ -70,9 +254,23 @@ Chunk MergingAggregatedTransform::generate()
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
for (auto & grouping_set : grouping_sets)
{
auto & params = grouping_set.params;
auto & bucket_to_blocks = grouping_set.bucket_to_blocks;
AggregatedDataVariants data_variants;
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
if (grouping_set.creating_missing_keys_actions)
for (auto & block : merged_blocks)
grouping_set.creating_missing_keys_actions->execute(block);
blocks.splice(blocks.end(), std::move(merged_blocks));
}
next_block = blocks.begin();
}

View File

@ -6,26 +6,46 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** A pre-aggregate stream of blocks in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
class MergingAggregatedTransform : public IAccumulatingTransform
{
public:
MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_);
MergingAggregatedTransform(
Block header_,
Aggregator::Params params_,
bool final_,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_);
~MergingAggregatedTransform() override;
String getName() const override { return "MergingAggregatedTransform"; }
static Block appendGroupingIfNeeded(const Block & in_header, Block out_header);
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
LoggerPtr log = getLogger("MergingAggregatedTransform");
size_t max_threads;
AggregatedDataVariants data_variants;
Aggregator::BucketToBlocks bucket_to_blocks;
struct GroupingSet
{
Aggregator::BucketToBlocks bucket_to_blocks;
ExpressionActionsPtr reordering_key_columns_actions;
ExpressionActionsPtr creating_missing_keys_actions;
AggregatingTransformParamsPtr params;
};
using GroupingSets = std::vector<GroupingSet>;
GroupingSets grouping_sets;
UInt64 total_input_rows = 0;
UInt64 total_input_blocks = 0;
@ -35,6 +55,8 @@ private:
bool consume_started = false;
bool generate_started = false;
void addBlock(Block block);
};
}

View File

@ -5009,7 +5009,7 @@ void MergeTreeData::checkAlterPartitionIsPossible(
const auto * partition_ast = command.partition->as<ASTPartition>();
if (partition_ast && partition_ast->all)
{
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION)
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && (command.type == PartitionCommand::REPLACE_PARTITION && command.replace))
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
}
else
@ -5810,7 +5810,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
const auto & partition_ast = ast->as<ASTPartition &>();
if (partition_ast.all)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DETACH PARTITION ALL currently");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DROP/DETACH/ATTACH PARTITION ALL currently");
if (!partition_ast.value)
{

View File

@ -2101,9 +2101,22 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts;
String partition_id;
bool is_all = partition->as<ASTPartition>()->all;
if (is_all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
src_parts = src_data.getVisibleDataPartsVector(local_context);
}
else
{
partition_id = getPartitionIDFromQuery(partition, local_context);
src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
@ -2111,6 +2124,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
for (const DataPartPtr & src_part : src_parts)
{
if (is_all)
partition_id = src_part->partition.getID(src_data);
if (!canReplacePartition(src_part))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot replace partition '{}' because part '{}' has inconsistent granularity with table",

View File

@ -8030,24 +8030,77 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto storage_settings_ptr = getSettings();
const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
const auto metadata_snapshot = getInMemoryMetadataPtr();
const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
Stopwatch watch;
std::unordered_set<String> partitions;
if (partition->as<ASTPartition>()->all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
partitions = src_data.getAllPartitionIds();
}
else
{
partitions = std::unordered_set<String>();
partitions.emplace(getPartitionIDFromQuery(partition, query_context));
}
LOG_INFO(log, "Will try to attach {} partitions", partitions.size());
const Stopwatch watch;
ProfileEventsScope profile_events_scope;
const auto zookeeper = getZooKeeper();
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, query_context);
const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
std::unique_ptr<ReplicatedMergeTreeLogEntryData> entries[partitions.size()];
size_t idx = 0;
for (const auto & partition_id : partitions)
{
entries[idx] = replacePartitionFromImpl(watch,
profile_events_scope,
metadata_snapshot,
src_data,
partition_id,
zookeeper,
replace,
zero_copy_enabled,
storage_settings_ptr->always_use_copy_instead_of_hardlinks,
query_context);
++idx;
}
for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
}
std::unique_ptr<ReplicatedMergeTreeLogEntryData> StorageReplicatedMergeTree::replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context)
{
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
std::optional<ZooKeeperMetadataTransaction> txn;
if (auto query_txn = query_context->getZooKeeperMetadataTransaction())
txn.emplace(query_txn->getZooKeeper(),
query_txn->getDatabaseZooKeeperPath(),
query_txn->isInitialQuery(),
query_txn->getTaskZooKeeperPath());
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
@ -8133,11 +8186,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
if (replace)
@ -8145,7 +8196,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Replace can only work on the same disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8160,7 +8211,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Attach can work on another disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8176,15 +8227,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
auto entry = std::make_unique<ReplicatedMergeTreeLogEntryData>();
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry->source_replica = replica_name;
entry->create_time = time(nullptr);
entry->replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
auto & entry_replace = *entry->replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
@ -8217,7 +8268,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ephemeral_locks[i].getUnlockOp(ops);
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
if (txn)
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOp(ops);
@ -8225,7 +8276,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this, NO_TRANSACTION_RAW);
{
@ -8275,14 +8326,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
lock2.reset();
lock1.reset();
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
@ -8291,10 +8339,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
return entry;
}
throw Exception(

View File

@ -37,6 +37,7 @@
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Parsers/SyncReplicaMode.h>
@ -1013,6 +1014,18 @@ private:
DataPartsVector::const_iterator it;
};
const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
std::unique_ptr<ReplicatedMergeTreeLogEntryData> replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);

View File

@ -10,11 +10,12 @@ REPLACE recursive
4 8
1
ATTACH FROM
5 8
6 8
10 12
OPTIMIZE
5 8 5
5 8 3
10 12 9
10 12 5
After restart
5 8
10 12
DETACH+ATTACH PARTITION
3 4
7 7

View File

@ -53,12 +53,16 @@ DROP TABLE src;
CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO src VALUES (1, '0', 1);
INSERT INTO src VALUES (1, '1', 1);
INSERT INTO src VALUES (2, '2', 1);
INSERT INTO src VALUES (3, '3', 1);
SYSTEM STOP MERGES dst;
INSERT INTO dst VALUES (1, '1', 2);
INSERT INTO dst VALUES (1, '1', 2), (1, '2', 0);
ALTER TABLE dst ATTACH PARTITION 1 FROM src;
SELECT count(), sum(d) FROM dst;
ALTER TABLE dst ATTACH PARTITION ALL FROM src;
SELECT count(), sum(d) FROM dst;
SELECT 'OPTIMIZE';
SELECT count(), sum(d), uniqExact(_part) FROM dst;

View File

@ -16,6 +16,7 @@ REPLACE recursive
ATTACH FROM
5 8
5 8
7 12
REPLACE with fetch
4 6
4 6

View File

@ -82,6 +82,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE src;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (3, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (4, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO dst_r2 VALUES (1, '1', 2);"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION 1 FROM src;"
@ -90,6 +92,13 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION ALL FROM src;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 3;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 4;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch';"
$CLICKHOUSE_CLIENT --query="DROP TABLE src;"

View File

@ -11,3 +11,215 @@
0 6 4
1 10 4
2 14 4
-- { echo On }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
1 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
1 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9

View File

@ -43,3 +43,23 @@ GROUP BY
ORDER BY
sum_value ASC,
count_value ASC;
set prefer_localhost_replica = 1;
-- { echo On }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

View File

@ -0,0 +1 @@
['key1','key2'] ['value1','value2']

View File

@ -0,0 +1,37 @@
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;
SET optimize_functions_to_subcolumns = 1;
CREATE TABLE rawtable
(
`Attributes` Map(String, String),
)
ENGINE = MergeTree
ORDER BY tuple();
CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
AS SELECT
mapKeys(Attributes) AS AttributeKeys,
mapValues(Attributes) AS AttributeValues
FROM rawtable;
CREATE TABLE attributes
(
`AttributeKeys` Array(String),
`AttributeValues` Array(String)
)
ENGINE = ReplacingMergeTree
ORDER BY tuple();
INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'});
SELECT * FROM raw_to_attributes_mv ORDER BY AttributeKeys;
DROP TABLE IF EXISTS rawtable;
DROP TABLE IF EXISTS raw_to_attributes_mv;
DROP TABLE IF EXISTS attributes;