most tests ok

Nikita Mikhaylov 2020-03-07 03:05:49 +03:00
parent 848eae2293
commit 12d5900d71
3 changed files with 65 additions and 10 deletions


@@ -904,17 +904,43 @@ PartitionTaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const Conn
 {
     const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;
 
-    PartitionTaskStatus res;
-    PartitionTaskStatus answer = PartitionTaskStatus::Finished;
+    PartitionTaskStatus res{PartitionTaskStatus::Finished};
+
+    bool was_failed_pieces = false;
+    bool was_active_pieces = false;
+    bool was_error = false;
 
     for (size_t piece_number = 0; piece_number < total_number_of_pieces; piece_number++)
     {
-        res = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);
-        if (res == PartitionTaskStatus::Error)
-            answer = res;
+        for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
+        {
+            res = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);
+
+            /// Exit if success
+            if (res == PartitionTaskStatus::Finished)
+                break;
+
+            was_error = true;
+
+            /// Skip if the task is being processed by someone
+            if (res == PartitionTaskStatus::Active)
+                break;
+
+            /// Repeat on errors
+            std::this_thread::sleep_for(default_sleep_time);
+        }
+
+        was_active_pieces = (res == PartitionTaskStatus::Active);
+        was_failed_pieces = (res == PartitionTaskStatus::Error);
     }
 
-    return answer;
+    if (was_failed_pieces)
+        return PartitionTaskStatus::Error;
+
+    if (was_active_pieces)
+        return PartitionTaskStatus::Active;
+
+    return PartitionTaskStatus::Finished;
 }
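
The new control flow retries each piece up to max_shard_partition_tries times, treats Active (another worker already owns the task) as a reason to stop retrying without sleeping, and only afterwards folds the per-piece outcome into the partition-level answer, with Error taking priority over Active. A minimal self-contained sketch of the same pattern; Status, process_piece, kMaxTries and kSleep are illustrative names, not the copier's actual API:

#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>

enum class Status { Finished, Active, Error };

/// Hypothetical stand-in for processPartitionPieceTaskImpl().
Status process_piece(std::size_t /*piece_number*/) { return Status::Finished; }

Status iterate_pieces(std::size_t total_pieces)
{
    constexpr std::size_t kMaxTries = 10;               /// assumed retry budget
    constexpr auto kSleep = std::chrono::seconds(1);    /// assumed back-off

    bool any_failed = false;
    bool any_active = false;

    for (std::size_t piece = 0; piece < total_pieces; ++piece)
    {
        Status res = Status::Error;
        for (std::size_t try_num = 0; try_num < kMaxTries; ++try_num)
        {
            res = process_piece(piece);
            if (res == Status::Finished)
                break;                                  /// success: stop retrying this piece
            if (res == Status::Active)
                break;                                  /// owned by someone else: back off
            std::this_thread::sleep_for(kSleep);        /// transient error: retry
        }
        any_failed = any_failed || (res == Status::Error);
        any_active = any_active || (res == Status::Active);
    }

    /// Error dominates Active, which dominates Finished.
    if (any_failed)
        return Status::Error;
    if (any_active)
        return Status::Active;
    return Status::Finished;
}

int main()
{
    std::cout << static_cast<int>(iterate_pieces(4)) << '\n';
}

One difference worth noting: the sketch ORs the flags across pieces, while the hunk above reassigns was_failed_pieces and was_active_pieces on every outer iteration, so only the last piece's status influences the returned value.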
@@ -1266,6 +1292,7 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
+    /// Try to create the original table (if it does not exist) on each shard
     try
     {
         auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
                                                                task_table.table_push, task_table.engine_push_ast);
@@ -1279,6 +1306,10 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
         LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push)
                   << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
     }
+    catch (...)
+    {
+        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
+    }
 
     /// Move partition to original destination table.
     {
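
Every copier worker races to create the destination table, and the new catch-all makes losing that race non-fatal: the exception is logged with a hint ("maybe we are not first") and processing continues. A hedged sketch of this best-effort-create idiom; execute_ddl and ensure_table are illustrative placeholders, not ClusterCopier functions:

#include <iostream>
#include <stdexcept>
#include <string>

/// Hypothetical DDL runner; here it simulates losing the creation race.
void execute_ddl(const std::string &)
{
    throw std::runtime_error("Table already exists");
}

/// Best-effort creation: many workers race on the same CREATE, and losing
/// the race is expected rather than fatal, mirroring the catch above.
void ensure_table(const std::string & create_query)
{
    try
    {
        execute_ddl(create_query);
    }
    catch (const std::exception & e)
    {
        /// Log and continue: another worker probably created the table first.
        std::cerr << "Error while creating table (maybe we are not first): " << e.what() << '\n';
    }
}

int main()
{
    ensure_table("CREATE TABLE IF NOT EXISTS db.hits (d UInt64) ENGINE = MergeTree ORDER BY d");
}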
@@ -1293,16 +1324,25 @@ PartitionTaskStatus ClusterCopier::processPartitionPieceTaskImpl(
         DatabaseAndTableName original_table = task_table.table_push;
         DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
 
-        query_alter_ast_string += " ALTER TABLE " + getQuotedTable(helping_table) +
-                                  " MOVE PARTITION " + task_partition.name +
-                                  " TO TABLE " + getQuotedTable(original_table);
+        query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
+                                  " ATTACH PARTITION " + task_partition.name +
+                                  " FROM " + getQuotedTable(helping_table);
 
         LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);
 
+//        query_alter_ast_string += " INSERT INTO " + getQuotedTable(original_table) +
+//                                  " SELECT * FROM " + getQuotedTable(helping_table);
+//
+//        query_alter_ast_string += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
+//
+//        LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);
+
         try
         {
-            UInt64 num_shards = executeQueryOnCluster(task_table.cluster_push, query_alter_ast_string, nullptr, &task_cluster->settings_push, PoolMode::GET_ONE, 1);
+            /// FIXME: We have to be sure that every node in the cluster executed this query
+            UInt64 num_shards = executeQueryOnCluster(task_table.cluster_push, query_alter_ast_string, nullptr, &task_cluster->settings_push, PoolMode::GET_MANY);
+
+            assert(num_shards > 0);
+            LOG_INFO(log, "Number of shards that executed ALTER query successfully: " << toString(num_shards));
         }
         catch (...)
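
Two changes land in this hunk: the generated ALTER switches from MOVE PARTITION ... TO TABLE, which detaches the parts from the helping table, to ATTACH PARTITION ... FROM, which copies them and leaves the source intact, and the query is now broadcast with PoolMode::GET_MANY instead of being sent to a single replica (GET_ONE). A small standalone snippet showing the shape of both query strings, using made-up table and partition names:

#include <iostream>
#include <string>

int main()
{
    /// Illustrative names; the copier derives these from the task configuration.
    const std::string original  = "db.hits";
    const std::string helping   = "db.hits_piece_0";
    const std::string partition = "201912";

    /// Old form: MOVE detaches the parts from the helping table.
    const std::string move_query =
        "ALTER TABLE " + helping +
        " MOVE PARTITION " + partition +
        " TO TABLE " + original;

    /// New form: ATTACH ... FROM copies the parts, leaving the helping table intact.
    const std::string attach_query =
        "ALTER TABLE " + original +
        " ATTACH PARTITION " + partition +
        " FROM " + helping;

    std::cout << move_query << '\n' << attach_query << '\n';
}

One plausible motivation for the switch: once the ALTER can run on many nodes, a duplicated or retried ATTACH still has a source to read from, whereas a MOVE would already have emptied the helping table.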
@@ -1436,6 +1476,8 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
                 task_shard.list_of_split_tables_on_shard[piece_number],
                 storage_piece_split_ast);
 
+        std::cout << "anime" << queryToString(create_table_split_piece_ast) << std::endl;
+
         dropAndCreateLocalTable(create_table_split_piece_ast);
     }
 }


@@ -4877,6 +4877,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
             ErrorCodes::LOGICAL_ERROR);
 
         String hash_hex = src_part->checksums.getTotalChecksumHex();
+
+        LOG_INFO(log, "Trying to attach " << src_part->name << " with hash_hex " << hash_hex);
+
         String block_id_path = replace ? "" : (zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex);
 
         auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path);
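
In replacePartitionFrom(), the part's total checksum is now logged and, in non-replace mode, block_id_path is built from the partition id plus that checksum. A non-empty path makes allocateBlockNumber() register the block under .../blocks/ in ZooKeeper, so attaching a part with the same checksum a second time resolves to the same node and can be recognised as a duplicate. A toy illustration of how the path is assembled; the constants are made up, in the server they come from the table and the source part:

#include <iostream>
#include <string>

int main()
{
    /// Made-up values; the server takes these from the table and the part being attached.
    const std::string zookeeper_path = "/clickhouse/tables/01/hits";
    const std::string partition_id   = "201912";
    const std::string hash_hex       = "0123456789abcdef";  /// part's total checksum
    const bool replace = false;                             /// ATTACH mode, not REPLACE

    /// Non-empty path => ZooKeeper-side deduplication of repeated attaches;
    /// REPLACE mode disables the check by passing an empty path.
    const std::string block_id_path =
        replace ? "" : (zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex);

    std::cout << block_id_path << '\n';
}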


@@ -92,6 +92,16 @@ class Task1:
     def check(self):
         assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
 
+        for anime in ['s1_0_0', 's1_0_1', 's1_1_0']:
+            a = self.cluster.instances[anime].query("SELECT count() FROM hits_piece_0")
+            b = self.cluster.instances[anime].query("SELECT count() FROM hits_piece_1")
+            c = self.cluster.instances[anime].query("SELECT count() FROM hits")
+            print(anime, a, b, int(a) + int(b), c)
+
+            print(self.cluster.instances[anime].query("select partition, name, database, table, hash_of_all_files, hash_of_uncompressed_files, uncompressed_hash_of_compressed_files from system.parts where table like '%hits%' format TSV"))
+
+        assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")
+        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
+
         assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
         assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")