Merge pull request #11313 from ClickHouse/fix-race-condition-cluster-copier

Fix data race in clickhouse-copier
This commit is contained in:
alexey-milovidov 2020-05-31 17:00:54 +03:00 committed by GitHub
commit 3de876b2bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 39 deletions

View File

@ -4,6 +4,8 @@
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h> #include <Common/ZooKeeper/KeeperException.h>
#include <Common/setThreadName.h>
namespace DB namespace DB
{ {
@ -177,7 +179,11 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores()); ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards) for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); }); thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]()
{
setThreadName("DiscoverPartns");
discoverShardPartitions(timeouts, task_shard);
});
LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active()); LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
thread_pool.wait(); thread_pool.wait();
@ -609,8 +615,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
size_t num_nodes = executeQueryOnCluster( size_t num_nodes = executeQueryOnCluster(
task_table.cluster_push, task_table.cluster_push,
query_alter_ast_string, query_alter_ast_string,
nullptr, settings_push,
&settings_push,
PoolMode::GET_MANY, PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE); ClusterExecutionMode::ON_EACH_NODE);
@ -638,8 +643,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
UInt64 num_nodes = executeQueryOnCluster( UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push, task_table.cluster_push,
query_deduplicate_ast_string, query_deduplicate_ast_string,
nullptr, task_cluster->settings_push,
&task_cluster->settings_push,
PoolMode::GET_MANY); PoolMode::GET_MANY);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes)); LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
@ -818,8 +822,7 @@ bool ClusterCopier::tryDropPartitionPiece(
/// We have to drop partition_piece on each replica /// We have to drop partition_piece on each replica
size_t num_shards = executeQueryOnCluster( size_t num_shards = executeQueryOnCluster(
cluster_push, query, cluster_push, query,
nullptr, settings_push,
&settings_push,
PoolMode::GET_MANY, PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE); ClusterExecutionMode::ON_EACH_NODE);
@ -1356,9 +1359,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String query = queryToString(create_query_push_ast); String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query); LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
} }
@ -1479,9 +1480,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String query = queryToString(create_query_push_ast); String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query); LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
} }
catch (...) catch (...)
@ -1548,8 +1547,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
/// We have to drop partition_piece on each replica /// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster( UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query, cluster_push, query,
nullptr, settings_push,
&settings_push,
PoolMode::GET_MANY, PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE); ClusterExecutionMode::ON_EACH_NODE);
@ -1575,8 +1573,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
/// We have to drop partition_piece on each replica /// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster( UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query, cluster_push, query,
nullptr, settings_push,
&settings_push,
PoolMode::GET_MANY, PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE); ClusterExecutionMode::ON_EACH_NODE);
@ -1788,25 +1785,16 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
UInt64 ClusterCopier::executeQueryOnCluster( UInt64 ClusterCopier::executeQueryOnCluster(
const ClusterPtr & cluster, const ClusterPtr & cluster,
const String & query, const String & query,
const ASTPtr & query_ast_, const Settings & current_settings,
const Settings * settings,
PoolMode pool_mode, PoolMode pool_mode,
ClusterExecutionMode execution_mode, ClusterExecutionMode execution_mode,
UInt64 max_successful_executions_per_shard) const UInt64 max_successful_executions_per_shard) const
{ {
Settings current_settings = settings ? *settings : task_cluster->settings_common;
auto num_shards = cluster->getShardsInfo().size(); auto num_shards = cluster->getShardsInfo().size();
std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0); std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
ASTPtr query_ast;
if (query_ast_ == nullptr)
{
ParserQuery p_query(query.data() + query.size()); ParserQuery p_query(query.data() + query.size());
query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth); ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
}
else
query_ast = query_ast_;
/// We will have to execute query on each replica of a shard. /// We will have to execute query on each replica of a shard.
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE) if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
@ -1815,8 +1803,10 @@ UInt64 ClusterCopier::executeQueryOnCluster(
std::atomic<size_t> origin_replicas_number; std::atomic<size_t> origin_replicas_number;
/// We need to execute query on one replica at least /// We need to execute query on one replica at least
auto do_for_shard = [&] (UInt64 shard_index) auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
{ {
setThreadName("QueryForShard");
const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index); const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index); UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
num_successful_executions = 0; num_successful_executions = 0;
@ -1846,10 +1836,10 @@ UInt64 ClusterCopier::executeQueryOnCluster(
/// Will try to make as many as possible queries /// Will try to make as many as possible queries
if (shard.hasRemoteConnections()) if (shard.hasRemoteConnections())
{ {
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1; shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode); auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
for (auto & connection : connections) for (auto & connection : connections)
{ {
@ -1859,7 +1849,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
try try
{ {
/// CREATE TABLE and DROP PARTITION queries return empty block /// CREATE TABLE and DROP PARTITION queries return empty block
RemoteBlockInputStream stream{*connection, query, Block{}, context, &current_settings}; RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings};
NullBlockOutputStream output{Block{}}; NullBlockOutputStream output{Block{}};
copyData(stream, output); copyData(stream, output);
@ -1878,7 +1868,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores())); ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index) for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); }); thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
thread_pool.wait(); thread_pool.wait();
} }
@ -1898,7 +1888,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load())); LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
} }
return successful_nodes; return successful_nodes;
} }
} }

View File

@ -15,7 +15,6 @@ namespace DB
class ClusterCopier class ClusterCopier
{ {
public: public:
ClusterCopier(const String & task_path_, ClusterCopier(const String & task_path_,
const String & host_id_, const String & host_id_,
const String & proxy_database_name_, const String & proxy_database_name_,
@ -187,8 +186,7 @@ protected:
UInt64 executeQueryOnCluster( UInt64 executeQueryOnCluster(
const ClusterPtr & cluster, const ClusterPtr & cluster,
const String & query, const String & query,
const ASTPtr & query_ast_ = nullptr, const Settings & current_settings,
const Settings * settings = nullptr,
PoolMode pool_mode = PoolMode::GET_ALL, PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD, ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const; UInt64 max_successful_executions_per_shard = 0) const;