Fix synchronisation bug

This commit is contained in:
Guillaume Tassery 2019-08-30 06:24:05 +02:00
parent 62e302d689
commit d752b1c143

View File

@ -4978,7 +4978,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
{
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for MergeTree family of table engines."
throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for ReplicatedMergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
@ -5040,7 +5040,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path = "" ;
String block_id_path = ""; //(dest_table_storage->zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex);
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
@ -5060,15 +5060,30 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry_delete;
{
entry_delete.type = LogEntry::DROP_RANGE;
entry_delete.source_replica = replica_name;
entry_delete.new_part_name = drop_range_fake_part_name;
entry_delete.detach = false;
entry_delete.create_time = time(nullptr);
}
ReplicatedMergeTreeLogEntryData entry;
{
MergeTreePartInfo drop_range_dest;
drop_range_dest.partition_id = drop_range.partition_id;
drop_range_dest.max_block = drop_range.max_block;
drop_range_dest.min_block = drop_range.max_block;
drop_range_dest.level = drop_range.level;
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.source_replica = dest_table_storage->replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest);
entry_replace.from_database = src_data.database_name;
entry_replace.from_table = src_data.table_name;
for (const auto & part : src_parts)
@ -5083,8 +5098,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
/// We are almost ready to commit changes, remove fetches and merges from drop range
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
/// Remove deduplication block_ids of replacing parts
dest_table_storage->clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
Coordination::Responses op_results;
@ -5105,20 +5119,25 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
}
}
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(dest_table_storage->zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*dest_table_storage);
{
Transaction transaction(*dest_table_storage);
auto data_parts_lock = lockParts();
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock);
}
op_results = zookeeper->multi(ops);
{
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
}
op_results = zookeeper->multi(ops);
PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
}
catch (...)
@ -5142,9 +5161,15 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
/// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
waitForAllReplicasToProcessLogEntry(entry);
dest_table_storage->waitForAllReplicasToProcessLogEntry(entry);
Coordination::Requests ops_dest;
ops_dest.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential));
zookeeper->multi(ops_dest);
}
void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops,
MutableDataPartPtr & part,