Merge pull request #21912 from songenjie/clickhouse-copier-create-destination-once

[ClickHouse][Copier] Improve copier work
Nikita Mikhaylov 2021-03-30 14:42:00 +03:00 committed by GitHub
commit 18dc213cee
3 changed files with 122 additions and 57 deletions

programs/copier/ClusterCopier.cpp

@@ -599,11 +599,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
 toString(current_piece_number));
 Settings settings_push = task_cluster->settings_push;
-/// It is important, ALTER ATTACH PARTITION must be done synchronously
-/// And we will execute this ALTER query on each replica of a shard.
-/// It is correct, because this query is idempotent.
-settings_push.replication_alter_partitions_sync = 2;
+ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
+UInt64 max_successful_executions_per_shard = 0;
+if (settings_push.replication_alter_partitions_sync == 1)
+{
+execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
+max_successful_executions_per_shard = 1;
+}
 query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
 ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
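Editor's note: for readers outside the copier codebase, the dispatch introduced above can be distilled into a few lines. This is a self-contained sketch, not the copier's API; the enum values and the setting name mirror the diff, everything else is simplified for illustration:

#include <cstdint>
#include <iostream>

using UInt64 = uint64_t;

enum class ClusterExecutionMode { ON_EACH_SHARD, ON_EACH_NODE };

struct AlterDispatch
{
    ClusterExecutionMode mode = ClusterExecutionMode::ON_EACH_NODE;
    UInt64 max_successful_executions_per_shard = 0; /// 0 means "no limit"
};

/// sync == 2: every replica runs the (idempotent) ALTER itself.
/// sync == 1: one replica per shard runs it and waits for the others to
///            sync, so a single success per shard is sufficient.
AlterDispatch chooseAlterDispatch(UInt64 replication_alter_partitions_sync)
{
    AlterDispatch dispatch;
    if (replication_alter_partitions_sync == 1)
    {
        dispatch.mode = ClusterExecutionMode::ON_EACH_SHARD;
        dispatch.max_successful_executions_per_shard = 1;
    }
    return dispatch;
}

int main()
{
    auto d = chooseAlterDispatch(1);
    std::cout << d.max_successful_executions_per_shard << '\n'; /// prints 1
}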
@@ -613,14 +615,33 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
 try
 {
-size_t num_nodes = executeQueryOnCluster(
-task_table.cluster_push,
-query_alter_ast_string,
-settings_push,
-PoolMode::GET_MANY,
-ClusterExecutionMode::ON_EACH_NODE);
+/// Try to attach the partition on each shard
+UInt64 num_nodes = executeQueryOnCluster(
+task_table.cluster_push,
+query_alter_ast_string,
+task_cluster->settings_push,
+PoolMode::GET_MANY,
+execution_mode,
+max_successful_executions_per_shard);
-LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
+if (settings_push.replication_alter_partitions_sync == 1)
+{
+LOG_INFO(
+log,
+"The ALTER query for destination tables {} succeeded on {} shards of {}",
+getQuotedTable(task_table.table_push),
+num_nodes,
+task_table.cluster_push->getShardCount());
+if (num_nodes != task_table.cluster_push->getShardCount())
+{
+return TaskStatus::Error;
+}
+}
+else
+{
+LOG_INFO(log, "Number of nodes that executed the ALTER query successfully: {}", toString(num_nodes));
+}
 }
 catch (...)
 {
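Editor's note: the new if-branch turns incomplete coverage into a hard failure. When replication_alter_partitions_sync == 1, the copier insists that every shard reported success. A minimal sketch of that acceptance rule in isolation (TaskStatus and the counts here are simplified stand-ins, not the copier's types):

#include <cstdint>
#include <cassert>

using UInt64 = uint64_t;

enum class TaskStatus { Active, Finished, Error };

/// One successful execution per shard is required; anything less means
/// some shard never attached the partition, so the move must be retried.
TaskStatus checkAlterCoverage(UInt64 successful_shards, UInt64 total_shards)
{
    return successful_shards == total_shards ? TaskStatus::Finished : TaskStatus::Error;
}

int main()
{
    assert(checkAlterCoverage(3, 3) == TaskStatus::Finished);
    assert(checkAlterCoverage(2, 3) == TaskStatus::Error);
}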
@@ -856,6 +877,16 @@ bool ClusterCopier::tryDropPartitionPiece(
 bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
 {
+/// Create the destination table
+TaskStatus task_status = TaskStatus::Error;
+task_status = tryCreateDestinationTable(timeouts, task_table);
+/// Bail out if the destination table could not be created
+if (task_status != TaskStatus::Finished)
+{
+LOG_WARNING(log, "Create destination table failed");
+return false;
+}
 /// A heuristic: if the previous shard is already done, then check the next one without sleeping, due to the max_workers constraint
 bool previous_shard_is_instantly_finished = false;
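Editor's note: the net effect on tryProcessTable is that the destination table is created once, up front, and the function bails out before any per-shard work if that fails. A compilable control-flow sketch (all names except tryCreateDestinationTable are hypothetical stand-ins for the real copier members):

#include <vector>

enum class TaskStatus { Finished, Error };

/// Hypothetical stand-ins for the real copier members.
TaskStatus tryCreateDestinationTable() { return TaskStatus::Finished; }
void processShard(int /*shard*/) {}

bool tryProcessTableSketch(const std::vector<int> & shards)
{
    /// Create the destination table exactly once, before touching any shard.
    if (tryCreateDestinationTable() != TaskStatus::Finished)
        return false; /// Nothing to copy into, so give up early.

    for (int shard : shards)
        processShard(shard); /// Per-shard work no longer re-creates the table.

    return true;
}

int main() { return tryProcessTableSketch({0, 1, 2}) ? 0 : 1; }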
@@ -932,7 +963,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
 /// Do not sleep if there is a sequence of already processed shards, to speed up startup
 bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
-TaskStatus task_status = TaskStatus::Error;
+task_status = TaskStatus::Error;
 bool was_error = false;
 has_shard_to_process = true;
 for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
@@ -1050,6 +1081,44 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
 return table_is_done;
 }
+TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
+{
+/// Try to create the original table (if it does not exist) on each shard
+const TaskShardPtr task_shard = task_table.all_shards.at(0);
+/// We need to update the table definitions for each part; they could have been changed by an ALTER
+task_shard->current_pull_table_create_query = getCreateTableForPullShard(timeouts, *task_shard);
+try
+{
+auto create_query_push_ast
+= rewriteCreateQueryStorage(task_shard->current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
+auto & create = create_query_push_ast->as<ASTCreateQuery &>();
+create.if_not_exists = true;
+InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
+String query = queryToString(create_query_push_ast);
+LOG_DEBUG(log, "Create destination tables. Query: {}", query);
+UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
+LOG_INFO(
+log,
+"Destination tables {} have been created on {} shards of {}",
+getQuotedTable(task_table.table_push),
+shards,
+task_table.cluster_push->getShardCount());
+if (shards != task_table.cluster_push->getShardCount())
+{
+return TaskStatus::Error;
+}
+}
+catch (...)
+{
+tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
+}
+return TaskStatus::Finished;
+}
 /// Job for copying a partition from a particular shard.
 TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
 {
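Editor's note: two details make tryCreateDestinationTable safe to run when several copier instances race each other: IF NOT EXISTS on the rewritten CREATE, and distribution via ON CLUSTER. The copier builds the statement by AST rewriting, not string concatenation; purely for illustration, the resulting query has roughly this shape (all names below are placeholders):

#include <iostream>
#include <string>

/// Illustration only: the real code goes through rewriteCreateQueryStorage
/// and InterpreterCreateQuery::prepareOnClusterQuery, and the column list
/// is copied from the pull table's CREATE query.
std::string buildCreatePushQuery(const std::string & database,
                                 const std::string & table,
                                 const std::string & cluster,
                                 const std::string & engine)
{
    return "CREATE TABLE IF NOT EXISTS " + database + "." + table
        + " ON CLUSTER " + cluster
        + " (/* columns from the pull table */) ENGINE = " + engine;
}

int main()
{
    std::cout << buildCreatePushQuery("db", "hits_copy", "push_cluster",
                                      "ReplicatedMergeTree") << '\n';
}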
@@ -1366,8 +1435,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
 LOG_DEBUG(log, "Create destination tables. Query: {}", query);
 UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
-LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
-getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
+LOG_INFO(
+log,
+"Destination tables {} have been created on {} shards of {}",
+getQuotedTable(task_table.table_push),
+shards,
+task_table.cluster_push->getShardCount());
+if (shards != task_table.cluster_push->getShardCount())
+{
+return TaskStatus::Error;
+}
 }
/// Do the copying
@@ -1477,26 +1555,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
 LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
-/// Try create original table (if not exists) on each shard
-try
-{
-auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
-task_table.table_push, task_table.engine_push_ast);
-auto & create = create_query_push_ast->as<ASTCreateQuery &>();
-create.if_not_exists = true;
-InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
-String query = queryToString(create_query_push_ast);
-LOG_DEBUG(log, "Create destination tables. Query: {}", query);
-UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
-LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
-}
-catch (...)
-{
-tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
-}
 /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
 {
 String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
@@ -1538,33 +1596,36 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
 interpreter.execute();
 }
+void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
+{
+LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
+DatabaseAndTableName original_table = task_table.table_push;
+DatabaseAndTableName helping_table
+= DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
+String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
+const ClusterPtr & cluster_push = task_table.cluster_push;
+Settings settings_push = task_cluster->settings_push;
+LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
+/// We have to drop the partition_piece on each replica
+UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
+LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
+}
 void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
 {
 LOG_DEBUG(log, "Removing helping tables");
 for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
 {
-DatabaseAndTableName original_table = task_table.table_push;
-DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
-String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
-const ClusterPtr & cluster_push = task_table.cluster_push;
-Settings settings_push = task_cluster->settings_push;
-LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
-/// We have to drop partition_piece on each replica
-UInt64 num_nodes = executeQueryOnCluster(
-cluster_push, query,
-settings_push,
-PoolMode::GET_MANY,
-ClusterExecutionMode::ON_EACH_NODE);
-LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
+dropHelpingTablesByPieceNumber(task_table, current_piece_number);
 }
 }
 void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
 {
 LOG_DEBUG(log, "Trying to drop the partition from all helping tables.");
@@ -1586,7 +1647,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
 PoolMode::GET_MANY,
 ClusterExecutionMode::ON_EACH_NODE);
-LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
+LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
 }
 LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
 }

programs/copier/ClusterCopier.h

@@ -123,12 +123,13 @@ protected:
 bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
 const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
-static constexpr UInt64 max_table_tries = 1000;
-static constexpr UInt64 max_shard_partition_tries = 600;
-static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 100;
+static constexpr UInt64 max_table_tries = 3;
+static constexpr UInt64 max_shard_partition_tries = 3;
+static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
 bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
+TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
 /// Job for copying partition from particular shard.
 TaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
 ShardPartition & task_partition,
@@ -149,6 +150,8 @@ protected:
 void dropHelpingTables(const TaskTable & task_table);
+void dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number);
 /// Used to reduce disk space usage:
 /// after all pieces have been successfully moved to the original destination
 /// table, we can get rid of the partition pieces (partitions in helping tables).
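Editor's note: the retry budgets shrink from 1000/600/100 down to 3 each, so a table that consistently fails now surfaces an error after a handful of attempts instead of looping for hours. The consumption pattern matches the for-loop already visible in tryProcessTable above; a self-contained sketch (the attempt function is a hypothetical stand-in):

#include <cstdint>

using UInt64 = uint64_t;

static constexpr UInt64 max_shard_partition_tries = 3;

enum class TaskStatus { Active, Finished, Error };

/// Hypothetical stand-in for a single attempt on one shard partition.
TaskStatus tryProcessPartitionOnce() { return TaskStatus::Error; }

TaskStatus processWithRetries()
{
    TaskStatus status = TaskStatus::Error;
    for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
    {
        status = tryProcessPartitionOnce();
        if (status == TaskStatus::Finished)
            break; /// Success: stop consuming the retry budget.
    }
    return status; /// After three failed tries the caller sees Error.
}

int main() { return processWithRetries() == TaskStatus::Finished ? 0 : 1; }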

programs/copier/TaskCluster.h

@@ -98,6 +98,7 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
 set_default_value(settings_pull.max_block_size, 8192UL);
 set_default_value(settings_pull.preferred_block_size_bytes, 0);
 set_default_value(settings_push.insert_distributed_timeout, 0);
+set_default_value(settings_push.replication_alter_partitions_sync, 2);
 }
}
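Editor's note: this last hunk defaults replication_alter_partitions_sync to 2 for the push connection, which is what makes the synchronous-ALTER path above the default behaviour. set_default_value presumably applies a default only when the user has not set the value explicitly in the task configuration; a minimal sketch of that pattern, assuming a "changed" flag like the one ClickHouse settings carry (both the struct and the flag semantics are assumptions for illustration):

#include <cstdint>
#include <iostream>

/// Simplified stand-in for a ClickHouse setting: a value plus a flag
/// recording whether the user assigned it explicitly.
struct SettingUInt64
{
    uint64_t value = 0;
    bool changed = false;
};

/// Apply a default only if the user has not overridden the setting.
void set_default_value(SettingUInt64 & setting, uint64_t default_value)
{
    if (!setting.changed)
        setting.value = default_value;
}

int main()
{
    SettingUInt64 replication_alter_partitions_sync; /// untouched by the user
    set_default_value(replication_alter_partitions_sync, 2);
    std::cout << replication_alter_partitions_sync.value << '\n'; /// prints 2
}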