From d752b1c14358793490dd0b6fb7a3c41837ae866b Mon Sep 17 00:00:00 2001 From: Guillaume Tassery Date: Fri, 30 Aug 2019 06:24:05 +0200 Subject: [PATCH] Fix synchronisation bug --- .../Storages/StorageReplicatedMergeTree.cpp | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8cbeb26f2c7..ccb0353ef3d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4978,7 +4978,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, { auto dest_table_storage = std::dynamic_pointer_cast(dest_table); if (!dest_table_storage) - throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for MergeTree family of table engines." + throw Exception("Table " + this->getTableName() + " supports attachPartitionFrom only for ReplicatedMergeTree family of table engines." " Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED); auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); @@ -5040,7 +5040,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, ErrorCodes::LOGICAL_ERROR); String hash_hex = src_part->checksums.getTotalChecksumHex(); - String block_id_path = "" ; + String block_id_path = ""; //(dest_table_storage->zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex); auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path); if (!lock) @@ -5060,15 +5060,30 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, part_checksums.emplace_back(hash_hex); } + ReplicatedMergeTreeLogEntryData entry_delete; + { + entry_delete.type = LogEntry::DROP_RANGE; + entry_delete.source_replica = replica_name; + entry_delete.new_part_name = drop_range_fake_part_name; + entry_delete.detach = false; + entry_delete.create_time = time(nullptr); + } + ReplicatedMergeTreeLogEntryData entry; { + MergeTreePartInfo drop_range_dest; + drop_range_dest.partition_id = drop_range.partition_id; + drop_range_dest.max_block = drop_range.max_block; + drop_range_dest.min_block = drop_range.max_block; + drop_range_dest.level = drop_range.level; + entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = replica_name; + entry.source_replica = dest_table_storage->replica_name; entry.create_time = time(nullptr); entry.replace_range_entry = std::make_shared(); auto & entry_replace = *entry.replace_range_entry; - entry_replace.drop_range_part_name = drop_range_fake_part_name; + entry_replace.drop_range_part_name = getPartNamePossiblyFake(format_version, drop_range_dest); entry_replace.from_database = src_data.database_name; entry_replace.from_table = src_data.table_name; for (const auto & part : src_parts) @@ -5083,8 +5098,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, /// We are almost ready to commit changes, remove fetches and merges from drop range queue.removePartProducingOpsInRange(zookeeper, drop_range, entry); - /// Remove deduplication block_ids of replacing parts - dest_table_storage->clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); + clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block); DataPartsVector parts_to_remove; Coordination::Responses op_results; @@ -5105,20 +5119,25 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, } } - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + ops.emplace_back(zkutil::makeCreateRequest(dest_table_storage->zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + Transaction transaction(*dest_table_storage); { - Transaction transaction(*dest_table_storage); auto data_parts_lock = lockParts(); for (MutableDataPartPtr & part : dst_parts) dest_table_storage->renameTempPartAndReplace(part, nullptr, &transaction, data_parts_lock); + } + + op_results = zookeeper->multi(ops); + + { + auto data_parts_lock = lockParts(); transaction.commit(&data_parts_lock); parts_to_remove = removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock); } - op_results = zookeeper->multi(ops); PartLog::addNewParts(global_context, dst_parts, watch.elapsed()); } catch (...) @@ -5142,9 +5161,15 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, /// If necessary, wait until the operation is performed on all replicas. if (context.getSettingsRef().replication_alter_partitions_sync > 1) - waitForAllReplicasToProcessLogEntry(entry); + dest_table_storage->waitForAllReplicasToProcessLogEntry(entry); + + Coordination::Requests ops_dest; + ops_dest.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry_delete.toString(), zkutil::CreateMode::PersistentSequential)); + + zookeeper->multi(ops_dest); } + void StorageReplicatedMergeTree::getCommitPartOps( Coordination::Requests & ops, MutableDataPartPtr & part,