diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 023e67ec3de..62059507b77 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1754,16 +1754,27 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace( } -void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) +bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) { - auto removed = renameTempPartAndReplace(part, increment, out_transaction); - if (!removed.empty()) - throw Exception("Added part " + part->name + " covers " + toString(removed.size()) - + " existing part(s) (including " + removed[0]->name + ")", ErrorCodes::LOGICAL_ERROR); + if (out_transaction && &out_transaction->data != this) + throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.", + ErrorCodes::LOGICAL_ERROR); + + DataPartsVector covered_parts; + { + auto lock = lockParts(); + if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts)) + return false; + } + if (!covered_parts.empty()) + throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size()) + + " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR); + + return true; } -void MergeTreeData::renameTempPartAndReplace( +bool MergeTreeData::renameTempPartAndReplace( MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, std::unique_lock & lock, DataPartsVector * out_covered_parts) { @@ -1816,7 +1827,7 @@ void MergeTreeData::renameTempPartAndReplace( if (covering_part) { LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part_name, covering_part->getNameWithState()); - return; + return false; } /// All checks are passed. Now we can rename the part on disk. @@ -1854,6 +1865,8 @@ void MergeTreeData::renameTempPartAndReplace( for (DataPartPtr & covered_part : covered_parts) out_covered_parts->emplace_back(std::move(covered_part)); } + + return true; } MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 217e5000cf6..155c2d432c9 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -423,7 +423,8 @@ public: /// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the /// active set later with out_transaction->commit()). /// Else, commits the part immediately. - void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); + /// Returns true if part was added. Returns false if part is covered by bigger part. + bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts. /// Returns all parts covered by the added part (in ascending order). @@ -432,7 +433,7 @@ public: MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); /// Low-level version of previous one, doesn't lock mutex - void renameTempPartAndReplace( + bool renameTempPartAndReplace( MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock, DataPartsVector * out_covered_parts = nullptr); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 1bbc56d940d..03885d90ece 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -27,6 +27,7 @@ namespace ErrorCodes extern const int INSERT_WAS_DEDUPLICATED; extern const int TIMEOUT_EXCEEDED; extern const int NO_ACTIVE_REPLICAS; + extern const int DUPLICATE_DATA_PART; } @@ -204,165 +205,223 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( storage.check(part->getColumns()); assertSessionIsNotExpired(zookeeper); - /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. - /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned. - /// Also, make deduplication check. If a duplicate is detected, no nodes are created. + String temporary_part_name = part->name; - /// Allocate new block number and check for duplicates - bool deduplicate_block = !block_id.empty(); - String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : ""; - auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); - - if (!block_number_lock) + while (true) { - LOG_INFO(log, "Block with ID {} already exists; ignoring it.", block_id); - part->is_duplicate = true; - last_block_is_duplicate = true; - ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); - return; - } + /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. + /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned. + /// Also, make deduplication check. If a duplicate is detected, no nodes are created. - Int64 block_number = block_number_lock->getNumber(); + /// Allocate new block number and check for duplicates + bool deduplicate_block = !block_id.empty(); + String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : ""; + auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); - /// Set part attributes according to part_number. Prepare an entry for log. - - part->info.min_block = block_number; - part->info.max_block = block_number; - part->info.level = 0; - - String part_name = part->getNewName(part->info); - part->name = part_name; - - StorageReplicatedMergeTree::LogEntry log_entry; - log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; - log_entry.create_time = time(nullptr); - log_entry.source_replica = storage.replica_name; - log_entry.new_part_name = part_name; - log_entry.quorum = quorum; - log_entry.block_id = block_id; - - /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock. - - /// Information about the part. - Coordination::Requests ops; - - storage.getCommitPartOps(ops, part, block_id_path); - - /// Replication log. - ops.emplace_back(zkutil::makeCreateRequest( - storage.zookeeper_path + "/log/log-", - log_entry.toString(), - zkutil::CreateMode::PersistentSequential)); - - /// Deletes the information that the block number is used for writing. - block_number_lock->getUnlockOps(ops); - - /** If you need a quorum - create a node in which the quorum is monitored. - * (If such a node already exists, then someone has managed to make another quorum record at the same time, but for it the quorum has not yet been reached. - * You can not do the next quorum record at this time.) - */ - if (quorum) - { - ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.part_name = part_name; - quorum_entry.required_number_of_replicas = quorum; - quorum_entry.replicas.insert(storage.replica_name); - - /** At this point, this node will contain information that the current replica received a part. - * When other replicas will receive this part (in the usual way, processing the replication log), - * they will add themselves to the contents of this node. - * When it contains information about `quorum` number of replicas, this node is deleted, - * which indicates that the quorum has been reached. - */ - - ops.emplace_back( - zkutil::makeCreateRequest( - quorum_info.status_path, - quorum_entry.toString(), - zkutil::CreateMode::Persistent)); - - /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished). - ops.emplace_back( - zkutil::makeCheckRequest( - storage.replica_path + "/is_active", - quorum_info.is_active_node_version)); - - /// Unfortunately, just checking the above is not enough, because `is_active` node can be deleted and reappear with the same version. - /// But then the `host` value will change. We will check this. - /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread). - ops.emplace_back( - zkutil::makeCheckRequest( - storage.replica_path + "/host", - quorum_info.host_node_version)); - } - - MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set. - storage.renameTempPartAndAdd(part, nullptr, &transaction); - - Coordination::Responses responses; - Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT - - if (multi_code == Coordination::Error::ZOK) - { - transaction.commit(); - storage.merge_selecting_task->schedule(); - - /// Lock nodes have been already deleted, do not delete them in destructor - block_number_lock->assumeUnlocked(); - } - else if (multi_code == Coordination::Error::ZCONNECTIONLOSS - || multi_code == Coordination::Error::ZOPERATIONTIMEOUT) - { - /** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part - * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again. - */ - transaction.commit(); - storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); - - /// We do not know whether or not data has been inserted. - throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)), - ErrorCodes::UNKNOWN_STATUS_OF_INSERT); - } - else if (Coordination::isUserError(multi_code)) - { - String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp(); - - if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path) + Int64 block_number; + String existing_part_name; + if (block_number_lock) { - /// Block with the same id have just appeared in table (or other replica), rollback thee insertion. - LOG_INFO(log, "Block with ID {} already exists; ignoring it (removing part {})", block_id, part->name); + block_number = block_number_lock->getNumber(); - part->is_duplicate = true; - transaction.rollback(); - last_block_is_duplicate = true; - ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); - } - else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path) - { - transaction.rollback(); + /// Set part attributes according to part_number. Prepare an entry for log. - throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); + part->info.min_block = block_number; + part->info.max_block = block_number; + part->info.level = 0; + + part->name = part->getNewName(part->info); } else { - /// NOTE: We could be here if the node with the quorum existed, but was quickly removed. - transaction.rollback(); - throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': " - + Coordination::errorMessage(multi_code) + ", path " + failed_op_path, - ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + /// This block was already written to some replica. Get the part name for it. + /// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok. + existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id); + + /// If it exists on our replica, ignore it. + if (storage.getActiveContainingPart(existing_part_name)) + { + LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name); + part->is_duplicate = true; + last_block_is_duplicate = true; + ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); + return; + } + + LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.", + block_id, existing_part_name); + + /// If it does not exist, we will write a new part with existing name. + /// Note that it may also appear on filesystem right now in PreCommitted state due to concurrent inserts of the same data. + /// It will be checked when we will try to rename directory. + + part->name = existing_part_name; + part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version); + + /// Don't do subsequent duplicate check. + block_id_path.clear(); } - } - else if (Coordination::isHardwareError(multi_code)) - { - transaction.rollback(); - throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': " - + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); - } - else - { - transaction.rollback(); - throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': " - + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + + StorageReplicatedMergeTree::LogEntry log_entry; + log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; + log_entry.create_time = time(nullptr); + log_entry.source_replica = storage.replica_name; + log_entry.new_part_name = part->name; + log_entry.quorum = quorum; + log_entry.block_id = block_id; + + /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock. + + /// Information about the part. + Coordination::Requests ops; + + storage.getCommitPartOps(ops, part, block_id_path); + + /// Replication log. + ops.emplace_back(zkutil::makeCreateRequest( + storage.zookeeper_path + "/log/log-", + log_entry.toString(), + zkutil::CreateMode::PersistentSequential)); + + /// Deletes the information that the block number is used for writing. + if (block_number_lock) + block_number_lock->getUnlockOps(ops); + + /** If you need a quorum - create a node in which the quorum is monitored. + * (If such a node already exists, then someone has managed to make another quorum record at the same time, + * but for it the quorum has not yet been reached. + * You can not do the next quorum record at this time.) + */ + if (quorum) /// TODO Duplicate blocks. + { + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.part_name = part->name; + quorum_entry.required_number_of_replicas = quorum; + quorum_entry.replicas.insert(storage.replica_name); + + /** At this point, this node will contain information that the current replica received a part. + * When other replicas will receive this part (in the usual way, processing the replication log), + * they will add themselves to the contents of this node. + * When it contains information about `quorum` number of replicas, this node is deleted, + * which indicates that the quorum has been reached. + */ + + ops.emplace_back( + zkutil::makeCreateRequest( + quorum_info.status_path, + quorum_entry.toString(), + zkutil::CreateMode::Persistent)); + + /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished). + ops.emplace_back( + zkutil::makeCheckRequest( + storage.replica_path + "/is_active", + quorum_info.is_active_node_version)); + + /// Unfortunately, just checking the above is not enough, because `is_active` node can be deleted and reappear with the same version. + /// But then the `host` value will change. We will check this. + /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread). + ops.emplace_back( + zkutil::makeCheckRequest( + storage.replica_path + "/host", + quorum_info.host_node_version)); + } + + MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set. + bool renamed = false; + try + { + renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction); + } + catch (const Exception & e) + { + if (e.code() != ErrorCodes::DUPLICATE_DATA_PART) + throw; + } + if (!renamed) + { + if (!existing_part_name.empty()) + { + LOG_INFO(log, "Part {} is duplicate and it is already written by concurrent request; ignoring it.", block_id, existing_part_name); + return; + } + else + throw Exception("Part with name {} is already written by concurrent request. It should not happen for non-duplicate data parts because unique names are assigned for them. It's a bug", ErrorCodes::LOGICAL_ERROR); + } + + Coordination::Responses responses; + Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT + + if (multi_code == Coordination::Error::ZOK) + { + transaction.commit(); + storage.merge_selecting_task->schedule(); + + /// Lock nodes have been already deleted, do not delete them in destructor + if (block_number_lock) + block_number_lock->assumeUnlocked(); + } + else if (multi_code == Coordination::Error::ZCONNECTIONLOSS + || multi_code == Coordination::Error::ZOPERATIONTIMEOUT) + { + /** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part + * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again. + */ + transaction.commit(); + storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER); + + /// We do not know whether or not data has been inserted. + throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)), + ErrorCodes::UNKNOWN_STATUS_OF_INSERT); + } + else if (Coordination::isUserError(multi_code)) + { + String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp(); + + if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path) + { + /// Block with the same id have just appeared in table (or other replica), rollback thee insertion. + LOG_INFO(log, "Block with ID {} already exists (it was just appeared). Renaming part {} back to {}. Will retry write.", + block_id, part->name, temporary_part_name); + + transaction.rollback(); + + part->is_duplicate = true; + part->is_temp = true; + part->state = MergeTreeDataPartState::Temporary; + part->renameTo(temporary_part_name); + + continue; + } + else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path) + { + transaction.rollback(); + + throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); + } + else + { + /// NOTE: We could be here if the node with the quorum existed, but was quickly removed. + transaction.rollback(); + throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': " + + Coordination::errorMessage(multi_code) + ", path " + failed_op_path, + ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + } + } + else if (Coordination::isHardwareError(multi_code)) + { + transaction.rollback(); + throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': " + + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + } + else + { + transaction.rollback(); + throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': " + + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + } + + break; } if (quorum) @@ -386,7 +445,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( ReplicatedMergeTreeQuorumEntry quorum_entry(value); /// If the node has time to disappear, and then appear again for the next insert. - if (quorum_entry.part_name != part_name) + if (quorum_entry.part_name != part->name) break; if (!event->tryWait(quorum_timeout_ms)) diff --git a/tests/queries/0_stateless/01319_manual_write_to_replicas.reference b/tests/queries/0_stateless/01319_manual_write_to_replicas.reference new file mode 100644 index 00000000000..0e3a632a4ee --- /dev/null +++ b/tests/queries/0_stateless/01319_manual_write_to_replicas.reference @@ -0,0 +1,6 @@ +Hello, world +--- +Hello, world +Hello, world +Hello, world +Hello, world diff --git a/tests/queries/0_stateless/01319_manual_write_to_replicas.sql b/tests/queries/0_stateless/01319_manual_write_to_replicas.sql new file mode 100644 index 00000000000..5388f0017c0 --- /dev/null +++ b/tests/queries/0_stateless/01319_manual_write_to_replicas.sql @@ -0,0 +1,25 @@ +DROP TABLE IF EXISTS r1; +DROP TABLE IF EXISTS r2; + +CREATE TABLE r1 (x String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', 'r1') ORDER BY x; +CREATE TABLE r2 (x String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', 'r2') ORDER BY x; + +SYSTEM STOP REPLICATED SENDS; + +INSERT INTO r1 VALUES ('Hello, world'); +SELECT * FROM r1; +SELECT * FROM r2; +INSERT INTO r2 VALUES ('Hello, world'); +SELECT '---'; +SELECT * FROM r1; +SELECT * FROM r2; + +SYSTEM START REPLICATED SENDS; +SYSTEM SYNC REPLICA r1; +SYSTEM SYNC REPLICA r2; + +SELECT * FROM r1; +SELECT * FROM r2; + +DROP TABLE r1; +DROP TABLE r2;