allow to copy materialized and alias

This commit is contained in:
Nikita Mikhaylov 2021-04-23 01:32:16 +03:00
parent c41cc36046
commit 24af47063e
4 changed files with 17 additions and 15 deletions

View File

@ -697,8 +697,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
/// Remove column's TTL expression from `CREATE` query
/// This is needed to create internal Distributed table
/// Also it removes MATEREALIZED or ALIAS columns not to copy additional and useless data over the network.
/// TODO: Make removing MATERIALIZED and ALIAS columns optional.
ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
ASTPtr ClusterCopier::removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns)
{
const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
auto new_columns = std::make_shared<ASTExpressionList>();
@ -708,7 +707,7 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
const auto & column = column_ast->as<ASTColumnDeclaration &>();
/// Skip this columns
if (!column.default_specifier.empty())
if (!column.default_specifier.empty() && !allow_to_copy_alias_and_materialized_columns)
{
ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
@ -716,12 +715,12 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
}
/// Remove TTL on columns definition.
auto new_column = column_ast->clone();
auto * new_column_ptr = new_column->as<ASTColumnDeclaration>();
if (new_column_ptr->ttl)
new_column_ptr->ttl.reset();
auto new_column_ast = column_ast->clone();
auto & new_column = new_column_ast->as<ASTColumnDeclaration &>();
if (new_column.ttl)
new_column.ttl.reset();
new_columns->children.emplace_back(new_column);
new_columns->children.emplace_back(new_column_ast);
}
ASTPtr new_query_ast = query_ast->clone();
@ -1751,8 +1750,6 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
// task_shard.current_push_table_create_query = getCreateTableForPushShard(timeouts, task_shard);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
@ -1774,8 +1771,9 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
// auto create_query_ast = task_shard.current_pull_table_create_query;
auto create_query_ast = removeAliasMaterializedAndTTLColumnsFromCreateQuery(
task_shard.current_pull_table_create_query,
task_table.allow_to_copy_alias_and_materialized_columns);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
dropAndCreateLocalTable(create_table_pull_ast);

View File

@ -120,7 +120,7 @@ protected:
TaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
static ASTPtr removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns);
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);

View File

@ -92,7 +92,7 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
settings_push.insert_distributed_sync = true;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);

View File

@ -56,6 +56,8 @@ struct TaskTable
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
String name_in_config;
/// Used as task ID
@ -250,7 +252,9 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");