From 33dcebbb2456a309a2da3ea0d5c7f3feed82d550 Mon Sep 17 00:00:00 2001
From: Nikita Mikhaylov
Date: Fri, 30 Apr 2021 01:56:41 +0300
Subject: [PATCH] style

---
 programs/copier/ClusterCopier.cpp             | 39 +------------------
 programs/copier/ClusterCopier.h               |  5 +--
 programs/copier/TaskCluster.h                 |  8 ++--
 .../test_cluster_copier/test_trivial.py       | 19 +--------
 4 files changed, 9 insertions(+), 62 deletions(-)

diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index 306016ca383..d7e37a1a5d3 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -1405,11 +1405,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
     /// 3) Create helping table on the whole destination cluster
     auto & settings_push = task_cluster->settings_push;
-    /// Get a connection to any shard to fetch `CREATE` query
     auto connection = task_table.cluster_push->getAnyShardInfo().pool->get(timeouts, &settings_push, true);
-    /// Execute a request and get `CREATE` query as a string.
     String create_query = getRemoteCreateTable(task_shard.task_table.table_push, *connection, settings_push);
-    /// Parse it to ASTPtr
+
     ParserCreateQuery parser_create_query;
     auto create_query_ast = parseQuery(parser_create_query, create_query, settings_push.max_query_size, settings_push.max_parser_depth);
 
     /// Define helping table database and name for current partition piece
@@ -1417,8 +1415,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
         task_table.table_push.first,
         task_table.table_push.second + "_piece_" + toString(current_piece_number)};
 
-    /// This is a bit of legacy, because we now could parse and engine AST from the whole create query.
-    /// But this is needed to make helping table non-replicated. We simply don't need this
+
     auto new_engine_push_ast = task_table.engine_push_ast;
     if (task_table.isReplicatedTable())
         new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
@@ -1427,17 +1424,11 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
     auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
     String query = queryToString(create_query_push_ast);
 
-<<<<<<< HEAD
     LOG_INFO(log, "Create destination tables. Query: \n {}", wrapWithColor(query));
     UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
     LOG_INFO(
         log,
         "Destination tables {} have been created on {} shards of {}",
-=======
-    LOG_INFO(log, "Create destination tables. Query: \n {}", query);
-    UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
-    LOG_INFO(log, "Destination tables {} have been created on {} shards of {}",
->>>>>>> better
         getQuotedTable(task_table.table_push),
         shards,
         task_table.cluster_push->getShardCount());
@@ -1750,7 +1741,6 @@ String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, C
 }
 
 
-
 ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
 {
     /// Fetch and parse (possibly) new definition
@@ -1766,20 +1756,6 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
 }
 
 
-ASTPtr ClusterCopier::getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
-{
-    /// Fetch and parse (possibly) new definition
-    auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_push, true);
-    String create_query_pull_str = getRemoteCreateTable(
-        task_shard.task_table.table_push,
-        *connection_entry,
-        task_cluster->settings_push);
-
-    ParserCreateQuery parser_create_query;
-    const auto & settings = getContext()->getSettingsRef();
-    return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
-}
-
 /// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
 void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
         TaskShard & task_shard, bool create_split)
@@ -2033,17 +2009,6 @@ UInt64 ClusterCopier::executeQueryOnCluster(
             tryLogCurrentException(log);
             continue;
         }
-<<<<<<< HEAD
-
-        remote_query_executor->finish();
-        ++successfully_executed;
-    }
-    catch (...)
-    {
-        LOG_WARNING(log, "An error occured while processing query : \n {}", wrapWithColor(query));
-        tryLogCurrentException(log);
-=======
->>>>>>> better
     }
 }
 
diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h
index b553f253cde..bf3fb4f2ffa 100644
--- a/programs/copier/ClusterCopier.h
+++ b/programs/copier/ClusterCopier.h
@@ -24,9 +24,7 @@ public:
         task_zookeeper_path(task_path_),
        host_id(host_id_),
         working_database_name(proxy_database_name_),
-        log(log_) {
-            std::cout << "Level from constructor" << log->getLevel() << std::endl;
-        }
+        log(log_) {}
 
     void init();
 
@@ -162,7 +160,6 @@ protected:
     String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings);
 
     ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
-    ASTPtr getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
 
     /// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
     void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true);

diff --git a/programs/copier/TaskCluster.h b/programs/copier/TaskCluster.h
index 5fc990ccf48..7d8f01ba15f 100644
--- a/programs/copier/TaskCluster.h
+++ b/programs/copier/TaskCluster.h
@@ -77,6 +77,8 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
     if (config.has(prefix + "settings"))
         settings_common.loadSettingsFromConfig(prefix + "settings", config);
 
+    settings_common.prefer_localhost_replica = 0;
+
     settings_pull = settings_common;
     if (config.has(prefix + "settings_pull"))
         settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
@@ -92,17 +94,17 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
     /// Override important settings
     settings_pull.readonly = 1;
+    settings_pull.prefer_localhost_replica = false;
     settings_push.insert_distributed_sync = true;
+    settings_push.prefer_localhost_replica = false;
+
     set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
     set_default_value(settings_pull.max_threads, 1);
     set_default_value(settings_pull.max_block_size, 8192UL);
     set_default_value(settings_pull.preferred_block_size_bytes, 0);
-    set_default_value(settings_pull.prefer_localhost_replica, 0);
 
     set_default_value(settings_push.insert_distributed_timeout, 0);
     set_default_value(settings_push.replication_alter_partitions_sync, 2);
-    set_default_value(settings_push.prefer_localhost_replica, 0);
-
 }
 }

diff --git a/tests/integration/test_cluster_copier/test_trivial.py b/tests/integration/test_cluster_copier/test_trivial.py
index 8502c0e7f67..947c4bee5ca 100644
--- a/tests/integration/test_cluster_copier/test_trivial.py
+++ b/tests/integration/test_cluster_copier/test_trivial.py
@@ -14,7 +14,7 @@
 CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
 
-COPYING_FAIL_PROBABILITY = 0.33
+COPYING_FAIL_PROBABILITY = 0.1
 MOVING_FAIL_PROBABILITY = 0.1
 
 cluster = None
@@ -151,20 +151,3 @@ def test_trivial_copy(started_cluster, use_sample_offset):
     else:
         execute_task(TaskTrivial(started_cluster, use_sample_offset), [])
 
-
-@pytest.mark.parametrize(('use_sample_offset'),[False,True])
-def test_trivial_copy_with_copy_fault(started_cluster, use_sample_offset):
-    if use_sample_offset:
-        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
-                                                                       '--experimental-use-sample-offset', '1'])
-    else:
-        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
-
-
-@pytest.mark.parametrize(('use_sample_offset'),[False,True])
-def test_trivial_copy_with_move_fault(started_cluster, use_sample_offset):
-    if use_sample_offset:
-        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
-                                                                       '--experimental-use-sample-offset', '1'])
-    else:
-        execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
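
Note on the TaskCluster.h hunks above: `prefer_localhost_replica` used to be applied through `set_default_value`, i.e. only when the task config had not set it explicitly, and is now forced off unconditionally, alongside the other hard overrides `readonly = 1` and `insert_distributed_sync = true`. A minimal sketch of the difference between the two mechanisms, assuming `set_default_value` skips settings the user has already changed; the `SettingBool` type and `main` below are illustrative stand-ins, not ClickHouse code:

    #include <iostream>
    #include <utility>

    // Hypothetical stand-in for a ClickHouse-style setting that remembers
    // whether it was explicitly assigned.
    struct SettingBool
    {
        bool value = true;
        bool changed = false;
        SettingBool & operator=(bool v) { value = v; changed = true; return *this; }
    };

    // Assumed shape of the set_default_value helper used in TaskCluster.h:
    // it only touches a setting the user has NOT overridden in the config.
    template <typename Setting, typename Value>
    void set_default_value(Setting & setting, Value && value)
    {
        if (!setting.changed)
            setting = std::forward<Value>(value);
    }

    int main()
    {
        SettingBool prefer_localhost_replica;
        prefer_localhost_replica = true; // user sets it in <settings_push>

        // Old behaviour: a soft default, silently lost to the user's value.
        set_default_value(prefer_localhost_replica, false);
        std::cout << prefer_localhost_replica.value << '\n'; // prints 1

        // New behaviour after this patch: an unconditional override.
        prefer_localhost_replica = false;
        std::cout << prefer_localhost_replica.value << '\n'; // prints 0
    }

With the override, the setting can no longer be re-enabled from the task config; the patch forces it off for both pull and push, presumably so the copier's Distributed reads and writes always go over the network rather than short-circuiting to a co-located replica.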