From 1517422f4dd2d3d901718defc9b8f10ed520bef8 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Tue, 14 Nov 2017 19:59:45 +0300 Subject: [PATCH] Add check for existing data. [#CLICKHOUSE-3346] --- dbms/src/Server/ClusterCopier.cpp | 100 +++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 22 deletions(-) diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp index 9505e1e2189..e0c4139584f 100644 --- a/dbms/src/Server/ClusterCopier.cpp +++ b/dbms/src/Server/ClusterCopier.cpp @@ -230,7 +230,6 @@ struct TaskTable String sharding_key_str; ASTPtr sharding_key_ast; ASTPtr engine_split_ast; - String engine_split_str; /// Additional WHERE expression to filter input data String where_condition_str; @@ -444,7 +443,6 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati ParserExpressionWithOptionalAlias parser_expression(false); sharding_key_ast = parseQuery(parser_expression, sharding_key_str); engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast); - engine_split_str = queryToString(engine_split_ast); table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + name_in_config); } @@ -680,7 +678,7 @@ public: for (auto & task_partition : shards[0]->partitions) { - LOG_DEBUG(log, "Process partition " << task_partition.first << " for local shard " << shard_ptr->numberInCluster()); + LOG_DEBUG(log, "Processing partition " << task_partition.first << " for local shard " << shard_ptr->numberInCluster()); processPartitionTask(task_partition.second); } } @@ -694,7 +692,7 @@ public: do { - LOG_DEBUG(log, "Process partition " << partition_name << " for the whole cluster" + LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster" << " (" << shards_with_partition.size() << " shards)"); size_t num_successful_shards = 0; @@ -720,6 +718,9 @@ public: tryLogCurrentException(log); is_done = false; } + + if (!is_done && num_successful_shards == 0) + std::this_thread::sleep_for(default_sleep_time); } while (!is_done); } } @@ -886,8 +887,11 @@ protected: /// Remove all status nodes zookeeper->tryRemoveRecursive(current_shards_path); - String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push) + - " DROP PARTITION ID " + backQuoteIfNeed(task_partition.name); + String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push); + query += " DROP PARTITION " + task_partition.name + ""; + + /// TODO: use this statement after servers will be updated up to 1.1.54310 + // query += " DROP PARTITION ID '" + task_partition.name + "'"; ClusterPtr & cluster_push = task_table.cluster_push; Settings & settings_push = task_cluster->settings_push; @@ -903,7 +907,10 @@ protected: } /// Remove the locking node + cleaner_holder.reset(); zookeeper->remove(is_dirty_flag_path); + + LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name); return true; } @@ -932,6 +939,30 @@ protected: String current_task_is_active_path = task_partition.getActiveWorkerPath(); String current_task_status_path = task_partition.getShardStatusPath(); + /// Auxiliary functions: + + /// Creates is_dirty node to initialize DROP PARTITION + auto create_is_dirty_node = [&] () + { + auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent); + if (code != ZOK && code != ZNODEEXISTS) + throw zkutil::KeeperException(code, is_dirty_flag_path); + }; + + /// Returns SELECT query filtering current partition and applying user filter + auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields) + { + String query; + query += "SELECT " + fields + " FROM " + getDatabaseDotTable(from_table); + query += " WHERE (_part LIKE '" + task_partition.name + "%')"; + if (!task_table.where_condition_str.empty()) + query += " AND (" + task_table.where_condition_str + ")"; + + ParserQuery p_query(query.data() + query.size()); + return parseQuery(p_query, query); + }; + + /// Load balancing auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path); @@ -986,7 +1017,7 @@ protected: // Task is abandoned, initialize DROP PARTITION LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner); - zookeeper->create(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent); + create_is_dirty_node(); return false; } @@ -1021,13 +1052,48 @@ protected: auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_shard, storage_shard_ast); auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast); - LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast)); + //LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast)); dropAndCreateLocalTable(create_table_pull_ast); - LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast)); + //LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast)); dropAndCreateLocalTable(create_table_split_ast); } + /// Check that destination partition is empty if we are first worker + /// NOTE: this check is incorrect if pull and push tables have different partition key! + { + ASTPtr query_select_ast = get_select_query(table_split, "count()"); + UInt64 count; + { + Context local_context = context; + // Use pull (i.e. readonly) settings, but fetch data from destination servers + context.getSettingsRef() = task_cluster->settings_pull; + context.getSettingsRef().skip_unavailable_shards = true; + + InterpreterSelectQuery interperter(query_select_ast, local_context); + BlockIO io = interperter.execute(); + + Block block = getBlockWithAllStreamData(io.in); + count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0; + } + + if (count != 0) + { + zkutil::Stat stat_shards; + zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards); + + if (stat_shards.numChildren == 0) + { + LOG_WARNING(log, "There are no any workers for partition " << task_partition.name + << ", but destination table contains " << count << " rows" + << ". Partition will be dropped and refilled."); + + create_is_dirty_node(); + return false; + } + } + } + /// Try start processing, create node about it { String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id); @@ -1059,19 +1125,9 @@ protected: /// Do the copying { - ASTPtr query_select_ast; - { - String query; - query += "SELECT * FROM " + getDatabaseDotTable(table_shard); - query += " WHERE (_part LIKE '" + task_partition.name + "%')"; - if (!task_table.where_condition_str.empty()) - query += " AND (" + task_table.where_condition_str + ")"; - - ParserQuery p_query(query.data() + query.size()); - query_select_ast = parseQuery(p_query, query); - - LOG_DEBUG(log, "Executing SELECT query: " << query); - } + // Select all fields + ASTPtr query_select_ast = get_select_query(table_shard, "*"); + LOG_DEBUG(log, "Executing SELECT query: " << queryToString(query_select_ast)); ASTPtr query_insert_ast; {