From ff8a1c04fb7de63b4cc811495900ff834a11138b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 30 May 2020 20:53:55 +0300 Subject: [PATCH 1/2] Fix data race in clickhouse-copier --- programs/copier/ClusterCopier.cpp | 42 ++++++++++++++++++------------- programs/copier/ClusterCopier.h | 5 ++-- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index d50b89738aa..6f9deb30c94 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -4,6 +4,8 @@ #include #include +#include + namespace DB { @@ -177,7 +179,11 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores()); for (const TaskShardPtr & task_shard : task_table.all_shards) - thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); }); + thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() + { + setThreadName("DiscoverPartns"); + discoverShardPartitions(timeouts, task_shard); + }); LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active()); thread_pool.wait(); @@ -610,7 +616,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t task_table.cluster_push, query_alter_ast_string, nullptr, - &settings_push, + settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -639,7 +645,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t task_table.cluster_push, query_deduplicate_ast_string, nullptr, - &task_cluster->settings_push, + task_cluster->settings_push, PoolMode::GET_MANY); LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes)); @@ -819,7 +825,7 @@ bool ClusterCopier::tryDropPartitionPiece( size_t num_shards = executeQueryOnCluster( cluster_push, query, nullptr, - &settings_push, + settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1357,7 +1363,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( LOG_DEBUG(log, "Create destination tables. Query: {}", query); UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, - create_query_push_ast, &task_cluster->settings_push, + create_query_push_ast, task_cluster->settings_push, PoolMode::GET_MANY); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); } @@ -1480,7 +1486,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( LOG_DEBUG(log, "Create destination tables. Query: {}", query); UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, - create_query_push_ast, &task_cluster->settings_push, + create_query_push_ast, task_cluster->settings_push, PoolMode::GET_MANY); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); } @@ -1549,7 +1555,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table) UInt64 num_nodes = executeQueryOnCluster( cluster_push, query, nullptr, - &settings_push, + settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1576,7 +1582,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT UInt64 num_nodes = executeQueryOnCluster( cluster_push, query, nullptr, - &settings_push, + settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1789,13 +1795,11 @@ UInt64 ClusterCopier::executeQueryOnCluster( const ClusterPtr & cluster, const String & query, const ASTPtr & query_ast_, - const Settings * settings, + const Settings & current_settings, PoolMode pool_mode, ClusterExecutionMode execution_mode, UInt64 max_successful_executions_per_shard) const { - Settings current_settings = settings ? *settings : task_cluster->settings_common; - auto num_shards = cluster->getShardsInfo().size(); std::vector per_shard_num_successful_replicas(num_shards, 0); @@ -1815,8 +1819,10 @@ UInt64 ClusterCopier::executeQueryOnCluster( std::atomic origin_replicas_number; /// We need to execute query on one replica at least - auto do_for_shard = [&] (UInt64 shard_index) + auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings) { + setThreadName("QueryForShard"); + const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index); UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index); num_successful_executions = 0; @@ -1846,10 +1852,10 @@ UInt64 ClusterCopier::executeQueryOnCluster( /// Will try to make as many as possible queries if (shard.hasRemoteConnections()) { - current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1; + shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1; - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time); - auto connections = shard.pool->getMany(timeouts, ¤t_settings, pool_mode); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time); + auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode); for (auto & connection : connections) { @@ -1859,7 +1865,7 @@ UInt64 ClusterCopier::executeQueryOnCluster( try { /// CREATE TABLE and DROP PARTITION queries return empty block - RemoteBlockInputStream stream{*connection, query, Block{}, context, ¤t_settings}; + RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings}; NullBlockOutputStream output{Block{}}; copyData(stream, output); @@ -1878,7 +1884,7 @@ UInt64 ClusterCopier::executeQueryOnCluster( ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores())); for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index) - thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); }); + thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); }); thread_pool.wait(); } @@ -1898,7 +1904,7 @@ UInt64 ClusterCopier::executeQueryOnCluster( LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load())); } - return successful_nodes; } + } diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h index 3d6400f51d4..d25cc8c501f 100644 --- a/programs/copier/ClusterCopier.h +++ b/programs/copier/ClusterCopier.h @@ -15,7 +15,6 @@ namespace DB class ClusterCopier { public: - ClusterCopier(const String & task_path_, const String & host_id_, const String & proxy_database_name_, @@ -187,8 +186,8 @@ protected: UInt64 executeQueryOnCluster( const ClusterPtr & cluster, const String & query, - const ASTPtr & query_ast_ = nullptr, - const Settings * settings = nullptr, + const ASTPtr & query_ast_, + const Settings & current_settings, PoolMode pool_mode = PoolMode::GET_ALL, ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD, UInt64 max_successful_executions_per_shard = 0) const; From 26100b64deb4718be61965ee4f517fb0bed7edb9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 30 May 2020 23:30:08 +0300 Subject: [PATCH 2/2] Remove function parameter --- programs/copier/ClusterCopier.cpp | 24 ++++-------------------- programs/copier/ClusterCopier.h | 1 - 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 6f9deb30c94..bc32f586a01 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -615,7 +615,6 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t size_t num_nodes = executeQueryOnCluster( task_table.cluster_push, query_alter_ast_string, - nullptr, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -644,7 +643,6 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t UInt64 num_nodes = executeQueryOnCluster( task_table.cluster_push, query_deduplicate_ast_string, - nullptr, task_cluster->settings_push, PoolMode::GET_MANY); @@ -824,7 +822,6 @@ bool ClusterCopier::tryDropPartitionPiece( /// We have to drop partition_piece on each replica size_t num_shards = executeQueryOnCluster( cluster_push, query, - nullptr, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1362,9 +1359,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( String query = queryToString(create_query_push_ast); LOG_DEBUG(log, "Create destination tables. Query: {}", query); - UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, - create_query_push_ast, task_cluster->settings_push, - PoolMode::GET_MANY); + UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); } @@ -1485,9 +1480,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( String query = queryToString(create_query_push_ast); LOG_DEBUG(log, "Create destination tables. Query: {}", query); - UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, - create_query_push_ast, task_cluster->settings_push, - PoolMode::GET_MANY); + UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY); LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount()); } catch (...) @@ -1554,7 +1547,6 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table) /// We have to drop partition_piece on each replica UInt64 num_nodes = executeQueryOnCluster( cluster_push, query, - nullptr, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1581,7 +1573,6 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT /// We have to drop partition_piece on each replica UInt64 num_nodes = executeQueryOnCluster( cluster_push, query, - nullptr, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE); @@ -1794,7 +1785,6 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi UInt64 ClusterCopier::executeQueryOnCluster( const ClusterPtr & cluster, const String & query, - const ASTPtr & query_ast_, const Settings & current_settings, PoolMode pool_mode, ClusterExecutionMode execution_mode, @@ -1803,14 +1793,8 @@ UInt64 ClusterCopier::executeQueryOnCluster( auto num_shards = cluster->getShardsInfo().size(); std::vector per_shard_num_successful_replicas(num_shards, 0); - ASTPtr query_ast; - if (query_ast_ == nullptr) - { - ParserQuery p_query(query.data() + query.size()); - query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth); - } - else - query_ast = query_ast_; + ParserQuery p_query(query.data() + query.size()); + ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth); /// We will have to execute query on each replica of a shard. if (execution_mode == ClusterExecutionMode::ON_EACH_NODE) diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h index d25cc8c501f..beaf247dfc8 100644 --- a/programs/copier/ClusterCopier.h +++ b/programs/copier/ClusterCopier.h @@ -186,7 +186,6 @@ protected: UInt64 executeQueryOnCluster( const ClusterPtr & cluster, const String & query, - const ASTPtr & query_ast_, const Settings & current_settings, PoolMode pool_mode = PoolMode::GET_ALL, ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,