successful copying

This commit is contained in:
Nikita Mikhaylov 2020-02-20 20:26:20 +03:00
parent ce16a6e536
commit 1be4a35f15
5 changed files with 81 additions and 57 deletions

View File

@ -17,7 +17,7 @@ void ClusterCopier::init()
{
if (response.error != Coordination::ZOK)
return;
UInt64 version = ++task_descprtion_version;
UInt64 version = ++task_description_version;
LOG_DEBUG(log, "Task description should be updated, local version " << version);
};
@ -227,9 +227,9 @@ void ClusterCopier::reloadTaskDescription()
void ClusterCopier::updateConfigIfNeeded()
{
UInt64 version_to_update = task_descprtion_version;
bool is_outdated_version = task_descprtion_current_version != version_to_update;
bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
UInt64 version_to_update = task_description_version;
bool is_outdated_version = task_description_current_version != version_to_update;
bool is_expired_session = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
if (!is_outdated_version && !is_expired_session)
return;
@ -237,7 +237,7 @@ void ClusterCopier::updateConfigIfNeeded()
LOG_DEBUG(log, "Updating task description");
reloadTaskDescription();
task_descprtion_current_version = version_to_update;
task_description_current_version = version_to_update;
}
void ClusterCopier::process(const ConnectionTimeouts & timeouts)
@ -312,6 +312,13 @@ void ClusterCopier::process(const ConnectionTimeouts & timeouts)
/// Protected section
/*
* Creates task worker node and checks maximum number of workers not to exceed the limit.
* To achieve this we have to check version of workers_version_path node and create current_worker_path
* node atomically.
* */
zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNeed(
const zkutil::ZooKeeperPtr & zookeeper,
const String & description,
@ -324,8 +331,8 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
std::this_thread::sleep_for(current_sleep_time);
String workers_version_path = getWorkersPathVersion();
String workers_path = getWorkersPath();
String current_worker_path = getCurrentWorkerNodePath();
String workers_path = getWorkersPath();
String current_worker_path = getCurrentWorkerNodePath();
UInt64 num_bad_version_errors = 0;
@ -629,6 +636,9 @@ std::shared_ptr<ASTCreateQuery> ClusterCopier::rewriteCreateQueryStorage(const A
return res;
}
/// TODO: implement tryDropPartitionPiece which is simply tryDropPartition, but on different table.
bool ClusterCopier::tryDropPartition(ShardPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock)
{
if (is_safe_mode)
@ -636,11 +646,11 @@ bool ClusterCopier::tryDropPartition(ShardPartition & task_partition, const zkut
TaskTable & task_table = task_partition.task_shard.task_table;
const String current_shards_path = task_partition.getPartitionShardsPath();
const String current_shards_path = task_partition.getPartitionShardsPath();
const String current_partition_active_workers_dir = task_partition.getPartitionActiveWorkersPath();
const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
const String dirty_cleaner_path = is_dirty_flag_path + "/cleaner";
const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
const String dirty_cleaner_path = is_dirty_flag_path + "/cleaner";
const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
try
@ -957,12 +967,15 @@ PartitionTaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const Conn
}
PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
const size_t current_piece_number, bool is_unprioritized_task)
/*...*/
PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
const size_t current_piece_number, bool is_unprioritized_task)
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];
const size_t number_of_splits = task_table.number_of_splits;
@ -974,15 +987,15 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
auto zookeeper = context.getZooKeeper();
const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath();
const String current_task_piece_status_path = partition_piece.getShardStatusPath();
const String current_task_piece_status_path = partition_piece.getShardStatusPath();
/// Auxiliary functions:
/// Creates is_dirty node to initialize DROP PARTITION
auto create_is_dirty_node = [&, this] (const CleanStateClock & clock)
auto create_is_dirty_node = [&] (const CleanStateClock & clock)
{
if (clock.is_stale())
LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
@ -1001,16 +1014,17 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
};
/// Returns SELECT query filtering current partition and applying user filter
auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "",
bool enable_splitting = false)
auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, bool enable_splitting, String limit = "")
{
String query;
query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
/// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
if (enable_splitting)
query += " AND ( cityHash64(" + primary_key_comma_separated + ") = " + toString(current_piece_number) + " )";
query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) +
" = " + toString(current_piece_number) + " )";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
@ -1031,23 +1045,26 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
LogicalClock task_start_clock;
{
Coordination::Stat stat{};
if (zookeeper->exists(task_partition.getPartitionShardsPath(), &stat))
if (zookeeper->exists(partition_piece.getPartitionPieceShardsPath(), &stat))
task_start_clock = LogicalClock(stat.mzxid);
}
/// Do not start if partition is dirty, try to clean it
/// Do not start if partition piece is dirty, try to clean it
if (clean_state_clock.is_clean()
&& (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock))
{
LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean");
LOG_DEBUG(log, "Partition " << task_partition.name
<< " piece " + toString(current_piece_number) + " appears to be clean");
zookeeper->createAncestors(current_task_piece_status_path);
}
else
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it");
LOG_DEBUG(log, "Partition " << task_partition.name
<< " piece " + toString(current_piece_number) + " is dirty, try to drop it");
try
{
/// TODO: tryDropPartitionPiece.
tryDropPartition(task_partition, zookeeper, clean_state_clock);
}
catch (...)
@ -1085,13 +1102,16 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has been successfully executed by " << status.owner);
LOG_DEBUG(log, "Task " << current_task_piece_status_path
<< " has been successfully executed by " << status.owner);
return PartitionTaskStatus::Finished;
}
// Task is abandoned, initialize DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has not been successfully finished by " <<
status.owner << ". Partition will be dropped and refilled.");
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_piece_status_path
<< " has not been successfully finished by " << status.owner
<< ". Partition will be dropped and refilled.");
create_is_dirty_node(clean_state_clock);
return PartitionTaskStatus::Error;
@ -1101,13 +1121,14 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
String clean_start_status;
if (!zookeeper->tryGet(task_partition.getPartitionCleanStartPath(), clean_start_status) || clean_start_status != "ok")
if (!zookeeper->tryGet(partition_piece.getPartitionPieceCleanStartPath(), clean_start_status) || clean_start_status != "ok")
{
zookeeper->createIfNotExists(task_partition.getPartitionCleanStartPath(), "");
auto checker = zkutil::EphemeralNodeHolder::create(task_partition.getPartitionCleanStartPath() + "/checker",
zookeeper->createIfNotExists(partition_piece.getPartitionPieceCleanStartPath(), "");
auto checker = zkutil::EphemeralNodeHolder::create(partition_piece.getPartitionPieceCleanStartPath() + "/checker",
*zookeeper, host_id);
// Maybe we are the first worker
ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()");
/// TODO: Why table_split_shard???
ASTPtr query_select_ast = get_select_query(task_shard.table_split_shard, "count()", /*enable_splitting*/ true);
UInt64 count;
{
Context local_context = context;
@ -1122,20 +1143,21 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
if (count != 0)
{
Coordination::Stat stat_shards{};
zookeeper->get(task_partition.getPartitionShardsPath(), & stat_shards);
zookeeper->get(partition_piece.getPartitionPieceShardsPath(), &stat_shards);
/// NOTE: partition is still fresh if dirt discovery happens before cleaning
if (stat_shards.numChildren == 0)
{
LOG_WARNING(log, "There are no workers for partition " << task_partition.name
<< ", but destination table contains " << count << " rows"
<< ". Partition will be dropped and refilled.");
<< " piece " << toString(current_piece_number)
<< ", but destination table contains " << count << " rows"
<< ". Partition will be dropped and refilled.");
create_is_dirty_node(clean_state_clock);
return PartitionTaskStatus::Error;
}
}
zookeeper->set(task_partition.getPartitionCleanStartPath(), "ok");
zookeeper->set(partition_piece.getPartitionPieceCleanStartPath(), "ok");
}
/// At this point, we need to sync that the destination table is clean
/// before any actual work
@ -1146,12 +1168,14 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
if (clean_state_clock != new_clean_state_clock)
{
LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
LOG_INFO(log, "Partition " << task_partition.name << " piece "
<< toString(current_piece_number) << " clean state changed, cowardly bailing");
return PartitionTaskStatus::Error;
}
else if (!new_clean_state_clock.is_clean())
{
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
LOG_INFO(log, "Partition " << task_partition.name << " piece "
<< toString(current_piece_number) << " is dirty and will be dropped and refilled");
create_is_dirty_node(new_clean_state_clock);
return PartitionTaskStatus::Error;
}
@ -1163,7 +1187,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
/// Define push table for current partition piece
auto database_and_table_for_current_piece= std::pair<String, String>(
task_table.table_push.first,
task_table.table_push.second + ".piece_" + toString(current_piece_number));
task_table.table_push.second + "_piece_" + toString(current_piece_number));
auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
database_and_table_for_current_piece, task_table.engine_push_ast);
@ -1174,8 +1198,8 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) <<
" have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push)
<< " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
}
/// Do the copying
@ -1188,7 +1212,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(const Connectio
}
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : "");
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ inject_fault ? "1" : "");
LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription()
<< " : " << queryToString(query_select_ast));
@ -1361,7 +1385,7 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
String split_shard_prefix = ".split.";
task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
task_shard.table_split_shard = DatabaseAndTableName(
working_database_name, split_shard_prefix + task_table.table_id + ".piece_" + toString(piece_number));
working_database_name, split_shard_prefix + task_table.table_id + "_piece_" + toString(piece_number));
/// Create special cluster with single shard
String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;

