Add check for existing data. [#CLICKHOUSE-3346]

Vitaliy Lyudvichenko 2017-11-14 19:59:45 +03:00
parent e996da050d
commit 1517422f4d


@@ -230,7 +230,6 @@ struct TaskTable
     String sharding_key_str;
     ASTPtr sharding_key_ast;
     ASTPtr engine_split_ast;
-    String engine_split_str;
 
     /// Additional WHERE expression to filter input data
     String where_condition_str;
@@ -444,7 +443,6 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
         ParserExpressionWithOptionalAlias parser_expression(false);
         sharding_key_ast = parseQuery(parser_expression, sharding_key_str);
         engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
-        engine_split_str = queryToString(engine_split_ast);
 
         table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + name_in_config);
     }
@@ -680,7 +678,7 @@ public:
             for (auto & task_partition : shards[0]->partitions)
             {
-                LOG_DEBUG(log, "Process partition " << task_partition.first << " for local shard " << shard_ptr->numberInCluster());
+                LOG_DEBUG(log, "Processing partition " << task_partition.first << " for local shard " << shard_ptr->numberInCluster());
                 processPartitionTask(task_partition.second);
             }
         }
@@ -694,7 +692,7 @@ public:
             do
             {
-                LOG_DEBUG(log, "Process partition " << partition_name << " for the whole cluster"
+                LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster"
                     << " (" << shards_with_partition.size() << " shards)");
 
                 size_t num_successful_shards = 0;
@@ -720,6 +718,9 @@ public:
                     tryLogCurrentException(log);
                     is_done = false;
                 }
+
+                if (!is_done && num_successful_shards == 0)
+                    std::this_thread::sleep_for(default_sleep_time);
             } while (!is_done);
         }
     }
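
The two added lines give the whole-cluster loop a simple back-off: when a full pass over the shards makes no progress at all, the copier now sleeps before retrying instead of busy-spinning. A minimal standalone sketch of the pattern (copyPartitionUntilDone, the hardcoded sleep, and the elided shard loop are placeholders, not the copier's real API):

    #include <chrono>
    #include <thread>

    // Retry until every shard is done; sleep only when a pass achieved nothing.
    void copyPartitionUntilDone()
    {
        const auto default_sleep_time = std::chrono::seconds(1);
        bool is_done;
        do
        {
            size_t num_successful_shards = 0;
            is_done = true;

            // ... process each shard: on success ++num_successful_shards,
            //     on failure set is_done = false ...

            if (!is_done && num_successful_shards == 0)
                std::this_thread::sleep_for(default_sleep_time);
        } while (!is_done);
    }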
@@ -886,8 +887,11 @@ protected:
         /// Remove all status nodes
         zookeeper->tryRemoveRecursive(current_shards_path);
 
-        String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push) +
-            " DROP PARTITION ID " + backQuoteIfNeed(task_partition.name);
+        String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push);
+        query += " DROP PARTITION " + task_partition.name;
+
+        /// TODO: use this statement after servers are updated to 1.1.54310
+        // query += " DROP PARTITION ID '" + task_partition.name + "'";
 
         ClusterPtr & cluster_push = task_table.cluster_push;
         Settings & settings_push = task_cluster->settings_push;
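
For clarity, this is what the two forms produce with a made-up table and partition name (values below are invented for illustration):

    #include <iostream>
    #include <string>

    int main()
    {
        // Hypothetical stand-ins for the copier's fields.
        std::string table = "db.hits_dst";   // getDatabaseDotTable(task_table.table_push)
        std::string partition = "201711";    // task_partition.name

        // Statement built by the new code path:
        std::string query = "ALTER TABLE " + table + " DROP PARTITION " + partition;
        std::cout << query << '\n';  // ALTER TABLE db.hits_dst DROP PARTITION 201711

        // Form reserved by the TODO for servers >= 1.1.54310:
        // ALTER TABLE db.hits_dst DROP PARTITION ID '201711'
    }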
@@ -903,7 +907,10 @@ protected:
         }
 
         /// Remove the locking node
+        cleaner_holder.reset();
         zookeeper->remove(is_dirty_flag_path);
 
+        LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
+
         return true;
     }
@@ -932,6 +939,30 @@ protected:
         String current_task_is_active_path = task_partition.getActiveWorkerPath();
         String current_task_status_path = task_partition.getShardStatusPath();
 
+        /// Auxiliary functions:
+
+        /// Creates is_dirty node to initialize DROP PARTITION
+        auto create_is_dirty_node = [&] ()
+        {
+            auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
+            if (code != ZOK && code != ZNODEEXISTS)
+                throw zkutil::KeeperException(code, is_dirty_flag_path);
+        };
+
+        /// Returns SELECT query filtering current partition and applying user filter
+        auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields)
+        {
+            String query;
+            query += "SELECT " + fields + " FROM " + getDatabaseDotTable(from_table);
+            query += " WHERE (_part LIKE '" + task_partition.name + "%')";
+
+            if (!task_table.where_condition_str.empty())
+                query += " AND (" + task_table.where_condition_str + ")";
+
+            ParserQuery p_query(query.data() + query.size());
+            return parseQuery(p_query, query);
+        };
+
         /// Load balancing
         auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path);
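
get_select_query only concatenates a string before parsing it into an AST. The virtual _part column holds the data part name, which begins with the partition identifier, so the LIKE prefix selects exactly the partition's rows. With invented sample values (the database, tables, and filter below are not from the commit), the generated text looks like this:

    #include <iostream>
    #include <string>

    int main()
    {
        // Invented stand-ins for task_table / task_partition fields.
        std::string fields = "count()";
        std::string from_table = "default.`.split.hits`";    // table_split
        std::string partition_name = "201711";               // task_partition.name
        std::string where_condition_str = "CounterID != 0";  // optional user filter

        std::string query = "SELECT " + fields + " FROM " + from_table;
        query += " WHERE (_part LIKE '" + partition_name + "%')";
        if (!where_condition_str.empty())
            query += " AND (" + where_condition_str + ")";

        // SELECT count() FROM default.`.split.hits` WHERE (_part LIKE '201711%') AND (CounterID != 0)
        std::cout << query << '\n';
    }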
@@ -986,7 +1017,7 @@ protected:
                 // Task is abandoned, initialize DROP PARTITION
                 LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner);
 
-                zookeeper->create(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
+                create_is_dirty_node();
                 return false;
             }
@@ -1021,13 +1052,48 @@ protected:
             auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_shard, storage_shard_ast);
             auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast);
 
-            LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
+            //LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
             dropAndCreateLocalTable(create_table_pull_ast);
 
-            LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
+            //LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
             dropAndCreateLocalTable(create_table_split_ast);
         }
 
+        /// Check that the destination partition is empty if we are the first worker
+        /// NOTE: this check is incorrect if pull and push tables have different partition keys!
+        {
+            ASTPtr query_select_ast = get_select_query(table_split, "count()");
+            UInt64 count;
+            {
+                Context local_context = context;
+                // Use pull (i.e. readonly) settings, but fetch data from destination servers
+                local_context.getSettingsRef() = task_cluster->settings_pull;
+                local_context.getSettingsRef().skip_unavailable_shards = true;
+
+                InterpreterSelectQuery interpreter(query_select_ast, local_context);
+                BlockIO io = interpreter.execute();
+
+                Block block = getBlockWithAllStreamData(io.in);
+                count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
+            }
+
+            if (count != 0)
+            {
+                zkutil::Stat stat_shards;
+                zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
+
+                if (stat_shards.numChildren == 0)
+                {
+                    LOG_WARNING(log, "There are no workers for partition " << task_partition.name
+                        << ", but destination table contains " << count << " rows"
+                        << ". Partition will be dropped and refilled.");
+
+                    create_is_dirty_node();
+                    return false;
+                }
+            }
+        }
+
         /// Try start processing, create node about it
         {
             String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
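
Restated, the decision rule of the new check: rows already in the destination are trusted only if at least one worker has registered for the partition; a non-empty destination with zero registered workers is treated as leftovers of an aborted copy. As the NOTE warns, the emptiness probe filters by the source partition's _part prefix, so it is only meaningful when pull and push tables share the partition key. A sketch under that reading, with illustrative names only:

    #include <cstdint>

    // If the destination partition has rows but no worker ever registered
    // for it, the data must be stale: mark it dirty (drop and refill).
    bool shouldDropAndRefill(std::uint64_t rows_in_destination, long num_registered_workers)
    {
        return rows_in_destination != 0 && num_registered_workers == 0;
    }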
@@ -1059,19 +1125,9 @@ protected:
         /// Do the copying
         {
-            ASTPtr query_select_ast;
-            {
-                String query;
-                query += "SELECT * FROM " + getDatabaseDotTable(table_shard);
-                query += " WHERE (_part LIKE '" + task_partition.name + "%')";
-
-                if (!task_table.where_condition_str.empty())
-                    query += " AND (" + task_table.where_condition_str + ")";
-
-                ParserQuery p_query(query.data() + query.size());
-                query_select_ast = parseQuery(p_query, query);
-
-                LOG_DEBUG(log, "Executing SELECT query: " << query);
-            }
+            // Select all fields
+            ASTPtr query_select_ast = get_select_query(table_shard, "*");
+            LOG_DEBUG(log, "Executing SELECT query: " << queryToString(query_select_ast));
 
             ASTPtr query_insert_ast;
             {