split up copying and moving

This commit is contained in:
Nikita Mikhaylov 2020-03-13 17:19:20 +03:00
parent cce69b0744
commit 42fae556c1
3 changed files with 199 additions and 85 deletions

View File

@ -433,8 +433,14 @@ bool ClusterCopier::checkPartitionPieceIsClean(
bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
{
bool answer = true;
for (size_t piece_number = 0; piece_number < task_table.number_of_splits; piece_number++)
answer &= checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
for (size_t piece_number = 0; piece_number < task_table.number_of_splits; ++piece_number)
{
bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
if (!piece_is_done)
LOG_DEBUG(log, "Partition " << partition_name << " piece " + toString(piece_number) + " is not already done.");
answer &= piece_is_done;
}
return answer;
}
@ -528,10 +534,156 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
}
}
LOG_INFO(log, "Partition " << partition_name << " is copied successfully");
LOG_INFO(log, "Partition " << partition_name << " piece number " << toString(piece_number) << " is copied successfully");
return true;
}
PartitionTaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name)
{
LOG_DEBUG(log, "Try to move " << partition_name << " to destionation table");
auto zookeeper = context.getZooKeeper();
const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
const auto current_partition_attach_is_done = task_table.getPartitionAttachIsDonePath(partition_name);
/// Create ephemeral node to mark that we are active and process the partition
zookeeper->createAncestors(current_partition_attach_is_active);
zkutil::EphemeralNodeHolderPtr partition_attach_node_holder;
try
{
partition_attach_node_holder = zkutil::EphemeralNodeHolder::create(current_partition_attach_is_active, *zookeeper, host_id);
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already moving pieces " << current_partition_attach_is_active);
return PartitionTaskStatus::Active;
}
throw;
}
/// Exit if task has been already processed;
/// create blocking node to signal cleaning up if it is abandoned
{
String status_data;
if (zookeeper->tryGet(current_partition_attach_is_done, status_data))
{
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "All pieces for partition from this task " << current_partition_attach_is_active
<< " has been successfully moved to destination table by " << status.owner);
return PartitionTaskStatus::Finished;
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Moving piece for partition " << current_partition_attach_is_active
<< " has not been successfully finished by " << status.owner
<< ". Will try to move by myself.");
}
}
/// Try start processing, create node about it
{
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
}
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
/// Move partition to original destination table.
{
/// TODO: Execute alter table move partition.
LOG_DEBUG(log, "Trying to move partition " << partition_name
<< " piece " << toString(current_piece_number) << " to original table");
ASTPtr query_alter_ast;
String query_alter_ast_string;
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
" ATTACH PARTITION " + partition_name +
" FROM " + getQuotedTable(helping_table) +
" SETTINGS replication_alter_partitions_sync=2;";
LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);
try
{
size_t num_nodes = 0;
for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
{
///FIXME: We have to be sure that every node in cluster executed this query
UInt64 current_num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
nullptr,
&task_cluster->settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
num_nodes = std::max(current_num_nodes, num_nodes);
}
LOG_INFO(log, "Number of nodes that executed ALTER query successfully : " << toString(num_nodes));
}
catch (...)
{
LOG_DEBUG(log, "Error while moving partition " << partition_name
<< " piece " << toString(current_piece_number) << "to original table");
throw;
}
try
{
String query_deduplicate_ast_string;
if (!task_table.isReplicatedTable())
{
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
" PARTITION " + partition_name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: " << query_alter_ast_string);
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_deduplicate_ast_string,
nullptr,
&task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : " << toString(num_nodes));
}
}
catch(...)
{
LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition " << partition_name << "in the original table");
throw;
}
}
}
/// Create node to signal that we finished moving
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
zookeeper->set(current_partition_attach_is_done, state_finished, 0);
}
return PartitionTaskStatus::Finished;
}
/// Removes MATERIALIZED and ALIAS columns from create table query
ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
{
@ -838,20 +990,37 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
/// Check that whole cluster partition is done
/// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
bool partition_is_done = num_failed_shards == 0;
bool partition_copying_is_done = num_failed_shards == 0;
try
{
partition_is_done =
partition_copying_is_done =
!has_shard_to_process
|| (partition_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
|| (partition_copying_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
}
catch (...)
{
tryLogCurrentException(log);
partition_is_done = false;
partition_copying_is_done = false;
}
if (partition_is_done)
bool partition_moving_is_done = false;
/// Try to move only if all pieces were copied.
if (partition_copying_is_done)
{
try
{
auto res = tryMoveAllPiecesToDestinationTable(task_table, partition_name);
if (res == PartitionTaskStatus::Finished)
partition_moving_is_done = true;
}
catch (...)
{
tryLogCurrentException(log, "Some error occured while moving pieces to destination table for partition " + partition_name);
}
}
if (partition_copying_is_done && partition_moving_is_done)
{
task_table.finished_cluster_partitions.emplace(partition_name);
@ -1350,80 +1519,6 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
}
/// Move partition to original destination table.
{
/// TODO: Execute alter table move partition.
LOG_DEBUG(log, "Trying to move partition " << task_partition.name
<< " piece " << toString(current_piece_number) << " to original table");
ASTPtr query_alter_ast;
String query_alter_ast_string;
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
" ATTACH PARTITION " + task_partition.name +
" FROM " + getQuotedTable(helping_table) +
" SETTINGS replication_alter_partitions_sync=2;";
LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);
try
{
size_t num_nodes = 0;
for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
{
///FIXME: We have to be sure that every node in cluster executed this query
UInt64 current_num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
nullptr,
&task_cluster->settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
num_nodes = std::max(current_num_nodes, num_nodes);
}
LOG_INFO(log, "Number of shard that executed ALTER query successfully : " << toString(num_nodes));
}
catch (...)
{
LOG_DEBUG(log, "Error while moving partition " << task_partition.name
<< " piece " << toString(current_piece_number) << "to original table");
throw;
}
try
{
String query_deduplicate_ast_string;
if (!task_table.isReplicatedTable())
{
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
" PARTITION " + task_partition.name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: " << query_alter_ast_string);
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_deduplicate_ast_string,
nullptr,
&task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : " << toString(num_nodes));
}
}
catch(...)
{
LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition " << task_partition.name << "in the original table");
throw;
}
}
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);

View File

@ -104,6 +104,10 @@ protected:
bool checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
size_t piece_number, const TasksShard & shards_with_partition);
/*Alter successful insertion to helping tables it will move all pieces to destination table*/
PartitionTaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED and ALIAS columns from create table query
ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);

View File

@ -25,6 +25,10 @@ struct TaskTable {
String getPartitionPath(const String & partition_name) const;
String getPartitionAttachIsActivePath(const String & partition_name) const;
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
@ -42,7 +46,7 @@ struct TaskTable {
[[maybe_unused]] String getReplicatedEngineFirstArgumentForCurrentPiece(const size_t piece_number) const;
bool isReplicatedTable() { return engine_push_zk_path != ""; }
bool isReplicatedTable() const { return engine_push_zk_path != ""; }
/// Partitions will be splitted into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
@ -191,12 +195,23 @@ struct TaskShard
};
inline String TaskTable::getPartitionPath(const String &partition_name) const {
inline String TaskTable::getPartitionPath(const String & partition_name) const
{
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + escapeForFileName(partition_name); // 201701
}
inline String TaskTable::getPartitionAttachIsActivePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_active";
}
inline String TaskTable::getPartitionAttachIsDonePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_is_done";
}
inline String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const
{
assert(piece_number < number_of_splits);
@ -241,7 +256,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 2);
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");