View File

@ -156,8 +156,8 @@ private:
String working_database_name;
/// Auto update config stuff
UInt64 task_descprtion_current_version = 1;
std::atomic<UInt64> task_descprtion_version{1};
UInt64 task_description_current_version = 1;
std::atomic<UInt64> task_description_version{1};
Coordination::WatchCallback task_description_watch_callback;
/// ZooKeeper session used to set the callback
zkutil::ZooKeeperPtr task_description_watch_zookeeper;

View File

@ -56,7 +56,7 @@ inline String ShardPartitionPiece::getPartitionPieceIsCleanedPath() const
inline String ShardPartitionPiece::getPartitionPieceActiveWorkersPath() const
{
return getPartitionPiecePath() + "/partition_active_workers";
return getPartitionPiecePath() + "/partition_piece_active_workers";
}
inline String ShardPartitionPiece::getActiveWorkerPath() const

View File

@ -20,11 +20,11 @@ struct TaskTable {
String getPartitionPath(const String & partition_name) const;
[[maybe_unused]] String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
[[maybe_unused]] String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
{
UNUSED(partition_name);
UNUSED(piece_number);
@ -33,7 +33,7 @@ struct TaskTable {
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
[[maybe_unused]] String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
{
UNUSED(partition_name);
UNUSED(piece_number);
@ -42,7 +42,7 @@ struct TaskTable {
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
[[maybe_unused]] String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
{
UNUSED(partition_name);
UNUSED(piece_number);
@ -181,7 +181,10 @@ struct TaskShard
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
DatabaseAndTableName table_split_shard;
std::vector<DatabaseAndTableName> list_of_split_tables_on_shard;
};
@ -255,7 +258,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
auxiliary_engine_split_asts.emplace_back
(
createASTStorageDistributed(cluster_push_name, table_push.first,
table_push.second + ".piece_" + toString(piece_number), sharding_key_ast)
table_push.second + "_piece_" + toString(piece_number), sharding_key_ast)
);
}
}

View File

@ -157,11 +157,8 @@ public:
bool is_clean() const
{
return
!is_stale()
&& (
!discovery_zxid.hasHappened()
|| (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
return !is_stale()
&& (!discovery_zxid.hasHappened() || (clean_state_zxid.hasHappened() && discovery_zxid <= clean_state_zxid));
}
bool is_stale() const